kafka集群安装
本人是一名在校大三学生,所选专业大数据技术,为了毕业能有一份可观的工作,
目前正在致力于努力学习中,在学习中的一些笔记和经验,希望可以通过写CSDN
博文记录并和同样正在努力中的你分享
前提
在安装好zookeeper集群的基础上进行安装kafka集群!!!
一、安装包获取
- 在Windows中在官网下载,然后上传到虚拟机
链接:https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/ - 在虚拟机中通过wget命令下载
- 首先安装wget:
yum -y install wget
- 下载kafka:进入你放置安装包的目录下执行下面命令
wget http://archive.apache.org/dist/kafka/2.3.1/kafka_2.11-2.3.1.tgz
- 首先安装wget:
- 百度网盘下载
下载地址:1111
二、安装kafka
解压安装包
tar -zvxf kafka_2.11-2.3.1.tgz -C /opt/module/
解压后修改文件名字
mv kafka_2.11-2.3.1/ kafka
配置环境变量
vi /etc/profile
添加以下配置:#kafka export KAFKA_HOME=/opt/module/kafka export PATH=$PATH:$KAFKA_HOME/bin
使配置文件生效
source /etc/profile
在/opt/module/kafka目录下创建logs目录**
mkdir logs
修改/opt/module/kafka/conf目录下的配置文件
vi server.properties
将里面的内容修改为一下内容:#broker的全局唯一编号,不能重复 broker.id=0 #删除topic功能使能 delete.topic.enable=true #处理网络请求的线程数量 num.network.threads=3 #用来处理磁盘IO的现成数量 num.io.threads=8 #发送套接字的缓冲区大小 socket.send.buffer.bytes=102400 #接收套接字的缓冲区大小 socket.receive.buffer.bytes=102400 #请求套接字的缓冲区大小 socket.request.max.bytes=104857600 #kafka运行日志存放的路径 log.dirs=/opt/module/kafka/logs #topic在当前broker上的分区个数 num.partitions=1 #用来恢复和清理data下数据的线程数量 num.recovery.threads.per.data.dir=1 #segment文件保留的最长时间,超时将被删除 log.retention.hours=168 #配置连接Zookeeper集群地址 zookeeper.connect=ethan001:2181,ethan002:2181,ethan003:2181
注意,记得修改最后的主机名为你的集群中的三台主机名
分发安装包到集群的另外两台节点
scp -r kafka root@ethan001:/opt/module/ scp -r kafka root@ethan003:/opt/module/
注意:
– 分发之后配置好另外两台的环境变量
– 修改/opt/module/kafka/conf目录下的server.properties文件中的broker.id=1 和broker.id=2
7. 启动kafka集群
启动zookeeper集群之后再启动kafka集群
[root@ethan001 kafka]# bin/kafka-server-start.sh -daemon config/server.properties
[root@ethan002 kafka]# bin/kafka-server-start.sh -daemon config/server.properties
[root@ethan003 kafka]# bin/kafka-server-start.sh -daemon config/server.properties
- 关闭kafka集群
[root@ethan001 kafka]# bin/kafka-server-stop.sh stop [root@ethan002 kafka]# bin/kafka-server-stop.sh stop [root@ethan003 kafka]# bin/kafka-server-stop.sh stop
三、Kafka命令行操作
启动kafka集群,然后可以进行kafka的相关命令操作
查看当前服务器中的所有topic
[root@ethan002 kafka]# bin/kafka-topics.sh --zookeeper ethan002:2181 --list
创建topic
名字:abc[root@ethan002 kafka]# bin/kafka-topics.sh --zookeeper ethan002:2181 --create --replication-factor 3 --partitions 1 --topic abc
启动生产者进程并发送消息
[root@ethan002 kafka]# bin/kafka-console-producer.sh --broker-list ethan002:9092 --topic abc >hello word >I love guiZhou >hello
启动消费者进程并消费消息
在另外开一个窗口,可以获取生产的数据[root@ethan002 kafka]# bin/kafka-console-consumer.sh --bootstrap-server ethan002:9092 --topic abc --from-beginning hello word I love guiZhou hell
查看某个Topic的详情
[root@ethan002 kafka]# bin/kafka-topics.sh --zookeeper ethan002:2181 --describe --topic abc
结果:
Topic:abc PartitionCount:1 ReplicationFactor:3 Configs: Topic: abc Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,2
修改分区数
[root@ethan002 kafka]# bin/kafka-topics.sh --zookeeper ethan002:2181 --alter --topic abc --partitions 6
注意:
选项说明:
–topic 定义topic名
–replication-factor 定义副本数
–partitions 定义分区数
删除topic
[root@ethan002 kafka]# bin/kafka-topics.sh --zookeeper ethan002:2181 --delete --topic abc
注意:
需要server.properties中设置delete.topic.enable=true否则只是标记删除