1. Problem description
I consume from Kafka with aiokafka and write to Redis with aioredis. At peak, Kafka receives about 50,000 messages per second. The previous version was fully synchronous, using kafka-python and redis: each poll fetched 1,000 messages, which were then written to Redis in one batch.
After switching both I/O paths to the async libraries, throughput did not improve; it actually dropped, which is baffling.
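For comparison, here is a minimal sketch of what the earlier synchronous version may have looked like, based only on the description above. The topic name, broker addresses, and Redis key are hypothetical placeholders, and the filtering step is omitted:

from kafka import KafkaConsumer   # kafka-python
import redis

# Hypothetical names; the real topic, brokers, and keys are not part of this post.
consumer = KafkaConsumer("some_topic",
                         group_id="some_topic_group",
                         bootstrap_servers=["kafka-host:9092"],
                         max_poll_records=1000)
redis_client = redis.Redis(host="redis-host", port=6379)

while True:
    # Poll up to 1,000 messages, then write them to Redis in one pipelined batch.
    batches = consumer.poll(timeout_ms=1000, max_records=1000)
    for tp, messages in batches.items():
        pipe = redis_client.pipeline()
        for msg in messages:
            # Filtering/transformation omitted.
            pipe.lpush("some_key", msg.value)
        pipe.execute()  # one round trip per batch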
2. Code
import asyncio
import json
import logging
import traceback
from multiprocessing import Pool

import aioredis
from aiokafka import AIOKafkaConsumer

# Configuration objects such as redis_write_config, _kafka_hosts and _redis_const
# are defined elsewhere and omitted from the post.

def filter_msg_handler(topic_str, msg_lists, filter_params):
    """Returns a list that insert_msg_to_redis consumes."""
    data = []
    for msg_info in msg_lists:
        # Message parsing is omitted here; it builds temp_tuple from msg_info.
        data.append(temp_tuple)
    return data
async def insert_msg_to_redis(msg_tuple, loop):
    """
    Write to Redis asynchronously via a pipeline; each call handles 1,000+ entries.
    """
    redis_host = (redis_write_config.get("host"), redis_write_config.get("port"))
    redis_client = await aioredis.create_redis(address=redis_host,
                                               password=redis_write_config.get("password"),
                                               loop=loop)
    pipe = redis_client.pipeline()
    for k, msg in msg_tuple:
        if not all([k, msg]):
            continue
        # Some fields cannot be shown here.
        temp_msg_info = json.dumps(msg)
        redis_key = "%s_aaaaaaaaaaaaa:%s" % (_redis_const, k)
        pipe.zadd(redis_key, temp_time, temp_member)
        redis_key_list = "%s_aaaaa:%s" % (_redis_const, k)
        pipe.lpush(redis_key_list, temp_msg_info)
    result = await pipe.execute()
    redis_client.close()
    await redis_client.wait_closed()
async def run(topic, filter_params, loop):
    """
    Consume one topic and hand each batch off to a Redis write task.
    """
    topic_group = "%s_aaaaaaa" % (topic,)
    consumer = AIOKafkaConsumer(topic,
                                group_id=topic_group,
                                loop=loop,
                                bootstrap_servers=_kafka_hosts,
                                session_timeout_ms=2 * 10000,
                                heartbeat_interval_ms=2 * 3000,
                                max_partition_fetch_bytes=15 * 1024 * 1024,
                                )
    await consumer.start()
    while True:
        result = await consumer.getmany(timeout_ms=1 * 1000, max_records=1000)
        for tp, messages in result.items():
            if messages:
                # Filter, then write to Redis.
                try:
                    res = filter_msg_handler(topic, messages, filter_params)
                    # Write the batch of up to 1,000 results in one go.
                    task = loop.create_task(insert_msg_to_redis(res, loop))
                except Exception as e:
                    logging.error(traceback.format_exc())
def main(topic, filter_params):
    """
    Top-level entry point for a single topic.
    """
    event_loop = asyncio.get_event_loop()
    try:
        event_loop.run_until_complete(run(topic, filter_params, event_loop))
    finally:
        event_loop.close()  # Once the loop is closed, any unfinished coroutines are dropped.
if __name__ == '__main__':
    # One worker process per topic; each process runs its own event loop via main().
    try:
        p = Pool(partition_len)
        for x, _ in poll_data:
            _topic_filter_config = _filter_config_format.get(x) or []
            p.apply_async(main, args=(x, _topic_filter_config))
        p.close()
        p.join()
    except Exception as e:
        logging.error(traceback.format_exc())
3. Question
In actual testing, the async code above gives no throughput improvement. The producers push about 50,000 messages per second into Kafka, and my consumers still cannot keep up, even though the consumer logic is only three steps (consume from Kafka, filter, write to Redis). Where is the problem? Is it an issue with coroutines, or am I using them incorrectly? Any advice is appreciated. Thanks.
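For reference when comparing approaches: the code above opens a new Redis connection inside every insert_msg_to_redis call and never awaits the tasks it creates. Below is a minimal sketch of a variation, assuming aioredis 1.x and reusing the elided names from the code above (redis_write_config, _kafka_hosts, _redis_const, filter_msg_handler), that creates one connection pool per process and awaits each pipelined batch before fetching the next one. It is only an illustration, not a verified fix:

async def run_reusing_connection(topic, filter_params, loop):
    # One connection pool per process, created once (hypothetical variation).
    redis_client = await aioredis.create_redis_pool(
        address=(redis_write_config.get("host"), redis_write_config.get("port")),
        password=redis_write_config.get("password"),
        loop=loop)
    consumer = AIOKafkaConsumer(topic,
                                group_id="%s_aaaaaaa" % topic,
                                loop=loop,
                                bootstrap_servers=_kafka_hosts)
    await consumer.start()
    try:
        while True:
            result = await consumer.getmany(timeout_ms=1000, max_records=1000)
            for tp, messages in result.items():
                if not messages:
                    continue
                res = filter_msg_handler(topic, messages, filter_params)
                pipe = redis_client.pipeline()
                for k, msg in res:
                    # The zadd step is omitted because its fields are not shown in the post.
                    pipe.lpush("%s_aaaaa:%s" % (_redis_const, k), json.dumps(msg))
                await pipe.execute()  # one round trip per batch, connection reused
    finally:
        await consumer.stop()
        redis_client.close()
        await redis_client.wait_closed()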