小編寫這篇文章的主要目的,主要是給大家去進行講解Django項目實例情況,包括celery的一些具體使用情況介紹,學習這些的話,對我們的工作和生活幫助還是很大的,但是怎么樣才能夠更快的進行上手呢?下面就一個具體實例給大家進行解答。
1、django應用Celery
django框架請求/響應的過程是同步的,框架本身無法實現異步響應。
但是我們在項目過程中會經常會遇到一些耗時的任務,比如:發送郵件、發送短信、大數據統計等等,這些操作耗時長,同步執行對用戶體驗非常不友好,那么在這種情況下就需要實現異步執行。
異步執行前端一般使用ajax,后端使用Celery。
2、項目應用
django項目應用celery,主要有兩種任務方式,一是異步任務(發布者任務),一般是web請求,二是定時任務。
celery組成
Celery是由Python開發、簡單、靈活、可靠的分布式任務隊列,是一個處理異步任務的框架,其本質是生產者消費者模型,生產者發送任務到消息隊列,消費者負責處理任務。Celery側重于實時操作,但對調度支持也很好,其每天可以處理數以百萬計的任務。特點:
簡單:熟悉celery的工作流程后,配置使用簡單
高可用:當任務執行失敗或執行過程中發生連接中斷,celery會自動嘗試重新執行任務
快速:一個單進程的celery每分鐘可處理上百萬個任務
靈活:幾乎celery的各個組件都可以被擴展及自定制
Celery由三部分構成:
消息中間件(Broker):官方提供了很多備選方案,支持RabbitMQ、Redis、Amazon SQS、MongoDB、Memcached等,官方推薦RabbitMQ
任務執行單元(Worker):任務執行單元,負責從消息隊列中取出任務執行,它可以啟動一個或者多個,也可以啟動在不同的機器節點,這就是其實現分布式的核心
結果存儲(Backend):官方提供了諸多的存儲方式支持:RabbitMQ、Redis、Memcached,SQLAlchemy,Django ORM、Apache Cassandra、Elasticsearch等
架構如下:
工作原理:
任務模塊Task包含異步任務和定時任務。其中,異步任務通常在業務邏輯中被觸發并發往消息隊列,而定時任務由Celery Beat進程周期性地將任務發往消息隊列;
任務執行單元Worker實時監視消息隊列獲取隊列中的任務執行;
Woker執行完任務后將結果保存在Backend中;
本文使用的是redis數據庫作為消息中間件和結果存儲數據庫
1.異步任務redis
1.安裝庫
pip install celery pip install redis
2.celery.py
在主項目目錄下,新建celery.py文件:
import os import django from celery import Celery from django.conf import settings #設置系統環境變量,安裝django,必須設置,否則在啟動celery時會報錯 #celery_study是當前項目名 os.environ.setdefault('DJANGO_SETTINGS_MODULE','celery_study.settings') django.setup() celery_app=Celery('celery_study') celery_app.config_from_object('django.conf:settings') celery_app.autodiscover_tasks(lambda:settings.INSTALLED_APPS)
注意:是和settings.py文件同目錄,一定不能建立在項目根目錄,不然會引起celery這個模塊名的命名沖突
同時,在主項目的init.py中,添加如下代碼:
from.celery import celery_app __all__=['celery_app']
3.settings.py
在配置文件中配置對應的redis配置:
#Broker配置,使用Redis作為消息中間件 BROKER_URL='redis://127.0.0.1:6379/0' #BACKEND配置,這里使用redis CELERY_RESULT_BACKEND='redis://127.0.0.1:6379/0' #結果序列化方案 CELERY_RESULT_SERIALIZER='json' #任務結果過期時間,秒 CELERY_TASK_RESULT_EXPIRES=60*60*24 #時區配置 CELERY_TIMEZONE='Asia/Shanghai' #指定導入的任務模塊,可以指定多個 #CELERY_IMPORTS=( #'other_dir.tasks', #)
注意:所有配置的官方文檔:Configuration and defaults—Celery 5.2.0b3 documentation
4.tasks.py
在子應用下建立各自對應的任務文件tasks.py(必須是tasks.py這個名字,不允許修改)
from celery import shared_task shared_task def add(x,y): return x+y shared_task def mul(x,y): return x*y shared_task def xsum(numbers): return sum(numbers)
5.調用任務
from.tasks import* #Create your views here. def task_add_view(request): add.delay(100,200) return HttpResponse(f'調用函數結果')
6.啟動celery
pip install eventlet celery-A celery_study worker-l debug-P eventlet
注意:celery_study是項目名
使用redis時,有可能會出現如下類似的異常
AttributeError:'str'object has no attribute'items'
這是由于版本差異,需要卸載已經安裝的python環境中的redis庫,重新指定安裝特定版本(celery4.x以下適用redis2.10.6,celery4.3以上使用redis3.2.0以上):
xxxxxxxxxx pip install redis==2.10.6
7.獲取任務結果
在views.py中,通過AsyncResult.get()獲取結果
from celery import result def get_result_by_taskid(request): task_id=request.GET.get('task_id') #異步執行 ar=result.AsyncResult(task_id) if ar.ready(): return JsonResponse({'status':ar.state,'result':ar.get()}) else: return JsonResponse({'status':ar.state,'result':''})
AsyncResult類的常用的屬性和方法:
state:返回任務狀態,等同status;
task_id:返回任務id;
result:返回任務結果,同get()方法;
ready():判斷任務是否執行以及有結果,有結果為True,否則False;
info():獲取任務信息,默認為結果;
wait(t):等待t秒后獲取結果,若任務執行完畢,則不等待直接獲取結果,若任務在執行中,則wait期間一直阻塞,直到超時報錯;
successful():判斷任務是否成功,成功為True,否則為False;
2.定時任務
在第一步的異步任務的基礎上,進行部分修改即可
1.settings.py
from celery.schedules import crontab CELERYBEAT_SCHEDULE={ 'mul_every_30_seconds':{ #任務路徑 'task':'celery_app.tasks.mul', #每30秒執行一次 'schedule':5, 'args':(14,5) } }
說明(更多內容見文檔:Periodic Tasks—Celery 5.2.0b3 documentation):
task:任務函數
schedule:執行頻率,可以是整型(秒數),也可以是timedelta對象,也可以是crontab對象,也可以是自定義類(繼承celery.schedules.schedule)
args:位置參數,列表或元組
kwargs:關鍵字參數,字典
options:可選參數,字典,任何apply_async()支持的參數
relative:默認是False,取相對于beat的開始時間;設置為True,則取設置的timedelta時間
在task.py中設置了日志
from celery import shared_task import logging logger=logging.getLogger(__name__)) shared_task def mul(x,y): logger.info('___mul__'*10) return x*y
2.啟動celery
(兩個cmd)分別啟動worker和beat
celery-A worker celery_study-l debug-P eventlet celery beat-A celery_study-l debug
3.任務綁定
Celery可通過task綁定到實例獲取到task的上下文,這樣我們可以在task運行時候獲取到task的狀態,記錄相關日志等
方法:
在裝飾器中加入參數bind=True
在task函數中的第一個參數設置為self
在task.py里面寫
from celery import shared_task import logging logger=logging.getLogger(__name__) #任務綁定 shared_task(bind=True) def add(self,x,y): logger.info('add__-----'*10) logger.info('name:',self.name) logger.info('dir(self)',dir(self)) return x+y
其中:self對象是celery.app.task.Task的實例,可以用于實現重試等多種功能
from celery import shared_task import logging logger=logging.getLogger(__name__) #任務綁定 shared_task(bind=True) def add(self,x,y): try: logger.info('add__-----'*10) logger.info('name:',self.name) logger.info('dir(self)',dir(self)) raise Exception except Exception as e: #出錯每4秒嘗試一次,總共嘗試4次 self.retry(exc=e,countdown=4,max_retries=4) return x+y 啟動celery celery-A worker celery_study-l debug-P eventlet
4.任務鉤子
Celery在執行任務時,提供了鉤子方法用于在任務執行完成時候進行對應的操作,在Task源碼中提供了很多狀態鉤子函數如:on_success(成功后執行)、on_failure(失敗時候執行)、on_retry(任務重試時候執行)、after_return(任務返回時候執行)
方法:通過繼承Task類,重寫對應方法即可,
from celery import Task class MyHookTask(Task): def on_success(self,retval,task_id,args,kwargs): logger.info(f'task id:{task_id},arg:{args},successful!') def on_failure(self,exc,task_id,args,kwargs,einfo): logger.info(f'task id:{task_id},arg:{args},failed!erros:{exc}') def on_retry(self,exc,task_id,args,kwargs,einfo): logger.info(f'task id:{task_id},arg:{args},retry!erros:{exc}') #在對應的task函數的裝飾器中,通過base=MyHookTask指定 shared_task(base=MyHookTask,bind=True) def add(self,x,y): logger.info('add__-----'*10) logger.info('name:',self.name) logger.info('dir(self)',dir(self)) return x+y
啟動celery
celery-A worker celery_study-l debug-P eventlet
5.任務編排
在很多情況下,一個任務需要由多個子任務或者一個任務需要很多步驟才能完成,Celery也能實現這樣的任務,完成這類型的任務通過以下模塊完成:
group:并行調度任務
chain:鏈式任務調度
chord:類似group,但分header和body2個部分,header可以是一個group任務,執行完成后調用body的任務
map:映射調度,通過輸入多個入參來多次調度同一個任務
starmap:類似map,入參類似*args
chunks:將任務按照一定數量進行分組
文檔:Next Steps—Celery 5.2.0b3 documentation
1.group
urls.py:
path('primitive/',views.test_primitive),
views.py:
from.tasks import* from celery import group def test_primitive(request):
#創建10個并列的任務
lazy_group=group(add.s(i,i)for i in range(10))
promise=lazy_group()
result=promise.get()
return JsonResponse({'function':'test_primitive','result':result})
說明:
通過task函數的s方法傳入參數,啟動任務
上面這種方法需要進行等待,如果依然想實現異步的方式,那么就必須在tasks.py中新建一個task方法,調用group,示例如下:
tasks.py:
shared_task def group_task(num): return group(add.s(i,i)for i in range(num))().get() urls.py: path('first_group/',views.first_group), views.py: def first_group(request): ar=tasks.group_task.delay(10) return HttpResponse('返回first_group任務,task_id:'+ar.task_id)
2.chain
默認上一個任務的結果作為下一個任務的第一個參數
def test_primitive(request): #等同調用mul(add(add(2,2),5),8) promise=chain(tasks.add.s(2,2),tasks.add.s(5),tasks.mul.s(8))() #72 result=promise.get() return JsonResponse({'function':'test_primitive','result':result})
3.chord
任務分割,分為header和body兩部分,hearder任務執行完在執行body,其中hearder返回結果作為參數傳遞給body
def test_primitive(request): #header:[3,12] #body:xsum([3,12]) promise=chord(header=[tasks.add.s(1,2),tasks.mul.s(3,4)],body=tasks.xsum.s())() result=promise.get() return JsonResponse({'function':'test_primitive','result':result})
6、celery管理和監控
celery通過flower組件實現管理和監控功能,flower組件不僅僅提供監控功能,還提供HTTP API可實現對woker和task的管理
官網:flower·PyPI
文檔:Flower-Celery monitoring tool—Flower 1.0.1 documentation
安裝flower
pip install flower
啟動flower
flower-A celery_study--port=5555
說明:
-A:項目名
--port:端口號
訪問
在瀏覽器輸入:http://127.0.0.1:5555
通過api操作
curl http://127.0.0.1:5555/api/workers
到此為止,這篇文章就給大家介紹完畢了,希望能給大家帶來幫助。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/128278.html
摘要:結論執行完任務不釋放內存與原一直沒有被銷毀有關,因此可以適當配置小點,而任務并發數與配置項有關,每增加一個必然增加內存消耗,同時也影響到一個何時被銷毀,因為是均勻調度任務至每個,因此也不宜配置過大,適當配置。 1.實際使用 ? 監控task的執行結果:任務id,結果,traceback,children,任務狀態 ? 配置 backend=redis://127...
摘要:今天介紹一下如何在項目中使用搭建一個有兩個節點的任務隊列一個主節點一個子節點主節點發布任務,子節點收到任務并執行。 今天介紹一下如何在django項目中使用celery搭建一個有兩個節點的任務隊列(一個主節點一個子節點;主節點發布任務,子節點收到任務并執行。搭建3個或者以上的節點就類似了),使用到了celery,rabbitmq。這里不會單獨介紹celery和rabbitmq中的知識了...
摘要:前言針對高延時任務直接在一次網絡請求中處理完畢會導致很不好的體驗則可以不阻塞請求后臺處理這些任務并且可以使用的進行數據庫操作環境其他創建工程此時項目結構如下修改添加修改創建新創 前言: 針對高延時任務, 直接在一次網絡請求中處理完畢會導致很不好的體驗, celery則可以不阻塞請求后臺處理這些任務, 并且可以使用django的models進行數據庫操作. 環境 python model...
閱讀 919·2023-01-14 11:38
閱讀 891·2023-01-14 11:04
閱讀 750·2023-01-14 10:48
閱讀 2039·2023-01-14 10:34
閱讀 956·2023-01-14 10:24
閱讀 835·2023-01-14 10:18
閱讀 506·2023-01-14 10:09
閱讀 583·2023-01-14 10:02