摘要:為了避免與參數(shù)混淆,我們將其稱為綁定鍵。直接交換我們之前教程的日志記錄系統(tǒng)將所有消息廣播給所有消費者。在這種設置中,使用路由鍵發(fā)布到交換機的消息將被路由到隊列。所有其他消息將被丟棄。
源碼:https://github.com/ltoddy/rabbitmq-tutorial
路由 本章節(jié)教程重點介紹的內(nèi)容在之前的教程中,我們構建了一個簡單的日志系統(tǒng) 我們能夠?qū)⑷罩鞠V播給許多接收者。
在本教程中,我們將添加一個功能 - 我們將只能訂閱一部分消息。例如,我們只能將重要的錯誤消息引導到日志文件(以節(jié)省磁盤空間),同時仍然能夠在控制臺上打印所有日志消息。
綁定在前面的例子中,我們已經(jīng)創(chuàng)建了綁定。您可能會回想一下代碼:
channel.queue_bind(exchange=EXCHANGE_NAME, queue=queue_name)
綁定是交換和隊列之間的關系。這可以簡單地理解為: the queue is interested in messages from this exchange.
綁定可以使用額外的routing_key參數(shù)。為了避免與basic_publish參數(shù)混淆,我們將其稱為綁定鍵。這就是我們?nèi)绾问褂靡粋€鍵創(chuàng)建一個綁定:
channel.queue_bind(exchange=exchange_name, queue=queue_name, routing_key="black")
綁定鍵的含義取決于交換類型。我們之前使用的 fanout 交換簡單地忽略了它的價值。
直接交換我們之前教程的日志記錄系統(tǒng)將所有消息廣播給所有消費者。我們希望將其擴展為允許根據(jù)其進行嚴格的過濾消息。
例如,我們可能希望將嚴重錯誤的日志消息寫入磁盤,而不會寫入警告或信息日志消息。
我們正在使用fanout交換,這不會給我們太多的靈活性 - 它只能無意識地播放。
我們將使用direct交換。direct交換背后的路由算法很簡單 - 消息進入隊列,其綁定密鑰與消息的路由密鑰完全匹配。
為了說明這一點,請考慮以下設置:
在這個設置中,我們可以看到有兩個隊列綁定的直接交換機X. 第一個隊列用綁定鍵orange綁定,第二個隊列有兩個綁定,一個綁定鍵為black,另一個為green。
在這種設置中,使用路由鍵orange發(fā)布到交換機的消息 將被路由到隊列Q1。帶有black或gree路由鍵的消息將進入Q2。所有其他消息將被丟棄。
多個綁定使用相同的綁定密鑰綁定多個隊列是完全合法的。在我們的例子中,我們可以使用綁定鍵black添加X和Q1之間的綁定。
在這種情況下,direct交換就像fanout一樣,并將消息廣播到所有匹配的隊列。帶有路由鍵black的消息將傳送到Q1和Q2。
我們將使用這個模型用于我們的日志系統(tǒng)。取而代之的fanout,我們將消息發(fā)送到direct交換。我們將提供嚴格的日志作為路由鍵(routing key)。
這樣接收腳本將能夠選擇想要接收的消息。我們先關注發(fā)出日志的實現(xiàn)。
像往常一樣,我們需要首先創(chuàng)建一個交換:
channel.exchange_declare(exchange="direct_logs", exchange_type="direct")
我們準備發(fā)送一條消息:
channel.basic_publish(exchange="direct_logs", routing_key="", body=message)
為了簡化事情,我們將假設“severity”可以是"info","warning","error"之一。
訂閱接收郵件的方式與上一個教程中的一樣,只有一個例外 - 我們將為每個我們感興趣的嚴重程度創(chuàng)建一個新綁定。
result = channel.queue_declare(exclusive=True) queue_name = result.method.queue for severity in severities: channel.queue_bind(exchange="direct_logs", queue=queue_name, routing_key=severity)把它放在一起
emit_log_direct.py的代碼:
#!/usr/bin/env python import sys import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost")) channel = connection.channel() channel.exchange_declare(exchange="direct_logs", exchange_type="direct") severity = sys.args[1:] if len(sys.argv) > 2 else "info" message = " ".join(sys.argv[2:]) or "Hello World!" channel.basic_publish(exchange="direct_logs", routing_key=severity, body=message) print(" [x] Sent %r:%r" % (severity, message)) connection.close()
receive_logs_direct.py的代碼:
#!/usr/bin/env python import sys import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost")) channel = connection.channel() channel.exchange_declare(exchange="direct_logs", exchange_type="direct") result = channel.queue_declare(exclusive=True) queue_name = result.method.queue severities = sys.argv[1:] if not severities: sys.stderr.write("Usage: %s [info] [warning] [error] " % sys.argv[0]) sys.exit(1) for severity in severities: channel.queue_bind(exchange="direct_logs", queue=queue_name, routing_key=severity) print(" [*] Waiting for logs. To exit press CTRL+C") def callback(cb, method, properities, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
如果只想保存"warning"和"error"(而不是"info")將消息記錄到文件中,只需打開一個控制臺并輸入:
python receive_logs_direct.py warning error > logs_from_rabbit.log
如果您希望在屏幕上看到所有日志消息,請打開一個新終端并執(zhí)行以下操作:
python receive_logs_direct.py info warning error
例如,要輸出error日志消息,只需輸入:
python emit_log_direct.py error "Run. Run. Or it will explode."
文章版權歸作者所有,未經(jīng)允許請勿轉載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/41441.html
摘要:單詞可以是任何東西,但通常它們指定了與該消息相關的一些功能。消息將使用由三個字兩個點組成的路由鍵發(fā)送。另一方面,只會進入第一個隊列,而只會進入第二個隊列。不匹配任何綁定,因此將被丟棄。代碼幾乎與前一個教程中的代碼相同。 源碼:https://github.com/ltoddy/rabbitmq-tutorial Topics (using the Pika Python client)...
摘要:每當我們收到一條消息,這個回調(diào)函數(shù)就被皮卡庫調(diào)用。接下來,我們需要告訴這個特定的回調(diào)函數(shù)應該從我們的隊列接收消息為了讓這個命令成功,我們必須確保我們想要訂閱的隊列存在。生產(chǎn)者計劃將在每次運行后停止歡呼我們能夠通過發(fā)送我們的第一條消息。 源碼:https://github.com/ltoddy/rabbitmq-tutorial 介紹 RabbitMQ是一個消息代理:它接受和轉發(fā)消息。你...
摘要:我們將任務封裝為消息并將其發(fā)送到隊列。為了確保消息永不丟失,支持消息確認。沒有任何消息超時當消費者死亡時,將重新傳遞消息。發(fā)生這種情況是因為只在消息進入隊列時調(diào)度消息。這告訴一次不要向工作人員發(fā)送多個消息。 源碼:https://github.com/ltoddy/rabbitmq-tutorial 工作隊列 showImg(https://segmentfault.com/img/r...
摘要:交易所在本教程的前幾部分中,我們發(fā)送消息并從隊列中接收消息。消費者是接收消息的用戶的應用程序。中的消息傳遞模型的核心思想是生產(chǎn)者永遠不會將任何消息直接發(fā)送到隊列中。交換和隊列之間的關系稱為綁定。 源碼:https://github.com/ltoddy/rabbitmq-tutorial 發(fā)布 / 訂閱 (using the Pika Python client) 本章節(jié)教程重點介紹的...
摘要:通常用于命名回調(diào)隊列。對每個響應執(zhí)行的回調(diào)函數(shù)做了一個非常簡單的工作,對于每個響應消息它檢查是否是我們正在尋找的。在這個方法中,首先我們生成一個唯一的數(shù)并保存回調(diào)函數(shù)將使用這個值來捕獲適當?shù)捻憫? 源碼:https://github.com/ltoddy/rabbitmq-tutorial 遠程過程調(diào)用(RPC) (using the Pika Python client) 本章節(jié)教程...
閱讀 2040·2023-04-25 14:50
閱讀 2919·2021-11-17 09:33
閱讀 2623·2019-08-30 13:07
閱讀 2849·2019-08-29 16:57
閱讀 916·2019-08-29 15:26
閱讀 3560·2019-08-29 13:08
閱讀 2003·2019-08-29 12:32
閱讀 3397·2019-08-26 13:57