摘要:一旦有事件產(chǎn)生可能是一次出現(xiàn)好多個事件,就會按照優(yōu)先級依次調(diào)用每個事件的回調(diào)函數(shù)。注意,是有超時的,所以一些無法以文件描述符的形式存在的事件也可以有機會被觸發(fā)。
這一篇主要想跟大家分享一下 Gevent 實現(xiàn)的基礎(chǔ)邏輯,也是有同學(xué)對這個很感興趣,所以貼出來跟大家一起分享一下。
Greenlet我們知道 Gevent 是基于 Greenlet 實現(xiàn)的,greenlet 有的時候也被叫做微線程或者協(xié)程。其實 Greenlet 本身非常簡單,其自身實現(xiàn)的功能也非常直接。區(qū)別于常規(guī)的編程思路——順序執(zhí)行、調(diào)用進棧、返回出棧—— Greenlet 提供了一種在不同的調(diào)用棧之間自由跳躍的功能。從一個簡單的例子來看一下吧(摘自官方文檔):
from greenlet import greenlet def test1(): print 12 gr2.switch() print 34 def test2(): print 56 gr1.switch() print 78 gr1 = greenlet(test1) gr2 = greenlet(test2) gr1.switch()
這里,每一個 greenlet 就是一個調(diào)用棧——您可以把他想象成一個線程,只不過真正的線程可以并行執(zhí)行,而同一時刻只能有一個 greenlet 在執(zhí)行(同一線程里)。正如例子中最后三句話,我們創(chuàng)建了 gr1 和 gr2 兩個不同的調(diào)用棧空間,入口函數(shù)分別是 test1 和 test2;這最后一句 gr1.switch() 得多解釋一點。
因為除了 gr1 和 gr2,我們還有一個棧空間,也就是所有 Python 程序都得有的默認的棧空間——我們暫且稱之為 main,而這一句 gr1.switch() 恰恰實現(xiàn)了從 main 到 gr1 的跳躍,也就是從當(dāng)前的棧跳到指定的棧。這時,就猶如常規(guī)調(diào)用 test1() 一樣,gr1.switch() 的調(diào)用暫時不會返回結(jié)果,程序會跳轉(zhuǎn)到 test1 繼續(xù)執(zhí)行;只不過區(qū)別于普通函數(shù)調(diào)用時 test1() 會向當(dāng)前棧壓棧,而 gr1.switch() 則會將當(dāng)前棧存檔,替換成 gr1 的棧。如圖所示:
對于這種棧的切換,我們有時也稱之為執(zhí)行權(quán)的轉(zhuǎn)移,或者說 main 交出了執(zhí)行權(quán),同時 gr1 獲得了執(zhí)行權(quán)。Greenlet 在底層是用匯編實現(xiàn)的這樣的切換:把當(dāng)前的棧(main)相關(guān)的寄存器啊什么的保存到內(nèi)存里,然后把原本保存在內(nèi)存里的 gr1 的相關(guān)信息恢復(fù)到寄存器里。這種操作速度非常快,比操作系統(tǒng)對多進程調(diào)度的上下文切換還要快。代碼在這里,有興趣的同學(xué)可以一起研究一下(其中 switch_x32_unix.h 是我寫的哈哈)。
回到前面的例子,最后一句 gr1.switch() 調(diào)用將執(zhí)行點跳到了 gr1 的第一句,于是輸出了 12。隨后順序執(zhí)行到 gr2.switch(),繼而跳轉(zhuǎn)到 gr2 的第一句,于是輸出了 56。接著又是 gr1.switch(),跳回到 gr1,從之前跳出的地方繼續(xù)——對 gr1 而言就是 gr2.switch() 的調(diào)用返回了結(jié)果 None,然后輸出 34。
這個時候 test1 執(zhí)行到頭了,gr1 的棧里面空了。Greenlet 設(shè)計了 parent greenlet 的概念,就是說,當(dāng)一個 greenlet 的入口函數(shù)執(zhí)行完之后,會自動切換回其 parent。默認情況下,greenlet 的 parent 就是創(chuàng)建該 greenlet 時所在的那個棧,前面的例子中,gr1 和 gr2 都是在 main 里被創(chuàng)建的,所以他們倆的 parent 都是 main。所以當(dāng) gr1 結(jié)束的時候,會回到 main 的最后一句,接著 main 結(jié)束了,所以整個程序也就結(jié)束了——78 從來沒有被執(zhí)行到過。另外,greenlet 的 parent 也可以手工設(shè)置。
簡單來看,greenlet 只是為 Python 語言增加了創(chuàng)建多條執(zhí)行序列的功能,而且多條執(zhí)行序列之間的切換還必須得手動顯式調(diào)用 switch() 才行;這些都跟異步 I/O 沒有必然關(guān)系。
gevent.sleep接著來看 Gevent。最簡單的一個 Gevent 示例就是這樣的了:
import gevent gevent.sleep(1)
貌似非常簡單的一個 sleep,卻包含了 Gevent 的關(guān)鍵結(jié)構(gòu),讓我們仔細看一下 sleep 的實現(xiàn)吧。代碼在 gevent/hub.py:
def sleep(seconds=0): hub = get_hub() loop = hub.loop hub.wait(loop.timer(seconds))
這里我把一些當(dāng)前用不著的代碼做了一些清理,只留下了三句關(guān)鍵的代碼,其中就有 Gevent 的兩個關(guān)鍵的部件——hub 和 loop。loop 是 Gevent 的核心部件,也就是主循環(huán)核心,默認是用 Cython 寫的 libev 的包裝(所以性能杠杠滴),稍后會在詳細提到它。hub 則是一個 greenlet,里面跑著 loop。
hub 是一個單例,從 get_hub() 的源碼就可以看出來:
import _thread _threadlocal = _thread._local() def get_hub(*args, **kwargs): global _threadlocal try: return _threadlocal.hub except AttributeError: hubtype = get_hub_class() hub = _threadlocal.hub = hubtype(*args, **kwargs) return hub
所以第一次執(zhí)行 get_hub() 的時候,就會創(chuàng)建一個 hub 實例:
class Hub(greenlet): loop_class = config("gevent.core.loop", "GEVENT_LOOP") def __init__(self): greenlet.__init__(self) loop_class = _import(self.loop_class) self.loop = loop_class()
同樣這是一段精簡了的代碼,反映了一個 hub 的關(guān)鍵屬性——loop。loop 實例隨著 hub 實例的創(chuàng)建而創(chuàng)建,默認的 loop 就是 gevent/core.ppyx 里的 class loop,也可以通過環(huán)境變量 GEVENT_LOOP 來自定義。
值得注意的是,截止到 hub = get_hub() 和 loop = hub.loop,我們都只是創(chuàng)建了 hub 和 loop,并沒有真正開始跑我們的主循環(huán)。稍安勿躁,第三句就要開始了。
loop 有一堆接口,對應(yīng)著底層 libev 的各個功能,詳見此處。我們這里用到的是 timer(seconds),該函數(shù)返回的是一個 watcher 對象,對應(yīng)著底層 libev 的 watcher 概念。我們大概能猜到,這個 watcher 對象會在幾秒鐘之后做一些什么事情,但是具體怎么做,讓我們一起看看 hub.wait() 的實現(xiàn)吧。
def wait(self, watcher): waiter = Waiter() watcher.start(waiter.switch) waiter.get()
代碼也不長,不過能看到 watcher 的接口 watcher.start(method),也就是說,當(dāng)給定的幾秒鐘過了之后,會調(diào)用這里給的函數(shù),也就是 waiter.switch。讓我們再看一下這里用到的 Waiter,都是在同一個文件 hub.py 里面:
from greenlet import getcurrent class Waiter(object): def __init__(self): self.hub = get_hub() self.greenlet = None def switch(self): assert getcurrent() is self.hub self.greenlet.switch() def get(self): assert self.greenlet is None self.greenlet = getcurrent() try: self.hub.switch() finally: self.greenlet = None
這里同樣刪掉了大量干擾因素。根據(jù)前面 wait() 的定義,我們會先創(chuàng)建一個 waiter,然后調(diào)用其 get(),隨后幾秒鐘之后 loop 會調(diào)用其 switch()。一個個看。
get() 一上來會保證自己不會被同時調(diào)用到(assert),接著就去獲取了當(dāng)前的 greenlet,也就是調(diào)用 get() 時所處的棧,一直往前找,找到 sleep(1),所以 getcurrent() 的結(jié)果是 main。Waiter 隨后將 main 保存在了 self.greenlet 引用中。
下面的一句話是重中之重了,self.hub.switch()!由不管任何上下文中,直接往 hub 里跳。由于這是第一次跳進 hub 里,所以此時 loop 就開始運轉(zhuǎn)了。
正巧,我們之前已經(jīng)通過 loop.timer(1) 和 watcher.start(waiter.switch),在 loop 里注冊了說,1 秒鐘之后去調(diào)用 waiter.switch,loop 一旦跑起來就會嚴格執(zhí)行之前注冊的命令。所以呢,一秒鐘之后,我們在 hub 的棧中,調(diào)用到了 Waiter.switch()。
在 switch() 里,程序一上來就要驗證當(dāng)前上下文必須得是 hub,翻閱一下前面的代碼,這個是必然的。最后,跳到 self.greenlet!還記得它被設(shè)置成什么了嗎?——main。于是乎,我們就回到了最初的代碼里,gevent.sleep(1) 在經(jīng)過了 1 秒鐘的等待之后終于返回了。
回頭看一下這個過程,其實也很簡單的:當(dāng)我們需要等待一個事件發(fā)生時——比如需要等待 1 秒鐘的計時器事件,我們就把當(dāng)前的執(zhí)行棧跟這個事件做一個綁定(watcher.start(waiter.switch)),然后把執(zhí)行權(quán)交給 hub;hub 則會在事件發(fā)生后,根據(jù)注冊的記錄盡快回到原來的斷點繼續(xù)執(zhí)行。
異步hub 一旦拿到執(zhí)行權(quán),就可以做很多事情了,比如切換到別的 greenlet 去執(zhí)行一些其他的任務(wù),直到這些 greenlet 又主動把執(zhí)行權(quán)交回給 hub。宏觀的來看,就是這樣的:一個 hub,好多個其他的任務(wù) greenlet(其中沒準就包括 main),hub 負責(zé)總調(diào)度,去依次調(diào)用各個任務(wù) greenlet;任務(wù) greenlet 則在執(zhí)行至下一次斷點時,主動切換回 hub。這樣一來,許多個任務(wù) greenlet 就可以看似并行地同步運行了,這種任務(wù)調(diào)度方式叫做協(xié)作式的任務(wù)調(diào)度(cooperative scheduling)。
舉個例子:
import gevent def beep(interval): while True: print("Beep %s" % interval) gevent.sleep(interval) for i in range(10): gevent.spawn(beep, i) beep(20)
例子里我們總共創(chuàng)建了 10 個 greenlet,每一個都會按照不同頻率輸出“蜂鳴”;最后一句的 beep(20) 又讓 main greenlet 也不斷地蜂鳴。算上 hub,這個例子一共會有 12 個不同的 greenlet 在協(xié)作式地運行。
I/OGevent 最主要的功能當(dāng)然是異步 I/O 了。其實,I/O 跟前面 sleep 的例子沒什么本質(zhì)的區(qū)別,只不過 sleep 用的 watcher 是 timer,而 I/O 用到的 watcher 是 io。比如說 wait_read(fileno) 是這樣的:
def wait_read(fileno): hub = get_hub() io = hub.loop.io(fileno, 1) return hub.wait(io)
沒什么太大區(qū)別吧,原理其實都是一樣的。基于這個,我們就可以搞異步 socket 了。socket 的接口較為復(fù)雜,這里提取一些標(biāo)志性的代碼一起讀一下吧:
class socket(object): def __init__(self, family=AF_INET, type=SOCK_STREAM, proto=0): self._sock = _realsocket(family, type, proto) # 創(chuàng)建底層的 socket self._sock.setblocking(0) # 將其設(shè)置為非阻塞的 fileno = self._sock.fileno() # 獲得其文件描述符 self.hub = get_hub() # 自己留一份 hub 的引用,省的每次再現(xiàn)取 io = self.hub.loop.io # 快捷方式 self._read_event = io(fileno, 1) # socket 的讀取事件 self._write_event = io(fileno, 2) # socket 的寫入事件 def _wait(self, watcher): assert watcher.callback is None # 一個 socket 只能被一個 greenlet 用 self.hub.wait(watcher) # 見之前的例子,等待一個事件發(fā)生 def recv(self, *args): sock = self._sock while True: try: return sock.recv(*args) # 異步接收,要么立即成功,要么立即失敗 except error as ex: if ex.args[0] != EWOULDBLOCK: # 如果失敗的話,除了是異步等待的情況, raise # 其他情況都報錯 self._wait(self._read_event) # 等待 socket 有數(shù)據(jù)可讀libev
最后提一點關(guān)于 libev 的東西,因為有同學(xué)也問到 Gevent 底層的調(diào)度方式。簡單來說,libev 是依賴操作系統(tǒng)底層的異步 I/O 接口實現(xiàn)的,Linux 用的是 epoll,F(xiàn)reeBSD 則是 kqueue。Python 代碼里,socket 會創(chuàng)建一堆 io watcher,對應(yīng)底層則是將一堆文件描述符添加到一個——比如—— epoll 的句柄里。當(dāng)切換到 hub 之后,libev 會調(diào)用底層的 epoll_wait 來等待這些 socket 中可能出現(xiàn)的事件。一旦有事件產(chǎn)生(可能是一次出現(xiàn)好多個事件),libev 就會按照優(yōu)先級依次調(diào)用每個事件的回調(diào)函數(shù)。注意,epoll_wait 是有超時的,所以一些無法以文件描述符的形式存在的事件也可以有機會被觸發(fā)。關(guān)于 libev 網(wǎng)上還有很多資料,有興趣大家可以自行查閱。
Gevent 的性能調(diào)優(yōu)Gevent 不是銀彈,不能無限制地創(chuàng)建 greenlet。正如多線程編程一樣,用 gevent 寫服務(wù)器也應(yīng)該創(chuàng)建一個“微線程池”,超過池子大小的 spawn 應(yīng)該被阻塞并且開始排隊。只有這樣,才能保證同時運行的 greenlet 數(shù)量不至于多到顯著增加異步等待的恢復(fù)時間,從而保證每個任務(wù)的響應(yīng)速度。其實,當(dāng)池子的大小增加到一定程度之后,CPU 使用量的增速會放緩甚至變?yōu)?0,這時繼續(xù)增加池子大小只能導(dǎo)致回調(diào)函數(shù)開始排隊,不能真正增加吞吐量。正確的做法是增加硬件或者優(yōu)化代碼(提高算法效率、減少無謂調(diào)用等)。
關(guān)于 pool 的大小,我覺得是可以算出來的:
1、在壓力較小、pool 資源充足的情況下,測得單個請求平均處理總時間,記作 Ta
2、根據(jù)系統(tǒng)需求,估計一下能接受的最慢的請求處理時間,記作 Tm
3、設(shè) Ta 中有 Ts 的時間,執(zhí)行權(quán)是不屬于當(dāng)前處理中的 greenlet 的,比如正在進行異步的數(shù)據(jù)庫訪問或是調(diào)用遠端 API 等后端訪問
4、在常規(guī)壓力下,通過測量后端訪問請求處理的平均時間,根據(jù)代碼實際調(diào)用情況測算出 Ts
5、pool 的大小 = (Tm / (Ta - Ts)) * 150%,這里的 150% 是個 buffer 值,拍腦門拍出來的
比如理想情況下平均每個請求處理需要 20ms,其中平均有 15ms 是花在數(shù)據(jù)庫訪問上(假設(shè)數(shù)據(jù)庫性能較為穩(wěn)定,能夠線性 scale)。如果最大能容忍的請求處理時間是 500ms 的話,那池子大小應(yīng)該設(shè)置成 (500 / (20 - 15)) * 150% = 150,也就意味著單進程最大并發(fā)量是 150。
從這個算法也可以看出,花在 Python 端的 CPU 時間越少,系統(tǒng)并發(fā)量就越高,而花在后端訪問上的時間長短對并發(fā)影響不是很大——當(dāng)然了,依然得假設(shè)數(shù)據(jù)庫等后端可以線性 scale。
下面是我之前在 Amazon EC2 m1.small 機器上的部分測試結(jié)果,對比了同步多進程和 Gevent 在處理包含異步 PostgreSQL 和 Redis 訪問的請求時的性能:
Log Format (per actor) handling time for 500 requests / Time receiving 500 responses - time per handling / time per request - raw handling rate / request per second
8 actors, 128 testers: 798 rps on client
1230.08 ms / 5649.88 ms - 2.46 ms / 11.30 ms - 406.48 rps / 88.50 rps 1707.71 ms / 5938.53 ms - 3.42 ms / 11.88 ms - 292.79 rps / 84.20 rps 2219.12 ms / 6324.48 ms - 4.44 ms / 12.65 ms - 225.31 rps / 79.06 rps 1446.94 ms / 5491.89 ms - 2.89 ms / 10.98 ms - 345.56 rps / 91.04 rps 1064.61 ms / 5189.07 ms - 2.13 ms / 10.38 ms - 469.66 rps / 96.36 rps 2099.23 ms / 5844.37 ms - 4.20 ms / 11.69 ms - 238.18 rps / 85.55 rps
1 async actor with 8 concurrency limit, 128 testers: 1031 rps on client
3995.44 ms / 560.62 ms - 7.99 ms / 1.12 ms - 125.14 rps / 891.87 rps 4369.57 ms / 575.34 ms - 8.74 ms / 1.15 ms - 114.43 rps / 869.06 rps 4388.47 ms / 590.63 ms - 8.78 ms / 1.18 ms - 113.93 rps / 846.55 rps 4439.61 ms / 579.39 ms - 8.88 ms / 1.16 ms - 112.62 rps / 862.97 rps 3866.82 ms / 574.92 ms - 7.73 ms / 1.15 ms - 129.31 rps / 869.69 rps
1 async actor with no concurrency limit, 128 testers: 987 rps on client
38191.16 ms / 551.76 ms - 76.38 ms / 1.10 ms - 13.09 rps / 906.20 rps 34354.80 ms / 564.43 ms - 68.71 ms / 1.13 ms - 14.55 rps / 885.84 rps 40397.18 ms / 543.23 ms - 80.79 ms / 1.09 ms - 12.38 rps / 920.42 rps 45406.02 ms / 490.45 ms - 90.81 ms / 0.98 ms - 11.01 rps / 1019.48 rps 37106.92 ms / 581.95 ms - 74.21 ms / 1.16 ms - 13.47 rps / 859.18 rps
能看出來,同樣是 8 的并發(fā)限制,同步比異步處理快兩三倍(但是 load balance 拉低了同步的優(yōu)勢),吞吐量上雖比不上異步,但也不差。在去掉并發(fā)限制之后,吞吐量變化不大,但處理時間翻了 10 倍(因為大量 callback 開始排隊,無法及時被調(diào)用到),且不穩(wěn)定。
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/37306.html
摘要:背景手里有一個項目,代碼按照前端代碼庫后端代碼庫分別在上,分散帶來的結(jié)果是,不容易持續(xù)集成,比如你可能需要很多的去保證一個項目的正常運作,但是這個項目也不是特別大,所以嘗試將代碼融合,于此同時將代碼化,用于持續(xù)部署。 背景 手里有一個web項目,代碼按照前端代碼庫、后端代碼庫分別在GitHub上,分散帶來的結(jié)果是,不容易持續(xù)集成,比如你可能需要很多的job去保證一個項目的正常運作,但是...
摘要:背景手里有一個項目,代碼按照前端代碼庫后端代碼庫分別在上,分散帶來的結(jié)果是,不容易持續(xù)集成,比如你可能需要很多的去保證一個項目的正常運作,但是這個項目也不是特別大,所以嘗試將代碼融合,于此同時將代碼化,用于持續(xù)部署。 背景 手里有一個web項目,代碼按照前端代碼庫、后端代碼庫分別在GitHub上,分散帶來的結(jié)果是,不容易持續(xù)集成,比如你可能需要很多的job去保證一個項目的正常運作,但是...
摘要:背景手里有一個項目,代碼按照前端代碼庫后端代碼庫分別在上,分散帶來的結(jié)果是,不容易持續(xù)集成,比如你可能需要很多的去保證一個項目的正常運作,但是這個項目也不是特別大,所以嘗試將代碼融合,于此同時將代碼化,用于持續(xù)部署。 背景 手里有一個web項目,代碼按照前端代碼庫、后端代碼庫分別在GitHub上,分散帶來的結(jié)果是,不容易持續(xù)集成,比如你可能需要很多的job去保證一個項目的正常運作,但是...
閱讀 3645·2021-11-23 09:51
閱讀 1992·2021-11-16 11:42
閱讀 3238·2021-11-08 13:20
閱讀 1097·2019-08-30 15:55
閱讀 2206·2019-08-30 10:59
閱讀 1241·2019-08-29 14:04
閱讀 1023·2019-08-29 12:41
閱讀 2017·2019-08-26 12:22