摘要:第至行從中獲得發布信息。第至行容錯策略選擇消息隊列邏輯。第至行執行發起網絡請求。第至行處理消息發送結果,設置響應結果和提示。第至行發送成功,響應。第至行消息格式與大小校驗。
摘要: 原創出處 http://www.iocoder.cn/RocketM... 「芋道源碼」歡迎轉載,保留摘要,謝謝!
本文主要基于 RocketMQ 4.0.x 正式版
1、概述
2、Producer 發送消息
DefaultMQProducer#send(Message)
DefaultMQProducerImpl#sendDefaultImpl()
DefaultMQProducerImpl#tryToFindTopicPublishInfo()
MQFaultStrategy
MQFaultStrategy
LatencyFaultTolerance
LatencyFaultToleranceImpl
FaultItem
DefaultMQProducerImpl#sendKernelImpl()
3、Broker 接收消息
SendMessageProcessor#sendMessage
AbstractSendMessageProcessor#msgCheck
DefaultMessageStore#putMessage
4、某種結尾
1、概述???關注微信公眾號:【芋道源碼】有福利:
RocketMQ / MyCAT / Sharding-JDBC 所有源碼分析文章列表
RocketMQ / MyCAT / Sharding-JDBC 中文注釋源碼 GitHub 地址
您對于源碼的疑問每條留言都將得到認真回復。甚至不知道如何讀源碼也可以請教噢。
新的源碼解析文章實時收到通知。每周更新一篇左右。
認真的源碼交流微信群。
Producer 發送消息。主要是同步發送消息源碼,涉及到 異步/Oneway發送消息,事務消息會跳過。
Broker 接收消息。(存儲消息在《RocketMQ 源碼分析 —— Message 存儲》解析)
2、Producer 發送消息
DefaultMQProducer#send(Message)
1: @Override 2: public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { 3: return this.defaultMQProducerImpl.send(msg); 4: }
說明:發送同步消息,DefaultMQProducer#send(Message) 對 DefaultMQProducerImpl#send(Message) 進行封裝。
DefaultMQProducerImpl#sendDefaultImpl()1: public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { 2: return send(msg, this.defaultMQProducer.getSendMsgTimeout()); 3: } 4: 5: public SendResult send(Message msg, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { 6: return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout); 7: } 8: 9: private SendResult sendDefaultImpl(// 10: Message msg, // 11: final CommunicationMode communicationMode, // 12: final SendCallback sendCallback, // 13: final long timeout// 14: ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { 15: // 校驗 Producer 處于運行狀態 16: this.makeSureStateOK(); 17: // 校驗消息格式 18: Validators.checkMessage(msg, this.defaultMQProducer); 19: // 20: final long invokeID = random.nextLong(); // 調用編號;用于下面打印日志,標記為同一次發送消息 21: long beginTimestampFirst = System.currentTimeMillis(); 22: long beginTimestampPrev = beginTimestampFirst; 23: long endTimestamp = beginTimestampFirst; 24: // 獲取 Topic路由信息 25: TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); 26: if (topicPublishInfo != null && topicPublishInfo.ok()) { 27: MessageQueue mq = null; // 最后選擇消息要發送到的隊列 28: Exception exception = null; 29: SendResult sendResult = null; // 最后一次發送結果 30: int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; // 同步多次調用 31: int times = 0; // 第幾次發送 32: String[] brokersSent = new String[timesTotal]; // 存儲每次發送消息選擇的broker名 33: // 循環調用發送消息,直到成功 34: for (; times < timesTotal; times++) { 35: String lastBrokerName = null == mq ? null : mq.getBrokerName(); 36: MessageQueue tmpmq = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); // 選擇消息要發送到的隊列 37: if (tmpmq != null) { 38: mq = tmpmq; 39: brokersSent[times] = mq.getBrokerName(); 40: try { 41: beginTimestampPrev = System.currentTimeMillis(); 42: // 調用發送消息核心方法 43: sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout); 44: endTimestamp = System.currentTimeMillis(); 45: // 更新Broker可用性信息 46: this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); 47: switch (communicationMode) { 48: case ASYNC: 49: return null; 50: case ONEWAY: 51: return null; 52: case SYNC: 53: if (sendResult.getSendStatus() != SendStatus.SEND_OK) { 54: if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) { // 同步發送成功但存儲有問題時 && 配置存儲異常時重新發送開關 時,進行重試 55: continue; 56: } 57: } 58: return sendResult; 59: default: 60: break; 61: } 62: } catch (RemotingException e) { // 打印異常,更新Broker可用性信息,更新繼續循環 63: endTimestamp = System.currentTimeMillis(); 64: this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); 65: log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); 66: log.warn(msg.toString()); 67: exception = e; 68: continue; 69: } catch (MQClientException e) { // 打印異常,更新Broker可用性信息,繼續循環 70: endTimestamp = System.currentTimeMillis(); 71: this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); 72: log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); 73: log.warn(msg.toString()); 74: exception = e; 75: continue; 76: } catch (MQBrokerException e) { // 打印異常,更新Broker可用性信息,部分情況下的異常,直接返回,結束循環 77: endTimestamp = System.currentTimeMillis(); 78: this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); 79: log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); 80: log.warn(msg.toString()); 81: exception = e; 82: switch (e.getResponseCode()) { 83: // 如下異常continue,進行發送消息重試 84: case ResponseCode.TOPIC_NOT_EXIST: 85: case ResponseCode.SERVICE_NOT_AVAILABLE: 86: case ResponseCode.SYSTEM_ERROR: 87: case ResponseCode.NO_PERMISSION: 88: case ResponseCode.NO_BUYER_ID: 89: case ResponseCode.NOT_IN_CURRENT_UNIT: 90: continue; 91: // 如果有發送結果,進行返回,否則,拋出異常; 92: default: 93: if (sendResult != null) { 94: return sendResult; 95: } 96: throw e; 97: } 98: } catch (InterruptedException e) { 99: endTimestamp = System.currentTimeMillis(); 100: this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); 101: log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); 102: log.warn(msg.toString()); 103: throw e; 104: } 105: } else { 106: break; 107: } 108: } 109: // 返回發送結果 110: if (sendResult != null) { 111: return sendResult; 112: } 113: // 根據不同情況,拋出不同的異常 114: String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s", times, System.currentTimeMillis() - beginTimestampFirst, 115: msg.getTopic(), Arrays.toString(brokersSent)) + FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED); 116: MQClientException mqClientException = new MQClientException(info, exception); 117: if (exception instanceof MQBrokerException) { 118: mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode()); 119: } else if (exception instanceof RemotingConnectException) { 120: mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION); 121: } else if (exception instanceof RemotingTimeoutException) { 122: mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT); 123: } else if (exception instanceof MQClientException) { 124: mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION); 125: } 126: throw mqClientException; 127: } 128: // Namesrv找不到異常 129: ListnsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList(); 130: if (null == nsList || nsList.isEmpty()) { 131: throw new MQClientException( 132: "No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION); 133: } 134: // 消息路由找不到異常 135: throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO), 136: null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION); 137: }
說明 :發送消息。步驟:獲取消息路由信息,選擇要發送到的消息隊列,執行消息發送核心方法,并對發送結果進行封裝返回。
第 1 至 7 行:對sendsendDefaultImpl(...)進行封裝。
第 20 行 :invokeID僅僅用于打印日志,無實際的業務用途。
第 25 行 :獲取 Topic路由信息, 詳細解析見:DefaultMQProducerImpl#tryToFindTopicPublishInfo()
第 30 & 34 行 :計算調用發送消息到成功為止的最大次數,并進行循環。同步或異步發送消息會調用多次,默認配置為3次。
第 36 行 :選擇消息要發送到的隊列,詳細解析見:MQFaultStrategy
第 43 行 :調用發送消息核心方法,詳細解析見:DefaultMQProducerImpl#sendKernelImpl()
第 46 行 :更新Broker可用性信息。在選擇發送到的消息隊列時,會參考Broker發送消息的延遲,詳細解析見:MQFaultStrategy
第 62 至 68 行:當拋出RemotingException時,如果進行消息發送失敗重試,則可能導致消息發送重復。例如,發送消息超時(RemotingTimeoutException),實際Broker接收到該消息并處理成功。因此,Consumer在消費時,需要保證冪等性。
DefaultMQProducerImpl#tryToFindTopicPublishInfo()1: private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) { 2: // 緩存中獲取 Topic發布信息 3: TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic); 4: // 當無可用的 Topic發布信息時,從Namesrv獲取一次 5: if (null == topicPublishInfo || !topicPublishInfo.ok()) { 6: this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo()); 7: this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); 8: topicPublishInfo = this.topicPublishInfoTable.get(topic); 9: } 10: // 若獲取的 Topic發布信息時候可用,則返回 11: if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) { 12: return topicPublishInfo; 13: } else { // 使用 {@link DefaultMQProducer#createTopicKey} 對應的 Topic發布信息。用于 Topic發布信息不存在 && Broker支持自動創建Topic 14: this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer); 15: topicPublishInfo = this.topicPublishInfoTable.get(topic); 16: return topicPublishInfo; 17: } 18: }
說明 :獲得 Topic發布信息。優先從緩存topicPublishInfoTable,其次從Namesrv中獲得。
第 3 行 :從緩存topicPublishInfoTable中獲得 Topic發布信息。
第 5 至 9 行 :從 Namesrv 中獲得 Topic發布信息。
第 13 至 17 行 :當從 Namesrv 無法獲取時,使用 {@link DefaultMQProducer#createTopicKey} 對應的 Topic發布信息。目的是當 Broker 開啟自動創建 Topic開關時,Broker 接收到消息后自動創建Topic,詳細解析見《RocketMQ 源碼分析 —— Topic》。
MQFaultStrategyMQFaultStrategy
1: public class MQFaultStrategy { 2: private final static Logger log = ClientLogger.getLog(); 3: 4: /** 5: * 延遲故障容錯,維護每個Broker的發送消息的延遲 6: * key:brokerName 7: */ 8: private final LatencyFaultTolerancelatencyFaultTolerance = new LatencyFaultToleranceImpl(); 9: /** 10: * 發送消息延遲容錯開關 11: */ 12: private boolean sendLatencyFaultEnable = false; 13: /** 14: * 延遲級別數組 15: */ 16: private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L}; 17: /** 18: * 不可用時長數組 19: */ 20: private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L}; 21: 22: /** 23: * 根據 Topic發布信息 選擇一個消息隊列 24: * 25: * @param tpInfo Topic發布信息 26: * @param lastBrokerName brokerName 27: * @return 消息隊列 28: */ 29: public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { 30: if (this.sendLatencyFaultEnable) { 31: try { 32: // 獲取 brokerName=lastBrokerName && 可用的一個消息隊列 33: int index = tpInfo.getSendWhichQueue().getAndIncrement(); 34: for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) { 35: int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size(); 36: if (pos < 0) 37: pos = 0; 38: MessageQueue mq = tpInfo.getMessageQueueList().get(pos); 39: if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) { 40: if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName)) 41: return mq; 42: } 43: } 44: // 選擇一個相對好的broker,并獲得其對應的一個消息隊列,不考慮該隊列的可用性 45: final String notBestBroker = latencyFaultTolerance.pickOneAtLeast(); 46: int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker); 47: if (writeQueueNums > 0) { 48: final MessageQueue mq = tpInfo.selectOneMessageQueue(); 49: if (notBestBroker != null) { 50: mq.setBrokerName(notBestBroker); 51: mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums); 52: } 53: return mq; 54: } else { 55: latencyFaultTolerance.remove(notBestBroker); 56: } 57: } catch (Exception e) { 58: log.error("Error occurred when selecting message queue", e); 59: } 60: // 選擇一個消息隊列,不考慮隊列的可用性 61: return tpInfo.selectOneMessageQueue(); 62: } 63: // 獲得 lastBrokerName 對應的一個消息隊列,不考慮該隊列的可用性 64: return tpInfo.selectOneMessageQueue(lastBrokerName); 65: } 66: 67: /** 68: * 更新延遲容錯信息 69: * 70: * @param brokerName brokerName 71: * @param currentLatency 延遲 72: * @param isolation 是否隔離。當開啟隔離時,默認延遲為30000。目前主要用于發送消息異常時 73: */ 74: public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) { 75: if (this.sendLatencyFaultEnable) { 76: long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency); 77: this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration); 78: } 79: } 80: 81: /** 82: * 計算延遲對應的不可用時間 83: * 84: * @param currentLatency 延遲 85: * @return 不可用時間 86: */ 87: private long computeNotAvailableDuration(final long currentLatency) { 88: for (int i = latencyMax.length - 1; i >= 0; i--) { 89: if (currentLatency >= latencyMax[i]) 90: return this.notAvailableDuration[i]; 91: } 92: return 0; 93: }
說明 :Producer消息發送容錯策略。默認情況下容錯策略關閉,即sendLatencyFaultEnable=false。
第 30 至 62 行 :容錯策略選擇消息隊列邏輯。優先獲取可用隊列,其次選擇一個broker獲取隊列,最差返回任意broker的一個隊列。
第 64 行 :未開啟容錯策略選擇消息隊列邏輯。
第 74 至 79 行 :更新延遲容錯信息。當 Producer 發送消息時間過長,則邏輯認為N秒內不可用。按照latencyMax,notAvailableDuration的配置,對應如下:
| Producer發送消息消耗時長 | Broker不可用時長 | | --- | --- | | >= 15000 ms | 600 * 1000 ms | | >= 3000 ms | 180 * 1000 ms | | >= 2000 ms | 120 * 1000 ms | | >= 1000 ms | 60 * 1000 ms | | >= 550 ms | 30 * 1000 ms | | >= 100 ms | 0 ms | | >= 50 ms | 0 ms |LatencyFaultTolerance
1: public interface LatencyFaultTolerance{ 2: 3: /** 4: * 更新對應的延遲和不可用時長 5: * 6: * @param name 對象 7: * @param currentLatency 延遲 8: * @param notAvailableDuration 不可用時長 9: */ 10: void updateFaultItem(final T name, final long currentLatency, final long notAvailableDuration); 11: 12: /** 13: * 對象是否可用 14: * 15: * @param name 對象 16: * @return 是否可用 17: */ 18: boolean isAvailable(final T name); 19: 20: /** 21: * 移除對象 22: * 23: * @param name 對象 24: */ 25: void remove(final T name); 26: 27: /** 28: * 獲取一個對象 29: * 30: * @return 對象 31: */ 32: T pickOneAtLeast(); 33: }
說明 :延遲故障容錯接口
LatencyFaultToleranceImpl1: public class LatencyFaultToleranceImpl implements LatencyFaultTolerance{ 2: 3: /** 4: * 對象故障信息Table 5: */ 6: private final ConcurrentHashMap faultItemTable = new ConcurrentHashMap<>(16); 7: /** 8: * 對象選擇Index 9: * @see #pickOneAtLeast() 10: */ 11: private final ThreadLocalIndex whichItemWorst = new ThreadLocalIndex(); 12: 13: @Override 14: public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) { 15: FaultItem old = this.faultItemTable.get(name); 16: if (null == old) { 17: // 創建對象 18: final FaultItem faultItem = new FaultItem(name); 19: faultItem.setCurrentLatency(currentLatency); 20: faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); 21: // 更新對象 22: old = this.faultItemTable.putIfAbsent(name, faultItem); 23: if (old != null) { 24: old.setCurrentLatency(currentLatency); 25: old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); 26: } 27: } else { // 更新對象 28: old.setCurrentLatency(currentLatency); 29: old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); 30: } 31: } 32: 33: @Override 34: public boolean isAvailable(final String name) { 35: final FaultItem faultItem = this.faultItemTable.get(name); 36: if (faultItem != null) { 37: return faultItem.isAvailable(); 38: } 39: return true; 40: } 41: 42: @Override 43: public void remove(final String name) { 44: this.faultItemTable.remove(name); 45: } 46: 47: /** 48: * 選擇一個相對優秀的對象 49: * 50: * @return 對象 51: */ 52: @Override 53: public String pickOneAtLeast() { 54: // 創建數組 55: final Enumeration elements = this.faultItemTable.elements(); 56: List tmpList = new LinkedList<>(); 57: while (elements.hasMoreElements()) { 58: final FaultItem faultItem = elements.nextElement(); 59: tmpList.add(faultItem); 60: } 61: // 62: if (!tmpList.isEmpty()) { 63: // 打亂 + 排序。TODO 疑問:應該只能二選一。猜測Collections.shuffle(tmpList)去掉。 64: Collections.shuffle(tmpList); 65: Collections.sort(tmpList); 66: // 選擇順序在前一半的對象 67: final int half = tmpList.size() / 2; 68: if (half <= 0) { 69: return tmpList.get(0).getName(); 70: } else { 71: final int i = this.whichItemWorst.getAndIncrement() % half; 72: return tmpList.get(i).getName(); 73: } 74: } 75: return null; 76: } 77: }
說明 :延遲故障容錯實現。維護每個對象的信息。
FaultItem1: class FaultItem implements Comparable{ 2: /** 3: * 對象名 4: */ 5: private final String name; 6: /** 7: * 延遲 8: */ 9: private volatile long currentLatency; 10: /** 11: * 開始可用時間 12: */ 13: private volatile long startTimestamp; 14: 15: public FaultItem(final String name) { 16: this.name = name; 17: } 18: 19: /** 20: * 比較對象 21: * 可用性 > 延遲 > 開始可用時間 22: * 23: * @param other other 24: * @return 升序 25: */ 26: @Override 27: public int compareTo(final FaultItem other) { 28: if (this.isAvailable() != other.isAvailable()) { 29: if (this.isAvailable()) 30: return -1; 31: 32: if (other.isAvailable()) 33: return 1; 34: } 35: 36: if (this.currentLatency < other.currentLatency) 37: return -1; 38: else if (this.currentLatency > other.currentLatency) { 39: return 1; 40: } 41: 42: if (this.startTimestamp < other.startTimestamp) 43: return -1; 44: else if (this.startTimestamp > other.startTimestamp) { 45: return 1; 46: } 47: 48: return 0; 49: } 50: 51: /** 52: * 是否可用:當開始可用時間大于當前時間 53: * 54: * @return 是否可用 55: */ 56: public boolean isAvailable() { 57: return (System.currentTimeMillis() - startTimestamp) >= 0; 58: } 59: 60: @Override 61: public int hashCode() { 62: int result = getName() != null ? getName().hashCode() : 0; 63: result = 31 * result + (int) (getCurrentLatency() ^ (getCurrentLatency() >>> 32)); 64: result = 31 * result + (int) (getStartTimestamp() ^ (getStartTimestamp() >>> 32)); 65: return result; 66: } 67: 68: @Override 69: public boolean equals(final Object o) { 70: if (this == o) 71: return true; 72: if (!(o instanceof FaultItem)) 73: return false; 74: 75: final FaultItem faultItem = (FaultItem) o; 76: 77: if (getCurrentLatency() != faultItem.getCurrentLatency()) 78: return false; 79: if (getStartTimestamp() != faultItem.getStartTimestamp()) 80: return false; 81: return getName() != null ? getName().equals(faultItem.getName()) : faultItem.getName() == null; 82: 83: } 84: }
說明 :對象故障信息。維護對象的名字、延遲、開始可用的時間。
DefaultMQProducerImpl#sendKernelImpl()1: private SendResult sendKernelImpl(final Message msg, // 2: final MessageQueue mq, // 3: final CommunicationMode communicationMode, // 4: final SendCallback sendCallback, // 5: final TopicPublishInfo topicPublishInfo, // 6: final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { 7: // 獲取 broker地址 8: String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); 9: if (null == brokerAddr) { 10: tryToFindTopicPublishInfo(mq.getTopic()); 11: brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); 12: } 13: // 14: SendMessageContext context = null; 15: if (brokerAddr != null) { 16: // 是否使用broker vip通道。broker會開啟兩個端口對外服務。 17: brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr); 18: byte[] prevBody = msg.getBody(); // 記錄消息內容。下面邏輯可能改變消息內容,例如消息壓縮。 19: try { 20: // 設置唯一編號 21: MessageClientIDSetter.setUniqID(msg); 22: // 消息壓縮 23: int sysFlag = 0; 24: if (this.tryToCompressMessage(msg)) { 25: sysFlag |= MessageSysFlag.COMPRESSED_FLAG; 26: } 27: // 事務 28: final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); 29: if (tranMsg != null && Boolean.parseBoolean(tranMsg)) { 30: sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE; 31: } 32: // hook:發送消息校驗 33: if (hasCheckForbiddenHook()) { 34: CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext(); 35: checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr()); 36: checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup()); 37: checkForbiddenContext.setCommunicationMode(communicationMode); 38: checkForbiddenContext.setBrokerAddr(brokerAddr); 39: checkForbiddenContext.setMessage(msg); 40: checkForbiddenContext.setMq(mq); 41: checkForbiddenContext.setUnitMode(this.isUnitMode()); 42: this.executeCheckForbiddenHook(checkForbiddenContext); 43: } 44: // hook:發送消息前邏輯 45: if (this.hasSendMessageHook()) { 46: context = new SendMessageContext(); 47: context.setProducer(this); 48: context.setProducerGroup(this.defaultMQProducer.getProducerGroup()); 49: context.setCommunicationMode(communicationMode); 50: context.setBornHost(this.defaultMQProducer.getClientIP()); 51: context.setBrokerAddr(brokerAddr); 52: context.setMessage(msg); 53: context.setMq(mq); 54: String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); 55: if (isTrans != null && isTrans.equals("true")) { 56: context.setMsgType(MessageType.Trans_Msg_Half); 57: } 58: if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) { 59: context.setMsgType(MessageType.Delay_Msg); 60: } 61: this.executeSendMessageHookBefore(context); 62: } 63: // 構建發送消息請求 64: SendMessageRequestHeader requestHeader = new SendMessageRequestHeader(); 65: requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup()); 66: requestHeader.setTopic(msg.getTopic()); 67: requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey()); 68: requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums()); 69: requestHeader.setQueueId(mq.getQueueId()); 70: requestHeader.setSysFlag(sysFlag); 71: requestHeader.setBornTimestamp(System.currentTimeMillis()); 72: requestHeader.setFlag(msg.getFlag()); 73: requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties())); 74: requestHeader.setReconsumeTimes(0); 75: requestHeader.setUnitMode(this.isUnitMode()); 76: if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { // 消息重發Topic 77: String reconsumeTimes = MessageAccessor.getReconsumeTime(msg); 78: if (reconsumeTimes != null) { 79: requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes)); 80: MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME); 81: } 82: String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg); 83: if (maxReconsumeTimes != null) { 84: requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes)); 85: MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES); 86: } 87: } 88: // 發送消息 89: SendResult sendResult = null; 90: switch (communicationMode) { 91: case ASYNC: 92: sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(// 93: brokerAddr, // 1 94: mq.getBrokerName(), // 2 95: msg, // 3 96: requestHeader, // 4 97: timeout, // 5 98: communicationMode, // 6 99: sendCallback, // 7 100: topicPublishInfo, // 8 101: this.mQClientFactory, // 9 102: this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(), // 10 103: context, // 104: this); 105: break; 106: case ONEWAY: 107: case SYNC: 108: sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage( 109: brokerAddr, 110: mq.getBrokerName(), 111: msg, 112: requestHeader, 113: timeout, 114: communicationMode, 115: context, 116: this); 117: break; 118: default: 119: assert false; 120: break; 121: } 122: // hook:發送消息后邏輯 123: if (this.hasSendMessageHook()) { 124: context.setSendResult(sendResult); 125: this.executeSendMessageHookAfter(context); 126: } 127: // 返回發送結果 128: return sendResult; 129: } catch (RemotingException e) { 130: if (this.hasSendMessageHook()) { 131: context.setException(e); 132: this.executeSendMessageHookAfter(context); 133: } 134: throw e; 135: } catch (MQBrokerException e) { 136: if (this.hasSendMessageHook()) { 137: context.setException(e); 138: this.executeSendMessageHookAfter(context); 139: } 140: throw e; 141: } catch (InterruptedException e) { 142: if (this.hasSendMessageHook()) { 143: context.setException(e); 144: this.executeSendMessageHookAfter(context); 145: } 146: throw e; 147: } finally { 148: msg.setBody(prevBody); 149: } 150: } 151: // broker為空拋出異常 152: throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null); 153: }
說明 :發送消息核心方法。該方法真正發起網絡請求,發送消息給 Broker。
第 21 行 :生產消息編號,詳細解析見《RocketMQ 源碼分析 —— Message 基礎》。
第 64 至 121 行 :構建發送消息請求SendMessageRequestHeader。
第 107 至 117 行 :執行 MQClientInstance#sendMessage(...) 發起網絡請求。
3、Broker 接收消息SendMessageProcessor#sendMessage
1: @Override 2: public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { 3: SendMessageContext mqtraceContext; 4: switch (request.getCode()) { 5: case RequestCode.CONSUMER_SEND_MSG_BACK: 6: return this.consumerSendMsgBack(ctx, request); 7: default: 8: // 解析請求 9: SendMessageRequestHeader requestHeader = parseRequestHeader(request); 10: if (requestHeader == null) { 11: return null; 12: } 13: // 發送請求Context。在 hook 場景下使用 14: mqtraceContext = buildMsgContext(ctx, requestHeader); 15: // hook:處理發送消息前邏輯 16: this.executeSendMessageHookBefore(ctx, request, mqtraceContext); 17: // 處理發送消息邏輯 18: final RemotingCommand response = this.sendMessage(ctx, request, mqtraceContext, requestHeader); 19: // hook:處理發送消息后邏輯 20: this.executeSendMessageHookAfter(response, mqtraceContext); 21: return response; 22: } 23: } 24: 25: private RemotingCommand sendMessage(final ChannelHandlerContext ctx, // 26: final RemotingCommand request, // 27: final SendMessageContext sendMessageContext, // 28: final SendMessageRequestHeader requestHeader) throws RemotingCommandException { 29: 30: // 初始化響應 31: final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class); 32: final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader(); 33: response.setOpaque(request.getOpaque()); 34: response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId()); 35: response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn())); 36: 37: if (log.isDebugEnabled()) { 38: log.debug("receive SendMessage request command, {}", request); 39: } 40: 41: // 如果未開始接收消息,拋出系統異常 42: @SuppressWarnings("SpellCheckingInspection") 43: final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp(); 44: if (this.brokerController.getMessageStore().now() < startTimstamp) { 45: response.setCode(ResponseCode.SYSTEM_ERROR); 46: response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp))); 47: return response; 48: } 49: 50: // 消息配置(Topic配置)校驗 51: response.setCode(-1); 52: super.msgCheck(ctx, requestHeader, response); 53: if (response.getCode() != -1) { 54: return response; 55: } 56: 57: final byte[] body = request.getBody(); 58: 59: // 如果隊列小于0,從可用隊列隨機選擇 60: int queueIdInt = requestHeader.getQueueId(); 61: TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic()); 62: if (queueIdInt < 0) { 63: queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums(); 64: } 65: 66: // 67: int sysFlag = requestHeader.getSysFlag(); 68: if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) { 69: sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG; 70: } 71: 72: // 對RETRY類型的消息處理。如果超過最大消費次數,則topic修改成"%DLQ%" + 分組名,即加入 死信隊列(Dead Letter Queue) 73: String newTopic = requestHeader.getTopic(); 74: if (null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { 75: // 獲取訂閱分組配置 76: String groupName = newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length()); 77: SubscriptionGroupConfig subscriptionGroupConfig = 78: this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName); 79: if (null == subscriptionGroupConfig) { 80: response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST); 81: response.setRemark("subscription group not exist, " + groupName + " " + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST)); 82: return response; 83: } 84: // 計算最大可消費次數 85: int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes(); 86: if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) { 87: maxReconsumeTimes = requestHeader.getMaxReconsumeTimes(); 88: } 89: int reconsumeTimes = requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes(); 90: if (reconsumeTimes >= maxReconsumeTimes) { // 超過最大消費次數 91: newTopic = MixAll.getDLQTopic(groupName); 92: queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP; 93: topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, // 94: DLQ_NUMS_PER_GROUP, // 95: PermName.PERM_WRITE, 0 96: ); 97: if (null == topicConfig) { 98: response.setCode(ResponseCode.SYSTEM_ERROR); 99: response.setRemark("topic[" + newTopic + "] not exist"); 100: return response; 101: } 102: } 103: } 104: 105: // 創建MessageExtBrokerInner 106: MessageExtBrokerInner msgInner = new MessageExtBrokerInner(); 107: msgInner.setTopic(newTopic); 108: msgInner.setBody(body); 109: msgInner.setFlag(requestHeader.getFlag()); 110: MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties())); 111: msgInner.setPropertiesString(requestHeader.getProperties()); 112: msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(topicConfig.getTopicFilterType(), msgInner.getTags())); 113: msgInner.setQueueId(queueIdInt); 114: msgInner.setSysFlag(sysFlag); 115: msgInner.setBornTimestamp(requestHeader.getBornTimestamp()); 116: msgInner.setBornHost(ctx.channel().remoteAddress()); 117: msgInner.setStoreHost(this.getStoreHost()); 118: msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes()); 119: 120: // 校驗是否不允許發送事務消息 121: if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) { 122: String traFlag = msgInner.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); 123: if (traFlag != null) { 124: response.setCode(ResponseCode.NO_PERMISSION); 125: response.setRemark( 126: "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending transaction message is forbidden"); 127: return response; 128: } 129: } 130: 131: // 添加消息 132: PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner); 133: if (putMessageResult != null) { 134: boolean sendOK = false; 135: 136: switch (putMessageResult.getPutMessageStatus()) { 137: // Success 138: case PUT_OK: 139: sendOK = true; 140: response.setCode(ResponseCode.SUCCESS); 141: break; 142: case FLUSH_DISK_TIMEOUT: 143: response.setCode(ResponseCode.FLUSH_DISK_TIMEOUT); 144: sendOK = true; 145: break; 146: case FLUSH_SLAVE_TIMEOUT: 147: response.setCode(ResponseCode.FLUSH_SLAVE_TIMEOUT); 148: sendOK = true; 149: break; 150: case SLAVE_NOT_AVAILABLE: 151: response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE); 152: sendOK = true; 153: break; 154: 155: // Failed 156: case CREATE_MAPEDFILE_FAILED: 157: response.setCode(ResponseCode.SYSTEM_ERROR); 158: response.setRemark("create mapped file failed, server is busy or broken."); 159: break; 160: case MESSAGE_ILLEGAL: 161: case PROPERTIES_SIZE_EXCEEDED: 162: response.setCode(ResponseCode.MESSAGE_ILLEGAL); 163: response.setRemark( 164: "the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k."); 165: break; 166: case SERVICE_NOT_AVAILABLE: 167: response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE); 168: response.setRemark( 169: "service not available now, maybe disk full, " + diskUtil() + ", maybe your broker machine memory too small."); 170: break; 171: case OS_PAGECACHE_BUSY: 172: response.setCode(ResponseCode.SYSTEM_ERROR); 173: response.setRemark("[PC_SYNCHRONIZED]broker busy, start flow control for a while"); 174: break; 175: case UNKNOWN_ERROR: 176: response.setCode(ResponseCode.SYSTEM_ERROR); 177: response.setRemark("UNKNOWN_ERROR"); 178: break; 179: default: 180: response.setCode(ResponseCode.SYSTEM_ERROR); 181: response.setRemark("UNKNOWN_ERROR DEFAULT"); 182: break; 183: } 184: 185: String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER); 186: if (sendOK) { 187: // 統計 188: this.brokerController.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic()); 189: this.brokerController.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(), putMessageResult.getAppendMessageResult().getWroteBytes()); 190: this.brokerController.getBrokerStatsManager().incBrokerPutNums(); 191: 192: // 響應 193: response.setRemark(null); 194: responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId()); 195: responseHeader.setQueueId(queueIdInt); 196: responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset()); 197: doResponse(ctx, request, response); 198: 199: // hook:設置發送成功到context 200: if (hasSendMessageHook()) { 201: sendMessageContext.setMsgId(responseHeader.getMsgId()); 202: sendMessageContext.setQueueId(responseHeader.getQueueId()); 203: sendMessageContext.setQueueOffset(responseHeader.getQueueOffset()); 204: 205: int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount(); 206: int wroteSize = putMessageResult.getAppendMessageResult().getWroteBytes(); 207: int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT) * commercialBaseCount; 208: 209: sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_SUCCESS); 210: sendMessageContext.setCommercialSendTimes(incValue); 211: sendMessageContext.setCommercialSendSize(wroteSize); 212: sendMessageContext.setCommercialOwner(owner); 213: } 214: return null; 215: } else { 216: // hook:設置發送失敗到context 217: if (hasSendMessageHook()) { 218: int wroteSize = request.getBody().length; 219: int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT); 220: 221: sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_FAILURE); 222: sendMessageContext.setCommercialSendTimes(incValue); 223: sendMessageContext.setCommercialSendSize(wroteSize); 224: sendMessageContext.setCommercialOwner(owner); 225: } 226: } 227: } else { 228: response.setCode(ResponseCode.SYSTEM_ERROR); 229: response.setRemark("store putMessage return null"); 230: } 231: 232: return response; 233: }
#processRequest() 說明 :處理消息請求。
#sendMessage() 說明 :發送消息,并返回發送消息結果。
第 51 至 55 行 :消息配置(Topic配置)校驗,詳細解析見:AbstractSendMessageProcessor#msgCheck()。
第 60 至 64 行 :消息隊列編號小于0時,Broker 可以設置隨機選擇一個消息隊列。
第 72 至 103 行 :對RETRY類型的消息處理。如果超過最大消費次數,則topic修改成"%DLQ%" + 分組名, 即加 死信隊 (Dead Letter Queue),詳細解析見:《RocketMQ 源碼分析 —— Topic》。
第 105 至 118 行 :創建MessageExtBrokerInner。
第 132 :存儲消息,詳細解析見:DefaultMessageStore#putMessage()。
第 133 至 183 行 :處理消息發送結果,設置響應結果和提示。
第 186 至 214 行 :發送成功,響應。這里doResponse(ctx, request, response)進行響應,最后return null,原因是:響應給 Producer 可能發生異常,#doResponse(ctx, request, response)捕捉了該異常并輸出日志。這樣做的話,我們進行排查 Broker 接收消息成功后響應是否存在異常會方便很多。
AbstractSendMessageProcessor#msgCheck1: protected RemotingCommand msgCheck(final ChannelHandlerContext ctx, 2: final SendMessageRequestHeader requestHeader, final RemotingCommand response) { 3: // 檢查 broker 是否有寫入權限 4: if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission()) 5: && this.brokerController.getTopicConfigManager().isOrderTopic(requestHeader.getTopic())) { 6: response.setCode(ResponseCode.NO_PERMISSION); 7: response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() 8: + "] sending message is forbidden"); 9: return response; 10: } 11: // 檢查topic是否可以被發送。目前是{@link MixAll.DEFAULT_TOPIC}不被允許發送 12: if (!this.brokerController.getTopicConfigManager().isTopicCanSendMessage(requestHeader.getTopic())) { 13: String errorMsg = "the topic[" + requestHeader.getTopic() + "] is conflict with system reserved words."; 14: log.warn(errorMsg); 15: response.setCode(ResponseCode.SYSTEM_ERROR); 16: response.setRemark(errorMsg); 17: return response; 18: } 19: TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic()); 20: if (null == topicConfig) { // 不能存在topicConfig,則進行創建 21: int topicSysFlag = 0; 22: if (requestHeader.isUnitMode()) { 23: if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { 24: topicSysFlag = TopicSysFlag.buildSysFlag(false, true); 25: } else { 26: topicSysFlag = TopicSysFlag.buildSysFlag(true, false); 27: } 28: } 29: // 創建topic配置 30: log.warn("the topic {} not exist, producer: {}", requestHeader.getTopic(), ctx.channel().remoteAddress()); 31: topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(// 32: requestHeader.getTopic(), // 33: requestHeader.getDefaultTopic(), // 34: RemotingHelper.parseChannelRemoteAddr(ctx.channel()), // 35: requestHeader.getDefaultTopicQueueNums(), topicSysFlag); 36: if (null == topicConfig) { 37: if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { 38: topicConfig = 39: this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod( 40: requestHeader.getTopic(), 1, PermName.PERM_WRITE | PermName.PERM_READ, 41: topicSysFlag); 42: } 43: } 44: // 如果沒配置 45: if (null == topicConfig) { 46: response.setCode(ResponseCode.TOPIC_NOT_EXIST); 47: response.setRemark("topic[" + requestHeader.getTopic() + "] not exist, apply first please!" 48: + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL)); 49: return response; 50: } 51: } 52: // 隊列編號是否正確 53: int queueIdInt = requestHeader.getQueueId(); 54: int idValid = Math.max(topicConfig.getWriteQueueNums(), topicConfig.getReadQueueNums()); 55: if (queueIdInt >= idValid) { 56: String errorInfo = String.format("request queueId[%d] is illegal, %s Producer: %s", 57: queueIdInt, 58: topicConfig.toString(), 59: RemotingHelper.parseChannelRemoteAddr(ctx.channel())); 60: log.warn(errorInfo); 61: response.setCode(ResponseCode.SYSTEM_ERROR); 62: response.setRemark(errorInfo); 63: return response; 64: } 65: return response; 66: }
說明:校驗消息是否正確,主要是Topic配置方面,例如:Broker 是否有寫入權限,topic配置是否存在,隊列編號是否正確。
第 11 至 18 行 :檢查Topic是否可以被發送。目前是 {@link MixAll.DEFAULT_TOPIC} 不被允許發送。
第 20 至 51 行 :當找不到Topic配置,則進行創建。當然,創建會存在不成功的情況,例如說:defaultTopic 的Topic配置不存在,又或者是 存在但是不允許繼承,詳細解析見《RocketMQ 源碼分析 —— Topic》。
DefaultMessageStore#putMessage1: public PutMessageResult putMessage(MessageExtBrokerInner msg) { 2: if (this.shutdown) { 3: log.warn("message store has shutdown, so putMessage is forbidden"); 4: return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null); 5: } 6: 7: // 從節點不允許寫入 8: if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) { 9: long value = this.printTimes.getAndIncrement(); 10: if ((value % 50000) == 0) { 11: log.warn("message store is slave mode, so putMessage is forbidden "); 12: } 13: 14: return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null); 15: } 16: 17: // store是否允許寫入 18: if (!this.runningFlags.isWriteable()) { 19: long value = this.printTimes.getAndIncrement(); 20: if ((value % 50000) == 0) { 21: log.warn("message store is not writeable, so putMessage is forbidden " + this.runningFlags.getFlagBits()); 22: } 23: 24: return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null); 25: } else { 26: this.printTimes.set(0); 27: } 28: 29: // 消息過長 30: if (msg.getTopic().length() > Byte.MAX_VALUE) { 31: log.warn("putMessage message topic length too long " + msg.getTopic().length()); 32: return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null); 33: } 34: 35: // 消息附加屬性過長 36: if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) { 37: log.warn("putMessage message properties length too long " + msg.getPropertiesString().length()); 38: return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null); 39: } 40: 41: if (this.isOSPageCacheBusy()) { 42: return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null); 43: } 44: 45: long beginTime = this.getSystemClock().now(); 46: // 添加消息到commitLog 47: PutMessageResult result = this.commitLog.putMessage(msg); 48: 49: long eclipseTime = this.getSystemClock().now() - beginTime; 50: if (eclipseTime > 500) { 51: log.warn("putMessage not in lock eclipse time(ms)={}, bodyLength={}", eclipseTime, msg.getBody().length); 52: } 53: this.storeStatsService.setPutMessageEntireTimeMax(eclipseTime); 54: 55: if (null == result || !result.isOk()) { 56: this.storeStatsService.getPutMessageFailedTimes().incrementAndGet(); 57: } 58: 59: return result; 60: }
說明:存儲消息封裝,最終存儲需要 CommitLog 實現。
第 7 至 27 行 :校驗 Broker 是否可以寫入。
第 29 至 39 行 :消息格式與大小校驗。
第 47 行 :調用 CommitLong 進行存儲,詳細邏輯見:《RocketMQ 源碼分析 —— Message 存儲》
4、某種結尾感謝閱讀、收藏、點贊本文的工程師同學。
閱讀源碼是件令自己很愉悅的事情,編寫源碼解析是讓自己腦細胞死傷無數的過程,痛并快樂著。
如果有內容寫的存在錯誤,或是不清晰的地方,見笑了,?。歡迎加 QQ:7685413 我們一起探討,共進步。
再次感謝閱讀、收藏、點贊本文的工程師同學。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/67909.html
摘要:它是阿里巴巴于年開源的第三代分布式消息中間件。是一個分布式消息中間件,具有低延遲高性能和可靠性萬億級別的容量和靈活的可擴展性,它是阿里巴巴于年開源的第三代分布式消息中間件。上篇文章消息隊列那么多,為什么建議深入了解下RabbitMQ?我們講到了消息隊列的發展史:并且詳細介紹了RabbitMQ,其功能也是挺強大的,那么,為啥又要搞一個RocketMQ出來呢?是重復造輪子嗎?本文我們就帶大家來詳...
摘要:但是服務器又確實是收到了這條消息的,只是給客戶端的響應丟失了,所以導致的結果就是扣款失敗,成功發貨。所有的半消息都會寫在為的半消息隊列里,并且每條半消息,在整個鏈路里會被寫多次,如果并發很大且大部分消息都是事務消息的話,可靠性會存在問題。 前言 得益于MQ削峰填谷,系統解耦,操作異步等功能特性,在互聯網行業,可以說有分布式服務的地方,MQ都往往不會缺席。由阿里自研的RocketMQ更是...
摘要:但是服務器又確實是收到了這條消息的,只是給客戶端的響應丟失了,所以導致的結果就是扣款失敗,成功發貨。既然消息的發送不能和本地事務寫在一起,那如何來保證其整體具有原子性的需求呢答案就是今天我們介紹的主角事務消息。 前言 得益于MQ削峰填谷,系統解耦,操作異步等功能特性,在互聯網行業,可以說有分布式服務的地方,MQ都往往不會缺席。由阿里自研的RocketMQ更是經歷了多年的雙十一高并發挑戰...
摘要:通過以上分析我們可以得出消息隊列具有很好的削峰作用的功能即通過異步處理,將短時間高并發產生的事務消息存儲在消息隊列中,從而削平高峰期的并發事務。 該文已加入開源項目:JavaGuide(一份涵蓋大部分Java程序員所需要掌握的核心知識的文檔類項目,Star 數接近 16k)。地址:https://github.com/Snailclimb... 本文內容思維導圖:showImg(ht...
摘要:利用的高級特性特性是一種負載均衡的機制。在一個消息被分發到之前,首先檢查消息屬性。屬性為某個值的消息單個消息或消息集合在描述,和的對應關系,以及負載均衡策略時。同樣做到了保證消息的順序情況下,均衡消費的消費消息。 通常mq可以保證先到隊列的消息按照順序分發給消費者消費來保證順序,但是一個隊列有多個消費者消費的時候,那將失去這個保證,因為這些消息被多個線程并發的消費。但是有的時候消息按照...
閱讀 4751·2021-11-15 11:39
閱讀 2698·2021-11-11 16:55
閱讀 2206·2021-10-25 09:44
閱讀 3511·2021-09-22 16:02
閱讀 2441·2019-08-30 15:55
閱讀 3129·2019-08-30 13:46
閱讀 2670·2019-08-30 13:15
閱讀 1958·2019-08-30 11:12