摘要:我們將任務(wù)封裝為消息并將其發(fā)送到隊(duì)列。為了確保消息永不丟失,支持消息確認(rèn)。沒(méi)有任何消息超時(shí)當(dāng)消費(fèi)者死亡時(shí),將重新傳遞消息。發(fā)生這種情況是因?yàn)橹辉谙⑦M(jìn)入隊(duì)列時(shí)調(diào)度消息。這告訴一次不要向工作人員發(fā)送多個(gè)消息。
源碼:https://github.com/ltoddy/rabbitmq-tutorial
工作隊(duì)列(using the Pika Python client)
本章節(jié)教程重點(diǎn)介紹的內(nèi)容在第一篇教程中,我們編寫(xiě)了用于從命名隊(duì)列發(fā)送和接收消息的程序。在這一個(gè)中,我們將創(chuàng)建一個(gè)工作隊(duì)列,用于在多個(gè)工作人員之間分配耗時(shí)的任務(wù)。
工作隊(duì)列(又名:任務(wù)隊(duì)列)背后的主要思想是避免立即執(zhí)行資源密集型任務(wù),并且必須等待它完成。相反,我們安排稍后完成任務(wù)。我們將任務(wù)封裝 為消息并將其發(fā)送到隊(duì)列。
在后臺(tái)運(yùn)行的工作進(jìn)程將彈出任務(wù)并最終執(zhí)行作業(yè)。當(dāng)你運(yùn)行許多工人時(shí),任務(wù)將在他們之間共享。
這個(gè)概念在Web應(yīng)用程序中特別有用,因?yàn)樵诙痰腍TTP請(qǐng)求窗口中無(wú)法處理復(fù)雜的任務(wù)。
在本教程的前一部分中,我們發(fā)送了一條包含“Hello World!”的消息?,F(xiàn)在我們將發(fā)送代表復(fù)雜任務(wù)的字符串。
我們沒(méi)有真實(shí)世界的任務(wù),比如要調(diào)整大小的圖像或要渲染的PDF文件,所以讓我們假裝我們很忙 - 使用 time.sleep() 函數(shù)來(lái)偽裝它。
我們將把字符串中的點(diǎn)(".")數(shù)作為復(fù)雜度; 每一個(gè)點(diǎn)都會(huì)占用一秒的“工作”。例如,Hello ... 描述的假任務(wù)將需要三秒鐘。
我們稍微修改前面例子中的send.py代碼,以允許從命令行發(fā)送任意消息。這個(gè)程序?qū)讶蝿?wù)安排到我們的工作隊(duì)列中,所以讓我們把它命名為new_task.py:
import sys message = " ".join(sys.argv[1:]) or "Hello World" channel.basic_publish(exchange="", routing_key="hello", body=message) print(" [x] Sent %r" % message)
我們的舊版receive.py腳本也需要進(jìn)行一些更改:它需要為郵件正文中的每個(gè)點(diǎn)偽造第二個(gè)工作。它會(huì)從隊(duì)列中彈出消息并執(zhí)行任務(wù),所以我們稱(chēng)之為worker.py:
import time def callback(ch, method, properties, body): print(" [x] Received %r" % body) time.sleep(body.count(b".")) print(" [x] Done")循環(huán)調(diào)度
使用任務(wù)隊(duì)列的優(yōu)點(diǎn)之一是可以輕松地平行工作。如果我們正在積累積壓的工作,我們可以增加更多的工作人員,并且這種方式很容易擴(kuò)展。
首先,我們?cè)囍瑫r(shí)運(yùn)行兩個(gè)worker.py腳本。他們都會(huì)從隊(duì)列中獲取消息,但具體到底是什么?讓我們來(lái)看看。
您需要打開(kāi)三個(gè)控制臺(tái)。兩個(gè)將運(yùn)行worker.py腳本。這些控制臺(tái)將成為我們的兩個(gè)消費(fèi)者 - C1和C2。
默認(rèn)情況下,RabbitMQ將按順序?qū)⒚織l消息發(fā)送給下一個(gè)使用者。平均而言,每個(gè)消費(fèi)者將獲得相同數(shù)量的消息。這種分配消息的方式稱(chēng)為循環(huán)法。請(qǐng)嘗試與三名或更多的工人。
消息確認(rèn)做任務(wù)可能需要幾秒鐘的時(shí)間。你可能想知道如果其中一個(gè)消費(fèi)者開(kāi)始一項(xiàng)長(zhǎng)期任務(wù)并且只是部分完成而死亡會(huì)發(fā)生什么。
用我們目前的代碼,一旦RabbitMQ將消息傳遞給客戶,它立即將其標(biāo)記為刪除。在這種情況下,如果你殺了一個(gè)工人,我們將失去剛剛處理的信息。
我們也會(huì)失去所有派發(fā)給這個(gè)特定工作人員但尚未處理的消息。
但我們不想失去任何任務(wù)。如果一名工人死亡,我們希望將任務(wù)交付給另一名工人。
為了確保消息永不丟失,RabbitMQ支持消息確認(rèn)。消費(fèi)者發(fā)回ack(請(qǐng)求)告訴RabbitMQ已經(jīng)收到,處理了特定的消息,并且RabbitMQ可以自由刪除它。
如果消費(fèi)者死亡(其通道關(guān)閉,連接關(guān)閉或TCP連接丟失),RabbitMQ將理解消息未被完全處理,并將重新排隊(duì)。如果有其他消費(fèi)者同時(shí)在線,它會(huì)迅速將其重新發(fā)送給另一位消費(fèi)者。
這樣,即使工作人員偶爾死亡,也可以確保沒(méi)有任何信息丟失。
沒(méi)有任何消息超時(shí); 當(dāng)消費(fèi)者死亡時(shí),RabbitMQ將重新傳遞消息。即使處理消息需要非常很長(zhǎng)的時(shí)間也沒(méi)關(guān)系。
消息確認(rèn)默認(rèn)是被打開(kāi)的。在前面的例子中,我們通過(guò) no_ack = True 標(biāo)志明確地將它們關(guān)閉。一旦我們完成了一項(xiàng)任務(wù),現(xiàn)在是時(shí)候清除這個(gè)標(biāo)志并且發(fā)送工人的正確確認(rèn)。
def callback(ch, method, properties, body): print(" [x] Received %r" % body) time.sleep(body.count(b".")) print(" [x] Done") ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_consume(callback, queue="hello")
使用這段代碼,我們可以確定,即使在處理消息時(shí)使用CTRL + C來(lái)殺死一個(gè)工作者,也不會(huì)丟失任何東西。工人死后不久,所有未確認(rèn)的消息將被重新發(fā)送。
消息持久性我們已經(jīng)學(xué)會(huì)了如何確保即使消費(fèi)者死亡,任務(wù)也不會(huì)丟失。但是如果RabbitMQ服務(wù)器停止,我們的任務(wù)仍然會(huì)丟失。
當(dāng)RabbitMQ退出或崩潰時(shí),它會(huì)忘記隊(duì)列和消息,除非您告訴它不要。需要做兩件事來(lái)確保消息不會(huì)丟失:我們需要將隊(duì)列和消息標(biāo)記為持久。
首先,我們需要確保RabbitMQ永遠(yuǎn)不會(huì)失去我們的隊(duì)列。為了做到這一點(diǎn),我們需要宣布它是持久的:
channel.queue_declare(queue="hello", durable=True)
雖然這個(gè)命令本身是正確的,但它在我們的設(shè)置中不起作用。那是因?yàn)槲覀円呀?jīng)定義了一個(gè)名為hello的隊(duì)列 ,這個(gè)隊(duì)列并不"耐用"。
RabbitMQ不允許您使用不同的參數(shù)重新定義現(xiàn)有的隊(duì)列,并會(huì)向任何試圖執(zhí)行該操作的程序返回錯(cuò)誤。
但是有一個(gè)快速的解決方法 - 讓我們聲明一個(gè)具有不同名稱(chēng)的隊(duì)列,例如task_queue:
channel.queue_declare(queue="task_queue", durable=True)
此queue_declare更改需要應(yīng)用于生產(chǎn)者和消費(fèi)者代碼。
此時(shí)我們確信,即使RabbitMQ重新啟動(dòng),task_queue隊(duì)列也不會(huì)丟失。現(xiàn)在我們需要將消息標(biāo)記為持久 - 通過(guò)提供值為2的delivery_mode屬性。
channel.basic_publish(exchange="", routing_key="task_queue", body=message, properties=pika.BasicProperties( delivery_mode=2, # 確保消息是持久的 ))公平派遣
您可能已經(jīng)注意到調(diào)度仍然無(wú)法完全按照我們的要求工作。例如,在有兩名工人的情況下,當(dāng)所有奇怪的信息都很重,甚至信息很少時(shí),一名工作人員會(huì)一直很忙,
另一名工作人員幾乎不會(huì)做任何工作。那么,RabbitMQ不知道任何有關(guān)這一點(diǎn),并仍將均勻地發(fā)送消息。
發(fā)生這種情況是因?yàn)镽abbitMQ只在消息進(jìn)入隊(duì)列時(shí)調(diào)度消息。它沒(méi)有考慮消費(fèi)者未確認(rèn)消息的數(shù)量。它只是盲目地將第n條消息分發(fā)給第n位消費(fèi)者。
為了解決這個(gè)問(wèn)題,我們可以使用basic.qos方法和設(shè)置prefetch_count = 1。這告訴RabbitMQ一次不要向工作人員發(fā)送多個(gè)消息。
或者換句話說(shuō),不要向工作人員發(fā)送新消息,直到它處理并確認(rèn)了前一個(gè)消息。相反,它會(huì)將其分派給不是仍然忙碌的下一個(gè)工作人員。
channel.basic_qos(prefetch_count=1)把它放在一起
我們的new_task.py腳本的最終代碼:
#!/usr/bin/env python import sys import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost")) channel = connection.channel() channel.queue_declare(queue="task_queue", durable=True) message = " ".join(sys.argv[1:]) or "Hello World" channel.basic_publish(exchange="", routing_key="task_queue", body=message, properties=pika.BasicProperties( delivery_mode=2, # 確保消息是持久的 )) print(" [x] Sent %r" % message) connection.close()
而我們的工人 worker.py:
#!/usr/bin/env python import time import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost")) channel = connection.channel() channel.queue_declare(queue="task_queue", durable=True) def callback(ch, method, properties, body): print(" [x] Received %r" % body) time.sleep(body.count(b".")) print(" [x] Done") ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_consume(callback, queue="hello") channel.basic_qos(prefetch_count=1) print(" [*] Waiting for messages. To exit press CTRL+C") channel.start_consuming()
使用消息確認(rèn)和prefetch_count,您可以設(shè)置一個(gè)工作隊(duì)列。即使RabbitMQ重新啟動(dòng),持久性選項(xiàng)也可讓任務(wù)繼續(xù)存在。
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://m.specialneedsforspecialkids.com/yun/44704.html
摘要:交易所在本教程的前幾部分中,我們發(fā)送消息并從隊(duì)列中接收消息。消費(fèi)者是接收消息的用戶的應(yīng)用程序。中的消息傳遞模型的核心思想是生產(chǎn)者永遠(yuǎn)不會(huì)將任何消息直接發(fā)送到隊(duì)列中。交換和隊(duì)列之間的關(guān)系稱(chēng)為綁定。 源碼:https://github.com/ltoddy/rabbitmq-tutorial 發(fā)布 / 訂閱 (using the Pika Python client) 本章節(jié)教程重點(diǎn)介紹的...
摘要:每當(dāng)我們收到一條消息,這個(gè)回調(diào)函數(shù)就被皮卡庫(kù)調(diào)用。接下來(lái),我們需要告訴這個(gè)特定的回調(diào)函數(shù)應(yīng)該從我們的隊(duì)列接收消息為了讓這個(gè)命令成功,我們必須確保我們想要訂閱的隊(duì)列存在。生產(chǎn)者計(jì)劃將在每次運(yùn)行后停止歡呼我們能夠通過(guò)發(fā)送我們的第一條消息。 源碼:https://github.com/ltoddy/rabbitmq-tutorial 介紹 RabbitMQ是一個(gè)消息代理:它接受和轉(zhuǎn)發(fā)消息。你...
摘要:通常用于命名回調(diào)隊(duì)列。對(duì)每個(gè)響應(yīng)執(zhí)行的回調(diào)函數(shù)做了一個(gè)非常簡(jiǎn)單的工作,對(duì)于每個(gè)響應(yīng)消息它檢查是否是我們正在尋找的。在這個(gè)方法中,首先我們生成一個(gè)唯一的數(shù)并保存回調(diào)函數(shù)將使用這個(gè)值來(lái)捕獲適當(dāng)?shù)捻憫?yīng)。 源碼:https://github.com/ltoddy/rabbitmq-tutorial 遠(yuǎn)程過(guò)程調(diào)用(RPC) (using the Pika Python client) 本章節(jié)教程...
摘要:為了避免與參數(shù)混淆,我們將其稱(chēng)為綁定鍵。直接交換我們之前教程的日志記錄系統(tǒng)將所有消息廣播給所有消費(fèi)者。在這種設(shè)置中,使用路由鍵發(fā)布到交換機(jī)的消息將被路由到隊(duì)列。所有其他消息將被丟棄。 源碼:https://github.com/ltoddy/rabbitmq-tutorial 路由 本章節(jié)教程重點(diǎn)介紹的內(nèi)容 在之前的教程中,我們構(gòu)建了一個(gè)簡(jiǎn)單的日志系統(tǒng) 我們能夠?qū)⑷罩鞠V播給許多接收...
摘要:?jiǎn)卧~可以是任何東西,但通常它們指定了與該消息相關(guān)的一些功能。消息將使用由三個(gè)字兩個(gè)點(diǎn)組成的路由鍵發(fā)送。另一方面,只會(huì)進(jìn)入第一個(gè)隊(duì)列,而只會(huì)進(jìn)入第二個(gè)隊(duì)列。不匹配任何綁定,因此將被丟棄。代碼幾乎與前一個(gè)教程中的代碼相同。 源碼:https://github.com/ltoddy/rabbitmq-tutorial Topics (using the Pika Python client)...
閱讀 1238·2021-09-26 09:46
閱讀 1590·2021-09-06 15:00
閱讀 719·2019-08-30 15:52
閱讀 1124·2019-08-29 13:10
閱讀 1284·2019-08-26 13:47
閱讀 1484·2019-08-26 13:35
閱讀 2032·2019-08-23 18:38
閱讀 729·2019-08-23 17:59