Spark Streaming学习

以下内容主要基于Spark2.1.0版本的Spark Streaming内容学习得到。

还是先把Maven的依赖加入进去:

《Spark Streaming学习》 https://mvnrepository.com/artifact/org.apache.spark/spark-streaming_2.10

Overview

Spark Streaming是core Spark API的一个扩展,对实时数据流进行scalable, high-throughput, fault-tolerant流处理。数据可以从多个来源得到,比如:Kafka,Flume,Kinesis或者TCP socket,并提供高级别的函数诸如map,reduce,join和window这样复合的算法。最终处理后的数据可以通过文件系统、数据库和实时dashboards输出。事实上还支持Spark的机器学习图形处理算法在数据流上。

《Spark Streaming学习》 接收的数据流来源以及输出的数据结果

内部是按照如下方式处理的。Spark Streaming接收实时输入数据并将数据分成多个batches,然后Spark engine会产生最终的结果batches流。

《Spark Streaming学习》 按照batch处理

Spark Streaming提供一个高级别的抽象概念:discretized streamDStream),来代表一个连续的数据流。DStream既可以从Kafka、Flume和Kinesis输入数据流中创建,也可以从其他DStream上通过高级别的操作产生。在内部DStream是由一连串的RDDs来表示。

A Quick Example

在进一步学习Spark Streaming细节之前,可以先通过一个简单的例子看下大致的处理过程。下面这个例子就是从在TCP socket上监听数据服务器的文本数据并统计word个数。

下面逐步分解Example中的各段语句的作用。

《Spark Streaming学习》 创建一个StringContext,这也是所有Streaming功能的主入口,并设置batch间隔为1s

《Spark Streaming学习》 使用这个context创建一个DStream来表示来自TCP源的数据流。其中arg[0]为输入的入参hostname,在本地运行时为localhost;arg[1]为输入的第二个入参为端口号,比如9999

《Spark Streaming学习》 使用flatMap可以创建另一个DStream出来,目的是把源DStream的每条记录生成新DStream多条记录。其中FlatMapFunction的对象用于实现Transformation
《Spark Streaming学习》 将FlatMapFunction以lambda表达式的方式实现

《Spark Streaming学习》 然后将words这个DStream通过mapToPair和reduceByKey转换为对单词的统计的DStream变量
《Spark Streaming学习》 同样改成lambda表达式的方式实现

《Spark Streaming学习》 前面的代码只是创建了计算过程,但是只有start之后才开始执行。

执行过程需要建立两个终端,其中一个运行Netcat作为数据服务器,另一个正常使用spark-submit来提交Maven package出来的jar。

《Spark Streaming学习》 这是个示意,具体运行自己的application的时候不需要用example。

基础概念

Linking

在Maven中加入Spark-Streaming的依赖就不再赘述了。不过如果在代码中使用Kafka、Flume或者Kinesis则需要额外加入他们的artifact到pom.xml的依赖中。最新的版本在Maven repository中查找。

《Spark Streaming学习》 Kinesis的库在spark中没有找到

初始化StreamingContext

SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);

JavaStreamingContext ssc = new JavaStreamingContext(conf,newDuration(1000));

这个初始化类似上面example中的代码,不过在集群使用中更倾向于在spark-submit的命令行中输入具体的master:Spark、Mesos或者YARN等。也就是master = args[0]之类。需要主要的是JavaStreamingContext在内部会创建一个JavaSparkContext,可以通过ssc.sparkContext访问。

batch的时长间隔是一个必须的配置入参。具体的细节可以在下面的调优章节中看到。

还可以通过已经创建的JavaSparkContext获得:

JavaSparkContext sc=…//existing JavaSparkContext

JavaStreamingContext ssc= new JavaStreamingContext(sc,Durations.seconds(1));

在创建好StreamingContext之后接下来要做的内容与example的相仿:

1、定义一个输入源——DStream

2、定义对DStream的计算:通过transformation输出DStream

3、启动接收数据并处理:streamingContext.start()

4、等待处理过程的停止信号:streamingContext.awaitTermination()

5、代码中控制停止:streamingContext.stop()

注意事项:

1、一旦Streaming context被启动,那么就不能修改或者添加任何新的DStream的transformation。

2、一旦Streaming context被停止,将不能被重启。

3、同一时刻在一个JVM上只能有一个StreamingContext在运行。

4、StreamingContext.stop()同时也停止了SparkContext。如果不想把SparkContext停止,需要在stop方法中设置可选参数ssc.stop(false);

5、SparkContext可以多次创建StreamingContext,只要在创建新的之前老的StreamingContext已经被停止,而且没有停止SparkContext。

Discretized Streams (DStreams)

DStream是一连串的RDD,其中每个RDD都包含一个时间间隔中的数据。DStream既是输入的数据流,也是transformation后的处理过的数据流。下图是DStream的表示图示:

《Spark Streaming学习》

其实对DStream的transformation操作就是对具体RDD的操作,比如前面那个example中的情况:

《Spark Streaming学习》

Input DStreams and Receivers

每一个输入的DStream(除了file stream)都与一个Receiver对象相关联。Receiver是从源接收数据并存入内存中。如果想在Application中并行接收多个Stream,那么就需要创建多个input DStream,这也将同时创建多个Receiver来接收多个数据stream。不过需要注意的一点是需要有足够的core来处理这些数据。

注意事项:

1、当在本地运行Spark Streaming程序,不要使用”local”/”local[1]”作为master参数。因为这表示只有本地的一个线程在运行这个任务,这时这个线程会运行input DStream所基于的Receiver,就没有额外的线程资源来进行数据的处理了。所以需要设置master参数为local[n],其中n > Receiver的个数。具体的如何设置master的参数参考Spark Properties

2、同样在集群中运行Streaming任务也需要设置core的个数大于Receiver的个数。不然系统就只会接收数据而不能处理数据了。

Basic Sources

在前面的例子中,我们看到通过使用ssc.socketTextStream(…)来创建一个DStream接收从TCP socket来的数据。其实除了socket,StreamingContext API还可以将文件作为数据源来创建DStream。

File Stream

为了能从任何文件系统中读取数据使用HDFS API(HDFS、S3、NFS等),DStream的创建方式为:

streamingContext.fileStream<KeyClass, ValueClass, InputFormatClass>(dataDirectory);

Spark Streaming将会监视目录dataDirectory并且处理任何新在这个目录中创建的文件(不支持子目录)。需要注意的是:

1、文件必须是统一的格式

2、文件必须是mv到这个文件目录下的(move或者rename)

3、一旦mv过去文件不能修改,所以如果一个文件被连续的修改,那么新加的内容将不会读取出来。

对于简单的text文件,有一种简单的创建DStream的方法:streamingContext.textFileStream(dataDirectory). file stream不需要运行一个Receiver,所以也不需要分配core资源。

Streams based on Custom Receivers

DStream还可以创建处理用户自定义的Receiver接收到的数据流。详见Custom Receiver Guide

Queue of RDDs as a Stream

为了用测试数据测试Spark Streaming,还可以基于一个队列的RDD创建DStream,通过方法streamingContext.queueStream(queueOfRDDs)。其中每一个RDD被认为是DStream的一个batch数据。

Advanced Sources

Kafka:Spark Streaming 2.1.0 is compatible with Kafka broker versions 0.8.2.1 or higher. See the Kafka Integration Guide for more details.

Flume:Spark Streaming 2.1.0 is compatible with Flume 1.6.0. See the Flume Integration Guide for more details.

Kinesis:Spark Streaming 2.1.0 is compatible with Kinesis Client Library 1.2.1. See the Kinesis Integration Guide for more details.

Custom Sources

Input DStreams同样可以创建为用户自定义的数据源。所有需要做的事情就是实现一个自定义的Receiver用以从用户的源中接收数据并推给Spark。详见Custom Receiver Guide

Receiver的可靠性

基于可靠性有两种数据源一种是可靠的,另一种是不可靠的。可靠的是指诸如Kafka或者Flume这种有ACK反馈的,不可靠的则没有。

可靠的Receiver:会在接收到数据后给源发送ACK

不可靠的Receiver:不会给源发送ACK

Transformations on DStreams

类似于RDD,DStream也可以通过transformation对输入的DStream进行处理。下面列出了常用的一些:

《Spark Streaming学习》 DStream的Transformation

上面一些transformation将会重点介绍:

UpdateStateByKey Operation

流计算往往是7*24小时不间断的,所以需要中间保存一些状态。

《Spark Streaming学习》 注意:使用updateStateByKey必须配置checkpoint目录,否则会报错

Transform Operation

transform主要是对一些RDD支持的Transformation而DStream中没有支持的做扩展。也类似与对DStream的每个RDD进行操作。

《Spark Streaming学习》 这个代码是利用transform对join操作的实现

Window Operations

如下图所示,Spark Streaming提供了一个窗口计算,允许在滑动窗口内的数据进行Transformation。

《Spark Streaming学习》 窗长是3个时间间隔,按照2个时间间隔进行滑动

任何一个window operation都需要下面两个参数:

window length – 窗长

sliding interval – 窗滑动的时间间隔

还是以上面那个数单词的case举例,假如现在有个需求是每隔10s就统计下最近30s时间内的单词数目。那么代码将写为:

《Spark Streaming学习》 窗长和窗滑动的时间间隔都必须是batch时长的整数倍

还有其他一些使用窗的Transformation:

《Spark Streaming学习》 和windowLength以及slideInterval相关的Tranformation

Join Operations

最后需要重点说下在Spark Streaming中做多个不同类型的join是how easily

Stream-stream joins

stream之间可以非常简单的join在一起

《Spark Streaming学习》

两个window stream相join

《Spark Streaming学习》

Stream-dataset joins

下面展示下一个window stream如何和一个Dataset进行join

《Spark Streaming学习》 用到了transform

DStream的输出

output操作类似与RDD的action操作,只有output才会真正执行之前的Transformation操作。

《Spark Streaming学习》

Design Patterns for using foreachRDD

主要讲到实际运行时类的序列化和反序列化的设计,以及错误的使用会导致的一些问题。暂时不细说了。

DataFrame and SQL Operations

还可以在流数据中轻松使用DataFrame和SQL。首先必须要使用StreamingContext的SparkContext来创建一个SparkSession。下面就是一个将之前的word count的例子改成使用DataFrame和SQL的方式。每个RDD都被转成DataFrame,注册成一个临时的表并使用SQL。

下面是具体的代码示例:

《Spark Streaming学习》 代码第一段

《Spark Streaming学习》 以StreamingContext的sparkConf来创建SparkSession

《Spark Streaming学习》 代码第二段

其中

《Spark Streaming学习》 JavaRecord的定义

MLlib Operations

还可以轻松使用MLlib的机器学习算法。首先存在流机器学习算法(比如:Streaming Linear Regression或者Streaming KMeans等),这些算法可以同时学习流数据产生模型并把模型应用到流数据当中。除了这些算法,还有更多的算法可以从offline的历史数据中学习得到模型,然后将这些模型应用到流数据中。具体的可以看指导文档:MLlib

    原文作者:shohokuooo
    原文地址: https://www.jianshu.com/p/a61732928945
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞