摘要:是什么是一個由編寫的簡單靈活可靠的用來處理大量信息的分布式系統它同時提供操作和維護分布式系統所需的工具。專注于實時任務處理,支持任務調度。說白了,它是一個分布式隊列的管理工具,我們可以用提供的接口快速實現并管理一個分布式的任務隊列。
Celery 是什么?
Celery 是一個由 Python 編寫的簡單、靈活、可靠的用來處理大量信息的分布式系統,它同時提供操作和維護分布式系統所需的工具。
Celery 專注于實時任務處理,支持任務調度。
說白了,它是一個分布式隊列的管理工具,我們可以用 Celery 提供的接口快速實現并管理一個分布式的任務隊列。
1.快速入門(本文以 Celery4.0 為基礎進行書寫)
首先,我們要理解 Celery 本身不是任務隊列,它是管理分布式任務隊列的工具,或者換一種說法,它封裝好了操作常見任務隊列的各種操作,我們用它可以快速進行任務隊列的使用與管理,當然你也可以自己看 rabbitmq 等隊列的文檔然后自己實現相關操作都是沒有問題的。
Celery 是語言無關的,雖然它是用 Python 實現的,但他提供了其他常見語言的接口支持。只是如果你恰好使用 Python 進行開發那么使用 Celery 就自然而然了。
想讓 Celery 運行起來我們要明白幾個概念:
1.1 Brokersbrokers 中文意思為中間人,在這里就是指任務隊列本身,Celery 扮演生產者和消費者的角色,brokers 就是生產者和消費者存放/拿取產品的地方(隊列)
常見的 brokers 有 rabbitmq、redis、Zookeeper 等
1.2 Result Stores / backend顧名思義就是結果儲存的地方,隊列中的任務運行完后的結果或者狀態需要被任務發送者知道,那么就需要一個地方儲存這些結果,就是 Result Stores 了
常見的 backend 有 redis、Memcached 甚至常用的數據都可以。
1.3 Workers就是 Celery 中的工作者,類似與生產/消費模型中的消費者,其從隊列中取出任務并執行
1.4 Tasks就是我們想在隊列中進行的任務咯,一般由用戶、觸發器或其他操作將任務入隊,然后交由 workers 進行處理。
理解以上概念后我們就可以快速實現一個隊列的操作:
這里我們用 redis 當做 celery 的 broker 和 backend。
(其他 brokers 與 backend 支持看這里)
安裝 Celery 和 redis 以及 python 的 redis 支持:
apt-get install redis-server pip install redis pip install celery
這里需要注意如果你的 celery 是 4.0 及以上版本請確保 python 的 redis 庫版本在 2.10.4 及以上,否則會出現 redis 連接 timeout 的錯誤,具體參考
然后,我們需要寫一個task:
#tasks.py from celery import Celery app = Celery("tasks", backend="redis://localhost:6379/0", broker="redis://localhost:6379/0") #配置好celery的backend和broker @app.task #普通函數裝飾為 celery task def add(x, y): return x + y
OK,到這里,broker 我們有了,backend 我們有了,task 我們也有了,現在就該運行 worker 進行工作了,在 tasks.py 所在目錄下運行:
celery -A tasks worker --loglevel=info
意思就是運行 tasks 這個任務集合的 worker 進行工作(當然此時broker中還沒有任務,worker此時相當于待命狀態)
最后一步,就是觸發任務啦,最簡單方式就是再寫一個腳本然后調用那個被裝飾成 task 的函數:
#trigger.py from tasks import add result = add.delay(4, 4) #不要直接 add(4, 4),這里需要用 celery 提供的接口 delay 進行調用 while not result.ready(): time.sleep(1) print "task done: {0}".format(result.get())
運行此腳本
delay 返回的是一個 AsyncResult 對象,里面存的就是一個異步的結果,當任務完成時result.ready() 為 true,然后用 result.get() 取結果即可。
到此,一個簡單的 celery 應用就完成啦。
2. 進階用法經過快速入門的學習后,我們已經能夠使用 Celery 管理普通任務,但對于實際使用場景來說這是遠遠不夠的,所以我們需要更深入的去了解 Celery 更多的使用方式。
首先來看之前的task:
@app.task #普通函數裝飾為 celery task def add(x, y): return x + y
這里的裝飾器app.task實際上是將一個正常的函數修飾成了一個 celery task 對象,所以這里我們可以給修飾器加上參數來決定修飾后的 task 對象的一些屬性。
首先,我們可以讓被修飾的函數成為 task 對象的綁定方法,這樣就相當于被修飾的函數 add 成了 task 的實例方法,可以調用 self 獲取當前 task 實例的很多狀態及屬性。
其次,我們也可以自己復寫 task 類然后讓這個自定義 task 修飾函數 add ,來做一些自定義操作。
2.1 根據任務狀態執行不同操作任務執行后,根據任務狀態執行不同操作需要我們復寫 task 的 on_failure、on_success 等方法:
# tasks.py class MyTask(Task): def on_success(self, retval, task_id, args, kwargs): print "task done: {0}".format(retval) return super(MyTask, self).on_success(retval, task_id, args, kwargs) def on_failure(self, exc, task_id, args, kwargs, einfo): print "task fail, reason: {0}".format(exc) return super(MyTask, self).on_failure(exc, task_id, args, kwargs, einfo) @app.task(base=MyTask) def add(x, y): return x + y
嗯, 然后繼續運行 worker:
celery -A tasks worker --loglevel=info
運行腳本,得到:
再修改下tasks:
@app.task #普通函數裝飾為 celery task def add(x, y): raise KeyError return x + y
重新運行 worker,再運行 trigger.py:
可以看到,任務執行成功或失敗后分別執行了我們自定義的 on_failure、on_success
2.2 綁定任務為實例方法# tasks.py from celery.utils.log import get_task_logger logger = get_task_logger(__name__) @app.task(bind=True) def add(self, x, y): logger.info(self.request.__dict__) return x + y
然后重新運行:
執行中的任務獲取到了自己執行任務的各種信息,可以根據這些信息做很多其他操作,例如判斷鏈式任務是否到結尾等等。
關于 celery.task.request 對象的詳細數據可以看這里
2.3 任務狀態回調實際場景中得知任務狀態是很常見的需求,對于 Celery 其內建任務狀態有如下幾種:
參數 | 說明 |
---|---|
PENDING | 任務等待中 |
STARTED | 任務已開始 |
SUCCESS | 任務執行成功 |
FAILURE | 任務執行失敗 |
RETRY | 任務將被重試 |
REVOKED | 任務取消 |
當我們有個耗時時間較長的任務進行時一般我們想得知它的實時進度,這里就需要我們自定義一個任務狀態用來說明進度并手動更新狀態,從而告訴回調當前任務的進度,具體實現:
# tasks.py from celery import Celery import time @app.task(bind=True) def test_mes(self): for i in xrange(1, 11): time.sleep(0.1) self.update_state(state="PROGRESS", meta={"p": i*10}) return "finish"
然后在 trigger.py 中增加:
# trigger.py from task import add,test_mes import sys def pm(body): res = body.get("result") if body.get("status") == "PROGRESS": sys.stdout.write(" 任務進度: {0}%".format(res.get("p"))) sys.stdout.flush() else: print " " print res r = test_mes.delay() print r.get(on_message=pm, propagate=False)
然后運行任務:
Celery 進行周期任務也很簡單,只需要在配置中配置好周期任務,然后在運行一個周期任務觸發器( beat )即可:
新建 Celery 配置文件 celery_config.py:
# celery_config.py from datetime import timedelta from celery.schedules import crontab CELERYBEAT_SCHEDULE = { "ptask": { "task": "tasks.period_task", "schedule": timedelta(seconds=5), }, } CELERY_RESULT_BACKEND = "redis://localhost:6379/0"
配置中 schedule 就是間隔執行的時間,這里可以用 datetime.timedelta 或者 crontab 甚至太陽系經緯度坐標進行間隔時間配置,具體可以參考這里
如果定時任務涉及到 datetime 需要在配置中加入時區信息,否則默認是以 utc 為準。例如中國可以加上:
CELERY_TIMEZONE = "Asia/Shanghai"
然后在 tasks.py 中增加要被周期執行的任務:
# tasks.py app = Celery("tasks", backend="redis://localhost:6379/0", broker="redis://localhost:6379/0") app.config_from_object("celery_config") @app.task(bind=True) def period_task(self): print "period task done: {0}".format(self.request.id)
然后重新運行 worker,接著再運行 beat:
celery -A task beat
可以看到周期任務運行正常~
2.5 鏈式任務有些任務可能需由幾個子任務組成,此時調用各個子任務的方式就變的很重要,盡量不要以同步阻塞的方式調用子任務,而是用異步回調的方式進行鏈式任務的調用:
錯誤示范@app.task def update_page_info(url): page = fetch_page.delay(url).get() info = parse_page.delay(url, page).get() store_page_info.delay(url, info) @app.task def fetch_page(url): return myhttplib.get(url) @app.task def parse_page(url, page): return myparser.parse_document(page) @app.task def store_page_info(url, info): return PageInfo.objects.create(url, info)正確示范1
def update_page_info(url): # fetch_page -> parse_page -> store_page chain = fetch_page.s(url) | parse_page.s() | store_page_info.s(url) chain() @app.task() def fetch_page(url): return myhttplib.get(url) @app.task() def parse_page(page): return myparser.parse_document(page) @app.task(ignore_result=True) def store_page_info(info, url): PageInfo.objects.create(url=url, info=info)正確示范2
fetch_page.apply_async((url), link=[parse_page.s(), store_page_info.s(url)])
鏈式任務中前一個任務的返回值默認是下一個任務的輸入值之一 ( 不想讓返回值做默認參數可以用 si() 或者 s(immutable=True) 的方式調用 )。
這里的 s() 是方法 celery.signature() 的快捷調用方式,signature 具體作用就是生成一個包含調用任務及其調用參數與其他信息的對象,個人感覺有點類似偏函數的概念:先不執行任務,而是把任務與任務參數存起來以供其他地方調用。
2.6 調用任務前面講了調用任務不能直接使用普通的調用方式,而是要用類似 add.delay(2, 2) 的方式調用,而鏈式任務中又用到了 apply_async 方法進行調用,實際上 delay 只是 apply_async 的快捷方式,二者作用相同,只是 apply_async 可以進行更多的任務屬性設置,比如 callbacks/errbacks 正常回調與錯誤回調、執行超時、重試、重試時間等等,具體參數可以參考這里
2.7 關于 AsyncResultAsyncResult 主要用來儲存任務執行信息與執行結果,有點類似 tornado 中的 Future 對象,都有儲存異步結果與任務執行狀態的功能,對于寫 js 的朋友,它有點類似 Promise 對象,當然在 Celery 4.0 中已經支持了 promise 協議,只需要配合 gevent 一起使用就可以像寫 js promise 一樣寫回調:
import gevent.monkey monkey.patch_all() import time from celery import Celery app = Celery(broker="amqp://", backend="rpc") @app.task def add(x, y): return x + y def on_result_ready(result): print("Received result for id %r: %r" % (result.id, result.result,)) add.delay(2, 2).then(on_result_ready)
要注意的是這種 promise 寫法現在只能用在 backend 是 RPC (amqp) 或 Redis 時。 并且獨立使用時需要引入 gevent 的猴子補丁,可能會影響其他代碼。 官方文檔給的建議是這個特性結合異步框架使用更合適,例如 tornado、 twisted 等。
delay 與 apply_async 生成的都是 AsyncResult 對象,此外我們還可以根據 task id 直接獲取相關 task 的 AsyncResult: AsyncResult(task_id=xxx)
關于 AsyncResult 更詳細的內容,可以參考這里
利用 Celery 進行分布式隊列管理、開發將會大幅提升開發效率,關于 Celery 更詳細的使用大家可以去參考詳細的官方文檔
作者:rapospectre
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/38333.html
摘要:我們將窗口切換到的啟動窗口,會看到多了兩條日志這說明任務已經被調度并執行成功。本文標題為異步任務神器簡明筆記本文鏈接為參考資料使用之美分布式任務隊列的介紹思誠之道異步任務神器簡明筆記 Celery 在程序的運行過程中,我們經常會碰到一些耗時耗資源的操作,為了避免它們阻塞主程序的運行,我們經常會采用多線程或異步任務。比如,在 Web 開發中,對新用戶的注冊,我們通常會給他發一封激活郵件,...
摘要:的簡介是一個基于分布式消息傳輸的異步任務隊列,它專注于實時處理,同時也支持任務調度。目前支持等作為消息代理,但適用于生產環境的只有和官方推薦。任務處理完后保存狀態信息和結果,以供查詢。 celery的簡介 ??celery是一個基于分布式消息傳輸的異步任務隊列,它專注于實時處理,同時也支持任務調度。它的執行單元為任務(task),利用多線程,如Eventlet,gevent等,它們能被...
摘要:今天介紹一下如何在項目中使用搭建一個有兩個節點的任務隊列一個主節點一個子節點主節點發布任務,子節點收到任務并執行。 今天介紹一下如何在django項目中使用celery搭建一個有兩個節點的任務隊列(一個主節點一個子節點;主節點發布任務,子節點收到任務并執行。搭建3個或者以上的節點就類似了),使用到了celery,rabbitmq。這里不會單獨介紹celery和rabbitmq中的知識了...
摘要:主要是為了實現系統之間的雙向解耦而實現的。問題及優化隊列過長問題使用上述方案的異步非阻塞可能會依賴于的任務隊列長度,若隊列中的任務過多,則可能導致長時間等待,降低效率。 Tornado和Celery介紹 1.Tornado Tornado是一個用python編寫的一個強大的、可擴展的異步HTTP服務器,同時也是一個web開發框架。tornado是一個非阻塞式web服務器,其速度相當快。...
閱讀 2471·2021-11-23 09:51
閱讀 531·2019-08-30 13:59
閱讀 1830·2019-08-29 11:20
閱讀 2538·2019-08-26 13:41
閱讀 3246·2019-08-26 12:16
閱讀 735·2019-08-26 10:59
閱讀 3331·2019-08-26 10:14
閱讀 605·2019-08-23 17:21