我们都知道淘宝,京东!
每个人都会在上面买东西。我双十一偶尔也剁手买个机械键盘。
当然最多的还是在亚马逊买书,一买都是大几百。
现在屋子里都是书,也是大几百本了。
作为程序员的我们,肯定首关注的是成交量。
这些成交量,是老老实实写进数据库的。
那么另外一些“不老实”的数据,是什么,都写到哪里去了?
我在亚马逊买书的时候,通常会关注推荐给我的书。
亚马逊记录了我的浏览,过段时间会塞到我邮箱里一些书单。
我偶尔打开一看,这些书单还真是想看的书,就顺手下单了。
那么我想这些“不老实”的数据,其实就是我的浏览记录。
成千上万的读者在挑书的时候,都会留下浏览记录。
这些数据就通过流式处理,发送给 Hadoop, Hive , Spark 做计算了。
然后我们登录的时候,就收到类似推荐了。强大的推荐系统!
所以我决定研究一番做类似的数据流处理,需要涉及到哪些领域的知识。
今天是第一篇关于流式处理的文章,以 Kafka 为例。
Kafka 是 LinkedIn 开源的消息处理程序。号称可以支撑百万级的请求。
我们先从简单例子开始:
1 简单例子:
1.1 下载安装包: kafka_2.11-1.0.0.tgz
1.2 设置环境变量:$KAFKA_HOME = /opt/kafka; $PATH=$KAFKA_HOME/bin:$PATH
1.3 运行Kafka 服务:
在运行kafka之前,需要先运行或者找到一台 zookeeper 服务器
可以使用 kafka 安装包下面的 zookeeoper 来做实验
运行 zookeeper 服务:
>zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties
运行 kafka 服务:
>kafka-server-start.sh $KAFKA_HOME/config/server.properties
zookeeper 与 kafka 服务进程,都是一个单独的进程。
因此需要在单独的窗口中运行。
当我们启动了这两个服务之后,可以看到进程窗口是一直在不停的诊听客户端动作
kafka-server-start.sh 其实就是启动了一个单机版的伪集群服务进程,即一个Broker.
如果要建立多个broker,那么只要启动多个kafka-server-start.sh即可。
运行多个kafka-server-start.sh进程在同一台机器上,也可以分布在多台机器上。
我们需要一个 kafka 客户端来像 kafka 服务集群发送消息。
1.4 kafka 一个完整的消息传递流程
1.4.1 创建一个 Topic
>kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic ConsoleRealTimeMessage
这里, –zookeeper localhost:2181 在 zookeeper.properties 里面定义好了
1.4.2 围绕着新建的 Topic 发送一些消息
>kafka-console-producer.sh –broker-list localhost:9092 –topic ConsoleRealTimeMessage
在接下来的提示中,随意输入一些消息.
这些信息将会被传入到 Kafka cluster 的 broker 中,并且存储起来。
–broker-list localhost:9092
因为我们安装配置 kafka 的方式是单机版的伪集群,所以这里只有一个 broker.
为什么是9092这个端口,而不是其他端口。这个端口可以在配置文件中指定:
$KAFKA_HOME/config/server.properties
默认的端口便是 9092
–topic 指定像哪一个Topic 发送消息。
新发送的消息即将被送到指定的 topic 下面。
topic 有可能被分区,因此新发送的消息根据分区策略转移到相应分区并存储起来。
如果我们指定一个并不存在的 topic , 发送消息该如何处理?
lumatoMacBook-Pro:config lewis$ kafka-console-producer.sh –topic realmessage –broker-list localhost:9092
>mesage from lenis
[2018-02-22 10:32:27,132] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 1 : {realmessage=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
提示的是 partition leader 不可用。
因此在发送消息之前,一定要先建立对应的 topic
虽然如此,但是消息还是能接收的到:
lumatoMacBook-Pro:config lewis$ kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic realmessage –from-beginning
mesage from lenis
1.4.3 启动一个 Consumer, 并开始接受消息
> kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic ConsoleRealTimeMessage –from-beginning
–bootstrap-server localhost:9092
这是 broker 的server 地址;
–topic 指定从哪个 topic 来接收数据。
如果不指定或者指定一个不存在的topic,则也会报 Topic Partition Leader 不可用的错误
–from-beginning:
如果不指定,则只接收从当前时间开始,之后 producer 产生的消息
1.5 配置一个 Kafka 集群
kafka 集群,本质就是启动多个 kafka broker.
kafka broker 是由这个命令启动的:
>kafka-server-start.sh $KAFKA_HOME/config/server.properties
理论上,只要server.properties配置妥当,
在哪台机器上启动kafka-server-start.sh是没有限制的
因此我们现在本地配置一个简单的单机版集群:
1.5.1 复制多个 server.properties, 命名为
$KAFKA_HOME/config/server1.properties
$KAFKA_HOME/config/server2.properties
1.5.2 编辑这两个新加的配置文件,如下:
server1.properties:
broker.id=1
listeners=PLAINTEXT://:9093
log.dir=/tmp/kafka-logs-1
server2.properties:
broker.id=2
listeners=PLAINTEXT://9094
log.dir=/tmp/kafka-logs-2
1.5.3 分别在两个进程中,调kafka-server-start.sh, 指定不同的配置文件
>kafka-server-start.sh $KAFKA_HOME/config/server1.properties
>kafka-server-start.sh $KAFKA_HOME/config/server2.properties
1.5.4 新建topic, 分发消息,接收消息
当我们是往旧的单机版kafka cluster加入新增的 2 台 kafka broker时
使用的 zookeeper 是同一个机器,所以自动就被设置为同一个cluster
做这么一个假设的前提,是因为我仅仅在两个不同窗口启动了kafka-server-start.sh。配置文件也仅仅是修改了前面的3个项目
而无论我是用localhost:9092, localhost:9093, localhost:9094 发消息
消息始终都可以发到kafka cluster中
并且我无论指向boostrap-server localhost:9092, localhost:9093, localhost:9094
都可以读到发送出来的消息
使用以下命令查看 topic 的分区,replication 信息:
kafka-topics.sh –describe –zookeeper localhost:2181 –topic ConsoleRealTimeMessage
1.5.5 关掉一个kafka broker进程,分发消息,接收消息
假如我们关掉一个 kafka broker:
> ps | grep server1.properties
> kill -9 7553
这时,不能通过这台 kafka broker 来发送消息,也不能通过这台 kafka broker 来接收数据
但是其他两台 kafka broker依然是可以正常工作的
那么这里有个问题了:
既然都可以正常工作,我们设置某一个topic 的replication 数量为 3 ,有什么意义呢?(下篇继续讲)
建立一个有 3 个replica 的 topic:
>kafka-topics.sh –create –zookeeper localhost:2181 –topic ConsoleRealTimeMessageReplica –replication-factor 3 –partitions 1
注意:前面关掉了一个 kafka broker, 所以当 replication factor 超过可用的 kafka broker 数量
就会报错:
Error while executing topic command : Replication factor: 3 larger than available brokers: 2.
1.6 使用 Kafka Connect 来完成导入导出
导入数据到 kafka :
>connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties $KAFKA_HOME/config/connect-file-source.properties
第一个参数是配置 kafka broker 的常用配置
第二个参数是connect的connector配置,可以指定数据源是什么。
如果能理解 connect-standalone.sh 的运行,将其中获取数据的代码,套入到自己的程序
就可以实现往kafka cluster传送数据了
数据源的配置文件($KAFKA_HOME/config/connect-file-source.properties)如下:
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=/opt/dataexport/hongKongTravel.txt
topic=connect-test
导出数据到目的地:
>connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties $KAFKA_HOME/config/connect-file-sink.properties
配置文件($KAFKA_HOME/config/connect-file-sink.properties)如下:
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=/opt/dataexport/hongkong.txt
topics=connect-test
当然我们也可以使用 kafka-consumer 连接到 kafka cluster 读取对应的 topic 消息:
kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic Connect-Test –from-beginning
发现 Kafka 的官方文档写的非常好。
简单,引人入胜。
通过这么个可用的例子,给了我们最直观的印象。
如果需要套入自己的网站或者 App 应用里面,
就需要使用 Kafka Api 了。