学点 Kafka 流处理

我们都知道淘宝,京东!

每个人都会在上面买东西。我双十一偶尔也剁手买个机械键盘。

当然最多的还是在亚马逊买书,一买都是大几百。

现在屋子里都是书,也是大几百本了。

作为程序员的我们,肯定首关注的是成交量。

这些成交量,是老老实实写进数据库的。

那么另外一些“不老实”的数据,是什么,都写到哪里去了?

我在亚马逊买书的时候,通常会关注推荐给我的书。

亚马逊记录了我的浏览,过段时间会塞到我邮箱里一些书单。

我偶尔打开一看,这些书单还真是想看的书,就顺手下单了。

那么我想这些“不老实”的数据,其实就是我的浏览记录。

成千上万的读者在挑书的时候,都会留下浏览记录。

这些数据就通过流式处理,发送给 Hadoop, Hive , Spark 做计算了。

然后我们登录的时候,就收到类似推荐了。强大的推荐系统!

所以我决定研究一番做类似的数据流处理,需要涉及到哪些领域的知识。

今天是第一篇关于流式处理的文章,以 Kafka 为例。

Kafka 是 LinkedIn 开源的消息处理程序。号称可以支撑百万级的请求。

我们先从简单例子开始:

1 简单例子:

1.1 下载安装包: kafka_2.11-1.0.0.tgz

1.2 设置环境变量:$KAFKA_HOME = /opt/kafka; $PATH=$KAFKA_HOME/bin:$PATH

1.3 运行Kafka 服务:

在运行kafka之前,需要先运行或者找到一台 zookeeper 服务器

可以使用 kafka 安装包下面的 zookeeoper 来做实验

运行 zookeeper 服务:

>zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties

运行 kafka 服务:

>kafka-server-start.sh $KAFKA_HOME/config/server.properties

zookeeper 与 kafka 服务进程,都是一个单独的进程。

因此需要在单独的窗口中运行。

当我们启动了这两个服务之后,可以看到进程窗口是一直在不停的诊听客户端动作

kafka-server-start.sh 其实就是启动了一个单机版的伪集群服务进程,即一个Broker.

如果要建立多个broker,那么只要启动多个kafka-server-start.sh即可。

运行多个kafka-server-start.sh进程在同一台机器上,也可以分布在多台机器上。

我们需要一个 kafka 客户端来像 kafka 服务集群发送消息。

1.4 kafka 一个完整的消息传递流程

1.4.1 创建一个 Topic

>kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic ConsoleRealTimeMessage

这里, –zookeeper localhost:2181 在 zookeeper.properties 里面定义好了

1.4.2 围绕着新建的 Topic 发送一些消息

>kafka-console-producer.sh –broker-list localhost:9092 –topic ConsoleRealTimeMessage

在接下来的提示中,随意输入一些消息.

这些信息将会被传入到 Kafka cluster 的 broker 中,并且存储起来。

–broker-list localhost:9092

因为我们安装配置 kafka 的方式是单机版的伪集群,所以这里只有一个 broker.

为什么是9092这个端口,而不是其他端口。这个端口可以在配置文件中指定:

$KAFKA_HOME/config/server.properties

默认的端口便是 9092

–topic 指定像哪一个Topic 发送消息。

新发送的消息即将被送到指定的 topic 下面。

topic 有可能被分区,因此新发送的消息根据分区策略转移到相应分区并存储起来。

如果我们指定一个并不存在的 topic , 发送消息该如何处理?

lumatoMacBook-Pro:config lewis$ kafka-console-producer.sh –topic realmessage –broker-list localhost:9092

>mesage from lenis

[2018-02-22 10:32:27,132] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 1 : {realmessage=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

提示的是 partition leader 不可用。

因此在发送消息之前,一定要先建立对应的 topic

虽然如此,但是消息还是能接收的到:

lumatoMacBook-Pro:config lewis$ kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic realmessage –from-beginning

mesage from lenis

1.4.3 启动一个 Consumer, 并开始接受消息

> kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic ConsoleRealTimeMessage –from-beginning

–bootstrap-server localhost:9092

这是 broker 的server 地址;

–topic 指定从哪个 topic 来接收数据。

如果不指定或者指定一个不存在的topic,则也会报 Topic Partition Leader 不可用的错误

–from-beginning:

如果不指定,则只接收从当前时间开始,之后 producer 产生的消息

1.5 配置一个 Kafka 集群

kafka 集群,本质就是启动多个 kafka broker.

kafka broker 是由这个命令启动的:

>kafka-server-start.sh $KAFKA_HOME/config/server.properties

理论上,只要server.properties配置妥当,

在哪台机器上启动kafka-server-start.sh是没有限制的

因此我们现在本地配置一个简单的单机版集群:

1.5.1 复制多个 server.properties, 命名为

$KAFKA_HOME/config/server1.properties

$KAFKA_HOME/config/server2.properties

1.5.2 编辑这两个新加的配置文件,如下:

server1.properties:

broker.id=1

listeners=PLAINTEXT://:9093

log.dir=/tmp/kafka-logs-1

server2.properties:

broker.id=2

listeners=PLAINTEXT://9094

log.dir=/tmp/kafka-logs-2

1.5.3 分别在两个进程中,调kafka-server-start.sh, 指定不同的配置文件

>kafka-server-start.sh $KAFKA_HOME/config/server1.properties

>kafka-server-start.sh $KAFKA_HOME/config/server2.properties

1.5.4 新建topic, 分发消息,接收消息

当我们是往旧的单机版kafka cluster加入新增的 2 台 kafka broker时

使用的 zookeeper 是同一个机器,所以自动就被设置为同一个cluster

做这么一个假设的前提,是因为我仅仅在两个不同窗口启动了kafka-server-start.sh。配置文件也仅仅是修改了前面的3个项目

而无论我是用localhost:9092, localhost:9093, localhost:9094 发消息

消息始终都可以发到kafka cluster中

并且我无论指向boostrap-server localhost:9092, localhost:9093, localhost:9094

都可以读到发送出来的消息

使用以下命令查看 topic 的分区,replication 信息:

kafka-topics.sh –describe –zookeeper localhost:2181 –topic ConsoleRealTimeMessage

1.5.5 关掉一个kafka broker进程,分发消息,接收消息

假如我们关掉一个 kafka broker:

> ps | grep server1.properties

> kill -9 7553

这时,不能通过这台 kafka broker 来发送消息,也不能通过这台 kafka broker 来接收数据

但是其他两台 kafka broker依然是可以正常工作的

那么这里有个问题了:

既然都可以正常工作,我们设置某一个topic 的replication 数量为 3 ,有什么意义呢?(下篇继续讲)

建立一个有 3 个replica 的 topic:

>kafka-topics.sh –create –zookeeper localhost:2181 –topic ConsoleRealTimeMessageReplica –replication-factor 3 –partitions 1

注意:前面关掉了一个 kafka broker, 所以当 replication factor 超过可用的 kafka broker 数量

就会报错:

Error while executing topic command : Replication factor: 3 larger than available brokers: 2.

1.6 使用 Kafka Connect 来完成导入导出

导入数据到 kafka :

>connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties $KAFKA_HOME/config/connect-file-source.properties

第一个参数是配置 kafka broker 的常用配置

第二个参数是connect的connector配置,可以指定数据源是什么。

如果能理解 connect-standalone.sh 的运行,将其中获取数据的代码,套入到自己的程序

就可以实现往kafka cluster传送数据了

数据源的配置文件($KAFKA_HOME/config/connect-file-source.properties)如下:

name=local-file-source

connector.class=FileStreamSource

tasks.max=1

file=/opt/dataexport/hongKongTravel.txt

topic=connect-test

导出数据到目的地:

>connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties $KAFKA_HOME/config/connect-file-sink.properties

配置文件($KAFKA_HOME/config/connect-file-sink.properties)如下:

name=local-file-sink

connector.class=FileStreamSink

tasks.max=1

file=/opt/dataexport/hongkong.txt

topics=connect-test

当然我们也可以使用 kafka-consumer 连接到 kafka cluster 读取对应的 topic 消息:

kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic Connect-Test –from-beginning

发现 Kafka 的官方文档写的非常好。

简单,引人入胜。

通过这么个可用的例子,给了我们最直观的印象。

如果需要套入自己的网站或者 App 应用里面,

就需要使用 Kafka Api 了。

    原文作者:黄赟
    原文地址: https://zhuanlan.zhihu.com/p/33973076
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞