Kafka Streams 入门实例1 WordCount

WordCount

WordCount 堪称大数据界的HelloWorld,相信不管是Hadoop还是Spark等大数据工具的上手实例,第一个十有八九是WordCount。

Kafka Stream也不例外。作为集成在Kafka消息系统上的数据实时处理接口,WordCount也可以作为一个很好的入门实例。

实际上,Kafka官方已经提供了WordCount的Demo,org.apache.kafka.streams.examples.wordcount.WordCountDemo,但亲手实现一遍可以帮助我们快速入门。

逻辑流程

需要记住的是,Kafka中的数据都以<key, value>的形式存在。

假设我们的Kafka中,已经存在一个topic,其中的数据来自于一个文本文件。我们希望编写一个Kafka Streams Application对此topic中的数据进行WordCount计算,大概步骤可以分解为:

  • Stream 从源topic中取出每一行数据记录 (<key, value>格式) —- <null, “Hello World hello”>

  • MapValue 将value中所有文本转换成小写形式 —- <null, “hello world hello”>

  • FlatMapValues 按空格分解成单词 —- <null, “hello”>,<null, “world”>, <null, “hello”>

  • SelectKey 将value的值赋给key —- <“hello”, “hello”>,<“world”, “world”>, <“hello”, “hello”>

  • GroupByKey 按相同的Key分组 —- (<“hello”, “hello”>, <“hello, “hello”>),(<“world”, “world”>)

  • Count 计算每个组中元素个数 —- <“hello”, 2>,<“world”, 1>

  • To 将结果返回Kafka

逻辑代码

首先进行配置,包括Kafka Streams Application的ID,Kafka集群位置等:

Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-count");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

这里使用Kafka Streams DSL。DSL提供的各种算子大部分情况下都可以满足需求。

使用DSL:

StreamsBuilder builder = new StreamsBuilder();

从Kafka源topic获取数据流:

KStream<String, String> textLines = builder.stream("streams-plaintext-input");

KStream即代表了由各个数据记录组成的数据流。

KStream可以从一或更多topic中的数据得来。
KStream可以进行对数据记录的逐条转换,和其它KStream,KTable进行join操作,或aggregate成KTable。

对得到的KStream进行transformationaggregation

将数据记录中的大写全部替换成小写:

.mapValues(textLine -> textLine.toLowerCase())

将各行数据按空格拆分:

.flatMapValues(textLine -> Arrays.asList(textLine.split(" ")))

将value作为新的key:

.selectKey((key, word) -> word)

aggregation操作前group by key:

.groupByKey()

计算每个组中的元素个数:

.count(Materialized.as("Counts"));

得到结果后将其存储为KTable

KTable<String, Long> wordCounts = textLines
                                  .mapValues ...

最后导入目标topic,其中key为String,value为Long。

wordCounts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));

KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.start();
    原文作者:表现力
    原文地址: https://www.jianshu.com/p/1434ad5849da
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞