摘要:它是阿里巴巴于年開源的第三代分布式消息中間件。是一個(gè)分布式消息中間件,具有低延遲高性能和可靠性萬億級別的容量和靈活的可擴(kuò)展性,它是阿里巴巴于年開源的第三代分布式消息中間件。
上篇文章消息隊(duì)列那么多,為什么建議深入了解下RabbitMQ?我們講到了消息隊(duì)列的發(fā)展史:
并且詳細(xì)介紹了RabbitMQ,其功能也是挺強(qiáng)大的,那么,為啥又要搞一個(gè)RocketMQ出來呢?是重復(fù)造輪子嗎?本文我們就帶大家來詳細(xì)探討RocketMQ究竟好在哪里。
RocketMQ是一個(gè)分布式消息中間件,具有低延遲、高性能和可靠性、萬億級別的容量和靈活的可擴(kuò)展性。它是阿里巴巴于2012年開源的第三代分布式消息中間件。
隨著阿里巴巴的電商業(yè)務(wù)不斷發(fā)展,需要一款更高性能的消息中間件,RocketMQ就是這個(gè)業(yè)務(wù)背景的產(chǎn)物。RocketMQ是一個(gè)分布式消息中間件,具有低延遲、高性能和可靠性、萬億級別的容量和靈活的可擴(kuò)展性,它是阿里巴巴于2012年開源的第三代分布式消息中間件。RocketMQ經(jīng)歷了多年雙十一的洗禮,在可用性、可靠性以及穩(wěn)定性等方面都有出色的表現(xiàn)。值得一提的是,RocketMQ最初就是借鑒了Kafka進(jìn)行改造開發(fā)而來的,所以熟悉Kafka的朋友,會發(fā)現(xiàn)RocketMQ的原理和Kafka有很多相似之處。
RocketMQ前身叫做MetaQ,在MeataQ發(fā)布3.0版本的時(shí)候改名為RocketMQ,其本質(zhì)上的設(shè)計(jì)思路和Kafka類似,因?yàn)樽畛蹙褪腔贙afka改造而來,經(jīng)過不斷的迭代與版本升級,2016年11月21日,阿里巴巴向Apache軟件基金會捐贈了RocketMQ 。近年來被越來越多的國內(nèi)企業(yè)使用。
本文帶大家從以下幾個(gè)方面詳細(xì)了解RocketMQ:
RocketMQ的架構(gòu)主要分為四部分,如下圖所示:
Producer
:消息生產(chǎn)者,支持集群方式部署;Consumer
:消息消費(fèi)者,支持集群方式部署,支持pull,push模式獲取消息進(jìn)行消費(fèi),支持集群和廣播方式消費(fèi);NameServer
:Topic路由注冊中心,類似于Dubbo中的zookeeper,支持Broker的動態(tài)注冊與發(fā)現(xiàn);BrokerServer
:主要負(fù)責(zé)消息的存儲、投遞和查詢,以及服務(wù)高可用保證。BrokerServer包含以下幾個(gè)重要的子模塊:RocketMQ執(zhí)行原理如下圖所示:
brokerRole
,可選值:ASYNC_MASTER
:異步復(fù)制方式(異步雙寫),生產(chǎn)者寫入消息到Master之后,無需等到消息復(fù)制到Slave即可返回,消息的復(fù)制由旁路線程進(jìn)行異步復(fù)制;SYNC_MASTER
:同步復(fù)制方式(同步雙寫),生產(chǎn)者寫入消息到Master之后,需要等到Slave復(fù)制成功才可以返回。如果有多個(gè)Slave,只需要有一個(gè)Slave復(fù)制成功,并成功應(yīng)答,就算復(fù)制成功了。這里是否持久化到磁盤依賴于另一個(gè)參數(shù):flushDiskType
;SLAVE
:從節(jié)點(diǎn)本節(jié)我們來看看一個(gè)雙主雙從的RocketMQ是如何搭建的。
集群配置參數(shù)說明:
在討論集群前,我們需要了解兩個(gè)關(guān)鍵的集群配置參數(shù):brokerRole,flushDiskType。brokerRole在前一節(jié)已經(jīng)介紹了,而flushDiskType則是刷盤方式的配置,主要有:
- ASYNC_FLUSH: 異步刷盤
- SYNC_FLUSH: 同步刷盤
brokerRole確定了主從同步是異步的還是同步的,flushDiskType確定了數(shù)據(jù)刷盤的方式是同步的還是異步的。
如果業(yè)務(wù)場景對消息丟失容忍度很低,可以采用SYNC_MASTER + ASYNC_FLUSH的方式,這樣只有master和slave在刷盤前同時(shí)掛掉,消息才會丟失,也就是說即使有一臺機(jī)器出故障,仍然能保證數(shù)據(jù)不丟;
如果業(yè)務(wù)場景對消息丟失容忍度比較高,則可以采用ASYNC_MASTER + ASYNC_FLUSH的方式,這樣可以盡可能的提高消息的吞吐量。
Master Broker支持讀和寫,Slave Broker只支持讀。
當(dāng)Master不可用的時(shí)候,Consumer會自動切換到Slave進(jìn)行讀,也就是說,當(dāng)Master節(jié)點(diǎn)的機(jī)器出現(xiàn)故障后,Consumer仍然可以從Slave節(jié)點(diǎn)讀取消息,不影響消費(fèi)端的消費(fèi)程序。
集群配置參數(shù)說明:
- brokerName: broker的名稱,需要把Master和Slave節(jié)點(diǎn)配置成相同的名稱,表示他們的主從關(guān)系,相同的brokerName的一組broker,組成一個(gè)broker組;
- brokerId: broker的id,0表示Master節(jié)點(diǎn)的id,大于0表示Slave節(jié)點(diǎn)的id。
在RocketMQ中,機(jī)器的主從節(jié)點(diǎn)關(guān)系是提前配置好的,沒有類似Kafka的Master動態(tài)選主功能。
如果一個(gè)Master宕機(jī)了,要讓生產(chǎn)端程序繼續(xù)可以生產(chǎn)消息,您需要部署多個(gè)Master節(jié)點(diǎn),組成多個(gè)broker組。這樣在創(chuàng)建Topic的時(shí)候,就可以把Topic的不同消息隊(duì)列分布在多個(gè)broker組中,即使某一個(gè)broker組的Master節(jié)點(diǎn)不可用了,其他組的Master節(jié)點(diǎn)仍然可用,保證了Producer可以繼續(xù)發(fā)送消息。
為了盡可能的保證消息不丟失
,并且保證生產(chǎn)者和消費(fèi)者的可用性
,我們可以構(gòu)建一個(gè)雙主雙從的集群,搭建的架構(gòu)圖如下所示:
部署架構(gòu)說明:
以下是關(guān)鍵的配置參數(shù):
# NameServer地址namesrvAddr=192.168.1.100:9876;192.168.1.101:9876# 集群名稱brokerClusterName=itzhai-com-cluster# brokerIP地址brokerIP1=192.168.1.100# broker通信端口listenPort=10911# broker名稱brokerName=broker‐1# 0表示主節(jié)點(diǎn)brokerId=0# 2點(diǎn)進(jìn)行消息刪除deleteWhen=02# 消息在磁盤上保留48小時(shí)fileReservedTime=48# 主從同步復(fù)制brokerRole=SYNC_MASTER# 異步刷盤flushDiskType=ASYNC_FLUSH# 自動創(chuàng)建TopicautoCreateTopicEnable=true# 消息存儲根目錄storePathRootDir=/data/rocketmq/store‐m
# NameServer地址namesrvAddr=192.168.1.100:9876;192.168.1.101:9876# 集群名稱brokerClusterName=itzhai-com-cluster# brokerIP地址brokerIP1=192.168.1.101# broker通信端口listenPort=10911# broker名稱brokerName=broker‐1 # 非0表示從節(jié)點(diǎn)brokerId=1# 2點(diǎn)進(jìn)行消息刪除deleteWhen=02# 消息在磁盤上保留48小時(shí)fileReservedTime=48# 從節(jié)點(diǎn)brokerRole=SLAVE# 異步刷盤flushDiskType=ASYNC_FLUSH# 自動創(chuàng)建TopicautoCreateTopicEnable=true # 消息存儲根目錄storePathRootDir=/data/rocketmq/store‐s
# NameServer地址namesrvAddr=192.168.1.100:9876;192.168.1.101:9876# 集群名稱brokerClusterName=itzhai-com-cluster# brokerIP地址brokerIP1=192.168.1.102# broker通信端口listenPort=10911# broker名稱brokerName=broker‐2# 0表示主節(jié)點(diǎn)brokerId=0# 2點(diǎn)進(jìn)行消息刪除deleteWhen=02# 消息在磁盤上保留48小時(shí)fileReservedTime=48# 主從同步復(fù)制brokerRole=SYNC_MASTER# 異步刷盤flushDiskType=ASYNC_FLUSH# 自動創(chuàng)建TopicautoCreateTopicEnable=true# 消息存儲根目錄storePathRootDir=/data/rocketmq/store‐m
# NameServer地址namesrvAddr=192.168.1.100:9876;192.168.1.101:9876# 集群名稱brokerClusterName=itzhai-com-cluster# brokerIP地址brokerIP1=192.168.1.103# broker通信端口listenPort=10911# broker名稱brokerName=broker‐2# 非0表示從節(jié)點(diǎn)brokerId=1# 2點(diǎn)進(jìn)行消息刪除deleteWhen=02# 消息在磁盤上保留48小時(shí)fileReservedTime=48# 從節(jié)點(diǎn)brokerRole=SLAVE# 異步刷盤flushDiskType=ASYNC_FLUSH# 自動創(chuàng)建TopicautoCreateTopicEnable=true# 消息存儲根目錄storePathRootDir=/data/rocketmq/store‐s
寫了那么多頂層架構(gòu)圖,不寫寫底層內(nèi)幕,就不是IT宅(itzhai.com)的文章風(fēng)格,接下來,我們就來看看底層存儲架構(gòu)。
我們在broker.conf
文件中配置了消息存儲的根目錄:
# 消息存儲根目錄storePathRootDir=/data/rocketmq/store‐m
進(jìn)入這個(gè)目錄,我們可以發(fā)現(xiàn)如下的目錄結(jié)構(gòu):
其中:
下面我們來看看關(guān)鍵的commitlog以及consumequeue:
消息投遞到Broker之后,是先把實(shí)際的消息內(nèi)容存放到CommitLog中的,然后再把消息寫入到對應(yīng)主題的ConsumeQueue中。其中:
CommitLog:消息的物理存儲文件,存儲實(shí)際的消息內(nèi)容。每個(gè)Broker上面的CommitLog被該Broker上所有的ConsumeQueue共享。
單個(gè)文件大小默認(rèn)為1G,文件名長度為20位,左邊補(bǔ)零,剩余為起始偏移量。預(yù)分配好空間,消息順序?qū)懭肴罩疚募?/strong>。當(dāng)文件滿了,則寫入下一個(gè)文件,下一個(gè)文件的文件名基于文件第一條消息的偏移量進(jìn)行命名;
ConsumeQueue:消息的邏輯隊(duì)列,相當(dāng)于CommitLog的索引文件。RocketMQ是基于Topic主題訂閱模式實(shí)現(xiàn)的,每個(gè)Topic下會創(chuàng)建若干個(gè)邏輯上的消息隊(duì)列ConsumeQueue,在消息寫入到CommitLog之后,通過Broker的后臺服務(wù)線程(ReputMessageService)不停地分發(fā)請求并異步構(gòu)建ConsumeQueue和IndexFile(索引文件,后面介紹),然后把每個(gè)ConsumeQueue需要的消息記錄到各個(gè)ConsumeQueue中。
ConsumeQueue主要記錄8個(gè)字節(jié)的commitLogOffset(消息在CommitLog中的物理偏移量), 4個(gè)字節(jié)的msgSize(消息大小), 8個(gè)字節(jié)的TagHashcode,每個(gè)元素固定20個(gè)字節(jié)。
ConsumeQueue相當(dāng)于CommitLog文件的索引,可以通過ConsumeQueue快速從很大的CommitLog文件中快速定位到需要的消息。
主題消息隊(duì)列:在consumequeue目錄下,按照topic的維度存儲消息隊(duì)列。
重試消息隊(duì)列:如果topic中的消息消費(fèi)失敗,則會把消息發(fā)到重試隊(duì)列,重新隊(duì)列按照消費(fèi)端的GroupName來分組,命名規(guī)則:%RETRY%ConsumerGroupName
死信消息隊(duì)列:如果topic中的消息消費(fèi)失敗,并且超過了指定重試次數(shù)之后,則會把消息發(fā)到死信隊(duì)列,死信隊(duì)列按照消費(fèi)端的GroupName來分組,命名規(guī)則:%DLQ%ConsumerGroupName
假設(shè)我們現(xiàn)在有一個(gè)topic:itzhai-test
,消費(fèi)分組:itzhai_consumer_group
,當(dāng)消息消費(fèi)失敗之后,我們查看consumequeue目錄,會發(fā)現(xiàn)多處了一個(gè)重試隊(duì)列:
我們可以在RocketMQ的控制臺看到這個(gè)重試消息隊(duì)列的主題和消息:
如果一直重試失敗,達(dá)到一定次數(shù)之后(默認(rèn)是16次,重試時(shí)間:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h),就會把消息投遞到死信隊(duì)列:
每條消息的長度是不固定的,為了提高寫入的效率,RocketMQ預(yù)先分配好1G空間的CommitLog文件,采用順序?qū)?/strong>的方式寫入消息,大大的提高寫入的速度。
RocketMQ中消息刷盤主要可以分為同步刷盤和異步刷盤兩種,通過flushDiskType參數(shù)進(jìn)行配置。如果需要提高寫消息的效率,降低延遲,提高M(jìn)Q的性能和吞吐量,并且不要求消息數(shù)據(jù)存儲的高可靠性,可以把刷盤策略設(shè)置為異步刷盤。
為了提高讀取的效率,RocketMQ使用ConsumeQueue作為消費(fèi)消息的索引,使用IndexFile作為基于消息key的查詢的索引。下面來詳細(xì)介紹下。
讀取消息是隨機(jī)讀的,為此,RocketMQ專門建立了ConsumeQueue索引文件,每次先從ConsumeQueue中獲取需要的消息的地址,消息大小,然后從CommitLog文件中根據(jù)地址直接讀取消息內(nèi)容。在讀取消息內(nèi)容的過程中,也盡量利用到了操作系統(tǒng)的頁緩存機(jī)制,進(jìn)一步加速讀取速度。
ConsumeQueue由于每個(gè)元素大小是固定的,因此可以像訪問數(shù)組一樣訪問每個(gè)消息元素。并且占用空間很小,大部分的ConsumeQueue能夠被全部載入內(nèi)存,所以這個(gè)索引查找的速度很快。每個(gè)ConsumeQueue文件由30w個(gè)元素組成,占用空間在6M以內(nèi)。每個(gè)文件默認(rèn)大小為600萬個(gè)字節(jié),當(dāng)一個(gè)ConsumeQueue類型的文件寫滿之后,則寫入下一個(gè)文件。
我們在RocketMQ的store目錄中可以發(fā)現(xiàn)有一個(gè)index目錄,這個(gè)是一個(gè)用于輔助提高查詢消息效率的索引文件。通過該索引文件實(shí)現(xiàn)基于消息key來查詢消息的功能。
IndexFile索引文件物理存儲結(jié)構(gòu)如下圖所示:
beginTimestamp
:索引文件中第一個(gè)索引消息存入Broker的時(shí)間戳;endTimestamp
:索引文件中最后一個(gè)索引消息存入Broker的時(shí)間戳beginPHYOffset
:索引文件中第一個(gè)索引消息在CommitLog中的偏移量;endPhyOffset
:索引文件中最后一個(gè)索引消息在CommitLog中的偏移量;hashSlotCount
:構(gòu)建索引使用的slot數(shù)量;indexCount
:索引的總數(shù);Key Hash
:消息的哈希值;Commit Log Offset
:消息在CommitLog中的偏移量;Timestamp
:消息存儲的時(shí)間戳;Next Index Offset
:下一個(gè)索引的位置,如果消息取模后發(fā)生槽位槽位碰撞,則通過此字段把碰撞的消息構(gòu)成鏈表。每個(gè)IndexFile文件的大小:40b + 4b * 5000000 + 20b * 20000000 = 420000040b,約為400M。
IndexFile索引文件的邏輯存儲結(jié)構(gòu)如下圖所示:
IndexFile邏輯上是基于哈希表來實(shí)現(xiàn)的,Slot Table為哈希鍵,Index Linked List中存儲的為哈希值。
RocketMQ中的MessageId的長度總共有16字節(jié),其中包含了:消息存儲主機(jī)地址(IP地址和端口),消息Commit Log offset。“
按照MessageId查詢消息的流程:Client端從MessageId中解析出Broker的地址(IP地址和端口)和Commit Log的偏移地址后封裝成一個(gè)RPC請求后通過Remoting通信層發(fā)送(業(yè)務(wù)請求碼:VIEW_MESSAGE_BY_ID)。Broker端走的是QueryMessageProcessor,讀取消息的過程用其中的 commitLog offset 和 size 去 commitLog 中找到真正的記錄并解析成一個(gè)完整的消息返回。
我們繼續(xù)看看在集群模式下,RocketMQ的Topic數(shù)據(jù)是如何做分區(qū)的。IT宅(itzhai.com)提醒大家,實(shí)踐出真知。這里我們部署兩個(gè)Master節(jié)點(diǎn):
我們通過手動配置每個(gè)Broker中的Topic,以及ConsumeQueue數(shù)量,來實(shí)現(xiàn)Topic的數(shù)據(jù)分片,如,我們到集群中手動配置這樣的Topic:
broker-a
創(chuàng)建itzhai-com-test-1
,4個(gè)隊(duì)列;broker-b
創(chuàng)建itzhai-com-test-1
,2個(gè)隊(duì)列。創(chuàng)建完成之后,Topic分片集群分布如下:
即:
可以發(fā)現(xiàn),RocketMQ是把Topic分片存儲到各個(gè)Broker節(jié)點(diǎn)中,然后在把Broker節(jié)點(diǎn)中的Topic繼續(xù)分片為若干等分的ConsumeQueue,從而提高消息的吞吐量。ConsumeQueue是作為負(fù)載均衡資源分配的基本單元。
這樣把Topic的消息分區(qū)到了不同的Broker上,從而增加了消息隊(duì)列的數(shù)量,從而能夠支持更塊的并發(fā)消費(fèi)速度(只要有足夠的消費(fèi)者)。
假設(shè)設(shè)置為通過Broker自動創(chuàng)建Topic(autoCreateTopicEnable=true),并且Producer端設(shè)置Topic消息隊(duì)列數(shù)量設(shè)置為4,也就是默認(rèn)值:
producer.setDefaultTopicQueueNums(4);
嘗試往一個(gè)新的 topic itzhai-test-queue-1
連續(xù)發(fā)送10條消息,發(fā)送完畢之后,查看Topic狀態(tài):
我們可以發(fā)現(xiàn),在兩個(gè)broker上面都創(chuàng)建了itzhai-test-queue-a
,并且每個(gè)broker上的消息隊(duì)列數(shù)量都為4。怎么回事,我配置的明明是期望創(chuàng)建4個(gè)隊(duì)列,為什么加起來會變成了8個(gè)?如下圖所示:
由于時(shí)間關(guān)系,本文我們不會帶大家從源碼方面去解讀為啥會出現(xiàn)這種情況,接下來我們通過一種更加直觀的方式來驗(yàn)證下這個(gè)問題:繼續(xù)做實(shí)驗(yàn)。
我們繼續(xù)嘗試往一個(gè)新的 topic itzhai-test-queue-10
發(fā)送1條消息,注意,這一次不做并發(fā)發(fā)送了,只發(fā)送一條,發(fā)送完畢之后,查看Topic狀態(tài):
可以發(fā)現(xiàn),這次創(chuàng)建的消息隊(duì)列數(shù)量又是對的了,并且都是在broker-a上面創(chuàng)建的。接下來,無論怎么并發(fā)發(fā)送消息,消息隊(duì)列的數(shù)量都不會繼續(xù)增加了。
其實(shí)這也是并發(fā)請求Broker,觸發(fā)自動創(chuàng)建Topic的bug。
為了更加嚴(yán)格的管理Topic的創(chuàng)建和分片配置,一般在生產(chǎn)環(huán)境都是配置為手動創(chuàng)建Topic,通過提交運(yùn)維工單申請創(chuàng)建Topic以及Topic的數(shù)據(jù)分配。
接下來我們來看看RocketMQ的特性。更多其他技術(shù)的底層架構(gòu)內(nèi)幕分析,請?jiān)L問我的博客IT宅(itzhai.com)或者關(guān)注Java架構(gòu)雜談公眾號。
RocketMQ中定義了如下三種消息通信的方式:
public enum CommunicationMode { SYNC, ASYNC, ONEWAY,}
SYNC
:同步發(fā)送,生產(chǎn)端會阻塞等待發(fā)送結(jié)果;ASYNC
:異步發(fā)送,生產(chǎn)端調(diào)用發(fā)送API之后,立刻返回,在拿到Broker的響應(yīng)結(jié)果后,觸發(fā)對應(yīng)的SendCallback回調(diào);ONEWAY
:單向發(fā)送,發(fā)送方只負(fù)責(zé)發(fā)送消息,不等待服務(wù)器回應(yīng)且沒有回調(diào)函數(shù)觸發(fā),即只發(fā)送請求不等待應(yīng)答。 此方式發(fā)送消息的過程耗時(shí)非常短,一般在微秒級別;SYNC和ASYNC關(guān)注發(fā)送結(jié)果,ONEWAY不關(guān)注發(fā)送結(jié)果。發(fā)送結(jié)果如下:
public enum SendStatus { SEND_OK, FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT, SLAVE_NOT_AVAILABLE,}
SEND_OK
:消息發(fā)送成功。SEND_OK并不意味著投遞是可靠的,要確保消息不丟失,需要開啟SYNC_MASTER同步或者SYNC_FLUSH同步寫;FLUSH_DISK_TIMEOUT
:消息發(fā)送成功,但是刷盤超時(shí)。如果Broker的flushDiskType=SYNC_FLUSH,并且5秒內(nèi)沒有完成消息的刷盤,則會返回這個(gè)狀態(tài);FLUSH_SLAVE_TIMEOUT
:消息發(fā)送成功,但是服務(wù)器同步到Slave時(shí)超時(shí)。如果Broker的brokerRole=SYNC_MASTER,并且5秒內(nèi)沒有完成同步,則會返回這個(gè)狀態(tài);SLAVE_NOT_AVAILABLE
:消息發(fā)送成功,但是無可用的Slave節(jié)點(diǎn)。如果Broker的brokerRole=SYNC_MASTER,但是沒有發(fā)現(xiàn)SLAVE節(jié)點(diǎn)或者SLAVE節(jié)點(diǎn)掛掉了,那么會返回這個(gè)狀態(tài)。源碼內(nèi)容更精彩,歡迎大家進(jìn)一步閱讀源碼詳細(xì)了解消息發(fā)送的內(nèi)幕:
- 同步發(fā)送:org.apache.rocketmq.client.producer.DefaultMQProducer#send(org.apache.rocketmq.common.message.Message)
- 異步發(fā)送:org.apache.rocketmq.client.producer.DefaultMQProducer#send(org.apache.rocketmq.common.message.Message, org.apache.rocketmq.client.producer.SendCallback)
- 單向發(fā)送:org.apache.rocketmq.client.producer.DefaultMQProducer#sendOneway(org.apache.rocketmq.common.message.Message)
消息的有序性指的是一類消息消費(fèi)的時(shí)候,可以按照發(fā)送順序來消費(fèi),比如:在Java架構(gòu)雜談
茶餐廳吃飯產(chǎn)生的消息:進(jìn)入餐廳、點(diǎn)餐、下單、上菜、付款,消息要按照這個(gè)順序消費(fèi)才有意義,但是多個(gè)顧客產(chǎn)生的消息是可以并行消費(fèi)的。順序消費(fèi)又分為全局順序消費(fèi)和分區(qū)順序消費(fèi):
全局順序
:同一個(gè)Topic下的消息,所有消息按照嚴(yán)格的FIFO順序進(jìn)行發(fā)布和消費(fèi)。適用于:性能要求不高,所有消息嚴(yán)格按照FIFO進(jìn)行發(fā)布和消費(fèi)的場景;分區(qū)順序
:同一個(gè)Topic下,根據(jù)消息的特定業(yè)務(wù)ID進(jìn)行sharding key分區(qū),同一個(gè)分區(qū)內(nèi)的消息按照嚴(yán)格的FIFO順序進(jìn)行發(fā)布和消費(fèi)。適用于:性能要求高,在同一個(gè)分區(qū)中嚴(yán)格按照FIFO進(jìn)行發(fā)布和消費(fèi)的場景。一般情況下,生產(chǎn)者是會以輪訓(xùn)的方式把消息發(fā)送到Topic的消息隊(duì)列中的:
在同一個(gè)Queue里面,消息的順序性是可以得到保證的,但是如果一個(gè)Topic有多個(gè)Queue,以輪訓(xùn)的方式投遞消息,那么就會導(dǎo)致消息亂序了。
為了保證消息的順序性,需要把保持順序性的消息投遞到同一個(gè)Queue中。
RocketMQ提供了MessageQueueSelector
接口,可以用來實(shí)現(xiàn)自定義的選擇投遞的消息隊(duì)列的算法:
for (int i = 0; i < orderList.size(); i++) { String content = "Hello itzhai.com. Java架構(gòu)雜談," + new Date(); Message msg = new Message("topic-itzhai-com", tags[i % tags.length], "KEY" + i, content.getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List mqs, Message msg, Object arg) { Long orderId = (Long) arg; // 訂單號與消息隊(duì)列個(gè)數(shù)取模,保證讓同一個(gè)訂單號的消息落入同一個(gè)消息隊(duì)列 long index = orderId % mqs.size(); return mqs.get((int) index); } }, orderList.get(i).getOrderId()); System.out.printf("content: %s, sendResult: %s%n", content, sendResult);}
如上圖,我們實(shí)現(xiàn)了MessageQueueSelector
接口,并在實(shí)現(xiàn)的select方法里面,指定了選擇消息隊(duì)列的算法:訂單號與消息隊(duì)列個(gè)數(shù)取模,保證讓同一個(gè)訂單號的消息落入同一個(gè)消息隊(duì)列:
有個(gè)異常場景需要考慮:假設(shè)某一個(gè)Master節(jié)點(diǎn)掛掉了,導(dǎo)致Topic的消息隊(duì)列數(shù)量發(fā)生了變化,那么繼續(xù)使用以上的選擇算法,就會導(dǎo)致在這個(gè)過程中同一個(gè)訂單的消息會分散到不同的消息隊(duì)列里面,最終導(dǎo)致消息不能順序消費(fèi)。
為了避免這種情況,只能選擇犧牲failover特性了。
現(xiàn)在投遞到消息隊(duì)列中的消息保證了順序,那如何保證消費(fèi)也是順序的呢?
RocketMQ中提供了MessageListenerOrderly
,該對象用于有順序收異步傳遞的消息,一個(gè)隊(duì)列對應(yīng)一個(gè)消費(fèi)線程,使用方法如下:
consumer.registerMessageListener(new MessageListenerOrderly() { // 消費(fèi)次數(shù),用于輔助模擬各種消費(fèi)結(jié)果 AtomicLong consumeTimes = new AtomicLong(0); @Override public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) { context.setAutoCommit(true); System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); this.consumeTimes.incrementAndGet(); if ((this.consumeTimes.get() % 2) == 0) { return ConsumeOrderlyStatus.SUCCESS; } else if ((this.consumeTimes.get() % 3) == 0) { return ConsumeOrderlyStatus.ROLLBACK; } else if ((this.consumeTimes.get() % 4) == 0) { return ConsumeOrderlyStatus.COMMIT; } else if ((this.consumeTimes.get() % 5) == 0) { context.setSuspendCurrentQueueTimeMillis(3000); return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } return ConsumeOrderlyStatus.SUCCESS; }});
如果您使用的是MessageListenerConcurrently
,表示并發(fā)消費(fèi),為了保證消息消費(fèi)的順序性,需要設(shè)置為單線程模式。
使用
MessageListenerOrderly
的問題:如果遇到某條消息消費(fèi)失敗,并且無法跳過,那么消息隊(duì)列的消費(fèi)進(jìn)度就會停滯。
定時(shí)消費(fèi)是指消息發(fā)送到Broker之后不會立即被消費(fèi),而是等待特定的時(shí)間之后才投遞到Topic中。定時(shí)消息會暫存在名為SCHEDULE_TOPIC_XXXX
的topic中,并根據(jù)delayTimeLevel存入特定的queue,queueId=delayTimeLevel-1,一個(gè)queue只存相同延遲的消息,保證具有相同延遲的消息能夠順序消費(fèi)。比如,我們設(shè)置1秒后把消息投遞到topic-itzhai-com
topic,則存儲的文件目錄如下所示:
Broker會調(diào)度地消費(fèi)SCHEDULE_TOPIC_XXXX,將消息寫入真實(shí)的topic。
定時(shí)消息的副作用:定時(shí)消息會在第一次寫入Topic和調(diào)度寫入實(shí)際的topic都會進(jìn)行計(jì)數(shù),因此發(fā)送數(shù)量,tps都會變高。
使用延遲隊(duì)列的場景:提交了訂單之后,如果等待超過約定的時(shí)間還未支付,則把訂單設(shè)置為超時(shí)狀態(tài)。
RocketMQ提供了以下幾個(gè)固定的延遲級別:
public class MessageStoreConfig { ... // 10個(gè)level,level:1~18 private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"; ...}
level = 0 表示不使用延遲消息。
另外,消息消費(fèi)失敗也會進(jìn)入延遲隊(duì)列,消息發(fā)送時(shí)間與設(shè)置的延遲級別和重試次數(shù)有關(guān)。
以下是發(fā)送延遲消息的代碼:
public class ScheduledMessageProducer { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("TestProducerGroup"); producer.start(); int totalMessagesToSend = 100; for (int i = 0; i < totalMessagesToSend; i++) { Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes()); // 指定該消息在10秒后被消費(fèi)者消費(fèi) message.setDelayTimeLevel(3); producer.send(message); } producer.shutdown(); }}
通過消息對系統(tǒng)進(jìn)行解耦之后,勢必會遇到分布式系統(tǒng)數(shù)據(jù)完整性的問題。
我們可以通過以下手段解決分布式系統(tǒng)數(shù)據(jù)最終一致性問題:
2PC(Two-phase commit protocol)
,二階段提交,同步阻塞,效率低下,存在協(xié)調(diào)者單點(diǎn)故障問題,極端情況下存在數(shù)據(jù)不一致的風(fēng)險(xiǎn)。對應(yīng)技術(shù)上的XA、JTA/JTS。這是分布式環(huán)境下事務(wù)處理的典型模式;3PC
,三階段提交,引入了參與者超時(shí)機(jī)制,增加了預(yù)提交階段,使得故障恢復(fù)之后協(xié)調(diào)者的決策復(fù)雜度降低,但整體的交互過程變得更長了,性能有所下降,仍舊會存在數(shù)據(jù)不一致的問題;Try - Confirm - Cancel
。對業(yè)務(wù)的侵入較大,和業(yè)務(wù)緊耦合,對于每一個(gè)操作都需要定義三個(gè)動作分別對應(yīng):Try - Confirm - Cancel
,將資源層的兩階段提交協(xié)議轉(zhuǎn)換到業(yè)務(wù)層,成為業(yè)務(wù)模型中的一部分;RocketMQ事務(wù)消息(Transactional Message)則是通過事務(wù)消息來實(shí)現(xiàn)分布式事務(wù)的最終一致性。下面看看RocketMQ是如何實(shí)現(xiàn)事務(wù)消息的。
如下圖:
事務(wù)消息有兩個(gè)流程:
補(bǔ)償階段主要用于解決消息的Commit或者Rollback發(fā)生超時(shí)或者失敗的情況。
half消息:并不是發(fā)送了一半的消息,而是指消息已經(jīng)發(fā)送到了MQ Server,但是該消息未收到生產(chǎn)者的二次確認(rèn),此時(shí)該消息暫時(shí)不能投遞到具體的ConsumeQueue中,這種狀態(tài)的消息稱為half消息。
發(fā)送到MQ Server的half消息對消費(fèi)者是不可見的,為此,RocketMQ會先把half消息的Topic和Queue信息存儲到消息的屬性中,然后把該half消息投遞到一個(gè)專門的處理事務(wù)消息的隊(duì)列中:RMQ_SYS_TRANS_HALF_TOPIC
,由于消費(fèi)者沒有訂閱該Topic,所以無法消息half類型的消息。
生產(chǎn)者執(zhí)行Commit half消息的時(shí)候,會存儲一條專門的Op消息,用于標(biāo)識事務(wù)消息已確定的狀態(tài),如果一條事務(wù)消息還沒有對應(yīng)的Op消息,說明這個(gè)事務(wù)的狀態(tài)還無法確定。RocketMQ會開啟一個(gè)定時(shí)任務(wù),對于pending狀態(tài)的消息,會先向生產(chǎn)者發(fā)送回查事務(wù)狀態(tài)請求,根據(jù)事務(wù)狀態(tài)來決定是否提交或者回滾消息。
當(dāng)消息被標(biāo)記為Commit狀態(tài)之后,會把half消息的Topic和Queue相關(guān)屬性還原為原來的值,最終構(gòu)建實(shí)際的消費(fèi)索引(ConsumeQueue)。
RocketMQ并不會無休止的嘗試消息事務(wù)狀態(tài)回查,默認(rèn)查找15次,超過了15次還是無法獲取事務(wù)狀態(tài),RocketMQ默認(rèn)回滾該消息。并打印錯(cuò)誤日志,可以通過重寫AbstractTransactionalMessageCheckListener類修改這個(gè)行為。
可以通過Broker的配置參數(shù):transactionCheckMax來修改此值。
如果消息發(fā)布方式是同步發(fā)送會重投,如果是異步發(fā)送會重試。
消息重投可以盡可能保證消息投遞成功,但是可能會造成消息重復(fù)。
什么情況會造成重復(fù)消費(fèi)消息?
可以使用的消息重試策略:
retryTimesWhenSendFailed
:設(shè)置同步發(fā)送失敗的重投次數(shù),默認(rèn)為2。所以生產(chǎn)者最多會嘗試發(fā)送retryTimesWhenSendFailed+1次。retryTimesWhenSendAsyncFailed
:設(shè)置異步發(fā)送失敗重試次數(shù),異步重試不會選擇其他Broker,不保證消息不丟失;retryAnotherBrokerWhenNotStoreOK
:消息刷盤(主或備)超時(shí)或slave不可用(返回狀態(tài)非SEND_OK),是否嘗試發(fā)送到其他broker,默認(rèn)false。重要的消息可以開啟此選項(xiàng)。oneway發(fā)布方式不支持重投。
為了提高系統(tǒng)的吞吐量,提高發(fā)送效率,可以使用批量發(fā)送消息。
批量發(fā)送消息的限制:
發(fā)送批量消息的例子:
String topic = "itzhai-test-topic";List messages = new ArrayList<>();messages.add(new Message(topic, "TagA", "OrderID001", "Hello world itzhai.com 0".getBytes()));messages.add(new Message(topic, "TagA", "OrderID002", "Hello world itzhai.com 1".getBytes()));messages.add(new Message(topic, "TagA", "OrderID003", "Hello world itzhai.com 2".getBytes()));producer.send(messages);
如果發(fā)送的消息比較多,會增加復(fù)雜性,為此,可以對大消息進(jìn)行拆分。以下是拆分的例子:
public class ListSplitter implements Iterator> { // 限制最大大小 private final int SIZE_LIMIT = 1024 * 1024 * 4; private final List messages; private int currIndex; public ListSplitter(List messages) { this.messages = messages; } @Override public boolean hasNext() { return currIndex < messages.size(); } @Override public List next() { int startIndex = getStartIndex(); int nextIndex = startIndex; int totalSize = 0; for (; nextIndex < messages.size(); nextIndex++) { Message message = messages.get(nextIndex); int tmpSize = calcMessageSize(message); if (tmpSize + totalSize > SIZE_LIMIT) { break; } else { totalSize += tmpSize; } } List subList = messages.subList(startIndex, nextIndex); currIndex = nextIndex; return subList; } private int getStartIndex() { Message currMessage = messages.get(currIndex); int tmpSize = calcMessageSize(currMessage); while(tmpSize > SIZE_LIMIT) { currIndex += 1; Message message = messages.get(curIndex); tmpSize = calcMessageSize(message); } return currIndex; } private int calcMessageSize(Message message) { int tmpSize = message.getTopic().length() + message.getBody().length(); Map properties = message.getProperties(); for (Map.Entry entry : properties.entrySet()) { tmpSize += entry.getKey().length() + entry.getValue().length(); } tmpSize = tmpSize + 20; // Increase the log overhead by 20 bytes return tmpSize; }}// then you could split the large list into small ones:ListSplitter splitter = new ListSplitter(messages);while (splitter.hasNext()) { try { List listItem = splitter.next(); producer.send(listItem); } catch (Exception e) { e.printStackTrace(); // handle the error }}
RocketMQ的消費(fèi)者可以根據(jù)Tag進(jìn)行消息過濾來獲取自己感興趣的消息,也支持自定義屬性過濾。
Tags是Topic下的次級消息類型/二級類型(注:Tags也支持TagA || TagB
這樣的表達(dá)式),可以在同一個(gè)Topic下基于Tags進(jìn)行消息過濾。
消息過濾是在Broker端實(shí)現(xiàn)的,減少了對Consumer無用消息的網(wǎng)絡(luò)傳輸,缺點(diǎn)是增加了Broker負(fù)擔(dān),實(shí)現(xiàn)相對復(fù)雜。
消費(fèi)端有兩周消費(fèi)模型:集群消費(fèi)和廣播消費(fèi)。
集群消費(fèi)模式下,相同Consumer Group的每個(gè)Consumer實(shí)例平均分?jǐn)傁ⅰ?/p>
廣播消費(fèi)模式下,相同Consumer Group的每個(gè)Consumer實(shí)例都接收全量的消息。
RocketMQ會為每個(gè)消費(fèi)組都設(shè)置一個(gè)Topic名稱為%RETRY%consumerGroupName
的重試隊(duì)列(這里需要注意的是,這個(gè)Topic的重試隊(duì)列是針對消費(fèi)組,而不是針對每個(gè)Topic設(shè)置的),用于暫時(shí)保存因?yàn)楦鞣N異常而導(dǎo)致Consumer端無法消費(fèi)的消息。
考慮到異常恢復(fù)起來需要一些時(shí)間,會為重試隊(duì)列設(shè)置多個(gè)重試級別,每個(gè)重試級別都有與之對應(yīng)的重新投遞延時(shí),重試次數(shù)越多投遞延時(shí)就越大。
RocketMQ對于重試消息的處理是先保存至Topic名稱為SCHEDULE_TOPIC_XXXX
的延遲隊(duì)列中,后臺定時(shí)任務(wù)按照對應(yīng)的時(shí)間進(jìn)行Delay后重新保存至%RETRY%consumerGroupName
的重試隊(duì)列中。
比如,我們設(shè)置1秒后把消息投遞到topic-itzhai-com
topic,則存儲的文件目錄如下所示:
當(dāng)一條消息初次消費(fèi)失敗,消息隊(duì)列會自動進(jìn)行消息重試;達(dá)到最大重試次數(shù)后,若消費(fèi)依然失敗,則表明消費(fèi)者在正常情況下無法正確地消費(fèi)該消息,此時(shí),消息隊(duì)列不會立刻將消息丟棄,而是將其發(fā)送到該消費(fèi)者對應(yīng)的特殊隊(duì)列中。
RocketMQ將這種正常情況下無法被消費(fèi)的消息稱為死信消息(Dead-Letter Message)
,將存儲死信消息的特殊隊(duì)列稱為死信隊(duì)列(Dead-Letter Queue)
。
在RocketMQ中,可以通過使用console控制臺對死信隊(duì)列中的消息進(jìn)行重發(fā)來使得消費(fèi)者實(shí)例再次進(jìn)行消費(fèi)。
由于RocketMQ是使用Java寫的,所以它的代碼特別適合拿來閱讀消遣,我們繼續(xù)來看看RocketMQ的源碼結(jié)構(gòu)...
不不,還是算了,一下子又到周末晚上了,時(shí)間差不多了,今天就寫到這里了。有空再聊。
我精心整理了一份Redis寶典給大家,涵蓋了Redis的方方面面,面試官懂的里面有,面試官不懂的里面也有,有了它,不怕面試官連環(huán)問,就怕面試官一上來就問你Redis的Redo Log是干啥的?畢竟這種問題我也不會。
在Java架構(gòu)雜談
公眾號發(fā)送Redis
關(guān)鍵字獲取pdf文件:
本文作者: arthinking
博客鏈接: https://www.itzhai.com/articles/deep-understanding-of-rocketmq.html
高并發(fā)異步解耦利器:RocketMQ究竟強(qiáng)在哪里?
版權(quán)聲明: 版權(quán)歸作者所有,未經(jīng)許可不得轉(zhuǎn)載,侵權(quán)必究!聯(lián)系作者請加公眾號。
apache/rocketmq. Retrieved from https://github.com/apache/rocketmq
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/124531.html
摘要:熱點(diǎn)隨筆入門實(shí)現(xiàn)跨框架組件復(fù)用葡萄城技術(shù)團(tuán)隊(duì)二工作三年的一些感悟百萬級大數(shù)據(jù)插入更新,支持多種數(shù)據(jù)庫果糖大數(shù)據(jù)科技被下屬罵,記一次矛盾升級有心無心,蝴蝶效應(yīng)葉小釵中的鑒權(quán)授權(quán)正確方式包子推薦一款顏值逆天且功能齊全的開源工具鉑賽東開源免費(fèi)圖熱點(diǎn)隨筆:·?Svelte入門——Web Components實(shí)現(xiàn)跨框架組件復(fù)用?(葡萄城技術(shù)團(tuán)隊(duì))·?(二)工作三年的一些感悟?(Craftsman-L)...
摘要:故事中的下屬們,就是消息生產(chǎn)者角色,屋子右面墻根那塊地就是消息持久化,呂秀才就是消息調(diào)度中心,而你就是消息消費(fèi)者角色。下屬們匯報(bào)的消息,應(yīng)該疊放在哪里,這個(gè)消息又應(yīng)該在哪里才能找到,全靠呂秀才的驚人記憶力,才可以讓消息準(zhǔn)確的被投放以及消費(fèi)。 微信公眾號:IT一刻鐘大型現(xiàn)實(shí)非嚴(yán)肅主義現(xiàn)場一刻鐘與你分享優(yōu)質(zhì)技術(shù)架構(gòu)與見聞,做一個(gè)有劇情的程序員關(guān)注可了解更多精彩內(nèi)容。問題或建議,請公眾號留言...
摘要:故事中的下屬們,就是消息生產(chǎn)者角色,屋子右面墻根那塊地就是消息持久化,呂秀才就是消息調(diào)度中心,而你就是消息消費(fèi)者角色。下屬們匯報(bào)的消息,應(yīng)該疊放在哪里,這個(gè)消息又應(yīng)該在哪里才能找到,全靠呂秀才的驚人記憶力,才可以讓消息準(zhǔn)確的被投放以及消費(fèi)。 微信公眾號:IT一刻鐘大型現(xiàn)實(shí)非嚴(yán)肅主義現(xiàn)場一刻鐘與你分享優(yōu)質(zhì)技術(shù)架構(gòu)與見聞,做一個(gè)有劇情的程序員關(guān)注可了解更多精彩內(nèi)容。問題或建議,請公眾號留言...
摘要:數(shù)量對吞吐量的影響可以達(dá)到幾百幾千個(gè)的級別,吞吐量會有小幅度的下降。這是的一大優(yōu)勢,可在同等數(shù)量機(jī)器下支撐大量的從幾十個(gè)到幾百個(gè)的時(shí)候,吞吐量會大幅下降。下一篇如何保證消息隊(duì)列的高可用 1.為什么使用消息隊(duì)列? (1)解耦:可以在多個(gè)系統(tǒng)之間進(jìn)行解耦,將原本通過網(wǎng)絡(luò)之間的調(diào)用的方式改為使用MQ進(jìn)行消息的異步通訊,只要該操作不是需要同步的,就可以改為使用MQ進(jìn)行不同系統(tǒng)之間的聯(lián)系,這樣項(xiàng)目之間...
閱讀 3654·2021-11-23 09:51
閱讀 1996·2021-11-16 11:42
閱讀 3245·2021-11-08 13:20
閱讀 1100·2019-08-30 15:55
閱讀 2210·2019-08-30 10:59
閱讀 1245·2019-08-29 14:04
閱讀 1030·2019-08-29 12:41
閱讀 2031·2019-08-26 12:22