摘要:平均每個消費者將得到相同數量的消息。消息確認完成任務可能需要幾秒鐘。為了確保消息不會丟失,支持消息確認。沒有任何消息超時當這個消費者中止了,將會重新分配消息時。這是因為只是調度消息時,消息進入隊列。
介紹
在上一個 Hello World 教程中,我們編寫了從指定隊列發送和接收消息的程序。在這篇文章中,我們將創建一個工作隊列,用于在多個工人(消費者)之間分配耗時的任務。
工作隊列(又名任務隊列)背后的主要思想是避免立即執行資源密集型任務,必須等待它完成。相反,我們計劃稍后完成任務。我們將任務封裝為消息并將其發送到隊列中。后臺運行的一個工作進程將彈出任務并最終執行該任務。當你運行許多工人(消費者)時,任務將在他們之間分擔。
這個概念在Web應用程序中尤其有用,因為在短HTTP請求中不可能處理復雜的任務。
先決條件在本教程的前一部分,我們發送了一條包含“Hello World”的消息。現在,我們將發送支持復雜任務的字符串。我們沒有一個真實環境的任務,如圖像進行調整或PDF文件的渲染,讓我們利用sleep()模擬真實環境的業務功能。我們將字符串中的點數作為其復雜度;每個點都將占“工作”的一秒鐘。例如,由Hello...描述的一個偽任務…需要三秒。
new_task.php我們會稍微修改send.php代碼從我們先前的例子,允許任意的消息是從命令行發送。這一計劃將任務分配給我們的工作隊列,所以我們命名它 new_task.php:
$data = implode(" ", array_slice($argv, 1)); if(empty($data)) $data = "Hello World!"; $msg = new AMQPMessage($data); $channel->basic_publish($msg, "", "hello"); echo " [x] Sent ", $data, " ";
我們的上一個版本的receive.php腳本也需要一些改變:它需要假第二工作在消息體中每一點。它會從隊列彈出消息和執行任務,所以讓我們把命名worker.php:
$callback = function($msg){ echo " [x] Received ", $msg->body, " "; //根據"."數量個數獲取延遲時間,單位秒 sleep(substr_count($msg->body, ".")); //模擬業務執行時間延遲 echo " [x] Done", " "; }; $channel->basic_consume("hello", "", false, true, false, false, $callback);單worker簡單運行測試 消費者
php worker.php消息生產者
php new_task.php "A very hard task which takes two seconds.."循環調度
一個使用任務隊列的優點是容易并行工作的能力。如果我們積壓了大量的工作,我們可以增加更多的工人,這樣就可以輕松地規模化。
首先,讓我們嘗試同時運行兩worker.php腳本。他們都會從隊列中獲得消息,看看效果如何?讓我們看看。
你需要打開三個console命令。兩將運行worker.php腳本。這些控制臺將是我們的兩個消費者C1和C2。
消費者1php worker.php消費者2
php worker.php消息生產者
php new_task.php msg1...
默認情況下,RabbitMQ將會發送的每一條消息給下一個消費者,在序列。平均每個消費者將得到相同數量的消息。這種分發消息的方式稱為循環輪詢。試著用三個或更多的工人。
消息確認完成任務可能需要幾秒鐘。你可能遇到如果一個消費者開始一個長期的任務,并且只完成了部分任務,那么會發生什么?。我們目前的代碼,一旦RabbitMQ發送一個消息給客戶立即標記為刪除。在這種情況下,如果您中止一個消費者,我們將丟失它正在處理的消息。我們還將丟失發送給該消費者所有的尚未處理的消息。
如果我們不想失去任何任務。如果一個消費者意外中止了,我們希望把任務交給另一個消費者。
為了確保消息不會丟失,RabbitMQ支持消息確認。ACK(nowledgement)消費者返回的結果告訴RabbitMQ有一條消息收到,你可以自由可控的刪除他
如果一個消費者中止了(其通道關閉,連接被關閉,或TCP連接丟失)不發送ACK,RabbitMQ將會理解這個消息并沒有完全處理,將它重新加入隊列。如果有其他用戶同時在線,它就會快速地傳遞到另一個消費者。這樣,即使意外中止了,也可以確保沒有丟失信息。
沒有任何消息超時;當這個消費者中止了,RabbitMQ將會重新分配消息時。即使處理消息花費很長很長時間也很好。
消息確認是默認關閉。可通過設置的第四個參數basic_consume設置為false(true意味著沒有ACK)和從消費者發送合適的確認,一旦我們完成一個任務。
$callback = function($msg){ echo " [x] Received ", $msg->body, " "; sleep(substr_count($msg->body, ".")); echo " [x] Done", " "; $msg->delivery_info["channel"]->basic_ack($msg->delivery_info["delivery_tag"]); }; $channel->basic_consume("task_queue", "", false, false, false, false, $callback);
使用此代碼,我們可以確信,即使在處理消息時使用Ctrl + C殺死一名消費者,也不會丟失任何東西。消費者中止都未確認的消息后很快會被重新分配。
忘了確認(Forgotten acknowledgment)丟失ACK確認是一個常見的錯誤。這是一個容易犯的錯誤,但后果很嚴重。當你的客戶退出,消息會被重新分配(這可能看起來像是隨機的分配),RabbitMQ將會消耗更多的內存,它不會釋放任何延遲確認消息。
為了調試這種錯誤,你可以使用rabbitmqctl打印messages_unacknowledged字段:
rabbitmqctl list_queues name messages_ready messages_unacknowledged消息持久化(Message durability)
我們已經學會了如何確保即使消費者死了,任務也不會丟失。但是如果RabbitMQ服務器停止了,我們的任務仍然有可能會丟失。
當RabbitMQ退出或崩潰了,會丟失隊列和消息除非你不要。要確保消息不會丟失,需要兩件事:我們需要將隊列和消息都標記為持久的。
首先,我們需要確保RabbitMQ永遠不會丟失隊列。為了做到這一點,我們需要聲明它是持久的。為此我們通過queue_declare作為第三參數為true:
$channel->queue_declare("hello", false, true, false, false);
雖然這個命令本身是正確的,但它在我們當前的設置中不起作用。這是因為我們已經定義了一個名為hello的隊列,該隊列不持久。RabbitMQ不允許你重新定義現有隊列用不同的參數,將返回一個錯誤的任何程序,試圖這么做。但有一個快速的解決方法-讓我們聲明一個名稱不同的隊列,例如task_queue:
$channel->queue_declare("task_queue", false, true, false, false);
需要應用到生產者和消費者代碼中設置為true。
在這一點上,我們可以確保即使RabbitMQ重啟了,task_queue隊列不會丟失。現在我們要標記我們的消息持續通過設置delivery_mode = 2消息屬性,amqpmessage作為屬性數組的一部分。
$msg = new AMQPMessage($data, array("delivery_mode" => AMQPMessage::DELIVERY_MODE_PERSISTENT) );關于消息持久性的說明(Note on message persistence)
將消息標記為持久性不能完全保證消息不會丟失。雖然它告訴RabbitMQ保存信息到磁盤上,還有一個短的時間窗口時,RabbitMQ 已經接受信息并沒有保存它。另外,RabbitMQ不做fsync(2)每一個消息--它可能只是保存到緩存并沒有真正寫入到磁盤。持久性保證不強,但對于我們的簡單任務隊列來說已經足夠了。如果你需要更強的保證,那么你可以使用消費者確認。
公平調度您可能已經注意到,調度仍然不完全按照我們的要求工作。例如,在一個有兩個消費者的情況下,當所有的奇數信息都很重,甚至很輕的消息,一個消費者會一直忙,而另一個消費者幾乎不做任何工作。嗯,RabbitMQ不知道發生了什么事,仍將均勻消息發送。
這是因為RabbitMQ只是調度消息時,消息進入隊列。當存在未確認的消息時。它只是盲目的分發n-th條消息給n-th個消費者。
為了改變這個分配方式,我們可以調用basic_qos方法,設置參數prefetch_count = 1。這告訴RabbitMQ不要在一個時間給一個消費者多個消息。或者,換句話說,在處理和確認以前的消息之前,不要向消費者發送新消息。相反,它將發送給下一個仍然不忙的消費者。
$channel->basic_qos(null, 1, null);
關于隊列大小的注釋(Note about queue size)源碼 new_task.php如果所有的消費者都很忙,你的隊列填滿了。你會想留意到這一點,也許增加更多的工人,或者有其他的策略。
channel(); $channel->queue_declare("task_queue", false, true, false, false); $data = implode(" ", array_slice($argv, 1)); if(empty($data)) $data = "Hello World!"; $msg = new AMQPMessage($data, array("delivery_mode" => AMQPMessage::DELIVERY_MODE_PERSISTENT) ); $channel->basic_publish($msg, "", "task_queue"); echo " [x] Sent ", $data, " "; $channel->close(); $connection->close(); ?>worker.php
channel(); $channel->queue_declare("task_queue", false, true, false, false); echo " [*] Waiting for messages. To exit press CTRL+C", " "; $callback = function($msg){ echo " [x] Received ", $msg->body, " "; sleep(substr_count($msg->body, ".")); echo " [x] Done", " "; $msg->delivery_info["channel"]->basic_ack($msg->delivery_info["delivery_tag"]); }; $channel->basic_qos(null, 1, null); $channel->basic_consume("task_queue", "", false, false, false, false, $callback); while(count($channel->callbacks)) { $channel->wait(); } $channel->close(); $connection->close(); ?>
使用消息的確認和預取,你可以設置一個工作隊列。耐久性的配置選項讓任務存在,即使RabbitMQ重啟。
學習如何向許多消費者傳遞同樣的信息, 你可以閱讀下一章節:RabbitMQ+PHP 教程三(Publish/Subscribe)。
翻譯來自 RabbitMQ - RabbitMQ tutorial - Work Queues
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/26023.html
摘要:在中間的框是一個隊列的消息緩沖區,保持代表的消費。本教程介紹,這是一個開放的通用的協議消息。我們將在本教程中使用,解決依賴管理。發送者將連接到,發送一條消息,然后退出。注意,這與發送發布的隊列匹配。 介紹 RabbitMQ是一個消息代理器:它接受和轉發消息。你可以把它當作一個郵局:當你把郵件放在信箱里時,你可以肯定郵差先生最終會把郵件送到你的收件人那里。在這個比喻中,RabbitMQ就...
摘要:每個消費者會得到平均數量的。為了確保不會丟失,采用確認機制。如果中斷退出了關閉了,關閉了,或是連接丟失了而沒有發送,會認為該消息沒有完整的執行,會將該消息重新入隊。該消息會被發送給其他的。當消費者中斷退出,會重新分派。 Work模式 原文地址showImg(https://segmentfault.com/img/bVbqlXr?w=694&h=252); 在第一章中,我們寫了通過一個...
摘要:參考文檔依賴包安裝環境配置環境變量增加內容保存退出,并刷新變量測試是否安裝成功安裝完成以后,執行看是否能打開,用退出,注意后面的點號,那是的結束符。 參考文檔:http://www.cnblogs.com/phpinfo/p/4104551...http://blog.csdn.net/historyasamirror/ar... 依賴包安裝 yum install ncurses-d...
摘要:在客戶端中,當我們將隊列名稱作為空字符串提供時,我們創建一個帶有生成名稱的非持久隊列方法返回時,變量包含一個隨機生成的隊列名稱。交換和隊列之間的關系稱為綁定。 使用 php-amqplib 介紹 在前面的教程中,我們創建了一個工作隊列。工作隊列背后的假設是每個任務都交付給一個工作人員處理。在這一部分中,我們將做一些完全不同的事情——我們將向多個消費者發送消息。此模式稱為發布/訂閱。 ...
摘要:這樣的消息分發機制稱作輪詢。在進程掛了之后,所有的未被確認的消息會被重新分發。忘記確認這是一個普遍的錯誤,丟失。為了使消息不會丟失,兩件事情需要確保,我們需要持久化隊列和消息。 工作隊列 showImg(https://segmentfault.com/img/remote/1460000008229494?w=332&h=111); 在第一篇中,我們寫了一個程序從已經聲明的隊列中收發...
閱讀 968·2023-04-26 02:49
閱讀 1180·2021-11-25 09:43
閱讀 2551·2021-11-18 10:02
閱讀 2926·2021-10-18 13:32
閱讀 1288·2019-08-30 13:54
閱讀 2084·2019-08-30 12:58
閱讀 3017·2019-08-29 14:06
閱讀 2159·2019-08-28 18:10