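Summary: Notes on SocketServer.py. _eintr_retry calls a function again when EINTR occurs. RequestHandlerClass registers the handler; finish_request instantiates it, which calls the user-defined handle function. serve_forever is the service loop: it listens on the port and handles requests, calling select to watch for requests and handling EINTR; when a request comes in it is processed. shutdown stops the loop, and __is_shut_down tells the outside world that the loop has exited. Note how threading.Event() is used: it is set only once, avoiding frequent set/clear cycles on the Event.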
SocketServer.py
Creating network servers.

Contents
file head
BaseServer
BaseServer.serve_forever
BaseServer.shutdown
BaseServer.handle_request
BaseServer._handle_request_noblock
BaseServer Overridden functions
TCPServer
UDPServer
ForkingMixIn
ThreadingMixIn
BaseRequestHandler
StreamRequestHandler
DatagramRequestHandler
Copyright

file head

    __version__ = "0.4"

    import socket
    import select
    import sys
    import os
    import errno
    try:
        import threading
    except ImportError:
        import dummy_threading as threading

    __all__ = ["TCPServer","UDPServer","ForkingUDPServer","ForkingTCPServer",
               "ThreadingUDPServer","ThreadingTCPServer","BaseRequestHandler",
               "StreamRequestHandler","DatagramRequestHandler",
               "ThreadingMixIn", "ForkingMixIn"]
    if hasattr(socket, "AF_UNIX"):
        __all__.extend(["UnixStreamServer","UnixDatagramServer",
                        "ThreadingUnixStreamServer",
                        "ThreadingUnixDatagramServer"])

    # Call the function again when EINTR occurs
    def _eintr_retry(func, *args):
        """restart a system call interrupted by EINTR"""
        while True:
            try:
                return func(*args)
            except (OSError, select.error) as e:
                if e.args[0] != errno.EINTR:
                    raise

BaseServer
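RequestHandlerClass registers the handler class that provides the handle function.
finish_request instantiates that class, which in turn calls the user-defined handle function.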
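
The registration step above can be tried out with a minimal sketch. It assumes Python 3, where the module is named socketserver rather than SocketServer; UpperHandler and demo are hypothetical names used only for illustration.

```python
import socket
import socketserver
import threading


class UpperHandler(socketserver.BaseRequestHandler):
    """Hypothetical handler: replies with the upper-cased request."""

    def handle(self):
        data = self.request.recv(1024)
        self.request.sendall(data.upper())


def demo():
    # Port 0 asks the OS for a free port. The handler *class* is registered,
    # not an instance; the server instantiates it once per request.
    with socketserver.TCPServer(("127.0.0.1", 0), UpperHandler) as server:
        # handle_request() serves exactly one request and then returns.
        worker = threading.Thread(target=server.handle_request)
        worker.start()
        with socket.create_connection(server.server_address) as client:
            client.sendall(b"hello")
            reply = client.recv(1024)
        worker.join()
    return reply


if __name__ == "__main__":
    print(demo())
```

The handler never calls handle() itself; the call happens inside the handler's constructor when the server runs finish_request.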

    class BaseServer:

        timeout = None

        def __init__(self, server_address, RequestHandlerClass):
            """Constructor.  May be extended, do not override."""
            self.server_address = server_address
            self.RequestHandlerClass = RequestHandlerClass
            self.__is_shut_down = threading.Event()
            self.__shutdown_request = False

        def server_activate(self):
            """Called by constructor to activate the server.

            May be overridden.

            """
            pass

BaseServer.serve_forever
The service loop
Listens on the port
Handles requests

    def serve_forever(self, poll_interval=0.5):
        """Handle one request at a time until shutdown.

        Polls for shutdown every poll_interval seconds. Ignores
        self.timeout. If you need to do periodic tasks, do them in
        another thread.
        """
        self.__is_shut_down.clear()
        try:
            while not self.__shutdown_request:
                # Call select to watch for requests; retry on EINTR
                r, w, e = _eintr_retry(select.select, [self], [], [],
                                       poll_interval)
                # A request has come in
                if self in r:
                    self._handle_request_noblock()
        finally:
            self.__shutdown_request = False
            self.__is_shut_down.set()

BaseServer.shutdown
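Stops the serve_forever loop.
__is_shut_down tells the outside world that the loop has exited.
Note how threading.Event() is used: it is set only once per run; avoid using an Event for frequent set/clear cycles.
shutdown must be called from a different thread than the one running serve_forever.
The reason is that shutdown blocks in wait() on the event (an Event, not a semaphore); if it were called in the serve_forever thread, that thread would block, the loop could never run, and the event would never be set.
serve_forever only sees the flag when select returns, that is, when a request arrives or poll_interval elapses; it then leaves the loop and sets the event.
Note
Reads and writes of self.__shutdown_request are simple accesses to a boolean attribute; in CPython they are effectively atomic, so using the flag across threads is safe.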
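
A minimal sketch of the threading rule above, assuming Python 3 (module socketserver); NullHandler and demo are hypothetical names. The loop runs in a worker thread and shutdown() is called from the main thread, so the wait on the event can complete.

```python
import socketserver
import threading


class NullHandler(socketserver.BaseRequestHandler):
    """Hypothetical handler that does nothing; no request is sent here."""

    def handle(self):
        pass


def demo():
    server = socketserver.TCPServer(("127.0.0.1", 0), NullHandler)
    # Run the service loop in its own thread.
    loop = threading.Thread(target=server.serve_forever,
                            kwargs={"poll_interval": 0.1})
    loop.start()
    # Called from a *different* thread: sets the flag, then blocks until
    # serve_forever leaves its loop and sets the event.
    server.shutdown()
    loop.join(timeout=5)
    server.server_close()
    return loop.is_alive()


if __name__ == "__main__":
    print(demo())
```

Calling server.shutdown() from inside the thread that runs serve_forever (for example from a handler of a non-threaded server) would block forever, as the docstring warns.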

    def shutdown(self):
        """Stops the serve_forever loop.

        Blocks until the loop has finished. This must be called while
        serve_forever() is running in another thread, or it will
        deadlock.
        """
        self.__shutdown_request = True
        self.__is_shut_down.wait()

BaseServer.handle_request
A function that sits alongside serve_forever
If you do not call serve_forever, call handle_request in a loop of your own
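
A minimal sketch of driving the server with your own loop, assuming Python 3 (module socketserver). CountingServer and demo are hypothetical names; no client connects, so every call to handle_request runs into self.timeout and ends in handle_timeout.

```python
import socketserver


class CountingServer(socketserver.TCPServer):
    """Hypothetical server that counts how often handle_request timed out."""

    timeout = 0.05   # seconds; respected by handle_request, ignored by serve_forever
    timeouts = 0

    def handle_timeout(self):
        self.timeouts += 1


def demo(rounds=3):
    with CountingServer(("127.0.0.1", 0),
                        socketserver.BaseRequestHandler) as server:
        # The caller owns the loop, so periodic work can go between calls.
        for _ in range(rounds):
            server.handle_request()
        return server.timeouts


if __name__ == "__main__":
    print(demo())
```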

    # The distinction between handling, getting, processing and
    # finishing a request is fairly arbitrary.  Remember:
    #
    # - handle_request() is the top-level call.  It calls
    #   select, get_request(), verify_request() and process_request()
    # - get_request() is different for stream or datagram sockets
    # - process_request() is the place that may fork a new process
    #   or create a new thread to finish the request
    # - finish_request() instantiates the request handler class;
    #   this constructor will handle the request all by itself

    def handle_request(self):
        """Handle one request, possibly blocking.

        Respects self.timeout.
        """
        # Support people who used socket.settimeout() to escape
        # handle_request before self.timeout was available.
        # If the user also set a timeout with socket.settimeout(), use the smaller one
        timeout = self.socket.gettimeout()
        if timeout is None:
            timeout = self.timeout
        elif self.timeout is not None:
            timeout = min(timeout, self.timeout)
        # select: wait for a connection; blocks until the timeout expires
        fd_sets = _eintr_retry(select.select, [self], [], [], timeout)
        if not fd_sets[0]:
            self.handle_timeout()
            return
        # Handle the request
        self._handle_request_noblock()

BaseServer._handle_request_noblock
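The function that actually handles a request
get_request: accepts the request (accept)
verify_request: validation, for example IP filtering
process_request: processes the request. A subclass that overrides it must still reach finish_request, either by calling SocketServer.BaseServer.process_request or by calling finish_request itself, as the mixins do.
finish_request, called from BaseServer.process_request, is the callback into BaseRequestHandler: it instantiates the user-defined handler, whose __init__ calls handle()
shutdown_request: closes the connection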
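
The IP filtering mentioned for verify_request could look roughly like this. It is a sketch assuming Python 3 (module socketserver); FilteringServer, allowed_hosts, HelloHandler, ask and demo are hypothetical names, not part of the library.

```python
import socket
import socketserver
import threading


class FilteringServer(socketserver.TCPServer):
    """Hypothetical server that only serves clients on an allow list."""

    allowed_hosts = {"127.0.0.1"}

    def verify_request(self, request, client_address):
        # Returning False makes the server close the connection
        # without ever instantiating the handler.
        return client_address[0] in self.allowed_hosts


class HelloHandler(socketserver.BaseRequestHandler):
    def handle(self):
        self.request.sendall(b"hello")


def ask(server):
    """Serve one request in a worker thread and return what the client saw."""
    worker = threading.Thread(target=server.handle_request)
    worker.start()
    with socket.create_connection(server.server_address) as client:
        reply = client.recv(1024)
    worker.join()
    return reply


def demo():
    with FilteringServer(("127.0.0.1", 0), HelloHandler) as server:
        accepted = ask(server)
        server.allowed_hosts = set()   # now nobody is allowed
        rejected = ask(server)
    return accepted, rejected


if __name__ == "__main__":
    print(demo())
```

A rejected client simply sees the connection closed, which shows up as an empty read.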

    def _handle_request_noblock(self):
        """Handle one request, without blocking.

        I assume that select.select has returned that the socket is
        readable before this function was called, so there should be
        no risk of blocking in get_request().
        """
        try:
            # Accept the request
            # get_request is implemented by subclasses; it usually accepts
            # the request and returns a socket
            request, client_address = self.get_request()
        except socket.error:
            return
        if self.verify_request(request, client_address):
            try:
                self.process_request(request, client_address)
            except:
                self.handle_error(request, client_address)
                self.shutdown_request(request)
        else:
            self.shutdown_request(request)

BaseServer Overridden functions

    def handle_timeout(self):
        """Called if no new request arrives within self.timeout.

        Overridden by ForkingMixIn.
        """
        pass

    def verify_request(self, request, client_address):
        """Verify the request.  May be overridden.

        Return True if we should proceed with this request.

        """
        return True

    def process_request(self, request, client_address):
        """Call finish_request.

        Overridden by ForkingMixIn and ThreadingMixIn.

        """
        self.finish_request(request, client_address)
        self.shutdown_request(request)

    def server_close(self):
        """Called to clean-up the server.

        May be overridden.

        """
        pass

    def finish_request(self, request, client_address):
        """Finish one request by instantiating RequestHandlerClass."""
        self.RequestHandlerClass(request, client_address, self)

    def shutdown_request(self, request):
        """Called to shutdown and close an individual request."""
        self.close_request(request)

    def close_request(self, request):
        """Called to clean up an individual request."""
        pass

    def handle_error(self, request, client_address):
        """Handle an error gracefully.  May be overridden.

        The default is to print a traceback and continue.

        """
        print "-"*40
        print "Exception happened during processing of request from",
        print client_address
        import traceback
        traceback.print_exc() # XXX But this goes to stderr!
        print "-"*40

TCPServer
shutdown_request calls socket.shutdown first and then socket.close
close() releases the resource associated with a connection but does not necessarily close the connection immediately. If you want to close the connection in a timely fashion, call shutdown() before close().
Shut down one or both halves of the connection. If how is SHUT_RD, further receives are disallowed. If how is SHUT_WR, further sends are disallowed. If how is SHUT_RDWR, further sends and receives are disallowed. Depending on the platform, shutting down one half of the connection can also close the opposite half (e.g. on Mac OS X, shutdown(SHUT_WR) does not allow further reads on the other end of the connection).
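
The effect of shutdown(SHUT_WR) can be observed without a server. The following is a small sketch assuming Python 3 and a platform where socket.socketpair() is available and half-close leaves the other direction usable (the quoted documentation notes that platforms differ); demo is a hypothetical name.

```python
import socket


def demo():
    left, right = socket.socketpair()
    with left, right:
        left.sendall(b"bye")
        # Half-close: no more sends from `left`, the peer sees end of stream.
        left.shutdown(socket.SHUT_WR)
        first = right.recv(1024)    # the data sent before the shutdown
        second = right.recv(1024)   # empty bytes signal end of stream
        # The other direction is still open after SHUT_WR.
        right.sendall(b"still open")
        third = left.recv(1024)
    return first, second, third


if __name__ == "__main__":
    print(demo())
```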

    class TCPServer(BaseServer):

        address_family = socket.AF_INET

        socket_type = socket.SOCK_STREAM

        request_queue_size = 5

        allow_reuse_address = False

        def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True):
            """Constructor.  May be extended, do not override."""
            BaseServer.__init__(self, server_address, RequestHandlerClass)
            self.socket = socket.socket(self.address_family,
                                        self.socket_type)
            if bind_and_activate:
                try:
                    self.server_bind()
                    self.server_activate()
                except:
                    self.server_close()
                    raise

        def server_bind(self):
            """Called by constructor to bind the socket.

            May be overridden.

            """
            if self.allow_reuse_address:
                self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.socket.bind(self.server_address)
            self.server_address = self.socket.getsockname()

        def server_activate(self):
            """Called by constructor to activate the server.

            May be overridden.

            """
            self.socket.listen(self.request_queue_size)

        def server_close(self):
            """Called to clean-up the server.

            May be overridden.

            """
            self.socket.close()

        def fileno(self):
            """Return socket file number.

            Interface required by select().

            """
            return self.socket.fileno()

        def get_request(self):
            """Get the request and client address from the socket.

            May be overridden.

            """
            return self.socket.accept()

        # Call shutdown, then close, to close promptly and release resources
        def shutdown_request(self, request):
            """Called to shutdown and close an individual request."""
            try:
                #explicitly shutdown.  socket.close() merely releases
                #the socket and waits for GC to perform the actual close.
                request.shutdown(socket.SHUT_WR)
            except socket.error:
                pass #some platforms may raise ENOTCONN here
            self.close_request(request)

        def close_request(self, request):
            """Called to clean up an individual request."""
            request.close()

UDPServer
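With UDPServer, get_request returns a (data, socket) tuple as the request, whereas TCPServer returns the connected socket
handle has to treat the two cases differently
msg, sock = self.request
msg has already been received; no extra recv is needed
For sending data, you should use the socket's sendto() and recvfrom() methods. Although the traditional send() and recv() can achieve the same effect, the former two are more common for UDP communication.
from python3-cookbook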

    from SocketServer import BaseRequestHandler, UDPServer
    import time

    class TimeHandler(BaseRequestHandler):
        def handle(self):
            print("Got connection from", self.client_address)
            # Get message and client socket
            msg, sock = self.request
            resp = time.ctime()
            sock.sendto(resp.encode("ascii"), self.client_address)

    if __name__ == "__main__":
        serv = UDPServer(("", 20000), TimeHandler)
        serv.serve_forever()

    #-----------------------------

    >>> from socket import socket, AF_INET, SOCK_DGRAM
    >>> s = socket(AF_INET, SOCK_DGRAM)
    >>> s.sendto(b"", ("localhost", 20000))
    0
    >>> s.recvfrom(8192)
    ("Thu Dec 20 10:01:01 2018", ("127.0.0.1", 20000))

    class UDPServer(TCPServer):

        """UDP server class."""

        allow_reuse_address = False

        socket_type = socket.SOCK_DGRAM

        max_packet_size = 8192

        def get_request(self):
            data, client_addr = self.socket.recvfrom(self.max_packet_size)
            return (data, self.socket), client_addr

        def server_activate(self):
            # No need to call listen() for UDP.
            pass

        def shutdown_request(self, request):
            # No need to shutdown anything.
            self.close_request(request)

        def close_request(self, request):
            # No need to close anything.
            pass

ForkingMixIn
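A textbook use of fork for handling requests in multiple processes
Caps the number of child processes so that system resources are not exhausted
The parent process waits on (reaps) defunct children
After fork, the parent returns immediately
The child handles the request and then calls os._exit()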
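
The fork, _exit and wait pattern listed above, reduced to a minimal sketch. It assumes Python 3 on a POSIX system where os.fork is available; run_in_child is a hypothetical name and the "work" in the child is left empty.

```python
import os


def run_in_child(exit_code):
    """Fork, let the child exit with exit_code, and reap it in the parent."""
    pid = os.fork()
    if pid == 0:
        # Child process: do the work here, then leave with os._exit() so the
        # child never returns into the parent's code path.
        os._exit(exit_code)
    # Parent process: fork() returned the child's pid. Waiting on it reaps
    # the child so it does not linger as a defunct (zombie) process.
    _, status = os.waitpid(pid, 0)
    return os.WEXITSTATUS(status)


if __name__ == "__main__":
    print(run_in_child(7))
```

ForkingMixIn does the same per request, but waits with os.WNOHANG so the parent can keep accepting connections while children are still running.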

    class ForkingMixIn:

        """Mix-in class to handle each request in a new process."""

        timeout = 300
        active_children = None
        max_children = 40

        def collect_children(self):
            """Internal routine to wait for children that have exited."""
            if self.active_children is None:
                return

            while len(self.active_children) >= self.max_children:
                try:
                    pid, _ = os.waitpid(-1, 0)
                    self.active_children.discard(pid)
                except OSError as e:
                    if e.errno == errno.ECHILD:
                        # we don't have any children, we're done
                        self.active_children.clear()
                    elif e.errno != errno.EINTR:
                        break

            # Now reap all defunct children.
            for pid in self.active_children.copy():
                try:
                    pid, _ = os.waitpid(pid, os.WNOHANG)
                    # if the child hasn't exited yet, pid will be 0 and ignored by
                    # discard() below
                    self.active_children.discard(pid)
                except OSError as e:
                    if e.errno == errno.ECHILD:
                        # someone else reaped it
                        self.active_children.discard(pid)

        def handle_timeout(self):
            """Wait for zombies after self.timeout seconds of inactivity.

            May be extended, do not override.
            """
            self.collect_children()

        def process_request(self, request, client_address):
            """Fork a new subprocess to process the request."""
            self.collect_children()
            pid = os.fork()
            if pid:
                # Parent process
                if self.active_children is None:
                    self.active_children = set()
                self.active_children.add(pid)
                self.close_request(request) #close handle in parent process
                return
            else:
                # Child process.
                # This must never return, hence os._exit()!
                try:
                    self.finish_request(request, client_address)
                    self.shutdown_request(request)
                    os._exit(0)
                except:
                    try:
                        self.handle_error(request, client_address)
                        self.shutdown_request(request)
                    finally:
                        os._exit(1)

ThreadingMixIn
ThreadingMixIn overrides process_request
It creates a thread
The request is handled in that thread
It starts the thread
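
To see that the handler really runs in a separate thread, here is a sketch assuming Python 3 (module socketserver). ThreadNameHandler, Server and demo are hypothetical names; the handler reports the name of the thread it runs in.

```python
import socket
import socketserver
import threading


class ThreadNameHandler(socketserver.BaseRequestHandler):
    """Hypothetical handler: sends back the name of its own thread."""

    def handle(self):
        name = threading.current_thread().name
        self.request.sendall(name.encode("ascii", "replace"))


class Server(socketserver.ThreadingMixIn, socketserver.TCPServer):
    # The mixin must come first so its process_request wins.
    daemon_threads = True


def demo():
    server = Server(("127.0.0.1", 0), ThreadNameHandler)
    loop = threading.Thread(target=server.serve_forever, name="accept-loop")
    loop.start()
    try:
        with socket.create_connection(server.server_address) as client:
            handler_thread = client.recv(1024).decode("ascii")
    finally:
        server.shutdown()
        loop.join()
        server.server_close()
    # The request was not handled in the thread that runs the accept loop.
    return handler_thread != "accept-loop"


if __name__ == "__main__":
    print(demo())
```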

    class ThreadingMixIn:
        """Mix-in class to handle each request in a new thread."""

        # Decides how threads will act upon termination of the
        # main process
        daemon_threads = False

        def process_request_thread(self, request, client_address):
            """Same as in BaseServer but as a thread.

            In addition, exception handling is done here.

            """
            try:
                self.finish_request(request, client_address)
                self.shutdown_request(request)
            except:
                self.handle_error(request, client_address)
                self.shutdown_request(request)

        def process_request(self, request, client_address):
            """Start a new thread to process the request."""
            t = threading.Thread(target = self.process_request_thread,
                                 args = (request, client_address))
            t.daemon = self.daemon_threads
            t.start()

    class ForkingUDPServer(ForkingMixIn, UDPServer): pass
    class ForkingTCPServer(ForkingMixIn, TCPServer): pass

    class ThreadingUDPServer(ThreadingMixIn, UDPServer): pass
    class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass

    if hasattr(socket, "AF_UNIX"):

        class UnixStreamServer(TCPServer):
            address_family = socket.AF_UNIX

        class UnixDatagramServer(UDPServer):
            address_family = socket.AF_UNIX

        class ThreadingUnixStreamServer(ThreadingMixIn, UnixStreamServer): pass

        class ThreadingUnixDatagramServer(ThreadingMixIn, UnixDatagramServer): pass

BaseRequestHandler
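The base request handler class; it exposes three hooks
setup()
handle()
finish()
To use it, subclass it and register the subclass with BaseServer
BaseServer.finish_request instantiates the handler class; the call to __init__ is what invokes the handle() overridden in the subclass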
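
The order in which the three hooks run can be checked without any networking, because everything happens in the constructor. A small sketch assuming Python 3 (module socketserver); TracingHandler and demo are hypothetical names, and the request and server arguments are placeholders.

```python
import socketserver


class TracingHandler(socketserver.BaseRequestHandler):
    """Hypothetical handler that records which hooks were called."""

    def setup(self):
        self.calls = ["setup"]

    def handle(self):
        self.calls.append("handle")

    def finish(self):
        self.calls.append("finish")


def demo():
    # Instantiating the class is all the server does in finish_request;
    # None stands in for the request socket and the server object.
    handler = TracingHandler(None, ("127.0.0.1", 0), None)
    return handler.calls


if __name__ == "__main__":
    print(demo())
```

finish() sits in a finally clause, so it also runs when handle() raises.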

    class BaseRequestHandler:

        def __init__(self, request, client_address, server):
            self.request = request
            self.client_address = client_address
            self.server = server
            self.setup()
            try:
                self.handle()
            finally:
                self.finish()

        def setup(self):
            pass

        def handle(self):
            pass

        def finish(self):
            pass

StreamRequestHandler
Provides a file-like interface to the connection (rfile and wfile)
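
A minimal sketch of the file-like interface, assuming Python 3 (module socketserver); EchoLineHandler and demo are hypothetical names. The handler reads one line from rfile and writes a reply to wfile instead of calling recv and send on the socket.

```python
import socket
import socketserver
import threading


class EchoLineHandler(socketserver.StreamRequestHandler):
    """Hypothetical handler: echoes back the first line it receives."""

    def handle(self):
        line = self.rfile.readline()          # buffered read up to "\n"
        self.wfile.write(b"echo: " + line)    # unbuffered by default


def demo():
    with socketserver.TCPServer(("127.0.0.1", 0), EchoLineHandler) as server:
        worker = threading.Thread(target=server.handle_request)
        worker.start()
        with socket.create_connection(server.server_address) as client:
            client.sendall(b"ping\n")
            with client.makefile("rb") as stream:
                reply = stream.readline()
        worker.join()
    return reply


if __name__ == "__main__":
    print(demo())
```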

    class StreamRequestHandler(BaseRequestHandler):

        """Define self.rfile and self.wfile for stream sockets."""

        # Default buffer sizes for rfile, wfile.
        # We default rfile to buffered because otherwise it could be
        # really slow for large data (a getc() call per byte); we make
        # wfile unbuffered because (a) often after a write() we want to
        # read and we need to flush the line; (b) big writes to unbuffered
        # files are typically optimized by stdio even when big reads
        # aren't.
        rbufsize = -1
        wbufsize = 0

        # A timeout to apply to the request socket, if not None.
        timeout = None

        # Disable nagle algorithm for this socket, if True.
        # Use only when wbufsize != 0, to avoid small packets.
        disable_nagle_algorithm = False

        def setup(self):
            self.connection = self.request
            if self.timeout is not None:
                self.connection.settimeout(self.timeout)
            if self.disable_nagle_algorithm:
                self.connection.setsockopt(socket.IPPROTO_TCP,
                                           socket.TCP_NODELAY, True)
            self.rfile = self.connection.makefile("rb", self.rbufsize)
            self.wfile = self.connection.makefile("wb", self.wbufsize)

        def finish(self):
            if not self.wfile.closed:
                try:
                    self.wfile.flush()
                except socket.error:
                    # A final socket error may have occurred here, such as
                    # the local error ECONNABORTED.
                    pass
            self.wfile.close()
            self.rfile.close()

DatagramRequestHandler

    class DatagramRequestHandler(BaseRequestHandler):

        """Define self.rfile and self.wfile for datagram sockets."""

        def setup(self):
            try:
                from cStringIO import StringIO
            except ImportError:
                from StringIO import StringIO
            self.packet, self.socket = self.request
            self.rfile = StringIO(self.packet)
            self.wfile = StringIO()

        def finish(self):
            self.socket.sendto(self.wfile.getvalue(), self.client_address)

Copyright
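Author: bigfish
License: Creative Commons Attribution-NonCommercial 4.0 International License
Copyright of this article belongs to the author. Do not repost without permission; if this article violates any rules, you can contact the administrator to have it removed.
When reposting, please cite the address of this article: http://m.specialneedsforspecialkids.com/yun/42860.html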