关于structured streaming, spark社区已经有很多文章介绍,个人认为其中最大的特点是将流视作没有边界的大表,从而能够使用sql来操作这张表,其中包括使用sql join(截止Spark2.1.1,目前只支持streaming和静态表之间的join,还不支持多条流之间的join ‘期待社区大招 ’)。
消息队列 Kafka
Kafka作为一个开源的消息队列,在流处理应用中应用非常广泛,比较典型的架构是: Storm + kafka + Redis 或者 Spark Streaming + Kafka + Redis。Kafka一般作为数据源,流计算引擎例如Storm或者Spark streaming从Kafka中读取数据,这样保证了流处理数据源的低延迟。在Structured Streaming之前,已经很好的和Spark做了集成。
管中窥豹之 Structured Streaming
尽管在Spark2.1.1中Structured streaming还处于Alpha版本,但是已经提供了一些example程序,可以试着跑跑。本文使用2.1.1中提供的structured streaming集成Kafka的样例 - StructuredKafkaWordCount
详细的代码请戳: https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredKafkaWordCount.scala
具体来看,本例提供了一个简单的Structured Streaming wordCount程序,从Kafka中读取文本数据,然后在流中对读入的文本进行word拆分并统计各word出现的次数。
代码解析
- 创建Spark Session. 无需多言,每个Spark程序都需要做的步骤。
val spark = SparkSession
.builder
.appName("StructuredKafkaWordCount")
.getOrCreate()
- 读入Kafka
val lines = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServers)
.option(subscribeType, topics)
.load()
.selectExpr("CAST(value AS STRING)")
.as[String]
format(“kafka”) 表示读取的类型是Kafka,kafka.bootstrap.servers 用来设置Kafka的broker。subscribeType 表示订阅类型,包括’assign’, ‘subscribe’, ‘subscribePattern’。
- 如果设置为assign, 那么相应的值就要设置为消费的topic及partition,例如{“topicA”:[0,1],”topicB”:[2,4]}。
- 如果设置为subscribe,相应的值为消费的topic list.
- 如果设置为subscribePattern,相应的正则表达式描述
- wordcount
val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()
对每行的字符串分割成各个word, 然后count得到单词出现的次数。
- 输出
val query = wordCounts.writeStream
.outputMode("complete")
.format("console")
.start()
把结果输出到终端,输出模式为complete模式。
运行
Spark2.1.1发布的包中已经包含了此例子,直接在spark环境中提交作业就可以
- 提交命令
spark-submit –packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0 –master yarn-client –class org.apache.spark.examples.sql.streaming.StructuredKafkaWordCount /usr/hdp/2.6.0.3-8/spark2/examples/jars/spark-examples_2.11-2.1.0.2.6.0.3-8.jar ai01:6667 subscribe test
“`
其中test topic为从某个文本中读取到kafka中的内容。
tail -n 0 -f zookeeper.log | /usr/hdp/2.6.0.3-8/kafka/bin/kafka-console-producer.sh --broker-list ai01:6667 --sync --topic test
Spark界面查看作业
从作业监控看到,目前structured streaming的作业并不像之前的spark streaming作业一样有个streaming的标签,可以看到每个batch读取的记录以及处理情况展示。不知道是还有待完善还是以后就这样了. :)屏幕快照 2017-07-05 下午5.11.11.png