Apache Beam with Kafka as the data input: a practical example and source-code walkthrough:
First, create a Maven project. After adding the base Beam dependencies, you also need to add the following dependency for Kafka support:
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-kafka</artifactId>
  <version>0.5.0</version>
</dependency>
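For reference, the "base Beam dependencies" mentioned above are not shown in this article. A minimal sketch, assuming the same 0.5.0 release and the Direct runner for local testing (artifact IDs and versions may differ in other Beam releases), would look roughly like this:

<!-- Sketch of the base Beam dependencies assumed above: the core SDK
     plus the Direct runner for running the pipeline locally. -->
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-core</artifactId>
  <version>0.5.0</version>
</dependency>
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-runners-direct-java</artifactId>
  <version>0.5.0</version>
  <scope>runtime</scope>
</dependency>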
Once the dependencies have been downloaded, we can use utility classes such as KafkaIO to process data coming from Kafka in Beam.
Before using the KafkaIO utility class, let's first walk through the documentation in the KafkaIO source code so that we can use it more effectively.
KafkaIO provides an unbounded source and a sink for Kafka (http://kafka.apache.org/) topics. Kafka versions 0.9 and above are supported.

Reading from Kafka topics:
The KafkaIO source returns an unbounded collection of Kafka records as a PCollection<KafkaRecord<K, V>>. A KafkaRecord includes basic metadata such as the topic-partition and the offset, along with the key and value of the Kafka record.
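To make that metadata concrete, here is a hypothetical DoFn (not part of KafkaIO; the accessor names reflect the KafkaRecord API as I understand it and are worth double-checking against your Beam version) that formats each record together with its topic, partition and offset:

import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Hypothetical helper: turn each KafkaRecord into a human-readable line
// showing the metadata it carries alongside the key/value pair.
public class FormatKafkaRecordFn extends DoFn<KafkaRecord<Long, String>, String> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    KafkaRecord<Long, String> record = c.element();
    KV<Long, String> kv = record.getKV();                     // the key/value pair
    c.output(record.getTopic() + "/" + record.getPartition()  // topic-partition
        + "@" + record.getOffset()                            // offset
        + " key=" + kv.getKey() + " value=" + kv.getValue());
  }
}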
Although most applications consume a single topic, the source can be configured to consume multiple topics, or even a specific set of TopicPartitions. To configure a Kafka source you must, at the very least, set the bootstrapServers and at least one topic.
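For instance, restricting the source to specific partitions rather than whole topics might look like the following sketch (it assumes a withTopicPartitions method on the read transform; the exact method name and signature should be checked against your KafkaIO version):

import com.google.common.collect.ImmutableList;
import org.apache.kafka.common.TopicPartition;

// Hypothetical sketch: consume only partitions 0 and 1 of topic_a
// instead of naming whole topics with withTopics(...).
KafkaIO.read()
    .withBootstrapServers("broker_1:9092,broker_2:9092")
    .withTopicPartitions(ImmutableList.of(
        new TopicPartition("topic_a", 0),
        new TopicPartition("topic_a", 1)));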
The example code is as follows:
WordCountOptions options = PipelineOptionsFactory.fromArgs(args)
    .withValidation()
    .as(WordCountOptions.class);

Pipeline pipeline = Pipeline.create(options);

// a plain KafkaIO.read() yields a PCollection<KafkaRecord<byte[], byte[]>>
pipeline
    .apply(KafkaIO.read()
        // configuring the Kafka servers and topics is required
        .withBootstrapServers("broker_1:9092,broker_2:9092")
        .withTopics(ImmutableList.of("topic_a", "topic_b"))

        // the settings below are optional:

        // coders for the key and the value
        .withKeyCoder(BigEndianLongCoder.of())
        .withValueCoder(StringUtf8Coder.of())
        // consumer tuning, e.g. the receive buffer size in bytes
        .updateConsumerProperties(ImmutableMap.of("receive.buffer.bytes", 1024 * 1024))
        // custom function to compute the record timestamp (default: processing time)
        .withTimestampFn(new MyTimestampFunction())
        // custom watermark function (default: the record timestamp)
        .withWatermarkFn(new MyWatermarkFunction())
        // finally, drop the Kafka metadata if you do not need it;
        // this yields a PCollection<KV<Long, String>>
        .withoutMetadata())
    // keep only the values, giving a PCollection<String>
    .apply(Values.<String>create());
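MyTimestampFunction and MyWatermarkFunction above are user-supplied classes, not part of KafkaIO. As a rough sketch, and assuming this generation of withTimestampFn accepts a SerializableFunction<KV<K, V>, Instant> (verify the exact signature for your version), a timestamp function that treats the Long key as epoch milliseconds could look like this:

import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Instant;

// Hypothetical timestamp function: interpret the record key as epoch millis.
// Adapt this to wherever your records actually carry their event time.
public class MyTimestampFunction implements SerializableFunction<KV<Long, String>, Instant> {
  @Override
  public Instant apply(KV<Long, String> record) {
    return new Instant(record.getKey());
  }
}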
Partitions and checkpointing:
The Kafka partitions are evenly distributed among splits (workers). Dataflow checkpointing is fully supported, and each split can resume from its previous checkpoint; see UnboundedKafkaSource#generateInitialSplits(int, PipelineOptions) for more details on split and checkpoint support. When the pipeline starts for the first time, without any checkpoint, the source begins consuming from the latest offsets.
Writing data to Kafka:
The KafkaIO sink supports writing key-value pairs to a Kafka topic; users can also write just the values. To configure a Kafka sink, you must specify at the minimum the Kafka bootstrapServers and the topic to write to, for example:
PCollection<KV<Long, String>> kvColl = ...;
kvColl.apply(KafkaIO.write()
    .withBootstrapServers("broker_1:9092,broker_2:9092")
    .withTopic("results")
    // set the key and value coders
    .withKeyCoder(BigEndianLongCoder.of())
    .withValueCoder(StringUtf8Coder.of())
    // optionally tune the underlying KafkaProducer,
    // e.g. the compression type
    .updateProducerProperties(ImmutableMap.of("compression.type", "gzip")));
Often you might want to write just the values, without any keys, to Kafka. Use values() to write records with the default empty (null) key:
PCollection<String> strings = ...;
strings.apply(KafkaIO.write()
    .withBootstrapServers("broker_1:9092,broker_2:9092")
    .withTopic("results")
    .withValueCoder(StringUtf8Coder.of())
    // values() writes each record with the default key, which is null
    .values());
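Finally, note that none of the transforms above actually execute until the pipeline created earlier is run; a minimal sketch:

// launch the pipeline on the configured runner
pipeline.run();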