摘要:這樣的消息分發機制稱作輪詢。在進程掛了之后,所有的未被確認的消息會被重新分發。忘記確認這是一個普遍的錯誤,丟失。為了使消息不會丟失,兩件事情需要確保,我們需要持久化隊列和消息。
工作隊列
在第一篇中,我們寫了一個程序從已經聲明的隊列中收發消息,在這篇中,我們會創建一個工作隊列(Work Queue)來分發works里面的耗時任務。
其主要思想就是避免立即執行耗資源的任務,并等待它完成。相反的,我們要讓這些任務在稍后的一個時間執行。我們把任務封裝成一個消息放到隊列中。
一個工作進程會在后臺執行,取出(Pop)任務并最終會完成這項任務,當你運行多個work的時候,這些任務會在它們之間共享。
這個概念在web應用中也是非常有用的,當在一個http請求窗口中不可能完成一個復雜的任務時候。
準備在之前的引導中,我們發送了一個’Hello World‘的消息。現在我們要發送一個字符串代表一個復雜的任務,我們沒有像調整
圖片大小或者渲染一個pdf文件這樣的在真實場景中的任務,所以我們使用setTimeout來模擬我們正處于忙碌狀態。我們把‘.’的數量代表這個字符串的復雜度;
每一個‘." 會消耗一秒鐘,例:一個模擬的任務"Hello..." 會消耗三秒鐘。
從之前的例子,我們稍稍修改一下send.js 的代碼,允許命令行可以發送任意的消息。這個程序在工作隊列中安排好任務,所以我們稱它new_task.js
var q = "task_queue"; var msg = process.argv.slice(2).join(" ") || "Hello World!"; ch.assertQueue(q, {durable: true}); ch.sendToQueue(q, new Buffer(msg), {persistent: true}); console.log(" [x] Sent "%s"", msg);
我們的之前的receive.js同樣需要一些改變,需要對消息內容中的每個"."模擬成一個會消耗一秒的任務。它要從隊列中取出一條消息并執行這個任務,我們把它稱作worker.js
ch.consume(q, function(msg) { var secs = msg.content.toString().split(".").length - 1; console.log(" [x] Received %s", msg.content.toString()); setTimeout(function() { console.log(" [x] Done"); }, secs * 1000); }, {noAck: true});
注意我們模擬的執行時間
執行我們的程序
shell1$ ./worker.js shell2$ ./new_task.js循環調度
使用任務隊列(Task Queue)的其中的一個優勢是有簡化并行工作的能力。如果我們有很多堆積的未完成的任務,我們只需添加更多的worker來進行擴展。
首先,我們嘗試同時啟動兩個worker.js,他們都會從隊列中受到消息,但是實際上呢?我們來看看
你需要打開第三個命令行,兩個來運行worker.js腳本,我們稱作C1,C2
shell1$ ./worker.js [*] Waiting for messages. To exit press CTRL+C
shell2$ ./worker.js [*] Waiting for messages. To exit press CTRL+C
在第三個命令行工具中,我們會發布新的任務,一旦你啟動消費者,你可以發布一些消息:
shell3$ ./new_task.js First message. shell3$ ./new_task.js Second message.. shell3$ ./new_task.js Third message... shell3$ ./new_task.js Fourth message.... shell3$ ./new_task.js Fifth message.....
讓我們看看什么被分發到我們的worker
shell1$ ./worker.js [*] Waiting for messages. To exit press CTRL+C [x] Received "First message." [x] Received "Third message..." [x] Received "Fifth message....."
shell2$ ./worker.js [*] Waiting for messages. To exit press CTRL+C [x] Received "Second message.." [x] Received "Fourth message...."
默認情況下,RabbitMQ會依次地把消息推送到下一個消費者,平均每個消費者會得到相同數量的消息。這樣的消息分發機制稱作輪詢。可以嘗試3個或更多的worker。
## 消息確認 (Message acknowledgment)
要完成一個任務需要一些事件,你可能會想,當一個消費者開始執行一個長的任務但只執行一部分就die了會發生什么。就我們當前的代碼,一旦RabbitMQ 分發了一條消息到消費者那邊,就會立即從存儲中移除這條消息。這樣的話,如果你殺掉了進程,我們將會丟失這條正在被處理的消息。
我們也同樣丟失了我們發送給這個進程的但還沒被處理的消息。
但是我們不想丟失任何的任務,如果一個進程掛掉,我們希望這個任務會被分發到其他的進程。
為了確保每一條消息絕不會丟失,RabbitMQ支持 消息確認,一個ack標志會從消費者那邊返回去通知RabbitMQ當前的這個消息已經收到并且已經完成,于是RabbitMQ就可以取刪掉這個任務了。
如果一個消費者掛了(通道被關閉,連接關閉,或者TCP連接丟失)而沒有發送ack標志,RabbitMQ會明白這條任務還沒被執行完,并會重新放回隊列中,如果當時有其他的消費者在線,這個消息會被快速地發送給其他的消費者。這樣的話你就可以保證沒有消息會遺失,即使進程只是偶爾會掛掉。
不管消息處理是否超時,RabbitMQ只會在消費者掛掉的時候重新分發消息。這對于那些要處理很久很久的消息也是好的(add:不會被判定為noack,而重新分發)
在之前的例子中,消息確認是被關閉的,是時候打開它了,使用{noAck: false}(你也可以移除這個操作選項)選項,當我們完成這個任務的時候發送一個正確的消息確認。
ch.consume(q, function(msg) { var secs = msg.content.toString().split(".").length - 1; console.log(" [x] Received %s", msg.content.toString()); setTimeout(function() { console.log(" [x] Done"); ch.ack(msg); }, secs * 1000); }, {noAck: false});
使用這樣的代碼,你可以確定即使在它還在處理消息的時候你使用CTRL+C 殺掉進程也不會有數據丟失。在進程掛了之后,所有的未被確認的消息會被重新分發。
## 忘記確認 這是一個普遍的錯誤,丟失ack。只是一個簡單的錯誤,但結果確實很嚴重的。當客戶端停止的時候,消息會被重新分發(像是被隨機分發),但是RabbitMQ會占用越來越多的內存當它不能取釋放掉任何未被確認的消息。 為了調試這種類型的錯誤,你可以使用`rabbitmqctl`來輸出未被確認的消息字段: $ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged Listing queues ... hello 0 0 ...done.消息持久化
我們學習了確保在進程掛掉仍保證任務不會被丟失,但我們的任務還是會在RabbitMQ服務停止的時候丟失。
當RabbitMQ 退出或者崩潰,除非你叫它不要丟失,不然隊列和消息都會丟失。為了使消息不會丟失,兩件事情需要確保,我們需要持久化隊列和消息。
首先,我們要讓RabbitMQ 不會丟失隊列,為此,我們要先聲明
ch.assertQueue("hello", {durable: true});
盡管這樣的操作是對的,但是在我們現在的配置中是不起作用的,這是因為我們已經定義了一個未持久化的叫做hello的隊列,RabbitMQ不允許你改變一個已經存在的隊列的參數,如果你這樣做,程序將會返回錯誤。
但是有一個快速的辦法 --- 讓我們定義一個新的隊列,叫做task_queue
ch.assertQueue("task_queue", {durable: true});
這個durable選項,需要消費者和生產者都去使用。
此時我們能保證task_queue隊列不會在RabbitMQ重啟的時候丟失,現在我們需要對消息進行持久化 --- 使用presistent的Channel.sendToQueue選項,
ch.sendToQueue(q, new Buffer(msg), {persistent: true});
注意:消息持久化 消息持久化,不能完全地保證消息不會丟失,盡管它告訴RabbitMQ要把消息存到磁盤當中,總存在一個RabbitMQ接收到消息,但還未處理完的情況。另外,RabbitMQ并不是對每個消息做到幀同步,有可能只是被寫到緩存中,還沒被寫到磁盤。 消息持久化不能完全保證,但已經遠遠滿足我們的簡單的工作隊列的需求,如果你需要更強的持久化的保證,你可以使用[publisher confirms](https://www.rabbitmq.com/confirms.html)。均衡調度(Fair dispatch)
你可能已經注意到現在的調度并不是我們想要的,例:在有兩個worker的情況下,當所有的奇數消息都是重的而偶數消息是輕量的,那會有一個worker會一直處于忙碌狀態,而另一個worker幾乎不工作,
RabbitMQ,并不知道這些情況,只知道持續地均勻地分發消息。
這樣發生的原因是RabbitMQ只是在消息進入隊列的時候進行分發的工作,不管消費者的未確認的消息的數量,只是一味地分發第N條消息給第N個消費者。
為了解決這樣的問題,我們使用方法prefetch,并設置值為1,表示RabbitMQ不會同時給一個worker超過一條消息,即,不會分發一條新的消息直到worker完成并且發送ack標志。否則,RabbitMQ會把消息發送給下一個不在忙碌狀態的worker.
ch.prefetch(1);
注意隊列的大小 如果所有的worker都處于忙碌狀態,你的隊列可以被填滿,你可能需要一個監控,或者添加更多的worker,或者有其他的解決方案。整合
最后的new_task.js的代碼:
#!/usr/bin/env node var amqp = require("amqplib/callback_api"); amqp.connect("amqp://localhost", function(err, conn) { conn.createChannel(function(err, ch) { var q = "task_queue"; var msg = process.argv.slice(2).join(" ") || "Hello World!"; ch.assertQueue(q, {durable: true}); ch.sendToQueue(q, new Buffer(msg), {persistent: true}); console.log(" [x] Sent "%s"", msg); }); setTimeout(function() { conn.close(); process.exit(0) }, 500); });
new_task.js source
worker.js的代碼:
#!/usr/bin/env node var amqp = require("amqplib/callback_api"); amqp.connect("amqp://localhost", function(err, conn) { conn.createChannel(function(err, ch) { var q = "task_queue"; ch.assertQueue(q, {durable: true}); ch.prefetch(1); console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q); ch.consume(q, function(msg) { var secs = msg.content.toString().split(".").length - 1; console.log(" [x] Received %s", msg.content.toString()); setTimeout(function() { console.log(" [x] Done"); ch.ack(msg); }, secs * 1000); }, {noAck: false}); }); });
worker.js source
使用消息確認和預處理,你可以建立一個工作隊列。持久化選項使得消息可以在RabbitMQ會重啟的情況下得以保留。
獲得更多的關于Channel的方法和消息的屬性,你可以瀏覽amqplib docs
翻譯:Joursion
日期 :2016/12/25
歡迎交流,學習。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/88111.html
摘要:生產者只能把消息發到交換器。是否要追加到一個特殊的隊列是否要追加到許多的隊列或者丟掉這條消息這些規則被定義為交換類型。有一點很關鍵,向不存在的交換器發布消息是被禁止的。如果仍然沒有隊列綁定交換器,消息會丟失。 發布與訂閱 (Publish/Subscribe) 在之前的章節中,我們創建了工作隊列,之前的工作隊列的假設是每個任務只被分發到一個worker。在這一節中,我們會做一些完全不一...
摘要:允許接收和轉發消息。一個等待接收消息的程序是一個消費者。發送者會先連接到發送一條消息,然后退出。注意這里的是要和之前的名稱一致。翻譯日期另因為想入門第一次想著翻譯,第一次然后希望多多提出不足。 gitBook https://joursion.gitbooks.io/... Title: RabbitMQ tutorials ---- Hello World (Javascript) ...
摘要:每個消費者會得到平均數量的。為了確保不會丟失,采用確認機制。如果中斷退出了關閉了,關閉了,或是連接丟失了而沒有發送,會認為該消息沒有完整的執行,會將該消息重新入隊。該消息會被發送給其他的。當消費者中斷退出,會重新分派。 Work模式 原文地址showImg(https://segmentfault.com/img/bVbqlXr?w=694&h=252); 在第一章中,我們寫了通過一個...
摘要:發布訂閱模式在之前的文章里,創建了。我們稱之為發布訂閱模式。其實我們是用到了默認的,用空字符串來標識。空字符串代表了沒有名字的被路由到了由指定名字的。和這種關系的建立我們稱之為從現在開始這個就會將推向我們的隊列了。 發布訂閱模式 在之前的文章里,創建了work queue。work queue中,每一個task都會派發給一個worker。在本章中,我們會完成完全不一樣的事情 - 我們會...
摘要:平均每個消費者將得到相同數量的消息。消息確認完成任務可能需要幾秒鐘。為了確保消息不會丟失,支持消息確認。沒有任何消息超時當這個消費者中止了,將會重新分配消息時。這是因為只是調度消息時,消息進入隊列。 showImg(https://segmentfault.com/img/bVXNuN?w=332&h=111); 介紹 在上一個 Hello World 教程中,我們編寫了從指定隊列發送...
閱讀 3128·2023-04-25 15:02
閱讀 2838·2021-11-23 09:51
閱讀 2048·2021-09-27 13:47
閱讀 2004·2021-09-13 10:33
閱讀 991·2019-08-30 15:54
閱讀 2652·2019-08-30 15:53
閱讀 2869·2019-08-29 13:58
閱讀 902·2019-08-29 13:54