Kafka滥用案例

最近再做一个master-worker式的任务调度程序,这个第一版是同事做的,我负责重构。
代码结构在这篇文章里

之前使用kafka来做队列,但是当我重构时发现,worker处理一个message的时间有时候需要几分钟,处理完茶都凉了,consumer和partition的连接都断了,没法commit了。

试了很多方法都不行,最后改成了下边这个丑样子:

    def run(self):
        consumer = KafkaConsumer(
            bootstrap_servers=conf.KAFKA_CONF['host'],
            group_id=self.group_id,
            enable_auto_commit=False,
            auto_offset_reset='earliest',
            api_version=(0, 9)
        )   
    
        consumer.subscribe([self.topic])
    
        while not self.exit.is_set():
            poll_msgs = consumer.poll(timeout_ms=5000, max_records=1)
            for partition in poll_msgs:
                offset = consumer.committed(partition)
                try:
                    consumer.commit({partition: OffsetAndMetadata(offset+1, None)})
                except Exception, e:
                    Log.error(e)
                    continue

                msgs = poll_msgs[partition]
                for msg in msgs:
                    self.process(msg)
    
        consumer.close()
        Log.info('%s shutdown' % self.name)

也就是在处理前先commit,如果commit不了了则不处理该partition的任务,等到下一轮poll的时候(poll的时候partition就又连上了)再commit和处理该分区的任务。

真丑。

这个问题的本质在于kafka并不适合我们这个业务场景,kafka的设计是针对实时,高吞吐的场景,它设计时候就不会考虑一个message处理几分钟的情况,所以用起来会变扭死。

正确的做法是技术选型的时候就不应该用Kafka,应该换用其它队列比如redis。

    原文作者:衫秋南
    原文地址: https://zhuanlan.zhihu.com/p/27408940
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞