Kafka is a high-throughput, distributed publish-subscribe messaging system capable of handling all the activity-stream data of a consumer-scale website. Kafka was designed to unify online and offline message processing via Hadoop's parallel loading mechanism, and to provide real-time consumption across a cluster of machines. Below is a short guide to installing and using Kafka; for a complete overview, see the official Kafka documentation.
Installation
Kafka depends on ZooKeeper, so ZooKeeper must be installed before Kafka. With Homebrew a single command handles both, because the kafka formula declares zookeeper as a dependency:
brew install kafka
Once the command finishes, zookeeper and kafka are both installed, as brew info confirms:
➜ ~ brew info kafka
kafka: stable 0.11.0.1 (bottled)
Publish-subscribe messaging rethought as a distributed commit log
https://kafka.apache.org
/usr/local/Cellar/kafka/0.11.0.1 (608 files, 43.3MB) *
Poured from bottle on 2017-09-21 at 18:45:53
From: https://github.com/Homebrew/homebrew-core/blob/master/Formula/kafka.rb
==> Dependencies
Required: zookeeper ✔
==> Requirements
Required: java = 1.8 ✔
==> Caveats
To have launchd start kafka now and restart at login:
brew services start kafka
Or, if you don't want/need a background service you can just run:
zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties & kafka-server-start /usr/local/etc/kafka/server.properties
Core Concepts
The following Kafka concepts will help in understanding the examples below as you run them.
- Broker: a Kafka cluster consists of one or more servers; each such server is called a broker.
- Topic: every message published to a Kafka cluster has a category, called a topic. (Physically, messages of different topics are stored separately; logically, a topic's messages may live on one or more brokers, but producers and consumers only need to specify the topic and never need to care where the data is stored.)
- Partition: a physical concept; each topic contains one or more partitions.
- Producer: a client that publishes messages to Kafka brokers.
- Consumer: a client that reads messages from Kafka brokers.
- Consumer Group: each consumer belongs to a specific consumer group (a group name can be assigned per consumer; if none is specified, the consumer falls into the default group).
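The relationships above can be sketched with a tiny in-memory model. This is an illustration only, not Kafka's actual implementation: the class names are hypothetical, real partition assignment uses murmur2 key hashing plus group coordination, and real offsets are committed to the broker rather than held client-side.

```python
from collections import defaultdict

class MiniTopic:
    """Toy model of a topic: an ordered, append-only log per partition."""
    def __init__(self, name, num_partitions=1):
        self.name = name
        self.partitions = [[] for _ in range(num_partitions)]

    def produce(self, message, key=None):
        # Simplified stand-in for Kafka's partitioner: hash the key when
        # present, otherwise spread messages across partitions by count.
        if key is None:
            idx = sum(len(p) for p in self.partitions) % len(self.partitions)
        else:
            idx = hash(key) % len(self.partitions)
        self.partitions[idx].append(message)
        return idx  # which partition received the message

class MiniConsumerGroup:
    """Each group tracks its own offsets, so groups consume independently."""
    def __init__(self):
        self.offsets = defaultdict(int)  # (topic, partition) -> next offset

    def poll(self, topic):
        out = []
        for i, log in enumerate(topic.partitions):
            start = self.offsets[(topic.name, i)]
            out.extend(log[start:])
            self.offsets[(topic.name, i)] = len(log)
        return out

topic = MiniTopic("depth", num_partitions=1)
for msg in ["1", "3", "xx"]:
    topic.produce(msg)

group_a = MiniConsumerGroup()
group_b = MiniConsumerGroup()
print(group_a.poll(topic))  # ['1', '3', 'xx']
print(group_a.poll(topic))  # [] -- group A has already consumed everything
print(group_b.poll(topic))  # ['1', '3', 'xx'] -- group B reads from the beginning
```

The key point the model captures: ordering is guaranteed only within a partition, and because offsets belong to the consumer group, two groups reading the same topic each see the full message stream.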
Testing a Simple Example
Next, let's run a simple example on macOS. On Ubuntu, adjust the commands to match your installation paths. Execute the following commands in order:
brew services start zookeeper
brew services start kafka
Now typing kafka followed by Tab reveals the many available commands:
➜ ~ kafka-
kafka-acls kafka-consumer-groups kafka-preferred-replica-election kafka-run-class kafka-topics
kafka-broker-api-versions kafka-consumer-offset-checker kafka-producer-perf-test kafka-server-start kafka-verifiable-consumer
kafka-configs kafka-consumer-perf-test kafka-reassign-partitions kafka-server-stop kafka-verifiable-producer
kafka-console-consumer kafka-delete-records kafka-replay-log-producer kafka-simple-consumer-shell
kafka-console-producer kafka-mirror-maker kafka-replica-verification kafka-streams-application-reset
(If you start ZooKeeper in the foreground with zookeeper-server-start instead of brew services, the command does not return to the shell prompt; either way, ZooKeeper serves with its default configuration file.) Next, enter the following command:
➜ ~ kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic depth
Created topic "depth".
A topic is the category under which messages are published. Here we created a topic named depth with a single-node configuration (one partition, replication factor 1). Use --list to show all existing topics and check that the one we just created exists:
➜ ~ kafka-topics --list --zookeeper localhost:2181
depth
The topic depth appears in the output. Next, produce some data with the console producer:
➜ ~ kafka-console-producer --broker-list localhost:9092 --topic depth
1
3
xx
Then open a new terminal (or press Ctrl+C to exit the producer) and use a consumer to receive the data:
➜ ~ kafka-console-consumer --zookeeper localhost:2181 --topic depth --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
1
3
xx
bootstrap_servers – 'host[:port]' string (or list of 'host[:port]' strings) that the producer should contact to bootstrap initial cluster metadata. This does not have to be the full node list. It just needs to have at least one broker that will respond to a Metadata API Request. Default port is 9092. If no servers are specified, will default to localhost:9092.
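The quoted description boils down to a simple format: each entry is a host with an optional :port, the port defaults to 9092, and with no servers at all the client assumes localhost:9092. A small helper (hypothetical name, illustration only, not part of any Kafka client library) makes that concrete:

```python
def parse_bootstrap_servers(servers="localhost:9092", default_port=9092):
    """Parse a 'host[:port]' string (or list of such strings) into
    (host, port) tuples, filling in 9092 when the port is omitted."""
    if isinstance(servers, str):
        servers = servers.split(",")
    parsed = []
    for entry in servers:
        host, _, port = entry.strip().partition(":")
        parsed.append((host, int(port) if port else default_port))
    return parsed

print(parse_bootstrap_servers())                         # [('localhost', 9092)]
print(parse_bootstrap_servers(["kafka1", "kafka2:9093"]))  # port defaulted for kafka1
```

Only one of the listed brokers needs to be reachable; the client uses it to fetch metadata about the rest of the cluster.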
Following the deprecation hint, we rerun the consumer with the newer option:
➜ ~ kafka-console-consumer --bootstrap-server localhost:9092 --topic depth --from-beginning
1
3
xx
The three messages produced earlier appear, confirming that Kafka is installed and working.
To see what each broker is doing, describe the topics:
➜ ~ kafka-topics --describe --zookeeper localhost:2181