摘要:一關鍵字和之間的連接關系實際存儲消息。生產者進行接受應答,用來確定這條消息是否正常的發送到了,這種方式也是消息的可靠性投遞的核心保障。支持消息的過期時間,在消息發送時可以進行指定。可以監聽這個隊列中消息做相應的處理。
一、rabbitmq關鍵字
Binding:Exchange和Exchange、Queue之間的連接關系二、rabbitmq高級特性
Queue:實際存儲消息。
Durability:是否持久化,Durable:是,Transient:否。
Auto delete:如選yes,代表當最后一個監聽被移除之后,該Queue會自動刪除。
Message:服務器和應用程序之間傳送的數據。本質上就是一段數據,由Properties和Payload(Body)組成。常用屬性:delivery mode、headers(自定義屬性)
Virtual host:虛擬主機,用于進行邏輯隔離,最上層的消息路由。一個Virtual Host里面可以有若干個Exchange和Queue,同一個Virtual Host里面不能有相同名稱的Exchange或者Queue。
消息如何保證100%的投遞成功?
生產端可靠性投遞
保障消息的成功發出
保障MQ節點的成功接收
發送端收到MQ節點(Broker)確認應答
完善的消息進行補償機制
可靠性投遞解決方案
消息落庫,對消息狀態進行打標
消息的延遲投遞,做二次確認,回調檢查
消息集群鏡像隊列:rabbitmq集群模式非常經典的就是Mirror鏡像模式,保證100%數據不丟失,在實際工作中也是用的最多的。并且實現集群非常簡單,一般互聯網公司都會構建這種鏡像集群模式。https://segmentfault.com/a/11...
冪等性概念詳解
在海量訂單產生的業務高峰期,如何避免消息的重復消費問題?
Confirm確認消息、Return返回消息
理解Confirm消息確認機制:
消息的確認,是指生產者投遞消息后,如果Broker收到消息,則會給我們生產者一個應答。
生產者進行接受應答,用來確定這條消息是否正常的發送到了Broker,這種方式也是消息的可靠性投遞的核心保障。
如何實現Confirm確認消息?
第一步:在channel上開啟確認模式:channel.confirmSelect()
第二步:在channel上添加監聽:addConfirmListener,監聽成功和失敗的返回結果,根據具體的結果對消息進行重新發送、或者記錄日志等后續處理。
Return消息機制
Return Listener用于處理一些不可路由的消息
我們的消息生產者,通過制定一個Exchange和Routing Key,把消息送達到某一個隊列中去,然后我們的消費者監聽隊列,進行消費處理。
但是在某寫情況下,如果我們在發送消息的時候,當前的exchange不存在或者指定的路由key路由不到,這個時候如果我們需要監聽這種不可達的消息,就要使用Return Listener。
Mandatory:如果為true,則監聽器會接收到路由不可達的消息,然后進行后續處理,如果為false,那么broker端自動刪除該消息。
自定義消費者
public class MyConsumer extends DefaultConsumer implements Consumer { public MyConsumer(Channel channel) { super(channel); } @Override public void handleDelivery(String arg0, Envelope arg1, BasicProperties arg2, byte[] arg3) throws IOException { } }
消息的ACK與重回隊列
消費端手工ACK(應答成功)和NACK(應答失敗)
消費端進行消費的時候,如果由于業務異常我們可以進行日志的記錄,然后進行補償。
由于服務器宕機等嚴重問題,我們就要手工進行ACK保障消費端消費成功。比如:消費一半的時候宕機了,broker端沒有收到應答,重發消息。
消費端重回隊列
消費端重回隊列是為了對沒有處理成功的消息,把消息重新回遞給broker。
一般我們在實際應用中,都會關閉重回隊列,也就是設置為false。
channel.basciNack(envelope.getDeliveryTag(),false,true) // 為true的話,在消費失敗的情況下會重回隊列放入隊列末端。
消息的限流
假設有一個場景,RabbitMq服務器上有上萬條未處理的消息,我們隨便打開一個消費者客戶端,會出現下面的情況: 巨量的消息瞬間全部推送過來,但是我們單個客戶端無法同時處理這么多數據。RabbitMq提供了一種qos(服務質量保證)功能,即在非自動確認消息的前提下,如果一定數目的消息(通過基于consume或者channel設置Qos的值)未被確認前,不進行消費新的消息。
void BasicQos(unit prefetchSize, ushort prefetchCount, bool global);
prefetchSize:0
prefetchCount: 會告訴RabbitMq不要同時給一個消費者推送多于N個消息,即一旦有N個消息還沒有被ack,則該consumer將block掉,知道有消息ack。
global: true/false,是否將上面設置應用于channel,簡答點說,就是上面限制是channel級別的還是consumer級別。
// 限流方式,第一件事就是autoAck設置為false,關閉自動簽收,必須手動簽收 channel.basicQos(0,3,false); // 3表示如果消息積壓了1000條,先給我推3條,這三條消費結束后,我會給你一個ack表示這三條我已經處理完了,然后再給我推送3條... channel.basicConsume(queueName,false,new MyConsumer(channel)) //在MyConsumer中對消息進行簽收ack channel.basicAck(envelope,getDeliveryTag(), false);
TTL消息
TTL:是Time To Live。就是生存時間。
RabbitMQ支持消息的過期時間,在消息發送時可以進行指定。
RabbitMQ支持隊列的過期時間,從消息入隊開始計算,只要超過了隊列的超時時間配置,那么消息會自動清除。
死信隊列
死信隊列:DLX,Dead-Letter-Exchange。
利用DLX,當消息在一個隊列中變成了死信(dead message:這條消息沒有消費者去消費)之后,他能被重新publish到另外一個Exchange,這個Exchange就是DLX。
進入死信隊列(進入死信的三種方式)1.消息被拒絕(basic.reject or basic.nack)并且requeue=false
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
2.消息TTL過期
3.隊列達到最大長度
備注說明
DLX也是一個正常的Exchange,和一般的Exchange沒區別,他能在任何的隊列上被指定,實際上就是設置某個隊列的屬性。
當這個隊列中有死信時,RabbitMQ會自動將這個消息重新發送到已經設置了的Exchange上去,進而被路由到另外一個隊列上去。
可以監聽這個隊列中消息做相應的處理。
死信隊列設置
首先需要設置死信隊列的exchange和queue,然后進行綁定:Exchange:dlx.exchange、Queue:dlx.queue、RoutingKey:#
然后進行正常聲明交換機、隊列、綁定,只不過我們需要在隊列加上一個參數即可:arguments.put("x-dead-letter-exchange","dlx.exchange");
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/77668.html
摘要:第一步安裝因為是語言編寫的,所以我們首先需要安裝第二步安裝官網提供的安裝方式本人安裝成功的方式第三步查看是否已經安裝好了,能查到說明已經安裝完成了。 第一步:安裝Erlang 因為rabbitMQ是Erlang語言編寫的,所以我們首先需要安裝Erlang rpm -Uvh http://www.rabbitmq.com/releases/erlang/erlang-18.1-1.el...
摘要:第一步安裝因為是語言編寫的,所以我們首先需要安裝第二步安裝官網提供的安裝方式本人安裝成功的方式第三步查看是否已經安裝好了,能查到說明已經安裝完成了。 第一步:安裝Erlang 因為rabbitMQ是Erlang語言編寫的,所以我們首先需要安裝Erlang rpm -Uvh http://www.rabbitmq.com/releases/erlang/erlang-18.1-1.el...
摘要:本文基于的插件,針對進行簡單的測試。包括協議的介紹,的安裝配置開啟插件及基于進行的測試。協議是基于發布訂閱模型的物聯網消息傳遞協議。對傳輸消息有三種服務質量最多一次,這一級別會發生消息丟失或重復,消息發布依賴于底層網絡。 ...
閱讀 2033·2023-04-25 14:50
閱讀 2913·2021-11-17 09:33
閱讀 2617·2019-08-30 13:07
閱讀 2845·2019-08-29 16:57
閱讀 913·2019-08-29 15:26
閱讀 3552·2019-08-29 13:08
閱讀 1996·2019-08-29 12:32
閱讀 3391·2019-08-26 13:57