django – 如何使用芹菜队列中的倒数计时器刷新任务

我的Celery队列有数百个具有倒计时的任务,这些任务将使它们在接下来的几个小时内触发.有没有办法让这些任务立即运行,以便有效地刷新队列?

我目前正在计划升级到我们的服务器,并且我希望确保在升级完成时没有后台任务正在运行.如果我必须等待这些倒计时,那没关系,但我宁愿强迫任务运行.

另一种选择可能是暂停处理队列,直到升级完成,但是刷新似乎是一个更好的选择.

编辑:我已经找到了如何找到计划的任务列表:

from celery.task.control import inspect
i = inspect()
tasks = i.scheduled()

现在我只需要弄清楚如何强制执行.

最佳答案 好的,我很确定我已经大致整理了如何做到这一点.我正在将这个答案作为维基并记下我的笔记,以防有人想在这里调整一般过程.

一般的想法是这样的:

>停止向队列中添加新项目.
>确定排队的任何任务.
>使用result.revoke()撤销所有这些任务.
>使用某些已保存的状态重新启动这些任务.

请注意,这不支持在重新排队后为项添加eta,因为这可能是特定于实现的.

因此,要确定排队的任务,您可以:

from celery.task.control import inspect
i = inspect()
scheduled_tasks = i.scheduled()

返回一个dict,如下:

{u'w1.courtlistener.com': [{u'eta': 1414435210.198864,
   u'priority': 6,
   u'request': {u'acknowledged': False,
    u'args': u'(2745724,)',
    u'delivery_info': {u'exchange': u'celery',
     u'priority': None,
     u'routing_key': u'celery'},
    u'hostname': u'w1.courtlistener.com',
    u'id': u'99bc8650-3be1-4d24-81d6-a882d77a8b25',
    u'kwargs': u'{}',
    u'name': u'citations.tasks.update_document_by_id',
    u'time_start': None,
    u'worker_pid': None}}]}

下一步是撤销所有这些任务,例如:

from celery.task.control import revoke
with open('revoked_tasks.csv', 'w') as f:
    for worker, tasks in scheduled_tasks.iteritems():
        print "Now processing worker: %s" % worker
        for task in tasks:
            print "Now revoking task: %s. %s with args: %s and kwargs: %s" % \
              (task['request']['id'], task['request']['name'], task['request']['args'], task['request']['kwargs'])
            f.write('%s|%s|%s|%s|%s\n' % (worker, task['request']['name'], task['request']['id'], task['request']['args'], task['request']['kwargs']))
            revoke(task['request']['id'], terminate=True)

然后,最后,像往常一样重新运行任务,从CSV文件加载它们:

with open('revoked_tasks', 'r') as f:
    for line in f:
        worker, command, id, args, kwargs = line.split("|")
        # Impost task here, something like...
        package, module = command.rsplit('.', 1)
        mod = __import__(package, globals(), locals(), [module])

        # Run the commands, something like...
        mod.__get_attribute__(module).delay(args*, kwargs**)
点赞