(接上文:zzzach:Spark 流式处理与Kafka)
写入Kafka:
Spark2.3版本以上: 直接使用 .writeStream.format("kafka")
Spark2.3版本一下:使用 .foreach(wrr)
wrr是一个ForeachWriter
实例
// [String] 表示要写入kafka的数据是什么类型 //如("ab",0.1)则换成[(String,Double)] val wrr = new ForeachWriter[String] {
// 相关配置 val kafkaProperties = new Properties()
kafkaProperties.put("bootstrap.servers", brokers)
kafkaProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
kafkaProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
// 声明 | 这里 [String,String] 是表示初始化为KV结构 var producer_ : KafkaProducer[String,String] = _
override def open(partitionId: Long, version: Long): Boolean = {
// 初始化 | 注意一定要在open函数内部初始化,因为KafkaProducer不支持序列化 producer_ = new KafkaProducer[String,String](kafkaProperties)
true
}
override def process(value: String): Unit = {
// 写入的数据"value" producer_.send(new ProducerRecord(topic, value))
}
override def close(errorOrNull: Throwable): Unit = {
producer_.close()
}
}
// 写入kafka的query val kafka_foreachQuery ={
streamingDF
.writeStream
.outputMode(OutputMode.Append)
.foreach(wrr)
.start()
}
kafka_foreachQuery.awaitTermination()
报错及解决:
1. Spark侧获取到的KV是Binary
类型,如字符串变成了[63, 55, 31, 35, 56]
如果是用的console-producer
生产测试数据,Spark端获取数据时是以binary
格式来解析 key value的,可以用cast
转换成String
2. CDH无法解析ip地址,要使用服务器名
报错:TimeoutException:Failed to update metadata after 60000ms
解决:在使用console-producer
和console-consumer
(也包括代码里消费、生产topic时),如果可以试试传“服务器名”而不是“ip地址” ,如使用mynamenode01:9092
而不是 10.33.22.99:9092
3. 指定Spark使用的Kafka版本
报错:java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/Collection;)V
解决:需要在spark-submit
或者开启spark-shell
之前导入kafka的版本,即export SPARK_KAFKA_VERSION=0.10
4. 配置Spark读取Kafka数据的jar包
报错:java.lang.ClassNotFoundException: Failed to find data source: kafka
解决:提交任务(或者开启shell)的时候加上Spark参数 --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0
会自动下载jar包并进行配置(也可以手动进行,但流程比较复杂
5. ForeachWriter要在open函数里初始化kafkaProducer
报错:Class kafkashaded.org.apache.kafka.common.serialization.StringSerializer could not be found
解决:
如果是spark2.3以下,写入kafka必须用ForeachWriter
的方式,这里kafkaProducer不支持序列化,所以不能直接在外部初始化,要到每个executors上都初始化一个才行(即在open
函数里再初始化)
如果是spark2.3以上 ,直接用.writeStream().format("kafka")
6. 同时开启多个query
问题:StructuredStreaming在构造某个query之后,如果直接执行awaitTermination()
会卡主整个主程序(但是在spark-shell里面好像不会),导致无法同时启动多个query(例如想写入kafka的同时落盘hdfs备份)
解决:在所有query执行完start后,使用sparkSession.streams.awaitAnyTermination()
7.消费kafka数据时报错( Offset 问题)
【2019-01-23更新】:这个其实是 2.1.0 的bug,使用>=2.2.0版本一切正常。注意除了spark使用2.2.0,–package参数的kafka也要修改为2.2.0版本
报错:java.lang.IllegalArgumentException: Expected e.g. {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}}, got {"rec.news-data":{"23":104,"41":110,"32":109,"44":109,"35":109,"8":109,"17":109,"26":109,"11":109,
解决:(临时)删除checkpoint文件。这个是因为Streaming程序有落盘到HDFS(parquet sink),并且checkpoint文件有问题,StructuredStreaming试图恢复丢失掉的数据时出问题,目前还没有深入了解。临时解决办法是删除checkpoint文件,但是这样streaming断掉的那几分钟的数据如果要找回来就需要手动传一下kafka的offset(使用startingOffsets参数)
8. 备注一些kafka控制台指令
- 创建生产者(如果指定的topic没有创建过,会自动创建)
kafka-console-producer --broker-list datanode002:9092 --topic test.a
- 使用KV结构
kafka-console-producer --broker-list datanode002:9092 --topic test.a --property "parse.key=true" --property "key.separator=:"
- 直接导入整个文件内容作为“生产数据”
kafka-console-producer --broker-list datanode002:9092 test.a --property "parse.key=true" --property "key.separator=:" < sample.txt
- 创建消费者
kafka-console-consumer --bootstrap-server datanode002:9092 --from-beginning --topic test.a