100行代码实现任务队列

最近刚看完 Python 多线程的内容,为了加深印象,参照《1分钟实现“延迟消息”功能》一文的思路,实现了一个简易版的异步队列。

高效延时消息,包含两个重要的数据结构:

1.环形队列,例如可以创建一个包含3600个slot的环形队列(本质是个数组)

2.任务集合,环上每一个slot是一个Set<Task>

同时,启动一个timer,这个timer每隔1s,在上述环形队列中移动一格,有一个Current Index指针来标识正在检测的slot。

Task结构中有两个很重要的属性:

(1)Cycle-Num:当Current Index第几圈扫描到这个Slot时,执行任务
(2)Task-Function:需要执行的任务指针

下边是代码(代码不止100行,但是在200行内,也算100行了。)

#! -*- coding: utf-8 -*-

try:
    import cPickle as pickle
except ImportError:
    import pickle
try:
    import simplejson as json
except ImportError:
    import json

import os
import errno
import Queue
import random
import logging
from functools import wraps
from threading import Timer, RLock, Thread
from time import sleep, time
from base64 import b64encode, b64decode

# Layout of the JSON document stored in ``tasks_file``:
# tasks = {
#     index: {                          # wheel slot: str(timestamp % 3600)
#         cycle_num: [(func, bargs)]    # lap: str(timestamp // 3600 + 1)
#     }                                 #   -> list of (task name, base64 pickled (args, kwargs))
# }

logging.basicConfig(
    format='(%(asctime)-15s) %(message)s',
    level=logging.DEBUG,
)

# Path of the task store, relative to the current working directory.
tasks_file = 'tasks.json'
# os.open flags for "create only if absent": with O_EXCL the call fails with
# EEXIST when the file already exists, so check_file never clobbers it.
flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL

# Capacity of the hand-off queue between the scanner (boss) and the workers.
# The bound limits how many due tasks may wait for a worker; the number of
# worker threads is decided by whoever starts them, not by this queue.
WORKER_NUMS = 2
q = Queue.Queue(maxsize=WORKER_NUMS)

# Re-entrant lock held by set_delay_task while it rewrites tasks_file.
lock = RLock()


def check_file():
    """Create ``tasks_file`` containing an empty JSON object, unless it exists.

    ``flags`` includes O_CREAT | O_EXCL, so ``os.open`` raises OSError with
    errno EEXIST when the file is already present; that case is ignored and
    the existing content is left untouched. Any other OSError propagates.
    """
    try:
        fd = os.open(tasks_file, flags)
    except OSError as exc:
        if exc.errno != errno.EEXIST:
            raise
        # Already there: nothing to initialise.
        return
    with os.fdopen(fd, 'w') as handle:
        handle.write("{}")


def set_delay_task(func_name, *args, **kwargs):
    """Persist a call to ``func_name`` so the scanner can run it after a delay.

    :param func_name: name of a module-level function; it is resolved back to
        a function by name when the task is executed.
    :param args: positional arguments for the call.
    :param kwargs: keyword arguments for the call. The reserved key
        ``count_down`` (seconds, default 0) is removed and used as the delay.

    The call is stored in ``tasks_file`` under wheel slot ``exec_time % 3600``
    and lap ``exec_time // 3600 + 1``. Arguments are pickled and
    base64-encoded so they can live inside a JSON document.
    """
    count_down = kwargs.pop('count_down', 0)
    # NOTE(review): with count_down=0 the target slot is the current second,
    # which the scanner may already have read; such a task can be missed.
    exec_time = int(time()) + count_down
    # Slot on the 3600-slot wheel.
    index = str(exec_time % 3600)
    # Lap number; floor division keeps this an integer string on Python 3 too
    # (on Python 2 it is identical to the old ``/``).
    cycle_num = str(exec_time // 3600 + 1)
    # decode() turns the base64 bytes into text, which json can serialize.
    bargs = b64encode(pickle.dumps((args, kwargs))).decode('ascii')

    # Serialize the read-modify-write so concurrent producers do not overwrite
    # each other's additions.
    with lock:
        with open(tasks_file, 'r+') as json_file:
            tasks = json.load(json_file)
            tasks.setdefault(index, {}).setdefault(cycle_num, []).append(
                (func_name, bargs))
            json_file.seek(0)
            json.dump(tasks, json_file)
            # Drop leftover bytes in case the new document is shorter than
            # the old one; otherwise the file would no longer be valid JSON.
            json_file.truncate()
    logging.debug('Received task: %s', func_name)


def get_delay_tasks(timestamp=None):
    """Remove and return the tasks scheduled for one second of the wheel.

    :param timestamp: Unix time in seconds whose slot to read; defaults to
        the current time.
    :returns: list of ``(func_name, (args, kwargs))`` tuples; empty when
        nothing is due.

    The consumed lap is deleted from ``tasks_file``. This keeps the file from
    growing without bound, and reading the same second twice cannot run a
    task twice.
    """
    if timestamp is None:
        timestamp = time()
    current_time = int(timestamp)
    # Same slot / lap arithmetic as the producer side.
    index = str(current_time % 3600)
    cycle_num = str(current_time // 3600 + 1)

    # Hold the shared lock so a half-written file is never read and the
    # write-back below cannot clobber a concurrent insert.
    with lock:
        with open(tasks_file, 'r+') as json_file:
            tasks = json.load(json_file)
            slot = tasks.get(index, {})
            current_tasks = slot.pop(cycle_num, [])
            if current_tasks:
                if not slot:
                    del tasks[index]
                json_file.seek(0)
                json.dump(tasks, json_file)
                json_file.truncate()

    due = []
    for func, bargs in current_tasks:
        # SECURITY: pickle.loads can execute arbitrary code; tasks_file must
        # only be writable by trusted processes.
        args, kwargs = pickle.loads(b64decode(bargs))
        due.append((func, (args, kwargs)))
    return due


def get_method_by_name(method_name):
    """Resolve a stored task name back to the module-level object it names.

    :param method_name: name of a function defined at module level.
    :returns: the object bound to that global name, or None if there is none.

    Only module globals are searched on purpose. Merging this function's own
    locals in would make names such as ``method_name`` resolve to the
    parameter instead of a global task function.
    """
    return globals().get(method_name)


def create_task(task_class, func, task_name=None, **kwargs):
    """Build and return a new subclass of ``task_class`` that wraps ``func``.

    :param task_class: base class of the generated type.
    :param func: callable the task runs; its ``__name__``, ``__module__`` and
        ``__doc__`` are copied onto the class.
    :param task_name: name of the new class; defaults to ``func.__name__``.
    :param kwargs: extra class attributes, applied last so they override the
        defaults above.
    :returns: the generated class (not an instance).
    """
    def execute(self):
        # NOTE(review): nothing in this file assigns ``self.data``; calling
        # execute on an instance without that attribute raises AttributeError.
        call_args, call_kwargs = self.data or ((), {})
        return func(*call_args, **call_kwargs)

    namespace = dict(
        execute=execute,
        func_name=func.__name__,
        __module__=func.__module__,
        __doc__=func.__doc__,
    )
    namespace.update(kwargs)
    return type(task_name or func.__name__, (task_class,), namespace)


class Hu(object):
    """Minimal task registry: decorating a function gives it a ``.delay`` method.

    Usage::

        hu = Hu()

        @hu.task()
        def job(x): ...

        job.delay(1, count_down=5)   # schedule job(1) about 5 seconds out
    """

    def __init__(self, func_name=None):
        # Name of the module-level function that delay() schedules.
        self.func_name = func_name
        # Make sure the JSON task store exists before anything is scheduled.
        check_file()

    def task(self):
        """Return a decorator that registers ``func`` and attaches ``func.delay``."""
        def deco(func):
            name = func.__name__
            # The decorator instance remembers the last function it decorated.
            self.func_name = name
            task_cls = create_task(Hu, func, name)
            # A dedicated instance bound to this function name supplies delay().
            func.delay = task_cls(func_name=task_cls.func_name).delay

            @wraps(func)
            def wrapper(*args, **kwargs):
                return func(*args, **kwargs)
            # functools.wraps copies func.__dict__, so wrapper.delay exists too.
            return wrapper
        return deco

    def delay(self, *args, **kwargs):
        """Schedule ``self.func_name(*args, **kwargs)``; always returns True.

        Pass ``count_down=<seconds>`` to choose the delay. The task file is
        written on a background ``threading.Timer`` thread, so this call does
        not wait for the write.
        """
        Timer(0, set_delay_task, [self.func_name] + list(args), kwargs).start()
        return True


def boss():
    """Scan the timing wheel once per second and hand due tasks to the workers.

    Runs forever. Each time a new second starts, it fetches that second's
    tasks with ``get_delay_tasks()`` and forwards them to the shared queue
    ``q``.

    Due tasks are first buffered in a local deque and moved into ``q`` only
    while it has room. ``q`` is bounded, so a blocking ``q.put`` would stall
    this loop for as long as the workers are busy. Every second that passed
    meanwhile would never be scanned, and its tasks would be silently dropped.
    """
    from collections import deque

    pending = deque()
    last_second = None
    while True:
        now_second = int(time())
        if now_second != last_second:
            if last_second is not None and now_second - last_second > 1:
                logging.warning('Scanner fell behind: %d second(s) not scanned before %d',
                                now_second - last_second - 1, now_second)
            last_second = now_second
            for func, params in get_delay_tasks():
                # Task accepted: auth.tasks.send_msg
                logging.debug('Task accepted: %s', func)
                pending.append((func, params))
        # boss is the only producer for q in this file, so room seen by full()
        # cannot disappear before put_nowait (workers only remove items).
        while pending and not q.full():
            q.put_nowait(pending.popleft())
        # Wake just after the next whole second instead of sleeping a fixed 1s,
        # which drifts and eventually skips a second. Poll sooner while tasks
        # are still waiting for queue space.
        wait = 1 - time() % 1
        if pending:
            wait = min(wait, 0.1)
        sleep(wait)


def worker():
    """Consume tasks from ``q`` forever, running each one by function name.

    Each queue item is ``(func_name, (args, kwargs))``. A failing task (an
    unknown name, or an exception raised by the task itself) is logged and
    skipped instead of ending this thread. ``q.task_done()`` is called for
    every item taken, keeping the queue's unfinished-task count balanced.
    """
    while True:
        func, params = q.get()
        try:
            print('get task: %s\n' % func)
            method = get_method_by_name(func)
            if method is None:
                logging.error('Unknown task: %s', func)
                continue
            args, kwargs = params
            # Task auth.tasks.send_msg succeeded in ...
            start_time = time()
            method(*args, **kwargs)
            elapsed = time() - start_time
            logging.debug('Task %s succeeded in %s', func, elapsed)
        except Exception:
            # Thread boundary: an uncaught error would end this worker and
            # leave nothing draining q.
            logging.exception('Task %s failed', func)
        finally:
            q.task_done()


def main():
    """Start the scanner thread and ``WORKER_NUMS`` worker threads, then return.

    The threads are created from the main thread without ``daemon`` set, so
    they are non-daemon and keep the process alive while they loop.
    """
    check_file()
    # Format into one string: print('a', x) shows a tuple repr on Python 2.
    print('starting at: %s' % time())
    threads = [Thread(target=boss)]
    # WORKER_NUMS is the intended number of consumers; previously only one
    # worker was started.
    threads.extend(Thread(target=worker) for _ in range(WORKER_NUMS))
    for t in threads:
        t.start()
    # Printed once the threads are started; the tasks themselves run later.
    print('all DONE at: %s' % time())

hu = Hu()

# Usage example:

@hu.task()
def test(num):
    """Demo task: wait two seconds, then print ``num``."""
    sleep(2)
    print('test: %s' % num)


if __name__ == '__main__':
    # Demo: schedule test(0) .. test(9), each 1-10 seconds out, then start
    # the scanner and the workers.
    for n in range(10):
        test.delay(n, count_down=random.randint(1, 10))
    main()

# output

(2017-03-21 15:59:20,394) Received task: test
(2017-03-21 15:59:20,396) Received task: test
(2017-03-21 15:59:20,397) Received task: test
(2017-03-21 15:59:20,398) Received task: test
(2017-03-21 15:59:20,400) Received task: test
(2017-03-21 15:59:20,401) Received task: test
(2017-03-21 15:59:20,403) Received task: test
(2017-03-21 15:59:20,404) Received task: test
(2017-03-21 15:59:20,406) Received task: test
(2017-03-21 15:59:20,408) Received task: test
get task: test

(2017-03-21 15:59:21,395) Task accepted: test
(2017-03-21 15:59:22,397) Task accepted: test
(2017-03-21 15:59:22,397) Task accepted: test
(2017-03-21 15:59:22,397) Task accepted: test
test: 2
get task: test

(2017-03-21 15:59:23,399) Task test succeeded in 2.0037419796
(2017-03-21 15:59:24,404) Task accepted: test
test: 1
get task: test

参照《1分钟实现“延迟消息”功能》的思路,队列的数据结构为:

{
    index: {
        cycle_num: [(func, bargs)]
    }
}

index 的值为 0-3599(即 时间戳 % 3600),每小时转一圈。
cycle_num 由 (时间戳 / 3600 + 1) 计算得到(整数除法,向下取整),表示圈数。

每当有任务加入,我们计算出index和cycle_num 将参数和方法名写入json文件。
读取任务时,计算当前 index和cycle_num, 取出需要执行的任务,使用多线程的形式执行。

为了避免任务太多时生成过多线程,任务不会各自开线程执行,而是由 boss 线程放入一个有界的 Queue(容量为 WORKER_NUMS),再由固定数量的 worker 线程取出执行。Queue 限制的是排队等待的任务数,线程数由启动的 worker 数量决定。

加锁的主要作用是防止多线程同时操作文件读写,影响数据一致性。

当然,也可以使用 Redis 存储队列:Redis 以单线程串行执行命令,单条命令天然具有原子性,可以避免多线程并发读写带来的数据一致性问题。
这一部分有需要的可以自己实现。

参考:

>欢迎关注>请我喝芬达
《100行代码实现任务队列》《100行代码实现任务队列》
    原文作者:goodspeed
    原文地址: https://segmentfault.com/a/1190000008778351
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞