流式计算概述
常规计算引擎分类
- 批处理
• 高吞吐,低延迟
• 面向静态数据集合的处理
• 分钟甚至小时级别延迟
• 比如MR, Spark - 流式计算
• 面向行级别数据处理
• 毫秒级延迟
• 比如storm
流式计算分类
- 面向行
Apache Flink — 收集一堆数据,然后一行一行处理
Storm - 面向micro-Batch
Spark Streaming — 收集一堆数据,然后一起处理
流式计算通用户环节
数据源 —> 数据缓存 —> 流式引擎 —> 结果存储
流式计算计算方式
固定窗口
Spark Streaming 常规支持的方式滑动窗口( window )
会话计算( mapWithStates )
存储Spark Streaming的状态信息(类似session),可以进行过期处理
Spark Streaming编程要点
Spark Streaming: exactly once delivery
特殊情况:故障重算,推测执行等
- 监控和管理 jobs
- where to run the driver?
Yarn cluster mode. Driver will continue to running when the client machine goes down. - How to restart driver ?
set up automatic restart.
In spark configuration (e.g. spark-defaults.conf):
spark.yarn.maxAppAttempts=2 // 重试尝试次数
spark.yarn.am.attemptFailuresValidityInterval=1h // 重置尝试次数的时间
spark.yarn.max.executor.failures={8 * num_executors} // executor失败的最大次数
spark.yarn.executor.failuresValidityInterval=1h // 重置失败的时间
spark.task.maxFailures=8 // task重试次数 默认是4
spark.speculation=true //预测执行, 前提:task是幂等
- Summary
各种Listener
接口
- 如何优雅的管理streaming app
思路: Thread hooks – Check for an external flag every N seconds
/** * Stop the execution of the streams, with option of ensuring all received data
* has been processed.
*
* * @param stopSparkContext if true, stops the associated SparkContext. The underlying SparkContext
* will be stopped regardless of whether this StreamingContext has been
* started.
* @param stopGracefully if true, stops gracefully by waiting for the processing of all
* received data to be completed
*/
def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = {
receiverTracker.stop(processAllReceivedData) //default is to wait 10 second, grace waits until done jobGenerator.stop(processAllReceivedData) // Will use spark.streaming.gracefulStopTimeout
jobExecutor.shutdown()
val terminated = if (processAllReceivedData) {
jobExecutor.awaitTermination(1, TimeUnit.HOURS) // just a very large period of time
} else {
jobExecutor.awaitTermination(2, TimeUnit.SECONDS)
}
if (!terminated) { jobExecutor.shutdownNow()
}
How to be graceful?
• 方法一 cmd line
– $SPARK_HOME_DIR/bin/spark-submit –master $MASTER_REST_URL –kill $DRIVER_ID
– spark.streaming.stopGracefullyOnShutdown=true
private def stopOnShutdown(): Unit = {
val stopGracefully = conf.getBoolean("spark.streaming.stopGracefullyOnShutdown", false)
logInfo(s"Invoking stop(stopGracefully=$stopGracefully) from shutdown hook")
// Do not stop SparkContext, let its own shutdown hook stop it
stop(stopSparkContext = false, stopGracefully = stopGracefully) }
• 方法二 By marker file 推荐
– Touch a file when starting the app on HDFS
– Remove the file when you want to stop
– Separate thread in Spark app, calls
streamingContext.stop(stopSparkContext = true, stopGracefully = true)