(五)Spark Streaming 算子梳理 — foreachRDD

目录

天小天:(一)Spark Streaming 算子梳理 — 简单介绍streaming运行逻辑

天小天:(二)Spark Streaming 算子梳理 — flatMap和mapPartitions

天小天:(三)Spark Streaming 算子梳理 — transform算子

天小天:(四)Spark Streaming 算子梳理 — Kafka createDirectStream

天小天:(五)Spark Streaming 算子梳理 — foreachRDD

天小天:(六)Spark Streaming 算子梳理 — glom算子

天小天:(七)Spark Streaming 算子梳理 — repartition算子

天小天:(八)Spark Streaming 算子梳理 — window算子

前言

本文主要讲解foreachRDD算子的实现,关于最佳使用方式在这里不会讲到,如果要了解可以查阅相关资料。

看例子

首先看一个简单地foreachRDD的例子

package streaming

import org.apache.spark.{SparkConf, rdd}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable
import scala.collection.mutable.ListBuffer

/**  * @date 2019/01/21  */
object Api {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("api").setMaster("local[2]")
    val rddQueue = new mutable.Queue[RDD[Int]]()

    val ssc = new StreamingContext(sparkConf, Seconds(2))
    // consume from rddQueue     val lines = ssc.queueStream(rddQueue)
    // foreachRDD     lines.foreachRDD(rdd => {
      val values = rdd.take(10)
      for (value <- values) println("foreachRDD == " + value)
    })
    
    ssc.start()

    // produce to rddQueue     for (i <- 1 to 30) {
      rddQueue.synchronized {
        rddQueue += ssc.sparkContext.makeRDD(1 to 1000, 10)
      }
      Thread.sleep(1000)
    }
    ssc.stop()
  }

  def iteratorAdd(input: Iterator[Int]) : Iterator[String] = {
    val output = ListBuffer[String]()
    for (t <- input){
      output += t.toString + " map"
    }
    output.iterator
  }

}

这个例子中foreachRDD的作用是从每个批次的RDD中取出前10个元素,并打印出来。

从这里我们可以看出来,foreachRDD的作用是对每个批次的RDD做自定义操作。并且从这个的位置我们也可以看出,这个一个action算子。

源码实现

我们知道了foreachRDD的作用,接下来我们详细看下是如何实现这个算子的。

调用方法

首先我们看下用户调用的方法可以有哪些。

/**  * Apply a function to each RDD in this DStream. This is an output operator, so  * 'this' DStream will be registered as an output stream and therefore materialized.  */
  def foreachRDD(foreachFunc: RDD[T] => Unit): Unit = ssc.withScope {
    val cleanedF = context.sparkContext.clean(foreachFunc, false)
    foreachRDD((r: RDD[T], _: Time) => cleanedF(r), displayInnerRDDOps = true)
  }

  /**  * Apply a function to each RDD in this DStream. This is an output operator, so  * 'this' DStream will be registered as an output stream and therefore materialized.  */
  def foreachRDD(foreachFunc: (RDD[T], Time) => Unit): Unit = ssc.withScope {
    // because the DStream is reachable from the outer object here, and because     // DStreams can't be serialized with closures, we can't proactively check     // it for serializability and so we pass the optional false to SparkContext.clean     foreachRDD(foreachFunc, displayInnerRDDOps = true)
  }

这两个foreachRDD就是用户可以调用的两个方法。他们两个的差别起始只有入参foreachFunc的入参是不同的。下面的多了一个Time。这个Time代表批次时间,具体用法在后面会说。

私有的foreachRDD

我们可以看到上面两个方法都调用了foreachRDD方法,那么这里的方法是是什么那?我们来看下此方法的代码:

/**  * Apply a function to each RDD in this DStream. This is an output operator, so  * 'this' DStream will be registered as an output stream and therefore materialized.  * @param foreachFunc foreachRDD function  * @param displayInnerRDDOps Whether the detailed callsites and scopes of the RDDs generated  * in the `foreachFunc` to be displayed in the UI. If `false`, then  * only the scopes and callsites of `foreachRDD` will override those  * of the RDDs on the display.  */
  private def foreachRDD(
      foreachFunc: (RDD[T], Time) => Unit,
      displayInnerRDDOps: Boolean): Unit = {
    new ForEachDStream(this,
      context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
  }

首先解释下入参:

  • foreachFunc:用户定义的方法,可以包含RDDTime两个参数;
  • displayInnerRDDOps:foreachFunc中的算子是否显示在UI中,这个和我们的业务逻辑关系不大,可以先不关注

我们可以看到这里最终会实例化ForEachDStream对象。

到这里,DStream的逻辑已经生成。

ForEachDStream实现

看下每个批次中RDD是如何处理的。由于foreachRDD所以我们这里只关注generateJob方法。

override def generateJob(time: Time): Option[Job] = {
    parent.getOrCompute(time) match {// 通过批次时间获取RDD       case Some(rdd) => // 取到RDD         // 构造本批次job要执行的函数         val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) { 
          foreachFunc(rdd, time) // 把RDD和批次时间作为入参传给用户定义的方法         }
        Some(new Job(time, jobFunc)) //返回Job       case None => None // 如果没有取得,则放回None     }
  }

从代码和注释已经简单地解释了,foreachRDD是如何工作的。其中有两个地方要说下。

  1. foreachFunc(rdd, time) 这段代码,用户定义的方法入参没有time则用户不能对批次时间做处理,只有定义的方法同时包含rdd和time两个参数用户才能对time做处理。这也是上面调用方法中留下的问题的解释。
  2. Some(new Job(time, jobFunc))这里生成的Job最终会提交到Spark集群上执行。之后的逻辑这里不做详解。

总结

至此foreachRDD就讲解完了。从上面的代码来看其实这里的逻辑还是不难的。

    原文作者:天小天
    原文地址: https://zhuanlan.zhihu.com/p/64273163
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞