我的Celery队列有数百个具有倒计时的任务,这些任务将使它们在接下来的几个小时内触发.有没有办法让这些任务立即运行,以便有效地刷新队列?
我目前正在计划升级到我们的服务器,并且我希望确保在升级完成时没有后台任务正在运行.如果我必须等待这些倒计时,那没关系,但我宁愿强迫任务运行.
另一种选择可能是暂停处理队列,直到升级完成,但是刷新似乎是一个更好的选择.
编辑:我已经找到了如何找到计划的任务列表:
from celery.task.control import inspect
i = inspect()
tasks = i.scheduled()
现在我只需要弄清楚如何强制执行.
最佳答案 好的,我很确定我已经大致整理了如何做到这一点.我正在将这个答案作为维基并记下我的笔记,以防有人想在这里调整一般过程.
一般的想法是这样的:
>停止向队列中添加新项目.
>确定排队的任何任务.
>使用result.revoke()
撤销所有这些任务.
>使用某些已保存的状态重新启动这些任务.
请注意,这不支持在重新排队后为项添加eta,因为这可能是特定于实现的.
因此,要确定排队的任务,您可以:
from celery.task.control import inspect
i = inspect()
scheduled_tasks = i.scheduled()
返回一个dict,如下:
{u'w1.courtlistener.com': [{u'eta': 1414435210.198864,
u'priority': 6,
u'request': {u'acknowledged': False,
u'args': u'(2745724,)',
u'delivery_info': {u'exchange': u'celery',
u'priority': None,
u'routing_key': u'celery'},
u'hostname': u'w1.courtlistener.com',
u'id': u'99bc8650-3be1-4d24-81d6-a882d77a8b25',
u'kwargs': u'{}',
u'name': u'citations.tasks.update_document_by_id',
u'time_start': None,
u'worker_pid': None}}]}
下一步是撤销所有这些任务,例如:
from celery.task.control import revoke
with open('revoked_tasks.csv', 'w') as f:
for worker, tasks in scheduled_tasks.iteritems():
print "Now processing worker: %s" % worker
for task in tasks:
print "Now revoking task: %s. %s with args: %s and kwargs: %s" % \
(task['request']['id'], task['request']['name'], task['request']['args'], task['request']['kwargs'])
f.write('%s|%s|%s|%s|%s\n' % (worker, task['request']['name'], task['request']['id'], task['request']['args'], task['request']['kwargs']))
revoke(task['request']['id'], terminate=True)
然后,最后,像往常一样重新运行任务,从CSV文件加载它们:
with open('revoked_tasks', 'r') as f:
for line in f:
worker, command, id, args, kwargs = line.split("|")
# Impost task here, something like...
package, module = command.rsplit('.', 1)
mod = __import__(package, globals(), locals(), [module])
# Run the commands, something like...
mod.__get_attribute__(module).delay(args*, kwargs**)