摘要:是分布式任務隊列,能實時處理任務,同時支持官方文檔工作原理如下發送給從中消費消息,并將結果存儲在中本文中使用的是,使用的是現在有兩個,分別是加法運算和乘法運算。假定乘法運算的事件優先級高事件也很多,對于加法運算,要求每分鐘最多處理個事件。
Celery是分布式任務隊列,能實時處理任務, 同時支持task scheduling. 官方文檔
Celery工作原理如下:
celery client發送message給broker
worker 從broker中消費消息,并將結果存儲在result_end中
本文中使用的broker是Rabbit MQ,result_end使用的是Redis.
Scenario現在有兩個task,分別是加法運算和乘法運算。假定乘法運算的事件優先級高&事件也很多,對于加法運算,要求每分鐘最多處理10個事件。
框架Celery Worker:
在2 臺server上部署worker,其中:
server1上的worker處理queue priority_low和priority_high上的事件
server2上的worker只處理priority_high上的事件
Celery Client:在應用中調用
Rabbit MQ:在server3上啟動
Redis:在localhost啟動
Code tasks.py & callback對兩個任務加上callback的處理,如果成功,打印“----[task_id] is done”
from celery import Celery from kombu import Queue import time app = Celery("tasks", backend="redis://127.0.0.1:6379/6") app.config_from_object("celeryconfig") class CallbackTask(Task): def on_success(self, retval, task_id, args, kwargs): print "----%s is done" % task_id def on_failure(self, exc, task_id, args, kwargs, einfo): pass @app.task(base=CallbackTask) def add(x, y): return x + y @app.task(base=CallbackTask) def multiply(x,y): return x * yceleryconfig.py
from kombu import Queue from kombu import Exchange result_serializer = "json" broker_url = "amqp://guest:guest@192.168.xx.xxx:5672/%2f" task_queues = ( Queue("priority_low", exchange=Exchange("priority", type="direct"), routing_key="priority_low"), Queue("priority_high", exchange=Exchange("priority", type="direct"), routing_key="priority_high"), ) task_routes = ([ ("tasks.add", {"queue": "priority_low"}), ("tasks.multiply", {"queue": "priority_high"}), ],) task_annotations = { "tasks.add": {"rate_limit": "10/m"} }Celery Server and Client Worker on Server1
消費priority_high事件
celery -A tasks worker -Q priority_high --concurrency=4 -l info -E -n worker1@%hWorker on Server2
消費priority_high和priority_low事件
celery -A tasks worker -Q priority_high,priority_low --concurrency=4 -l info -E -n worker2@%hClient
生產者,pushlish 事件到broker
from tasks import add from tasks import multiply for i in xrange(50): add.delay(2, 2) multiply.delay(10,10)監控 install
pip install flower啟動flower
假設在server2上啟動flower,flower默認的端口是5555.
celery flower --broker=amqp://guest:guest@192.168.xx.xxx:5672//監控界面
在瀏覽器上輸入 http://server2_ip:5555, 可以看到如下界面:
從queued tasks途中,可以看出 priority_high中的task先消費完,和預期是一樣的。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/38606.html
摘要:結論執行完任務不釋放內存與原一直沒有被銷毀有關,因此可以適當配置小點,而任務并發數與配置項有關,每增加一個必然增加內存消耗,同時也影響到一個何時被銷毀,因為是均勻調度任務至每個,因此也不宜配置過大,適當配置。 1.實際使用 ? 監控task的執行結果:任務id,結果,traceback,children,任務狀態 ? 配置 backend=redis://127...
摘要:本文將介紹如何使用和抓取主流的技術博客文章,然后用搭建一個小型的技術文章聚合平臺。是谷歌開源的基于和的自動化測試工具,可以很方便的讓程序模擬用戶的操作,對瀏覽器進行程序化控制。相對于,是新的開源項目,而且是谷歌開發,可以使用很多新的特性。 背景 說到爬蟲,大多數程序員想到的是scrapy這樣受人歡迎的框架。scrapy的確不錯,而且有很強大的生態圈,有gerapy等優秀的可視化界面。但...
摘要:本文將介紹如何使用和抓取主流的技術博客文章,然后用搭建一個小型的技術文章聚合平臺。是谷歌開源的基于和的自動化測試工具,可以很方便的讓程序模擬用戶的操作,對瀏覽器進行程序化控制。相對于,是新的開源項目,而且是谷歌開發,可以使用很多新的特性。 背景 說到爬蟲,大多數程序員想到的是scrapy這樣受人歡迎的框架。scrapy的確不錯,而且有很強大的生態圈,有gerapy等優秀的可視化界面。但...
閱讀 1464·2021-11-24 09:39
閱讀 1783·2021-11-22 15:25
閱讀 3736·2021-11-19 09:40
閱讀 3296·2021-09-22 15:31
閱讀 1296·2021-07-29 13:49
閱讀 1204·2019-08-26 11:59
閱讀 1317·2019-08-26 11:39
閱讀 928·2019-08-26 11:00