kafka生产者发送消息提升效率策略设置

前言

生产环境下,为了尽可能的提升kafka的整体吞吐量,可以对kafka的相关配置参数进行调优,以达到提升整体性能的目的

提升kafka的整体性能有多方面可以进行优化,比如生产端的优化,broker端的配置参数优化,消费端的优化等

本文结合几个关键的配置参数,说说如何从kafka的生产端入手进行消息发送效率的提升

几个关键参数说明

1、batch.size:批次大小,默认16k
(即每次消息攒够一批的数量大小)
2、linger.ms:等待时间,修改为5-100ms
(如果不设置该值,即消息来了就发送出去)
3、compression.type:压缩snappy
(消息积攒后在发送到broker时采用的压缩算法)
4、RecordAccumulator:缓冲区大小,修改为64m 以上几个参数为生产端的几个可用于日常提升消息发送吞吐量的参数,下面从代码层面运用一下这几个参数

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

/**
 * 设置核心的参数,提升发送消息的效率
 */
public class ProducerCustomParams {

    public static void main(String[] args) throws Exception {

        // 1. 创建 kafka 生产者的配置对象
        Properties properties = new Properties();
        // 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "IP:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        // batch.size:批次大小,默认 16K
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);

        // linger.ms:等待时间,默认 0
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);

        // RecordAccumulator:缓冲区大小,默认 32M:buffer.memory
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);

        // compression.type:压缩,默认 none,可配置值 gzip、snappy、lz4 和 zstd
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");

        // 3. 创建 kafka 生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        System.out.println("开始发送数据");
        // 4. 调用 send 方法,发送消息
        for (int i = 0; i < 100; i++) {
            kafkaProducer.send(new ProducerRecord<>("zcy222", "congge " + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception e) {
                    if (e == null) {
                        System.out.println("发送成功");
                        System.out.println("主题:" + metadata.topic());
                        System.out.println("分区:" + metadata.partition());
                    }
                }
            });
        }
        // 5. 关闭资源
        kafkaProducer.close();
    }

}

运行这段程序后,发现跟不添加这些参数没什么区别,有兴趣的同学可以测试发送100万的消息,然后看下耗时情况,毕竟在实际生产环境下,kafka承载的消息发送量是非常大的,调优与不调优,带来的性能上的提升肯定会有非常大的变化

    原文作者:逆风飞翔的小叔
    原文地址: https://blog.csdn.net/congge_study/article/details/123166974
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞