摘要:我們以前的教程中的日志系統將所有消息廣播給所有消費者。我們希望擴展這一點,允許基于其一定嚴重性程度來過濾消息。在這種情況下,交換機將表現為交換機,并將消息發送到所有匹配隊列。
using php-amqplib 前提必讀
本教程假設 RabbitMQ 是運行在標準端口上運行(5672).
如果您使用不同的主機、端口或憑據,則連接設置需要調整。
如果您在本教程中遇到困難,可以通過郵件列表與我們聯系。
在前面的教程中,我們構建了一個簡單的日志系統。我們能夠向許多接收者廣播日志消息。
開始在本教程中,我們將為它添加一個特性——我們將只可能訂閱消息的一個子集。例如,我們只能夠將關鍵錯誤消息直接指向日志文件(以節省磁盤空間),同時仍然能夠打印控制臺上的所有日志消息。
綁定(Bindings)在前面的示例中,我們已經創建綁定。您可能還記得代碼:
$channel->queue_bind($queue_name, "logs");
綁定是交換和隊列之間的一種關系。這可以簡單地理解為:隊列對來自此交換的消息感興趣。
綁定可以采取額外的routing_key參數。避免混淆和$channel::basic_publish參數我們要叫它綁定key。這就是我們如何用鍵創建綁定的原因:
$binding_key = "black"; $channel->queue_bind($queue_name, $exchange_name, $binding_key);
綁定鍵的含義取決于交換類型。我們以前使用的fanout交換將忽略了它的值。
Direct exchange我們以前的教程中的日志系統將所有消息廣播給所有消費者。我們希望擴展這一點,允許基于其一定嚴重性程度來過濾消息。例如,我們可能希望將日志消息寫入到磁盤的腳本只接收關鍵錯誤,而不會在警告或信息日志消息上浪費磁盤空間。
我們使用的是fanout交換機,這并不能給我們帶來很大的靈活性——它只能進行無意識的廣播。
我們將使用direct交換機替代。direct交換機背后的路由算法很簡單-消息傳遞到隊列,其綁定鍵完全匹配消息的路由鍵。
為了說明這一點,請考慮以下設置:
在這個設置中,我們可以看到兩個隊列綁定到它的direct交換機X。第一個隊列與綁定鍵orange綁定,第二個綁定有兩個綁定,一個綁定鍵black,另一個綁定green。
在這樣的設置中,將路由消息發送到Exchange的路由密鑰orange將被路由到隊列Q1。帶有black或green路由鍵的消息將轉到Q2。所有其他消息都將被丟棄。
多個綁定 (Multiple bindings)用相同的綁定鍵綁定多個隊列是完全合法的。在我們的示例中,我們可以在綁定綁定鍵X和Q1之間添加一個綁定。在這種情況下,direct交換機將表現為fanout交換機,并將消息發送到所有匹配隊列。將帶有路由鍵black的消息發送給Q1和Q2。
Emitting logs我們將使用這個模型作為我們的日志系統。我們將把消息發送給direct交換機,而不是fanout交換機。我們將提供日志嚴重性作為路由鍵。這樣,接收腳本將能夠選擇它想要接收的嚴重性。讓我們先專注于發布日志。
和以往一樣,我們需要首先創建一個交換:
$channel->exchange_declare("direct_logs", "direct", false, false, false);
我們已經準備好發送消息了:
$channel->exchange_declare("direct_logs", "direct", false, false, false); $channel->basic_publish($msg, "direct_logs", $severity);
為了簡化事情,我們會假設嚴重錯誤有可以是info, warning, error的一種。
訂閱 (Subscribing)接收消息將與前面的教程一樣,只有一個例外——我們將為我們感興趣的每個嚴重性創建一個新的綁定。
foreach($severities as $severity) { $channel->queue_bind($queue_name, "direct_logs", $severity); }
代碼都放在一起:
emit_log_direct.php源碼:
channel(); $channel->exchange_declare("direct_logs", "direct", false, false, false); $severity = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : "info"; $data = implode(" ", array_slice($argv, 2)); if(empty($data)) $data = "Hello World!"; $msg = new AMQPMessage($data); $channel->basic_publish($msg, "direct_logs", $severity); echo " [x] Sent ",$severity,":",$data," "; $channel->close(); $connection->close(); ?>
receive_logs_direct.php源碼:
channel(); $channel->exchange_declare("direct_logs", "direct", false, false, false); list($queue_name, ,) = $channel->queue_declare("", false, false, true, false); $severities = array_slice($argv, 1); if(empty($severities )) { file_put_contents("php://stderr", "Usage: $argv[0] [info] [warning] [error] "); exit(1); } foreach($severities as $severity) { $channel->queue_bind($queue_name, "direct_logs", $severity); } echo " [*] Waiting for logs. To exit press CTRL+C", " "; $callback = function($msg){ echo " [x] ",$msg->delivery_info["routing_key"], ":", $msg->body, " "; }; $channel->basic_consume($queue_name, "", false, true, false, false, $callback); while(count($channel->callbacks)) { $channel->wait(); } $channel->close(); $connection->close(); ?>
如果您只想保存warning 和 error(不包含info)日志消息到文件,只需打開控制臺并鍵入:
php receive_logs_direct.php warning error > logs_from_rabbit.log
如果您想查看屏幕上的所有日志消息,請打開一個新的終端并執行:
php receive_logs_direct.php info warning error # => [*] Waiting for logs. To exit press CTRL+C
例如,觸發錯誤日志消息:
php emit_log_direct.php error "Run. Run. Or it will explode." # => [x] Sent "error":"Run. Run. Or it will explode."
(全部源碼 emit_log_direct.php source and receive_logs_direct.php source)
學習如何基于模式偵聽消息, 你可以閱讀下一章節:RabbitMQ+PHP 教程五(Topics)。
翻譯來自 RabbitMQ - RabbitMQ tutorial - Routing
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/28316.html
摘要:在客戶端中,當我們將隊列名稱作為空字符串提供時,我們創建一個帶有生成名稱的非持久隊列方法返回時,變量包含一個隨機生成的隊列名稱。交換和隊列之間的關系稱為綁定。 使用 php-amqplib 介紹 在前面的教程中,我們創建了一個工作隊列。工作隊列背后的假設是每個任務都交付給一個工作人員處理。在這一部分中,我們將做一些完全不同的事情——我們將向多個消費者發送消息。此模式稱為發布/訂閱。 ...
摘要:本文將會講解如何使用實現延時重試和失敗消息隊列,實現可靠的消息消費,消費失敗后,自動延時將消息重新投遞,當達到一定的重試次數后,將消息投遞到失敗消息隊列,等待人工介入處理。 RabbitMQ是一款使用Erlang開發的開源消息隊列。本文假設讀者對RabbitMQ是什么已經有了基本的了解,如果你還不知道它是什么以及可以用來做什么,建議先從官網的 RabbitMQ Tutorials 入門...
摘要:本文將會講解如何使用實現延時重試和失敗消息隊列,實現可靠的消息消費,消費失敗后,自動延時將消息重新投遞,當達到一定的重試次數后,將消息投遞到失敗消息隊列,等待人工介入處理。 RabbitMQ是一款使用Erlang開發的開源消息隊列。本文假設讀者對RabbitMQ是什么已經有了基本的了解,如果你還不知道它是什么以及可以用來做什么,建議先從官網的 RabbitMQ Tutorials 入門...
摘要:前提必讀本教程假設是安裝在標準端口上運行。這些詞可以是任何東西,但通常它們指定連接到消息的某些特性。如果我們違背合同,用一個或四個詞,如或那么,這些消息將不匹配任何綁定并將丟失。代碼與前面的教程幾乎相同。 (using php-amqplib) 前提必讀 本教程假設RabbitMQ是安裝在標準端口上運行(5672)。如果您使用不同的主機、端口或憑據,則連接設置需要調整。 在哪里得到幫助...
摘要:為了避免與參數混淆,我們將其稱為綁定鍵。直接交換我們之前教程的日志記錄系統將所有消息廣播給所有消費者。在這種設置中,使用路由鍵發布到交換機的消息將被路由到隊列。所有其他消息將被丟棄。 源碼:https://github.com/ltoddy/rabbitmq-tutorial 路由 本章節教程重點介紹的內容 在之前的教程中,我們構建了一個簡單的日志系統 我們能夠將日志消息廣播給許多接收...
閱讀 2032·2021-11-08 13:14
閱讀 2940·2021-10-18 13:34
閱讀 2029·2021-09-23 11:21
閱讀 3591·2019-08-30 15:54
閱讀 1760·2019-08-30 15:54
閱讀 2931·2019-08-29 15:33
閱讀 2581·2019-08-29 14:01
閱讀 1948·2019-08-29 13:52