今天选用的样例是最简单的【NetworkWordCount】,原始代码地址:https://github.com/apache/spark/blob/v2.3.0/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala
代码内容如下:
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */
// scalastyle:off println package org.apache.spark.examples.streaming
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
/** * Counts words in UTF8 encoded, '\n' delimited text received from the network every second. * * Usage: NetworkWordCount <hostname> <port> * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data. * * To run this on your local machine, you need to first run a Netcat server * `$ nc -lk 9999` * and then run the example * `$ bin/run-example org.apache.spark.examples.streaming.NetworkWordCount localhost 9999` */
object NetworkWordCount {
def main(args: Array[String]) {
if (args.length < 2) {
System.err.println("Usage: NetworkWordCount <hostname> <port>")
System.exit(1)
}
StreamingExamples.setStreamingLogLevels()
// Create the context with a 1 second batch size val sparkConf = new SparkConf().setAppName("NetworkWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(1))
// Create a socket stream on target ip:port and count the // words in input stream of \n delimited text (eg. generated by 'nc') // Note that no duplication in storage level only for running locally. // Replication necessary in distributed scenario for fault tolerance. val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
// scalastyle:on println
这段代码的总思路是:
启动一个StreamingContext,然后通过socketTextStream来监听每1秒,从对应从socket发来的数据。再使用flatMap对数据做一次split,再对每个单词做reduce,再打印。
今天是想通过一个简单样例来窥视内部,并教你怎么读源码。
第一步:初始化Spark Streaming上下文
// Create the context with a 1 second batch size val sparkConf = new SparkConf().setAppName("NetworkWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(1))
1.创建SparkConf
SparkConf 是配置管理类,可以配置各种文件的加载(这部分,今天不详细说,等在RDD里说详细说),常规的,我们配置的 spark-defaults.conf,spark-env.sh,再有是通过类似
sparkConf.setAppName("NetworkWordCount")
sparkConf.set("spark.app.name", "NetworkWordCount")
他们是从后往前覆盖的。
2.创建 StreamingContext ,及Spark Streaming应用程序上下文
两个参数,一个是配置实例,一个是间隔1秒。
要详细看的是 new StreamingContext () 这个里面做了什么事情。
/**
* Create a StreamingContext by providing the configuration necessary for a new SparkContext.
* @param conf a org.apache.spark.SparkConf object specifying Spark parameters
* @param batchDuration the time interval at which streaming data will be divided into batches
*/
def this(conf: SparkConf, batchDuration: Duration) = {
this(StreamingContext.createNewSparkContext(conf), null, batchDuration)
}
其中:StreamingContext.createNewSparkContext(conf) 是做什么呢?
private[streaming] def createNewSparkContext(conf: SparkConf): SparkContext = {
new SparkContext(conf)
}
直接创建一个SparkContext,parkContext是spark cores程序运行的上下文环境,其实spark程序的运行都是建立在spark cores的基础上的。
思考:
竟然是创建 SparkContext, 当我们在使用时,如果已经有一个 SparkContext 会出现什么问题?
还可以创建吗?因为一个jvm中只能有一个SparkContext。
如果不可以,那是否可以通过SparkContext来创建StreamingContext呢?
以上操作完之后,然后在StreamingContext中会初始化一些必要的变量和执行一些初始化的操作:
//检查SparkContext and checkpoint 是否为空,目的:至少要能创建SparkContext
//因为通过checkpoint 也可以创建出SparkContext
require(_sc != null || _cp != null,
"Spark Streaming cannot be initialized with both SparkContext and checkpoint as null")
//判断当前的Spark Streaming程序是否checkpoint
private[streaming] val isCheckpointPresent: Boolean = _cp != null
//sc的赋值,如没有配置checkpoint,则直接返回前面创建的
//SparkContext,若配置了checkpoint,则需从当前应用中读取properties信息,
//生成新的sparkconf,并根据这个sparkconf创建一个新的SparkContext
private[streaming] val sc: SparkContext = {
if (_sc != null) {
_sc
} else if (isCheckpointPresent) {
SparkContext.getOrCreate(_cp.createSparkConf())
} else {
throw new SparkException("Cannot create StreamingContext without a SparkContext")
}
}
//若当前程序运行在本地,则必须为其至少分配两个core,
//因为其中肯定需要有一个core运行reciever线程
//这里也是为什么有些同学在本地运行的时候,为什么没反应
//你应该这么写:val sparkConf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]")
if (sc.conf.get("spark.master") == "local" || sc.conf.get("spark.master") == "local[1]") {
logWarning("spark.master should be set as local[n], n > 1 in local mode if you have receivers" +
" to get data, otherwise Spark jobs will not get resources to process the received data.")
}
//配置内部使用的conf
private[streaming] val conf = sc.conf
//配置内部使用的env
private[streaming] val env = sc.env
//DStreamGraph功能类似于Spark Core的DAGScheduler,负责上层pipeline的生成
private[streaming] val graph: DStreamGraph = {
if (isCheckpointPresent) {
_cp.graph.setContext(this)
//从checkpoint目录中恢复数据,其中主要是恢复outputStreams的处理信息
//即读取checkpoint的信息,并为其及其dependencies生成ReliableCheckpointRDD
_cp.graph.restoreCheckpointData()
_cp.graph
} else {
require(_batchDur != null, "Batch duration for StreamingContext cannot be null")
val newGraph = new DStreamGraph()
newGraph.setBatchDuration(_batchDur)
newGraph
}
}
//输入流的id,一个Spark Streaming程序中可能有多个输入流
private val nextInputStreamId = new AtomicInteger(0)
//checkpointDir即checkpoint的目录,
//如果该程序运行在集群上则必须是一个HDFS的路径
private[streaming] var checkpointDir: String = {
if (isCheckpointPresent) {
sc.setCheckpointDir(_cp.checkpointDir)
_cp.checkpointDir
} else {
null
}
}
//执行checkpoint的时间间隔,取初化时,对应的值
private[streaming] val checkpointDuration: Duration = {
if (isCheckpointPresent) _cp.checkpointDuration else graph.batchDuration
}
//负责生成job
private[streaming] val scheduler = new JobScheduler(this)
//主要构建一个线程的同步锁对象,在执行停止时会被调用
//对应初始化部分的ContextWaiter,其主要作用:
//等待执行停止,执行期间发生的任何异常将被扔在这个线程。
private[streaming] val waiter = new ContextWaiter
//这个变量功能就是监听Spark Streaming程序的运行,其内部的变量有
/*
private val waitingBatchUIData = new HashMap[Time, BatchUIData]
private val runningBatchUIData = new HashMap[Time, BatchUIData]
private val completedBatchUIData = new Queue[BatchUIData]
private val batchUIDataLimit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 1000)
private var totalCompletedBatches = 0L
private var totalReceivedRecords = 0L
private var totalProcessedRecords = 0L
private val receiverInfos = new HashMap[Int, ReceiverInfo]
*/
//这个类是继承SparkListener,那么他也是Spark系统消息系统的一部分
//现在基本上所有的分布式系统都是通过消息驱动的方式来进行并发执行的
private[streaming] val progressListener = new StreamingJobProgressListener(this)
//这个变量主要是为程序生成ui界面
private[streaming] val uiTab: Option[StreamingTab] =
if (conf.getBoolean("spark.ui.enabled", true)) {
Some(new StreamingTab(this))
} else {
None
}
//这个变量主要是为我们的监控系统Ganglia来对该程序进行监控的,
//它属于spark测量系统的一部分,可以获取当前程序运行基本信息,
//如receivers,totalCompletedBatches,totalReceivedRecords,runningBatches,lastCompletedBatch_submissionTime等信息。
/* Initializing a streamingSource to register metrics */
private val streamingSource = new StreamingSource(this)
//当前Spark Streaming应用的状态,包括:INITIALIZED,ACTIVE,STOPPED三种
private var state: StreamingContextState = INITIALIZED
//用户代码中调用spark接口的堆栈信息方法中
//其具体功能实现在Utils.scala的getCallSite
//功能描述:获取当前SparkContext的当前调用堆栈,
//将栈里最靠近栈底的属于spark或者Scala核心的类压入callStack的栈顶,
//并将该类的方法存入lastSparkMethod;
//将栈里最靠近栈顶的用户类放入callStack,将此类的行号存入firstUserLine,
//类名存入firstUserFile,最终返回的样例类CallSite存储了最短栈和长度默认为20的最长栈的样例类。
//在JavaWordCount例子中,获得的数据如下:
//最短栈:JavaSparkContext at JavaWordCount.java:44;
//最长栈:org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:61)
//org.apache.spark.examples.JavaWordCount.main(JavaWordCount.java:44)。
//这个变量其实主要是给开发者看的,对于具体程序的运行没有必然的影响。
//当然它的信息也会反应在Spark ui的界面上的
private val startSite = new AtomicReference[CallSite](null)
// Copy of thread-local properties from SparkContext.
// These properties will be set in all tasks
// submitted by this StreamingContext after start.
// 从SparkContext中复制一份参数配置到本地线程,这些参数将会设置到所有的tasks
// 在启动后由此StreamingContext提交。
private[streaming] val savedProperties = new AtomicReference[Properties](new Properties)
//获取当前的状态
private[streaming] def getStartSite(): CallSite = startSite.get()
//当应用程序停止时会调用该对象,不管是正常退出或异常退出,
//stopOnShutdown()方法都会被回调,然后调用stop方法。
//stopGracefully 可以通过配置项spark.streaming.stopGracefullyOnShutdown配置,
//生产环境需要配置为true.
private var shutdownHookRef: AnyRef = _
conf.getOption("spark.streaming.checkpoint.directory").foreach(checkpoint)
第二步 Spark Streaming应用的业务逻辑(Lazy级别)
val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
这里是构建一个InputDStream,对应先看一下SparkStreamingContext中该方法的实现:
/** * Creates an input stream from TCP source hostname:port. Data is received using * a TCP socket and the receive bytes is interpreted as UTF8 encoded `\n` delimited * lines. * @param hostname Hostname to connect to for receiving data * @param port Port to connect to for receiving data * @param storageLevel Storage level to use for storing the received objects * (default: StorageLevel.MEMORY_AND_DISK_SER_2) * @see [[socketStream]] */
def socketTextStream(
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}
接着是调用socketStream方法
/** * Creates an input stream from TCP source hostname:port. Data is received using * a TCP socket and the receive bytes it interpreted as object using the given * converter. * @param hostname Hostname to connect to for receiving data * @param port Port to connect to for receiving data * @param converter Function to convert the byte stream to objects * @param storageLevel Storage level to use for storing the received objects * @tparam T Type of the objects received (after converting bytes to objects) */
def socketStream[T: ClassTag](
hostname: String,
port: Int,
converter: (InputStream) => Iterator[T],
storageLevel: StorageLevel
): ReceiverInputDStream[T] = {
new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
}
接着是SocketInputDStream方法
private[streaming]
class SocketInputDStream[T: ClassTag](
_ssc: StreamingContext,
host: String,
port: Int,
bytesToObjects: InputStream => Iterator[T],
storageLevel: StorageLevel
) extends ReceiverInputDStream[T](_ssc) {
def getReceiver(): Receiver[T] = {
new SocketReceiver(host, port, bytesToObjects, storageLevel)
}
}
这里看SocketInputDStream的继承类ReceiverInputDStream,而ReceiverInputDStream又继承自InputDStream
/** * This is the abstract base class for all input streams. This class provides methods * start() and stop() which are called by Spark Streaming system to start and stop * receiving data, respectively. * Input streams that can generate RDDs from new data by running a service/thread only on * the driver node (that is, without running a receiver on worker nodes), can be * implemented by directly inheriting this InputDStream. For example, * FileInputDStream, a subclass of InputDStream, monitors a HDFS directory from the driver for * new files and generates RDDs with the new files. For implementing input streams * that requires running a receiver on the worker nodes, use * [[org.apache.spark.streaming.dstream.ReceiverInputDStream]] as the parent class. * * @param _ssc Streaming context that will execute this input stream */
abstract class InputDStream[T: ClassTag](_ssc: StreamingContext)
extends DStream[T](_ssc) {
private[streaming] var lastValidTime: Time = null
ssc.graph.addInputStream(this)
/** This is an unique identifier for the input stream. */
val id = ssc.getNewInputStreamId()
// Keep track of the freshest rate for this stream using the rateEstimator protected[streaming] val rateController: Option[RateController] = None
/** A human-readable name of this InputDStream */
private[streaming] def name: String = {
// e.g. FlumePollingDStream -> "Flume polling stream" val newName = Utils.getFormattedClassName(this)
.replaceAll("InputDStream", "Stream")
.split("(?=[A-Z])")
.filter(_.nonEmpty)
.mkString(" ")
.toLowerCase
.capitalize
s"$newName [$id]"
}
其中最重要的是ssc.graph.addInputStream(this),把SocketInputDStream添加到ssc中的DStreamGraph中,而在Spark Streaming 中的DStreamGraph功能类似于Spark Core中的DAGScheduler,DStreamGraph是Spark Streaming的程序中job的high level 级别的实现。
所以这一段代码具体实现了三个重要的功能:
(1)提供一个InputDStream,用来处理receiver接收到的数据
(2)提供一个获取receiver的方法
(3)将该DStream添加到DStreamGragh中
有了以上的功能之后,再就是真正的业务处理逻辑:
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
这里每进行一个操作就会生成一个新的DSream,最终这段代码会生成一个DStream链,在这里你可以简单的认为最后的print操作是action级别的,其余的都是transformation级别。
注意:DStream的操作和RDD是一样的(只是DStream没有RDD实现的算法多),而唯一的区别就是RDD是处理固定的数据(不变的数据),而DSream处理的是固定时间内生成的数据(固定时间段内不变的数据)。阅读过源码就会发现其实Spark Streaming程序和Spark Core均是处理的RDD,所以DStream可以直接理解成RDD的模版,只是每次任务调用时才真正的实例化RDD,但是最终的操作还是对RDD的操作。
下面看一下print这个方法的具体实现:
/** * Print the first num elements of each RDD generated in this DStream. This is an output * operator, so this DStream will be registered as an output stream and there materialized. */
def print(num: Int): Unit = ssc.withScope {
def foreachFunc: (RDD[T], Time) => Unit = {
(rdd: RDD[T], time: Time) => {
val firstNum = rdd.take(num + 1)
// scalastyle:off println println("-------------------------------------------")
println(s"Time: $time")
println("-------------------------------------------")
firstNum.take(num).foreach(println)
if (firstNum.length > num) println("...")
println()
// scalastyle:on println }
}
foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
}
最后调用的是foreachRDD
/** * Apply a function to each RDD in this DStream. This is an output operator, so * 'this' DStream will be registered as an output stream and therefore materialized. * @param foreachFunc foreachRDD function * @param displayInnerRDDOps Whether the detailed callsites and scopes of the RDDs generated * in the `foreachFunc` to be displayed in the UI. If `false`, then * only the scopes and callsites of `foreachRDD` will override those * of the RDDs on the display. */
private def foreachRDD(
foreachFunc: (RDD[T], Time) => Unit,
displayInnerRDDOps: Boolean): Unit = {
new ForEachDStream(this,
context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
}
上面的代码最后是调用 register()
/** * Register this streaming as an output stream. This would ensure that RDDs of this * DStream will be generated. */
private[streaming] def register(): DStream[T] = {
ssc.graph.addOutputStream(this)
this
}
把这个DStream当作OutputStream添加到DStreamGragh中,然后在程序启动后根据DStreamGragh来生成业务的具体实现。其过程就是先反向生成业务计算的pipeline,然后在真正计算的时候在根据pipeline正向进行(方式和RDD一样)。
代码执行到这,整个Spark Streaming应用根本不会执行任何操作,因为上面的那些操作都是lazy级别的,只有在程序启动后才会去真正执行。
第三步 Spark Streaming应用的启动
ssc.start()
ssc.awaitTermination()
执行start方法启动执行,关于这个执行过程,它还有很多的步骤,这个下一个文章中具体谈。
ssc.awaitTermination()方法,觉得挺有用的,然后就说一下:
/** * Wait for the execution to stop. Any exceptions that occurs during the execution * will be thrown in this thread. */
def awaitTermination() {
waiter.waitForStopOrError()
}
再调用 waitForStopOrError,其中这个waiter就是在SparkStreamingContext初始化的时候来赋值 private[streaming] val waiter = new ContextWaiter
/** * Return `true` if it's stopped; or throw the reported error if `notifyError` has been called; or * `false` if the waiting time detectably elapsed before return from the method. */
def waitForStopOrError(timeout: Long = -1): Boolean = {
lock.lock()
try {
if (timeout < 0) {
while (!stopped && error == null) {
condition.await()
}
} else {
var nanos = TimeUnit.MILLISECONDS.toNanos(timeout)
while (!stopped && error == null && nanos > 0) {
nanos = condition.awaitNanos(nanos)
}
}
// If already had error, then throw it if (error != null) throw error
// already stopped or timeout stopped
} finally {
lock.unlock()
}
}
用来等待Spark Streaming程序正常或者异常退出。这里它有两个方法,用来解这个锁:
def notifyError(e: Throwable): Unit = {
lock.lock()
try {
error = e
condition.signalAll()
} finally {
lock.unlock()
}
}
def notifyStop(): Unit = {
lock.lock()
try {
stopped = true
condition.signalAll()
} finally {
lock.unlock()
}
}
这里主要感觉是以后我们开发Spark程序时,如果有需要的话可以直接使用。
今天先到这里,一个程序的基本过程已经讲的非常清楚,接下来讲它这个start方法是怎么来真正驱动spark Streaming应用进行的。