摘要:消息確認為,會等待的顯式確認。在消息發(fā)送到之后會立刻路由到中,因此未持久化的在重啟后會丟失元數(shù)據(jù)以及綁定,對和消息的持久化無影響。指定如果一個或者有多個的情況下,只有最大的那個才會生效。要求集群中至少要有一個磁盤節(jié)點,儲存了所有的元數(shù)據(jù)。
Connection & Channel
Connection 代表一個 TCP 連接,Channel 是建立在 Connection 上的虛擬連接。RabbitMQ 每條指令都是通過 Channel 完成的。
對于 OS 而言,創(chuàng)建和銷毀 TCP 連接的代價非常高,在高峰期很容易遇到瓶頸。程序中一般會有多個線程需要與 RabbitMQ建立通信,消費或生產(chǎn)消息,通過 TCP 連接復用來減少性能開銷。
Connection 可以創(chuàng)建多個 Channel ,但是 Channel 不是線程安全的所以不能在線程間共享。
Connection 在創(chuàng)建時可以傳入一個 ExecutorService ,這個線程池時給該 Connection 上的 Consumer 用的。
Channel.isOpen 以及 Connection.isOpen 方法是同步的,因此如果在發(fā)送消息時頻繁調用會產(chǎn)生競爭。我們可以認為在 createChannel 方法后 Channel 以及處于開啟狀態(tài)。若在使用過程中 Channel 關閉了,那么只要捕獲拋出的 ShutDownSignalException 就可以了,同時建議捕獲 IOException 以及 SocketException 防止連接意外關閉。
Exchange & Queue消費者和生產(chǎn)者都可以聲明一個已經(jīng)存在的 Exchange 或者 Queue ,前提是參數(shù)完全匹配現(xiàn)有的 Exchange 或者 Queue,否則會拋出異常。
QueueDeclare 參數(shù):
exclusive: 排他隊列,只有同一個 Connection 的 Channel 可以訪問,且在 Connection 關閉或者客戶端退出后自動刪除,即使 durable 為 true 。
queuePurge(String queue):清空隊列
Exchange 可以綁定另一個 Exchange:exchangeBind(String destination, String source, String routeKey), 從 source 到 destination
若業(yè)務允許,則最好預先創(chuàng)建好 Exchange 以及 Queue 并進行綁定(rabbitmqadmin),防止 Exchange 沒有綁定 Queue 或 綁定錯誤的 Queue 而導致消息丟失(關鍵信息應當使用 mandatory 參數(shù))。
Alternate Exchange: 在 Channel.exchangeDeclare 時添加 alternate-exchange 參數(shù)或在 Policy 中聲明。mandatory 為 true 時,未被路由的消息會被發(fā)送到 Alternate Exchange 。建議 Exchange Type 設置為 fanout ,否則當 RoutingKey 依然不匹配就會被返回 Producer。
P.S. 有些書上講備份交換器和 mandatory 參數(shù)一起使用 mandatory 參數(shù)失效是錯的,當 RoutingKey 不匹配 Alternate Exchange 依然會被返回 Producer 。
(rabbitmq v3.7 測試)
MapPublish & Consume Publish Confirmarg = new HashMap () {{ put("alternate-exchange", "alt"); }}; channel.exchangeDeclare("normalExchange", "direct", true, false, arg); channel.exchangeDeclare("alt", "fanout", true, false, null); channel.queueDeclare("normalQueue", true, false, false, null); channel.queueDeclare("notSend", true, false, false, null); channel.queueBind("normalQueue", "normalExchange", "key"); channel.queueBind("notSend", "alt", "");
消息發(fā)送到服務器后可能還沒來的及刷到磁盤中,服務器就掛掉,從而造成消息丟失。 Publish Confirm 能夠在消息確實到達服務器(開啟持久化的消息會在刷入磁盤之后)之后返回一個確認給 Publisher。
通過 channel.confirmSelected 把 Channel 設置為 Confirm 模式,并為 Channel 添加一個 ConfirmLister 來監(jiān)聽返回的確認。
SortedSetunconfirmedSet = new TreeSet<>(); channel.confirmSelect(); channel.addConfirmListener((deliveryTag, multiple) -> { System.out.println("handleAck: " + deliveryTag + " " + multiple); if (multiple) { unconfirmedSet.headSet(deliveryTag - 1).clear(); } else { unconfirmedSet.remove(deliveryTag); } }, (deliveryTag, multiple) -> { System.out.println("handleNack: " + deliveryTag + " " + multiple); if (multiple) { unconfirmedSet.headSet(deliveryTag - 1).clear(); } else { unconfirmedSet.remove(deliveryTag); } }); while (true) { long seq = channel.getNextPublishSeqNo(); channel.basicPublish("normalExchange", "key", true, null, message.getBytes(StandardCharsets.UTF_8)); unconfirmedSet.add(seq); Thread.sleep(1000); }
除了異步處理的方式之外還有批量確認以及事務的方法。批量確認的速度在大量連續(xù)發(fā)送的情況下和異步的方法差不多。不管怎樣這兩種消息確認的方法都要比事務的方式快7倍左右。
Consumer一般應當實現(xiàn) Consumer 接口或者繼承 DefaultConsumer ,Consumer 通過 consumerTag 來進行區(qū)分。
消費消息有兩種方式,一種是 Push ,一種是 Get。
Push 是由 RabbitMQ 以輪詢的方式將消息推送到 Consumer ,方法為 basicConsume 。一般一個 Channel 對應一個 Consumer 。
Get 由客戶端主動從 RabbitMQ 拉取一條消息,方法為 basicGet 。__不能循環(huán)執(zhí)行 basicGet 來代替 basicConsume ,不然會嚴重影響性能。__
消息確認:autoAck 為 false ,RabbitMQ 會等待 basicAck 的顯式確認。除非 Consumer 連接斷開否則一直等待確認。當 Consumer 顯式調用 basicReject 或者 basicNack 并將 requeue 設為 true 后會將消息重新入隊投遞。一般我們在業(yè)務處理完之后再 ack .
mandatory : 當 Exchange 無法匹配 Queue 或 Exchange 時,mandatory 為 true 的消息會被返回給 Producer,否則會被丟棄。 通過 Channel.addReturnListener 來添加 ReturnListener 監(jiān)視器。
queueDeclare 時添加 x-message-ttl 參數(shù),單位毫秒。
Maparg = new HashMap () {{ put("x-message-ttl", "1000000"); }}; channel.exchangeDeclare("normalExchange", "direct", true, false, arg);
使用 AMQP.BasicProperties.Builder 創(chuàng)建 AMQP.BasicProperties 并設置 expiration 參數(shù)。
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); builder.expiration("100000"); channel.basicPublish("normalExchange", "key", true, builder.build(), message.getBytes(StandardCharsets.UTF_8));Dead Letter Exchange (DLX)
Dead Letter(死信):
Basic.Reject / Basic.Nack 并且 requeue 為 true
消息 TTL 過期
隊列達到最大長度
當消息成為 Dead Letter 之后, RabbitMQ 會自動把這個消息發(fā)到 DLX 上。
// 當發(fā)送到 normalQueue 中的消息成為 Dead Letter 之后會自動以 // dead-letter 為 routingKey 發(fā)送到 dlxQueue Exchange Maparg = new HashMap () {{ put("x-dead-letter-exchange", "dlx"); put("x-dead-letter-routing-key", "dead-letter"); }}; channel.queueDeclare("normalQueue", true, false, false, arg); channel.exchangeDeclare("dlx", "direct", true, false, false, null); channel.queueDeclare("dlxQueue", true, false, false, null);
DLX 其他用法:延遲隊列,消息 發(fā)送到一個暫存的、沒有 Consumer 的 Queue 并設置 TTL,Consumer 消費 DLX 綁定的 Queue 的消息,建議給暫存的 Queue 設置一個最大的 TTL,防止消息沒有設置 TTL 而一直堆積在 Queue 中。
Priority消息的消費可以有優(yōu)先級,Queue 的最大優(yōu)先級可以通過 x-max-priority 進行設置。
MapDurabilityarg = new HashMap () {{ put("x-max-priority", 5); }}; channel.queueDeclare("normalQueue", true, false, false, arg); channel.exchangeDeclare("normalExchange", "direct", true, false, null); AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); builder.priority(2); channel.basicPublish("normalExchange", "key", true, builder.build(), messsage.getBytes(StandardCharsets.UTF_8));
Exchange , Queue , 消息都可以進行持久化。在消息發(fā)送到 Exchange 之后會立刻路由到 Queue 中,因此未持久化的 Exchange 在重啟后會丟失 Exchange 元數(shù)據(jù)以及綁定,對 Queue 和消息的持久化無影響。
未持久化的 Queue 在重啟后會丟失,包括 Queue 中的消息,不管消息是否設置了持久化。
未持久化的消息在重啟后會丟失,即使所在的 Queue 已持久化。
channel.queueDeclare("normalQueue", true, false, false, null); // Queue 持久化 channel.exchangeDeclare("normalExchange", "direct", true, false, null); // Exchange 持久化 channel.queueBind("normalQueue", "normalExchange", "key"); AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); builder.deliveryMode(2); // 消息持久化 channel.basicPublish("normalExchange", "key", true, builder.build(), messsage.getBytes(StandardCharsets.UTF_8));Qos
Qos 的作用時負載均衡。當一個隊列有兩個 Consumer ,一個性能很好 A,另一個不那么好 B,RabbitMQ 會輪詢,將消息平均地分給這兩個 Consumer。可見 B 上的堆積的消息會越來越多,而 A 上的線程可能會空閑。 Qos 的作用就是防止一個 Consumer 堆積了過多的消息,把這些消息分給其他 Consumer。
global 參數(shù):
channel.basicQos(3, false); // each Consumer limit 3 channel.basicQos(5, true); // this channel limit to 5
global 參數(shù)會讓 RabbitMQ 調用更多資源,盡量不要設置(默認值為 false)。
RelibilityRabbitMQ 支持最少一次和最多一次。
最少一次:
- 啟用 Publisher Confirm 或者 事務保證消息能夠到達服務器。 - 啟用 mandatory 參數(shù)保證消息不回被 Exchange 丟掉。 - 消息和 Queue 開啟持久化。 - Consumer autoAck off, 并確保消息在處理完之后再 ackPolicy
Policy 可以很方便的批量設置 Exchange 以及 Queue 的屬性,但是 Policy 的優(yōu)先級較低,請注意。
Policy 可以通過 HTTP API, web console,以及 cli 的方式。
rabbitmqctl set_policy [-p vhost] [--priority prirority] [--apply-to apply-to] {name} {pattern} {defination}
vhost : 指定 vhost
proiority : 如果一個 Queue 或者 Exchange 有多個 Policy 的情況下,只有 priority 最大的那個 Policy 才會生效。
apply-to : 應用到
Exchange and Queue
Exchange
Queue
name : Policy 的名字
pattern : Exchange 或者 Queue 名字的正則表達式
defination : 屬性值,可以通過 management > Admin > Policies 的查看。
ClusterRabbitMQ 會把所有的元數(shù)據(jù)存儲到所有的節(jié)點上,但是隊列是分散在集群中所有的節(jié)點上的。
Build A Cluster with docker我們嘗試使用 Docker Compose 創(chuàng)建一個由 3 個服務組成的集群
version: "3" services: node1: image: rabbitmq:3.7-management-alpine container_name: node1 hostname: node1 environment: RABBITMQ_ERLANG_COOKIE: secret_cookie_here ports: - "5673:5672" - "15673:15672" node2: image: rabbitmq:3.7-management-alpine container_name: node2 hostname: node2 environment: RABBITMQ_ERLANG_COOKIE: secret_cookie_here ports: - "5674:5672" - "15674:15672" node3: image: rabbitmq:3.7-management-alpine container_name: node3 hostname: node3 environment: RABBITMQ_ERLANG_COOKIE: secret_cookie_here ports: - "5675:5672" - "15675:15672"
通過設置 hostname ,容器內部的 rabbitmq 的 nodename 就變成類似 rabbitmq@node1。同時集群中的 RabbitMQ 需要相同的 RABBITMQ_ERLANG_COOKIE 來進行互相認證。
啟動服務:
docker-compose up -d
然后將 node2 , node3 加入 node1 ,注意,加入集群之前 RabbitMQ 必須停止:
# 停止 rabbitmq docker-compose exec node2 rabbitmqctl stop_app docker-compose exec node3 rabbitmqctl stop_app # 加入 node1 docker-compose exec node2 rabbitmqctl join_cluster rabbitmq@node1 docker-compose exec node3 rabbitmqctl join_cluster rabbitmq@node1 # 重新啟動 docker-compose exec node2 rabbitmqctl start_app docker-compose exec node3 rabbitmqctl start_app
在任意一個節(jié)點上查詢集群狀態(tài):
docker-compose exec node2 rabbitmqctl cluster_status
可以看到如下狀態(tài):
Cluster status of node rabbit@node2 ... [{nodes,[{disc,[rabbit@node1,rabbit@node2,rabbit@node3]}]}, {running_nodes,[rabbit@node3,rabbit@node1,rabbit@node2]}, {cluster_name,<<"rabbit@node2">>}, {partitions,[]}, {alarms,[{rabbit@node3,[]},{rabbit@node1,[]},{rabbit@node2,[]}]}]手動下線節(jié)點
將節(jié)點從在線狀態(tài)下線, 首先停止節(jié)點,然后重置節(jié)點。
docker-compose exec node2 rabbitmqctl stop_app docker-compose exec node2 rabbitmqctl reset docker-compose exec node2 rabbitmqctl stop_app
在重新啟動服務器之后可以發(fā)現(xiàn)該節(jié)點已經(jīng)脫離了集群。
Cluster status of node rabbit@node2 ... [{nodes,[{disc,[rabbit@node2]}]}, {running_nodes,[rabbit@node2]}, {cluster_name,<<"rabbit@node2">>}, {partitions,[]}, {alarms,[{rabbit@node2,[]}]}]節(jié)點類型
RabbitMQ 的節(jié)點類型有兩種,一種是 disc , 第二種是 ram。 RabbitMQ 要求集群中至少要有一個磁盤節(jié)點,儲存了所有的元數(shù)據(jù)。當集群中的唯一一個磁盤節(jié)點崩潰后,集群可以繼續(xù)收發(fā)消息,但是不能創(chuàng)建隊列等操作。
RabbitMQ 在加入集群時默認為磁盤模式,如果要以內存模式加入:
docker-compose exec node2 rabbitmqctl join_cluster rabbit@node1 --ram
更改節(jié)點類型:
docker-compose exec node 2 rabbitmqctl change cluster_node_type descMirror Queue
RabbitMQ 提供了 Master/Slave 模式的 Mirror Queue 機制。請注意,開啟 Publisher Confirmed 或者事務的情況下,只有所有的 Slave 都 ACK 之后才會返回 ACK 給客戶端。
開啟 Mirror Queue 主要通過設置 Policy 其中最主要的是 defination:
ha-mode: Mirror Queue 的模式
all : 默認的模式,表示在集群中的所有節(jié)點上進行鏡像
exactly : 在指定數(shù)量的節(jié)點上進行鏡像,數(shù)量由 ha-params 指定。
nodes : 在指定的節(jié)點上進行鏡像,節(jié)點名稱由 ha-params 指定。
ha-params : 如上所述
ha-sync-mode : 消息的同步模式
automatic : 當新的 Slave 加入集群之后會自動同步消息。
manual: 默認,當加入新的 Slave 之后不會自動把消息同步到新的 Slave 上。指導調用命令顯式同步。
ha-promote-on-shutdown:
when-synced: 默認,如果主動停止 master ,那么 slave 不會自動接管。也就是說會期望 master 會重啟啟動,這可以保證消息不會丟失。
always: 不管 master 是因為什么原因停止的,slave 會立刻接管,有可能有一部分數(shù)據(jù)沒有從 master 同步到 slave.
ha-promote-on-failure: 默認 always ,不推薦設置為 when-synced
文章版權歸作者所有,未經(jīng)允許請勿轉載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/75346.html
摘要:慕課網(wǎng)消息中間件極速入門與實戰(zhàn)學習總結時間年月日星期三說明本文部分內容均來自慕課網(wǎng)。 慕課網(wǎng)《RabbitMQ消息中間件極速入門與實戰(zhàn)》學習總結 時間:2018年09月05日星期三 說明:本文部分內容均來自慕課網(wǎng)。@慕課網(wǎng):https://www.imooc.com 教學源碼:無 學習源碼:https://github.com/zccodere/s... 第一章:RabbitM...
摘要:基礎教程注本文是對眾多博客的學習和總結,可能存在理解錯誤。請帶著懷疑的眼光,同時如果有錯誤希望能指出。安裝庫這里我們首先將消息推入隊列,然后消費者從隊列中去除消息進行消費。 RabbitMQ 基礎教程(1) - Hello World 注:本文是對眾多博客的學習和總結,可能存在理解錯誤。請帶著懷疑的眼光,同時如果有錯誤希望能指出。 如果你喜歡我的文章,可以關注我的私人博客:http:...
摘要:添加應用啟動類通過半自動刷新配置。配置客戶端服務想要實現(xiàn)自動刷新配置的話,一端是不要做任何處理,只需要在一端處理即可。 SpringCloud(第 037 篇)通過bus/refresh半自動刷新ConfigClient配置 - 一、大致介紹 1、上章節(jié)我們講到了手動刷新配置,但是我們假設如果微服務一多的話,那么我們是不是需要對每臺服務進行手動刷新呢? 2、答案肯定是不需要的,我們也可...
摘要:消息丟失分成三種情況,可能出現(xiàn)生產(chǎn)者消費者。生產(chǎn)者丟失數(shù)據(jù)生產(chǎn)者丟失數(shù)據(jù)首先要確保寫入的消息別丟,消息隊列通過請求確認機制,保證消息的可靠傳輸。只有消息被持久化到磁盤以后,才會回傳消息。消息丟失分成三種情況,可能出現(xiàn)生產(chǎn)者、RabbitMQ、消費者。 生產(chǎn)者丟失數(shù)據(jù) 首先要確保寫入 RabbitMQ 的消息別丟,消息隊列通過請求確認機制,保證消息的可靠傳輸。生產(chǎn)開啟 comf...
閱讀 851·2021-11-15 17:58
閱讀 3652·2021-11-12 10:36
閱讀 3788·2021-09-22 16:06
閱讀 965·2021-09-10 10:50
閱讀 1332·2019-08-30 11:19
閱讀 3315·2019-08-29 16:26
閱讀 936·2019-08-29 10:55
閱讀 3347·2019-08-26 13:48