网上用python写spark+kafka的资料好少啊 自己记录一点踩到的坑~
spark+kafka介绍的官方网址:http://spark.apache.org/docs/latest/streaming-kafka-0-8-integration.html
python的pyspark库函数文档:http://spark.apache.org/docs/latest/api/python/pyspark.streaming.html?highlight=kafkautils.createdirectstream#pyspark.streaming.kafka.KafkaUtils.createDirectStream
上面两个是最重要的资料,大多数问题可以通过仔细研读上面两个文档得到答案
官网上说了,spark和kafka连用有两种方式:接收器形式 以及 直连形式
一、 接收器形式
优点:支持kafka的group.id设置,支持用kafka api查询offset,如果数据断掉后,可以通过group.id轻松找到上一次失败的位置
缺点:
1.失败处理复杂。由于kafka队列信息由kafka自己记录,当spark消费了数据但是处理中出错时会导致数据丢失。为了避免数据丢失就必须开启Write Ahead Logs,把spark接收到的数据都存储到分布式文件系统中,比如HDFS,然后失败时从存储的记录中找到失败的消息。这导致同一批数据被kafka和spark存储了2次。造成数据冗余。
2.如果有多个地方都想获取同一个kafka队列的数据,必须建立多个流,无法用一个流并行处理。
该方法是比较老的一种方式,并不太被推荐。
二、直连形式
优点:
1. 不需两次存储数据,直连形式时,spark自己管理偏移信息,不再使用kafka的offset信息。所以spark可以自行处理失败情况,不要再次存储数据。spark保证数据传输时Exactly-once。
2.只需建立一个流就可以并行的在多个地方使用流中的数据
缺点:
不支持kafka的group,不支持通过kafka api查询offset信息!!!!
在连接后spark会根据fromOffsets参数设置起始offset,默认是从最新的数据开始的。也就是说,必须自己记录spark消耗的offset位置。否则在两次脚本启动中间的数据都会丢失。
我选用的是直连形式,我处理offset的方法是将spark消费的offset信息实时记录到文件中。在启动脚本时通过记录的文件来找到起始位置。
#!/usr/bin/python # coding=utf-8 from pyspark import SparkContext from pyspark.streaming import StreamingContext from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition import time import os import json broker_list = "xxxx" topic_name = "xxxx" timer = 5 offsetRanges = [] def store_offset_ranges(rdd): global offsetRanges offsetRanges = rdd.offsetRanges() return rdd def save_offset_ranges(rdd): root_path = os.path.dirname(os.path.realpath(__file__)) record_path = os.path.join(root_path, "offset.txt") data = dict() f = open(record_path, "w") for o in offsetRanges: data = {"topic": o.topic, "partition": o.partition, "fromOffset": o.fromOffset, "untilOffset": o.untilOffset} f.write(json.dumps(data)) f.close() def deal_data(rdd): data = rdd.collect() for d in data: # do something pass def save_by_spark_streaming(): root_path = os.path.dirname(os.path.realpath(__file__)) record_path = os.path.join(root_path, "offset.txt") from_offsets = {} # 获取已有的offset,没有记录文件时则用默认值即最大值 if os.path.exists(record_path): f = open(record_path, "r") offset_data = json.loads(f.read()) f.close() if offset_data["topic"] != topic_name: raise Exception("the topic name in offset.txt is incorrect") topic_partion = TopicAndPartition(offset_data["topic"], offset_data["partition"]) from_offsets = {topic_partion: long(offset_data["untilOffset"])} # 注意设置起始offset时的方法 print "start from offsets: %s" % from_offsets sc = SparkContext(appName="Realtime-Analytics-Engine") ssc = StreamingContext(sc, int(timer)) kvs = KafkaUtils.createDirectStream(ssc=ssc, topics=[topic_name], fromOffsets=from_offsets, kafkaParams={"metadata.broker.list": broker_list}) kvs.foreachRDD(lambda rec: deal_data(rec)) kvs.transform(store_offset_ranges).foreachRDD(save_offset_ranges) ssc.start() ssc.awaitTermination() ssc.stop() if __name__ == '__main__': save_by_spark_streaming()
运行:
正常情况下,只要输入下面的语句就可以运行了
spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 spark_kafka.py
然而,我的总是报错,找不到依赖包,说各种库不认识。所以我只好用–jars来手动指定包的位置了………………
spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 --jars /root/.ivy2/jars/org.apache.kafka_kafka_2.11-0.8.2.1.jar,/root/.ivy2/jars/com.yammer.metrics_metrics-core-2.2.0.jar spark_kafka.py
吐槽:
我就踩在直连形式不支持offset的坑上了….. 开始官方文档没仔细看,就瞄了一眼说是直连形式好,就豪不犹豫的用了。结果我的脚本不稳定,各种断,然后中间数据就各种丢啊…….
还有官网上居然完全没有对fromOffsets这个参数的说明,我找了好久好久才弄清楚这个参数怎么拼出来啊……………..