国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

RabbitMQ AMQP 消息模型攻略

xietao3 / 1354人閱讀

摘要:近期對消息隊列比較感興趣因此特意看了一下相關的知識不過在學時對的消息模型總是理解的不透徹于是在官網上找了一篇介紹消息模型的文章詳細地看了一下還是要感嘆一下啊官網的文章果然是最權威的看了以后有了不小的收獲下面是我學習消息模型時的記錄其內容大部

近期對消息隊列比較感興趣, 因此特意看了一下 RabbitMQ 相關的知識, 不過在學 RabbitMQ 時, 對 AMQP 的消息模型總是理解的不透徹, 于是在官網上找了一篇介紹 AMQP 消息模型的文章, 詳細地看了一下.
還是要感嘆一下啊, 官網的文章果然是最權威的, 看了以后有了不小的收獲.
下面是我學習 AMQP 消息模型時的記錄, 其內容大部分是翻譯自官網, 部分添加了自己的理解.

原文

https://www.rabbitmq.com/tuto...

AMQP 消息模型簡介

AMQP 的消息模型如下圖所示:

通過此圖我們可以知道, 一個消息的發送流程有如下幾個步驟:

消息生產者將消息發布(Public)到 Exchange 中.

Exchange 根據隊列的綁定關系將消息分發到不同的 Queue 中.

AMQP broker 根據訂閱規則將消息發送給消費者 或 消費者自行根據需要從消息隊列中獲取消息.

Exchange 和 Exchange 類型

Exchange 的主要任務是接收消息并將消息路由到0個或多個 Queue 中, 而路由的算法受 Exchange 類型和綁定(binding) 關系的影響. AMQP 0-9-1 broker 提供如下四個 exchange 類型:

類型 默認預定義的名字
Direct Exchange 空字符串和 amq.direct
Fanout Exchange amq.fanout
Topic Exchange amq.topic
Headers Exchange amq.match(在 RabbitMQ 中, 額外提供amq.headers)

每個 Exchange 都有如下幾個屬性:

Name, Exchange 的 名字

Durability, 是否是持久的 Exchange, 當為真時, broker 重啟后也會保留此 Exchange

Auto-delete, 當為真時, 如果所有綁定的的 Queue 都不再使用時, 此 Exchange 會自動刪除

關于默認 Exchange

默認的 exchange 是一個由 broker 預創建的匿名的(即名字是空字符串) direct exchagne. 對于簡單的程序來說, 默認的 exchange 有一個實用的屬性: 如果沒有顯示地綁定 Exchnge, 那么創建的每個 queue 都會自動綁定到這個默認的 exchagne 中, 并且此時這個 queue 的 route key 就是這個queue 的名字.

例如當我們聲明了一個名為 "search-indexing-online" 的 queue, 那么 AMQP broker 會以 "search-indexing-online" 作為 route key 將此 queue 綁定到默認的 exchange 中. 因此當一個消息以 route key 為 "search-indexing-online" 投遞到默認的 exchange 中時, 此消息就會被路由到這個 queue 中去. 換句話說, 由于有默認的 exchagne 的存在, 我們就好像可以直接將消息投遞到指定的 queue 中去而不需要經過 exchange 一樣.
例如:
Send:

public class Send {
    private final static String QUEUE_NAME = "hello";
    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        String message = "Hello World!";
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
        System.out.println(" [x] Sent "" + message + """);
        
        channel.close();
        connection.close();
    }
}

Recv:

public class Recv {
    private final static String QUEUE_NAME = "hello";
    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received "" + message + """);
            }
        };
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

在這個例子中, 我們并沒有定義 exchange, 也沒有顯示地將 queue 綁定到 exchange 中, 因此 queue "hello" 就自動綁定到默認的 exchange 中了, 并且在默認的 exchange 中, 其 route key 和 queue 名一致, 即 "hello".
由于這個原因, 我們就可以使用:

channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));

來發送消息. 調用 channel.basicPublish 時, 第一個參數是 exchange 名, 為空就是默認的 exchange, 第二個參數是 route key, 和 queue 名相同.

direct exchange

direct exchange 可以使用如下圖表示:

direct exchange 根據消息的 route key 來將消息分發到不同的 queue 中. direct exchange 適合用于消息的單播發送. direct exchange 的工作流程如下:

一個 queue 使用 K 作為 route key 綁定到 direct exchange 中.

當direct exchange 收到一個 route key 為 R 的消息時, 如果 R == K, 則此 exchange 會將此消息路由到此 queue 中.

direct exchange 經常用于在多個 worker 中分配任務(即一個 Master 和多個相同的 Slave). 當使用這個模型時, 需要注意的是:

When doing so, it is important to understand that, in AMQP 0-9-1, messages are load balanced between consumers and not between queues.

即 AMQP 0-9-1 的負載均衡是以consumer為單位的, 而不是以 queue 為單位.

fanout exchange

一個 fanout exchange 會將消息分發給所有綁定到此 exchange 的 queue 中, 而不會考慮 queue 的 route key. 即如果有 N 個 Queue 綁定到一個 fanout exchange 時, 那么當此 exchange 收到消息時, 會將此消息分發到這 N 個 queue 中. 由于此性質, fanout exchange 也常用消息的廣播(broadcast).
fanout 可以使用下圖表示:

topic exchange

topic exchange 會根據 route key 將消息分發到與此消息的 route key 相匹配的并且綁定到此 exchagne 中的 queue 中(如果有多個 queue 使用了相同的 route key 綁定到此 exchange, 那么這些 queue 都會收到消息). 根據此性質, topic exchange 經常用于實現 publish/subscribe 模型, 即消息的多播模型.

header exchange

header exchange 不使用 route key 作為路由的依據, 而是使用消息頭屬性來路由消息.

Queue

AMQP 中的 隊列 的概念和其他消息隊列中 隊列 的概念類似, 它有如下幾個重要的概念:

Name, 名字

Durable, 是否是持久的. 當為真時, 即使 broker 重啟時, 此 queue 也不會被刪除.

Exclusive, 是否是獨占的, 當為真時, 表示此 queue 只能有一個消費者, 并且當此消費者的連接斷開時, 此 queue 會被刪除.

Auto-delete, 當為真時, 此 隊列 會在最后一個消費者取消訂閱時被刪除.

在使用一個 隊列 時, 需要先進行聲明. 如果我們聲明的隊列不存在, 那么 broker 就會自動創建它. 不過如果此隊列已經存在時, 我們就需要注意了, 若我們聲明的隊列的屬性和已存在的隊列的屬性一致, 則不會有任何的問題, 但是如果先后兩次聲明的隊列的屬性不一致, 則會有 PRECONDITION_FAILED 錯誤(錯誤碼為406).

關于隊列名

AMQP 的隊列名不能以 "amq." 開頭, 因為這樣的隊列名是 AMQP broker 內部所使用的. 當我們使用了這樣的隊列名時, 那么會有一個 ACCESS_REFUSED 錯誤(錯誤碼為 403)

關于持久隊列

持久隊列會被持久化到磁盤中, 因此即使 broker 重啟了, 持久隊列也依然存在.
不過需要注意的是, 不要將持久隊列和消息的持久化混淆. 當 broker 重啟時, 持久隊列會自動重新聲明, 然而只有隊列中的持久化消息(persistent message)才會被恢復.

隊列的綁定

隊列的綁定關系是 exchagne 用于消息路由的規則, 即一個 exchange 能夠將消息路由到某個隊列的前提是此隊列已經綁定到這個 exchange 中了. 當隊列綁定到一個 exchange 中時, 我們還可以設置一個額外的參數, 即 route key, 這個 key 會被 direct exchange 和 topic exchange 作為額外的路由信息而使用, 換句話說, route key 扮演著過濾器的角色.
當一個消息沒有被路由到任意的隊列時(例如此 exchange 沒有任何的 queue 綁定著), 那么此時會根據消息的屬性來決定是將此消息丟棄還是返回給生產者.

消費者

AMQP 0-9-1 支持兩種消息分發模式:

push 模式, 即 broker 主動推送消息給消費者

pull 模式, 即消費者主動從 broker 中拉取消息.

在 push 模式時, 應用程序需要告知 broker 它對哪些消息感興趣, 即也就是我們所說的訂閱一個消息主題. 每個消費者都有一個惟一的標識符, 即consumer tag, 我們可以用這個 tag 來取消一個消費者對某個主題的訂閱(unsubscribe).

消息的 ACK

AMQP 0-9-1 有兩種消息 ACK 模式:

自動 ACK 模式

手動 ACK 模式

在自動 ACK 模式下, 當 broker 發送消息成功后, 會立即將此消息從消息隊列中刪除, 而不會等待消費者的 ACK 回復. 而在手動 ACK 模式下, 當 broker 發送消息給消費者時, 不會立即將此消息刪除, 而是需要等待消費者的 ACK 回復后才會刪除消息. 因此在手動 ACK 模式下, 當消費者收到消息并處理完成后, 需要向 broker 顯示地發送 ACK 指令.
在手動 ACK 模式下, 如果消費者因為意外的 crash 而沒有發送 ACK 給 broker, 那么此時 broker 會將此消息轉發給其他的消費者(如果此時沒有消費者了, 那么 broker 會緩存此消息, 直到有新的消費者注冊).

拒絕消息

當一個消費者處理消息失敗或此時不能處理消息時, 那么可以給 broker 發送一個拒絕消息的指令, 并且可以要求 broker 丟棄或重新分發此消息.
不過需要注意的是, 如果此時只有一個消費者, 那么當此消費者拒收消息并要求 broker 重新分發此消息時, 那么就會造成了此消息不斷地分發和拒收, 形成了死循環.

預取消息

通過預取消息機制, 消費者可以一次性批量取出消息, 然后在處理后對這些批量消息進行統一的 ACK 回復, 這樣可以提高消息的吞吐量.
不過, 需要注意的時, RabbitMQ 僅支持 channel 級別的預取消息的數量配置, 不支持基于連接的預取消息數量配置.

連接

AMQP 的連接是長連接, 它是一個使用 TCP 作為可靠傳輸的應用層協議.

通道(Channel)

AMQP 不推薦一個應用程序發起多個對 broker 的連接, 因為這樣會消耗系統資源并且也不利于防火墻的配置. 但是如果應用程序確實需要有多個不互相干擾的連接來進行不同的操作時該怎么辦呢? 為了解決這個問題, AMQP 引入了 Channel 的 概念. 在 AMQP 0-9-1 中, 一個與 broker 的連接是被多個 Channel 復用的, 因此我們可以將 channel 理解為: 一個共享同一個 TCP 連接的輕量級的連接.

基于同一個 TCP 連接的兩個不同的 channel 直接是不會有任何的干擾的(在邏輯上可以等效地理解為兩個獨立的連接), 因此客戶端和 broker 之間交互時, 需要附帶上 channel id.
通常來說, 在一個多線程消費消息的模型中, 每個線程多帶帶打開一個 channel 是一個推薦的做法, 而最好不要在各個線程中共享一個 channel.

Virtual host

為了在一個 broker 中實現不同的相互隔離的環境(例如每個環境中有不同的用戶, 不同的 exchange, 不同的隊列等), AMQP 引入了一個叫做 virtual host(vhost) 的概念. 在連接 broker 時, 客戶端可以指定需要使用哪個 vhost.

本文由 yongshun 發表于個人博客, 采用 署名-相同方式共享 3.0 中國大陸許可協議.
Email: yongshun1228@gmail.com
本文標題為: RabbitMQ AMQP 消息模型攻略
本文鏈接為: https://segmentfault.com/a/1190000007123977

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。

轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/66128.html

相關文章

  • 消息中間件——RabbitMQ(三)理解RabbitMQ核心概念和AMQP協議!

    摘要:后續介紹交換機,生產者直接將消息投遞到中。消息,服務器和應用程序之間傳送的數據,由和組成。也稱為消息隊列,保存消息并將它們轉發給消費者。主要是應為和有一個綁定的關系。 showImg(https://img-blog.csdnimg.cn/20190509221741422.gif); showImg(https://img-blog.csdnimg.cn/20190731191914...

    sihai 評論0 收藏0
  • 新手也能看懂,消息隊列其實很簡單

    摘要:通過以上分析我們可以得出消息隊列具有很好的削峰作用的功能即通過異步處理,將短時間高并發產生的事務消息存儲在消息隊列中,從而削平高峰期的并發事務。 該文已加入開源項目:JavaGuide(一份涵蓋大部分Java程序員所需要掌握的核心知識的文檔類項目,Star 數接近 16k)。地址:https://github.com/Snailclimb... 本文內容思維導圖:showImg(ht...

    Clect 評論0 收藏0
  • 轉: RabbitMQ與PHP(一)

    摘要:需要特別明確的概念交換機的持久化,并不等于消息的持久化。消息的處理,是有兩種方式,一次性。在上述示例中,使用的,意味著接收全部的消息。注意與是兩個不同的隊列。后端處理,可以針對每一個啟動一個或多個,以提高消息處理的實時性。 RabbitMQ與PHP(一) 項目中使用RabbitMQ作為隊列處理用戶消息通知,消息由前端PHP代碼產生,處理消息使用Python,這就導致代碼一致性問題,調...

    wpw 評論0 收藏0
  • 慕課網_《RabbitMQ消息中間件極速入門與實戰》學習總結

    摘要:慕課網消息中間件極速入門與實戰學習總結時間年月日星期三說明本文部分內容均來自慕課網。 慕課網《RabbitMQ消息中間件極速入門與實戰》學習總結 時間:2018年09月05日星期三 說明:本文部分內容均來自慕課網。@慕課網:https://www.imooc.com 教學源碼:無 學習源碼:https://github.com/zccodere/s... 第一章:RabbitM...

    mykurisu 評論0 收藏0
  • RabbitMQ學習筆記

    摘要:消息持久化控制的屬性就是消息的持久化。當生產者發送的消息路由鍵為時,兩個消費者都會收到消息并處理當生產者發送的消息路由鍵為時,只有消費者可以接收到消息。八的消息確認機制在中,可以通過持久化數據解決服務器異常的數據丟失問題。 一、內容大綱&使用場景 1. 消息隊列解決了什么問題? 異步處理 應用解耦 流量削鋒 日志處理 ...... 2. rabbitMQ安裝與配置 3. Java操...

    zacklee 評論0 收藏0

發表評論

0條評論

xietao3

|高級講師

TA的文章

閱讀更多
最新活動
閱讀需要支付1元查看
<