摘要:本文最先發(fā)布在博客這篇文章將講解并發(fā)編程的基本操作。并發(fā)是指能夠多任務處理,并行則是是能夠同時多任務處理。雖然自帶了很好的類庫支持多線程進程編程,但眾所周知,因為的存在,很難做好真正的并行。
本文最先發(fā)布在博客:https://blog.ihypo.net/151628...
這篇文章將講解 Python 并發(fā)編程的基本操作。并發(fā)和并行是對孿生兄弟,概念經(jīng)常混淆。并發(fā)是指能夠多任務處理,并行則是是能夠同時多任務處理。Erlang 之父 Joe Armstrong 有一張非常有趣的圖說明這兩個概念:
我個人更喜歡的一種說法是:并發(fā)是宏觀并行而微觀串行。
GIL雖然 Python 自帶了很好的類庫支持多線程/進程編程,但眾所周知,因為 GIL 的存在,Python 很難做好真正的并行。
GIL 指全局解釋器鎖,對于 GIL 的介紹:
全局解釋器鎖(英語:Global Interpreter Lock,縮寫GIL),是計算機程序設(shè)計語言解釋器用于同步線程的一種機制,它使得任何時刻僅有一個線程在執(zhí)行。
維基百科
其實與其說 GIL 是 Python 解釋器的限制,不如說是 CPython 的限制,因為 Python 為了保障性能,底層大多使用 C 實現(xiàn)的,而 CPython 的內(nèi)存管理并不是線程安全的,為了保障整體的線程安全,解釋器便禁止多線程的并行執(zhí)行。
因為 Python 社區(qū)認為操作系統(tǒng)的線程調(diào)度已經(jīng)非常成熟了,沒有必要自己再實現(xiàn)一遍,因此 Python 的線程切換基本是依賴操作系統(tǒng),在實際的使用中,對于單核 CPU,GIL 并沒有太大的影響,但對于多核 CPU 卻引入了線程顛簸(thrashing)問題。
線程顛簸是指作為單一資源的 GIL 鎖,在被多核心競爭強占時資源額外消耗的現(xiàn)象。
比如下圖,線程1 在釋放 GIL 鎖后,操作系統(tǒng)喚醒了 線程2,并將 線程2 分配給 核心2 執(zhí)行,但是如果此時 線程2 卻沒有成功獲得 GIL 鎖,只能再次被掛起。此時切換線程、切換上下文的資源都將白白浪費。
因此,Python 多線程程序在多核 CPU 機器下的性能不一定比單核高。那么如果是計算密集型的程序,一般還是考慮用 C 重寫關(guān)鍵部分,或者使用多進程避開 GIL。
多線程在 Python 中使用多線程,有 thread 和 threading 可供原則,thread 提供了低級別的、原始的線程以及一個簡單的鎖,因為 thread 過于簡陋,線程管理容易出現(xiàn)人為失誤,因此官方更建議使用 threading,而 threading 也不過是對 thread 的封裝和補充。(Python3 中 thread 被改名為 _thread)。
在 Python 中創(chuàng)建線程非常簡單:
import time import threading def do_task(task_name): print("Get task: {}".format(task_name)) time.sleep(1) print("Finish task: {}".format(task_name)) if __name__ == "__main__": tasks = [] for i in range(0, 10): # 創(chuàng)建 task tasks.append(threading.Thread( target=do_task, args=("task_{}".format(i),))) for t in tasks: # 開始執(zhí)行 task t.start() for t in tasks: # 等待 task 執(zhí)行完畢 # 完畢前會阻塞住主線程 t.join() print("Finish.")
直接創(chuàng)建線程簡單優(yōu)雅,如果邏輯復雜,也可以通過繼承 Thread 基類完成多線程:
import time import threading class MyTask(threading.Thread): def __init__(self, task_name): super(MyTask, self).__init__() self.task_name = task_name def run(self): print("Get task: {}".format(self.task_name)) time.sleep(1) print("Finish task: {}".format(self.task_name)) if __name__ == "__main__": tasks = [] for i in range(0, 10): # 創(chuàng)建 task tasks.append(MyTask("task_{}".format(i))) for t in tasks: # 開始執(zhí)行 task t.start() for t in tasks: # 等待 task 執(zhí)行完畢 # 完畢前會阻塞住主線程 t.join() print("Finish.")多進程
在 Python 中,可以使用 multiprocessing 庫來實現(xiàn)多進程編程,和多線程一樣,有兩種方法可以使用多進程編程。
直接創(chuàng)建進程:
import time import random import multiprocessing def do_something(task_name): print("Get task: {}".format(task_name)) time.sleep(random.randint(1, 5)) print("Finish task: {}".format(task_name)) if __name__ == "__main__": tasks = [] for i in range(0, 10): # 創(chuàng)建 task tasks.append(multiprocessing.Process( target=do_something, args=("task_{}".format(i),))) for t in tasks: # 開始執(zhí)行 task t.start() for t in tasks: # 等待 task 執(zhí)行完畢 # 完畢前會阻塞住主線程 t.join() print("Finish.")
繼承進程父類:
import time import random import multiprocessing class MyTask(multiprocessing.Process): def __init__(self, task_name): super(MyTask, self).__init__() self.task_name = task_name def run(self): print("Get task: {}".format(self.task_name)) time.sleep(random.randint(1, 5)) print("Finish task: {}".format(self.task_name)) if __name__ == "__main__": tasks = [] for i in range(0, 10): # 創(chuàng)建 task tasks.append(MyTask("task_{}".format(i))) for t in tasks: # 開始執(zhí)行 task t.start() for t in tasks: # 等待 task 執(zhí)行完畢 # 完畢前會阻塞住主線程 t.join() print("Finish.")
multiprocessing 除了常用的多進程編程外,我認為它最大的意義在于提供了一套規(guī)范,在該庫下有一個 dummy 模塊,即 multiprocessing.dummy,里面對 threading 進行封裝,提供了和 multiprocessing 相同 API 的線程實現(xiàn),換句話說,class::multiprocessing.Process 提供的是進程任務類,而 class::multiprocessing.dummy.Process,也正是有 multiprocessing.dummy 的存在,可以快速的講一個多進程程序改為多線程:
import time import random from multiprocessing.dummy import Process class MyTask(Process): def __init__(self, task_name): super(MyTask, self).__init__() self.task_name = task_name def run(self): print("Get task: {}".format(self.task_name)) time.sleep(random.randint(1, 5)) print("Finish task: {}".format(self.task_name)) if __name__ == "__main__": tasks = [] for i in range(0, 10): # 創(chuàng)建 task tasks.append(MyTask("task_{}".format(i))) for t in tasks: # 開始執(zhí)行 task t.start() for t in tasks: # 等待 task 執(zhí)行完畢 # 完畢前會阻塞住主線程 t.join() print("Finish.")
無論是多線程還是多進程編程,這也是我一般會選擇 multiprocessing 的原因。
除了直接創(chuàng)建進程,還可以用進程池(或者 multiprocessing.dummy 里的進程池):
import time import random from multiprocessing import Pool def do_task(task_name): print("Get task: {}".format(task_name)) time.sleep(random.randint(1, 5)) print("Finish task: {}".format(task_name)) if __name__ == "__main__": pool = Pool(5) for i in range(0, 10): # 創(chuàng)建 task pool.apply_async(do_task, ("task_{}".format(i),)) pool.close() pool.join() print("Finish.")
線程池:
import time import random from multiprocessing.dummy import Pool def do_task(task_name): print("Get task: {}".format(task_name)) time.sleep(random.randint(1, 5)) print("Finish task: {}".format(task_name)) if __name__ == "__main__": pool = Pool(5) for i in range(0, 10): # 創(chuàng)建 task pool.apply_async(do_task, ("task_{}".format(i),)) pool.close() pool.join() print("Finish.")
這里示例有個問題,pool 在 join 前需要 close 掉,否則就會拋出異常,不過 Python 之禪的作者 Tim Peters 給出解釋:
As to Pool.close(), you should call that when - and only when - you"re never going to submit more work to the Pool instance. So Pool.close() is typically called when the parallelizable part of your main program is finished. Then the worker processes will terminate when all work already assigned has completed.同步原語It"s also excellent practice to call Pool.join() to wait for the worker processes to terminate. Among other reasons, there"s often no good way to report exceptions in parallelized code (exceptions occur in a context only vaguely related to what your main program is doing), and Pool.join() provides a synchronization point that can report some exceptions that occurred in worker processes that you"d otherwise never see.
在多進程編程中,因為進程間的資源隔離,不需要考慮內(nèi)存的線程安全問題,而在多線程編程中便需要同步原語來保存線程安全,因為 Python 是一門簡單的語言,很多操作都是封裝的操作系統(tǒng) API,因此支持的同步原語蠻全,但這里只寫兩種常見的同步原語:鎖和信號量。
通過使用鎖可以用來保護一段內(nèi)存空間,而信號量可以被多個線程共享。
在 threading 中可以看到 Lock 鎖和 RLock 重用鎖兩種鎖,區(qū)別如名。這兩種鎖都只能被一個線程擁有,第一種鎖只能被獲得一次,而重用鎖可以被多次獲得,但也需要同樣次數(shù)的釋放才能真正的釋放。
當多個線程對同一塊內(nèi)存空間同時進行修改的時候,經(jīng)常遇到奇怪的問題:
import time import random from threading import Thread, Lock count = 0 def do_task(): global count time.sleep(random.randint(1, 10) * 0.1) tmp = count tmp += 1 time.sleep(random.randint(1, 10) * 0.1) count = tmp print(count) if __name__ == "__main__": tasks = [] for i in range(0, 10): tasks.append(Thread(target=do_task)) for t in tasks: t.start() for t in tasks: t.join() print("Finish. Count = {}".format(count))
如上就是典型的非線程安全導致 count 沒有達到預期的效果。而通過鎖便可以控制某一段代碼,或者說某段內(nèi)存空間的訪問:
import time import random from threading import Thread, Lock count = 0 lock = Lock() def do_task(): lock.acquire() global count time.sleep(random.randint(1, 10) * 0.1) tmp = count tmp += 1 time.sleep(random.randint(1, 10) * 0.1) count = tmp print(count) lock.release() if __name__ == "__main__": tasks = [] for i in range(0, 10): tasks.append(Thread(target=do_task)) for t in tasks: t.start() for t in tasks: t.join() print("Finish. Count = {}".format(count))
當然,上述例子非常暴力,直接強行把并發(fā)改為串行。
對于信號量常見于有限資源強占的場景,可以定義固定大小的信號量供多個線程獲取或者釋放,從而控制線程的任務執(zhí)行,比如下面的例子,控制最多有 5 個任務在執(zhí)行:
import time import random from threading import Thread, BoundedSemaphore sep = BoundedSemaphore(5) def do_task(task_name): sep.acquire() print("do Task: {}".format(task_name)) time.sleep(random.randint(1, 10)) sep.release() if __name__ == "__main__": tasks = [] for i in range(0, 10): tasks.append(Thread(target=do_task, args=("task_{}".format(i),))) for t in tasks: t.start() for t in tasks: t.join() print("Finish.")Queue 和 Pipe
因為多進程的內(nèi)存隔離,不會存在內(nèi)存競爭的問題。但同時,多個進程間的數(shù)據(jù)共享成為了新的問題,而進程間通信常見:隊列,管道,信號。
這里只講解隊列和管道。
隊列常見于雙進程模型,一般用作生產(chǎn)者-消費者模式,由生產(chǎn)者進程向隊列中發(fā)布任務,并由消費者從隊列首部拿出任務進行執(zhí)行:
import time from multiprocessing import Process, Queue class Task1(Process): def __init__(self, queue): super(Task1, self).__init__() self.queue = queue def run(self): item = self.queue.get() print("get item: [{}]".format(item)) class Task2(Process): def __init__(self, queue): super(Task2, self).__init__() self.queue = queue def run(self): print("put item: [Hello]") time.sleep(1) self.queue.put("Hello") if __name__ == "__main__": queue = Queue() t1 = Task1(queue) t2 = Task2(queue) t1.start() t2.start() t1.join() print("Finish.")
理論上每個進程都可以向隊列里的讀或者寫,可以認為隊列是半雙工路線。但是往往只有特定的讀進程(比如消費者)和寫進程(比如生產(chǎn)者),盡管這些進程只是開發(fā)者自己定義的。
而 Pipe 更像一個全工路線:
import time from multiprocessing import Process, Pipe class Task1(Process): def __init__(self, pipe): super(Task1, self).__init__() self.pipe = pipe def run(self): item = self.pipe.recv() print("Task1: recv item: [{}]".format(item)) print("Task1: send item: [Hi]") self.pipe.send("Hi") class Task2(Process): def __init__(self, pipe): super(Task2, self).__init__() self.pipe = pipe def run(self): print("Task2: send item: [Hello]") time.sleep(1) self.pipe.send("Hello") time.sleep(1) item = self.pipe.recv() print("Task2: recv item: [{}]".format(item)) if __name__ == "__main__": pipe = Pipe() t1 = Task1(pipe[0]) t2 = Task2(pipe[1]) t1.start() t2.start() t1.join() t2.join() print("Finish.")庫
除了上面介紹的 threading 和 multiprocessing 兩個庫外,還有一個好用的令人發(fā)指的庫 concurrent.futures。和前面兩個庫不同,這個庫是更高等級的抽象,隱藏了很多底層的東西,但也因此非常好用。用官方的例子:
with ThreadPoolExecutor(max_workers=1) as executor: future = executor.submit(pow, 323, 1235) print(future.result())
該庫中自帶了進程池和線程池,可以通過上下文管理器來管理,而且對于異步任務執(zhí)行完后,結(jié)果的獲得也非常簡單。再拿一個官方的多進程計算的例子作為結(jié)束:
import concurrent.futures import math PRIMES = [ 112272535095293, 112582705942171, 112272535095293, 115280095190773, 115797848077099, 1099726899285419] def is_prime(n): if n % 2 == 0: return False sqrt_n = int(math.floor(math.sqrt(n))) for i in range(3, sqrt_n + 1, 2): if n % i == 0: return False return True def main(): with concurrent.futures.ProcessPoolExecutor() as executor: for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)): print("%d is prime: %s" % (number, prime)) if __name__ == "__main__": main()
歡迎關(guān)注個人公眾號:CS實驗室
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/41698.html
摘要:使用進行并發(fā)編程篇三掘金這是使用進行并發(fā)編程系列的最后一篇。所以我考慮啟用一個本地使用進行并發(fā)編程篇二掘金我們今天繼續(xù)深入學習。 使用 Python 進行并發(fā)編程 - asyncio 篇 (三) - 掘金 這是「使用Python進行并發(fā)編程」系列的最后一篇。我特意地把它安排在了16年最后一天。 重新實驗上篇的效率對比的實現(xiàn) 在第一篇我們曾經(jīng)對比并發(fā)執(zhí)行的效率,但是請求的是httpb...
摘要:本文重點掌握異步編程的相關(guān)概念了解期物的概念意義和使用方法了解中的阻塞型函數(shù)釋放的特點。一異步編程相關(guān)概念阻塞程序未得到所需計算資源時被掛起的狀態(tài)。 導語:本文章記錄了本人在學習Python基礎(chǔ)之控制流程篇的重點知識及個人心得,打算入門Python的朋友們可以來一起學習并交流。 本文重點: 1、掌握異步編程的相關(guān)概念;2、了解期物future的概念、意義和使用方法;3、了解Python...
摘要:大家好,我是冰河有句話叫做投資啥都不如投資自己的回報率高。馬上就十一國慶假期了,給小伙伴們分享下,從小白程序員到大廠高級技術(shù)專家我看過哪些技術(shù)類書籍。 大家好,我是...
摘要:擴展支持多用戶并發(fā)訪問與線程池。項目請見初學網(wǎng)絡編程之服務器。不允許超過磁盤配額。該文件是一個使用模塊編寫的線程池類。這一步就做到了線程池的作用。 對MYFTP項目進行升級。擴展支持多用戶并發(fā)訪問與線程池。MYFTP項目請見python初學——網(wǎng)絡編程之FTP服務器。 擴展需求 1.在之前開發(fā)的FTP基礎(chǔ)上,開發(fā)支持多并發(fā)的功能2.不能使用SocketServer模塊,必須自己實現(xiàn)多線...
摘要:針對的初學者,從無到有的語言如何入門,主要包括了的簡介,如何下載,如何安裝,如何使用終端,等各種開發(fā)環(huán)境進行開發(fā),中的語法和基本知識概念和邏輯,以及繼續(xù)深入學習的方法。 ...
閱讀 1685·2021-11-15 11:37
閱讀 3415·2021-09-28 09:44
閱讀 1659·2021-09-07 10:15
閱讀 2794·2021-09-03 10:39
閱讀 2695·2019-08-29 13:20
閱讀 1300·2019-08-29 12:51
閱讀 2212·2019-08-26 13:44
閱讀 2131·2019-08-23 18:02