StructuredStreaming 写入Kafka

(接上文:zzzach:Spark 流式处理与Kafka)

写入Kafka:

Spark2.3版本以上: 直接使用 .writeStream.format("kafka")

Spark2.3版本一下:使用 .foreach(wrr) wrr是一个ForeachWriter 实例

    // [String] 表示要写入kafka的数据是什么类型     //如("ab",0.1)则换成[(String,Double)]     val wrr = new ForeachWriter[String] {
      // 相关配置       val kafkaProperties = new Properties()
      kafkaProperties.put("bootstrap.servers", brokers)
      kafkaProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      kafkaProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      // 声明 | 这里 [String,String] 是表示初始化为KV结构       var producer_ : KafkaProducer[String,String] = _
      override def open(partitionId: Long, version: Long): Boolean = {
        // 初始化 | 注意一定要在open函数内部初始化,因为KafkaProducer不支持序列化         producer_ = new KafkaProducer[String,String](kafkaProperties)
        true
      }
      override def process(value: String): Unit = {
        // 写入的数据"value"         producer_.send(new ProducerRecord(topic, value))
      }

      override def close(errorOrNull: Throwable): Unit = {
        producer_.close()
      }
    }
    // 写入kafka的query     val kafka_foreachQuery ={
      streamingDF
        .writeStream
        .outputMode(OutputMode.Append)
        .foreach(wrr)
        .start()
    }
    kafka_foreachQuery.awaitTermination()

报错及解决:

1. Spark侧获取到的KV是Binary类型,如字符串变成了[63, 55, 31, 35, 56]

如果是用的console-producer生产测试数据,Spark端获取数据时是以binary格式来解析 key value的,可以用cast转换成String

2. CDH无法解析ip地址,要使用服务器名

报错:TimeoutException:Failed to update metadata after 60000ms

解决:在使用console-producerconsole-consumer(也包括代码里消费、生产topic时),如果可以试试传“服务器名”而不是“ip地址” ,如使用mynamenode01:9092 而不是 10.33.22.99:9092

3. 指定Spark使用的Kafka版本

报错:java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/Collection;)V

解决:需要在spark-submit或者开启spark-shell之前导入kafka的版本,即export SPARK_KAFKA_VERSION=0.10

4. 配置Spark读取Kafka数据的jar包

报错:java.lang.ClassNotFoundException: Failed to find data source: kafka

解决:提交任务(或者开启shell)的时候加上Spark参数 --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0会自动下载jar包并进行配置(也可以手动进行,但流程比较复杂

5. ForeachWriter要在open函数里初始化kafkaProducer

报错:Class kafkashaded.org.apache.kafka.common.serialization.StringSerializer could not be found

解决:

如果是spark2.3以下,写入kafka必须用ForeachWriter的方式,这里kafkaProducer不支持序列化,所以不能直接在外部初始化,要到每个executors上都初始化一个才行(即在open函数里再初始化)

如果是spark2.3以上 ,直接用.writeStream().format("kafka")

6. 同时开启多个query

问题:StructuredStreaming在构造某个query之后,如果直接执行awaitTermination()会卡主整个主程序(但是在spark-shell里面好像不会),导致无法同时启动多个query(例如想写入kafka的同时落盘hdfs备份)

解决:在所有query执行完start后,使用sparkSession.streams.awaitAnyTermination()

7.消费kafka数据时报错( Offset 问题)

【2019-01-23更新】:这个其实是 2.1.0 的bug,使用>=2.2.0版本一切正常。注意除了spark使用2.2.0,–package参数的kafka也要修改为2.2.0版本

报错:java.lang.IllegalArgumentException: Expected e.g. {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}}, got {"rec.news-data":{"23":104,"41":110,"32":109,"44":109,"35":109,"8":109,"17":109,"26":109,"11":109,

解决:(临时)删除checkpoint文件。这个是因为Streaming程序有落盘到HDFS(parquet sink),并且checkpoint文件有问题,StructuredStreaming试图恢复丢失掉的数据时出问题,目前还没有深入了解。临时解决办法是删除checkpoint文件,但是这样streaming断掉的那几分钟的数据如果要找回来就需要手动传一下kafka的offset(使用startingOffsets参数)

8. 备注一些kafka控制台指令

  • 创建生产者(如果指定的topic没有创建过,会自动创建)

kafka-console-producer --broker-list datanode002:9092 --topic test.a

  • 使用KV结构

kafka-console-producer --broker-list datanode002:9092 --topic test.a --property "parse.key=true" --property "key.separator=:"

  • 直接导入整个文件内容作为“生产数据”

kafka-console-producer --broker-list datanode002:9092 test.a --property "parse.key=true" --property "key.separator=:" < sample.txt

  • 创建消费者

kafka-console-consumer --bootstrap-server datanode002:9092 --from-beginning --topic test.a

    原文作者:zzzach
    原文地址: https://zhuanlan.zhihu.com/p/53723190
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞