最近再做一个master-worker式的任务调度程序,这个第一版是同事做的,我负责重构。
代码结构在这篇文章里
之前使用kafka来做队列,但是当我重构时发现,worker处理一个message的时间有时候需要几分钟,处理完茶都凉了,consumer和partition的连接都断了,没法commit了。
试了很多方法都不行,最后改成了下边这个丑样子:
def run(self):
consumer = KafkaConsumer(
bootstrap_servers=conf.KAFKA_CONF['host'],
group_id=self.group_id,
enable_auto_commit=False,
auto_offset_reset='earliest',
api_version=(0, 9)
)
consumer.subscribe([self.topic])
while not self.exit.is_set():
poll_msgs = consumer.poll(timeout_ms=5000, max_records=1)
for partition in poll_msgs:
offset = consumer.committed(partition)
try:
consumer.commit({partition: OffsetAndMetadata(offset+1, None)})
except Exception, e:
Log.error(e)
continue
msgs = poll_msgs[partition]
for msg in msgs:
self.process(msg)
consumer.close()
Log.info('%s shutdown' % self.name)
也就是在处理前先commit,如果commit不了了则不处理该partition的任务,等到下一轮poll的时候(poll的时候partition就又连上了)再commit和处理该分区的任务。
真丑。
这个问题的本质在于kafka并不适合我们这个业务场景,kafka的设计是针对实时,高吞吐的场景,它设计时候就不会考虑一个message处理几分钟的情况,所以用起来会变扭死。
正确的做法是技术选型的时候就不应该用Kafka,应该换用其它队列比如redis。