python 协程 kafka redis效率研究

1. 问题描述

使用aiokafka消费kafka和aioredis写入redis。在峰值时候,kafka每秒有5万条数据。 之前是同步操作的,用的kafka-python和redis。每次poll一千条消息,然后批量写入redis。
现在换成这2个IO换成异步方式,然后并没有效率提升,反而下降了,莫名其妙的。

2. 代码演示

def filter_msg_handler(topic_str, msg_lists, filter_params):
    """返回list 供insert_msg_to_redis 使用"""
    data = []
    for msg_info in msg_lists:
        """
        这里省略
        """
        data.append(temp_tuple)

    return data


async def insert_msg_to_redis(msg_tuple, loop):
    """
    这里异步pipeline 写入redis,有超过1000条数据
    """
    redis_host = (redis_write_config.get("host"), redis_write_config.get("port"))

    redis_client = await aioredis.create_redis(address=redis_host,
                                               password=redis_write_config.get("password"),
                                               loop=loop)

    pipe = redis_client.pipeline()
    for k, msg in msg_tuple:
        if not all([k, msg]):
            continue
        """ 有些字段不能暴露 """

        temp_msg_info = json.dumps(msg)
        redis_key = "%s_aaaaaaaaaaaaa:%s" % (_redis_const, k)
        pipe.zadd(redis_key, temp_time, temp_member)
        redis_key_list = "%s_aaaaa:%s" % (_redis_const, k)
        pipe.lpush(redis_key_list, temp_msg_info)

    result = await pipe.execute()
    redis_client.close()
    await redis_client.wait_closed()



async def run(topic, filter_params, loop):
    """
    """
    topic_group = "%s_aaaaaaa" % (topic,)
    consumer = AIOKafkaConsumer(topic,
                                group_id=topic_group,
                                loop=loop,
                                bootstrap_servers=_kafka_hosts,
                                session_timeout_ms=2 * 10000,
                                heartbeat_interval_ms=2 * 3000,
                                max_partition_fetch_bytes=15 * 1024 * 1024,
                                )
    await consumer.start()
    while True:
        result = await consumer.getmany(timeout_ms=1 * 1000, max_records=1000)
        for tp, messages in result.items():
            if messages:
                # 过滤然后写入redis
                try:
                    res = filter_msg_handler(topic, messages, filter_params)
                    # 这里批量1000条结果写入一次
                    task = loop.create_task(insert_msg_to_redis(res, loop))
                except Exception as e:
                    logging.error(traceback.print_exc())

def main(topic, filter_params):
    """
    顶级入口
    """
    event_loop = asyncio.get_event_loop()
    try:
        event_loop.run_until_complete(run(topic, filter_params, event_loop))
    finally:
        event_loop.close()  # 当轮训器关闭以后,所有没有执行完成的协成将全部关闭


if __name__ == '__main__':
    """ """
    try:
        p = Pool(partition_len)
        for x, _ in poll_data:
            _topic_filter_config = _filter_config_format.get(x) or []
            p.apply_async(main, args=(x, _topic_filter_config))

        p.close()
        p.join()

    except Exception as e:
        logging.info(traceback.print_exc())

3. 问题

实际测试下来发现,用上面异步的代码,效率没有提升。他们写入kafka每秒中有5万条数据,我消费处理能力还是跟不上,消费的逻辑就三部(消费kafka,过滤,写入redis), 问题在哪呢? 是协程问题还是我用法错了,请大家指教下。谢谢了。

原问题在

人类身份验证 – SegmentFaultsegmentfault.com

    原文作者:beyondmars3
    原文地址: https://zhuanlan.zhihu.com/p/61143493
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞