如何在多个queue多台server上部署Celery 以及任务状态监控flower

Celery是分布式任务队列,能实时处理任务, 同时支持task scheduling. 官方文档
Celery工作原理如下:

  1. celery client发送message给broker

  2. worker 从broker中消费消息,并将结果存储在result_end中

本文中使用的broker是Rabbit MQ,result_end使用的是Redis.

Scenario

现在有两个task,分别是加法运算和乘法运算。假定乘法运算的事件优先级高&事件也很多,对于加法运算,要求每分钟最多处理10个事件。

框架

  • Celery Worker:
    在2 台server上部署worker,其中:
    server1上的worker处理queue priority_low和priority_high上的事件
    server2上的worker只处理priority_high上的事件

  • Celery Client:在应用中调用

  • Rabbit MQ:在server3上启动

  • Redis:在localhost启动

《如何在多个queue多台server上部署Celery 以及任务状态监控flower》

Code

tasks.py & callback

对两个任务加上callback的处理,如果成功,打印“—-[task_id] is done”

from celery import Celery
from kombu import Queue
import time


app = Celery('tasks', backend='redis://127.0.0.1:6379/6')
app.config_from_object('celeryconfig')


class CallbackTask(Task):
    def on_success(self, retval, task_id, args, kwargs):
        print "----%s is done" % task_id

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        pass

@app.task(base=CallbackTask) 
def add(x, y):
    return x + y


@app.task(base=CallbackTask) 
def multiply(x,y):
    return x * y

celeryconfig.py

from kombu import Queue
from kombu import Exchange

result_serializer = 'json'


broker_url = "amqp://guest:guest@192.168.xx.xxx:5672/%2f"

task_queues = (
    Queue('priority_low',  exchange=Exchange('priority', type='direct'), routing_key='priority_low'),
    Queue('priority_high',  exchange=Exchange('priority', type='direct'), routing_key='priority_high'),
)

task_routes = ([
    ('tasks.add', {'queue': 'priority_low'}),
    ('tasks.multiply', {'queue': 'priority_high'}),
],)

task_annotations = {
    'tasks.add': {'rate_limit': '10/m'}
}

Celery Server and Client

Worker on Server1

消费priority_high事件

celery -A tasks worker -Q priority_high --concurrency=4 -l info -E -n worker1@%h

Worker on Server2

消费priority_high和priority_low事件

celery -A tasks worker -Q priority_high,priority_low --concurrency=4  -l info -E -n worker2@%h

Client

生产者,pushlish 事件到broker

from tasks import add
from tasks import multiply


for i in xrange(50):
    add.delay(2, 2)
    multiply.delay(10,10)

监控

install

pip install flower

启动flower

假设在server2上启动flower,flower默认的端口是5555.

celery  flower --broker=amqp://guest:guest@192.168.xx.xxx:5672//

监控界面

在浏览器上输入 http://server2_ip:5555, 可以看到如下界面:
从queued tasks途中,可以看出 priority_high中的task先消费完,和预期是一样的。
《如何在多个queue多台server上部署Celery 以及任务状态监控flower》
《如何在多个queue多台server上部署Celery 以及任务状态监控flower》

    原文作者:here_bg
    原文地址: https://segmentfault.com/a/1190000009149587
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞