文章目录
概述
kafka有同步(sync)、异步(async)以及oneway这三种发送方式,某些概念上区分也可以分为同步和异步两种,同步和异步的发送方式通过“producer.type”参数指定,而oneway由“request.require.acks”参数指定。
1. sync vs async
在官方文档Producer Configs中有如下:
翻译过来就是:
producer.type的默认值是sync,即同步的方式。这个参数指定了在后台线程中消息的发送方式是同步的还是异步的。如果设置成异步的模式,可以运行生产者以batch的形式push数据,这样会极大的提高broker的性能,但是这样会增加丢失数据的风险。
为什么是后台线程进行发送? 其实client调用发送接口,所有的数据被临时加入请求队列InFlightRequest,后台线程是通过 读取该队列的数据,进行发送操作的。
对于异步模式,还有4个配套的参数,如下:
Property | Default | Description |
---|---|---|
queue.buffering.max.ms | 5000 | 启用异步模式时,producer缓存消息的时间。比如我们设置成1000时,它会缓存1s的数据再一次发送出去,这样可以极大的增加broker吞吐量,但也会造成时效性的降低。 |
queue.buffering.max.messages | 10000 | 启用异步模式时,producer缓存队列里最大缓存的消息数量,如果超过这个值,producer就会阻塞或者丢掉消息。 |
queue.enqueue.timeout.ms | -1 | 当达到上面参数时producer会阻塞等待的时间。如果设置为0,buffer队列满时producer不会阻塞,消息直接被丢掉;若设置为-1,producer会被阻塞,不会丢消息。 |
batch.num.messages | 200 | 启用异步模式时,一个batch缓存的消息数量。达到这个数值时,producer才会发送消息。(每次批量发送的数量) |
以batch的方式推送数据可以极大的提高处理效率,kafka producer可以将消息在内存中累计到一定数量后作为一个batch发送请求。batch的数量大小可以通过producer的参数(batch.num.messages)控制。通过增加batch的大小,可以减少网络请求和磁盘IO的次数,当然具体参数设置需要在效率和时效性方面做一个权衡。在比较新的版本中还有batch.size这个参数。
注:这里的参数是指安装包中的shell脚本命令,而java客户端代码,需要用相应的语法
总结:
- 同步方式,一定是逐条发送的,第一条响应到达后,才会请求第二条
- 异步方式,可以发送一条,也可以批量发送多条,特性是不需等第一次(注意这里单位是次,因为单次可以是单条,也可以是批量数据)响应,就立即发送第二次
1.1 java代码同步和异步
同步发送
如果需要使用同步发送,可以在每次发送之后使用get方法,因为producer.send方法返回一个Future类型的结果,Future的get方法会一直阻塞直到该线程的任务得到返回值,也就是broker返回发送成功。
kafkaTemplate.send("testJson", message).get();
异步发送回调
可以从返回的future对象中稍后获取发送的结果,ProducerRecord、RecordMetadata包含了返回的结果信息
ListenableFuture<SendResult<String, Message>> future = kafkaTemplate.send("testJson", message);
future.addCallback(new ListenableFutureCallback<SendResult<String, Message>>() {
@Override
public void onFailure(Throwable ex) {
ex.printStackTrace();
}
@Override
public void onSuccess(SendResult<String, Message> result) {
System.out.println(result.getProducerRecord());
System.out.println(result.getRecordMetadata());
}
});
2. 可靠性机制(ack属性配置)
producers可以一步的并行向kafka发送消息,但是通常producer在发送完消息之后会得到一个响应,返回的是offset值或者发送过程中遇到的错误。这其中有个非常重要的参数“request.required.acks”,这个参数决定了producer要求leader partition收到确认的副本个数:
如果acks设置为0,表示producer不会等待broker的相应,所以,producer无法知道消息是否发生成功,这样有可能导致数据丢失,但同时,acks值为0会得到最大的系统吞吐量。
若acks设置为1,表示producer会在leader partition收到消息时得到broker的一个确认,这样会有更好的可靠性,因为客户端会等待知道broker确认收到消息。
若设置为-1,producer会在所有备份的partition收到消息时得到broker的确认,这个设置可以得到最高的可靠性保证。
2.1 oneway
前面只提到了sync和async,那么oneway是什么呢?
oneway是只顾消息发出去而不管死活,消息可靠性最低,但是低延迟、高吞吐,这种对于某些完全对可靠性没有要求的场景还是适用的,即request.required.acks
设置为0。
oneway的效果也是异步的。因此,设置同步和异步非时候,要综合考虑。
3. 一般配置
对于sync的发送方式:
producer.type=sync
request.required.acks=1
对于async的发送方式:
producer.type=async
request.required.acks=1
queue.buffering.max.ms=5000
queue.buffering.max.messages=10000
queue.enqueue.timeout.ms = -1
batch.num.messages=200
对于oneway的发送发送:
producer.type=async '既然都已经设置ack=0忽略高可靠性了,也就没必要设置为同步了'
request.required.acks=0
4. 同步异步和ack的联系和区别
上图分析:
当用户调用send时,就完成数据发送了(对于用户来说),后台线程负责实际发送数据,因此,新版本下,我们说数据发送总是异步的。
send()方法每次只能发送一条数据至InFlightRequest队列
用户可以通过send().get() ,把用户主线程改为同步方式
因此,同步和异步概念 分为用户线程和发送线程,用户线程有同步和异步之分;发送线程只有异步用户线程选择同步,效果是逐条发送,因为请求队列InFlightRequest中永远最多有一条数据。异步+设置 后台线程的异步发送参数:max.in.flight.requests.per.connection=1 & batch.size=1,效果也是逐条发送。
max.in.flight.requests.per.connection控制只能发送一次请求,发送次数有个窗口,控制该窗口的值,但是每次可发送一批数据;batch.size是控制一批数据的上限,当batch.size=1时,每次最多发送一条。组合在一起就是 只能连续发送一次请求,每次最多发送一条。
同步和异步指client(producer)是否收到leader给的ack后才发下一条,acks = 0, -1和1是指leader节点和follower节点数据同步的方式,可靠性机制,是保证数据能成功备份到其他节点的机制,二者是独立关系,因此可以是下面的组合
同步+ack任意值
异步+ack任意值
但是由于ack的选项有3个,会有最佳搭配的概念,例如:
producer.type=async '既然都已经设置ack=0忽略高可用性了,也就没必要设置为同步了'
request.required.acks=0
既然都已经设置ack=0忽略高可靠性了,ack=0牺牲可靠性换取速度,也就没必要设置为同步了,设置为异步又可以提高数据
参考
Kafka之sync、async以及oneway
kafka 同步、异步发送java代码同步和异步