【Kafka】消息的同步发送和异步发送

文章目录

概述

kafka有同步(sync)、异步(async)以及oneway这三种发送方式,某些概念上区分也可以分为同步和异步两种,同步和异步的发送方式通过“producer.type”参数指定,而oneway由“request.require.acks”参数指定。

1. sync vs async

在官方文档Producer Configs中有如下:
《【Kafka】消息的同步发送和异步发送》
翻译过来就是:
producer.type的默认值是sync,即同步的方式。这个参数指定了在后台线程中消息的发送方式是同步的还是异步的。如果设置成异步的模式,可以运行生产者以batch的形式push数据,这样会极大的提高broker的性能,但是这样会增加丢失数据的风险。

为什么是后台线程进行发送? 其实client调用发送接口,所有的数据被临时加入请求队列InFlightRequest,后台线程是通过 读取该队列的数据,进行发送操作的。

对于异步模式,还有4个配套的参数,如下:

PropertyDefaultDescription
queue.buffering.max.ms5000启用异步模式时,producer缓存消息的时间。比如我们设置成1000时,它会缓存1s的数据再一次发送出去,这样可以极大的增加broker吞吐量,但也会造成时效性的降低。
queue.buffering.max.messages10000启用异步模式时,producer缓存队列里最大缓存的消息数量,如果超过这个值,producer就会阻塞或者丢掉消息。
queue.enqueue.timeout.ms-1当达到上面参数时producer会阻塞等待的时间。如果设置为0,buffer队列满时producer不会阻塞,消息直接被丢掉;若设置为-1,producer会被阻塞,不会丢消息。
batch.num.messages200启用异步模式时,一个batch缓存的消息数量。达到这个数值时,producer才会发送消息。(每次批量发送的数量)

以batch的方式推送数据可以极大的提高处理效率,kafka producer可以将消息在内存中累计到一定数量后作为一个batch发送请求。batch的数量大小可以通过producer的参数(batch.num.messages)控制。通过增加batch的大小,可以减少网络请求和磁盘IO的次数,当然具体参数设置需要在效率和时效性方面做一个权衡。在比较新的版本中还有batch.size这个参数。

注:这里的参数是指安装包中的shell脚本命令,而java客户端代码,需要用相应的语法

总结:

  • 同步方式,一定是逐条发送的,第一条响应到达后,才会请求第二条
  • 异步方式,可以发送一条,也可以批量发送多条,特性是不需等第一次(注意这里单位是次,因为单次可以是单条,也可以是批量数据)响应,就立即发送第二次

1.1 java代码同步和异步

同步发送
如果需要使用同步发送,可以在每次发送之后使用get方法,因为producer.send方法返回一个Future类型的结果,Future的get方法会一直阻塞直到该线程的任务得到返回值,也就是broker返回发送成功。

kafkaTemplate.send("testJson", message).get();

异步发送回调
可以从返回的future对象中稍后获取发送的结果,ProducerRecord、RecordMetadata包含了返回的结果信息

ListenableFuture<SendResult<String, Message>> future = kafkaTemplate.send("testJson", message);
future.addCallback(new ListenableFutureCallback<SendResult<String, Message>>() { 
    @Override
    public void onFailure(Throwable ex) { 
        ex.printStackTrace();
    }

    @Override
    public void onSuccess(SendResult<String, Message> result) { 
        System.out.println(result.getProducerRecord());
        System.out.println(result.getRecordMetadata());
    }
});

2. 可靠性机制(ack属性配置)

producers可以一步的并行向kafka发送消息,但是通常producer在发送完消息之后会得到一个响应,返回的是offset值或者发送过程中遇到的错误。这其中有个非常重要的参数“request.required.acks”,这个参数决定了producer要求leader partition收到确认的副本个数:

  • 如果acks设置为0,表示producer不会等待broker的相应,所以,producer无法知道消息是否发生成功,这样有可能导致数据丢失,但同时,acks值为0会得到最大的系统吞吐量。

  • 若acks设置为1,表示producer会在leader partition收到消息时得到broker的一个确认,这样会有更好的可靠性,因为客户端会等待知道broker确认收到消息。

  • 若设置为-1,producer会在所有备份的partition收到消息时得到broker的确认,这个设置可以得到最高的可靠性保证。

2.1 oneway

前面只提到了sync和async,那么oneway是什么呢?
oneway是只顾消息发出去而不管死活,消息可靠性最低,但是低延迟、高吞吐,这种对于某些完全对可靠性没有要求的场景还是适用的,即request.required.acks设置为0。

oneway的效果也是异步的。因此,设置同步和异步非时候,要综合考虑。

3. 一般配置

对于sync的发送方式:

producer.type=sync
request.required.acks=1

对于async的发送方式:

producer.type=async
request.required.acks=1
queue.buffering.max.ms=5000
queue.buffering.max.messages=10000
queue.enqueue.timeout.ms = -1
batch.num.messages=200

对于oneway的发送发送:

producer.type=async           '既然都已经设置ack=0忽略高可靠性了,也就没必要设置为同步了'
request.required.acks=0

4. 同步异步和ack的联系和区别

《【Kafka】消息的同步发送和异步发送》
上图分析:

  • 当用户调用send时,就完成数据发送了(对于用户来说),后台线程负责实际发送数据,因此,新版本下,我们说数据发送总是异步的。

  • send()方法每次只能发送一条数据至InFlightRequest队列

  • 用户可以通过send().get() ,把用户主线程改为同步方式
    因此,同步和异步概念 分为用户线程和发送线程,用户线程有同步和异步之分;发送线程只有异步

    用户线程选择同步,效果是逐条发送,因为请求队列InFlightRequest中永远最多有一条数据。异步+设置 后台线程的异步发送参数:max.in.flight.requests.per.connection=1 & batch.size=1,效果也是逐条发送。

    max.in.flight.requests.per.connection控制只能发送一次请求,发送次数有个窗口,控制该窗口的值,但是每次可发送一批数据;batch.size是控制一批数据的上限,当batch.size=1时,每次最多发送一条。组合在一起就是 只能连续发送一次请求,每次最多发送一条。

同步和异步指client(producer)是否收到leader给的ack后才发下一条,acks = 0, -1和1是指leader节点和follower节点数据同步的方式,可靠性机制,是保证数据能成功备份到其他节点的机制,二者是独立关系,因此可以是下面的组合

同步+ack任意值
异步+ack任意值

但是由于ack的选项有3个,会有最佳搭配的概念,例如:

producer.type=async           '既然都已经设置ack=0忽略高可用性了,也就没必要设置为同步了'
request.required.acks=0

既然都已经设置ack=0忽略高可靠性了,ack=0牺牲可靠性换取速度,也就没必要设置为同步了,设置为异步又可以提高数据

参考

Kafka之sync、async以及oneway
kafka 同步、异步发送java代码同步和异步

    原文作者:云川之下
    原文地址: https://blog.csdn.net/m0_45406092/article/details/119546471
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞