spark streaming
概述
传统的DStream原理和使用
批处理间隔
- Batch Duration
- 数据处理是按照批次处理的
- 超过这个时间间隔就把收集的数据作为一个批次去处理
- 对应着RDD
窗口间隔
- 窗口内部有N个批处理数据
- 满足窗口间隔条件才会触发操作
- 应该是批处理间隔的整数倍
- 对应着DStream
滑动间隔
- 默认等于批处理间隔
- 应该是批处理间隔的整数倍
- 每次往下滑动的间隔
DStream
- 表示连续数据流
- 内部一个HashMap
- 高 度抽象
- 对RDD的一层封装
- DStream代表了时空的概念
- RDD之间的具体依赖形成了空间维度
- DStream在RDD的基础上增加了时间维度
- DStream是一个没有边界的集合,没有大小的限制。
DStreamGraph
- 多个DStream之间的里来关系
- streaming代码会转化为DStream
- DStream到RDD的转换, 最后生成了RDD graph
思想
- 将流式计算分解为一系列短小的批处理作业
- 将DStream数据转换为一段段的RDD
- 把对DStream的操作转换为对RDD的操作
- 将RDD计算结果算出来
DStream使用
概述
- 应用的类型
- 单状态的应用,从一个状态转换到另一个状态
- 需要状态的应用
输入
- socketTextStream
- fileStream
- queueStream
- kafka
- 基于receiver的方法
- Receiver是使用Kafka的高层次Consumer API来实现的
- 在默认的配置下,这种方式可能会因为底层的失败而丢失数据。如果要启用高可靠机制,让数据零丢失,就必须启用Spark Streaming的预写日志机制(Write Ahead Log,WAL)
- 偏移量管理交给kafka, 老版本存在zk中
- 原理
- receiver从Kafka中获取的数据都是存储在Spark Executor的内存中的,然后Spark Streaming启动的job会去处理那些数据。
- Receiver的方式则不能保证只被处理一次,因为Receiver和ZK中的数据可能不同步,spark Streaming可能会重复消费数据
- 源码
- KafkaInputDStream
- 实现的是DStream的ReceiverInputDStream
- 依赖给出的Receiver类,生成数据
- Compute函数中调用
- 与所有接收器一样,从Kafka通过Receiver接收的数据存储在Spark执行器中,然后由Spark Streaming启动的作业处理数据
- Receiver方式就是从Kafka中拉取数据,每次接受固定时间间隔的数据存储到内存中
- 注意事项
- Kafka中的topic的partition,与Spark中的RDD的partition是没有关系的。所以,在KafkaUtils.createStream()中,提高partition的数量,只会增加一个Receiver中,读取partition的线程的数量。不会增加Spark处理数据的并行度。
- 可以创建多个Kafka输入DStream,使用不同的consumer group和topic,来通过多个receiver并行接收数据。
- 可以创建多个Kafka输入DStream,使用不同的consumer group和topic,来通过多个receiver并行接收数据。
- Direct方式
- 不适用receiver
- 原理
- Direct的方式是会直接操作kafka底层的元数据信息,这样如果计算失败了,可以把数据重新读一下,重新处理。即数据一定会被处理。拉数据,是RDD在执行的时候直接去拉数据。
- 周期性查询kafka,获取topic和partition的offset, 存储在checkpoint中
- 优点
- 简单并且并行
- kafkaPartition和RDD partition会对应
- 性能更好
转换操作
- 通过已知DStream生成新DStream
- ransform
- updateStateByKey
窗口转换
- window
- coutByWindow
- reduceByWindow
- reduceByKeyAndWindow
输出
cache和checkpoint
- cache
- 需要checkpoint的
- 元数据checkpoint
- 配置信息
- DStream操作信息
- 未处理的batch信息
- RDDcheckpoint
- 聚合操作的结果
- 有状态的操作结果
- 要避免依赖链条变成,而导致的失败恢复时间过长
- 应该定期checkpoint
- 如何自动恢复
- 设置checkpoint
- 创建SparkSession,使用之前的checkpoint信息
- –Deploy-mode 为cluster
- –supervise
streaming原理运行原理
- DStream
- 概述
- 定期生成新的RDD
- 所有核心操作通过JobScheduler转化到RDD层操作
- generateJob
- 内部方法,实际生成Job,
- 调用Compute
- 被DStreamGraph调用,
- 类型
- 输入
- InputDStream
- 子类
- ConstantInputStream
- 测试用
- 每次都是一样的数据
- FileInputDStream
- ReceiverInputDStream
- QueueInputDStream
- DirectDStream
- ReceiverInputDStream
- 概述
- 单点不会成为瓶颈
- 通过将接收器放在work节点来生成的
- 每次接受固定时间间隔的数据存储到内存中
- 块间隔
- 是一个固定的时间
- 决定多久生成一个存储的Block块
- Receiver
- 放在每个Work节点上的每个Executor中,用来接收数据
- 将接收的数据放到Spark块存储中
- store方法!!!
- ForEachDStream
- 最后转换成这个,进行操作
- register()
- 注册操作!
- DStreamGraph
- addOutputStream
- 递归的setGraph
- DStream类中的方法
- 为依赖的每个DStream设置DStreamGraph
- StateDStream
- computeUsingPreviousRDD方法
- 之前的数据存储在哪里?
- 操作
- updateStateByKey
- mapWithState
- :也是用于全局统计key的状态,但是它如果没有数据输入,便不会返回之前的key的状态,有一点增量的感觉。效率更高,建议使用这个.
- 还支持对状态超时管理,用户可以自行选择需要的输出,通过mapWithState操作可以很方便地实现前面提出的问题。
- window相关的
- WindowedDStream
- 计算多个处于同一个窗口时间内DStream的内容.
- 窗口大小
- 滑动步长
- streamingContext
- 概述
- 负责
- 创建输入的DStream
- 实现DStream算子操作
- start方法来开始实时处理数据
- awaitTermination
- start后, 不能添加计算逻辑
- stop后不能start重启
- stop可会停掉SparkContext
- 一个SC可以创建多个StreamingContext
- 成员
- 内部持有一个SparkContext
- checkpoint信息
- checkpointDuration
- batchDuration
- DStreamGraph
- 管理graph之间的DAG
- 多个DStream之间的里来关系
- streaming代码会转化为DStream
- DStream到RDD的转换, 最后生成了RDD graph
- JobScheduler
- job调度
- 内部使用JobGeneator生成Job
- ContextWaiter
- StreamingJobProgressListener
- 初始化过程
- 传入SC
- 初始化DStreamGraph
- 初始化JobGenerator
- 成员StreamingListenerBus
- 用来管理Job相关的消息
- StreamingListener
- 接收streaming消息的接口
- onStreamingStarted
- onReceiverStarted
- onReceiverError
- onReceiverStopped
- onBatchSubmitted
- onBatchStarted
- onBatchCompleted
- onOutputOperationStarted
- onOutputOperationCompleted
- StreamingJobProgressListener
- ExecutorAllocationManager
- 管理分配给StreamingContext的Executor
- StreamingContext
- eventLoop
- 用来管理接收JobSchedulerEvent
- processEvent
- JobStarted
- JobCompleted
- ErrorReported
- 处理定时生成job的消息
- GenerateJobs
- 调用DStream里面的GenerateJob
- 然后调用DStream的GenerateJob
- ClearMetadata
- DoCheckpoint
- ClearCheckpointData
- 成员Receiver Tracker
- 管理Executor中的Receiver
- 分配Block给Batch
- 协议
- ReceiverTracker
- receive
- StartAllReceivers
- RestartReceiver
- CleanupOldBlocks
- UpdateReceiverRateLimit
- receiveAndReply
- RegisterReceiver
- AddBlock
- DeregisterReceiver
- AllReceiverIds
- GetAllReceiverInfo
- StopAllReceivers
- receiver类
- 运行在Executor中
- 长时间作业
- 用于接收数据保存到Executor的内存中
- 如果开启WAL,会写到容错文件系统中
- 是ReceiverInputDStream依赖的组件
- DStreamGraph
- 管理graph之间的DAG
- 多个DStream之间的依赖关系
- streaming代码会转化为DStream
- DStream到RDD的转换, 最后生成了RDD graph
- 关键方法
- addInputStream
- addOutputStream
- 添加到DStreamGraph中
- outputStreams中存储这个OutputStream
- generateJobs
- 生成job
- 给每个OutputStream调用generateJob
- job提交流程
- 输入InputDStream
- 创建
- InputDStream
- 一般还会定义Receiver,在Executor上接收数据
- 缓存为Block,进行计算
- 设置好数据源
- 转换和输出操作
- 输出操作的时候, 把自己添加到DStreamGraph中
- 给每个DStream设置好Graph引用
- 一个job的执行过程
- 定时器触发后续一系列操作
- 内部使用JobGenerator生成job
- 每个时间间隔自动生成作业
- 定期调用JobGenerate方法
- 调用DStreamGraph的jobGenerate方法
- 挨个调用OutputDStream的JobGenerate
- getOrCompute
- 生成RDD
- 从ForeachDStream开始
- 可能经过ShuffleDStream和MappedDStream
- 到InputDStream
- context.sparkContext.runJob
集群原理
- 并行度???
- Driver端
- StreamContext
- 最终处理还是会交给SparkContext
- DStream Graph
- ReceiverTracker
- BlockManger
- Client端
- 接收器跟踪器
- Receiver Tracker运行在Driver中
- Receiver运行在Executor中
容错机制
- 基于RDD的容错机制
- streaming自身的容错机制
- 冷备份
- WAL
- 冷备份checkpoint
- 周期性保存检查点
- 与core的checkpoint不同
- metadata checkpoint
- data checkpoint
- 失败后
- checkpoint信息用于重启Driver, 重新构造上下文和Receiver
- 恢复块元数据信息
- 重新生成未完成Job
- 是否支持exactly-once
- spark streaming自身没有这个功能
- 依靠No Receiver方式和业务处理方式来解决