小編寫這篇文章的主要目的,主要是給大家介紹關于python3 queue多線程通信,這里面有很多的技術性的難點,那么,該怎么去進行處理呢,下面小編給大家進行詳細的解答一下。
queue分類
python3 queue分三類:
先進先出隊列
后進先出的棧
優先級隊列
他們的導入方式分別是:
from queue import Queue from queue import LifoQueue from queue import
具體方法見下面引用說明。
Queue對象已經包含了必要的鎖,所以你可以通過它在多個線程間多安全地共享數據。當使用隊列時,協調生產者和消費者的關閉問題可能會有一些麻煩。一個通用的解決方法是在隊列中放置一個特殊的值,當消費者讀到這個值的時候,終止執行。
例如:
from queue import Queue from threading import Thread #用來表示終止的特殊對象 _sentinel=object() #A thread that produces data def producer(out_q): for i in range(10): print("生產") out_q.put(i) out_q.put(_sentinel) #A thread that consumes data def consumer(in_q): while True: data=in_q.get() if data is _sentinel: in_q.put(_sentinel) break else: print("消費",data) #Create the shared queue and launch both threads q=Queue() t1=Thread(target=consumer,args=(q,)) t2=Thread(target=producer,args=(q,)) t1.start() t2.start()
結果:
本例里面有一個不尋常的位置:購買者在學到這些特殊值過后馬上又將它放返回序列中,將它傳下去。那樣,任何竊聽這一個序列的用戶進程就能夠關閉所有了。雖然序列是一種常見的線程間通信制度,但仍然能自己根據構建自已的程序設計并添加所需的鎖和同步機制來實現線程間通信。最常見的方法是使用Condition變量來包裝你的程序設計。下邊這個例子演示了如何創建一個線程安全的優先級隊列。
import heapq import threading class PriorityQueue: def __init__(self): self._queue=[] self._count=0 self._cv=threading.Condition() def put(self,item,priority): with self._cv: heapq.heappush(self._queue,(-priority,self._count,item)) self._count+=1 self._cv.notify() def get(self): with self._cv: while len(self._queue)==0: self._cv.wait() return heapq.heappop(self._queue)[-1]
例子二、task_done和join
使用隊列來進行線程間通信是一個單向、不確定的過程。通常情況下,你沒有辦法知道接收數據的線程是什么時候接收到的數據并開始工作的。不過隊列對象提供一些基本完成的特性,比如下邊這個例子中的task_done()和join():
from queue import Queue from threading import Thread class Producer(Thread): def __init__(self,q): super().__init__() self.count=5 self.q=q def run(self): while self.count>0: print("生產") if self.count==1: self.count-=1 self.q.put(2) else: self.count-=1 self.q.put(1) class Consumer(Thread): def __init__(self,q): super().__init__() self.q=q def run(self): while True: print("消費") data=self.q.get() if data==2: print("stop because data=",data) #任務完成,從隊列中清除一個元素 self.q.task_done() break else: print("data is good,data=",data) #任務完成,從隊列中清除一個元素 self.q.task_done() def main(): q=Queue() p=Producer(q) c=Consumer(q) p.setDaemon(True) c.setDaemon(True) p.start() c.start() #等待隊列清空 q.join() print("queue is complete") if __name__=='__main__': main()
結果:
例子三、多線程里用queue
設置倆隊列,一個是要做的任務隊列todo_queue,一個是已經完成的隊列done_queue。
每次執行線程,先從todo_queue隊列里取出一個值,然后執行完,放入done_queue隊列。
如果todo_queue為空,就退出。
import logging import logging.handlers import threading import queue log_mgr=None todo_queue=queue.Queue() done_queue=queue.Queue() class LogMgr: def __init__(self,logpath): self.LOG=logging.getLogger('log') loghd=logging.handlers.RotatingFileHandler(logpath,"a",0,1) fmt=logging.Formatter("%(asctime)s%(threadName)-10s%(message)s","%Y-%m-%d%H:%M:%S") loghd.setFormatter(fmt) self.LOG.addHandler(loghd) self.LOG.setLevel(logging.INFO) def info(self,msg): if self.LOG is not None: self.LOG.info(msg) class Worker(threading.Thread): global log_mgr def __init__(self,name): threading.Thread.__init__(self) self.name=name def run(self): while True: try: task=todo_queue.get(False) if task: log_mgr.info("HANDLE_TASK:%s"%task) done_queue.put(1) except queue.Empty: break return def main(): global log_mgr log_mgr=LogMgr("mylog") for i in range(30): todo_queue.put("data"+str(i)) workers=[] for i in range(3): w=Worker("worker"+str(i)) workers.append(w) for i in range(3): workers<i>.start() for i in range(3): workers<i>.join() total_num=done_queue.qsize() log_mgr.info("TOTAL_HANDLE_TASK:%d"%total_num) exit(0) if __name__=='__main__': main()
輸出日志文件結果:
到此為止,小編就給大家介紹到這里了,希望可以給各位讀者帶來幫助。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/127777.html
摘要:第二節將任務添加到隊列上一個栗子只是簡單實現了下網頁與后臺的通信你可以在這里處理任何你想要的操作你已經點到我了但由于是同一個進程,如果你做了很耗時的操作,比如下載一張圖片之類的操作你會發現,窗口卡住了,一般表現為窗口泛白,出現未響應的提示但 第二節 將任務添加到隊列! 上一個栗子只是簡單實現了下網頁與后臺的通信 def clickMe(self): #你可以在這里處理任何你想要...
摘要:,,等實用方法可以獲取一個隊列的當前大小和狀態。但要注意,這些方法都不是線程安全的。可能你對一個隊列使用判斷出這個隊列為空,但同時另外一個線程可能已經向這個隊列中插入一個數據項。 python 多線程編程 使用回調方式 import time def countdown(n): while n > 0: print(T-minus, n) n -...
摘要:默認值為,指定為時代表可以阻塞,若同時指定,在超時時返回。當消費者線程調用意味著有消費者取得任務并完成任務,未完成的任務數就會減少。當未完成的任務數降到,解除阻塞。 學習契機 最近的一個項目中在使用grpc時遇到一個問題,由于client端可多達200,每個端口每10s向grpc server發送一次請求,server端接受client的請求后根據request信息更新數據庫,再將數據...
摘要:進程線程切換都需要使用一定的時間。子進程在中,如果要運行系統命令,會使用來運行,官方建議使用方法來運行系統命令,更高級的用法是直接使用其接口。 多線程 簡單示例 對于CPU計算密集型的任務,python的多線程跟單線程沒什么區別,甚至有可能會更慢,但是對于IO密集型的任務,比如http請求這類任務,python的多線程還是有用處。在日常的使用中,經常會結合多線程和隊列一起使用,比如,以...
摘要:在中由于歷史原因使得中多線程的效果非常不理想使得任何時刻只能利用一個核并且它的調度算法簡單粗暴多線程中讓每個線程運行一段時間然后強行掛起該線程繼而去運行其他線程如此周而復始直到所有線程結束這使得無法有效利用計算機系統中的局部性頻繁的線程切換 GIL 在Python中,由于歷史原因(GIL),使得Python中多線程的效果非常不理想.GIL使得任何時刻Python只能利用一個CPU核,...
閱讀 919·2023-01-14 11:38
閱讀 891·2023-01-14 11:04
閱讀 750·2023-01-14 10:48
閱讀 2039·2023-01-14 10:34
閱讀 956·2023-01-14 10:24
閱讀 834·2023-01-14 10:18
閱讀 506·2023-01-14 10:09
閱讀 583·2023-01-14 10:02