背景
随着业务的发展,项目组有大量的任务需要处理。
这些任务需要主要分为两种类型:
- 通过接口调用, 后台执行任务
- 通过调度系统定时执行
接口调用就需要执行任务不能阻塞, 不然系统的处理能力就会下降。任务调度系统需要在在一个最小的检测粒度时间内,执行完所有任务。这两种情况都面临这样一个问题, 任务不能阻塞,不然会非常影响性能。所以需要引入消息中间件,将任务派发方和任务执行方分离出来。
在这种情况下, 我们选择了kafka作为了我们的消息中间件, 选择kafka主要基于以下几点:
- 支持分布式, 避免单点问题
- 技术方案成熟, 公司内部有上线项目
- 性能优异, 能够持久化消息
遇到的问题
我们团队在kafka使用上面都没有经验, 其他同事说kafka consumer在消费超时后会掉线,导致重复消费,当时没有这个使用场景,不能理解这个概念。
第一次发现问题是在联调的时候,任务执行方发现consumer会打印出错误日志,重复消费,并且陷入循环。
当时很快定位到问题, consumer长时间没有发送心跳包, 导致触发rebalance操作, consumer被踢下线了。
对于这个问题,需要详细讲述一下kafka consumer相关的机制。
kafka为了保证partition分配的高效率, 使用了如下机制:
- 所有的consumer都要和coordinator连接
- coordinator选出一个consumer作为leader来分配partition
- leader分配完以后通知coordinator, 由coordinator来通知给其他consumer
- 如果一个consumer不能工作了, coordinator会触发rebalance机制,重新分配partition
coordinator判定一个consumer不能工作, 依靠的就是heartbeat机制。consumer的配置里面有一项是session_timeout,如果heartbeat不能在session_timeout时间内发出一次请求,coordinator就会触发一次rebalance操作,重新分配partition。
从上面这样看没什么问题,很多系统都是这么设计的,一个工作线程,一个心跳包线程。但是kafka consumer为了设计上的简单(或者是出于其他目的),他们只有一个线程,也就是说工作逻辑和心跳包逻辑是同步的。对于心跳包这种定时任务,他们使用了一种叫做delayed_task的方案。
delayed_task是Best-Effort的,为什么这么说呢,我们来看看delayed_task是在什么时候工作的:
- 取出一批数据
- 执行delayed_task
- 循环yield 这批数据
- 重复执行上述过程
前面我们也说过, consumer只有一个线程, 也就意味着,如果主逻辑消耗了大量时间,delayed_task中的任务就会延期执行。在这种情况下, delayed_task只能保证任务不会提前进行,不能保证任务准时执行。拿一个具体的场景来说, 如果主逻辑花费了60s, 那么delayed_task中的任务最早也只能在60s之后执行,像heartbeat任务就直接超时了。
在提出解决方案之前, 我们需要考虑一下几个问题:
生产者速度大于消费者速度怎么处理
如果生产者速度大于消费者速度,消息就会积累。常规的解决方案是增加partition,增加消费者数量,但是在某一些场景下却不能这么实现。思考一下,如果生产者的速度不是恒定的,而是波动的,并且波峰和波谷差距比较大,大部分时间出于波谷,这样在波谷时其实资源是闲置的,并且会降低消费速度。另外对于消费的实时性比较高的场景,如果短时间内消息被积压,纵然最后能够消费掉,但是已经过了有效期,这样的消费其实是无效的。
所以我们必须有能力知道两个数据,即当前队列剩余的消息的数量和当前消息产生的时间。
在消费速度不一致的情况下如何提交offset
kafka-consumer的offset的提交机制是定时向delayed_task里面加入一个AutoCommitTask。但是在消费者消费速度不均衡的情况下不能这么做,如果消费者消费速度比较快,定时提交offset的机制会使得一旦consumer宕机,会丢失一大批消费信息。
同时我们也不能单纯的以消费数量作为是否提交的阈值,在消费者比较消费速率比较慢的情况下,一旦consumer宕机,我们会耗费大量时间在无用的消费上面。
所以我们需要同时衡量数量和时间两个变量,作为我们是否提交的阈值
offset提交失败该怎么处理
consumer的offset提交是按照TopicPartition作为提交单元的。在consumer消费过程中,可能会发生reblance事件,如果当前consumer分配到的partition数量大于1个,可能这个partition会被分配给其他的consumer。在这个过程中,consumer已经消费了该条数据,那么在提交offset的时候,就会遇到CommitOffsetError,因为这个partition已经不属于自己了。
这种情况下该如何处理这些数据
解决方案
带着上面的一些问题,我们开始着手提出解决方案。
从上面的分析可以看出来, consumer掉线的最主要问题就是delayed_task和主函数出于同一个工作线程中,那么最直观的解决方法就是将这两个分离出来。
由于python GIL的限制,加上kafka consumer 是线程不安全的, 所以我们使用多进程来解决这个问题。
在consumer中,除了迭代器_message_generator之外,还提供了一个poll函数。这个函数和迭代器功能差不多,也能够获取消息,同时也会执行delayed_task。不同之处是, 这个函数会一次性返回一批数据,这样我们就有能力统计剩下的消息的数量。同时我们要求在producer发送消息的时候,一定要带上create_time这个字段,标注消息产生的时间。客户端现在同时能获取数量和时间两个参数,对于实时性要求比较高的场景,他就可以选择性的丢弃一批不满足要求的数据。
当消费者消费速度比较低的时候,我们需要停止获取数据,但是同时不能停下delayed_task。幸运的是,consumer提供了一个pause的函数,可以让我们停止对应的partition。一旦使用pause函数,poll函数将不会返回任何数据,单他依然会执行delayed_task。
由于我们使用poll函数一次性返回多个数据,加上在消费速度不均衡的情况下offset管理的问题。所以我们必须要手动管理offset, 保存我们上次提交offset的时间和未提交offset的数量,一旦其中某一个达到阈值,就真正的提交offset。
当我们提交offset失败的时候,我们需要清除对应的partition的所有数据,防止consumer做无用消费。
综合上面,我们就有能力构造出一个强健的consumer客户端,方便其他同学来使用。
核心代码
while True:
topic_records = self.consumer.poll().values()
if not topic_records:
self.get_offset()
time.sleep(self.config['idle_timeout'])
self.consumer.pause(*self.consumer.assigment())
paused = True
for records in topic_records:
remain = len(records)
for record in records:
while True:
data = {"record":record, "remain": remain}
try:
self.task_queue.put(data, self.config['block_timeout'])
remain -= 1
break
except Full:
self.consumer.poll()
self.get_offset()
if self.task_queue.qsize < self.config['resumen_count'] and paused:
partitions = self.consumer.paused()
if partitions:
self.consumer.resume(*partitions)