kafka集群安装

kafka集群安装

本人是一名在校大三学生,所选专业大数据技术,为了毕业能有一份可观的工作,
目前正在致力于努力学习中,在学习中的一些笔记和经验,希望可以通过写CSDN
博文记录并和同样正在努力中的你分享

前提

在安装好zookeeper集群的基础上进行安装kafka集群!!!

一、安装包获取

  • 在Windows中在官网下载,然后上传到虚拟机
    链接:https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/
  • 在虚拟机中通过wget命令下载
    • 首先安装wget:yum -y install wget
    • 下载kafka:进入你放置安装包的目录下执行下面命令
      wget http://archive.apache.org/dist/kafka/2.3.1/kafka_2.11-2.3.1.tgz
  • 百度网盘下载
    下载地址:1111

二、安装kafka

  1. 解压安装包

    tar -zvxf kafka_2.11-2.3.1.tgz -C /opt/module/
    
  2. 解压后修改文件名字

    mv kafka_2.11-2.3.1/ kafka
    
  3. 配置环境变量
    vi /etc/profile
    添加以下配置:

    #kafka
    export KAFKA_HOME=/opt/module/kafka
    export PATH=$PATH:$KAFKA_HOME/bin
    

    使配置文件生效
    source /etc/profile

  4. 在/opt/module/kafka目录下创建logs目录**

    mkdir logs
    
  5. 修改/opt/module/kafka/conf目录下的配置文件
    vi server.properties
    将里面的内容修改为一下内容:

    #broker的全局唯一编号,不能重复
    broker.id=0
    #删除topic功能使能
    delete.topic.enable=true
    #处理网络请求的线程数量
    num.network.threads=3
    #用来处理磁盘IO的现成数量
    num.io.threads=8
    #发送套接字的缓冲区大小
    socket.send.buffer.bytes=102400
    #接收套接字的缓冲区大小
    socket.receive.buffer.bytes=102400
    #请求套接字的缓冲区大小
    socket.request.max.bytes=104857600
    #kafka运行日志存放的路径
    log.dirs=/opt/module/kafka/logs
    #topic在当前broker上的分区个数
    num.partitions=1
    #用来恢复和清理data下数据的线程数量
    num.recovery.threads.per.data.dir=1
    #segment文件保留的最长时间,超时将被删除
    log.retention.hours=168
    #配置连接Zookeeper集群地址
    zookeeper.connect=ethan001:2181,ethan002:2181,ethan003:2181
    

    注意,记得修改最后的主机名为你的集群中的三台主机名

  6. 分发安装包到集群的另外两台节点

    	scp -r kafka root@ethan001:/opt/module/
    	scp -r kafka root@ethan003:/opt/module/
    

注意:
– 分发之后配置好另外两台的环境变量
– 修改/opt/module/kafka/conf目录下的server.properties文件中的broker.id=1 和broker.id=2

7. 启动kafka集群
启动zookeeper集群之后再启动kafka集群

[root@ethan001 kafka]# bin/kafka-server-start.sh -daemon  config/server.properties
[root@ethan002 kafka]# bin/kafka-server-start.sh -daemon  config/server.properties
[root@ethan003 kafka]# bin/kafka-server-start.sh -daemon  config/server.properties
  1. 关闭kafka集群
    [root@ethan001 kafka]# bin/kafka-server-stop.sh stop
    [root@ethan002 kafka]# bin/kafka-server-stop.sh stop
    [root@ethan003 kafka]# bin/kafka-server-stop.sh stop
    

三、Kafka命令行操作

启动kafka集群,然后可以进行kafka的相关命令操作

  1. 查看当前服务器中的所有topic

    [root@ethan002 kafka]# bin/kafka-topics.sh --zookeeper ethan002:2181 --list
    
    
  2. 创建topic
    名字:abc

    [root@ethan002 kafka]# bin/kafka-topics.sh --zookeeper ethan002:2181 --create --replication-factor 3 --partitions 1 --topic abc
    
  3. 启动生产者进程并发送消息

    [root@ethan002 kafka]# bin/kafka-console-producer.sh --broker-list ethan002:9092 --topic abc
    >hello word
    >I love guiZhou
    >hello
    
  4. 启动消费者进程并消费消息
    在另外开一个窗口,可以获取生产的数据

    [root@ethan002 kafka]# bin/kafka-console-consumer.sh --bootstrap-server ethan002:9092 --topic abc --from-beginning
    hello word
    I love guiZhou
    hell
    
  5. 查看某个Topic的详情

    [root@ethan002 kafka]# bin/kafka-topics.sh --zookeeper ethan002:2181 --describe --topic abc
    

    结果:

    Topic:abc       PartitionCount:1        ReplicationFactor:3     Configs:
        Topic: abc      Partition: 0    Leader: 0       Replicas: 0,1,2 Isr: 0,2
    
    
  6. 修改分区数

    [root@ethan002 kafka]# bin/kafka-topics.sh --zookeeper ethan002:2181 --alter --topic abc --partitions 6
    

    注意:
    选项说明:
    –topic 定义topic名
    –replication-factor 定义副本数
    –partitions 定义分区数

  7. 删除topic

    	[root@ethan002 kafka]# bin/kafka-topics.sh --zookeeper ethan002:2181 --delete --topic abc
    

    注意:
    需要server.properties中设置delete.topic.enable=true否则只是标记删除

    原文作者:徐象
    原文地址: https://blog.csdn.net/IT_technologier/article/details/121551995
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞