CentOS 搭建Kafka集群

        目录

                1、Kafka下载

                2、安装 Zookeeper 

                3、安装 Kafka

                4、spring boot 整合Kafka 

                5、使用 kraft 注册中心

1、Kafka下载

       进行 Kafka官网 下载 kafka

wget https://downloads.apache.org/kafka/3.1.0/kafka_2.12-3.1.0.tgz

《CentOS 搭建Kafka集群》

2、安装 Zookeeper 

        Zookeeper 集群搭建

        可以不使用 zookeeper 集群,单节点 zookeeper 即可。

        后面会有 kraft 作为注册中的说明,不想使用zookeeper的可以使用 kraft 。

3、安装 Kafka

        使用tar命令解压 kafka_2.12-3.1.0.tgz,通过 -C 指定 kafka 的安装目录。

        在 Kafka 根目录下创建 data 文件夹,存放数据文件

[root@server01 software]# tar -zxvf kafka_2.12-3.1.0.tgz -C /usr/local

[root@server01 software]# cd /usr/local/kafka_2.12-3.1.0/

[root@server01 kafka_2.12-3.1.0]# mkdir data

《CentOS 搭建Kafka集群》

        修改Kafka 的配置文件 server.properties 

[root@server01 kafka_2.12-3.1.0]# vi config/server.properties 

        每台服务器的 broker.id 必须唯一,host.name=ip 是新增的一行配置,避免本来连接虚拟机中的Kafka服务器报错。

        在配置zookeeper 的连接地址时,后面需要加上 /kafka ,这样Kafka 的相关配置就在zookeeper的 /kafka 节点下。

        《CentOS 搭建Kafka集群》

        Kafka 集群的IP分别是192.168.19.11、192.168.19.12、192.168.19.11,对应如下:

        Kafka 01 配置:

        broker.id=1
        host.name=192.168.19.11
        listeners=PLAINTEXT://192.168.19.11:9092
        advertised.listeners=PLAINTEXT://192.168.19.11:9092
        zookeeper.connect=192.168.19.11:2181,192.168.19.12:2181,192.168.19.13:2181/kafka

        Kafka 02 配置:

        broker.id=2
        host.name=192.168.19.12
        listeners=PLAINTEXT://192.168.19.12:9092
        advertised.listeners=PLAINTEXT://192.168.19.12:9092
        zookeeper.connect=192.168.19.11:2181,192.168.19.12:2181,192.168.19.13:2181/kafka

        Kafka 03 配置:

        broker.id=3
        host.name=192.168.19.13
        listeners=PLAINTEXT://192.168.19.13:9092
        advertised.listeners=PLAINTEXT://192.168.19.13:9092
        zookeeper.connect=192.168.19.11:2181,192.168.19.12:2181,192.168.19.13:2181/kafka

        分别启动 Kafka 服务,启动成功后,我们可以使用jps 查看启动情况,也可以通过 Kafka 日志查看 Kafka 是否启动成功,Kafka 日志文件在 Kafka 根目录下 logs/server.log。

[root@server01 kafka_2.12-3.1.0]# sh bin/kafka-server-start.sh -daemon config/server.properties

《CentOS 搭建Kafka集群》

        使用 PrettyZoo 连接zookeeper,查看 Kafka 注册情况

        PrettyZoo GitHub 地址:GitHub PrettyZoo

        PrettyZoo 下载地址:Releases · vran-dev/PrettyZoo · GitHub

《CentOS 搭建Kafka集群》

        ids 分别代表启动的三台 Kafka 服务器,topics 是Kafka 集群上的所有 topic。

4、spring boot 整合Kafka 

        新建 Spring Boot 项目

        引入 kafka 整合 spring 的依赖

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.8.2</version>
        </dependency>

        application.yml 配置 

server:
  port: 10009
spring:
  application:
    name: spring-boot-kafka
  kafka:
    bootstrap-servers: 192.168.19.11:9092,192.168.19.12:9092,192.168.19.13:9092
    producer:
      retries: 3 # 客户端会发送失败重试次数
      batch-size: 16384 #16K
      buffer-memory: 33554432 #32M
      # 指定消息key和消息体的编码方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: consumer-group # 消费者组
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

       使用 KafkaTemplate 发送一条简单的MQ 消息

@RestController
@RequestMapping("/kafka")
public class KafkaTopicController {

    @Resource
    private KafkaTemplate<String,String> kafkaTemplate;

    @RequestMapping("/sendMessage")
    public void sendMessage(){
        String topic = "order-topic";
        kafkaTemplate.send(topic,"{orderId:1}");
    }
}

         使用 @KafkaListener(topics = {“order-topic”}) 配置监听的topic。

@Slf4j
@Component
public class KafkaTopicListener {

    @KafkaListener(topics = {"order-topic"})
    public void receiveMessage(String message){
        log.info("接收到的消息:{}" , message);
    }
}

         访问 127.0.0.1:10009/kafka/sendMessage 测试 Kafka 消息是否发送成功。

《CentOS 搭建Kafka集群》

        在 zookeeper 上,我们可以看到刚刚我们新创建的 order-topic ,Kafka 配置文件中,分区数配置的是1 ,num.partitions=1 ,所以 order-topic 下只有一个分区。

《CentOS 搭建Kafka集群》

        代码地址:

        GitHub spring-boot/spring-boot-kafka

5、使用 kraft 注册中心

        Kafka决定未来弃用zookeepr已经有很长时间了,从Kafka 2.8开始,官方提出了Kraft模式,使用raft算法。不过目前仍然处于测试阶段,生产上不建议使用
        使用Kraft模式,元数据会存储再controller节点中的KRaft quorum中。相比与zookeeper模式,Kraft模式更加容易扩展。

        KRaft mode in Kafka 3.1 is provided for testing only, NOT for production. We do not yet support upgrading existing ZooKeeper-based Kafka clusters into this mode.

        There may be bugs, including serious ones. You should assume that your data could be lost at any time if you try the preview release of KRaft mode.

        下面,我会根据官方给出的文档,搭建一个Kraft集群。 

《CentOS 搭建Kafka集群》

         KRaft 相关的文档和配置,都在 kafka_2.12-3.1.0/config/kraft 目录下面

        《CentOS 搭建Kafka集群》    

        在 kafka_2.12-3.1.0 目录下创建 kraft-combined-logs 文件夹存储数据文件。

《CentOS 搭建Kafka集群》

启动KRaft集群,需要执行以下操作:

       (1)、修改kraft/server.properties
       (2)、生成一个cluster ID
       (3)、格式化存储目录
       (4)、启动Kafka服务

1、修改kraft/server.properties

node.id=1

controller.quorum.voters=1@192.168.19.11:9093,2@192.168.19.12:9093,3@192.168.19.13:9093

listeners=PLAINTEXT://192.168.19.11:9092,CONTROLLER://192.168.19.11:9093

inter.broker.listener.name=PLAINTEXT

log.dirs=/app/kafka-3.1.0/kraft-combined-logs

2、生成一个cluster ID

[root@server01 kafka_2.12-3.1.0]# ./bin/kafka-storage.sh random-uuid

3、格式化存储目录

[root@server01 kafka_2.12-3.1.0]# ./bin/kafka-storage.sh format -t jkrwjNohQKm2iVf4ttheQQ -c ./config/kraft/server.properties

《CentOS 搭建Kafka集群》

 4、启动Kafka服务

[root@server01 kafka_2.12-3.1.0]# ./bin/kafka-server-start.sh ./config/kraft/server.properties

《CentOS 搭建Kafka集群》

    原文作者:前尘忆梦Memory
    原文地址: https://blog.csdn.net/zhangchaoyang/article/details/124082608
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞