国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

在Celery中使用Flask的上下文

Sourcelink / 1087人閱讀

摘要:所以這就現(xiàn)實(shí)了在中使用的應(yīng)用上下文。要引入請(qǐng)求上下文,需要考慮這兩個(gè)問題如何在中產(chǎn)生請(qǐng)求上下文。中有和可以產(chǎn)生請(qǐng)求上下文。具體的思路還是在中重載類,通過,在的上下文環(huán)境下執(zhí)行。將他們傳入,生成偽造的請(qǐng)求上下文可以覆蓋大多數(shù)的使用情況。

其實(shí)我只是想把郵件發(fā)送這個(gè)動(dòng)作移到Celery中執(zhí)行。
既然用到了Celery,那么每次發(fā)郵件都多帶帶開一個(gè)線程似乎有點(diǎn)多余,異步任務(wù)還是交給Celery吧。

在Flask應(yīng)用中集成Celery

Celery和Flask一起使用并沒有什么不和諧的地方,都可以不用定制的Flask擴(kuò)展,按照網(wǎng)上隨處可見的示例也很簡(jiǎn)單:

from flask import Flask
from celery import Celery

app = Flask(__name__)
app.config["CELERY_BROKER_URL"] = "redis://localhost:6379/0"
app.config["CELERY_RESULT_BACKEND"] = "redis://localhost:6379/0"

celery = Celery(app.name, broker=app.config["CELERY_BROKER_URL"])
celery.conf.update(app.config)

@celery.task
def send_email():
    ....

然而,稍微上點(diǎn)規(guī)模的Flask應(yīng)用都會(huì)使用Factory模式(中文叫工廠函數(shù),我聽著特別扭),即只有在創(chuàng)建Flask實(shí)例時(shí),才會(huì)初始化各種擴(kuò)展,這樣可以動(dòng)態(tài)的修改擴(kuò)展程序的配置。比如你有一套線上部署的配置和一套本地開發(fā)測(cè)試的配置,希望通過不同的啟動(dòng)入口,就使用不同的配置。
使用Factory模式的話,上面的代碼大概要修改成這個(gè)樣:

from flask import Flask
from celery import Celery

app = Flask(__name__)
celery = Celery()

def create_app(config_name):
    app.config.from_object(config[config_name])
    celery.conf.update(app.config)

通過config_name,來動(dòng)態(tài)調(diào)整celery的配置。然而,這樣子是不行的!
Celery的__init__()函數(shù)會(huì)調(diào)用celery._state._register_app()直接就通過傳入的配置生成了Celery實(shí)例,上面的代碼中,celery = Celery()直接使用默認(rèn)的amqp作為了broker,隨后通過celery.conf.update(app.config)是更改不了broker的。這也就是為什么網(wǎng)上的示例代碼中,在定義Celery實(shí)例時(shí),就傳入了broker=app.config["CELERY_BROKER_URL"],而不是之后通過celery.conf.update(app.config)傳入。當(dāng)你的多套配置文件中,broker設(shè)置的不同時(shí),就悲劇了。

當(dāng)然不用自己造輪子,F(xiàn)lask-Celery-Helper就是解決以上問題的FLask擴(kuò)展。
看看它的__init__()函數(shù):

def __init__(self, app=None):
        """If app argument provided then initialize celery using application config values.
        If no app argument provided you should do initialization later with init_app method.
        :param app: Flask application instance.
        """
        self.original_register_app = _state._register_app  # Backup Celery app registration function.
        _state._register_app = lambda _: None  # Upon Celery app registration attempt, do nothing.
        super(Celery, self).__init__()
        if app is not None:
            self.init_app(app)

_state._register_app函數(shù)備份,再置為空。這樣__init__()就不會(huì)創(chuàng)建Celery實(shí)例了。但如果指定了app,那么進(jìn)入init_app,嗯,大多數(shù)Flask擴(kuò)展都有這個(gè)函數(shù),用來動(dòng)態(tài)生成擴(kuò)展實(shí)例。

def init_app(self, app):
        """Actual method to read celery settings from app configuration and initialize the celery instance.
        :param app: Flask application instance.
        """
        _state._register_app = self.original_register_app  # Restore Celery app registration function.
        if not hasattr(app, "extensions"):
            app.extensions = dict()
        if "celery" in app.extensions:
            raise ValueError("Already registered extension CELERY.")
        app.extensions["celery"] = _CeleryState(self, app)

        # Instantiate celery and read config.
        super(Celery, self).__init__(app.import_name, broker=app.config["CELERY_BROKER_URL"])
        ...

_state._register_app函數(shù)還原,再執(zhí)行Celery原本的__init__。這樣就達(dá)到動(dòng)態(tài)生成實(shí)例的目的了。接著往下看:

task_base = self.Task

# Add Flask app context to celery instance.
class ContextTask(task_base):
    """Celery instance wrapped within the Flask app context."""
    def __call__(self, *_args, **_kwargs):
        with app.app_context():
            return task_base.__call__(self, *_args, **_kwargs)
setattr(ContextTask, "abstract", True)
setattr(self, "Task", ContextTask)

這里重載了celery.Task類,通過with app.app_context():,在app.app_context()的上下文環(huán)境下執(zhí)行Task。對(duì)于一個(gè)已生成的Flask實(shí)例,應(yīng)用上下文不會(huì)隨便改變。所以這就現(xiàn)實(shí)了在Celery中使用Flask的應(yīng)用上下文。
下面是官方的示例代碼:

# extensions.py
from flask_celery import Celery
celery = Celery()

# application.py
from flask import Flask
from extensions import celery

def create_app():
    app = Flask(__name__)
    app.config["CELERY_IMPORTS"] = ("tasks.add_together", )
    app.config["CELERY_BROKER_URL"] = "redis://localhost"
    app.config["CELERY_RESULT_BACKEND"] = "redis://localhost"
    celery.init_app(app)
    return app
    
# tasks.py
from extensions import celery

@celery.task()
def add_together(a, b):
    return a + b

# manage.py
from application import create_app
app = create_app()
app.run()

跟普通的Flask擴(kuò)展一樣了。

Celery中使用Flask上下文

在Flask的view函數(shù)中調(diào)用task.delay()時(shí),這個(gè)task相當(dāng)于一個(gè)離線的異步任務(wù),它對(duì)Flask的應(yīng)用上下文和請(qǐng)求上下文一無所知。但是這都可能是異步任務(wù)需要用到的。比如發(fā)送郵件要用到的render_templateurl_for就分別要用到應(yīng)用上下文和請(qǐng)求上下文。不在celery中引入它們的話,就是Running code outside of a request。
引入應(yīng)用上下文的工作Flask-Celery-Helper已經(jīng)幫我們做好了,在Flask的文檔中也有相關(guān)介紹。實(shí)現(xiàn)方法和上面Flask-Celery-Helper的一樣。然而,不管是Flask-Celery-Helper還是Flask文檔,都沒有提及如何在Celery中使用請(qǐng)求上下文。

要引入請(qǐng)求上下文,需要考慮這兩個(gè)問題:

如何在Celery中產(chǎn)生請(qǐng)求上下文。Flask中有request_contexttest_request_context可以產(chǎn)生請(qǐng)求上下文。區(qū)別是request_context需要WSGI環(huán)境變量environ,而test_request_context根據(jù)傳入的參數(shù)生成請(qǐng)求上下文。我沒有找到如何在Celery中獲取到WSGI環(huán)境變量的方法,所以只能自己傳入相關(guān)參數(shù)生成請(qǐng)求上下文了。

請(qǐng)求上下文是隨HTTP請(qǐng)求產(chǎn)生的,要獲取請(qǐng)求上下文,就必須在view函數(shù)中處理,view函數(shù)通過task.delay()發(fā)送Celery任務(wù)。所以需要重載task.delay(),以獲取請(qǐng)求上下文。

具體的思路還是在init_app中重載celery.Task類,通過with app.test_request_context():,在app.test_request_context()的上下文環(huán)境下執(zhí)行Task。
首先獲取request,從中整理出test_request_context()需要的參數(shù)。根據(jù)test_request_context的函數(shù)注釋,它需要的參數(shù)和werkzeug.test.EnvironBuilder類的參數(shù)一樣。

CONTEXT_ARG_NAME = "_flask_request_context"
def _include_request_context(self, kwargs):
    """Includes all the information about current Flask request context
    as an additional argument to the task.
    """
    if not has_request_context():
        return

    # keys correspond to arguments of :meth:`Flask.test_request_context`
    context = {
        "path": request.path,
        "base_url": request.url_root,
        "method": request.method,
        "headers": dict(request.headers),
        "data": request.form
    }
    if "?" in request.url:
        context["query_string"] = request.url[(request.url.find("?") + 1):]

    kwargs[self.CONTEXT_ARG_NAME] = context

_include_request_context函數(shù)從request中提取path,base_url,method,headers,data,query_string。將他們傳入test_request_context,生成偽造的請(qǐng)求上下文可以覆蓋大多數(shù)的使用情況。
Celery通過apply_async,apply,retry調(diào)用異步任務(wù)(delayapply_async的簡(jiǎn)化方法)。這里需要重載它們,讓這些函數(shù)獲取request:

def apply_async(self, args=None, kwargs=None, **rest):
    self._include_request_context(kwargs)
    return super(ContextTask, self).apply_async(args, kwargs, **rest)

def apply(self, args=None, kwargs=None, **rest):
    self._include_request_context(kwargs)
    return super(ContextTask, self).apply(args, kwargs, **rest)

def retry(self, args=None, kwargs=None, **rest):
    self._include_request_context(kwargs)
    return super(ContextTask, self).retry(args, kwargs, **rest)

最后重載celery.Task__call__方法:

def __call__(self, *args, **kwargs):
    """Execute task code with given arguments."""
    call = lambda: super(ContextTask, self).__call__(*args, **kwargs)

    context = kwargs.pop(self.CONTEXT_ARG_NAME, None)
    if context is None or has_request_context():
        return call()

    with app.test_request_context(**context):
        result = call()

        # process a fake "Response" so that
        # ``@after_request`` hooks are executed
        app.process_response(make_response(result or ""))

    return result

context是我們從request中獲取的參數(shù),將它傳給test_request_context,偽造請(qǐng)求上下文,并在這個(gè)上下文環(huán)境中執(zhí)行task。既然偽造了請(qǐng)求,那也得為這個(gè)假請(qǐng)求生成響應(yīng),萬一你定義了after_request這個(gè)在響應(yīng)后執(zhí)行的鉤子呢?通過process_response就可以激活after_request。
注意這里并沒有傳入應(yīng)用上下文,因?yàn)镕lask在創(chuàng)建請(qǐng)求上下文時(shí),會(huì)判斷應(yīng)用上下文是否為空,為空就先創(chuàng)建應(yīng)用上下文,再創(chuàng)建請(qǐng)求上下文。

完整代碼在這里。
celery = CeleryWithContext()創(chuàng)建的Celery實(shí)例就可以給各種task使用了。
另外創(chuàng)建一個(gè)celery_worker.py文件,生成一個(gè)Flask實(shí)例,供Celery的worker使用。

# celery_worker.py

#!/usr/bin/env python
from app import create_app
from app.extensions import celery

app = create_app()

啟動(dòng)worker:celery -A celery_worker.celery worker -l info
這下就可以使用Celery發(fā)郵件了。唉,還真是麻煩。

reference

http://xion.io/post/code/celery-include-flask-request-context.html

博客地址

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://m.specialneedsforspecialkids.com/yun/38572.html

相關(guān)文章

  • 基于websocketcelery任務(wù)狀態(tài)監(jiān)控

    摘要:目的曾經(jīng)想向前臺(tái)實(shí)時(shí)返回任務(wù)的狀態(tài)監(jiān)控,也查看了很多博客,但是好多也沒能如愿,因此基于網(wǎng)上已有的博客已經(jīng)自己的嘗試,寫了一個(gè)小的,實(shí)現(xiàn)前臺(tái)實(shí)時(shí)獲取后臺(tái)傳輸?shù)娜蝿?wù)狀態(tài)。實(shí)現(xiàn)仿照其他例子實(shí)現(xiàn)了一個(gè)簡(jiǎn)單的后臺(tái)任務(wù)監(jiān)控。 1. 目的曾經(jīng)想向前臺(tái)實(shí)時(shí)返回Celery任務(wù)的狀態(tài)監(jiān)控,也查看了很多博客,但是好多也沒能如愿,因此基于網(wǎng)上已有的博客已經(jīng)自己的嘗試,寫了一個(gè)小的demo,實(shí)現(xiàn)前臺(tái)實(shí)時(shí)獲取后...

    microelec 評(píng)論0 收藏0
  • Flask+Celery+Redis實(shí)現(xiàn)隊(duì)列化異步任務(wù)

    摘要:使用異步框架,例如等等,裝飾異步任務(wù)。它是一個(gè)專注于實(shí)時(shí)處理的任務(wù)隊(duì)列,同時(shí)也支持任務(wù)調(diào)度。不存儲(chǔ)任務(wù)狀態(tài)。標(biāo)識(shí)要使用的默認(rèn)序列化方法的字符串。指定該任務(wù)的結(jié)果存儲(chǔ)后端用于此任務(wù)。 概述: ????????我們考慮一個(gè)場(chǎng)景,公司有一個(gè)需求,現(xiàn)在需要做一套web系統(tǒng),而這套系統(tǒng)某些功能需要使用...

    Ali_ 評(píng)論0 收藏0
  • 基于Flask-Angular項(xiàng)目組網(wǎng)架構(gòu)與部署

    摘要:基于網(wǎng),分享項(xiàng)目的組網(wǎng)架構(gòu)和部署。項(xiàng)目組網(wǎng)架構(gòu)架構(gòu)說明流項(xiàng)目訪問分為兩個(gè)流,通過分兩個(gè)端口暴露給外部使用數(shù)據(jù)流用戶訪問網(wǎng)站。通過進(jìn)行配置,使用作為異步隊(duì)列來存儲(chǔ)任務(wù),并將處理結(jié)果存儲(chǔ)在中。 基于Raindrop網(wǎng),分享項(xiàng)目的組網(wǎng)架構(gòu)和部署。 項(xiàng)目組網(wǎng)架構(gòu) showImg(https://cloud.githubusercontent.com/assets/7239657/1015704...

    kelvinlee 評(píng)論0 收藏0
  • Flask 下載文名文件

    摘要:解決辦法如下測(cè)試表格我們從引入,首先對(duì)文件名進(jìn)行編碼,然后中作為的參數(shù),這時(shí)候能成功下載文件,但是文件名是編碼后的名字,要解碼的話,我們需要在里面聲明編碼格式,即這樣的話,對(duì)文件名進(jìn)行解碼,我們的文件名就是中文了。 在寫 flask 后端的時(shí)候,特別是在做數(shù)據(jù)相關(guān)的操作的時(shí)候,產(chǎn)品往往需要我們做一個(gè)導(dǎo)出數(shù)據(jù)的需求,一般都是導(dǎo)出 excel 格式的文件。 那在 flask 上,如何實(shí)現(xiàn)請(qǐng)...

    harriszh 評(píng)論0 收藏0
  • 手把手教你如何用Crawlab構(gòu)建技術(shù)文章聚合平臺(tái)(一)

    摘要:本文將介紹如何使用和抓取主流的技術(shù)博客文章,然后用搭建一個(gè)小型的技術(shù)文章聚合平臺(tái)。是谷歌開源的基于和的自動(dòng)化測(cè)試工具,可以很方便的讓程序模擬用戶的操作,對(duì)瀏覽器進(jìn)行程序化控制。相對(duì)于,是新的開源項(xiàng)目,而且是谷歌開發(fā),可以使用很多新的特性。 背景 說到爬蟲,大多數(shù)程序員想到的是scrapy這樣受人歡迎的框架。scrapy的確不錯(cuò),而且有很強(qiáng)大的生態(tài)圈,有g(shù)erapy等優(yōu)秀的可視化界面。但...

    LinkedME2016 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<