摘要:基礎教程注本文是對眾多博客的學習和總結,可能存在理解錯誤。消息的應答現在存在這樣一種場景,消費者取到消息,然后創建任務開始執行。如果處理失敗,也就是沒有收到應答,那么就將這條消息重新發送給該隊列的其他消費者。造成了負載不均衡。
RabbitMQ 基礎教程(2) - Work Queue
注:本文是對眾多博客的學習和總結,可能存在理解錯誤。請帶著懷疑的眼光,同時如果有錯誤希望能指出。
如果你喜歡我的文章,可以關注我的私人博客:http://blog-qeesung.rhcloud.com/
在上一篇文章 RabbitMQ 基礎教程(1) - Hello World 中,我們已經簡單的介紹了RabbitMQ以及如何發送和接收一個消息。接下來我們將繼續深入RabbitMQ,研究一下消息隊列(Work Queue)
消息隊列消息的發布者發布一個消息到消息隊列中,然后信息的消費者取出消息進行消費。
queue +-------------+ +--+--+--+--+--+--+ +-------------+ | producer |----->|m1|m2| ... | | |---->| consumer | +-------------+ +--+--+--+--+--+--+ +-------------+
但是實際情況往往比這個要復雜,假如我們有多個信息的發布者和多個信息的消費者,那RabbitMQ又將會是怎么工作呢?
+--------------+ +--------------+ | producer1 +- / | consumer1 | +--------------+ - queue /- +--------------+ +--------------+ - +---+---+---+----+ /- +--------------+ | producer2 +---->X|m1 |m2 |m3 |... |---->| consumer2 | +--------------+ /- +---+---+---+----+ - +--------------+ +--------------+ /- - +--------------+ | ... |/ | ... | +--------------+ +--------------+Round-robin 分發算法
RabbitMQ中,如果有多個消費者同時消費同一個消息隊列,那么就通過Round-robin算法將消息隊列中的消息均勻的分配給每一個消費者。
這個算法其實很簡單,每收到一個新的消息,就將這個消息分發給上下一個消費者。比如上一個消費者是consumer-n,那么有新消息來的時候就將這個新的消息發布到consumer-n+1,以此類推,如果到了最后一個消費者,那么就又從第一個開始。即:consumer-index = (consumer-index + 1) mod consumer-number
為了演示,首先來做幾項準備工作。
定義任務 task.js
/** * 創建一個任務 * @param taskName 任務名字 * @param costTime 任務話費的時間 * @param callback 任務結束以后的回調函數 * @constructor */ function Task(taskName ,costTime , callback){ if(typeof(costTime) !== "number") costTime = 0; // no delay there setTimeout(function () { console.log(taskName+" finished"); if(callback && typeof (callback) === "function") callback(); } , 1000*costTime); };
串行化的消息任務結構
任務發布者負責將該結構發布到隊列中,然后消費者取出消息,新建任務開始執行。
{ taskName : "taskname", costTime : 1 }
創建任務消息 task-producer.js
var amqp = require("amqplib/callback_api"); // 連接上RabbitMQ服務器 amqp.connect("amqp://localhost", function(err, conn) { conn.createChannel(function(err, ch) { var q = "tasks"; // 得到發送消息的數目,默認發送4個 var name; var cost; (function () { if(process.argv.length < 4 ) { console.error("ERROR : usage - node rabbit-producer"); process.exit(-1); } name = process.argv[2]; cost = +process.argv[3]; })(); // 新建隊列,然后將隊列中的消息持久化取消 ch.assertQueue(q, {durable: true}); // 將任務串行化存入Buffer中,并推入隊列 ch.sendToQueue(q, new Buffer(JSON.stringify({taskName :name ,costTime :cost })),{persistent:true}); console.log(" [x] Sent "+name); setTimeout(function () { process.exit(0); },500); }); });
消費任務消息 task-consumer.js
var amqp = require("amqplib/callback_api"); var Task = require("./task.js"); amqp.connect("amqp://localhost", function(err, conn) { conn.createChannel(function(err, ch) { var q = "tasks"; ch.assertQueue(q, {durable: true}); console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q); // 監聽隊列上面的消息 ch.consume(q, function(msg) { var obj = JSON.parse(msg.content.toString("utf8")); console.log("Get the task "+obj.taskName); // 定義新的任務 new Task(obj.taskName,obj.costTime); }, {noAck: true}); }); });
現在開啟兩個消費者進程來等待消費tasks隊列中的消息
# shell1 node task-consumer.js # shell2 node task-consumer.js
然后向隊列中推入三個消息
# shell3 node task-producer.js task1 0 node task-producer.js task2 0 node task-producer.js task3 0
運行結果
# shell1 [*] Waiting for messages in tasks. To exit press CTRL+C Get the task task1 task1 finished Get the task task3 task3 finished # shell2 [*] Waiting for messages in tasks. To exit press CTRL+C Get the task task2 task2 finished # 已經通過Round-robin算法將消息隊列中的消息分配到連接的消費者中了.消息,隊列持久化
細心的讀者可能已經發現了我們在聲明隊列和發送消息的代碼塊中改動了一小部分的代碼,那就是
// 聲明隊列 ch.assertQueue(q, {durable: true}); // 發送信息 ch.sendToQueue(q, new Buffer(JSON.stringify({taskName :name ,costTime :cost })),{persistent:true});
通過將隊列的durable配置參數生命為true可以保證在RabbitMQ服務器退出或者異常終止的情況下不會丟失消息隊列,注意這里只是不會丟失消息隊列,并不是消息隊列中沒有被消費的消息不會丟失。
為了保證消息隊列中的消息不會丟失,就需要在發送消息時指定persistent選項,這里并不能百分之百的保證消息不會丟失,因為從隊列中有新的消息,到將隊列中消息持久化到磁盤這一段時間之內是無法保證的。
消息的應答現在存在這樣一種場景,消費者取到消息,然后創建任務開始執行。但是任務執行到一半就拋出異常,那么這個任務算是沒有被成功執行的。
在我們之前的代碼實現中,都是消息隊列中有新的消息,馬上就這個消息分配給消費者消費,不管消費者對消息處理結果如何,消息隊列會馬上將已經分配的消息從消息隊列中刪除。如果這個任務非常重要,或者一定要執行成功,那么一旦任務在執行過程中拋出異常,那么這個任務就再也找不回來了,這是非常可怕的事情。
還好在RabbitMQ中我們可以為已經分配的消息和消息隊列之間創建一個應答關系:
如果消息處理成功,那么就發送一個答復給消息隊列,告訴它:我已經成功處理消息,不再需要這條消息了,你可以刪除了,于是消息隊列就將已經應答的消息從消息隊列中刪除。
如果處理失敗,也就是沒有收到應答,那么就將這條消息重新發送給該隊列的其他消費者。
要在消費者和消息隊列之間建立這種應答關系我們只需要將channel的consume函數的noAck參數設成false就可以了。
ch.consume(q, function(msg) { var obj = JSON.parse(msg.content.toString("utf8")); console.log("Get the task "+obj.taskName); // 定義新的任務 new Task(obj.taskName,obj.costTime); }, {noAck: false}); // 這里設置成false
下面我們就模擬一下消息處理失敗的場景:
var amqp = require("amqplib/callback_api"); var Task = require("./task.js"); amqp.connect("amqp://localhost", function(err, conn) { conn.createChannel(function(err, ch) { var q = "tasks"; ch.assertQueue(q, {durable: true}); console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q); // 監聽隊列上面的消息 ch.consume(q, function(msg) { var obj = JSON.parse(msg.content.toString("utf8")); console.log("Get the task "+obj.taskName); // 定義新的任務 new Task(obj.taskName,obj.costTime,function(){ if(obj.taskName === "task2") throw new Error("Test error"); else ch.ack(msg); }); // 如果是任務二,那么就拋出異常。 }, {noAck: false}); }); });
按照上面的腳本執行順序,我們在執行一遍腳本: consumer2得到執行task2消息,然后馬上拋出異常退出進行,然后消息隊列再將這個消息分配給cosumer1,接著也執行失敗了,退出進程,最終消息隊列中將只會有一個task2的消息存在。
啟動消費者等待消息
# shell1 開啟消費者1 node rabbit-consumer.js # shell2 開啟消費者2 node rabbit-consumer.js
創建消息
node rabbit-producer.js task1 0 node rabbit-producer.js task2 10 node rabbit-producer.js task3 0
我們能來看一下結果:
# shell2 消費者2 [*] Waiting for messages in tasks. To exit press CTRL+C Get the task task2 task2 finished # 消費者2執行任務2的時候拋出異常,task2將會重新發送給消費者1 ... throw new Error("Error test"); # shell1 消費者1 [*] Waiting for messages in tasks. To exit press CTRL+C Get the task task1 task1 finished Get the task task3 task3 finished Get the task task2 # 消費者1接收到任何2 task2 finished ... throw new Error("Error test"); # 也拋出異常了
最終會在消息隊列中剩下一條未消費的信息。
更加均衡的負載這里有一點需要注意,如果你將noAck選項設置成了false,那么如果消息處理成功,一定要進行應答,負責消息隊列中的消息會越來越多,直到撐爆內存。
在上文中我們聽到過消息隊列通過Round-robin算法來將消息分配給消費者,但是這個分配過程是盲目的。比如現在有兩個消費者,consumer1和consumer2,按照Round-robin算法就會將奇數編號的任務發配給consumer1,將偶數編號的任務分配給consumer2,但是這些任務恰好有一個特性,奇數編號的任務比較繁重,而偶數編號的任務就比較簡單。
那么這就會造成一個問題,那就是consumer1會被累死,而consumer2會被閑死。造成了負載不均衡。要是每一個消息都被成功消費以后告訴消息隊列,然后消息隊列再將新的消息分配給空閑下來的消費者不就好了。
RabbitMQ中的確有這樣的一個配置選項。那就是ch.prefetch(1);
我們現在就來模擬一下
var amqp = require("amqplib/callback_api"); var Task = require("./task.js"); amqp.connect("amqp://localhost", function(err, conn) { conn.createChannel(function(err, ch) { var q = "tasks"; ch.assertQueue(q, {durable: true}); console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q); // 監聽隊列上面的消息 ch.prefetch(1); // 添加這一行 ch.consume(q, function(msg) { var obj = JSON.parse(msg.content.toString("utf8")); console.log("Get the task "+obj.taskName); new Task(obj.taskName,obj.costTime ,function () { ch.ack(msg); }); }, {noAck: false}); }); });
啟動消費者等待消息
# shell1 開啟消費者1 node rabbit-consumer.js # shell2 開啟消費者2 node rabbit-consumer.js
創建消息
node rabbit-producer.js task1 0 node rabbit-producer.js task2 20 node rabbit-producer.js task3 0 node rabbit-producer.js task4 20
# shell1 開啟消費者1 [*] Waiting for messages in tasks. To exit press CTRL+C Get the task task1 # 任務馬上結束 task1 finished Get the task task3 # 任務馬上結束 task3 finished Get the task task4 # 任務四被分配到consumer1中了 task4 finished # shell2 開啟消費者2 [*] Waiting for messages in tasks. To exit press CTRL+C Get the task task2 task2 finished
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/86315.html
摘要:平均每個消費者將得到相同數量的消息。消息確認完成任務可能需要幾秒鐘。為了確保消息不會丟失,支持消息確認。沒有任何消息超時當這個消費者中止了,將會重新分配消息時。這是因為只是調度消息時,消息進入隊列。 showImg(https://segmentfault.com/img/bVXNuN?w=332&h=111); 介紹 在上一個 Hello World 教程中,我們編寫了從指定隊列發送...
摘要:每個消費者會得到平均數量的。為了確保不會丟失,采用確認機制。如果中斷退出了關閉了,關閉了,或是連接丟失了而沒有發送,會認為該消息沒有完整的執行,會將該消息重新入隊。該消息會被發送給其他的。當消費者中斷退出,會重新分派。 Work模式 原文地址showImg(https://segmentfault.com/img/bVbqlXr?w=694&h=252); 在第一章中,我們寫了通過一個...
摘要:在中間的框是一個隊列的消息緩沖區,保持代表的消費。本教程介紹,這是一個開放的通用的協議消息。我們將在本教程中使用,解決依賴管理。發送者將連接到,發送一條消息,然后退出。注意,這與發送發布的隊列匹配。 介紹 RabbitMQ是一個消息代理器:它接受和轉發消息。你可以把它當作一個郵局:當你把郵件放在信箱里時,你可以肯定郵差先生最終會把郵件送到你的收件人那里。在這個比喻中,RabbitMQ就...
摘要:這樣的消息分發機制稱作輪詢。在進程掛了之后,所有的未被確認的消息會被重新分發。忘記確認這是一個普遍的錯誤,丟失。為了使消息不會丟失,兩件事情需要確保,我們需要持久化隊列和消息。 工作隊列 showImg(https://segmentfault.com/img/remote/1460000008229494?w=332&h=111); 在第一篇中,我們寫了一個程序從已經聲明的隊列中收發...
摘要:消息持久化控制的屬性就是消息的持久化。當生產者發送的消息路由鍵為時,兩個消費者都會收到消息并處理當生產者發送的消息路由鍵為時,只有消費者可以接收到消息。八的消息確認機制在中,可以通過持久化數據解決服務器異常的數據丟失問題。 一、內容大綱&使用場景 1. 消息隊列解決了什么問題? 異步處理 應用解耦 流量削鋒 日志處理 ...... 2. rabbitMQ安裝與配置 3. Java操...
閱讀 3152·2021-10-08 10:04
閱讀 1089·2021-09-30 09:48
閱讀 3459·2021-09-22 10:53
閱讀 1680·2021-09-10 11:22
閱讀 1694·2021-09-06 15:00
閱讀 2152·2019-08-30 15:56
閱讀 716·2019-08-30 15:53
閱讀 2285·2019-08-30 13:04