摘要:發(fā)送消息階段,不允許發(fā)送重復(fù)的消息。雖然不能嚴(yán)格保證不重復(fù),但是正常情況下很少會(huì)出現(xiàn)重復(fù)發(fā)送消費(fèi)情況,只有網(wǎng)絡(luò)異常,啟停等異常情況下會(huì)出現(xiàn)消息重復(fù)。
問題列表
Broker 怎么響應(yīng)Consumer請求?
Broker 怎么維護(hù)ConsumeQueue?
Broker 怎么處理事務(wù)消息的 ConsumeQueue ?
Broker 怎么處理定時(shí)消息的 ConsumeQueue?
Broker 怎么處理回溯消費(fèi)請求?
Broker 的消息是 at least once還是exactly only once?
怎么響應(yīng)Consumer請求?
原理:
如上圖所示,RocketMQ將所有消息都放在CommitLog里面,消費(fèi)是維護(hù)一個(gè)ConsumeQueue幫助Consumer消費(fèi).pull操作要讀兩次,先讀ConsumeQueue得到offset,再讀CommitLog得到消息內(nèi)容.
ConsumeQueue有一個(gè)長度20的ByteBufferb變量byteBufferIndex,里面維護(hù)者消息偏移量,消息發(fā)現(xiàn),tags的hashcode
//消息偏移量 this.byteBufferIndex.putLong(offset); //消息大小 this.byteBufferIndex.putInt(size); //tags的HashCode this.byteBufferIndex.putLong(tagsCode);
BrokerController.initialize方法中會(huì)注冊PullMessageProcessor來處理pull message 請求
this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor); this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
在PullMessageProcessor.processRequest中又委托給DefaultMessageStore獲取
final GetMessageResult getMessageResult = this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);
DefaultMessageStore.getMessage
public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset, final int maxMsgNums, final MessageFilter messageFilter) { if (this.shutdown) { log.warn("message store has shutdown, so getMessage is forbidden"); return null; } if (!this.runningFlags.isReadable()) { log.warn("message store is not readable, so getMessage is forbidden " + this.runningFlags.getFlagBits()); return null; } long beginTime = this.getSystemClock().now(); GetMessageStatus status = GetMessageStatus.NO_MESSAGE_IN_QUEUE; long nextBeginOffset = offset; long minOffset = 0; long maxOffset = 0; GetMessageResult getResult = new GetMessageResult(); final long maxOffsetPy = this.commitLog.getMaxOffset(); //根據(jù)topic和queueId找到ConsumeQueue ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId); if (consumeQueue != null) { minOffset = consumeQueue.getMinOffsetInQueue(); maxOffset = consumeQueue.getMaxOffsetInQueue(); if (maxOffset == 0) { status = GetMessageStatus.NO_MESSAGE_IN_QUEUE; nextBeginOffset = nextOffsetCorrection(offset, 0); } else if (offset < minOffset) { status = GetMessageStatus.OFFSET_TOO_SMALL; nextBeginOffset = nextOffsetCorrection(offset, minOffset); } else if (offset == maxOffset) { status = GetMessageStatus.OFFSET_OVERFLOW_ONE; nextBeginOffset = nextOffsetCorrection(offset, offset); } else if (offset > maxOffset) { status = GetMessageStatus.OFFSET_OVERFLOW_BADLY; if (0 == minOffset) { nextBeginOffset = nextOffsetCorrection(offset, minOffset); } else { nextBeginOffset = nextOffsetCorrection(offset, maxOffset); } } else { //前面都在處理異常,這里開始真正獲取 //獲取consumeQueue offset之后所有可讀的offset,consumeQueue也是文件存儲 SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset); if (bufferConsumeQueue != null) { try { status = GetMessageStatus.NO_MATCHED_MESSAGE; long nextPhyFileStartOffset = Long.MIN_VALUE; long maxPhyOffsetPulling = 0; int i = 0; final int maxFilterMessageCount = Math.max(16000, maxMsgNums * ConsumeQueue.CQ_STORE_UNIT_SIZE); final boolean diskFallRecorded = this.messageStoreConfig.isDiskFallRecorded(); ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit(); //循環(huán)單個(gè)獲取 for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) { long offsetPy = bufferConsumeQueue.getByteBuffer().getLong(); int sizePy = bufferConsumeQueue.getByteBuffer().getInt(); long tagsCode = bufferConsumeQueue.getByteBuffer().getLong(); maxPhyOffsetPulling = offsetPy; if (nextPhyFileStartOffset != Long.MIN_VALUE) { if (offsetPy < nextPhyFileStartOffset) continue; } boolean isInDisk = checkInDiskByCommitOffset(offsetPy, maxOffsetPy); if (this.isTheBatchFull(sizePy, maxMsgNums, getResult.getBufferTotalSize(), getResult.getMessageCount(), isInDisk)) { break; } boolean extRet = false; if (consumeQueue.isExtAddr(tagsCode)) { extRet = consumeQueue.getExt(tagsCode, cqExtUnit); if (extRet) { tagsCode = cqExtUnit.getTagsCode(); } else { // can"t find ext content.Client will filter messages by tag also. log.error("[BUG] can"t find consume queue extend file content!addr={}, offsetPy={}, sizePy={}, topic={}, group={}", tagsCode, offsetPy, sizePy, topic, group); } } if (messageFilter != null && !messageFilter.isMatchedByConsumeQueue(tagsCode, extRet ? cqExtUnit : null)) { if (getResult.getBufferTotalSize() == 0) { status = GetMessageStatus.NO_MATCHED_MESSAGE; } continue; } //根據(jù)偏移量和大小從CommitLog獲取消息 SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy); if (null == selectResult) { if (getResult.getBufferTotalSize() == 0) { status = GetMessageStatus.MESSAGE_WAS_REMOVING; } nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy); continue; } if (messageFilter != null && !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) { if (getResult.getBufferTotalSize() == 0) { status = GetMessageStatus.NO_MATCHED_MESSAGE; } // release... selectResult.release(); continue; } this.storeStatsService.getGetMessageTransferedMsgCount().incrementAndGet(); //將查到的消息放入返回值 getResult.addMessage(selectResult); status = GetMessageStatus.FOUND; nextPhyFileStartOffset = Long.MIN_VALUE; } if (diskFallRecorded) { long fallBehind = maxOffsetPy - maxPhyOffsetPulling; brokerStatsManager.recordDiskFallBehindSize(group, topic, queueId, fallBehind); } nextBeginOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE); long diff = maxOffsetPy - maxPhyOffsetPulling; long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0)); getResult.setSuggestPullingFromSlave(diff > memory); } finally { bufferConsumeQueue.release(); } } else { status = GetMessageStatus.OFFSET_FOUND_NULL; nextBeginOffset = nextOffsetCorrection(offset, consumeQueue.rollNextFile(offset)); log.warn("consumer request topic: " + topic + "offset: " + offset + " minOffset: " + minOffset + " maxOffset: " + maxOffset + ", but access logic queue failed."); } } } else { status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE; nextBeginOffset = nextOffsetCorrection(offset, 0); } if (GetMessageStatus.FOUND == status) { this.storeStatsService.getGetMessageTimesTotalFound().incrementAndGet(); } else { this.storeStatsService.getGetMessageTimesTotalMiss().incrementAndGet(); } long eclipseTime = this.getSystemClock().now() - beginTime; this.storeStatsService.setGetMessageEntireTimeMax(eclipseTime); getResult.setStatus(status); getResult.setNextBeginOffset(nextBeginOffset); getResult.setMaxOffset(maxOffset); getResult.setMinOffset(minOffset); return getResult; }
CommitLog.getMessage
public SelectMappedBufferResult getMessage(final long offset, final int size) { int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(); //根據(jù)offset獲取到映射文件 MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, offset == 0); if (mappedFile != null) { int pos = (int) (offset % mappedFileSize); //從映射文件中獲取指定位置的數(shù)據(jù) return mappedFile.selectMappedBuffer(pos, size); } return null; }
MappedFile.selectMappedBuffer
public SelectMappedBufferResult selectMappedBuffer(int pos, int size) { int readPosition = getReadPosition(); if ((pos + size) <= readPosition) { if (this.hold()) { //Java NIO 獲取指定pos數(shù)據(jù) ByteBuffer byteBuffer = this.mappedByteBuffer.slice(); byteBuffer.position(pos); ByteBuffer byteBufferNew = byteBuffer.slice(); byteBufferNew.limit(size); return new SelectMappedBufferResult(this.fileFromOffset + pos, byteBufferNew, size, this); } else { log.warn("matched, but hold failed, request pos: " + pos + ", fileFromOffset: " + this.fileFromOffset); } } else { log.warn("selectMappedBuffer request pos invalid, request pos: " + pos + ", size: " + size + ", fileFromOffset: " + this.fileFromOffset); } return null; }
消息獲取流程明白了,看下怎么獲取ConsumeQueue,它是維護(hù)在一個(gè)以queueId為key的ConcurrentMap中,沒有就新建一個(gè).
public ConsumeQueue findConsumeQueue(String topic, int queueId) { ConcurrentMapBroker 怎么維護(hù)ConsumeQueue?map = consumeQueueTable.get(topic); if (null == map) { ConcurrentMap newMap = new ConcurrentHashMap (128); ConcurrentMap oldMap = consumeQueueTable.putIfAbsent(topic, newMap); if (oldMap != null) { map = oldMap; } else { map = newMap; } } ConsumeQueue logic = map.get(queueId); if (null == logic) { //m ConsumeQueue newLogic = new ConsumeQueue(// topic, // queueId, // StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()), // this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(), // this); ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic); if (oldLogic != null) { logic = oldLogic; } else { logic = newLogic; } } return logic; }
很容易的想到,是在存放Message的時(shí)候維護(hù)了ConsumeQueue.但是相關(guān)代碼沒找到,我們從頂層往上層找
CosumeQueue.putMessagePositionInfo
-> CosumeQueue.putMessagePositionInfoWrapper
-> DefaultMessageStore.putMessagePositionInfo
-> CommitLogDispatcherBuildConsumeQueue.dispatch
-> DefaultMessageStore.doDispatch
-> ReputMessageService.doReput
private void doReput() { for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) { if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() // && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) { break; } //從CommitLog中讀取上次偏移量之后的新消息 SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset); if (result != null) { try { this.reputFromOffset = result.getStartOffset(); //順序讀 for (int readSize = 0; readSize < result.getSize() && doNext; ) { //構(gòu)建請求去維護(hù)consumeQueue的請求 DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false); int size = dispatchRequest.getMsgSize(); if (dispatchRequest.isSuccess()) { if (size > 0) { DefaultMessageStore.this.doDispatch(dispatchRequest); if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) { DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(), dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1, dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(), dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap()); } // FIXED BUG By shijia this.reputFromOffset += size; readSize += size; if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) { DefaultMessageStore.this.storeStatsService .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet(); DefaultMessageStore.this.storeStatsService .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic()) .addAndGet(dispatchRequest.getMsgSize()); } } else if (size == 0) { this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset); readSize = result.getSize(); } } else if (!dispatchRequest.isSuccess()) { if (size > 0) { log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset); this.reputFromOffset += size; } else { doNext = false; if (DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) { log.error("[BUG]the master dispatch message to consume queue error, COMMITLOG OFFSET: {}", this.reputFromOffset); this.reputFromOffset += result.getSize() - readSize; } } } } } finally { result.release(); } } else { doNext = false; } } }
邏輯:啟個(gè)線程,不斷地查詢有沒有新的提交,如果有就請求維護(hù)consumeQueue.
Broker 怎么處理事務(wù)消息的ConsumeQueue?邏輯:
1.prepare消息不會(huì)放入ConsumeQueue
2.commit消息才能被ConsumeQueue
CommitLogDispatcherBuildConsumeQueue.dispatch
class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher { @Override public void dispatch(DispatchRequest request) { final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag()); switch (tranType) { case MessageSysFlag.TRANSACTION_NOT_TYPE: // commit操作會(huì)放入consumeQueue case MessageSysFlag.TRANSACTION_COMMIT_TYPE: DefaultMessageStore.this.putMessagePositionInfo(request); break; // prepare消息不會(huì)放入ConsumeQueue case MessageSysFlag.TRANSACTION_PREPARED_TYPE: case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: break; } } }Broker 怎么處理定時(shí)消息的 ConsumeQueue?
Producer那篇有講,定時(shí)消息是吧topic和queueId存儲到屬性中,真正存儲與ScheduleTopic上,時(shí)間到了才放入原有topic,故ConsumeQueue無需特殊維護(hù).
Broker 怎么處理回溯消費(fèi)請求?邏輯:
1.在ConsumeQueue根據(jù)時(shí)間查詢offset
2.請求Consumer Group中所有Consumer重置offset
代碼:
AdminBrokerProcessor.processRequest
case RequestCode.INVOKE_BROKER_TO_RESET_OFFSET: return this.resetOffset(ctx, request);
Broker2Client.resetOffset
public RemotingCommand resetOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final ResetOffsetRequestHeader requestHeader = (ResetOffsetRequestHeader) request.decodeCommandCustomHeader(ResetOffsetRequestHeader.class); log.info("[reset-offset] reset offset started by {}. topic={}, group={}, timestamp={}, isForce={}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getTopic(), requestHeader.getGroup(), requestHeader.getTimestamp(), requestHeader.isForce()); boolean isC = false; LanguageCode language = request.getLanguage(); switch (language) { case CPP: isC = true; break; } return this.brokerController.getBroker2Client().resetOffset(requestHeader.getTopic(), requestHeader.getGroup(), requestHeader.getTimestamp(), requestHeader.isForce(), isC); }
主要邏輯在此方法
Broker2Client.resetOffset
public RemotingCommand resetOffset(String topic, String group, long timeStamp, boolean isForce, boolean isC) { final RemotingCommand response = RemotingCommand.createResponseCommand(null); TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic); if (null == topicConfig) { log.error("[reset-offset] reset offset failed, no topic in this broker. topic={}", topic); response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("[reset-offset] reset offset failed, no topic in this broker. topic=" + topic); return response; } MapoffsetTable = new HashMap (); for (int i = 0; i < topicConfig.getWriteQueueNums(); i++) { MessageQueue mq = new MessageQueue(); mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName()); mq.setTopic(topic); mq.setQueueId(i); long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset(group, topic, i); if (-1 == consumerOffset) { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark(String.format("THe consumer group <%s> not exist", group)); return response; } long timeStampOffset; if (timeStamp == -1) { timeStampOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i); } else { //根據(jù)時(shí)間找offset timeStampOffset = this.brokerController.getMessageStore().getOffsetInQueueByTime(topic, i, timeStamp); } if (timeStampOffset < 0) { log.warn("reset offset is invalid. topic={}, queueId={}, timeStampOffset={}", topic, i, timeStampOffset); timeStampOffset = 0; } if (isForce || timeStampOffset < consumerOffset) { offsetTable.put(mq, timeStampOffset); } else { offsetTable.put(mq, consumerOffset); } } ResetOffsetRequestHeader requestHeader = new ResetOffsetRequestHeader(); requestHeader.setTopic(topic); requestHeader.setGroup(group); requestHeader.setTimestamp(timeStamp); //給Consumer發(fā)送請求重置offset RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.RESET_CONSUMER_CLIENT_OFFSET, requestHeader); if (isC) { // c++ language ResetOffsetBodyForC body = new ResetOffsetBodyForC(); List offsetList = convertOffsetTable2OffsetList(offsetTable); body.setOffsetTable(offsetList); request.setBody(body.encode()); } else { // other language ResetOffsetBody body = new ResetOffsetBody(); body.setOffsetTable(offsetTable); request.setBody(body.encode()); } ConsumerGroupInfo consumerGroupInfo = this.brokerController.getConsumerManager().getConsumerGroupInfo(group); if (consumerGroupInfo != null && !consumerGroupInfo.getAllChannel().isEmpty()) { ConcurrentMap channelInfoTable = consumerGroupInfo.getChannelInfoTable(); for (Map.Entry entry : channelInfoTable.entrySet()) { int version = entry.getValue().getVersion(); if (version >= MQVersion.Version.V3_0_7_SNAPSHOT.ordinal()) { try { this.brokerController.getRemotingServer().invokeOneway(entry.getKey(), request, 5000); log.info("[reset-offset] reset offset success. topic={}, group={}, clientId={}", topic, group, entry.getValue().getClientId()); } catch (Exception e) { log.error("[reset-offset] reset offset exception. topic={}, group={}", new Object[] {topic, group}, e); } } else { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("the client does not support this feature. version=" + MQVersion.getVersionDesc(version)); log.warn("[reset-offset] the client does not support this feature. version={}", RemotingHelper.parseChannelRemoteAddr(entry.getKey()), MQVersion.getVersionDesc(version)); return response; } } } else { String errorInfo = String.format("Consumer not online, so can not reset offset, Group: %s Topic: %s Timestamp: %d", requestHeader.getGroup(), requestHeader.getTopic(), requestHeader.getTimestamp()); log.error(errorInfo); response.setCode(ResponseCode.CONSUMER_NOT_ONLINE); response.setRemark(errorInfo); return response; } response.setCode(ResponseCode.SUCCESS); ResetOffsetBody resBody = new ResetOffsetBody(); resBody.setOffsetTable(offsetTable); response.setBody(resBody.encode()); return response; }
ConsumeQueue中沒有時(shí)間戳啊,還是得去CommitLog中找
public long getOffsetInQueueByTime(final long timestamp) { MappedFile mappedFile = this.mappedFileQueue.getMappedFileByTime(timestamp); if (mappedFile != null) { long offset = 0; int low = minLogicOffset > mappedFile.getFileFromOffset() ? (int) (minLogicOffset - mappedFile.getFileFromOffset()) : 0; int high = 0; int midOffset = -1, targetOffset = -1, leftOffset = -1, rightOffset = -1; long leftIndexValue = -1L, rightIndexValue = -1L; long minPhysicOffset = this.defaultMessageStore.getMinPhyOffset(); SelectMappedBufferResult sbr = mappedFile.selectMappedBuffer(0); if (null != sbr) { ByteBuffer byteBuffer = sbr.getByteBuffer(); high = byteBuffer.limit() - CQ_STORE_UNIT_SIZE; try { //二分查找? while (high >= low) { midOffset = (low + high) / (2 * CQ_STORE_UNIT_SIZE) * CQ_STORE_UNIT_SIZE; byteBuffer.position(midOffset); long phyOffset = byteBuffer.getLong(); int size = byteBuffer.getInt(); if (phyOffset < minPhysicOffset) { low = midOffset + CQ_STORE_UNIT_SIZE; leftOffset = midOffset; continue; } //獲取指定偏移量的時(shí)間戳 long storeTime = this.defaultMessageStore.getCommitLog().pickupStoreTimestamp(phyOffset, size); if (storeTime < 0) { return 0; } else if (storeTime == timestamp) { targetOffset = midOffset; break; } else if (storeTime > timestamp) { high = midOffset - CQ_STORE_UNIT_SIZE; rightOffset = midOffset; rightIndexValue = storeTime; } else { low = midOffset + CQ_STORE_UNIT_SIZE; leftOffset = midOffset; leftIndexValue = storeTime; } } if (targetOffset != -1) { offset = targetOffset; } else { if (leftIndexValue == -1) { offset = rightOffset; } else if (rightIndexValue == -1) { offset = leftOffset; } else { offset = Math.abs(timestamp - leftIndexValue) > Math.abs(timestamp - rightIndexValue) ? rightOffset : leftOffset; } } return (mappedFile.getFileFromOffset() + offset) / CQ_STORE_UNIT_SIZE; } finally { sbr.release(); } } } return 0; }
再來關(guān)注下消息存儲時(shí)存儲的額外信息 MessageExt
// 隊(duì)列IDBroker 的消息是 at least once還是exactly only once?private int queueId; //存儲消息大小 private int storeSize; //隊(duì)列的offset private long queueOffset; //消息標(biāo)志位 private int sysFlag; //消息在客戶端創(chuàng)建時(shí)間戳 private long bornTimestamp; //producer 地址 private SocketAddress bornHost; //消息在服務(wù)器存儲時(shí)間戳 private long storeTimestamp; //存儲的broker地址 private SocketAddress storeHost; //消息id private String msgId; //消息對應(yīng)的Commit Log Offset private long commitLogOffset; //消息體CRC private int bodyCRC; // 當(dāng)前消息被某個(gè)訂閱組重新消費(fèi)了幾次(訂閱組之間獨(dú)立計(jì)數(shù)) private int reconsumeTimes;
at least once:
是指每個(gè)消息必須投遞一次,RocketMQ Consumer 先 pull 消息到本地,消費(fèi)完成后,才向服務(wù)器返回 ack,如果沒有消費(fèi)一定不會(huì)ack消息,所以RocketMQ可以很好的支持此特性。
exactly only once:
(1). 發(fā)送消息階段,不允許發(fā)送重復(fù)的消息。
(2). 消費(fèi)消息階段,不允許消費(fèi)重復(fù)的消息。只有以上兩個(gè)條件都滿足情況下,才能認(rèn)為消息是“Exactly Only Once”,而要實(shí)現(xiàn)以上兩點(diǎn),在分布式系統(tǒng)環(huán)境下,不可避免要產(chǎn)生巨大的開銷。所以 RocketMQ 為了追求高性能,并不保證此特性,要求在業(yè)務(wù)上進(jìn)行去重, 也就是說消費(fèi)消息要做到冪等性。RocketMQ 雖然不能嚴(yán)格保證不重復(fù),但是正常情況下很少會(huì)出現(xiàn)重復(fù)發(fā)送、消 費(fèi)情況,只有網(wǎng)絡(luò)異常,Consumer 啟停等異常情況下會(huì)出現(xiàn)消息重復(fù)。此問題的本質(zhì)原因是網(wǎng)絡(luò)調(diào)用存在不確定性,即既不成功也不失敗的第三種狀態(tài),所以才產(chǎn)生了消息重復(fù)性問 題。
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/70367.html
摘要:完全無狀態(tài),可集群部署與集群中的其中一個(gè)節(jié)點(diǎn)隨機(jī)選擇建立長連接,定期從取路由信息,并向提供服務(wù)的建立長連接,且定時(shí)向發(fā)送心跳。既可以從訂閱消息,也可以從訂閱消息,訂閱規(guī)則由配置決定。 問題列表: Name Server 的作用是什么? Name Server 存儲了Broker的什么信息? Name Server 為Producer的提供些什么信息? Name Server 為Co...
摘要:如果干凈頁不足,此時(shí)寫入會(huì)被阻塞,系統(tǒng)嘗試刷盤部分?jǐn)?shù)據(jù),大約每次嘗試個(gè),異步刷盤消息收到后返回同時(shí)調(diào)用消息存儲過程異常情況下可能造成少量數(shù)據(jù)丟失來找出更多干凈。刷盤線程刷盤后,喚醒前端等待線程,可能是一批線程。 這次源碼學(xué)習(xí)的方法是帶著問題學(xué)習(xí)源碼實(shí)現(xiàn),問題列表如下 Broker 怎么接收消息的? Broker 異常情況下怎么保證數(shù)據(jù)可靠性? Broker 怎么保證存儲高吞吐量? B...
摘要:每個(gè)與集群中的所有節(jié)點(diǎn)建立長連接,定時(shí)注冊信息到所有。完全無狀態(tài),可集群部署。本系列源碼解析主要參照原理簡介來追尋其代碼實(shí)現(xiàn)雖然版本不太一致但這也是能找到的最詳細(xì)的資料了接下來根據(jù)其模塊來源碼閱讀目錄如下 為什么選擇讀RocketMQ? 對MQ的理解一直不深,上周看了,還是覺得不夠深入,找個(gè)成熟的產(chǎn)品來學(xué)習(xí)吧,RabbitMQ是erLang寫的,Kafka是Scala寫的,非Java寫...
摘要:的都是從消息來消費(fèi),但是為了能做到實(shí)時(shí)收消息,使用長輪詢方式,可以保證消息實(shí)時(shí)性同方式一致。這種情況建議應(yīng)用,再消費(fèi)下一條消息,這樣可以減輕重試消息的壓力。邏輯請求按參數(shù)返回按照重置消費(fèi)從而實(shí)現(xiàn)回溯消費(fèi) 這次源碼學(xué)習(xí)的方法是帶著問題學(xué)習(xí)源碼實(shí)現(xiàn),問題列表如下 Consumer Group的概念是什么? Consumer pull過程是怎樣的? Consumer 支持push嗎? C...
閱讀 2416·2021-11-18 10:02
閱讀 1929·2021-10-13 09:40
閱讀 3008·2021-09-07 10:07
閱讀 2117·2021-09-04 16:48
閱讀 1015·2019-08-30 13:18
閱讀 2462·2019-08-29 14:03
閱讀 2929·2019-08-29 12:54
閱讀 3167·2019-08-26 11:41