国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

rocketmq之producer解析

luodongseu / 840人閱讀

摘要:所以基于目前的設計,建議關閉自動創建的功能,然后根據消息量的大小,手動創建。如果發送消息,返回結果超時,這種超時不會進行重試了如果是方法本身耗時超過,還未來得及調用發送消息,此時的超時也不會重試。

先來看下producer核心的類設計,如下圖:

1、核心發布消息的類DefaultMQProducer,繼承自MQProducer接口,此接口定義了一系列發送消息的方法,如普通消息,順序消息,延時消息等,最終進行網絡通信會交給MQClientAPIImpl處理。

2、rocketmq從4.1.3版本開始又支持了事務消息,由TransactionMQProducer類提供(之后會有專門的文章進行詳細解讀事務消息)

producer之配置

我們看到DefaultMQProducer繼承了一個客戶端的公共配置類ClientConfig(與consumer公用),其實就是一個普通的javaBean,既可以代碼中設置屬性,也可以集成spring來配置

參數名 默認值 說明
namesrvAddr nameserver的地址列表,用分號隔開
clientIP 本機ip地址 客戶端ip地址,有時候無法識別,需要手動配置
instanceName DEFAULT 客戶端實例名稱,客戶端創建的多個 Producer、Consumer 實際是共用一個內部實例(這個實例包含網絡連接、線程資源等)
clientCallbackExecutorThreads cpu核數 通信層客戶端處理請求的線程數
pollNameServerInterval 30000 輪詢nameserver的時間間隔,單位ms
heartbeatBrokerInterval 30000 向broker發送心跳的時間間隔,單位ms
persistConsumerOffsetInterval 5000 持久化 Consumer 消費進度間隔時間,單位ms

producer獨有的配置:

參數名 默認值 說明
producerGroup DEFAULT_PRODUCER Producer組名,相同分組的producer應該有相同的發送消息邏輯
createTopicKey AUTO_CREATE_TOPIC_KEY 自動創建topic時,以此默認topic為模板創建指定topic
defaultTopicQueueNums 4 自動創建topic隊列數量
sendMsgTimeout 3000 發送消息的超時時間,單位ms
compressMsgBodyOverHowmuch 4098 消息體超過多大會進行壓縮,單位字節
retryTimesWhenSendFailed 2 同步發送消息,發送失敗重試次數
retryTimesWhenSendAsyncFailed 2 異步發送消息,發送失敗的重試次數
retryAnotherBrokerWhenNotStoreOK false 同步發送消息,消息存儲失敗是否重試其他broker
maxMessageSize 4194304 客戶端限制消息的大小,默認4M
TransactionListener 事務消息時,必須設置的回查監聽器
producer之group概念

我們在創建producer時必須要指定一個group,這里有兩個作用:

生產者一般會是集群部署的,group用來標識一類生產者,相同group的生產者一般要有相同的發送邏輯。

在發送事務消息時,當事務消息異常,broker端來回查事務狀態時,需要知道是由哪類生產者發送的事務消息,生產端會根據group名稱來查找對應的producer來執行相應的回查邏輯。

producer的啟動流程

簡單說明下整個啟動流程:

1、首先在DefaultMQProducerImpl中會做一些參數校驗,如group是否合法;然后會創建MQClientInstance實例,此實例包含網絡連接、線程資源等,相同的clientId會共享此實例,所以通過MQClientManager來管理。

2、核心的啟動流程在MQClientInstance類中,如果nameserver地址沒有配置的話,會先通過靜態的http服務器地址去抓取nameserver的地址;再則啟動netty客戶端。

3、啟動一些定時任務,跟producer有關的如下幾個:

如果producer沒有配置nameserver地址,啟動定時抓取nameserver的地址的定時任務,任務延時10s開始,每隔2分支執行一次。

輪詢nameserver定時任務,主要是定時更新topic的路由信息,任務延時10ms開始,每隔30s執行一次。

清除下線的broker和向broker發送心跳,任務延時1s執行,每隔30s執行一次

Producer如何尋址

RocketMQ 有多種配置方式可以令客戶端找到 NameServer, 然后通過 NameServer 再找到 Broker,分別如下,
優先級由高到低,高優優先級會覆蓋低優先級

1、代碼中指定 Name Server 地址

producer.setNamesrvAddr("192.168.0.1:9876;192.168.0.2:9876");

2、啟動參數指定

-Drocketmq.namesrv.addr=192.168.0.1:9876;192.168.0.2:9876

3、環境變量指定 Name Server 地址

export NAMESRV_ADDR=192.168.0.1:9876;192.168.0.2:9876

4、HTTP 靜態服務器尋址(默認)

如果以上三種都沒有設置name server的地址,客戶端啟動后先會訪問一個靜態http服務器獲取name server的地址,然后會啟動一個定時任務訪問這個靜態 HTTP 服務器,地址如下:

http://jmenv.tbsite.net:8080/rocketmq/nsaddr

這是默認的地址,當然你也可以更改,做如下設置:

代碼:

System.setProperty("rocketmq.namesrv.domain","localhost");
System.setProperty("rocketmq.namesrv.domain.subgroup","nameServer")

或者啟動參數指定:

-Drocketmq.namesrv.domain=localhost
-Drocketmq.namesrv.domain.subgroup=nameServer

以上設置后http服務器地址就變成:

http://localhsot:8080/rocketmq/nameServer

這個 URL 的返回內容格式如下:

192.168.0.1:9876;192.168.0.2:9876

客戶端每隔 2 分鐘訪問一次這個 HTTP 服務器,并更新本地的 Name Server 地址。

推薦使用 HTTP 靜態服務器尋址方式,好處是客戶端部署簡單,且 Name Server 集群可以熱升級。

發送消息時如何獲取路由信息

1、broker在啟動的時候通過參數autoCreateTopicEnable設置是否自動創建topic,默認為true,此時會創建一個名為TBW102(4.3版本已經改名為AUTO_CREATE_TOPIC_KEY)的topic(參見類TopicConfigManager),broker在向namesrv注冊時會把默認的topic注冊上去。如果設置false,則不會注冊。

2、producer在發送消息時會在本地獲取路由信息,第一次發送的話本地肯定沒有,就會去namesrv獲取,如果此時namesrv也沒有,則會獲取TBW102的topic信息(參見DefaultMQProducerImpl.tryToFindTopicPublishInfo),以此為模板創建topic,然后選擇topic下的一臺broker發,broker創建后,會通過心跳注冊到namesrv上。

3、如果autoCreateTopicEnable設置false的話,producer發送消息會報找不到路由的異常,此時必須手動創建topic。

建議autoCreateTopicEnable設置false,基于以上第二步,自動創建topic后,以后所有該TOPIC的消息,都將發送到剛才選擇的這臺broke上,達不到負載均衡的目的。所以基于目前RocketMQ的設計,建議關閉自動創建TOPIC的功能,然后根據消息量的大小,手動創建TOPIC。

可以通過管理工具mqadmin來手動創建topic

sh mqadmin updateTopic -c [集群名稱] -n [nameserver地址] -t [topic名稱] -w [寫隊列數] -r [讀隊列數]

手動創建了Topic后,producer就可以輪詢的發送到不同的broker了。

topic的隊列數

這里講一下自動創建的topic的隊列數如何設置,首先broker創建的模板topic=AUTO_CREATE_TOPIC_KEY的隊列是8,參見類TopicConfigManager:

public TopicConfigManager(BrokerController brokerController) { 
    //省略無關代碼
    if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
        String topic = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC;
        TopicConfig topicConfig = new TopicConfig(topic);
        this.systemTopicList.add(topic);
        topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig()
                                     .getDefaultTopicQueueNums());
        topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig()
                                      .getDefaultTopicQueueNums());
        int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE;
        topicConfig.setPerm(perm);
        this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
    }
     //省略無關代碼
}

BrokerConfig:

private int defaultTopicQueueNums = 8;

DefaultMQProducer端默認知道要創建的topic的隊列數是4

private volatile int defaultTopicQueueNums = 4;

MQClientInstance類的方法updateTopicRouteInfoFromNameServer中有這樣一段邏輯:

public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
        DefaultMQProducer defaultMQProducer) {
    //省略無關代碼
    for (QueueData data : topicRouteData.getQueueDatas()) {
        int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
        data.setReadQueueNums(queueNums);
        data.setWriteQueueNums(queueNums);
    }
    //省略無關代碼
 }

創建隊列是取兩者最小的一個,也就是4,所以要設置topic的隊列數量,很明顯了設置broker的defaultTopicQueueNums的值和DefaultMQProducer的defaultTopicQueueNums值就可以了。這是自動創建Topic時隊列數的設置方法,上面也提到生成環境一般不會開啟自動創建Topic的功能,可以通過上面的手動創建Topic的指令來設置讀寫隊列數。你可能注意到了Topic下有讀寫隊兩個隊列數,分別代表上面意思呢?讀寫隊列其實是個邏輯概念,一個broker下topic的總隊列數是以寫隊列為準,而讀隊列意思是允許多少隊列可以被消費者消費,也就是說讀多寫少的情況下,沒有問題,隊列都可以被消費掉,如果寫多讀少的話,那么就會存在隊列不會被消費的情況。

消息發送

前面我們講到了如何獲取topic的路由信息,如何創建topic的隊列數,一個topic下有多個隊列,又可以分布在不同的broker上面,所以topic的總隊列數應該是所有broker上的topic下隊列數的總和。

備注:如果手動在每個broker上分別創建topic的話,相同topic在不同broker上的隊列數可以不一樣。

那么問題來了,在發送消息時根據怎么樣的策略來選擇一個隊列發送呢?rocketmq提供了一個MQFaultStrategy策略類來負責選擇隊列,這里會有一個參數sendLatencyFaultEnable是否開啟延遲故障,

該值默認為false,在不開啟的情況下,相同線程發送消息是輪詢topic下的所有隊列,不同線程發送是隨機的,核心代碼如下:

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    if (this.sendLatencyFaultEnable) {
        //省略不必要的代碼......
    }
    return tpInfo.selectOneMessageQueue(lastBrokerName);
}
//以上代碼邏輯參見類MQFaultStrategy.selectOneMessageQueue
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
    if (lastBrokerName == null) {
        return selectOneMessageQueue();
    } else {
        //省略不必要的代碼......
    }
}
public MessageQueue selectOneMessageQueue() {
    int index = this.sendWhichQueue.getAndIncrement();
    int pos = Math.abs(index) % this.messageQueueList.size();
    if (pos < 0)
        pos = 0;
    return this.messageQueueList.get(pos);
}
//以上代碼邏輯參見類TopicPublishInfo
public int getAndIncrement() {
    Integer index = this.threadLocalIndex.get();//ThreadLocal中獲取
    if (null == index) {//為空,隨機生成一個
        index = Math.abs(random.nextInt());
        if (index < 0)
            index = 0;
        this.threadLocalIndex.set(index);
    }
    index = Math.abs(index + 1);
    if (index < 0)
        index = 0;
    this.threadLocalIndex.set(index);
    return index;
}
//以上代碼參見類ThreadLocalIndex

每次獲取index的時候都是從本地線程變量ThreadLocal中獲取,沒有的情況下就是隨機生成一個,加1取絕對值后返回,再對隊列列表的長度取模,所以在同一線程中,會輪訓的從隊列列表獲取隊列。而如果是不同線程的話,index是隨機生成的,所以就是隨機從隊列列表中獲取。如下圖所示:

可以看到選擇隊列方法的入參有一個lastBrokerName的入參,此參數的目的是在發送消息失敗的情況下,producer會重試再次發送,而再次發送選擇的隊列需要另選一個broker,lastBrokerName就是要過濾掉失敗的broker,選擇下一個broker的隊列進行發送消息。

開啟延遲故障,每當發送完一次消息,不管成功還是失敗,都會把此次存儲消息的broker給保存下來,記錄故障情況下此broker需要延長多長時間才能再次發送,目前看到在代碼里面寫死了,故障下30s之內是不能再向此broker發送消息了。

消息重試

producer的send方法本身支持內部重試,重試邏輯如下:

1、最大重試次數默認2次,可以通過參數retryTimesWhenSendFailed設置

2、發送失敗,則輪詢到下一個broker,如果此時只有一個broker在線呢?那就會輪訓這個broker下的其他隊列。

3、這個方法的總耗時時間不超過 sendMsgTimeout 設置的值,默認為3s。

如果發送消息,broker返回結果超時,這種超時不會進行重試了;如果是方法本身耗時超過sendMsgTimeout ,還未來得及調用發送消息,此時的超時也不會重試。

以上策略其實也很難保證同步發送消息一定成功,如果應用要保證消息不丟失,最好先把消息存儲到db,后臺啟線程定時重試,確保消息一定存儲到broker。

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。

轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/77288.html

相關文章

  • 高并發異步解耦利器:RocketMQ究竟強在哪里?

    摘要:它是阿里巴巴于年開源的第三代分布式消息中間件。是一個分布式消息中間件,具有低延遲高性能和可靠性萬億級別的容量和靈活的可擴展性,它是阿里巴巴于年開源的第三代分布式消息中間件。上篇文章消息隊列那么多,為什么建議深入了解下RabbitMQ?我們講到了消息隊列的發展史:并且詳細介紹了RabbitMQ,其功能也是挺強大的,那么,為啥又要搞一個RocketMQ出來呢?是重復造輪子嗎?本文我們就帶大家來詳...

    tainzhi 評論0 收藏0
  • RocketMQ源碼學習(一)-概述

    摘要:每個與集群中的所有節點建立長連接,定時注冊信息到所有。完全無狀態,可集群部署。本系列源碼解析主要參照原理簡介來追尋其代碼實現雖然版本不太一致但這也是能找到的最詳細的資料了接下來根據其模塊來源碼閱讀目錄如下 為什么選擇讀RocketMQ? 對MQ的理解一直不深,上周看了,還是覺得不夠深入,找個成熟的產品來學習吧,RabbitMQ是erLang寫的,Kafka是Scala寫的,非Java寫...

    godlong_X 評論0 收藏0
  • 如何解決MQ消息消費順序問題

    摘要:利用的高級特性特性是一種負載均衡的機制。在一個消息被分發到之前,首先檢查消息屬性。屬性為某個值的消息單個消息或消息集合在描述,和的對應關系,以及負載均衡策略時。同樣做到了保證消息的順序情況下,均衡消費的消費消息。 通常mq可以保證先到隊列的消息按照順序分發給消費者消費來保證順序,但是一個隊列有多個消費者消費的時候,那將失去這個保證,因為這些消息被多個線程并發的消費。但是有的時候消息按照...

    Atom 評論0 收藏0
  • SpringBoot RocketMQ 整合使用和監控

    摘要:前提通過前面兩篇文章可以簡單的了解和安裝,今天就將和整合起來使用。然后我運行之前的整合項目,查看監控信息如下總結整篇文章講述了與整合和監控平臺的搭建。 showImg(https://segmentfault.com/img/remote/1460000013232432?w=1920&h=1277); 前提 通過前面兩篇文章可以簡單的了解 RocketMQ 和 安裝 RocketMQ...

    Jacendfeng 評論0 收藏0

發表評論

0條評論

最新活動
閱讀需要支付1元查看
<