[译]Spark Streaming + Kafka集成指南

本文适用于Kafka broker 0.8.2.1及更高版本。

这里会说明如何配置Spark Streaming接收Kafka的数据。有两种方法 – 老方法使用Receiver和Kafka的高层API,新方法不适用Receiver。两种方法具有不同的编程模型,性能特点和语义保证,下面具体介绍。两种方法对于当前版本的Spark(2.1.1)都有稳定的API。

方法1:基于Receiver的方法

这个方法使用Receiver接收数据。Receiver使用Kafka的高层消费者API实现。和所有receiver一样,通过Receiver从Kafka接收的数据存储到Spark executor中,然后由Spark Streaming启动的作业处理这些数据。

但是,在默认配置下,这种方法会在出错时出现数据丢失(具体参见receiver reliability。为了保证零数据丢失,必须在Spark Streaming中额外启用Write Ahead Logs)。这样会同步保存所有接收到的Kafka数据到分布式文件系统(如HDFS)中,所有数据都可以从出错中进行恢复。

下面,讨论如何使用这种方法编写streaming应用程序。

  1. 链接:对于使用SBT/Maven工程定义的Scala/Java应用程序,需要将你的streaming应用程序链接到下面的artifact。
 groupId = org.apache.spark
 artifactId = spark-streaming-kafka-0-8_2.11
 version = 2.1.1

对于Python应用程序,必须字部署用用程序时添加上面的库及其依赖。参见下面的部署章节。

  1. 编程:在streaming应用程序代码中,引入KafkaUtils并创建一个输入DStream,如下。
 import org.apache.spark.streaming.kafka._

 val kafkaStream = KafkaUtils.createStream(streamingContext,
     [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])

需要记住的几点:

  • Kafka中的Topic分区和Spark Streaming中的RDD分区是不相关的。所以在KafkaUtils.createStream()增加指定topic分区数量只会增加单个receiver中消费topic的线程数量。不会增加Spark处理数据的并行性。
  • 对于不同group和topic可以创建多个Kafka输入DStream,使用多个receiver并行接收数据。
  • 如果已经启用了Write Ahead Logs,接收的数据会被复制到日志中。因此,需要将输入流的存储级别设置为StorageLevel.MEMORY_AND_DISK_SER(即KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER))。
  1. 部署:对于任何Spark应用程序,spark-submit用于启动应用程序。但是,对于Scala/Java应用程序和Python应用程序有些不同。

对于Scala和Java应用程序,如果使用了SBT或者Maven管理项目,则会将spark-streaming-kafka-0-8_2.11及其依赖打包到应用程序JAR包中。确保spark-core_2.11spark-streaming_2.11标记为provided,它们在Spark安装包中已经存在了。然后使用spark-submit启动应用程序。

对于Python应用程序缺少了SBT/Maven项目管理,需要将spark-streaming-kafka-0-8_2.11及其依赖直接添加到spark-submit,使用--packages(具体参见Application Submission Guide)。如下:

./bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.1 ...

另外,也可以从Maven repository下载spark-streaming-kafka-0-8-assembly的JAR包,然后使用--jars添加到spark-submit

方法2:直接方法(不适用Receiver)
这种新方法从Spark 1.3开始支持,具有更强的端到端保证。和使用receiver接收数据不同,这种方法会周期性地查询Kafka每个topic+partition最新的偏移量,然后根据定义的偏移量范围在每个批次中处理数据。当处理数据的作业启动后,Kafka的简单消费者API会被用来读取定义偏移量范围的数据(和从文件系统中读取文件类似)。注意,这个特性是从Spark 1.3开始支持Scala和Java API,从Spark 1.4开始支持Python API。

这种方法相比于基于receiver的方法具有以下优势:

  • 简化并行:不需要创建多个输入Kafka流,然后合并它们。使用directStream,Spark Streaming会创建和Kafka分区一样多的RDD分区进行消费,会并行读取Kafka的数据。所以Kafka分区和RDD分区会有一一对应,更容易理解和使用。
  • 效率:方法1中实现零数据丢失需要将数据存储到Write Ahead Log,这会复制一遍数据。这实际上是低效的,因为数据复制了两次,一次是Kafka,一次是Write Ahead Log。方法2解决了这个问题,因为没有receiver,也就不需要Write Ahead Logs。只要有足够的Kafka缓冲,可以从Kafka恢复消息。
  • 只有一次语义:方法1使用Kafka的高层API在Zookeeper中存储消费的偏移量。这是从Kafka消费数据的传统方法。虽然这种方法(结合write ahead logs)可以保证零数据丢失(即至少一次语义),但是还是会有一些情况会在出错时导致一些记录被消费两次。这是因为Spark Streaming接收数据和Zookeeper跟踪的偏移量不一致导致的。因此,在方法2中,使用了简单Kafka API不适用Zookeeper。偏移量是在Spark Streaming的检查点中跟踪的。这就消除了Spark Streaming和Zookeeper/Kafka的不一致,每条记录都只会被Spark Streaming接收一次,即便在出错的情况下。为了实现结果输出的只有一次语义,数据存到外部存储的输出操作必须是幂等的,或是保存结果和偏移量的原子事务。

注意,这种方法的一个劣势是不在Zookeeper中更新偏移量,因此基于Zookeeper的Kafka监控工具就无法显示进度。但是,可以在每个批次中访问偏移量,然后自己更新到Zookeeper中。

下面,讨论如何使用这种方法编程。

  1. 链接:这种方法只有Scala/Java应用程序支持。SBT/Maven工程链接下面的artifact。
 groupId = org.apache.spark
 artifactId = spark-streaming-kafka-0-8_2.11
 version = 2.1.1
  1. 编程:在streaming应用程序代码中,引入KafkaUtils然后创建一个输入DStream,如下。
 import org.apache.spark.streaming.kafka._

 val directKafkaStream = KafkaUtils.createDirectStream[
     [key class], [value class], [key decoder class], [value decoder class] ](
     streamingContext, [map of Kafka parameters], [set of topics to consume])

可传递messageHandlercreateDirectStream,用于访问包含当前消息元数据的MessageAndMetadata并转为想要的格式。可参见API docsexample

在Kafka参数中,必须指定metadata.broker.listbootstrap.servers。默认地,会从每个Kafka分区的最近偏移量开始消费。如果设置了配置auto.offset.resetsmallest,则会从最小的偏移量开始。

也可以从任意偏移量开始消费,使用其它KafkaUtils.createDirectStream变量。另外,如果想要访问每个批次范根的Kafka偏移量,方法如下。

 // Hold a reference to the current offset ranges, so it can be used downstream
 var offsetRanges = Array.empty[OffsetRange]

 directKafkaStream.transform { rdd =>
   offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
   rdd
 }.map {
           ...
 }.foreachRDD { rdd =>
   for (o <- offsetRanges) {
     println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
   }
   ...
 }

如果想要使用基于Zookeeper的Kafka监控工具显示streaming应用程序的过程,可使用上述代码自己更新偏移量的信息到Zookeeper。

注意,HasOffsetRanges只在directKafkaStream调用的第一个方法中可以成功获取。可以使用transform()不用foreachRDD()作为第一个方法调用以便访问偏移量,然后再调用更多Spark方法。但是,需要意识到RDD分区和Kafka分区的一一对应关系在调用了shuffle或者repartition方法(如reduceByKey()或window())后就不存在了。

另外需要注意的是,由于这种方法不使用Receiver,标准receiver(spark.streaming.receiver.*配置相关)不能应用于这里的输入DStream。相反,使用spark.streaming.kafka.*配置。非常重要的一个是spark.streaming.kafka.maxRatePerPartition设置每个Kafka分区通过直接API读取的最大速率(每秒钟的记录数)。

  1. 部署:和方法1一样的。
    原文作者:steanxy
    原文地址: https://www.jianshu.com/p/03a3d0deb418
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞