目录
天小天:(一)Spark Streaming 算子梳理 — 简单介绍streaming运行逻辑
天小天:(二)Spark Streaming 算子梳理 — flatMap和mapPartitions
天小天:(三)Spark Streaming 算子梳理 — transform算子
天小天:(四)Spark Streaming 算子梳理 — Kafka createDirectStream
天小天:(五)Spark Streaming 算子梳理 — foreachRDD
天小天:(六)Spark Streaming 算子梳理 — glom算子
前言
本章主要介绍transform算子的实现逻辑和作用。
看例子
首先看下transform算子代码例子。
package streaming
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
/** * @date 2019/01/21 */
object Api {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("api").setMaster("local[2]")
val rddQueue = new mutable.Queue[RDD[Int]]()
val ssc = new StreamingContext(sparkConf, Seconds(2))
// 消费RDD队列作为数据源 val lines = ssc.queueStream(rddQueue)
// transform example val transform1 = lines.transform(rdd => {
println("transform1: id: " + rdd.id)
rdd
})
val transform2 = transform1.transform((rdd, time) => {
println("transform2: id: " + rdd.id + " time: " + time)
rdd
})
transform2.print()
ssc.start()
// 向RDD队列生成数据 for (i <- 1 to 30) {
rddQueue.synchronized {
rddQueue += ssc.sparkContext.makeRDD(1 to 1000, 10)
}
Thread.sleep(1000)
}
ssc.stop()
}
}
以上代码输出如下:
transform1: id: 6
transform2: id: 6 time: 1552061836000 ms
-------------------------------------------
Time: 1552061836000 ms
-------------------------------------------
2 map
20 map
30 map
40 map
3 map
20 map
30 map
40 map
4 map
20 map
...
这段代码主要逻辑是:以Rdd队列为数据源,之后第一个transform算子会打印rdd id,第二个transform算子会打印rdd id和批次时间,最终打印Rdd内容。
从这个例子中我们能够知道,transform算子是对RDD操作,如果有的一些功能是Rdd本身没有提供的,可以通过自定义实现。那么接下来我们看下Rdd是如何实现的。
源码实现
先贴出transform算子的代码源码
/** * Return a new DStream in which each RDD is generated by applying a function * on each RDD of 'this' DStream. */
def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U] = ssc.withScope {
// because the DStream is reachable from the outer object here, and because // DStreams can't be serialized with closures, we can't proactively check // it for serializability and so we pass the optional false to SparkContext.clean // 清理掉无用的并且不需要序列化的属性,减少不必要的序列化异常。 val cleanedF = context.sparkContext.clean(transformFunc, false)
transform((r: RDD[T], _: Time) => cleanedF(r))
}
/** * Return a new DStream in which each RDD is generated by applying a function * on each RDD of 'this' DStream. */
def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = ssc.withScope {
// because the DStream is reachable from the outer object here, and because // DStreams can't be serialized with closures, we can't proactively check // it for serializability and so we pass the optional false to SparkContext.clean // 清理掉无用的并且不需要序列化的属性,减少不必要的序列化异常。 val cleanedF = context.sparkContext.clean(transformFunc, false)
val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => {
assert(rdds.length == 1)
cleanedF(rdds.head.asInstanceOf[RDD[T]], time)
}
new TransformedDStream[U](Seq(this), realTransformFunc)
}
DStream提供了两个transform算子,上面的transform的func入参只有RDD,下一个的func入参有两个,分别是RDD和Time即批次时间。并且上面的最终调用的也是下面的transform方法。下面的相比上面的除了可以获取到RDD信息,可以对RDD处理外,还可以获取到批次时间,可以依据批次时间做处理。
继续看下面的transform方法,首先是清理对序列化清理操作。接下来是把用户提供的function增加一行逻辑,即判断RDD序列是否只有一个RDD,如果不等于1则报错。其实这里理论上是不会出现RDD序列长度大于1的情况的。最后就是实例化TransformedDStream
。
接下来看下TransformedDStream
的执行逻辑
package org.apache.spark.streaming.dstream
import scala.reflect.ClassTag
import org.apache.spark.SparkException
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Duration, Time}
private[streaming]
class TransformedDStream[U: ClassTag] (
parents: Seq[DStream[_]], // DStream序列,通过阅读上一级源码得知队列长度为1 transformFunc: (Seq[RDD[_]], Time) => RDD[U] // 用户定义的函数 ) extends DStream[U](parents.head.ssc) {
// 一些检验 require(parents.nonEmpty, "List of DStreams to transform is empty")
require(parents.map(_.ssc).distinct.size == 1, "Some of the DStreams have different contexts")
require(parents.map(_.slideDuration).distinct.size == 1,
"Some of the DStreams have different slide durations")
override def dependencies: List[DStream[_]] = parents.toList
override def slideDuration: Duration = parents.head.slideDuration
// 此方法为执行逻辑 override def compute(validTime: Time): Option[RDD[U]] = {
// 首先遍历parent序列,由于序列中DStream长度为1,所以只会取出一个DStream。 // 之后通过批次时间从DStream中取出当前批次时间的RDD并返回,如果没有对应RDD则抛出SparkException val parentRDDs = parents.map { parent => parent.getOrCompute(validTime).getOrElse(
// Guard out against parent DStream that return None instead of Some(rdd) to avoid NPE throw new SparkException(s"Couldn't generate RDD from parent at time $validTime"))
}
// 以取出的RDD序列和批次时间作为入参,执行用户定义的函数。 // 这里需要回顾下上一段代码加的RDD序列长度为1的判断就是在这里执行的, // 这个方法主要放置parent序列内有多个DStream,导致RDD序列也有多个,造成的异常。 // 但是从代码整体上来看在Spark代码没有bug的情况下不会出现报错。 val transformedRDD = transformFunc(parentRDDs, validTime)
// 判断返回的RDD是否为空,如果为空则报错 if (transformedRDD == null) {
throw new SparkException("Transform function must not return null. " +
"Return SparkContext.emptyRDD() instead to represent no element " +
"as the result of transformation.")
}
// 将RDD包装在Some中返回 Some(transformedRDD)
}
/** * Wrap a body of code such that the call site and operation scope * information are passed to the RDDs created in this body properly. * This has been overridden to make sure that `displayInnerRDDOps` is always `true`, that is, * the inner scopes and callsites of RDDs generated in `DStream.transform` are always * displayed in the UI. */
override protected[streaming] def createRDDWithLocalProperties[U](
time: Time,
displayInnerRDDOps: Boolean)(body: => U): U = {
super.createRDDWithLocalProperties(time, displayInnerRDDOps = true)(body)
}
}
以上就是TransformedDStream
的全部代码,我们只先关注compute
方法,其他的逻辑本章暂不关注。详细的解释看代码注释,可以很好理解执行逻辑。
总结
以上就是transform算子的全部内容,整体上来看逻辑不复杂。核心的逻辑只是调用用户提供的function。
如文章开头所讲,transform主要对RDD功能的增强。可以方便开发者提供获取RDD信息和批次时间信息,并且已经这些信息做一下处理。
至此,关于transform算子的全部内容就结束了。
附上例子的源代码链接:https://github.com/youtNa/all-practice/blob/master/spark-test/src/main/scala/streaming/Api.scala