摘要:解決方法有兩種。代碼然而這段代碼只有在運行在處的時候才能用中斷,即前你按有效,一旦后則完全無效建議先確認是否真的需要用到多進程,如果是多的程序建議用多線程或協(xié)程,計算特別多則用多進程。
本文理論上對multiprocessing.dummy的Pool同樣有效。
python2.x中multiprocessing提供的基于函數(shù)進程池,join后陷入內(nèi)核態(tài),按下ctrl+c不能停止所有的進程并退出。即必須ctrl+z后找到殘留的子進程,把它們干掉。先看一段ctrl+c無效的代碼:
#!/usr/bin/env python import multiprocessing import os import time def do_work(x): print "Work Started: %s" % os.getpid() time.sleep(10) return x * x def main(): pool = multiprocessing.Pool(4) try: result = pool.map_async(do_work, range(8)) pool.close() pool.join() print result except KeyboardInterrupt: print "parent received control-c" pool.terminate() pool.join() if __name__ == "__main__": main()
這段代碼運行后,按^c一個進程也殺不掉,最后會殘留包括主進程在內(nèi)共5個進程(1+4),kill掉主進程能讓其全部退出。很明顯,使用進程池時KeyboardInterrupt不能被進程捕捉。解決方法有兩種。
方案一下面這段是python源碼里multiprocessing下的pool.py中的一段,ApplyResult就是Pool用來保存函數(shù)運行結(jié)果的類
class ApplyResult(object): def __init__(self, cache, callback): self._cond = threading.Condition(threading.Lock()) self._job = job_counter.next() self._cache = cache self._ready = False self._callback = callback cache[self._job] = self
而下面這段代碼也是^c無效的代碼
if __name__ == "__main__": import threading cond = threading.Condition(threading.Lock()) cond.acquire() cond.wait() print "done"
很明顯,threading.Condition(threading.Lock())對象無法接收KeyboardInterrupt,但稍微修改一下,給cond.wait()一個timeout參數(shù)即可,這個timeout可以在map_async后用get傳遞,把
result = pool.map_async(do_work, range(4))
改為
result = pool.map_async(do_work, range(4)).get(1)
就能成功接收^c了,get里面填1填99999還是0xffff都行
方案二另一種方法當(dāng)然就是自己寫進程池了,需要使用隊列,貼一段代碼感受下
#!/usr/bin/env python import multiprocessing, os, signal, time, Queue def do_work(): print "Work Started: %d" % os.getpid() time.sleep(2) return "Success" def manual_function(job_queue, result_queue): signal.signal(signal.SIGINT, signal.SIG_IGN) while not job_queue.empty(): try: job = job_queue.get(block=False) result_queue.put(do_work()) except Queue.Empty: pass #except KeyboardInterrupt: pass def main(): job_queue = multiprocessing.Queue() result_queue = multiprocessing.Queue() for i in range(6): job_queue.put(None) workers = [] for i in range(3): tmp = multiprocessing.Process(target=manual_function, args=(job_queue, result_queue)) tmp.start() workers.append(tmp) try: for worker in workers: worker.join() except KeyboardInterrupt: print "parent received ctrl-c" for worker in workers: worker.terminate() worker.join() while not result_queue.empty(): print result_queue.get(block=False) if __name__ == "__main__": main()方案三
使用一個全局變量eflag作標(biāo)識,讓SIG_INT信號綁定一個處理函數(shù),在其中對eflag的值更改,線程的函數(shù)中以eflag的值判定作為while的條件,把語句寫在循環(huán)里,老實說這個方案雖然可以用,但是簡直太差勁。線程肯定是可行的,進程應(yīng)該還需要多帶帶共享變量,非常不推薦的方式
這個必須要提一下,我發(fā)現(xiàn)segmentfault上都有人被誤導(dǎo)了
理論上,在Pool初始化時傳遞一個initializer函數(shù),讓子進程忽略SIGINT信號,也就是^c,然后Pool進行terminate處理。代碼
#!/usr/bin/env python import multiprocessing import os import signal import time def init_worker(): signal.signal(signal.SIGINT, signal.SIG_IGN) def run_worker(x): print "child: %s" % os.getpid() time.sleep(20) return x * x def main(): pool = multiprocessing.Pool(4, init_worker) try: results = [] print "Starting jobs" for x in range(8): results.append(pool.apply_async(run_worker, args=(x,))) time.sleep(5) pool.close() pool.join() print [x.get() for x in results] except KeyboardInterrupt: print "Caught KeyboardInterrupt, terminating workers" pool.terminate() pool.join() if __name__ == "__main__": main()
然而這段代碼只有在運行在time.sleep(5)處的時候才能用ctrl+c中斷,即前5s你按^c有效,一旦pool.join()后則完全無效!
建議先確認是否真的需要用到多進程,如果是IO多的程序建議用多線程或協(xié)程,計算特別多則用多進程。如果非要用多進程,可以利用Python3的concurrent.futures包(python2.x也能裝),編寫更加簡單易用的多線程/多進程代碼,其使用和Java的concurrent框架有些相似.
經(jīng)過親自驗證,ProcessPoolExecutor是沒有^c的問題的,要用多進程建議使用它
http://bryceboe.com/2010/08/26/python-multiprocessing-and-keyboardinterrupt/#georges
http://stackoverflow.com/questions/1408356/keyboard-interrupts-with-pythons-multiprocessing-pool#comment12678760_6191991
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/37677.html
摘要:很簡單,這個模塊實現(xiàn)了開辟一塊共享內(nèi)存空間,就好比中的方法一樣,有興趣的同學(xué)可以去查閱。查了下資料,返回的對象控制了一個進程,可用于多進程之間的安全通信,其支持的類型有和等。 有關(guān)于 multiprocessing 中共享變量的問題 現(xiàn)在的cpu都很強大,比方我用的至強2620有24核可以同時工作,并行執(zhí)行進程程序。這在計算密集型的程序是很需要的,如沙漠中的綠洲,令人重獲新生。那么,問...
摘要:說一下進程線程以及多任務(wù)多進程多線程和協(xié)程進程概念一個程序?qū)?yīng)一個進程,這個進程被叫做主進程,而一個主進程下面還有許多子進程。避免了由于系統(tǒng)在處理多進程或者多線程時,切換任務(wù)時需要的等待時間。 showImg(https://segmentfault.com/img/bVbuYxg?w=3484&h=2480); 閱讀本文大約需要 10 分鐘。 14.說一下進程、線程、以及多任務(wù)(多進...
摘要:本節(jié)講學(xué)習(xí)的多進程。和之前的的不同點是丟向的函數(shù)有返回值,而的沒有返回值。所以接下來讓我們來看下這兩個進程是否會出現(xiàn)沖突。 本節(jié)講學(xué)習(xí)Python的多進程。 一、多進程和多線程比較 多進程 Multiprocessing 和多線程 threading 類似, 他們都是在 python 中用來并行運算的. 不過既然有了 threading, 為什么 Python 還要出一個 multip...
摘要:主進程會等待所有的子進程先結(jié)束,然后再結(jié)束主進程。關(guān)閉進程池,關(guān)閉后實例不再接收新的請求等待實例中的所有子進程執(zhí)行完畢,主進程才會退出,必須放在語句之后。主進程一般都用來等待,任務(wù)在子進程中執(zhí)行。 多任務(wù):同一個時間段中,執(zhí)行多個函數(shù)/運行多個程序. 操作系統(tǒng)可以同時運行多個任務(wù):操作系統(tǒng)輪流讓各個任務(wù)交替執(zhí)行,任務(wù)1執(zhí)行0.01秒,切換到任務(wù)2,任務(wù)2執(zhí)行0.01秒,再切換到任務(wù)3,...
摘要:目前開發(fā)中有遇到進程間需要共享數(shù)據(jù)的情況所以研究了下主要會以為例子說明下進程間共享同一個父進程使用說明創(chuàng)建一個對象創(chuàng)建一個創(chuàng)建一個測試程序創(chuàng)建進程池進行測試簡單的源碼分析這時我們再看一個例子創(chuàng)建一個對象創(chuàng)建一個創(chuàng)建一個測試程序創(chuàng)建進程池進行 目前開發(fā)中有遇到進程間需要共享數(shù)據(jù)的情況. 所以研究了下multiprocessing.Manager, 主要會以dict為例子, 說明下進程間共...
閱讀 1563·2021-11-19 09:55
閱讀 2787·2021-09-06 15:02
閱讀 3555·2019-08-30 15:53
閱讀 1099·2019-08-29 16:36
閱讀 1242·2019-08-29 16:29
閱讀 2294·2019-08-29 15:21
閱讀 632·2019-08-29 13:45
閱讀 2687·2019-08-26 17:15