Kafka Cluster (Docker) and Spring Boot Integration


I. Kafka Cluster Setup

1. Prerequisites

  • A Linux environment (a VM works)

  • Docker

  • A ZooKeeper cluster

    Kafka depends on ZooKeeper, so a ZooKeeper cluster must be running before the Kafka cluster is built. Preparing three physical servers or VMs is cumbersome, so Docker is used here to simplify the setup.

    See the earlier post on building a ZooKeeper cluster with Docker.
    As shown there, the cluster consists of three ZooKeeper containers.
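
All of the containers communicate over a user-defined Docker network so the Kafka brokers can reach the ZooKeeper nodes by container name. A minimal sketch, assuming the network is named my-net to match the --network flag in the docker run commands below (skip this if the ZooKeeper setup already created it):

    docker network create my-net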

2. Building the Kafka cluster

1. Cluster plan

(Figure: cluster plan — kafka1/kafka2/kafka3 on host 192.168.48.131, ports 9092/9093/9094)

2. Cluster setup

  • Pull the image

    docker pull wurstmeister/kafka
    


  • Create the containers

    docker run -d \
    --name=kafka1 \
    --restart=always \
    -p 9092:9092 \
    --network=my-net \
    -e KAFKA_ADVERTISED_HOST_NAME=192.168.48.131 \
    -e HOST_IP=192.168.48.131:9092 \
    -e KAFKA_ADVERTISED_PORT=9092 \
    -e KAFKA_ZOOKEEPER_CONNECT=zookeeper1:2181,zookeeper2:2182,zookeeper3:2183 \
    -e KAFKA_BROKER_ID=0 \
    wurstmeister/kafka:latest
    

    Parameter notes:

    • --network: attach the container to the user-defined Docker network
    • KAFKA_ADVERTISED_HOST_NAME: host address advertised to clients
    • KAFKA_ADVERTISED_PORT: host port advertised to clients
    • KAFKA_ZOOKEEPER_CONNECT: connection string for the ZooKeeper cluster
    • KAFKA_BROKER_ID: the broker.id, which must be unique within the cluster
    • HOST_IP: host address exposed by the container

Create three containers in this way.

Note: change the container name, host port mapping, and broker ID for each one, as in the sketch below.
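
A sketch for the second broker, assuming host ports 9093/9094 and broker IDs 1/2 for kafka2/kafka3 per the cluster plan; the host port is mapped onto the broker's in-container port 9092, and the third container follows the same pattern:

    docker run -d \
    --name=kafka2 \
    --restart=always \
    -p 9093:9092 \
    --network=my-net \
    -e KAFKA_ADVERTISED_HOST_NAME=192.168.48.131 \
    -e HOST_IP=192.168.48.131:9093 \
    -e KAFKA_ADVERTISED_PORT=9093 \
    -e KAFKA_ZOOKEEPER_CONNECT=zookeeper1:2181,zookeeper2:2182,zookeeper3:2183 \
    -e KAFKA_BROKER_ID=1 \
    wurstmeister/kafka:latest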

3. Monitoring the Kafka cluster

Use KafkaOffsetMonitor-assembly-0.4.6-SNAPSHOT.jar to monitor the Kafka cluster.

1. Create a kafka-offset-console directory under /opt/module/

2. Place the uploaded jar into the directory just created

3. Create a mobile-logs directory under /opt/module/kafka-offset-console

4. Create a startup script start.sh under /opt/module/kafka-offset-console:

java -cp KafkaOffsetMonitor-assembly-0.4.6-SNAPSHOT.jar \
com.quantifind.kafka.offsetapp.OffsetGetterWeb \
--offsetStorage kafka \
--kafkaBrokers 192.168.48.131:9092,192.168.48.131:9093,192.168.48.131:9094 \
--kafkaSecurityProtocol PLAINTEXT \
--zk 192.168.48.131:2181,192.168.48.131:2182,192.168.48.131:2183 \
--port 8086 \
--refresh 10.seconds \
--retain 2.days \
--dbName offsetapp_kafka &

5. Make the script executable and start the monitor

chmod +x start.sh
./start.sh

6. Open the web UI from the host to verify (http://192.168.48.131:8086, per the script above)
(Figure: KafkaOffsetMonitor web UI)

At this point, the cluster setup is complete.
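
Before moving on, it is worth confirming that all three brokers registered with ZooKeeper. A quick sketch, assuming zkCli.sh is on the PATH inside the ZooKeeper containers (as in the official image):

    # Should print the registered broker IDs, e.g. [0, 1, 2]
    docker exec -it zookeeper1 zkCli.sh -server localhost:2181 ls /brokers/ids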

II. Spring Boot Integration

1. Add the dependency

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

2. Configuration file

server.port=8080
#============== kafka ===================
spring.kafka.bootstrap-servers=192.168.48.131:9092,192.168.48.131:9093,192.168.48.131:9094

#=============== producer =======================
spring.kafka.producer.retries=0
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
# Custom key (not a standard Spring Boot property); consumed by the configuration class below
spring.kafka.producer.interceptor.class=com.example.demo.Interceptor.TimeInterceptor,com.example.demo.Interceptor.CounterInterceptor

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

#=============== consumer  =======================
spring.kafka.consumer.group-id=user-log-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100

spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

3. Configuration class

spring.kafka.producer.interceptor.class is not a standard Spring Boot key, so the producer factory is built by hand in order to pass the interceptor list through to the Kafka producer.

import java.util.ArrayList;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaProducerFactoryCustomizer;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaConfiguration {

    @Autowired
    private KafkaProperties properties;

    // Split the custom comma-separated property into a list of interceptor class names
    @Value("#{'${spring.kafka.producer.interceptor.class}'.split(',')}")
    private ArrayList<String> interceptors;

    // Replaces the auto-configured ProducerFactory so the interceptor classes
    // from the properties file are passed to the underlying Kafka producer
    @Bean
    public ProducerFactory<?, ?> kafkaProducerFactory(ObjectProvider<DefaultKafkaProducerFactoryCustomizer> customizers) {
        Map<String, Object> map = this.properties.buildProducerProperties();
        map.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
        DefaultKafkaProducerFactory<?, ?> factory = new DefaultKafkaProducerFactory<>(map);
        String transactionIdPrefix = this.properties.getProducer().getTransactionIdPrefix();
        if (transactionIdPrefix != null) {
            factory.setTransactionIdPrefix(transactionIdPrefix);
        }
        customizers.orderedStream().forEach((customizer) -> customizer.customize(factory));
        return factory;
    }
}

4. Controller layer

@RestController
public class KafkaProducerController {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // Receive a message from the front end and send it to the "first" topic
    @GetMapping("/sendMessage/{message}")
    public String sendMessage(@PathVariable("message") String message) {
        // JSON comes from fastjson (com.alibaba.fastjson.JSON)
        kafkaTemplate.send("first", JSON.toJSONString(message));
        return "Message sent";
    }
}
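
The controller publishes to the topic first. The wurstmeister image auto-creates topics by default, but the topic can also be created explicitly so it is replicated across all three brokers. A sketch using the CLI scripts bundled with the image (newer Kafka versions expect --bootstrap-server instead of --zookeeper):

    docker exec -it kafka1 kafka-topics.sh --create \
      --zookeeper zookeeper1:2181 \
      --replication-factor 3 --partitions 3 --topic first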

5. Consuming messages

@Component
public class KafkaMessageConsumer {

    @KafkaListener(topics = { "first" })
    public void receiveMessage(ConsumerRecord<?, ?> consumerRecord) {
        // The record value may be null, so wrap it in an Optional
        Optional<?> kafkaMessage = Optional.ofNullable(consumerRecord.value());
        if (kafkaMessage.isPresent()) {
            // Unwrap the Optional and print the consumed message
            Object message = kafkaMessage.get();
            System.err.println("Consumed message: " + message);
        }
    }
}
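
To sanity-check the listener independently of the REST endpoint, a message can be produced straight from one of the broker containers. A sketch, assuming the Kafka CLI scripts are on the container's PATH as in the wurstmeister image:

    docker exec -it kafka1 kafka-console-producer.sh \
      --broker-list 192.168.48.131:9092 --topic first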

6. Interceptors

Messages are processed in the producer's interceptor chain. Interceptors run in the order they appear in the interceptor class list, so TimeInterceptor runs before CounterInterceptor.

1. Timestamp interceptor

import java.text.SimpleDateFormat;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// Instantiated by the Kafka client through interceptor.classes, not by Spring,
// so no Spring annotation is needed on this class
public class TimeInterceptor implements ProducerInterceptor<String, String> {

    @Override
    public void configure(Map<String, ?> map) {
    }

    /**
     * Create a new record with the current timestamp prepended to the message body.
     */
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {
        return new ProducerRecord<>(producerRecord.topic(), producerRecord.partition(),
                producerRecord.timestamp(), producerRecord.key(),
                new SimpleDateFormat("yyyy/MM/dd HH-mm-ss").format(System.currentTimeMillis())
                        + "," + producerRecord.value());
    }

    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
    }

    @Override
    public void close() {
    }
}

2. Counter interceptor

import java.util.Map;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class CounterInterceptor implements ProducerInterceptor<String, String> {

    private int errorCounter = 0;
    private int successCounter = 0;

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {
        // Pass the record through unchanged; this interceptor only counts acknowledgements
        return producerRecord;
    }

    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
        // Count successful and failed sends
        if (e == null) {
            successCounter++;
        } else {
            errorCounter++;
        }
    }

    @Override
    public void close() {
        // Print the totals when the producer is closed
        System.out.println("Successfully sent: " + successCounter);
        System.out.println("Failed to send: " + errorCounter);
    }

    @Override
    public void configure(Map<String, ?> map) {
    }
}

7. Logging configuration

<?xml version="1.0" encoding="UTF-8"?>
<configuration scan="true" scanPeriod="60 seconds" debug="false">
    <contextName>logback</contextName>
    <!-- Console output -->
    <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
        <!-- <filter class="ch.qos.logback.classic.filter.ThresholdFilter"> <level>ERROR</level> </filter>-->
        <encoder>
            <pattern>%d{HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>

    <appender name="logFile" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <Prudent>true</Prudent>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <FileNamePattern>
                poslog/%d{yyyy-MM-dd}/%d{yyyy-MM-dd}.log
            </FileNamePattern>
        </rollingPolicy>
        <layout class="ch.qos.logback.classic.PatternLayout">
            <Pattern>
                %d{yyyy-MM-dd HH:mm:ss} -%msg%n
            </Pattern>
        </layout>
    </appender>
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <layout class="ch.qos.logback.classic.PatternLayout">
            <Pattern>
                %d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n
            </Pattern>
        </layout>
    </appender>

    <logger name="org.apache.kafka" level="warn">
        <appender-ref ref="STDOUT" />
    </logger>
    <root level="INFO,ERROR">
        <appender-ref ref="console" />
        <appender-ref ref="logFile" />
    </root>
</configuration>

8. Test

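To verify the round trip, call the endpoint from the host; the consumer's console should then print the message with the timestamp prepended by TimeInterceptor. A sketch, assuming the application runs locally on port 8080 as configured above:

    curl http://localhost:8080/sendMessage/hello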

    Original author: 豪本豪
    Original article: https://blog.csdn.net/WuHuaRou123/article/details/121161000