Apache Beam将Kafka作为数据输入的实践案例源码分析

Apache BeamKafka作为数据输入的实践案例源码分析:

首先,我们建立一个maven工程,在添加原始的Beam依赖之后,还需要添加如下的支持Kafka的依赖

<dependency>

<groupId>org.apache.beam</groupId>

<artifactId>beam-sdks-java-io-kafka</artifactId>

<version>0.5.0</version>

</dependency>

依赖下载完成之后,我们就可以使用像诸如KafkaIO工具之类的工具类来处理Beam中的来自kafka的数据。

在使用KafkaIO工具类之前,我们先来对KafkaIo类的源码做一个翻译工作,以便更好的使用它

/**
 * An unbounded source and a sink 
 for <a href="http://kafka.apache.org/"
>Kafka</a> topics.
(以kakfa的topic形式的无边界数据源)
 * Kafka version 0.9 
 and above are supported.
(支持kafka 0.9以及0.9以上的版本)
 *

(从kafka的topics中读取数据)
*
 * <p>KafkaIO source returns 
unbounded collection of Kafka records as
 * {@code PCollection<KafkaRecord<K, V>>}
. A {@link KafkaRecord} includes basic
 * metadata like topic-partition and offset
 , along with key and value associated with a Kafka
 * record.
 *
(通过读取kafka toppics中的数据从而返回一
个无边界的集合对象
PCollection<KafkaRecord<K, V>>

,该对象包含基本的元数据信息,包括topic分区
以及偏移量
,并以键值对的形式记录kafka的每一个record)
 * <p>Although most applications consume a
 single topic, the source can be configured to consume
 * multiple topics or even a
  specific set of {@link TopicPartition}s.
 *

(虽然很多应用消费单个的kafka数据topic,

但是数据源也可以配置消费多个topics,,
甚至可以指定TopicPartion集合。配置kafka数据源
,必须至少要配置好bootstrapServers,至少一个topic。

)

 案例代码如下所示:
WordCountOptions options =
PipelineOptionsFactory
.fromArgs(args).withValidation()
  .as(WordCountOptions.class);


Pipeline pipeline = 
Pipeline.create(options);
// 返回 PCollection<KafkaRecord<byte[], byte[]>
//配置kafka server和topic,该配置是必须的
pipeline.apply(KafkaIO.read()
.withBootstrapServers("broker_1:9092,broker_2:9092")
.withTopics(ImmutableList.of("topic_a", "topic_b"))
// 接下来的配置是可以选择的(可选项) :
//配置键值对编码
.withKeyCoder(BigEndianLongCoder.of())
.withValueCoder(StringUtf8Coder.of())
//配置缓冲字节大小
.updateConsumerProperties(ImmutableMap
.of("receive.buffer.bytes", 1024 * 1024))
//自定义函数计算记录时间戳(缺省为处理时间)
.withTimestampFn(new MyTypestampFunction())
//自定义 watermark函数 (默认是 timestamp)
.withWatermarkFn(new MyWatermarkFunction())
//最后,如果你不需要Kafka 元数据的话,你可以丢弃它
.withoutMetadata() 
//返回PCollection<String>,指定类型为String
.apply(Values.<String>create())
* <h3>分区和检查点</h3>
* The Kafka partitions are 
evenly distributed among splits (workers).
(kafka分区分布在各个workers上)
 * Dataflow checkpointing is f
 ully supported and
 * each split can resume 
 from previous checkpoint
(Dataflow支持每个分片可以从以前的检查点上恢复)
. See
{@link UnboundedKafkaSource#generateInitialSplits
(int, PipelineOptions)} for more details on
 * splits and checkpoint support.
 *(当pipeline第一次开始执行的时候,
没有任何checkpoint,source开始消费最新的数据,)
 *
 * <h3>写入数据到kafka</h3>
*(以键值对的形式写入kafka)
 * <p>KafkaIO sink supports writing
 key-value pairs to a Kafka topic. Users can also write
(也可以只写入value值)
 * just the values. To configure 
 a Kafka sink, you must specify 
 at the minimum Kafka

  PCollection<KV<Long, String>> kvColl = ...;
  kvColl.apply(KafkaIO.write()
.withBootstrapServers("broker_1:9092,broker_2:9092")
  .withTopic("results")
  // 设置Key and Value编码
.withKeyCoder(BigEndianLongCoder.of()) 
.withValueCoder(StringUtf8Coder.of())
  // 进一步指定KafkaProducer
// 指定KafkaProducer属性,比如压缩格式等
.updateProducerProperties(ImmutableMap
.of("compression.type", "gzip")));
(通常情况下,有可能只需要写入值到kafka中
那就按照如下的方式进行)
<p>Often you might want to write just 
values without any keys to Kafka. Use {@code values()} to
  write records with default empty(null) key:

 PCollection<String> strings = ...;
 strings.apply(KafkaIO.write()
 .withBootstrapServers("broker_1:9092,broker_2:9092")
 .withTopic("results")
 .withValueCoder(StringUtf8Coder.of()) 

 .values() //只需要在此写入默认的key就行了,默认为null值
 );

*/

    原文作者:李林贵
    原文地址: https://zhuanlan.zhihu.com/p/25227588
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞