第1课:通过案例对 spark streaming 透彻理解三板斧之一: spark streaming 另类实验

本期内容 :
spark streaming另类在线实验
瞬间理解spark streaming本质

踏上Spark学习旅途,目标是要像Spark官方机构那样有能力去定制Spark。
一. 我们最开始将从Spark Streaming入手
为何从Spark Streaming切入Spark定制?Spark的子框架已有若干,为何选择Spark Streaming?让我们细细道来。

  1. Spark最开始只有Spark Core,没有目前的这些子框架。这些子框架是构建于Spark Core之上的。没有哪个子框架能摆脱Spark Core。我们通过对一个框架的彻底研究,肯定可以领会Spark力量的源泉,并精通所有问题的解决之道。
  2. 我们再看看目前的这些子框架。Spark SQL有太多语法,研究这些会太浪费精力。SparkR还没完善。Spark GraphX已无太多可改进之处,图计算相关的数学知识也不是目前重点。Spark MLlib中的机器学习也有太多算法是与数学相关,也不是做改进的好的选择 。所以我们选择了Spark Streaming。

二 . 对Spark Streaming的理解

  1. Spark Streaming是流式计算,当今时代是一个流处理时代,一切数据如果不是流式处理, 或者说和流式处理不相关的话,都是无效的数据。
  2. 流式处理才是我们对大数据的初步印象,而不是批处理和数据挖掘,当然Spark强悍的地方在于,他的流式处理可以在线的直接使用机器学习、图计算、SparkSQL、Spark R的成果。
  3. 整个Spark的程序,基于Spark Streaming的最容易出问题,也是最受关注的地方,也是最需要人才的地方。
  4. Spark Streaming和其他子框架的不同之处,Spark Streaming很像基于Spark Core之上的应用程序。
  5. 寻龙点穴,Spark就是龙脉,Spark Streaming就是穴位

2015年是流式处理的一年。大家考虑用Spark,主要也是因为Spark Streaming。这是一个流处理的时代,一切数据如果与流式处理不相关的话,都是无效的数据。Spark之所以强悍的一个重要原因在于,它的流式处理可以在线使用图计算、机器学习或者SparkR的成果,这得益于Spark一体化、多元化的基础架构设计。也就是在Spark Streaming中可以调用其它子框架,无需任何设置。这是Spark的无可匹敌之处,也是Spark Streaming必将一统天下的根源。但Spark的应用中,Spark Streaming也是最容易出问题的。
Spark Streaming与其它子框架不同之处在于,它更像是Spark Core之上的一个应用程序。所以如果要做Spark的定制开发,Spark Streaming则提供了最好的参考。你想掌握Spark Streaming,但你不去精通Spark Core的话,那是不可能的。所以我们选择Spark Streaming来提升自己,是找到了关键点。

三 .Spark Streaming另类在线实验
我们在研究Spark Streaming的过程中,会有困惑的事情:如何清晰的看到数据的流入、被处理的过程?
使用一个小技巧,通过调节放大Batch Interval的方式,来降低批处理次数,以方便看清楚各个环节。
我们从已写过的广告点击的在线黑名单过滤的Spark Streaming应用程序入手。
  
  1. 案例代码 :

import org.apache.spark.SparkConf
import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object OnlineTheTop3ItemForEachCategory2DB {
  def main(args: Array[String]){
   /**
    *
    * 第1步:创建Spark的配置对象SparkConf,设置Spark程序的运行时的配置信息,
    */
    val conf = new SparkConf() //创建SparkConf对象
    conf.setAppName("OnlineTheTop3ItemForEachCategory2DB") //设置应用程序的名称,在程序运行的监控界面可以看到名称
  //conf.setMaster("spark://Master:7077") //此时,程序在Spark集群
    conf.setMaster("local[6]")
    //设置batchDuration时间间隔来控制Job生成的频率并且创建Spark Streaming执行的入口
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.checkpoint("/root/Documents/SparkApps/checkpoint")
    val userClickLogsDStream = ssc.socketTextStream("Master", 9999)
    val formattedUserClickLogsDStream = userClickLogsDStream.map(clickLog =>
        (clickLog.split(" ")(2) + "_" + clickLog.split(" ")(1), 1))
  //val categoryUserClickLogsDStream = formattedUserClickLogsDStream.reduceByKeyAndWindow((v1:Int, v2: Int) => v1 + v2,
  //(v1:Int, v2: Int) => v1 - v2, Seconds(60), Seconds(20))
    val categoryUserClickLogsDStream = formattedUserClickLogsDStream.reduceByKeyAndWindow(_+_,
      _-_, Seconds(60), Seconds(20))
    categoryUserClickLogsDStream.foreachRDD { rdd => {
      if (rdd.isEmpty()) {
        println("No data inputted!!!")
      } else {
        val categoryItemRow = rdd.map(reducedItem => {
          val category = reducedItem._1.split("_")(0)
          val item = reducedItem._1.split("_")(1)
          val click_count = reducedItem._2
          Row(category, item, click_count)
        })

        val structType = StructType(Array(
          StructField("category", StringType, true),
          StructField("item", StringType, true),
          StructField("click_count", IntegerType, true)
        ))

        val hiveContext = new HiveContext(rdd.context)
        val categoryItemDF = hiveContext.createDataFrame(categoryItemRow, structType)

        categoryItemDF.registerTempTable("categoryItemTable")

        val reseltDataFram = hiveContext.sql("SELECT category,item,click_count FROM (SELECT category,item,click_count,row_number()" +
          " OVER (PARTITION BY category ORDER BY click_count DESC) rank FROM categoryItemTable) subquery " +
          " WHERE rank <= 3")
        reseltDataFram.show()

        val resultRowRDD = reseltDataFram.rdd

        resultRowRDD.foreachPartition { partitionOfRecords => {

          if (partitionOfRecords.isEmpty){
            println("This RDD is not null but partition is null")
          } else {
            // ConnectionPool is a static, lazily initialized pool of connections
            val connection = ConnectionPool.getConnection()
            partitionOfRecords.foreach(record => {
              val sql = "insert into categorytop3(category,item,client_count) values('" + record.getAs("category") + "','" +
                record.getAs("item") + "'," + record.getAs("click_count") + ")"
              val stmt = connection.createStatement();
              stmt.executeUpdate(sql);

            })
            ConnectionPool.returnConnection(connection) // return to the pool for future reuse

          }
        }
        }
      }
    }
    ssc.start()
    ssc.awaitTermination()

  }
 }
}

2. 把程序的Batch Interval设置成300秒:
   1 val ssc = new StreamingContext(conf, Seconds(300))
  3. 重新生成一下jar包 。
  4. 启动Spark的History Server,打开数据发送的端口 : nc -lk 9999
  5. 用spark-submit运行前面生成的jar包 。
  6. 在数据发送端口输入若干数据,比如:
    1375864674543 Tom
    1375864674553 Spy
    1375864674571 Andy
    1375864688436 Cheater
    1375864784240 Kelvin
    1375864853892 Steven
    1375864979347 John
  
  7. 打开浏览器,看History Server的日志信息:

《第1课:通过案例对 spark streaming 透彻理解三板斧之一: spark streaming 另类实验》

  
   点击最新的应用,看我们目前运行的应用程序中有些什么Job:

《第1课:通过案例对 spark streaming 透彻理解三板斧之一: spark streaming 另类实验》

   总共竟然有5个Job。这完全不是我们此前做Spark SQL之类的应用程序时看到的样子。

   我们接下来看一看这些Job的内容,主要揭示一些现象,不会做完全深入的剖析,只是为了先让大家进行一些思考。

 Job 0:此Job不体现我们的业务逻辑代码。这个Job是出于对后面计算的负载均衡的考虑。

   
《第1课:通过案例对 spark streaming 透彻理解三板斧之一: spark streaming 另类实验》

   Job 0包含有Stage 0、Stage 1。随便看一个Stage,比如Stage 1。看看其中的Aggregated Metrics by Executor部分:

《第1课:通过案例对 spark streaming 透彻理解三板斧之一: spark streaming 另类实验》

《第1课:通过案例对 spark streaming 透彻理解三板斧之一: spark streaming 另类实验》

   发现此Stage在所有Executor上都存在。 

   Job 1:运行时间比较长,耗时1.5分钟。

《第1课:通过案例对 spark streaming 透彻理解三板斧之一: spark streaming 另类实验》

   点击Stage 2的链接,进去看看Aggregated Metrics By Executor部分:

   
《第1课:通过案例对 spark streaming 透彻理解三板斧之一: spark streaming 另类实验》

可以知道,Stage 2只在Worker4上的一个Executor执行,而且执行了1.5分钟。
   是否会觉得奇怪:从业务处理的角度看,我们发送的那么一点数据,没有必要去启动一个运行1.5分钟的任务吧。那这个任务是做什么呢?
  从DAG Visualization部分,就知道此Job实际就是启动了一个接收数据的Receiver:

《第1课:通过案例对 spark streaming 透彻理解三板斧之一: spark streaming 另类实验》

   原来Receiver是通过一个Job来启动的。那肯定有一个Action来触发它。

看看Tasks部分: 
《第1课:通过案例对 spark streaming 透彻理解三板斧之一: spark streaming 另类实验》

只有一个Worker运行此Job。是用于接收数据。
   Locality Level是PROCESS_LOCAL,原来是内存节点。所以,默认情况下,数据接收不会使用磁盘,而是直接使用内存中的数据。
   看来,Spark Streaming应用程序启动后,自己会启动一些Job。默认启动了一个Job来接收数据,为后续处理做准备。
   重要启示:一个Spark应用程序中可以启动很多Job,而这些不同的Job之间可以相互配合。这一认识为我们写复杂Spark程序奠定了良好的基础。

Job 2:看Details可以发现有我们程序的主要业务逻辑,体现在Stag 3、Stag4、Stag 5中。

   
《第1课:通过案例对 spark streaming 透彻理解三板斧之一: spark streaming 另类实验》

我们看Stag3、Stage4的详情,可以知道这2个Stage都是用4个Executor执行的。所有数据处理是在4台机器上进行的。
《第1课:通过案例对 spark streaming 透彻理解三板斧之一: spark streaming 另类实验》

   Stag 5只在Worker4上。这是因为这个Stage有Shuffle操作。

《第1课:通过案例对 spark streaming 透彻理解三板斧之一: spark streaming 另类实验》

   Job3:有Stage 6、Stage 7、Stage 8。其中Stage 6、Stage 7被跳过。

    
《第1课:通过案例对 spark streaming 透彻理解三板斧之一: spark streaming 另类实验》

   看看Stage 8的Aggregated Metrics by Executor部分。可以看到,数据处理是在4台机器上进行的:

《第1课:通过案例对 spark streaming 透彻理解三板斧之一: spark streaming 另类实验》

   Job4:也体现了我们应用程序中的业务逻辑 。有Stage 9、Stage 10、Stage 11。其中Stage 9、Stage 10被跳过。

    
《第1课:通过案例对 spark streaming 透彻理解三板斧之一: spark streaming 另类实验》

看看Stage 11的详情。可以看到,数据处理是在Worker4之外的其它3台机器上进行的:

《第1课:通过案例对 spark streaming 透彻理解三板斧之一: spark streaming 另类实验》

    
   综合以上的现象可以知道,Spark Streaming的一个应用中,运行了这么多Job,远不是我们从网络博客或者书籍上看的那么简单。
   我们有必要通过这些现象,反过来回溯去寻根问源。不过这次暂不做深入分析。
   我们的神奇之旅才刚刚开始。

四. 瞬间理解Spark Streaming本质
  

《第1课:通过案例对 spark streaming 透彻理解三板斧之一: spark streaming 另类实验》

  以上的连续4个图,分别对应以下4个段落的描述:

    Spark Streaming接收Kafka、Flume、HDFS和Kinesis等各种来源的实时输入数据,进行处理后,处理结果保存在HDFS、Databases等各种地方。

    Spark Streaming接收这些实时输入数据流,会将它们按批次划分,然后交给Spark引擎处理,生成按照批次划分的结果流。

    Spark Streaming提供了表示连续数据流的、高度抽象的被称为离散流的DStream。DStream本质上表示RDD的序列。任何对DStream的操作都会转变为对底层RDD的操作。

    Spark Streaming使用数据源产生的数据流创建DStream,也可以在已有的DStream上使用一些操作来创建新的DStream。

在我们前面的实验中,每300秒会产生一批数据,基于这批数据会生成RDD,进而触发Job,执行处理。
  DStream是一个没有边界的集合,没有大小的限制。
  DStream代表了时空的概念。随着时间的推移,里面不断产生RDD。
  锁定到时间段后,就是空间的操作。也就是对本时间段的对应批次的数据的处理。

下面用实例来讲解数据处理过程。

  数据处理会有若干个对DStream的操作,这些操作之间的依赖关系,构成了DStreamGraph。如以下图例所示:   
《第1课:通过案例对 spark streaming 透彻理解三板斧之一: spark streaming 另类实验》

  上图中每个foreach都会触发一个作业,就会顺着依赖从后往前回溯,形成DAG,如下图所示:

  
《第1课:通过案例对 spark streaming 透彻理解三板斧之一: spark streaming 另类实验》

  空间维度确定之后,随着时间不断推进,会不断实例化RDD Graph,然后触发Job去执行处理。

现在再去读官方的Spark Streaming的文档,就好理解多了。

《第1课:通过案例对 spark streaming 透彻理解三板斧之一: spark streaming 另类实验》

  

  看来我们的学习,将从Spark Streaming的现象开始,深入到Spark Core和Spark Streaming的本质。

备注:
资料来源于:DT_大数据梦工厂
更多私密内容,请关注微信公众号:DT_Spark
新浪微博:http://www.weibo.com/ilovepains
如果您对大数据Spark感兴趣,可以免费听由王家林老师每天晚上20:00开设的Spark永久免费公开课,地址YY房间号:68917580

    原文作者:kaden
    原文地址: https://www.jianshu.com/p/5699b449a772
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞