摘要:最近打算學習的源碼,所以就建立一個系列主題深入理解。那么就是最底層的實現。的實現基于,那么什么是是內核為處理大批量文件描述符而作了改進的。經過分析,我們可以看到,實際上是對的封裝,并加入了一些對上層事件的處理和相關的底層處理。
最近打算學習 tornado 的源碼,所以就建立一個系列主題 “深入理解 tornado”。 在此記錄學習經歷及個人見解與大家分享。文中一定會出現理解不到位或理解錯誤的地方,還請大家多多指教
進入正題:
tornado 優秀的大并發處理能力得益于它的 web server 從底層開始就自己實現了一整套基于 epoll 的單線程異步架構(其他 python web 框架的自帶 server 基本是基于 wsgi 寫的簡單服務器,并沒有自己實現底層結構。 關于 wsgi 詳見之前的文章: 自己寫一個 wsgi 服務器運行 Django 、Tornado 應用)。 那么 tornado.ioloop 就是 tornado web server 最底層的實現。
看 ioloop 之前,我們需要了解一些預備知識,有助于我們理解 ioloop。
epollioloop 的實現基于 epoll ,那么什么是 epoll? epoll 是Linux內核為處理大批量文件描述符而作了改進的 poll 。
那么什么又是 poll ? 首先,我們回顧一下, socket 通信時的服務端,當它接受( accept )一個連接并建立通信后( connection )就進行通信,而此時我們并不知道連接的客戶端有沒有信息發完。 這時候我們有兩種選擇:
一直在這里等著直到收發數據結束;
每隔一定時間來看看這里有沒有數據;
第二種辦法要比第一種好一些,多個連接可以統一在一定時間內輪流看一遍里面有沒有數據要讀寫,看上去我們可以處理多個連接了,這個方式就是 poll / select 的解決方案。 看起來似乎解決了問題,但實際上,隨著連接越來越多,輪詢所花費的時間將越來越長,而服務器連接的 socket 大多不是活躍的,所以輪詢所花費的大部分時間將是無用的。為了解決這個問題, epoll 被創造出來,它的概念和 poll 類似,不過每次輪詢時,他只會把有數據活躍的 socket 挑出來輪詢,這樣在有大量連接時輪詢就節省了大量時間。
對于 epoll 的操作,其實也很簡單,只要 4 個 API 就可以完全操作它。
epoll_create用來創建一個 epoll 描述符( 就是創建了一個 epoll )
epoll_ctl操作 epoll 中的 event;可用參數有:
參數 | 含義 |
---|---|
EPOLL_CTL_ADD | 添加一個新的epoll事件 |
EPOLL_CTL_DEL | 刪除一個epoll事件 |
EPOLL_CTL_MOD | 改變一個事件的監聽方式 |
而事件的監聽方式有七種,而我們只需要關心其中的三種:
宏定義 | 含義 |
---|---|
EPOLLIN | 緩沖區滿,有數據可讀 |
EPOLLOUT | 緩沖區空,可寫數據 |
EPOLLERR | 發生錯誤 |
就是讓 epoll 開始工作,里面有個參數 timeout,當設置為非 0 正整數時,會監聽(阻塞) timeout 秒;設置為 0 時立即返回,設置為 -1 時一直監聽。
在監聽時有數據活躍的連接時其返回活躍的文件句柄列表(此處為 socket 文件句柄)。
close關閉 epoll
現在了解了 epoll 后,我們就可以來看 ioloop 了 (如果對 epoll 還有疑問可以看這兩篇資料: epoll 的原理是什么、百度百科:epoll)
tornado.ioloop很多初學者一定好奇 tornado 運行服務器最后那一句 tornado.ioloop.IOLoop.current().start() 到底是干什么的。 我們先不解釋作用,來看看這一句代碼背后到底都在干什么。
先貼 ioloop 代碼:
from __future__ import absolute_import, division, print_function, with_statement import datetime import errno import functools import heapq # 最小堆 import itertools import logging import numbers import os import select import sys import threading import time import traceback import math from tornado.concurrent import TracebackFuture, is_future from tornado.log import app_log, gen_log from tornado.platform.auto import set_close_exec, Waker from tornado import stack_context from tornado.util import PY3, Configurable, errno_from_exception, timedelta_to_seconds try: import signal except ImportError: signal = None if PY3: import _thread as thread else: import thread _POLL_TIMEOUT = 3600.0 class TimeoutError(Exception): pass class IOLoop(Configurable): _EPOLLIN = 0x001 _EPOLLPRI = 0x002 _EPOLLOUT = 0x004 _EPOLLERR = 0x008 _EPOLLHUP = 0x010 _EPOLLRDHUP = 0x2000 _EPOLLONESHOT = (1 << 30) _EPOLLET = (1 << 31) # Our events map exactly to the epoll events NONE = 0 READ = _EPOLLIN WRITE = _EPOLLOUT ERROR = _EPOLLERR | _EPOLLHUP # Global lock for creating global IOLoop instance _instance_lock = threading.Lock() _current = threading.local() @staticmethod def instance(): if not hasattr(IOLoop, "_instance"): with IOLoop._instance_lock: if not hasattr(IOLoop, "_instance"): # New instance after double check IOLoop._instance = IOLoop() return IOLoop._instance @staticmethod def initialized(): """Returns true if the singleton instance has been created.""" return hasattr(IOLoop, "_instance") def install(self): assert not IOLoop.initialized() IOLoop._instance = self @staticmethod def clear_instance(): """Clear the global `IOLoop` instance. .. versionadded:: 4.0 """ if hasattr(IOLoop, "_instance"): del IOLoop._instance @staticmethod def current(instance=True): current = getattr(IOLoop._current, "instance", None) if current is None and instance: return IOLoop.instance() return current def make_current(self): IOLoop._current.instance = self @staticmethod def clear_current(): IOLoop._current.instance = None @classmethod def configurable_base(cls): return IOLoop @classmethod def configurable_default(cls): if hasattr(select, "epoll"): from tornado.platform.epoll import EPollIOLoop return EPollIOLoop if hasattr(select, "kqueue"): # Python 2.6+ on BSD or Mac from tornado.platform.kqueue import KQueueIOLoop return KQueueIOLoop from tornado.platform.select import SelectIOLoop return SelectIOLoop def initialize(self, make_current=None): if make_current is None: if IOLoop.current(instance=False) is None: self.make_current() elif make_current: if IOLoop.current(instance=False) is not None: raise RuntimeError("current IOLoop already exists") self.make_current() def close(self, all_fds=False): raise NotImplementedError() def add_handler(self, fd, handler, events): raise NotImplementedError() def update_handler(self, fd, events): raise NotImplementedError() def remove_handler(self, fd): raise NotImplementedError() def set_blocking_signal_threshold(self, seconds, action): raise NotImplementedError() def set_blocking_log_threshold(self, seconds): self.set_blocking_signal_threshold(seconds, self.log_stack) def log_stack(self, signal, frame): gen_log.warning("IOLoop blocked for %f seconds in %s", self._blocking_signal_threshold, "".join(traceback.format_stack(frame))) def start(self): raise NotImplementedError() def _setup_logging(self): if not any([logging.getLogger().handlers, logging.getLogger("tornado").handlers, logging.getLogger("tornado.application").handlers]): logging.basicConfig() def stop(self): raise NotImplementedError() def run_sync(self, func, timeout=None): future_cell = [None] def run(): try: result = func() if result is not None: from tornado.gen import convert_yielded result = convert_yielded(result) except Exception: future_cell[0] = TracebackFuture() future_cell[0].set_exc_info(sys.exc_info()) else: if is_future(result): future_cell[0] = result else: future_cell[0] = TracebackFuture() future_cell[0].set_result(result) self.add_future(future_cell[0], lambda future: self.stop()) self.add_callback(run) if timeout is not None: timeout_handle = self.add_timeout(self.time() + timeout, self.stop) self.start() if timeout is not None: self.remove_timeout(timeout_handle) if not future_cell[0].done(): raise TimeoutError("Operation timed out after %s seconds" % timeout) return future_cell[0].result() def time(self): return time.time() ...
IOLoop 類首先聲明了 epoll 監聽事件的宏定義,當然,如前文所說,我們只要關心其中的 EPOLLIN 、 EPOLLOUT 、 EPOLLERR 就行。
類中的方法有很多,看起來有點暈,但其實我們只要關心 IOLoop 核心功能的方法即可,其他的方法在明白核心功能后也就不難理解了。所以接下來我們著重分析核心代碼。
instance 、 initialized、 install、 clear_instance、 current、 make_current、 clear_current 這些方法不用在意細節,總之現在記住它們都是為了讓 IOLoop 類變成一個單例,保證從全局上調用的都是同一個 IOLoop 就好。
你一定疑惑 IOLoop 為何沒有 __init__, 其實是因為要初始化成為單例,IOLoop 的 new 函數已經被改寫了,同時指定了 initialize 做為它的初始化方法,所以此處沒有 __init__ 。 說到這,ioloop 的代碼里好像沒有看到 new 方法,這又是什么情況? 我們先暫時記住這里。
接著我們來看這個初始化方法:
def initialize(self, make_current=None): if make_current is None: if IOLoop.current(instance=False) is None: self.make_current() elif make_current: if IOLoop.current(instance=False) is None: raise RuntimeError("current IOLoop already exists") self.make_current() def make_current(self): IOLoop._current.instance = self
what? 里面只是判斷了是否第一次初始化或者調用 self.make_current() 初始化,而 make_current() 里也僅僅是把實例指定為自己,那么初始化到底去哪了?
然后再看看 start() 、 run() 、 close() 這些關鍵的方法都成了返回 NotImplementedError 錯誤,全部未定義?!跟網上搜到的源碼分析完全不一樣啊。 這時候看下 IOLoop 的繼承關系,原來問題出在這里,之前的 tornado.ioloop 繼承自 object 所以所有的一切都自己實現,而現在版本的 tornado.ioloop 則繼承自 Configurable 看起來現在的 IOLoop 已經成為了一個基類,只定義了接口。 所以接著看 Configurable 代碼:
tornado.util.Configurableclass Configurable(object): __impl_class = None __impl_kwargs = None def __new__(cls, *args, **kwargs): base = cls.configurable_base() init_kwargs = {} if cls is base: impl = cls.configured_class() if base.__impl_kwargs: init_kwargs.update(base.__impl_kwargs) else: impl = cls init_kwargs.update(kwargs) instance = super(Configurable, cls).__new__(impl) # initialize vs __init__ chosen for compatibility with AsyncHTTPClient # singleton magic. If we get rid of that we can switch to __init__ # here too. instance.initialize(*args, **init_kwargs) return instance @classmethod def configurable_base(cls): """Returns the base class of a configurable hierarchy. This will normally return the class in which it is defined. (which is *not* necessarily the same as the cls classmethod parameter). """ raise NotImplementedError() @classmethod def configurable_default(cls): """Returns the implementation class to be used if none is configured.""" raise NotImplementedError() def initialize(self): """Initialize a `Configurable` subclass instance. Configurable classes should use `initialize` instead of ``__init__``. .. versionchanged:: 4.2 Now accepts positional arguments in addition to keyword arguments. """ @classmethod def configure(cls, impl, **kwargs): """Sets the class to use when the base class is instantiated. Keyword arguments will be saved and added to the arguments passed to the constructor. This can be used to set global defaults for some parameters. """ base = cls.configurable_base() if isinstance(impl, (unicode_type, bytes)): impl = import_object(impl) if impl is not None and not issubclass(impl, cls): raise ValueError("Invalid subclass of %s" % cls) base.__impl_class = impl base.__impl_kwargs = kwargs @classmethod def configured_class(cls): """Returns the currently configured class.""" base = cls.configurable_base() if cls.__impl_class is None: base.__impl_class = cls.configurable_default() return base.__impl_class @classmethod def _save_configuration(cls): base = cls.configurable_base() return (base.__impl_class, base.__impl_kwargs) @classmethod def _restore_configuration(cls, saved): base = cls.configurable_base() base.__impl_class = saved[0] base.__impl_kwargs = saved[1]
之前我們尋找的 __new__ 出現了! 注意其中這句: impl = cls.configured_class() impl 在這里就是 epoll ,它的生成函數是 configured_class(), 而其方法里又有 base.__impl_class = cls.configurable_default() ,調用了 configurable_default() 。而 Configurable 的 configurable_default():
def configurable_default(cls): """Returns the implementation class to be used if none is configured.""" raise NotImplementedError()
顯然也是個接口,那么我們再回頭看 ioloop 的 configurable_default():
def configurable_default(cls): if hasattr(select, "epoll"): from tornado.platform.epoll import EPollIOLoop return EPollIOLoop if hasattr(select, "kqueue"): # Python 2.6+ on BSD or Mac from tornado.platform.kqueue import KQueueIOLoop return KQueueIOLoop from tornado.platform.select import SelectIOLoop return SelectIOLoop
原來這是個工廠函數,根據不同的操作系統返回不同的事件池(linux 就是 epoll, mac 返回 kqueue,其他就返回普通的 select。 kqueue 基本等同于 epoll, 只是不同系統對其的不同實現)
現在線索轉移到了 tornado.platform.epoll.EPollIOLoop 上,我們再來看看 EPollIOLoop:
tornado.platform.epoll.EPollIOLoopimport select from tornado.ioloop import PollIOLoop class EPollIOLoop(PollIOLoop): def initialize(self, **kwargs): super(EPollIOLoop, self).initialize(impl=select.epoll(), **kwargs)
EPollIOLoop 完全繼承自 PollIOLoop (注意這里是 PollIOLoop 不是 IOLoop)并只是在初始化時指定了 impl 是 epoll,所以看起來我們用 IOLoop 初始化最后初始化的其實就是這個 PollIOLoop,所以接下來,我們真正需要理解和閱讀的內容應該都在這里:
tornado.ioloop.PollIOLoopclass PollIOLoop(IOLoop): """Base class for IOLoops built around a select-like function. For concrete implementations, see `tornado.platform.epoll.EPollIOLoop` (Linux), `tornado.platform.kqueue.KQueueIOLoop` (BSD and Mac), or `tornado.platform.select.SelectIOLoop` (all platforms). """ def initialize(self, impl, time_func=None, **kwargs): super(PollIOLoop, self).initialize(**kwargs) self._impl = impl if hasattr(self._impl, "fileno"): set_close_exec(self._impl.fileno()) self.time_func = time_func or time.time self._handlers = {} self._events = {} self._callbacks = [] self._callback_lock = threading.Lock() self._timeouts = [] self._cancellations = 0 self._running = False self._stopped = False self._closing = False self._thread_ident = None self._blocking_signal_threshold = None self._timeout_counter = itertools.count() # Create a pipe that we send bogus data to when we want to wake # the I/O loop when it is idle self._waker = Waker() self.add_handler(self._waker.fileno(), lambda fd, events: self._waker.consume(), self.READ) def close(self, all_fds=False): with self._callback_lock: self._closing = True self.remove_handler(self._waker.fileno()) if all_fds: for fd, handler in self._handlers.values(): self.close_fd(fd) self._waker.close() self._impl.close() self._callbacks = None self._timeouts = None def add_handler(self, fd, handler, events): fd, obj = self.split_fd(fd) self._handlers[fd] = (obj, stack_context.wrap(handler)) self._impl.register(fd, events | self.ERROR) def update_handler(self, fd, events): fd, obj = self.split_fd(fd) self._impl.modify(fd, events | self.ERROR) def remove_handler(self, fd): fd, obj = self.split_fd(fd) self._handlers.pop(fd, None) self._events.pop(fd, None) try: self._impl.unregister(fd) except Exception: gen_log.debug("Error deleting fd from IOLoop", exc_info=True) def set_blocking_signal_threshold(self, seconds, action): if not hasattr(signal, "setitimer"): gen_log.error("set_blocking_signal_threshold requires a signal module " "with the setitimer method") return self._blocking_signal_threshold = seconds if seconds is not None: signal.signal(signal.SIGALRM, action if action is not None else signal.SIG_DFL) def start(self): ... try: while True: # Prevent IO event starvation by delaying new callbacks # to the next iteration of the event loop. with self._callback_lock: callbacks = self._callbacks self._callbacks = [] # Add any timeouts that have come due to the callback list. # Do not run anything until we have determined which ones # are ready, so timeouts that call add_timeout cannot # schedule anything in this iteration. due_timeouts = [] if self._timeouts: now = self.time() while self._timeouts: if self._timeouts[0].callback is None: # The timeout was cancelled. Note that the # cancellation check is repeated below for timeouts # that are cancelled by another timeout or callback. heapq.heappop(self._timeouts) self._cancellations -= 1 elif self._timeouts[0].deadline <= now: due_timeouts.append(heapq.heappop(self._timeouts)) else: break if (self._cancellations > 512 and self._cancellations > (len(self._timeouts) >> 1)): # Clean up the timeout queue when it gets large and it"s # more than half cancellations. self._cancellations = 0 self._timeouts = [x for x in self._timeouts if x.callback is not None] heapq.heapify(self._timeouts) for callback in callbacks: self._run_callback(callback) for timeout in due_timeouts: if timeout.callback is not None: self._run_callback(timeout.callback) # Closures may be holding on to a lot of memory, so allow # them to be freed before we go into our poll wait. callbacks = callback = due_timeouts = timeout = None if self._callbacks: # If any callbacks or timeouts called add_callback, # we don"t want to wait in poll() before we run them. poll_timeout = 0.0 elif self._timeouts: # If there are any timeouts, schedule the first one. # Use self.time() instead of "now" to account for time # spent running callbacks. poll_timeout = self._timeouts[0].deadline - self.time() poll_timeout = max(0, min(poll_timeout, _POLL_TIMEOUT)) else: # No timeouts and no callbacks, so use the default. poll_timeout = _POLL_TIMEOUT if not self._running: break if self._blocking_signal_threshold is not None: # clear alarm so it doesn"t fire while poll is waiting for # events. signal.setitimer(signal.ITIMER_REAL, 0, 0) try: event_pairs = self._impl.poll(poll_timeout) except Exception as e: # Depending on python version and IOLoop implementation, # different exception types may be thrown and there are # two ways EINTR might be signaled: # * e.errno == errno.EINTR # * e.args is like (errno.EINTR, "Interrupted system call") if errno_from_exception(e) == errno.EINTR: continue else: raise if self._blocking_signal_threshold is not None: signal.setitimer(signal.ITIMER_REAL, self._blocking_signal_threshold, 0) # Pop one fd at a time from the set of pending fds and run # its handler. Since that handler may perform actions on # other file descriptors, there may be reentrant calls to # this IOLoop that update self._events self._events.update(event_pairs) while self._events: fd, events = self._events.popitem() try: fd_obj, handler_func = self._handlers[fd] handler_func(fd_obj, events) except (OSError, IOError) as e: if errno_from_exception(e) == errno.EPIPE: # Happens when the client closes the connection pass else: self.handle_callback_exception(self._handlers.get(fd)) except Exception: self.handle_callback_exception(self._handlers.get(fd)) fd_obj = handler_func = None finally: # reset the stopped flag so another start/stop pair can be issued self._stopped = False if self._blocking_signal_threshold is not None: signal.setitimer(signal.ITIMER_REAL, 0, 0) IOLoop._current.instance = old_current if old_wakeup_fd is not None: signal.set_wakeup_fd(old_wakeup_fd) def stop(self): self._running = False self._stopped = True self._waker.wake() def time(self): return self.time_func() def call_at(self, deadline, callback, *args, **kwargs): timeout = _Timeout( deadline, functools.partial(stack_context.wrap(callback), *args, **kwargs), self) heapq.heappush(self._timeouts, timeout) return timeout def remove_timeout(self, timeout): # Removing from a heap is complicated, so just leave the defunct # timeout object in the queue (see discussion in # http://docs.python.org/library/heapq.html). # If this turns out to be a problem, we could add a garbage # collection pass whenever there are too many dead timeouts. timeout.callback = None self._cancellations += 1 def add_callback(self, callback, *args, **kwargs): with self._callback_lock: if self._closing: raise RuntimeError("IOLoop is closing") list_empty = not self._callbacks self._callbacks.append(functools.partial( stack_context.wrap(callback), *args, **kwargs)) if list_empty and thread.get_ident() != self._thread_ident: # If we"re in the IOLoop"s thread, we know it"s not currently # polling. If we"re not, and we added the first callback to an # empty list, we may need to wake it up (it may wake up on its # own, but an occasional extra wake is harmless). Waking # up a polling IOLoop is relatively expensive, so we try to # avoid it when we can. self._waker.wake() def add_callback_from_signal(self, callback, *args, **kwargs): with stack_context.NullContext(): if thread.get_ident() != self._thread_ident: # if the signal is handled on another thread, we can add # it normally (modulo the NullContext) self.add_callback(callback, *args, **kwargs) else: # If we"re on the IOLoop"s thread, we cannot use # the regular add_callback because it may deadlock on # _callback_lock. Blindly insert into self._callbacks. # This is safe because the GIL makes list.append atomic. # One subtlety is that if the signal interrupted the # _callback_lock block in IOLoop.start, we may modify # either the old or new version of self._callbacks, # but either way will work. self._callbacks.append(functools.partial( stack_context.wrap(callback), *args, **kwargs))
果然, PollIOLoop 繼承自 IOLoop 并實現了它的所有接口,現在我們終于可以進入真正的正題了
ioloop 分析首先要看的是關于 epoll 操作的方法,還記得前文說過的 epoll 只需要四個 api 就能完全操作嘛? 我們來看 PollIOLoop 的實現:
epoll 操作def add_handler(self, fd, handler, events): fd, obj = self.split_fd(fd) self._handlers[fd] = (obj, stack_context.wrap(handler)) self._impl.register(fd, events | self.ERROR) def update_handler(self, fd, events): fd, obj = self.split_fd(fd) self._impl.modify(fd, events | self.ERROR) def remove_handler(self, fd): fd, obj = self.split_fd(fd) self._handlers.pop(fd, None) self._events.pop(fd, None) try: self._impl.unregister(fd) except Exception: gen_log.debug("Error deleting fd from IOLoop", exc_info=True)
epoll_ctl:這個三個方法分別對應 epoll_ctl 中的 add 、 modify 、 del 參數。 所以這三個方法實現了 epoll 的 epoll_ctl 。
epoll_create:然后 epoll 的生成在前文 EPollIOLoop 的初始化中就已經完成了:super(EPollIOLoop, self).initialize(impl=select.epoll(), **kwargs)。 這個相當于 epoll_create 。
epoll_wait:epoll_wait 操作則在 start() 中:event_pairs = self._impl.poll(poll_timeout)
epoll_close:而 epoll 的 close 則在 PollIOLoop 中的 close 方法內調用: self._impl.close() 完成。
initialize接下來看 PollIOLoop 的初始化方法中作了什么:
def initialize(self, impl, time_func=None, **kwargs): super(PollIOLoop, self).initialize(**kwargs) self._impl = impl # 指定 epoll if hasattr(self._impl, "fileno"): set_close_exec(self._impl.fileno()) # fork 后關閉無用文件描述符 self.time_func = time_func or time.time # 指定獲取當前時間的函數 self._handlers = {} # handler 的字典,儲存被 epoll 監聽的 handler,與打開它的文件描述符 ( file descriptor 簡稱 fd ) 一一對應 self._events = {} # event 的字典,儲存 epoll 返回的活躍的 fd event pairs self._callbacks = [] # 儲存各個 fd 回調函數的列表 self._callback_lock = threading.Lock() # 指定進程鎖 self._timeouts = [] # 將是一個最小堆結構,按照超時時間從小到大排列的 fd 的任務堆( 通常這個任務都會包含一個 callback ) self._cancellations = 0 # 關于 timeout 的計數器 self._running = False # ioloop 是否在運行 self._stopped = False # ioloop 是否停止 self._closing = False # ioloop 是否關閉 self._thread_ident = None # 當前線程堆標識符 ( thread identify ) self._blocking_signal_threshold = None # 系統信號, 主要用來在 epoll_wait 時判斷是否會有 signal alarm 打斷 epoll self._timeout_counter = itertools.count() # 超時計數器 ( 暫時不是很明白具體作用,好像和前面的 _cancellations 有關系? 請大神講講) self._waker = Waker() # 一個 waker 類,主要是對于管道 pipe 的操作,因為 ioloop 屬于底層的數據操作,這里 epoll 監聽的是 pipe self.add_handler(self._waker.fileno(), lambda fd, events: self._waker.consume(), self.READ) # 將管道加入 epoll 監聽,對于 web server 初始化時只需要關心 READ 事件
除了注釋中的解釋,還有幾點補充:
close_exec 的作用: 子進程在fork出來的時候,使用了寫時復制(COW,Copy-On-Write)方式獲得父進程的數據空間、 堆和棧副本,這其中也包括文件描述符。剛剛fork成功時,父子進程中相同的文件描述符指向系統文件表中的同一項,接著,一般我們會調用exec執行另一個程序,此時會用全新的程序替換子進程的正文,數據,堆和棧等。此時保存文件描述符的變量當然也不存在了,我們就無法關閉無用的文件描述符了。所以通常我們會fork子進程后在子進程中直接執行close關掉無用的文件描述符,然后再執行exec。 所以 close_exec 執行的其實就是 關閉 + 執行的作用。 詳情可以查看: 關于linux進程間的close-on-exec機制
Waker(): Waker 封裝了對于管道 pipe 的操作:
def set_close_exec(fd): flags = fcntl.fcntl(fd, fcntl.F_GETFD) fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC) def _set_nonblocking(fd): flags = fcntl.fcntl(fd, fcntl.F_GETFL) fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) class Waker(interface.Waker): def __init__(self): r, w = os.pipe() _set_nonblocking(r) _set_nonblocking(w) set_close_exec(r) set_close_exec(w) self.reader = os.fdopen(r, "rb", 0) self.writer = os.fdopen(w, "wb", 0) def fileno(self): return self.reader.fileno() def write_fileno(self): return self.writer.fileno() def wake(self): try: self.writer.write(b"x") except IOError: pass def consume(self): try: while True: result = self.reader.read() if not result: break except IOError: pass def close(self): self.reader.close() self.writer.close()
可以看到 waker 把 pipe 分為讀、 寫兩個管道并都設置了非阻塞和 close_exec。 注意wake(self)方法中:self.writer.write(b"x") 直接向管道中寫入隨意字符從而釋放管道。
startioloop 最核心的部分:
def start(self): if self._running: # 判斷是否已經運行 raise RuntimeError("IOLoop is already running") self._setup_logging() if self._stopped: self._stopped = False # 設置停止為假 return old_current = getattr(IOLoop._current, "instance", None) IOLoop._current.instance = self self._thread_ident = thread.get_ident() # 獲得當前線程標識符 self._running = True # 設置運行 old_wakeup_fd = None if hasattr(signal, "set_wakeup_fd") and os.name == "posix": try: old_wakeup_fd = signal.set_wakeup_fd(self._waker.write_fileno()) if old_wakeup_fd != -1: signal.set_wakeup_fd(old_wakeup_fd) old_wakeup_fd = None except ValueError: old_wakeup_fd = None try: while True: # 服務器進程正式開始,類似于其他服務器的 serve_forever with self._callback_lock: # 加鎖,_callbacks 做為臨界區不加鎖進行讀寫會產生臟數據 callbacks = self._callbacks # 讀取 _callbacks self._callbacks = []. # 清空 _callbacks due_timeouts = [] # 用于存放這個周期內已過期( 已超時 )的任務 if self._timeouts: # 判斷 _timeouts 里是否有數據 now = self.time() # 獲取當前時間,用來判斷 _timeouts 里的任務有沒有超時 while self._timeouts: # _timeouts 有數據時一直循環, _timeouts 是個最小堆,第一個數據永遠是最小的, 這里第一個數據永遠是最接近超時或已超時的 if self._timeouts[0].callback is None: # 超時任務無回調 heapq.heappop(self._timeouts) # 直接彈出 self._cancellations -= 1 # 超時計數器 -1 elif self._timeouts[0].deadline <= now: # 判斷最小的數據是否超時 due_timeouts.append(heapq.heappop(self._timeouts)) # 超時就加到已超時列表里。 else: break # 因為最小堆,如果沒超時就直接退出循環( 后面的數據必定未超時 ) if (self._cancellations > 512 and self._cancellations > (len(self._timeouts) >> 1)): # 當超時計數器大于 512 并且 大于 _timeouts 長度一半( >> 為右移運算, 相當于十進制數據被除 2 )時,清零計數器,并剔除 _timeouts 中無 callbacks 的任務 self._cancellations = 0 self._timeouts = [x for x in self._timeouts if x.callback is not None] heapq.heapify(self._timeouts) # 進行 _timeouts 最小堆化 for callback in callbacks: self._run_callback(callback) # 運行 callbacks 里所有的 calllback for timeout in due_timeouts: if timeout.callback is not None: self._run_callback(timeout.callback) # 運行所有已過期任務的 callback callbacks = callback = due_timeouts = timeout = None # 釋放內存 if self._callbacks: # _callbacks 里有數據時 poll_timeout = 0.0 # 設置 epoll_wait 時間為0( 立即返回 ) elif self._timeouts: # _timeouts 里有數據時 poll_timeout = self._timeouts[0].deadline - self.time() # 取最小過期時間當 epoll_wait 等待時間,這樣當第一個任務過期時立即返回 poll_timeout = max(0, min(poll_timeout, _POLL_TIMEOUT)) # 如果最小過期時間大于默認等待時間 _POLL_TIMEOUT = 3600,則用 3600,如果最小過期時間小于0 就設置為0 立即返回。 else: poll_timeout = _POLL_TIMEOUT # 默認 3600 s 等待時間 if not self._running: # 檢查是否有系統信號中斷運行,有則中斷,無則繼續 break if self._blocking_signal_threshold is not None: signal.setitimer(signal.ITIMER_REAL, 0, 0) # 開始 epoll_wait 之前確保 signal alarm 都被清空( 這樣在 epoll_wait 過程中不會被 signal alarm 打斷 ) try: event_pairs = self._impl.poll(poll_timeout) # 獲取返回的活躍事件隊 except Exception as e: if errno_from_exception(e) == errno.EINTR: continue else: raise if self._blocking_signal_threshold is not None: signal.setitimer(signal.ITIMER_REAL, self._blocking_signal_threshold, 0) # epoll_wait 結束, 再設置 signal alarm self._events.update(event_pairs) # 將活躍事件加入 _events while self._events: fd, events = self._events.popitem() # 循環彈出事件 try: fd_obj, handler_func = self._handlers[fd] # 處理事件 handler_func(fd_obj, events) except (OSError, IOError) as e: if errno_from_exception(e) == errno.EPIPE: pass else: self.handle_callback_exception(self._handlers.get(fd)) except Exception: self.handle_callback_exception(self._handlers.get(fd)) fd_obj = handler_func = None finally: self._stopped = False # 確保發生異常也繼續運行 if self._blocking_signal_threshold is not None: signal.setitimer(signal.ITIMER_REAL, 0, 0) # 清空 signal alarm IOLoop._current.instance = old_current if old_wakeup_fd is not None: signal.set_wakeup_fd(old_wakeup_fd) # 和 start 開頭部分對應,但是不是很清楚作用,求老司機帶帶路stop
def stop(self): self._running = False self._stopped = True self._waker.wake()
這個很簡單,設置判斷條件,然后調用 self._waker.wake() 向 pipe 寫入隨意字符喚醒 ioloop 事件循環。 over!
總結噗,寫了這么長,終于寫完了。 經過分析,我們可以看到, ioloop 實際上是對 epoll 的封裝,并加入了一些對上層事件的處理和 server 相關的底層處理。
最后,感謝大家不辭辛苦看到這,文中理解有誤的地方還請多多指教!
原文地址
作者:rapospectre
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/45479.html
摘要:最大的特點就是其支持異步,所以它有著優異的性能。的代碼結構可以在其官網了解,本文著重分析的實現。事件驅動模型的大致思路的方法用于啟動事件循環。行文比較草率,如有錯誤和不足之處,敬請指正。 0. 簡介 tornado是一個用Python語言寫成的Web服務器兼Web應用框架,由FriendFeed公司在自己的網站FriendFeed中使用,被Facebook收購以后框架以開源軟件形式開放...
摘要:清楚了以上流程,我們直接來看函數主要用作初始化應用監聽端口以及啟動。其中就是保存聊天室所有聊天消息的結構。關于的解讀我會放到閱讀源碼時講。然后把消息加到緩存里,如果緩存大于限制則取最新的條消息。 tornado 源碼自帶了豐富的 demo ,這篇文章主要分析 demo 中的聊天室應用: chatdemo 首先看 chatdemo 的目錄結構: ├── chatdemo.py ├── ...
摘要:源碼之分析的協程原理分析版本為支持異步,實現了一個協程庫。提供了回調函數注冊當異步事件完成后,調用注冊的回調中間結果保存結束結果返回等功能注冊回調函數,當被解決時,改回調函數被調用。相當于喚醒已經處于狀態的父協程,通過回調函數,再執行。 tornado 源碼之 coroutine 分析 tornado 的協程原理分析 版本:4.3.0 為支持異步,tornado 實現了一個協程庫。 ...
摘要:主要是為了實現系統之間的雙向解耦而實現的。問題及優化隊列過長問題使用上述方案的異步非阻塞可能會依賴于的任務隊列長度,若隊列中的任務過多,則可能導致長時間等待,降低效率。 Tornado和Celery介紹 1.Tornado Tornado是一個用python編寫的一個強大的、可擴展的異步HTTP服務器,同時也是一個web開發框架。tornado是一個非阻塞式web服務器,其速度相當快。...
摘要:對參數類型進行檢驗,這里為當參數類型不合適是會拋出一個異常。將使用的第二個參數值作為默認值。而請求將從格式中取得指定的文本。這里需要正則表達式相關的知識,到了后面的學習中,必要時再去深入學習。到目前我們使用了,還支持任何合法的請求。 參考書籍《Introduction to Tornado》1.1 Tornado是什么?Tornado是使用Python編寫的一個強大的、可擴展的Web服...
閱讀 707·2021-11-18 10:02
閱讀 2243·2021-11-15 18:13
閱讀 3165·2021-11-15 11:38
閱讀 2956·2021-09-22 15:55
閱讀 3680·2021-08-09 13:43
閱讀 2450·2021-07-25 14:19
閱讀 2459·2019-08-30 14:15
閱讀 3453·2019-08-30 14:15