摘要:序言最近閑暇無事閱讀了一下的源碼對(duì)整體的結(jié)構(gòu)有了初步認(rèn)識(shí)與大家分享不知道為什么右邊的目錄一直出不來非常不舒服不如移步到吧是的核心模塊也是個(gè)調(diào)度模塊各種異步事件都是由他調(diào)度的所以必須弄清他的執(zhí)行邏輯源碼分析而的核心部分則是這個(gè)循環(huán)內(nèi)部的邏輯貼
序言
最近閑暇無事,閱讀了一下tornado的源碼,對(duì)整體的結(jié)構(gòu)有了初步認(rèn)識(shí),與大家分享 不知道為什么右邊的目錄一直出不來,非常不舒服. 不如移步到oschina吧....[http://my.oschina.net/abc2001x/blog/476349][1]ioloop
`ioloop`是`tornado`的核心模塊,也是個(gè)調(diào)度模塊,各種異步事件都是由他調(diào)度的,所以必須弄清他的執(zhí)行邏輯源碼分析
而`ioloop`的核心部分則是 `while True`這個(gè)循環(huán)內(nèi)部的邏輯,貼上他的代碼如下
def start(self): if self._running: raise RuntimeError("IOLoop is already running") self._setup_logging() if self._stopped: self._stopped = False return old_current = getattr(IOLoop._current, "instance", None) IOLoop._current.instance = self self._thread_ident = thread.get_ident() self._running = True old_wakeup_fd = None if hasattr(signal, "set_wakeup_fd") and os.name == "posix": try: old_wakeup_fd = signal.set_wakeup_fd(self._waker.write_fileno()) if old_wakeup_fd != -1: signal.set_wakeup_fd(old_wakeup_fd) old_wakeup_fd = None except ValueError: old_wakeup_fd = None try: while True: with self._callback_lock: callbacks = self._callbacks self._callbacks = [] due_timeouts = [] if self._timeouts: now = self.time() while self._timeouts: if self._timeouts[0].callback is None: heapq.heappop(self._timeouts) self._cancellations -= 1 elif self._timeouts[0].deadline <= now: due_timeouts.append(heapq.heappop(self._timeouts)) else: break if (self._cancellations > 512 and self._cancellations > (len(self._timeouts) >> 1)): self._cancellations = 0 self._timeouts = [x for x in self._timeouts if x.callback is not None] heapq.heapify(self._timeouts) for callback in callbacks: self._run_callback(callback) for timeout in due_timeouts: if timeout.callback is not None: self._run_callback(timeout.callback) callbacks = callback = due_timeouts = timeout = None if self._callbacks: poll_timeout = 0.0 elif self._timeouts: poll_timeout = self._timeouts[0].deadline - self.time() poll_timeout = max(0, min(poll_timeout, _POLL_TIMEOUT)) else: poll_timeout = _POLL_TIMEOUT if not self._running: break if self._blocking_signal_threshold is not None: signal.setitimer(signal.ITIMER_REAL, 0, 0) try: event_pairs = self._impl.poll(poll_timeout) except Exception as e: if errno_from_exception(e) == errno.EINTR: continue else: raise if self._blocking_signal_threshold is not None: signal.setitimer(signal.ITIMER_REAL, self._blocking_signal_threshold, 0) self._events.update(event_pairs) while self._events: fd, events = self._events.popitem() try: fd_obj, handler_func = self._handlers[fd] handler_func(fd_obj, events) except (OSError, IOError) as e: if errno_from_exception(e) == errno.EPIPE: pass else: self.handle_callback_exception(self._handlers.get(fd)) except Exception: self.handle_callback_exception(self._handlers.get(fd)) fd_obj = handler_func = None finally: self._stopped = False if self._blocking_signal_threshold is not None: signal.setitimer(signal.ITIMER_REAL, 0, 0) IOLoop._current.instance = old_current if old_wakeup_fd is not None: signal.set_wakeup_fd(old_wakeup_fd)
除去注釋,代碼其實(shí)沒多少行. 由while 內(nèi)部代碼可以看出ioloop主要由三部分組成:1.回調(diào) callbacks
他是ioloop回調(diào)的基礎(chǔ)部分,通過IOLoop.instance().add_callback()添加到self._callbacks
他們將在每一次loop中被運(yùn)行.
主要用途是將邏輯分塊,在適合時(shí)機(jī)將包裝好的callback添加到self._callbacks讓其執(zhí)行.
例如ioloop中的add_future
def add_future(self, future, callback): """Schedules a callback on the ``IOLoop`` when the given `.Future` is finished. The callback is invoked with one argument, the `.Future`. """ assert is_future(future) callback = stack_context.wrap(callback) future.add_done_callback( lambda future: self.add_callback(callback, future))
future對(duì)象得到result的時(shí)候會(huì)調(diào)用future.add_done_callback添加的callback,再將其轉(zhuǎn)至ioloop執(zhí)行
2.定時(shí)器 due_timeouts這是定時(shí)器,在指定的事件執(zhí)行callback.
跟1中的callback類似,通過IOLoop.instance().add_callback
在每一次循環(huán),會(huì)計(jì)算timeouts回調(diào)列表里的事件,運(yùn)行已到期的callback.
當(dāng)然不是無節(jié)操的循環(huán).
因?yàn)?b>poll操作會(huì)阻塞到有io操作發(fā)生,所以只要計(jì)算最近的timeout,
然后用這個(gè)時(shí)間作為self._impl.poll(poll_timeout) 的 poll_timeout ,
就可以達(dá)到按時(shí)運(yùn)行了
但是,假設(shè)poll_timeout的時(shí)間很大時(shí),self._impl.poll一直在堵塞中(沒有io事件,但在處理某一個(gè)io事件),
那添加剛才1中的callback不是要等很久才會(huì)被運(yùn)行嗎? 答案當(dāng)然是不會(huì).
ioloop中有個(gè)waker對(duì)象,他是由兩個(gè)fd組成,一個(gè)讀一個(gè)寫.
ioloop在初始化的時(shí)候把waker綁定到epoll里了,add_callback時(shí)會(huì)觸發(fā)waker的讀寫.
這樣ioloop就會(huì)在poll中被喚醒了,接著就可以及時(shí)處理timeout callback了
用這樣的方式也可以自己封裝一個(gè)小的定時(shí)器功能玩玩
3.io事件的event loop處理epoll事件的功能
通過IOLoop.instance().add_handler(fd, handler, events)綁定fd event的處理事件
在httpserver.listen的代碼內(nèi),
netutil.py中的netutil.py的add_accept_handler綁定accept handler處理客戶端接入的邏輯
如法炮制,其他的io事件也這樣綁定,業(yè)務(wù)邏輯的分塊交由ioloop的callback和future處理
關(guān)于epoll的用法的內(nèi)容.詳情見我第一篇文章吧,哈哈
總結(jié)ioloop由callback(業(yè)務(wù)分塊), timeout callback(定時(shí)任務(wù)) io event(io傳輸和解析) 三塊組成,互相配合完成異步的功能,構(gòu)建gen,httpclient,iostream等功能
串聯(lián)大致的流程是,tornado 綁定io event,處理io傳輸解析,傳輸完成后(結(jié)合Future)回調(diào)(callback)業(yè)務(wù)處理的邏輯和一些固定操作 . 定時(shí)器則是較為獨(dú)立的模塊
Futrue個(gè)人認(rèn)為Future是tornado僅此ioloop重要的模塊,他貫穿全文,所有異步操作都有他的身影
顧名思義,他主要是關(guān)注日后要做的事,類似jquery的Deferred吧
一般的用法是通過ioloop的add_future定義future的done callback,
當(dāng)future被set_result的時(shí)候,future的done callback就會(huì)被調(diào)用.
從而完成Future的功能.
具體可以參考gen.coroutine的實(shí)現(xiàn),本文后面也會(huì)講到
他的組成不復(fù)雜,只有幾個(gè)重要的方法
最重要的是 add_done_callback , set_result
tornado用Future和ioloop,yield實(shí)現(xiàn)了gen.coroutine
1. add_done_callback跟ioloop的callback類似 , 存儲(chǔ)事件完成后的callback在self._callbacks里
def add_done_callback(self, fn): if self._done: fn(self) else: self._callbacks.append(fn)2.set_result
設(shè)置事件的結(jié)果,并運(yùn)行之前存儲(chǔ)好的callback
def set_result(self, result): self._result = result self._set_done() def _set_done(self): self._done = True for cb in self._callbacks: try: cb(self) except Exception: app_log.exception("Exception in callback %r for %r", cb, self) self._callbacks = None
為了驗(yàn)證之前所說的,上一段測試代碼
#! /usr/bin/env python #coding=utf-8 import tornado.web import tornado.ioloop from tornado.gen import coroutine from tornado.concurrent import Future def test(): def pp(s): print s future = Future() iol = tornado.ioloop.IOLoop.instance() print "init future %s"%future iol.add_future(future, lambda f: pp("ioloop callback after future done,future is %s"%f)) #模擬io延遲操作 iol.add_timeout(iol.time()+5,lambda:future.set_result("set future is done")) print "init complete" tornado.ioloop.IOLoop.instance().start() if __name__ == "__main__": test()
運(yùn)行結(jié)果:
gen.coroutine接著繼續(xù)延伸,看看coroutine的實(shí)現(xiàn)
gen.coroutine實(shí)現(xiàn)的功能其實(shí)是將原來的callback的寫法,用yield的寫法代替. 即以yield為分界,將代碼分成兩部分.
如:
#! /usr/bin/env python #coding=utf-8 import tornado.ioloop from tornado.gen import coroutine from tornado.httpclient import AsyncHTTPClient @coroutine def cotest(): client = AsyncHTTPClient() res = yield client.fetch("http://www.segmentfault.com/") print res if __name__ == "__main__": f = cotest() print f #這里返回了一個(gè)future哦 tornado.ioloop.IOLoop.instance().start()
運(yùn)行結(jié)果:
源碼分析接下來分析下coroutine的實(shí)現(xiàn)
def _make_coroutine_wrapper(func, replace_callback): @functools.wraps(func) def wrapper(*args, **kwargs): future = TracebackFuture() if replace_callback and "callback" in kwargs: callback = kwargs.pop("callback") IOLoop.current().add_future( future, lambda future: callback(future.result())) try: result = func(*args, **kwargs) except (Return, StopIteration) as e: result = getattr(e, "value", None) except Exception: future.set_exc_info(sys.exc_info()) return future else: if isinstance(result, types.GeneratorType): try: orig_stack_contexts = stack_context._state.contexts yielded = next(result) if stack_context._state.contexts is not orig_stack_contexts: yielded = TracebackFuture() yielded.set_exception( stack_context.StackContextInconsistentError( "stack_context inconsistency (probably caused " "by yield within a "with StackContext" block)")) except (StopIteration, Return) as e: future.set_result(getattr(e, "value", None)) except Exception: future.set_exc_info(sys.exc_info()) else: Runner(result, future, yielded) try: return future finally: future = None future.set_result(result) return future return wrapper
如源碼所示,func運(yùn)行的結(jié)果是GeneratorType ,yielded = next(result),
運(yùn)行至原函數(shù)的yield位置,返回的是原函數(shù)func內(nèi)部 yield 右邊返回的對(duì)象(必須是Future或Future的list)給yielded.
經(jīng)過Runner(result, future, yielded) 對(duì)yielded進(jìn)行處理.
在此就 貼出Runner的代碼了.
Runner初始化過程,調(diào)用handle_yield, 查看yielded是否已done了,否則add_future運(yùn)行Runner的run方法,
run方法中如果yielded對(duì)象已完成,用對(duì)它的gen調(diào)用send,發(fā)送完成的結(jié)果.
所以yielded在什么地方被set_result非常重要,
當(dāng)被set_result的時(shí)候,才會(huì)send結(jié)果給原func,完成整個(gè)異步操作
詳情可以查看tornado 中重要的對(duì)象 iostream,源碼中iostream的 _handle_connect,如此設(shè)置了連接的result.
def _handle_connect(self): err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR) if err != 0: self.error = socket.error(err, os.strerror(err)) if self._connect_future is None: gen_log.warning("Connect error on fd %s: %s", self.socket.fileno(), errno.errorcode[err]) self.close() return if self._connect_callback is not None: callback = self._connect_callback self._connect_callback = None self._run_callback(callback) if self._connect_future is not None: future = self._connect_future self._connect_future = None future.set_result(self) self._connecting = False
最后貼上一個(gè)簡單的測試代碼,演示coroutine,future的用法
import tornado.ioloop from tornado.gen import coroutine from tornado.concurrent import Future @coroutine def asyn_sum(a, b): print("begin calculate:sum %d+%d"%(a,b)) future = Future() future2 = Future() iol = tornado.ioloop.IOLoop.instance() print future def callback(a, b): print("calculating the sum of %d+%d:"%(a,b)) future.set_result(a+b) iol.add_timeout(iol.time()+3,lambda f:f.set_result(None),future2) iol.add_timeout(iol.time()+3,callback, a, b) result = yield future print("after yielded") print("the %d+%d=%d"%(a, b, result)) yield future2 print "after future2" def main(): f = asyn_sum(2,3) print "" print f tornado.ioloop.IOLoop.instance().start() if __name__ == "__main__": main()
運(yùn)行結(jié)果:
為什么代碼中個(gè)yield都起作用了? 因?yàn)?b>Runner.run里,最后繼續(xù)用handle_yield處理了send后返回的yielded對(duì)象,意思是func里可以有n干個(gè)yield操作
if not self.handle_yield(yielded): return總結(jié)
至此,已完成tornado中重要的幾個(gè)模塊的流程,其他模塊也是由此而來.寫了這么多,越寫越卡,就到此為止先吧,
最后的最后的最后啊~~~~~~好想有份工作 和女朋友啊~~~~~
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://m.specialneedsforspecialkids.com/yun/37553.html
摘要:學(xué)習(xí)筆記七數(shù)學(xué)形態(tài)學(xué)關(guān)注的是圖像中的形狀,它提供了一些方法用于檢測形狀和改變形狀。學(xué)習(xí)筆記十一尺度不變特征變換,簡稱是圖像局部特征提取的現(xiàn)代方法基于區(qū)域圖像塊的分析。本文的目的是簡明扼要地說明的編碼機(jī)制,并給出一些建議。 showImg(https://segmentfault.com/img/bVRJbz?w=900&h=385); 前言 開始之前,我們先來看這樣一個(gè)提問: pyth...
摘要:軟件開發(fā)者通常依據(jù)特定的框架實(shí)現(xiàn)更為復(fù)雜的商業(yè)運(yùn)用和業(yè)務(wù)邏輯。所有,做開發(fā),要用一個(gè)框架。的性能是相當(dāng)優(yōu)異的,因?yàn)樗鼛熗浇鉀Q一個(gè)被稱之為問題,就是處理大于或等于一萬的并發(fā)。 One does not live by bread alone,but by every word that comes from the mouth of God --(MATTHEW4:4) 不...
摘要:初步分析提升可從兩方面入手,一個(gè)是增加并發(fā)數(shù),其二是減少平均響應(yīng)時(shí)間。大部分的時(shí)間花在系統(tǒng)與數(shù)據(jù)庫的交互上,到這,便有了一個(gè)優(yōu)化的主題思路最大限度的降低平均響應(yīng)時(shí)間。不要輕易否定一項(xiàng)公認(rèn)的技術(shù)真理,要拿數(shù)據(jù)說話。 本文最早發(fā)表于個(gè)人博客:PylixmWiki 應(yīng)項(xiàng)目的需求,我們使用tornado開發(fā)了一個(gè)api系統(tǒng),系統(tǒng)開發(fā)完后,在8核16G的虛機(jī)上經(jīng)過壓測qps只有200+。與我們當(dāng)...
摘要:特別提醒,看官不要自宮,因?yàn)楸窘坛滩皇潜傩皠ψV,也不是葵花寶典,撰寫本課程的人更是生理健全者。直到目前,科學(xué)上尚未有證實(shí)或證偽自宮和寫程序之間是否存在某種因果關(guān)系。和是中用的最多的方法啦。 Do not store up for yourselves treasures on earth, where moth and rust consume and where thieves...
閱讀 2775·2021-11-17 09:33
閱讀 3110·2021-10-25 09:44
閱讀 1217·2021-10-11 10:59
閱讀 2410·2021-09-27 13:34
閱讀 2918·2021-09-07 10:19
閱讀 2148·2019-08-29 18:46
閱讀 1542·2019-08-29 12:55
閱讀 938·2019-08-23 17:11