摘要:源碼之分析的協(xié)程原理分析版本為支持異步,實(shí)現(xiàn)了一個(gè)協(xié)程庫(kù)。提供了回調(diào)函數(shù)注冊(cè)當(dāng)異步事件完成后,調(diào)用注冊(cè)的回調(diào)中間結(jié)果保存結(jié)束結(jié)果返回等功能注冊(cè)回調(diào)函數(shù),當(dāng)被解決時(shí),改回調(diào)函數(shù)被調(diào)用。相當(dāng)于喚醒已經(jīng)處于狀態(tài)的父協(xié)程,通過回調(diào)函數(shù),再執(zhí)行。
tornado 源碼之 coroutine 分析
tornado 的協(xié)程原理分析
版本:4.3.0
為支持異步,tornado 實(shí)現(xiàn)了一個(gè)協(xié)程庫(kù)。
tornado 實(shí)現(xiàn)的協(xié)程框架有下面幾個(gè)特點(diǎn):
支持 python 2.7,沒有使用 yield from
特性,純粹使用 yield 實(shí)現(xiàn)
使用拋出異常的方式從協(xié)程返回值
采用 Future 類代理協(xié)程(保存協(xié)程的執(zhí)行結(jié)果,當(dāng)攜程執(zhí)行結(jié)束時(shí),調(diào)用注冊(cè)的回調(diào)函數(shù))
使用 IOLoop 事件循環(huán),當(dāng)事件發(fā)生時(shí)在循環(huán)中調(diào)用注冊(cè)的回調(diào),驅(qū)動(dòng)協(xié)程向前執(zhí)行
由此可見,這是 python 協(xié)程的一個(gè)經(jīng)典的實(shí)現(xiàn)。
本文將實(shí)現(xiàn)一個(gè)類似 tornado 實(shí)現(xiàn)的基礎(chǔ)協(xié)程框架,并闡述相應(yīng)的原理。
外部庫(kù)使用 time 來實(shí)現(xiàn)定時(shí)器回調(diào)的時(shí)間計(jì)算。
bisect 的 insort 方法維護(hù)一個(gè)時(shí)間有限的定時(shí)器隊(duì)列。
functools 的 partial 方法綁定函數(shù)部分參數(shù)。
使用 backports_abc 導(dǎo)入 Generator 來判斷函數(shù)是否是生成器。
import time import bisect import functools from backports_abc import Generator as GeneratorTypeFuture
是一個(gè)穿梭于協(xié)程和調(diào)度器之間的信使。
提供了回調(diào)函數(shù)注冊(cè)(當(dāng)異步事件完成后,調(diào)用注冊(cè)的回調(diào))、中間結(jié)果保存、結(jié)束結(jié)果返回等功能
add_done_callback 注冊(cè)回調(diào)函數(shù),當(dāng) Future 被解決時(shí),改回調(diào)函數(shù)被調(diào)用。
set_result 設(shè)置最終的狀態(tài),并且調(diào)用已注冊(cè)的回調(diào)函數(shù)
協(xié)程中的每一個(gè) yield 對(duì)應(yīng)一個(gè)協(xié)程,相應(yīng)的對(duì)應(yīng)一個(gè) Future 對(duì)象,譬如:
@coroutine def routine_main(): yield routine_simple() yield sleep(1)
這里的 routine_simple() 和 sleep(1) 分別對(duì)應(yīng)一個(gè)協(xié)程,同時(shí)有一個(gè) Future 對(duì)應(yīng)。
class Future(object): def __init__(self): self._done = False self._callbacks = [] self._result = None def _set_done(self): self._done = True for cb in self._callbacks: cb(self) self._callbacks = None def done(self): return self._done def add_done_callback(self, fn): if self._done: fn(self) else: self._callbacks.append(fn) def set_result(self, result): self._result = result self._set_done() def result(self): return self._resultIOLoop
這里的 IOLoop 去掉了 tornado 源代碼中 IO 相關(guān)部分,只保留了基本需要的功能,如果命名為 CoroutineLoop 更貼切。
這里的 IOLoop 提供基本的回調(diào)功能。它是一個(gè)線程循環(huán),在循環(huán)中完成兩件事:
檢測(cè)有沒有注冊(cè)的回調(diào)并執(zhí)行
檢測(cè)有沒有到期的定時(shí)器回調(diào)并執(zhí)行
程序中注冊(cè)的回調(diào)事件,最終都會(huì)在此處執(zhí)行。
可以認(rèn)為,協(xié)程程序本身、協(xié)程的驅(qū)動(dòng)程序 都會(huì)在此處執(zhí)行。
協(xié)程本身使用 wrapper 包裝,并最后注冊(cè)到 IOLoop 的事件回調(diào),所以它的從預(yù)激到結(jié)束的代碼全部在 IOLoop 回調(diào)中執(zhí)行。
而協(xié)程預(yù)激后,會(huì)把 Runner.run() 函數(shù)注冊(cè)到 IOLoop 的事件回調(diào),以驅(qū)動(dòng)協(xié)程向前運(yùn)行。
理解這一點(diǎn)對(duì)于理解協(xié)程的運(yùn)行原理至關(guān)重要。
這就是單線程異步的基本原理。因?yàn)槎荚谝粋€(gè)線程循環(huán)中執(zhí)行,我們可以不用處理多線程需要面對(duì)的各種繁瑣的事情。
IOLoop.start事件循環(huán),回調(diào)事件和定時(shí)器事件在循環(huán)中調(diào)用。
IOLoop.run_sync執(zhí)行一個(gè)協(xié)程。
將 run 注冊(cè)進(jìn)全局回調(diào),在 run 中調(diào)用 func()啟動(dòng)協(xié)程。
注冊(cè)協(xié)程結(jié)束回調(diào) stop, 退出 run_sync 的 start 循環(huán),事件循環(huán)隨之結(jié)束。
class IOLoop(object):, def __init__(self): self._callbacks = [] self._timers = [] self._running = False @classmethod def instance(cls): if not hasattr(cls, "_instance"): cls._instance = cls() return cls._instance def add_future(self, future, callback): future.add_done_callback( lambda future: self.add_callback(functools.partial(callback, future))) def add_timeout(self, when, callback): bisect.insort(self._timers, (when, callback)) def call_later(self, delay, callback): return self.add_timeout(time.time() + delay, callback) def add_callback(self, call_back): self._callbacks.append(call_back) def start(self): self._running = True while self._running: # 回調(diào)任務(wù) callbacks = self._callbacks self._callbacks = [] for call_back in callbacks: call_back() # 定時(shí)器任務(wù) while self._timers and self._timers[0][0] < time.time(): task = self._timers[0][1] del self._timers[0] task() def stop(self): self._running = False def run_sync(self, func): future_cell = [None] def run(): try: future_cell[0] = func() except Exception: pass self.add_future(future_cell[0], lambda future: self.stop()) self.add_callback(run) self.start() return future_cell[0].result()coroutine
協(xié)程裝飾器。
協(xié)程由 coroutine 裝飾,分為兩類:
含 yield 的生成器函數(shù)
無 yield 語句的普通函數(shù)
裝飾協(xié)程,并通過注冊(cè)回調(diào)驅(qū)動(dòng)協(xié)程運(yùn)行。
程序中通過 yield coroutine_func() 方式調(diào)用協(xié)程。
此時(shí),wrapper 函數(shù)被調(diào)用:
獲取協(xié)程生成器
如果是生成器,則
調(diào)用 next() 預(yù)激協(xié)程
實(shí)例化 Runner(),驅(qū)動(dòng)協(xié)程
如果是普通函數(shù),則
調(diào)用 set_result() 結(jié)束協(xié)程
協(xié)程返回 Future 對(duì)象,供外層的協(xié)程處理。外部通過操作該 Future 控制協(xié)程的運(yùn)行。
每個(gè) yield 對(duì)應(yīng)一個(gè)協(xié)程,每個(gè)協(xié)程擁有一個(gè) Future 對(duì)象。
外部協(xié)程獲取到內(nèi)部協(xié)程的 Future 對(duì)象,如果內(nèi)部協(xié)程尚未結(jié)束,將 Runner.run() 方法注冊(cè)到 內(nèi)部協(xié)程的 Future 的結(jié)束回調(diào)。
這樣,在內(nèi)部協(xié)程結(jié)束時(shí),會(huì)調(diào)用注冊(cè)的 run() 方法,從而驅(qū)動(dòng)外部協(xié)程向前執(zhí)行。
各個(gè)協(xié)程通過 Future 形成一個(gè)鏈?zhǔn)交卣{(diào)關(guān)系。
Runner 類在下面多帶帶小節(jié)描述。
def coroutine(func): return _make_coroutine_wrapper(func) # 每個(gè)協(xié)程都有一個(gè) future, 代表當(dāng)前協(xié)程的運(yùn)行狀態(tài) def _make_coroutine_wrapper(func): @functools.wraps(func) def wrapper(*args, **kwargs): future = Future() try: result = func(*args, **kwargs) except (Return, StopIteration) as e: result = _value_from_stopiteration(e) except Exception: return future else: if isinstance(result, GeneratorType): try: yielded = next(result) except (StopIteration, Return) as e: future.set_result(_value_from_stopiteration(e)) except Exception: pass else: Runner(result, future, yielded) try: return future finally: future = None future.set_result(result) return future return wrapper協(xié)程返回值
因?yàn)闆]有使用 yield from,協(xié)程無法直接返回值,所以使用拋出異常的方式返回。
python 2 無法在生成器中使用 return 語句。但是生成器中拋出的異常可以在外部 send() 語句中捕獲。
所以,使用拋出異常的方式,將返回值存儲(chǔ)在異常的 value 屬性中,拋出。外部使用諸如:
try: yielded = gen.send(value) except Return as e:
這樣的方式獲取協(xié)程的返回值。
class Return(Exception): def __init__(self, value=None): super(Return, self).__init__() self.value = value self.args = (value,)Runner
Runner 是協(xié)程的驅(qū)動(dòng)器類。
self.result_future 保存當(dāng)前協(xié)程的狀態(tài)。
self.future 保存 yield 子協(xié)程傳遞回來的協(xié)程狀態(tài)。
從子協(xié)程的 future 獲取協(xié)程運(yùn)行結(jié)果 send 給當(dāng)前協(xié)程,以驅(qū)動(dòng)協(xié)程向前執(zhí)行。
注意,會(huì)判斷子協(xié)程返回的 future
如果 future 已經(jīng) set_result,代表子協(xié)程運(yùn)行結(jié)束,回到 while Ture 循環(huán),繼續(xù)往下執(zhí)行下一個(gè) send;
如果 future 未 set_result,代表子協(xié)程運(yùn)行未結(jié)束,將 self.run 注冊(cè)到子協(xié)程結(jié)束的回調(diào),這樣,子協(xié)程結(jié)束時(shí)會(huì)調(diào)用 self.run,重新驅(qū)動(dòng)協(xié)程執(zhí)行。
如果本協(xié)程 send() 執(zhí)行過程中,捕獲到 StopIteration 或者 Return 異常,說明本協(xié)程執(zhí)行結(jié)束,設(shè)置 result_future 的協(xié)程返回值,此時(shí),注冊(cè)的回調(diào)函數(shù)被執(zhí)行。這里的回調(diào)函數(shù)為本協(xié)程的父協(xié)程所注冊(cè)的 run()。
相當(dāng)于喚醒已經(jīng)處于 yiled 狀態(tài)的父協(xié)程,通過 IOLoop 回調(diào) run 函數(shù),再執(zhí)行 send()。
class Runner(object): def __init__(self, gen, result_future, first_yielded): self.gen = gen self.result_future = result_future self.io_loop = IOLoop.instance() self.running = False self.future = None if self.handle_yield(first_yielded): self.run() def run(self): try: self.running = True while True: try: # 每一個(gè) yield 處看做一個(gè)協(xié)程,對(duì)應(yīng)一個(gè) Future # 將該協(xié)程的結(jié)果 send 出去 # 這樣外層形如 ret = yiled coroutine_func() 能夠獲取到協(xié)程的返回?cái)?shù)據(jù) value = self.future.result() yielded = self.gen.send(value) except (StopIteration, Return) as e: # 協(xié)程執(zhí)行完成,不再注冊(cè)回調(diào) self.result_future.set_result(_value_from_stopiteration(e)) self.result_future = None return except Exception: return # 協(xié)程未執(zhí)行結(jié)束,繼續(xù)使用 self.run() 進(jìn)行驅(qū)動(dòng) if not self.handle_yield(yielded): return finally: self.running = False def handle_yield(self, yielded): self.future = yielded if not self.future.done(): # 給 future 增加執(zhí)行結(jié)束回調(diào)函數(shù),這樣,外部使用 future.set_result 時(shí)會(huì)調(diào)用該回調(diào) # 而該回調(diào)是把 self.run() 注冊(cè)到 IOLoop 的事件循環(huán) # 所以,future.set_result 會(huì)把 self.run() 注冊(cè)到 IOLoop 的事件循環(huán),從而在下一個(gè)事件循環(huán)中調(diào)用 self.io_loop.add_future( self.future, lambda f: self.run()) return False return Truesleep
sleep 是一個(gè)延時(shí)協(xié)程,充分展示了協(xié)程的標(biāo)準(zhǔn)實(shí)現(xiàn)。
創(chuàng)建一個(gè) Future,并返回給外部協(xié)程;
外部協(xié)程發(fā)現(xiàn)是一個(gè)未完的狀態(tài),將 run()注冊(cè)到 Future 的完成回調(diào),同時(shí)外部協(xié)程被掛起;
在設(shè)置的延時(shí)后,IOLoop 會(huì)回調(diào) set_result 結(jié)束協(xié)程;
IOLoop 調(diào)用 run() 函數(shù);
IOLoop 調(diào)用 send(),喚醒掛起的外部協(xié)程。
流程如下圖:
def sleep(duration): f = Future() IOLoop.instance().call_later(duration, lambda: f.set_result(None)) return f運(yùn)行
@coroutine def routine_ur(url, wait): yield sleep(wait) print("routine_ur {} took {}s to get!".format(url, wait)) @coroutine def routine_url_with_return(url, wait): yield sleep(wait) print("routine_url_with_return {} took {}s to get!".format(url, wait)) raise Return((url, wait)) # 非生成器協(xié)程,不會(huì)為之生成多帶帶的 Runner() # coroutine 運(yùn)行結(jié)束后,直接返回一個(gè)已經(jīng)執(zhí)行結(jié)束的 future @coroutine def routine_simple(): print("it is simple routine") @coroutine def routine_simple_return(): print("it is simple routine with return") raise Return("value from routine_simple_return") @coroutine def routine_main(): yield routine_simple() yield routine_ur("url0", 1) ret = yield routine_simple_return() print(ret) ret = yield routine_url_with_return("url1", 1) print(ret) ret = yield routine_url_with_return("url2", 2) print(ret) if __name__ == "__main__": IOLoop.instance().run_sync(routine_main)
運(yùn)行輸出為:
it is simple routine routine_ur url0 took 1s to get! it is simple routine with return value from routine_simple_return routine_url_with_return url1 took 1s to get! ("url1", 1) routine_url_with_return url2 took 2s to get! ("url2", 2)
可以觀察到協(xié)程 sleep 已經(jīng)生效。
源碼simple_coroutine.py
copyrightauthor:bigfish
copyright: 許可協(xié)議 知識(shí)共享署名-非商業(yè)性使用 4.0 國(guó)際許可協(xié)議
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://m.specialneedsforspecialkids.com/yun/45036.html
摘要:清楚了以上流程,我們直接來看函數(shù)主要用作初始化應(yīng)用監(jiān)聽端口以及啟動(dòng)。其中就是保存聊天室所有聊天消息的結(jié)構(gòu)。關(guān)于的解讀我會(huì)放到閱讀源碼時(shí)講。然后把消息加到緩存里,如果緩存大于限制則取最新的條消息。 tornado 源碼自帶了豐富的 demo ,這篇文章主要分析 demo 中的聊天室應(yīng)用: chatdemo 首先看 chatdemo 的目錄結(jié)構(gòu): ├── chatdemo.py ├── ...
摘要:序言最近閑暇無事閱讀了一下的源碼對(duì)整體的結(jié)構(gòu)有了初步認(rèn)識(shí)與大家分享不知道為什么右邊的目錄一直出不來非常不舒服不如移步到吧是的核心模塊也是個(gè)調(diào)度模塊各種異步事件都是由他調(diào)度的所以必須弄清他的執(zhí)行邏輯源碼分析而的核心部分則是這個(gè)循環(huán)內(nèi)部的邏輯貼 序言 最近閑暇無事,閱讀了一下tornado的源碼,對(duì)整體的結(jié)構(gòu)有了初步認(rèn)識(shí),與大家分享 不知道為什么右邊的目錄一直出不來,非常不舒服. 不如移...
摘要:前言本文將嘗試詳細(xì)的帶大家一步步走完一個(gè)異步操作從而了解是如何實(shí)現(xiàn)異步的其實(shí)本文是對(duì)上一篇文的實(shí)踐和復(fù)習(xí)主旨在于關(guān)注異步的實(shí)現(xiàn)所以會(huì)忽略掉代碼中的一些異常處理文字較多湊合下吧接下來只會(huì)貼出部分源碼幫助理解希望有耐心的同學(xué)打開源碼一起跟蹤一遍 前言 本文將嘗試詳細(xì)的帶大家一步步走完一個(gè)異步操作,從而了解tornado是如何實(shí)現(xiàn)異步io的. 其實(shí)本文是對(duì)[上一篇文][1]的實(shí)踐和復(fù)習(xí) 主...
摘要:前言俗話說光說不練假把式上一篇文里都只是光看著別人的源碼說貌似有點(diǎn)紙上談兵的意思所以這次寫一個(gè)簡(jiǎn)單的自己定義協(xié)議的既可以熟悉和的用法又可以在去除了復(fù)雜的協(xié)議后了解的工作原理代碼不多加上空行和也就行不到在上的源碼點(diǎn)這里目標(biāo)定義一個(gè)簡(jiǎn)單的協(xié)議達(dá) 前言 俗話說光說不練假把式,上一篇文里都只是光看著別人的源碼說,貌似有點(diǎn)紙上談兵的意思. 所以這次寫一個(gè)簡(jiǎn)單的,自己定義協(xié)議的server. 既...
這篇文章摘自我的博客, 歡迎大家沒事去逛逛~ 背景 這幾個(gè)月我開發(fā)了公司里的一個(gè)restful webservice,起初技術(shù)選型的時(shí)候是采用了flask框架。雖然flask是一個(gè)同步的框架,但是可以配合gevent或者其它方式運(yùn)行在異步的容器中(測(cè)試鏈接),效果看上去也還可以,因此就采用了這種方式。 后面閱讀了tornado的源碼,也去了解了各種協(xié)程框架以及運(yùn)行的原理。總感覺flask的這種同步...
閱讀 4956·2023-04-25 18:47
閱讀 2687·2021-11-19 11:33
閱讀 3456·2021-11-11 16:54
閱讀 3111·2021-10-26 09:50
閱讀 2558·2021-10-14 09:43
閱讀 679·2021-09-03 10:47
閱讀 685·2019-08-30 15:54
閱讀 1512·2019-08-30 15:44