摘要:的簡介是一個基于分布式消息傳輸的異步任務隊列,它專注于實時處理,同時也支持任務調度。目前支持等作為消息代理,但適用于生產環境的只有和官方推薦。任務處理完后保存狀態信息和結果,以供查詢。
celery的簡介
??celery是一個基于分布式消息傳輸的異步任務隊列,它專注于實時處理,同時也支持任務調度。它的執行單元為任務(task),利用多線程,如Eventlet,gevent等,它們能被并發地執行在單個或多個職程服務器(worker servers)上。任務能異步執行(后臺運行)或同步執行(等待任務完成)。
??在生產系統中,celery能夠一天處理上百萬的任務。它的完整架構圖如下:
組件介紹:
Producer:調用了Celery提供的API、函數或者裝飾器而產生任務并交給任務隊列處理的都是任務生產者。
Celery Beat:任務調度器,Beat進程會讀取配置文件的內容,周期性地將配置中到期需要執行的任務發送給任務隊列。
Broker:消息代理,又稱消息中間件,接受任務生產者發送過來的任務消息,存進隊列再按序分發給任務消費方(通常是消息隊列或者數據庫)。Celery目前支持RabbitMQ、Redis、MongoDB、Beanstalk、SQLAlchemy、Zookeeper等作為消息代理,但適用于生產環境的只有RabbitMQ和Redis, 官方推薦 RabbitMQ。
Celery Worker:執行任務的消費者,通常會在多臺服務器運行多個消費者來提高執行效率。
Result Backend:任務處理完后保存狀態信息和結果,以供查詢。Celery默認已支持Redis、RabbitMQ、MongoDB、Django ORM、SQLAlchemy等方式。
??在客戶端和消費者之間傳輸數據需要序列化和反序列化。 Celery 支出的序列化方案如下所示:
準備工作??在本文中,我們使用的celery的消息代理和后端存儲數據庫都使用redis,序列化和反序列化選擇msgpack。
??首先,我們需要安裝redis數據庫,具體的安裝方法可參考:http://www.runoob.com/redis/r... 。啟動redis,我們會看到如下界面:
在redis可視化軟件rdm中,我們看到的數據庫如下:
里面沒有任何數據。
??接著,為了能夠在python中使用celery,我們需要安裝以下模塊:
celery
redis
msgpack
這樣,我們的準備工作就完畢了。
一個簡單的例子??我們創建的工程名稱為proj,結構如下圖:
??首先是主程序app_test.py,代碼如下:
from celery import Celery app = Celery("proj", include=["proj.tasks"]) app.config_from_object("proj.celeryconfig") if __name__ == "__main__": app.start()
分析一下這個程序:
"from celery import Celery"是導入celery中的Celery類。
app是Celery類的實例,創建的時候添加了proj.tasks這個模塊,也就是包含了proj/tasks.py這個文件。
把Celery配置存放進proj/celeryconfig.py文件,使用app.config_from_object加載配置。
??接著是任務函數文件tasks.py,代碼如下:
import time from proj.app_test import app @app.task def add(x, y): time.sleep(1) return x + y
tasks.py只有一個任務函數add,讓它生效的最直接的方法就是添加app.task這個裝飾器。add的功能是先休眠一秒,然后返回兩個數的和。
??接著是配置文件celeryconfig.py,代碼如下:
BROKER_URL = "redis://localhost" # 使用Redis作為消息代理 CELERY_RESULT_BACKEND = "redis://localhost:6379/0" # 把任務結果存在了Redis CELERY_TASK_SERIALIZER = "msgpack" # 任務序列化和反序列化使用msgpack方案 CELERY_RESULT_SERIALIZER = "json" # 讀取任務結果一般性能要求不高,所以使用了可讀性更好的JSON CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任務過期時間 CELERY_ACCEPT_CONTENT = ["json", "msgpack"] # 指定接受的內容類型
??最后是調用文件diaoyong.py,代碼如下:
from proj.tasks import add import time t1 = time.time() r1 = add.delay(1, 2) r2 = add.delay(2, 4) r3 = add.delay(3, 6) r4 = add.delay(4, 8) r5 = add.delay(5, 10) r_list = [r1, r2, r3, r4, r5] for r in r_list: while not r.ready(): pass print(r.result) t2 = time.time() print("共耗時:%s" % str(t2-t1))
在這個程序中,我們調用了add函數五次,delay()用來調用任務。
例子的運行??到此為止,我們已經理解了整個項目的結構與代碼。
??接下來,我們嘗試著把這個項目運行起來。
??首先,我們需要啟動redis。接著,切換至proj項目所在目錄,并運行命令:
celery -A proj.app_test worker -l info
界面如下:
然后,我們運行diaoyong.py,輸出的結果如下:
3 6 9 12 15 共耗時:1.1370790004730225
后臺輸出如下:
接著,我們看一下rdm中的數據:
至此,我們已經成功運行了這個項目。
??下面,我們嘗試著對這個運行結果做些分析。首先,我們一次性調用了五次add函數,但是運行的總時間才1秒多。這是celery異步運行的結果,如果是同步運行,那么,至少需要5秒多,因為每調用add函數一次,就會休眠一秒。這就是celery的強大之處。
??從后臺輸出可以看到,程序會先將任務分發出來,每個任務一個ID,在后臺統一處理,處理完后會有相應的結果返回,同時該結果也會儲存之后臺數據庫。可以利用ready()判斷任務是否執行完畢,再用result獲取任務的結果。
??本文項目的github地址為:https://github.com/percent4/c... 。
??本次分享到此結束,感謝閱讀~
??注意:本人現已開通微信公眾號: Python爬蟲與算法(微信號為:easy_web_scrape), 歡迎大家關注哦~~
Celery 初步:http://docs.jinkan.org/docs/c...
使用Celery:https://zhuanlan.zhihu.com/p/...
異步神器celery:https://www.jianshu.com/p/9be...
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/43139.html
摘要:是一個基于分布式消息傳輸的異步任務隊列,它專注于實時處理,同時也支持任務調度。本文將介紹如何使用來加速爬蟲。本文爬蟲的例子來自文章爬蟲的種姿勢。雖然沒有這個爬蟲框架和異步框架來的快,但這也可以作為一種爬蟲的思路。 ??celery是一個基于分布式消息傳輸的異步任務隊列,它專注于實時處理,同時也支持任務調度。關于celery的更多介紹及例子,筆者可以參考文章Python之celery的簡...
摘要:倘若該回答是正確的,則立即有如下推論在處理信號的過程中,字節碼具有原子性。因此,除了在兩個字節碼之間,應該還有其他時機喚起了。行的是信號處理函數的最外層包裝,由系統調用或注冊至內核,并在信號發生時被內核回調,是異常控制流的入口。 寫在前面 前幾天工作時遇到了一個匪夷所思的問題。經過幾次嘗試后問題得以解決,但問題產生的原因卻仍令人費解。查找 SO 無果,我決定翻看 Python 的源碼。...
摘要:比較的是兩個對象的內容是并發編程之協程異步后端掘金引言隨著的盛行,相信大家今年多多少少都聽到了異步編程這個概念。使用進行并發編程篇二掘金我們今天繼續深入學習。 python 之機器學習庫 scikit-learn - 后端 - 掘金一、 加載sklearn中的數據集datasets from sklearn import datasets iris = datasets.load_i...
閱讀 3259·2023-04-26 01:31
閱讀 1901·2023-04-25 22:08
閱讀 3449·2021-09-01 11:42
閱讀 2830·2019-08-30 12:58
閱讀 2174·2019-08-29 18:31
閱讀 2438·2019-08-29 17:18
閱讀 3070·2019-08-29 13:01
閱讀 2556·2019-08-28 18:22