目录
天小天:(一)Spark Streaming 算子梳理 — 简单介绍streaming运行逻辑
天小天:(二)Spark Streaming 算子梳理 — flatMap和mapPartitions
天小天:(三)Spark Streaming 算子梳理 — transform算子
天小天:(四)Spark Streaming 算子梳理 — Kafka createDirectStream
天小天:(五)Spark Streaming 算子梳理 — foreachRDD
天小天:(六)Spark Streaming 算子梳理 — glom算子
前言
本文主要讲解foreachRDD算子的实现,关于最佳使用方式在这里不会讲到,如果要了解可以查阅相关资料。
看例子
首先看一个简单地foreachRDD
的例子
package streaming
import org.apache.spark.{SparkConf, rdd}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
/** * @date 2019/01/21 */
object Api {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("api").setMaster("local[2]")
val rddQueue = new mutable.Queue[RDD[Int]]()
val ssc = new StreamingContext(sparkConf, Seconds(2))
// consume from rddQueue val lines = ssc.queueStream(rddQueue)
// foreachRDD lines.foreachRDD(rdd => {
val values = rdd.take(10)
for (value <- values) println("foreachRDD == " + value)
})
ssc.start()
// produce to rddQueue for (i <- 1 to 30) {
rddQueue.synchronized {
rddQueue += ssc.sparkContext.makeRDD(1 to 1000, 10)
}
Thread.sleep(1000)
}
ssc.stop()
}
def iteratorAdd(input: Iterator[Int]) : Iterator[String] = {
val output = ListBuffer[String]()
for (t <- input){
output += t.toString + " map"
}
output.iterator
}
}
这个例子中foreachRDD
的作用是从每个批次的RDD
中取出前10个元素,并打印出来。
从这里我们可以看出来,foreachRDD
的作用是对每个批次的RDD做自定义操作。并且从这个的位置我们也可以看出,这个一个action算子。
源码实现
我们知道了foreachRDD
的作用,接下来我们详细看下是如何实现这个算子的。
调用方法
首先我们看下用户调用的方法可以有哪些。
/** * Apply a function to each RDD in this DStream. This is an output operator, so * 'this' DStream will be registered as an output stream and therefore materialized. */
def foreachRDD(foreachFunc: RDD[T] => Unit): Unit = ssc.withScope {
val cleanedF = context.sparkContext.clean(foreachFunc, false)
foreachRDD((r: RDD[T], _: Time) => cleanedF(r), displayInnerRDDOps = true)
}
/** * Apply a function to each RDD in this DStream. This is an output operator, so * 'this' DStream will be registered as an output stream and therefore materialized. */
def foreachRDD(foreachFunc: (RDD[T], Time) => Unit): Unit = ssc.withScope {
// because the DStream is reachable from the outer object here, and because // DStreams can't be serialized with closures, we can't proactively check // it for serializability and so we pass the optional false to SparkContext.clean foreachRDD(foreachFunc, displayInnerRDDOps = true)
}
这两个foreachRDD
就是用户可以调用的两个方法。他们两个的差别起始只有入参foreachFunc
的入参是不同的。下面的多了一个Time。这个Time代表批次时间,具体用法在后面会说。
私有的foreachRDD
我们可以看到上面两个方法都调用了foreachRDD
方法,那么这里的方法是是什么那?我们来看下此方法的代码:
/** * Apply a function to each RDD in this DStream. This is an output operator, so * 'this' DStream will be registered as an output stream and therefore materialized. * @param foreachFunc foreachRDD function * @param displayInnerRDDOps Whether the detailed callsites and scopes of the RDDs generated * in the `foreachFunc` to be displayed in the UI. If `false`, then * only the scopes and callsites of `foreachRDD` will override those * of the RDDs on the display. */
private def foreachRDD(
foreachFunc: (RDD[T], Time) => Unit,
displayInnerRDDOps: Boolean): Unit = {
new ForEachDStream(this,
context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
}
首先解释下入参:
foreachFunc
:用户定义的方法,可以包含RDD
和Time
两个参数;displayInnerRDDOps
:foreachFunc中的算子是否显示在UI中,这个和我们的业务逻辑关系不大,可以先不关注
我们可以看到这里最终会实例化ForEachDStream
对象。
到这里,DStream的逻辑已经生成。
ForEachDStream实现
看下每个批次中RDD是如何处理的。由于foreachRDD
所以我们这里只关注generateJob
方法。
override def generateJob(time: Time): Option[Job] = {
parent.getOrCompute(time) match {// 通过批次时间获取RDD case Some(rdd) => // 取到RDD // 构造本批次job要执行的函数 val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
foreachFunc(rdd, time) // 把RDD和批次时间作为入参传给用户定义的方法 }
Some(new Job(time, jobFunc)) //返回Job case None => None // 如果没有取得,则放回None }
}
从代码和注释已经简单地解释了,foreachRDD
是如何工作的。其中有两个地方要说下。
foreachFunc(rdd, time)
这段代码,用户定义的方法入参没有time则用户不能对批次时间做处理,只有定义的方法同时包含rdd和time两个参数用户才能对time做处理。这也是上面调用方法中留下的问题的解释。Some(new Job(time, jobFunc))
这里生成的Job最终会提交到Spark集群上执行。之后的逻辑这里不做详解。
总结
至此foreachRDD
就讲解完了。从上面的代码来看其实这里的逻辑还是不难的。