资讯专栏INFORMATION COLUMN

如何在多个queue多台server上部署Celery 以及任务状态监控flower

goji / 1918人阅读

摘要:是分布式任务队列,能实时处理任务,同时支持官方文档工作原理如下发送给从中消费消息,并将结果存储在中本文中使用的是,使用的是现在有两个,分别是加法运算和乘法运算。假定乘法运算的事件优先级高事件也很多,对于加法运算,要求每分钟最多处理个事件。

Celery是分布式任务队列,能实时处理任务, 同时支持task scheduling. 官方文档
Celery工作原理如下:

celery client发送message给broker

worker 从broker中消费消息,并将结果存储在result_end中

本文中使用的broker是Rabbit MQ,result_end使用的是Redis.

Scenario

现在有两个task,分别是加法运算和乘法运算。假定乘法运算的事件优先级高&事件也很多,对于加法运算,要求每分钟最多处理10个事件。

框架

Celery Worker:
在2 台server上部署worker,其中:
server1上的worker处理queue priority_low和priority_high上的事件
server2上的worker只处理priority_high上的事件

Celery Client:在应用中调用

Rabbit MQ:在server3上启动

Redis:在localhost启动

Code tasks.py & callback

对两个任务加上callback的处理,如果成功,打印“----[task_id] is done”

from celery import Celery
from kombu import Queue
import time


app = Celery("tasks", backend="redis://127.0.0.1:6379/6")
app.config_from_object("celeryconfig")


class CallbackTask(Task):
    def on_success(self, retval, task_id, args, kwargs):
        print "----%s is done" % task_id

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        pass

@app.task(base=CallbackTask) 
def add(x, y):
    return x + y


@app.task(base=CallbackTask) 
def multiply(x,y):
    return x * y
celeryconfig.py
from kombu import Queue
from kombu import Exchange

result_serializer = "json"


broker_url = "amqp://guest:guest@192.168.xx.xxx:5672/%2f"

task_queues = (
    Queue("priority_low",  exchange=Exchange("priority", type="direct"), routing_key="priority_low"),
    Queue("priority_high",  exchange=Exchange("priority", type="direct"), routing_key="priority_high"),
)

task_routes = ([
    ("tasks.add", {"queue": "priority_low"}),
    ("tasks.multiply", {"queue": "priority_high"}),
],)

task_annotations = {
    "tasks.add": {"rate_limit": "10/m"}
}
Celery Server and Client Worker on Server1

消费priority_high事件

celery -A tasks worker -Q priority_high --concurrency=4 -l info -E -n worker1@%h
Worker on Server2

消费priority_high和priority_low事件

celery -A tasks worker -Q priority_high,priority_low --concurrency=4  -l info -E -n worker2@%h
Client

生产者,pushlish 事件到broker

from tasks import add
from tasks import multiply


for i in xrange(50):
    add.delay(2, 2)
    multiply.delay(10,10)
监控 install
pip install flower
启动flower

假设在server2上启动flower,flower默认的端口是5555.

celery  flower --broker=amqp://guest:guest@192.168.xx.xxx:5672//
监控界面

在浏览器上输入 http://server2_ip:5555, 可以看到如下界面:
从queued tasks途中,可以看出 priority_high中的task先消费完,和预期是一样的。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/38606.html

相关文章

  • 如何多个queue多台server部署Celery 以及任务状态监控flower

    摘要:是分布式任务队列,能实时处理任务,同时支持官方文档工作原理如下发送给从中消费消息,并将结果存储在中本文中使用的是,使用的是现在有两个,分别是加法运算和乘法运算。假定乘法运算的事件优先级高事件也很多,对于加法运算,要求每分钟最多处理个事件。 Celery是分布式任务队列,能实时处理任务, 同时支持task scheduling. 官方文档Celery工作原理如下: celery cli...

    Corwien 评论0 收藏0
  • Celery实际使用与内存泄漏问题(面试)

    摘要:结论执行完任务不释放内存与原一直没有被销毁有关,因此可以适当配置小点,而任务并发数与配置项有关,每增加一个必然增加内存消耗,同时也影响到一个何时被销毁,因为是均匀调度任务至每个,因此也不宜配置过大,适当配置。 1.实际使用 ​ 监控task的执行结果:任务id,结果,traceback,children,任务状态 ​ 配置 backend=redis://127...

    0x584a 评论0 收藏0
  • 大数据开发平台(Data Platform)有赞的最佳实践

    摘要:任务调度设计大数据开发平台的任务调度是指在作业发布之后,按照作业配置中指定的调度周期通过指定在一段时间范围内通过开始结束时间指定周期性的执行用户代码。 前言 随着公司规模的增长,对大数据的离线应用开发的需求越来越多,这些需求包括但不限于离线数据同步(MySQL/Hive/Hbase/Elastic Search 等之间的离线同步)、离线计算(Hive/MapReduce/Spark 等...

    HitenDev 评论0 收藏0
  • 手把手教你如何用Crawlab构建技术文章聚合平台(一)

    摘要:本文将介绍如何使用和抓取主流的技术博客文章,然后用搭建一个小型的技术文章聚合平台。是谷歌开源的基于和的自动化测试工具,可以很方便的让程序模拟用户的操作,对浏览器进行程序化控制。相对于,是新的开源项目,而且是谷歌开发,可以使用很多新的特性。 背景 说到爬虫,大多数程序员想到的是scrapy这样受人欢迎的框架。scrapy的确不错,而且有很强大的生态圈,有gerapy等优秀的可视化界面。但...

    LinkedME2016 评论0 收藏0
  • 手把手教你如何用Crawlab构建技术文章聚合平台(一)

    摘要:本文将介绍如何使用和抓取主流的技术博客文章,然后用搭建一个小型的技术文章聚合平台。是谷歌开源的基于和的自动化测试工具,可以很方便的让程序模拟用户的操作,对浏览器进行程序化控制。相对于,是新的开源项目,而且是谷歌开发,可以使用很多新的特性。 背景 说到爬虫,大多数程序员想到的是scrapy这样受人欢迎的框架。scrapy的确不错,而且有很强大的生态圈,有gerapy等优秀的可视化界面。但...

    Jeffrrey 评论0 收藏0

发表评论

0条评论

goji

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<