流式计算概述和Spark Streaming tips

流式计算概述

常规计算引擎分类

  1. 批处理
    • 高吞吐,低延迟
    • 面向静态数据集合的处理
    • 分钟甚至小时级别延迟
    • 比如MR, Spark
  2. 流式计算
    • 面向行级别数据处理
    • 毫秒级延迟
    • 比如storm

流式计算分类

  1. 面向行
    Apache Flink — 收集一堆数据,然后一行一行处理
    Storm
  2. 面向micro-Batch
    Spark Streaming — 收集一堆数据,然后一起处理

流式计算通用户环节

数据源 —> 数据缓存 —> 流式引擎 —> 结果存储

流式计算计算方式

  1. 固定窗口
    Spark Streaming 常规支持的方式

  2. 滑动窗口( window )

  3. 会话计算( mapWithStates )
    存储Spark Streaming的状态信息(类似session),可以进行过期处理

Spark Streaming编程要点

Spark Streaming: exactly once delivery
特殊情况:故障重算,推测执行等

  1. 监控和管理 jobs
  • where to run the driver?
    Yarn cluster mode. Driver will continue to running when the client machine goes down.
  • How to restart driver ?
    set up automatic restart.
    In spark configuration (e.g. spark-defaults.conf):
spark.yarn.maxAppAttempts=2  // 重试尝试次数
spark.yarn.am.attemptFailuresValidityInterval=1h  // 重置尝试次数的时间
spark.yarn.max.executor.failures={8 * num_executors}  // executor失败的最大次数
spark.yarn.executor.failuresValidityInterval=1h // 重置失败的时间
spark.task.maxFailures=8   // task重试次数 默认是4
spark.speculation=true  //预测执行, 前提:task是幂等
  • Summary
    各种Listener接口
  1. 如何优雅的管理streaming app
    思路: Thread hooks – Check for an external flag every N seconds
/** * Stop the execution of the streams, with option of ensuring all received data 
    * has been processed. 
    *
    * * @param stopSparkContext if true, stops the associated SparkContext. The underlying SparkContext 
    * will be stopped regardless of whether this StreamingContext has been 
    * started. 
    * @param stopGracefully if true, stops gracefully by waiting for the processing of all 
    * received data to be completed 
    */ 
    def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = {
    receiverTracker.stop(processAllReceivedData) //default is to wait 10 second, grace waits until done jobGenerator.stop(processAllReceivedData) // Will use spark.streaming.gracefulStopTimeout 
    jobExecutor.shutdown() 
    val terminated = if (processAllReceivedData) { 
        jobExecutor.awaitTermination(1, TimeUnit.HOURS) // just a very large period of time 
    } else { 
        jobExecutor.awaitTermination(2, TimeUnit.SECONDS) 
    } 
    if (!terminated) { jobExecutor.shutdownNow() 
    }

How to be graceful?
• 方法一 cmd line
– $SPARK_HOME_DIR/bin/spark-submit –master $MASTER_REST_URL –kill $DRIVER_ID
– spark.streaming.stopGracefullyOnShutdown=true

private def stopOnShutdown(): Unit = { 
val stopGracefully = conf.getBoolean("spark.streaming.stopGracefullyOnShutdown", false) 
logInfo(s"Invoking stop(stopGracefully=$stopGracefully) from shutdown hook") 
// Do not stop SparkContext, let its own shutdown hook stop it 
stop(stopSparkContext = false, stopGracefully = stopGracefully) }

• 方法二 By marker file 推荐
– Touch a file when starting the app on HDFS
– Remove the file when you want to stop
– Separate thread in Spark app, calls

streamingContext.stop(stopSparkContext = true, stopGracefully = true)
    原文作者:这个该叫什么呢
    原文地址: https://www.jianshu.com/p/f89c00b41a67
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞