摘要:上一篇我們介紹了包,以及如何使用異步編程管理網(wǎng)絡(luò)應(yīng)用中的高并發(fā)。倒排索引保存在本地一個名為的文件中。運行示例如下這個模塊沒有使用并發(fā),主要作用是為使用包編寫的服務(wù)器提供支持。
async/await語法asyncio 上一篇我們介紹了 asyncio 包,以及如何使用異步編程管理網(wǎng)絡(luò)應(yīng)用中的高并發(fā)。在這一篇,我們主要介紹使用 asyncio 包編程的兩個例子。
我們先介紹下 async/await 語法,要不然看完這篇可能會困惑,為什么之前使用 asyncio.coroutine 裝飾器 和 yield from,這里都是 用的 async 和 await?
python并發(fā)2:使用asyncio處理并發(fā)
async/await 是Python3.5 的新語法,語法如下:
async def read_data(db): pass
async 是明確將函數(shù)聲明為協(xié)程的關(guān)鍵字,即使沒有await表達式,函數(shù)執(zhí)行也會返回一個協(xié)程對象。
在協(xié)程函數(shù)內(nèi)部,可以在某個表達式之前使用 await 關(guān)鍵字來暫停協(xié)程的執(zhí)行,以等待某協(xié)程完成:
async def read_data(db): data = await db.fetch("SELECT ...")
這個代碼如果使用 asyncio.coroutine 裝飾器語法為:
@asyncio.coroutine def read_data(db): data = yield from db.fetch("SELECT ...")
這兩段代碼執(zhí)行的結(jié)果是一樣的,也就是說 可以把 asyncio.coroutine 替換為 async, yield from 替換為 await。
使用新的語法有什么好處呢:
使生成器和協(xié)程的概念更容易理解,因為語法不同
可以消除由于重構(gòu)時不小心移出協(xié)程中yield 聲明而導(dǎo)致的不明確錯誤,這回導(dǎo)致協(xié)程變成普通的生成器。
使用 asyncio 包編寫服務(wù)器這個例子主要是使用 asyncio 包 和 unicodedata 模塊,實現(xiàn)通過規(guī)范名稱查找Unicode 字符。
我們先來看一下代碼:
# charfinder.py import sys import re import unicodedata import pickle import warnings import itertools import functools from collections import namedtuple RE_WORD = re.compile("w+") RE_UNICODE_NAME = re.compile("^[A-Z0-9 -]+$") RE_CODEPOINT = re.compile("U+[0-9A-F]{4, 6}") INDEX_NAME = "charfinder_index.pickle" MINIMUM_SAVE_LEN = 10000 CJK_UNI_PREFIX = "CJK UNIFIED IDEOGRAPH" CJK_CMP_PREFIX = "CJK COMPATIBILITY IDEOGRAPH" sample_chars = [ "$", # DOLLAR SIGN "A", # LATIN CAPITAL LETTER A "a", # LATIN SMALL LETTER A "u20a0", # EURO-CURRENCY SIGN "u20ac", # EURO SIGN ] CharDescription = namedtuple("CharDescription", "code_str char name") QueryResult = namedtuple("QueryResult", "count items") def tokenize(text): """ :param text: :return: return iterable of uppercased words """ for match in RE_WORD.finditer(text): yield match.group().upper() def query_type(text): text_upper = text.upper() if "U+" in text_upper: return "CODEPOINT" elif RE_UNICODE_NAME.match(text_upper): return "NAME" else: return "CHARACTERS" class UnicodeNameIndex: # unicode name 索引類 def __init__(self, chars=None): self.load(chars) def load(self, chars=None): # 加載 unicode name self.index = None if chars is None: try: with open(INDEX_NAME, "rb") as fp: self.index = pickle.load(fp) except OSError: pass if self.index is None: self.build_index(chars) if len(self.index) > MINIMUM_SAVE_LEN: try: self.save() except OSError as exc: warnings.warn("Could not save {!r}: {}" .format(INDEX_NAME, exc)) def save(self): with open(INDEX_NAME, "wb") as fp: pickle.dump(self.index, fp) def build_index(self, chars=None): if chars is None: chars = (chr(i) for i in range(32, sys.maxunicode)) index = {} for char in chars: try: name = unicodedata.name(char) except ValueError: continue if name.startswith(CJK_UNI_PREFIX): name = CJK_UNI_PREFIX elif name.startswith(CJK_CMP_PREFIX): name = CJK_CMP_PREFIX for word in tokenize(name): index.setdefault(word, set()).add(char) self.index = index def word_rank(self, top=None): # (len(self.index[key], key) 是一個生成器,需要用list 轉(zhuǎn)成列表,要不然下邊排序會報錯 res = [list((len(self.index[key], key)) for key in self.index)] res.sort(key=lambda item: (-item[0], item[1])) if top is not None: res = res[:top] return res def word_report(self, top=None): for postings, key in self.word_rank(top): print("{:5} {}".format(postings, key)) def find_chars(self, query, start=0, stop=None): stop = sys.maxsize if stop is None else stop result_sets = [] for word in tokenize(query): # tokenize 是query 的生成器 a b 會是 ["a", "b"] 的生成器 chars = self.index.get(word) if chars is None: result_sets = [] break result_sets.append(chars) if not result_sets: return QueryResult(0, ()) result = functools.reduce(set.intersection, result_sets) result = sorted(result) # must sort to support start, stop result_iter = itertools.islice(result, start, stop) return QueryResult(len(result), (char for char in result_iter)) def describe(self, char): code_str = "U+{:04X}".format(ord(char)) name = unicodedata.name(char) return CharDescription(code_str, char, name) def find_descriptions(self, query, start=0, stop=None): for char in self.find_chars(query, start, stop).items: yield self.describe(char) def get_descriptions(self, chars): for char in chars: yield self.describe(char) def describe_str(self, char): return "{:7} {} {}".format(*self.describe(char)) def find_description_strs(self, query, start=0, stop=None): for char in self.find_chars(query, start, stop).items: yield self.describe_str(char) @staticmethod # not an instance method due to concurrency def status(query, counter): if counter == 0: msg = "No match" elif counter == 1: msg = "1 match" else: msg = "{} matches".format(counter) return "{} for {!r}".format(msg, query) def main(*args): index = UnicodeNameIndex() query = " ".join(args) n = 0 for n, line in enumerate(index.find_description_strs(query), 1): print(line) print("({})".format(index.status(query, n))) if __name__ == "__main__": if len(sys.argv) > 1: main(*sys.argv[1:]) else: print("Usage: {} word1 [word2]...".format(sys.argv[0]))
這個模塊讀取Python內(nèi)建的Unicode數(shù)據(jù)庫,為每個字符名稱中的每個單詞建立索引,然后倒排索引,存入一個字典。
例如,在倒排索引中,"SUN" 鍵對應(yīng)的條目是一個集合,里面是名稱中包含"SUN" 這個詞的10個Unicode字符。倒排索引保存在本地一個名為charfinder_index.pickle 的文件中。如果查詢多個單詞,會計算從索引中所得集合的交集。
運行示例如下:
>>> main("rook") # doctest: +NORMALIZE_WHITESPACE U+2656 ? WHITE CHESS ROOK U+265C ? BLACK CHESS ROOK (2 matches for "rook") >>> main("rook", "black") # doctest: +NORMALIZE_WHITESPACE U+265C ? BLACK CHESS ROOK (1 match for "rook black") >>> main("white bishop") # doctest: +NORMALIZE_WHITESPACE U+2657 ? WHITE CHESS BISHOP (1 match for "white bishop") >>> main("jabberwocky"s vest") (No match for "jabberwocky"s vest")
這個模塊沒有使用并發(fā),主要作用是為使用 asyncio 包編寫的服務(wù)器提供支持。
下面我們來看下 tcp_charfinder.py 腳本:
# tcp_charfinder.py import sys import asyncio # 用于構(gòu)建索引,提供查詢方法 from charfinder import UnicodeNameIndex CRLF = b" " PROMPT = b"?> " # 實例化UnicodeNameIndex 類,它會使用charfinder_index.pickle 文件 index = UnicodeNameIndex() async def handle_queries(reader, writer): # 這個協(xié)程要傳給asyncio.start_server 函數(shù),接收的兩個參數(shù)是asyncio.StreamReader 對象和 asyncio.StreamWriter 對象 while True: # 這個循環(huán)處理會話,直到從客戶端收到控制字符后退出 writer.write(PROMPT) # can"t await! # 這個方法不是協(xié)程,只是普通函數(shù);這一行發(fā)送 ?> 提示符 await writer.drain() # must await! # 這個方法刷新writer 緩沖;因為它是協(xié)程,所以要用 await data = await reader.readline() # 這個方法也是協(xié)程,返回一個bytes對象,也要用await try: query = data.decode().strip() except UnicodeDecodeError: # Telenet 客戶端發(fā)送控制字符時,可能會拋出UnicodeDecodeError異常 # 我們這里默認發(fā)送空字符 query = "x00" client = writer.get_extra_info("peername") # 返回套接字連接的遠程地址 print("Received from {}: {!r}".format(client, query)) # 在控制臺打印查詢記錄 if query: if ord(query[:1]) < 32: # 如果收到控制字符或者空字符,退出循環(huán) break # 返回一個生成器,產(chǎn)出包含Unicode 碼位、真正的字符和字符名稱的字符串 lines = list(index.find_description_strs(query)) if lines: # 使用默認的UTF-8 編碼把lines 轉(zhuǎn)換成bytes 對象,并在每一行末添加回車符合換行符 # 參數(shù)列表是一個生成器 writer.writelines(line.encode() + CRLF for line in lines) writer.write(index.status(query, len(lines)).encode() + CRLF) # 輸出狀態(tài) await writer.drain() # 刷新輸出緩沖 print("Sent {} results".format(len(lines))) # 在服務(wù)器控制臺記錄響應(yīng) print("Close the client socket") # 在控制臺記錄會話結(jié)束 writer.close() # 關(guān)閉StreamWriter流 def main(address="127.0.0.1", port=2323): # 添加默認地址和端口,所以調(diào)用默認可以不加參數(shù) port = int(port) loop = asyncio.get_event_loop() # asyncio.start_server 協(xié)程運行結(jié)束后, # 返回的協(xié)程對象返回一個asyncio.Server 實例,即一個TCP套接字服務(wù)器 server_coro = asyncio.start_server(handle_queries, address, port, loop=loop) server = loop.run_until_complete(server_coro) # 驅(qū)動server_coro 協(xié)程,啟動服務(wù)器 host = server.sockets[0].getsockname() # 獲得這個服務(wù)器的第一個套接字的地址和端口 print("Serving on {}. Hit CTRL-C to stop.".format(host)) # 在控制臺中顯示地址和端口 try: loop.run_forever() # 運行事件循環(huán) main 函數(shù)在這里阻塞,直到服務(wù)器的控制臺中按CTRL-C 鍵 except KeyboardInterrupt: # CTRL+C pressed pass print("Server shutting down.") server.close() # server.wait_closed返回一個 future # 調(diào)用loop.run_until_complete 方法,運行 future loop.run_until_complete(server.wait_closed()) loop.close() # 終止事件循環(huán) if __name__ == "__main__": main(*sys.argv[1:])
運行 tcp_charfinders.py
python tcp_charfinders.py
打開終端,使用 telnet 命令請求服務(wù),運行結(jié)果如下所示:
main 函數(shù)幾乎會立即顯示 Serving on... 消息,然后在調(diào)用loop.run_forever() 方法時阻塞。這時,控制權(quán)流動到事件循環(huán)中,而且一直等待,偶爾會回到handle_queries 協(xié)程,這個協(xié)程需要等待網(wǎng)絡(luò)發(fā)送或接收數(shù)據(jù)時,控制權(quán)又交給事件循環(huán)。
handle_queries 協(xié)程可以處理多個客戶端發(fā)來的多次請求。只要有新客戶端連接服務(wù)器,就會啟動一個handle_queries 協(xié)程實例。
handle_queries 的I/O操作都是使用bytes格式。我們從網(wǎng)絡(luò)得到的數(shù)據(jù)要解碼,發(fā)出去的數(shù)據(jù)也要編碼
asyncio包提供了高層的流API,提供了現(xiàn)成的服務(wù)器,我們只需要實現(xiàn)一個處理程序。詳細信息可以查看文檔:https://docs.python.org/3/library/asyncio-stream.html
雖然,asyncio包提供了服務(wù)器,但是功能相對來說還是比較簡陋的,現(xiàn)在我們使用一下 基于asyncio包的 web 框架 sanci,用它來實現(xiàn)一個http版的簡易服務(wù)器
使用 sanic 包編寫web 服務(wù)器sanic 的簡單入門在上一篇文章有介紹,python web 框架 Sanci 快速入門
Sanic 是一個和類Flask 的基于Python3.5+的web框架,提供了比較高階的API,比如路由、request參數(shù),response等,我們只需要實現(xiàn)處理邏輯即可。
下邊是使用 sanic 實現(xiàn)的簡易的 字符查詢http web 服務(wù):
from sanic import Sanic from sanic import response from charfinder import UnicodeNameIndex app = Sanic() index = UnicodeNameIndex() html_temp = "{char}
" @app.route("/charfinder") # app.route 函數(shù)的第一個參數(shù)是url path,我們這里指定路徑是charfinder async def charfinder(request): # request.args 可以取到url 的查詢參數(shù) # ?key1=value1&key2=value2 的結(jié)果是 {"key1": ["value1"], "key2": ["value2"]} # 我們這里支持傳入多個查詢參數(shù),所以這里使用 request.args.getlist("char") # 如果我們 使用 request.args.get("char") 只能取到第一個參數(shù) query = request.args.getlist("char") query = " ".join(query) lines = list(index.find_description_strs(query)) # 將得到的結(jié)果生成html html = " ".join([html_temp.format(char=line) for line in lines]) return response.html(html) if __name__ == "__main__": app.run(host="0.0.0.0", port=8000) # 設(shè)置服務(wù)器運行地址和端口號
對比兩段代碼可以發(fā)現(xiàn),使用 sanic 非常簡單。
運行服務(wù):
python http_charsfinder.py
我們在瀏覽器輸入地址 http://0.0.0.0:8000/charfinde... 結(jié)果示例如下
現(xiàn)在對比下兩段代碼在TCP 的示例中,服務(wù)器通過main函數(shù)下的這兩行代碼創(chuàng)建并排定運行時間:
server_coro = asyncio.start_server(handle_queries, address, port, loop=loop) server = loop.run_until_complete(server_coro)
而在sanic的HTTP示例中,使用,創(chuàng)建服務(wù)器:
app.run(host="0.0.0.0", port=8000)
這兩個看起來運行方式完全不同,但如果我們翻開sanic的源碼會看到 app.run() 內(nèi)部是調(diào)用 的 server_coroutine = loop.create_server()創(chuàng)建服務(wù)器,
server_coroutine 是通過 loop.run_until_complete()驅(qū)動的。
所以說,為了啟動服務(wù)器,這兩個都是由 loop.run_until_complete 驅(qū)動,完成運行的。只不過 sanic 封裝了run 方法,使得使用更加方便。
這里可以得到一個基本事實:只有驅(qū)動協(xié)程,協(xié)程才能做事,而驅(qū)動 asyncio.coroutine 裝飾的協(xié)程有兩種方式,使用 yield from 或者傳給asyncio 包中某個參數(shù)為協(xié)程或future的函數(shù),例如 run_until_complete
現(xiàn)在如果你搜索 cjk,會得到7萬多條數(shù)據(jù)3M 的一個html文件,耗時大約2s,這如果是生產(chǎn)服務(wù)的一個請求,耗時2s是不能接收的,我們可以使用分頁,這樣我們可以每次只取200條數(shù)據(jù),當用戶想看更多數(shù)據(jù)時再使用 ajax 或者 websockets發(fā)送下一批數(shù)據(jù)。
這一篇我們使用 asyncio 包實現(xiàn)了TCP服務(wù)器,使用sanic(基于asyncio sanic 默認使用 uvloop替代asyncio)實現(xiàn)了HTTP服務(wù)器,用于按名稱搜索Unicode 字符。但是并沒有涉及服務(wù)器并發(fā)部分,這部分可以以后再討論。
參考鏈接這一篇還是 《流暢的python》asyncio 一章的讀書筆記,下一篇將是python并發(fā)的第三篇,《使用線程處理并發(fā)》。
Python 3.5將支持Async/Await異步編程:http://www.infoq.com/cn/news/2015/05/python-async-await
python web 框架 Sanci 快速入門
python并發(fā)2:使用asyncio處理并發(fā)
最后,感謝女朋友支持。
>歡迎關(guān)注 | >請我喝芬達 |
---|---|
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/38648.html
摘要:并發(fā)用于制定方案,用來解決可能但未必并行的問題。在協(xié)程中使用需要注意兩點使用鏈接的多個協(xié)程最終必須由不是協(xié)程的調(diào)用方驅(qū)動,調(diào)用方顯式或隱式在最外層委派生成器上調(diào)用函數(shù)或方法。對象可以取消取消后會在協(xié)程當前暫停的處拋出異常。 導(dǎo)語:本文章記錄了本人在學習Python基礎(chǔ)之控制流程篇的重點知識及個人心得,打算入門Python的朋友們可以來一起學習并交流。 本文重點: 1、了解asyncio...
摘要:具有以下基本同步原語子進程提供了通過創(chuàng)建和管理子進程的。雖然隊列不是線程安全的,但它們被設(shè)計為專門用于代碼。表示異步操作的最終結(jié)果。 Python的asyncio是使用 async/await 語法編寫并發(fā)代碼的標準庫。通過上一節(jié)的講解,我們了解了它不斷變化的發(fā)展歷史。到了Python最新穩(wěn)定版 3.7 這個版本,asyncio又做了比較大的調(diào)整,把這個庫的API分為了 高層級API和...
摘要:是之后引入的標準庫的,這個包使用事件循環(huán)驅(qū)動的協(xié)程實現(xiàn)并發(fā)。沒有能從外部終止線程,因為線程隨時可能被中斷。上一篇并發(fā)使用處理并發(fā)我們介紹過的,在中,只是調(diào)度執(zhí)行某物的結(jié)果。 asyncio asyncio 是Python3.4 之后引入的標準庫的,這個包使用事件循環(huán)驅(qū)動的協(xié)程實現(xiàn)并發(fā)。asyncio 包在引入標準庫之前代號 Tulip(郁金香),所以在網(wǎng)上搜索資料時,會經(jīng)常看到這種花的...
摘要:我們以請求網(wǎng)絡(luò)服務(wù)為例,來實際測試一下加入多線程之后的效果。所以,執(zhí)行密集型操作時,多線程是有用的,對于密集型操作,則每次只能使用一個線程。說到這里,對于密集型,可以使用多線程或者多進程來提高效率。 為了提高系統(tǒng)密集型運算的效率,我們常常會使用到多個進程或者是多個線程,python中的Threading包實現(xiàn)了線程,multiprocessing 包則實現(xiàn)了多進程。而在3.2版本的py...
摘要:創(chuàng)建第一個協(xié)程推薦使用語法來聲明協(xié)程,來編寫異步應(yīng)用程序。協(xié)程兩個緊密相關(guān)的概念是協(xié)程函數(shù)通過定義的函數(shù)協(xié)程對象調(diào)用協(xié)程函數(shù)返回的對象。它是一個低層級的可等待對象,表示一個異步操作的最終結(jié)果。 我們講以Python 3.7 上的asyncio為例講解如何使用Python的異步IO。 showImg(https://segmentfault.com/img/remote/14600000...
閱讀 3032·2020-01-08 12:17
閱讀 2000·2019-08-30 15:54
閱讀 1159·2019-08-30 15:52
閱讀 2043·2019-08-29 17:18
閱讀 1054·2019-08-29 15:34
閱讀 2468·2019-08-27 10:58
閱讀 1870·2019-08-26 12:24
閱讀 381·2019-08-23 18:23