摘要:徘徊和行程所用的時間使用指數分布生成,我們將時間設為分鐘數,以便顯示清楚。迭代表示各輛出租車的進程在各輛出租車上調用函數,預激協程。
前兩篇我們已經介紹了python 協程的使用和yield from 的原理,這一篇,我們用一個例子來揭示如何使用協程在單線程中管理并發活動。。
什么是離散事件仿真Wiki上的定義是:
離散事件仿真將系統隨時間的變化抽象成一系列的離散時間點上的事件,通過按照事件時間順序處理事件來演進,是一種事件驅動的仿真世界觀。離散事件仿真將系統的變化看做一個事件,因此系統任何的變化都只能是通過處理相應的事件來實現,在兩個相鄰的事件之間,系統狀態維持前一個事件發生后的狀態不變。
人話說就是一種把系統建模成一系列事件的仿真系統。在離散事件仿真中,仿真“鐘”向前推進的量不是固定的,而是直接推進到下一個事件模型的模擬時間。
假設我們抽象模擬出租車的運營過程,其中一個事件是乘客上車,下一個事件則是乘客下車。不管乘客做了5分鐘還是50分鐘,一旦下車,仿真鐘就會更新,指向此次運營的結束時間。
事件?是不是想到了協程!
協程恰好為實現離散事件仿真提供了合理的抽象。
出租車對運營仿真第一門面向對象的語音 Simula 引入協程這個概念就是為了支持仿真。
Simpy 是一個實現離散事件仿真的Python包,通過一個協程表示離散事件仿真系統的各個進程。
仿真程序會創建幾輛出租車,每輛出租車會拉幾個乘客,然后回家。出租車會首先駛離車庫,四處徘徊,尋找乘客;拉到乘客后,行程開始;乘客下車后,繼續四處徘徊。
徘徊和行程所用的時間使用指數分布生成,我們將時間設為分鐘數,以便顯示清楚。
完整代碼如下:(taxi_sim.py)
#! -*- coding: utf-8 -*- import random import collections import queue import argparse DEFAULT_NUMBER_OF_TAXIS = 3 DEFAULT_END_TIME = 180 SEARCH_DURATION = 5 TRIP_DURATION = 20 DEPARTURE_INTERAVAL = 5 # time 是事件發生的仿真時間,proc 是出租車進程實例的編號,action是描述活動的字符串 Event = collections.namedtuple("Event", "time proc action") # 開始 出租車進程 # 每輛出租車調用一次taxi_process 函數,創建一個生成器對象,表示各輛出租車的運營過程。 def taxi_process(ident, trips, start_time=0): """ 每次狀態變化時向創建事件,把控制權交給仿真器 :param ident: 出租車編號 :param trips: 出租車回家前的行程數量 :param start_time: 離開車庫的時間 :return: """ time = yield Event(start_time, ident, "leave garage") # 產出的第一個Event for i in range(trips): # 每次行程都會執行一遍這個代碼塊 # 產出一個Event實例,表示拉到了乘客 協程在這里暫停 等待下一次send() 激活 time = yield Event(time, ident, "pick up passenger") # 產出一個Event實例,表示乘客下車 協程在這里暫停 等待下一次send() 激活 time = yield Event(time, ident, "drop off passenger") # 指定的行程數量完成后,for 循環結束,最后產出 "going home" 事件。協程最后一次暫停 yield Event(time, ident, "going home") # 協程執行到最后 拋出StopIteration 異常 def compute_duration(previous_action): """使用指數分布計算操作的耗時""" if previous_action in ["leave garage", "drop off passenger"]: # 新狀態是四處徘徊 interval = SEARCH_DURATION elif previous_action == "pick up passenger": # 新狀態是開始行程 interval = TRIP_DURATION elif previous_action == "going home": interval = 1 else: raise ValueError("Unkonw previous_action: %s" % previous_action) return int(random.expovariate(1/interval)) + 1 # 開始仿真 class Simulator: def __init__(self, procs_map): self.events = queue.PriorityQueue() # 帶優先級的隊列 會按時間正向排序 self.procs = dict(procs_map) # 從獲取的procs_map 參數中創建本地副本,為了不修改用戶傳入的值 def run(self, end_time): """ 調度并顯示事件,直到時間結束 :param end_time: 結束時間 只需要指定一個參數 :return: """ # 調度各輛出租車的第一個事件 for iden, proc in sorted(self.procs.items()): first_event = next(proc) # 預激協程 并產出一個 Event 對象 self.events.put(first_event) # 把各個事件加到self.events 屬性表示的 PriorityQueue對象中 # 此次仿真的主循環 sim_time = 0 # 把 sim_time 歸0 while sim_time < end_time: if self.events.empty(): # 事件全部完成后退出循環 print("*** end of event ***") break current_event = self.events.get() # 獲取優先級最高(time 屬性最小)的事件 sim_time, proc_id, previous_action = current_event # 更新 sim_time print("taxi:", proc_id, proc_id * " ", current_event) active_proc = self.procs[proc_id] # 從self.procs 字典中獲取表示當前活動的出租車協程 next_time = sim_time + compute_duration(previous_action) try: next_event = active_proc.send(next_time) # 把計算得到的時間發送給出租車協程。協程會產出下一個事件,或者拋出 StopIteration except StopIteration: del self.procs[proc_id] # 如果有異常 表示已經退出, 刪除這個協程 else: self.events.put(next_event) # 如果沒有異常,把next_event 加入到隊列 else: # 如果超時 則走到這里 msg = "*** end of simulation time: {} event pendding ***" print(msg.format(self.events.qsize())) def main(end_time=DEFAULT_END_TIME, num_taxis=DEFAULT_NUMBER_OF_TAXIS, seed=None): """初始化隨機生成器,構建過程,運行仿真程序""" if seed is not None: random.seed(seed) # 獲取可復現的結果 # 構建taxis 字典。值是三個參數不同的生成器對象。 taxis = {i: taxi_process(i, (i + 1) * 2, i*DEPARTURE_INTERAVAL) for i in range(num_taxis)} sim = Simulator(taxis) sim.run(end_time) if __name__ == "__main__": parser = argparse.ArgumentParser(description="Taxi fleet simulator.") parser.add_argument("-e", "--end-time", type=int, default=DEFAULT_END_TIME, help="simulation end time; default=%s" % DEFAULT_END_TIME) parser.add_argument("-t", "--taxis", type=int, default=DEFAULT_NUMBER_OF_TAXIS, help="number of taxis running; default = %s" % DEFAULT_NUMBER_OF_TAXIS) parser.add_argument("-s", "--seed", type=int, default=None, help="random generator seed (for testing)") args = parser.parse_args() main(args.end_time, args.taxis, args.seed)
運行程序,
# -s 3 參數設置隨機生成器的種子,以便調試的時候隨機數不變,輸出相同的結果 python taxi_sim.py -s 3
輸出結果如下圖
從結果我們可以看出,3輛出租車的行程是交叉進行的。不同顏色的箭頭代表不同出租車從乘客上車到乘客下車的跨度。
從結果可以看出:
出租車每5隔分鐘從車庫出發
0 號出租車2分鐘后拉到乘客(time=2),1號出租車3分鐘后拉到乘客(time=8),2號出租車5分鐘后拉到乘客(time=15)
0 號出租車拉了兩個乘客
1 號出租車拉了4個乘客
2 號出租車拉了6個乘客
在此次示中,所有排定的事件都在默認的仿真時間內完成
我們先在控制臺中調用taxi_process 函數,自己駕駛一輛出租車,示例如下:
In [1]: from taxi_sim import taxi_process # 創建一個生成器,表示一輛出租車 編號是13 從t=0 開始,有兩次行程 In [2]: taxi = taxi_process(ident=13, trips=2, start_time=0) In [3]: next(taxi) # 預激協程 Out[3]: Event(time=0, proc=13, action="leave garage") # 發送當前時間 在控制臺中,變量_綁定的是前一個結果 # _.time + 7 是 0 + 7 In [4]: taxi.send(_.time+7) Out[4]: Event(time=7, proc=13, action="pick up passenger") # 這個事件有for循環在第一個行程的開頭產出 # 發送_.time+12 表示這個乘客用時12分鐘 In [5]: taxi.send(_.time+12) Out[5]: Event(time=19, proc=13, action="drop off passenger") # 徘徊了29 分鐘 In [6]: taxi.send(_.time+29) Out[6]: Event(time=48, proc=13, action="pick up passenger") # 乘坐了50分鐘 In [7]: taxi.send(_.time+50) Out[7]: Event(time=98, proc=13, action="drop off passenger") # 兩次行程結束 for 循環結束產出"going home" In [8]: taxi.send(_.time+5) Out[8]: Event(time=103, proc=13, action="going home") # 再發送值,會執行到末尾 協程返回后 拋出 StopIteration 異常 In [9]: taxi.send(_.time+10) --------------------------------------------------------------------------- StopIteration Traceback (most recent call last)in () ----> 1 taxi.send(_.time+10) StopIteration:
在這個示例中,我們用控制臺模擬仿真主循環。從taxi協程中產出的Event實例中獲取 .time 屬性,隨意加一個數,然后調用send()方法發送兩數之和,重新激活協程。
在taxi_sim.py 代碼中,出租車協程由 Simulator.run 方法中的主循環驅動。
Simulator 類的主要數據結構如下:
self.events
PriorityQueue 對象,保存Event實例。元素可以放進PriorityQueue對象中,然后按 item[0](對象的time 屬性)依序取出(按從小到大)。
self.procs
一個字典,把出租車的編號映射到仿真過程的進程(表示出租車生成器的對象)。這個屬性會綁定前面所示的taxis字典副本。
優先隊列是離散事件仿真系統的基礎構件:創建事件的順序不定,放入這種隊列后,可以按各個事件排定的順序取出。
比如,我們把兩個事件放入隊列:
Event(time=14, proc=0, action="pick up passenger") Event(time=10, proc=1, action="pick up passenger")
這個意思是 0號出租車14分拉到一個乘客,1號出租車10分拉到一個乘客。但是主循環獲取的第一個事件將是
Event(time=10, proc=1, action="pick up passenger")
下面我們分析一下仿真系統的主算法--Simulator.run 方法。
迭代表示各輛出租車的進程
在各輛出租車上調用next()函數,預激協程。
把各個事件放入Simulator類的self.events屬性中。
滿足 sim_time < end_time 條件是,運行仿真系統的主循環。
檢查self.events 屬性是否為空;如果為空,跳出循環
從self.events 中獲取當前事件
顯示獲取的Event對象
獲取curent_event 的time 屬性,更新仿真時間
把時間發送給current_event 的pro屬性標識的協程,產出下一個事件
把next_event 添加到self.events 隊列中,排定 next_event
我們代碼中 while 循環有一個else 語句,仿真系統到達結束時間后,代碼會執行else中的語句。
這個示例主要是想說明如何在一個主循環中處理事件,以及如何通過發送數據驅動協程,同時解釋了如何使用生成器代替線程和回調,實現并發。
并發: 多個任務交替執行
并行: 多個任務同時執行
到這里 Python協程系列的三篇文章就結束了。
前兩篇文章我們會看到,協程做面向事件編程時,會不斷把控制權讓步給主循環,激活并向前運行其他協程,從而執行各個并發活動。
協程一種協作式多任務:協程顯式自主的把控制權讓步給中央調度程序。
多線程實現的是搶占式多任務。調度程序可以在任何時刻暫停線程,把控制權交給其他線程
python 協程1:協程10分鐘入門
python 協程2:yield from 從入門到精通
再次說明一下,這幾篇是《流暢的python》一書的讀書筆記,作者提供了大量的擴展閱讀,有興趣的可以看一下。
擴展閱讀Generator Tricks for Systems Programmers
A Curious Course on Coroutines and Concurrency
Generators: The Final Frontier
greedy algorithm with coroutines
BinaryTree類、一個簡單的XML解析器、和一個任務調度器Proposal for a yield from statement for Python
考慮用協程操作多個函數
最后,感謝女朋友支持。
>歡迎關注 | >請我喝芬達 |
---|---|
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/44436.html
摘要:仿真示例出租車進程。每次狀態變化時向仿真程序產出一個事件結束出租車進程出租車仿真程序主程序。 這個簡單的例子讓我們比較淺顯易懂的看到了事件驅動型框架的運作方式,即在單個線程中使用一個主循環驅動協程執行并發活動。 使用協程做面向事件編程時,協程會不斷的把控制權讓步給主循環,激活并向前運行其他協程,從而執行各個并發活動。這是一種協作多任務:協程顯示的把控制權讓步給中央調度程序。 仿真示例 ...
摘要:于此同時,會阻塞,等待終止。子生成器返回之后,解釋器會拋出異常,并把返回值附加到異常對象上,只是委派生成器恢復。實例運行完畢后,返回的值綁定到上。這一部分處理調用方通過方法傳入的異常。之外的異常會向上冒泡。 上一篇python協程1:yield的使用介紹了: 生成器作為協程使用時的行為和狀態 使用裝飾器預激協程 調用方如何使用生成器對象的 .throw(...) 和 .close()...
摘要:協程,又稱微線程,纖程。最大的優勢就是協程極高的執行效率。生產者產出第條數據返回更新值更新消費者正在調用第條數據查看當前進行的線程函數中有,返回值為生成器庫實現協程通過提供了對協程的基本支持,但是不完全。 協程,又稱微線程,纖程。英文名Coroutine協程看上去也是子程序,但執行過程中,在子程序內部可中斷,然后轉而執行別的子程序,在適當的時候再返回來接著執行。 最大的優勢就是協程極高...
摘要:新語法表達式語句可以被用在賦值表達式的右側在這種情況下,它就是表達式。表達式必須始終用括號括起來,除非它是作為頂級表達式而出現在賦值表達式的右側。 showImg(https://segmentfault.com/img/bVbnQsb?w=4344&h=2418);PEP原文 : https://www.python.org/dev/pe... PEP標題: Coroutines v...
閱讀 3585·2021-11-24 10:19
閱讀 3724·2021-09-30 09:47
閱讀 1289·2019-08-30 15:56
閱讀 788·2019-08-29 15:11
閱讀 904·2019-08-29 13:43
閱讀 3567·2019-08-28 18:25
閱讀 2160·2019-08-26 13:27
閱讀 1436·2019-08-26 11:44