

RocketMQ Source Code Study (5): Broker (Interaction with the Consumer)

paulli3 / 1807 reads

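Summary: In the sending phase, duplicate messages must not be sent. RocketMQ cannot strictly guarantee that no duplicates occur, but under normal conditions duplicate sending or consumption is rare; duplicates appear only in abnormal situations such as network failures or restarts.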

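Questions

How does the Broker respond to Consumer requests?

How does the Broker maintain the ConsumeQueue?

How does the Broker handle the ConsumeQueue for transactional messages?

How does the Broker handle the ConsumeQueue for scheduled messages?

How does the Broker handle a request to rewind consumption?

Are Broker messages delivered at least once or exactly only once?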

How does the Broker respond to Consumer requests?


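Principle:
As the diagram shows, RocketMQ stores every message in the CommitLog and maintains a ConsumeQueue as an index that helps the Consumer consume. A pull therefore reads twice: first the ConsumeQueue to obtain the offset, then the CommitLog to obtain the message content.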


ConsumeQueue has a 20-byte ByteBuffer field, byteBufferIndex, which holds the message offset, the message size, and the hashcode of the tags:

        // message offset in the CommitLog (8 bytes)
        this.byteBufferIndex.putLong(offset);
        // message size (4 bytes)
        this.byteBufferIndex.putInt(size);
        // hashcode of the tags (8 bytes)
        this.byteBufferIndex.putLong(tagsCode);
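
To make the 20-byte layout concrete, here is a minimal sketch that packs and unpacks one index entry with a plain `ByteBuffer`. The class `CqEntryDemo` and its methods are hypothetical helpers written for illustration, not RocketMQ code; only the field order and widths come from the snippet above.

```java
import java.nio.ByteBuffer;

// Hypothetical helper (not part of RocketMQ): one ConsumeQueue-style index entry.
class CqEntryDemo {
    // 8 bytes (CommitLog offset) + 4 bytes (size) + 8 bytes (tags hashcode)
    static final int CQ_STORE_UNIT_SIZE = 20;

    // Packs the three fields in the same order as the snippet above.
    static ByteBuffer encode(long commitLogOffset, int size, long tagsCode) {
        ByteBuffer buf = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
        buf.putLong(commitLogOffset);
        buf.putInt(size);
        buf.putLong(tagsCode);
        buf.flip(); // switch from writing to reading
        return buf;
    }

    // Reads the fields back; returns {offset, size, tagsCode}.
    static long[] decode(ByteBuffer buf) {
        long offset = buf.getLong();
        int size = buf.getInt();
        long tagsCode = buf.getLong();
        return new long[] {offset, size, tagsCode};
    }

    public static void main(String[] args) {
        ByteBuffer entry = encode(1024L, 256, "TagA".hashCode());
        System.out.println("entry bytes = " + entry.remaining());
        long[] fields = decode(entry);
        System.out.println("offset=" + fields[0] + " size=" + fields[1] + " tagsCode=" + fields[2]);
    }
}
```

Because every entry has the same width, the n-th entry of a queue file starts at byte `n * 20`, which is what lets the Broker jump straight to a logical queue offset.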

BrokerController.initialize registers a PullMessageProcessor to handle pull-message requests:

        this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);
        this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);

PullMessageProcessor.processRequest in turn delegates the lookup to DefaultMessageStore:

  final GetMessageResult getMessageResult =
            this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
                requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);
   

DefaultMessageStore.getMessage

    public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset, final int maxMsgNums,
                                       final MessageFilter messageFilter) {
        if (this.shutdown) {
            log.warn("message store has shutdown, so getMessage is forbidden");
            return null;
        }

        if (!this.runningFlags.isReadable()) {
            log.warn("message store is not readable, so getMessage is forbidden " + this.runningFlags.getFlagBits());
            return null;
        }

        long beginTime = this.getSystemClock().now();

        GetMessageStatus status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
        long nextBeginOffset = offset;
        long minOffset = 0;
        long maxOffset = 0;

        GetMessageResult getResult = new GetMessageResult();

        final long maxOffsetPy = this.commitLog.getMaxOffset();
        // find the ConsumeQueue by topic and queueId
        ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
        if (consumeQueue != null) {
            minOffset = consumeQueue.getMinOffsetInQueue();
            maxOffset = consumeQueue.getMaxOffsetInQueue();

            if (maxOffset == 0) {
                status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
                nextBeginOffset = nextOffsetCorrection(offset, 0);
            } else if (offset < minOffset) {
                status = GetMessageStatus.OFFSET_TOO_SMALL;
                nextBeginOffset = nextOffsetCorrection(offset, minOffset);
            } else if (offset == maxOffset) {
                status = GetMessageStatus.OFFSET_OVERFLOW_ONE;
                nextBeginOffset = nextOffsetCorrection(offset, offset);
            } else if (offset > maxOffset) {
                status = GetMessageStatus.OFFSET_OVERFLOW_BADLY;
                if (0 == minOffset) {
                    nextBeginOffset = nextOffsetCorrection(offset, minOffset);
                } else {
                    nextBeginOffset = nextOffsetCorrection(offset, maxOffset);
                }
            } else {
                // everything above handles abnormal offsets; the real lookup starts here
                // get all readable index entries from this offset on (the ConsumeQueue is file-backed too)
                SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);
                if (bufferConsumeQueue != null) {
                    try {
                        status = GetMessageStatus.NO_MATCHED_MESSAGE;

                        long nextPhyFileStartOffset = Long.MIN_VALUE;
                        long maxPhyOffsetPulling = 0;

                        int i = 0;
                        final int maxFilterMessageCount = Math.max(16000, maxMsgNums * ConsumeQueue.CQ_STORE_UNIT_SIZE);
                        final boolean diskFallRecorded = this.messageStoreConfig.isDiskFallRecorded();
                        ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                        // walk the index entries one at a time
                        for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                            long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
                            int sizePy = bufferConsumeQueue.getByteBuffer().getInt();
                            long tagsCode = bufferConsumeQueue.getByteBuffer().getLong();

                            maxPhyOffsetPulling = offsetPy;

                            if (nextPhyFileStartOffset != Long.MIN_VALUE) {
                                if (offsetPy < nextPhyFileStartOffset)
                                    continue;
                            }

                            boolean isInDisk = checkInDiskByCommitOffset(offsetPy, maxOffsetPy);

                            if (this.isTheBatchFull(sizePy, maxMsgNums, getResult.getBufferTotalSize(), getResult.getMessageCount(),
                                isInDisk)) {
                                break;
                            }

                            boolean extRet = false;
                            if (consumeQueue.isExtAddr(tagsCode)) {
                                extRet = consumeQueue.getExt(tagsCode, cqExtUnit);
                                if (extRet) {
                                    tagsCode = cqExtUnit.getTagsCode();
                                } else {
                                    // can't find ext content.Client will filter messages by tag also.
                                    log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}, topic={}, group={}",
                                        tagsCode, offsetPy, sizePy, topic, group);
                                }
                            }

                            if (messageFilter != null
                                && !messageFilter.isMatchedByConsumeQueue(tagsCode, extRet ? cqExtUnit : null)) {
                                if (getResult.getBufferTotalSize() == 0) {
                                    status = GetMessageStatus.NO_MATCHED_MESSAGE;
                                }

                                continue;
                            }
                            // read the message from the CommitLog by offset and size
                            SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
                            if (null == selectResult) {
                                if (getResult.getBufferTotalSize() == 0) {
                                    status = GetMessageStatus.MESSAGE_WAS_REMOVING;
                                }

                                nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);
                                continue;
                            }

                            if (messageFilter != null
                                && !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) {
                                if (getResult.getBufferTotalSize() == 0) {
                                    status = GetMessageStatus.NO_MATCHED_MESSAGE;
                                }
                                // release...
                                selectResult.release();
                                continue;
                            }

                            this.storeStatsService.getGetMessageTransferedMsgCount().incrementAndGet();             
                            // add the message to the result
                            getResult.addMessage(selectResult);
                            status = GetMessageStatus.FOUND;
                            nextPhyFileStartOffset = Long.MIN_VALUE;
                        }

                        if (diskFallRecorded) {
                            long fallBehind = maxOffsetPy - maxPhyOffsetPulling;
                            brokerStatsManager.recordDiskFallBehindSize(group, topic, queueId, fallBehind);
                        }

                        nextBeginOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);

                        long diff = maxOffsetPy - maxPhyOffsetPulling;
                        long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
                            * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
                        getResult.setSuggestPullingFromSlave(diff > memory);
                    } finally {

                        bufferConsumeQueue.release();
                    }
                } else {
                    status = GetMessageStatus.OFFSET_FOUND_NULL;
                    nextBeginOffset = nextOffsetCorrection(offset, consumeQueue.rollNextFile(offset));
                    log.warn("consumer request topic: " + topic + "offset: " + offset + " minOffset: " + minOffset + " maxOffset: "
                        + maxOffset + ", but access logic queue failed.");
                }
            }
        } else {
            status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE;
            nextBeginOffset = nextOffsetCorrection(offset, 0);
        }

        if (GetMessageStatus.FOUND == status) {
            this.storeStatsService.getGetMessageTimesTotalFound().incrementAndGet();
        } else {
            this.storeStatsService.getGetMessageTimesTotalMiss().incrementAndGet();
        }
        long eclipseTime = this.getSystemClock().now() - beginTime;
        this.storeStatsService.setGetMessageEntireTimeMax(eclipseTime);

        getResult.setStatus(status);
        getResult.setNextBeginOffset(nextBeginOffset);
        getResult.setMaxOffset(maxOffset);
        getResult.setMinOffset(minOffset);
        return getResult;
    }
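
The chain of offset checks at the top of `getMessage` can be isolated into a small function. The sketch below reproduces only that branching; `OffsetCheckDemo` and the `IN_RANGE` value are hypothetical names introduced here (the real method has no such status and proceeds to read the queue instead).

```java
// Hypothetical sketch of the offset validation performed before any data is read.
class OffsetCheckDemo {
    enum Status {
        NO_MESSAGE_IN_QUEUE,
        OFFSET_TOO_SMALL,
        OFFSET_OVERFLOW_ONE,
        OFFSET_OVERFLOW_BADLY,
        IN_RANGE // hypothetical: the real code continues to the index lookup here
    }

    // Same order of checks as DefaultMessageStore.getMessage.
    static Status classify(long offset, long minOffset, long maxOffset) {
        if (maxOffset == 0) {
            return Status.NO_MESSAGE_IN_QUEUE;   // the queue is empty
        } else if (offset < minOffset) {
            return Status.OFFSET_TOO_SMALL;      // data already deleted
        } else if (offset == maxOffset) {
            return Status.OFFSET_OVERFLOW_ONE;   // consumer has caught up
        } else if (offset > maxOffset) {
            return Status.OFFSET_OVERFLOW_BADLY; // consumer is ahead of the queue
        }
        return Status.IN_RANGE;
    }

    public static void main(String[] args) {
        System.out.println(classify(5, 3, 10));
        System.out.println(classify(10, 3, 10));
    }
}
```

`OFFSET_OVERFLOW_ONE` is the normal "nothing new yet" case: the requested offset equals the next offset to be written, so the consumer simply has no unread messages.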

CommitLog.getMessage

  public SelectMappedBufferResult getMessage(final long offset, final int size) {
        int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog();
        // locate the mapped file that contains this offset
        MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, offset == 0);
        if (mappedFile != null) {
            int pos = (int) (offset % mappedFileSize);
            // read the data at that position from the mapped file
            return mappedFile.selectMappedBuffer(pos, size);
        }
        return null;
    }

MappedFile.selectMappedBuffer

    public SelectMappedBufferResult selectMappedBuffer(int pos, int size) {
        int readPosition = getReadPosition();
        if ((pos + size) <= readPosition) {

            if (this.hold()) {
                // Java NIO: take a view of the data starting at pos
                ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
                byteBuffer.position(pos);
                ByteBuffer byteBufferNew = byteBuffer.slice();
                byteBufferNew.limit(size);
                return new SelectMappedBufferResult(this.fileFromOffset + pos, byteBufferNew, size, this);
            } else {
                log.warn("matched, but hold failed, request pos: " + pos + ", fileFromOffset: "
                    + this.fileFromOffset);
            }
        } else {
            log.warn("selectMappedBuffer request pos invalid, request pos: " + pos + ", size: " + size
                + ", fileFromOffset: " + this.fileFromOffset);
        }

        return null;
    }
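
The two methods above rely on two small techniques: turning a global offset into a position inside one fixed-size file, and using `slice()` twice to get a zero-copy window of `size` bytes. The following sketch shows both on an ordinary heap buffer. `SliceDemo` is a hypothetical class, and it assumes the buffer passed in has position 0, as a freshly mapped file would.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of the offset arithmetic and the slice/position/slice/limit idiom.
class SliceDemo {
    // Position of a global offset inside its file, as in CommitLog.getMessage.
    static int positionInFile(long offset, int mappedFileSize) {
        return (int) (offset % mappedFileSize);
    }

    // Returns a view of [pos, pos + size) that shares content with 'file',
    // or null when the range extends past the readable position.
    static ByteBuffer select(ByteBuffer file, int readPosition, int pos, int size) {
        if (pos + size > readPosition) {
            return null;
        }
        ByteBuffer view = file.slice();   // own position/limit, shared bytes
        view.position(pos);
        ByteBuffer window = view.slice(); // index 0 of 'window' is 'pos' in the file
        window.limit(size);
        return window;
    }

    public static void main(String[] args) {
        ByteBuffer file = ByteBuffer.wrap("hello-rocketmq".getBytes(StandardCharsets.US_ASCII));
        ByteBuffer window = select(file, file.capacity(), 6, 8);
        byte[] out = new byte[window.remaining()];
        window.get(out);
        System.out.println(new String(out, StandardCharsets.US_ASCII));
        System.out.println(positionInFile(1073741824L + 100, 1073741824));
    }
}
```

Slicing never moves the position of the original buffer, which is why many readers can take views of the same mapped file concurrently.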

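With the retrieval flow clear, look at how the ConsumeQueue itself is obtained. It lives in a ConcurrentMap keyed by queueId (stored per topic in consumeQueueTable), and a new one is created if none exists yet.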

  public ConsumeQueue findConsumeQueue(String topic, int queueId) {
        ConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);
        if (null == map) {
            ConcurrentMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);
            ConcurrentMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);
            if (oldMap != null) {
                map = oldMap;
            } else {
                map = newMap;
            }
        }

        ConsumeQueue logic = map.get(queueId);
        if (null == logic) {
            // not found: create the queue on first access
            ConsumeQueue newLogic = new ConsumeQueue(//
                topic, //
                queueId, //
                StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()), //
                this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(), //
                this);
            ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);
            if (oldLogic != null) {
                logic = oldLogic;
            } else {
                logic = newLogic;
            }
        }

        return logic;
    }
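
The same "get or create" lookup can be written more compactly on Java 8 and later. This is a sketch of an alternative, not what RocketMQ does: `QueueTableDemo` and its `Queue` stand-in are hypothetical, and it uses `computeIfAbsent` in place of the `putIfAbsent` pattern shown above.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch: a two-level topic -> queueId -> queue table.
class QueueTableDemo {
    // Stand-in for ConsumeQueue; only carries its identity.
    static final class Queue {
        final String topic;
        final int queueId;

        Queue(String topic, int queueId) {
            this.topic = topic;
            this.queueId = queueId;
        }
    }

    private final ConcurrentMap<String, ConcurrentMap<Integer, Queue>> table =
        new ConcurrentHashMap<>();

    // Returns the existing queue or atomically creates and registers a new one.
    Queue findOrCreate(String topic, int queueId) {
        return table
            .computeIfAbsent(topic, t -> new ConcurrentHashMap<>(128))
            .computeIfAbsent(queueId, id -> new Queue(topic, id));
    }

    public static void main(String[] args) {
        QueueTableDemo demo = new QueueTableDemo();
        Queue a = demo.findOrCreate("TopicTest", 0);
        Queue b = demo.findOrCreate("TopicTest", 0);
        System.out.println("same instance: " + (a == b));
    }
}
```

With `putIfAbsent`, two racing threads may each construct an object and one of them is discarded; with `ConcurrentHashMap.computeIfAbsent` the factory runs at most once per key. For an object that opens files on construction, that difference may matter.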
How does the Broker maintain the ConsumeQueue?

The natural guess is that the ConsumeQueue is maintained when a message is stored, but no such code shows up on the write path. So trace the callers upward from the lowest-level method:
ConsumeQueue.putMessagePositionInfo
-> ConsumeQueue.putMessagePositionInfoWrapper
-> DefaultMessageStore.putMessagePositionInfo
-> CommitLogDispatcherBuildConsumeQueue.dispatch
-> DefaultMessageStore.doDispatch
-> ReputMessageService.doReput

private void doReput() {
            for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {

                if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() //
                    && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
                    break;
                }
                // read the new data in the CommitLog after the last processed offset
                SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
                if (result != null) {
                    try {
                        this.reputFromOffset = result.getStartOffset();
                        // sequential read
                        for (int readSize = 0; readSize < result.getSize() && doNext; ) {
                            // build the request used to update the ConsumeQueue
                            DispatchRequest dispatchRequest =
                                DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
                            int size = dispatchRequest.getMsgSize();

                            if (dispatchRequest.isSuccess()) {
                                if (size > 0) {
                                    DefaultMessageStore.this.doDispatch(dispatchRequest);

                                    if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
                                        && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
                                        DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
                                            dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
                                            dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
                                            dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
                                    }
                                    // FIXED BUG By shijia
                                    this.reputFromOffset += size;
                                    readSize += size;
                                    if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
                                        DefaultMessageStore.this.storeStatsService
                                            .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet();
                                        DefaultMessageStore.this.storeStatsService
                                            .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
                                            .addAndGet(dispatchRequest.getMsgSize());
                                    }
                                } else if (size == 0) {
                                    this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
                                    readSize = result.getSize();
                                }
                            } else if (!dispatchRequest.isSuccess()) {

                                if (size > 0) {
                                    log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);
                                    this.reputFromOffset += size;
                                } else {
                                    doNext = false;
                                    if (DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
                                        log.error("[BUG]the master dispatch message to consume queue error, COMMITLOG OFFSET: {}",
                                            this.reputFromOffset);

                                        this.reputFromOffset += result.getSize() - readSize;
                                    }
                                }
                            }
                        }
                    } finally {
                        result.release();
                    }
                } else {
                    doNext = false;
                }
            }
        }

Logic: a background thread keeps checking whether the CommitLog has new data and, if so, dispatches a request to update the ConsumeQueue.
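
The idea of rebuilding the index from the log can be shown with in-memory lists. The sketch below is a deliberately simplified, single-threaded model: `ReputDemo`, `Record`, and the string queue key are all hypothetical, and the real service runs `doReput` repeatedly on its own thread against mapped files.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of "append to the log, index it later".
class ReputDemo {
    static final class Record {
        final String topic;
        final int queueId;
        final long offset; // physical offset in the log
        final int size;

        Record(String topic, int queueId, long offset, int size) {
            this.topic = topic;
            this.queueId = queueId;
            this.offset = offset;
            this.size = size;
        }
    }

    private final List<Record> commitLog = new ArrayList<>();
    private final Map<String, List<long[]>> consumeQueues = new HashMap<>();
    private long maxOffset = 0; // next write position in the log
    private int reputIndex = 0; // next record that still needs indexing

    // Write path: only the log is touched.
    void append(String topic, int queueId, int size) {
        commitLog.add(new Record(topic, queueId, maxOffset, size));
        maxOffset += size;
    }

    // One pass of the reput loop: index everything appended since the last pass.
    int doReput() {
        int dispatched = 0;
        while (reputIndex < commitLog.size()) {
            Record r = commitLog.get(reputIndex++);
            consumeQueues
                .computeIfAbsent(r.topic + "-" + r.queueId, k -> new ArrayList<>())
                .add(new long[] {r.offset, r.size});
            dispatched++;
        }
        return dispatched;
    }

    // Index entries {offset, size} for one queue, in arrival order.
    List<long[]> queue(String topic, int queueId) {
        return consumeQueues.getOrDefault(topic + "-" + queueId, new ArrayList<>());
    }

    public static void main(String[] args) {
        ReputDemo store = new ReputDemo();
        store.append("TopicA", 0, 100);
        store.append("TopicA", 1, 50);
        store.append("TopicA", 0, 30);
        System.out.println("dispatched = " + store.doReput());
        System.out.println("TopicA-0 entries = " + store.queue("TopicA", 0).size());
    }
}
```

Keeping the write path to a single sequential append and building the per-queue index asynchronously is the design choice this section describes: a message becomes visible to consumers only once its index entry exists.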

How does the Broker handle the ConsumeQueue for transactional messages?

Logic:
1. Prepare messages are not put into the ConsumeQueue.
2. Only commit messages (and ordinary non-transactional messages) are put into the ConsumeQueue.

CommitLogDispatcherBuildConsumeQueue.dispatch

  class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {

        @Override
        public void dispatch(DispatchRequest request) {
            final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
            switch (tranType) {
                case MessageSysFlag.TRANSACTION_NOT_TYPE:
                // non-transactional and commit messages are put into the ConsumeQueue
                case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                    DefaultMessageStore.this.putMessagePositionInfo(request);
                    break;
                // prepare and rollback messages are not put into the ConsumeQueue
                case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                    break;
            }
        }
    }
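
The switch above depends on the transaction type being encoded in two bits of `sysFlag`. The sketch below restates that decision as a boolean. The constant values are my understanding of `MessageSysFlag` and should be treated as an assumption to check against the source; `TranFlagDemo` and `visibleToConsumers` are hypothetical names.

```java
// Hypothetical sketch of the "is this message indexed for consumers?" decision.
class TranFlagDemo {
    // Assumed to mirror MessageSysFlag: the type occupies bits 2 and 3 of sysFlag.
    static final int TRANSACTION_NOT_TYPE = 0;
    static final int TRANSACTION_PREPARED_TYPE = 0x1 << 2;
    static final int TRANSACTION_COMMIT_TYPE = 0x2 << 2;
    static final int TRANSACTION_ROLLBACK_TYPE = 0x3 << 2;

    // Masks out every bit except the transaction type.
    static int transactionValue(int sysFlag) {
        return sysFlag & TRANSACTION_ROLLBACK_TYPE;
    }

    // True when the dispatcher would write an index entry for the message.
    static boolean visibleToConsumers(int sysFlag) {
        int type = transactionValue(sysFlag);
        return type == TRANSACTION_NOT_TYPE || type == TRANSACTION_COMMIT_TYPE;
    }

    public static void main(String[] args) {
        System.out.println("prepared visible: " + visibleToConsumers(TRANSACTION_PREPARED_TYPE));
        System.out.println("commit visible:   " + visibleToConsumers(TRANSACTION_COMMIT_TYPE));
    }
}
```

Since prepare messages still sit in the CommitLog, hiding them costs nothing extra: they just never receive an index entry.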
How does the Broker handle the ConsumeQueue for scheduled messages?

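The Producer article covered this: a scheduled message keeps its original topic and queueId in its properties and is actually stored under ScheduleTopic; only when its time arrives is it put into the original topic. The ConsumeQueue therefore needs no special handling.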

How does the Broker handle a request to rewind consumption?

Logic:
1. Look up the offset by time in the ConsumeQueue.
2. Ask every Consumer in the Consumer Group to reset its offset.

Code:
AdminBrokerProcessor.processRequest

    case RequestCode.INVOKE_BROKER_TO_RESET_OFFSET:
    return this.resetOffset(ctx, request);

AdminBrokerProcessor.resetOffset

    public RemotingCommand resetOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        final ResetOffsetRequestHeader requestHeader =
            (ResetOffsetRequestHeader) request.decodeCommandCustomHeader(ResetOffsetRequestHeader.class);
        log.info("[reset-offset] reset offset started by {}. topic={}, group={}, timestamp={}, isForce={}",
            RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getTopic(), requestHeader.getGroup(),
            requestHeader.getTimestamp(), requestHeader.isForce());
        boolean isC = false;
        LanguageCode language = request.getLanguage();
        switch (language) {
            case CPP:
                isC = true;
                break;
        }
        return this.brokerController.getBroker2Client().resetOffset(requestHeader.getTopic(), requestHeader.getGroup(),
            requestHeader.getTimestamp(), requestHeader.isForce(), isC);
    }

The main logic is in this method:
Broker2Client.resetOffset

   public RemotingCommand resetOffset(String topic, String group, long timeStamp, boolean isForce,
        boolean isC) {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);

        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
        if (null == topicConfig) {
            log.error("[reset-offset] reset offset failed, no topic in this broker. topic={}", topic);
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("[reset-offset] reset offset failed, no topic in this broker. topic=" + topic);
            return response;
        }

        Map<MessageQueue, Long> offsetTable = new HashMap<MessageQueue, Long>();

        for (int i = 0; i < topicConfig.getWriteQueueNums(); i++) {
            MessageQueue mq = new MessageQueue();
            mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
            mq.setTopic(topic);
            mq.setQueueId(i);

            long consumerOffset =
                this.brokerController.getConsumerOffsetManager().queryOffset(group, topic, i);
            if (-1 == consumerOffset) {
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark(String.format("THe consumer group <%s> not exist", group));
                return response;
            }

            long timeStampOffset;
            if (timeStamp == -1) {

                timeStampOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i);
            } else {
                // look up the queue offset by time
                timeStampOffset = this.brokerController.getMessageStore().getOffsetInQueueByTime(topic, i, timeStamp);
            }

            if (timeStampOffset < 0) {
                log.warn("reset offset is invalid. topic={}, queueId={}, timeStampOffset={}", topic, i, timeStampOffset);
                timeStampOffset = 0;
            }

            if (isForce || timeStampOffset < consumerOffset) {
                offsetTable.put(mq, timeStampOffset);
            } else {
                offsetTable.put(mq, consumerOffset);
            }
        }

        ResetOffsetRequestHeader requestHeader = new ResetOffsetRequestHeader();
        requestHeader.setTopic(topic);
        requestHeader.setGroup(group);
        requestHeader.setTimestamp(timeStamp);
        // send each Consumer a request to reset its offset
        RemotingCommand request =
            RemotingCommand.createRequestCommand(RequestCode.RESET_CONSUMER_CLIENT_OFFSET, requestHeader);
        if (isC) {
            // c++ language
            ResetOffsetBodyForC body = new ResetOffsetBodyForC();
            List<MessageQueueForC> offsetList = convertOffsetTable2OffsetList(offsetTable);
            body.setOffsetTable(offsetList);
            request.setBody(body.encode());
        } else {
            // other language
            ResetOffsetBody body = new ResetOffsetBody();
            body.setOffsetTable(offsetTable);
            request.setBody(body.encode());
        }

        ConsumerGroupInfo consumerGroupInfo =
            this.brokerController.getConsumerManager().getConsumerGroupInfo(group);

        if (consumerGroupInfo != null && !consumerGroupInfo.getAllChannel().isEmpty()) {
            ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =
                consumerGroupInfo.getChannelInfoTable();
            for (Map.Entry<Channel, ClientChannelInfo> entry : channelInfoTable.entrySet()) {
                int version = entry.getValue().getVersion();
                if (version >= MQVersion.Version.V3_0_7_SNAPSHOT.ordinal()) {
                    try {
                        this.brokerController.getRemotingServer().invokeOneway(entry.getKey(), request, 5000);
                        log.info("[reset-offset] reset offset success. topic={}, group={}, clientId={}",
                            topic, group, entry.getValue().getClientId());
                    } catch (Exception e) {
                        log.error("[reset-offset] reset offset exception. topic={}, group={}",
                            new Object[] {topic, group}, e);
                    }
                } else {
                    response.setCode(ResponseCode.SYSTEM_ERROR);
                    response.setRemark("the client does not support this feature. version="
                        + MQVersion.getVersionDesc(version));
                    log.warn("[reset-offset] the client does not support this feature. version={}",
                        RemotingHelper.parseChannelRemoteAddr(entry.getKey()), MQVersion.getVersionDesc(version));
                    return response;
                }
            }
        } else {
            String errorInfo =
                String.format("Consumer not online, so can not reset offset, Group: %s Topic: %s Timestamp: %d",
                    requestHeader.getGroup(),
                    requestHeader.getTopic(),
                    requestHeader.getTimestamp());
            log.error(errorInfo);
            response.setCode(ResponseCode.CONSUMER_NOT_ONLINE);
            response.setRemark(errorInfo);
            return response;
        }
        response.setCode(ResponseCode.SUCCESS);
        ResetOffsetBody resBody = new ResetOffsetBody();
        resBody.setOffsetTable(offsetTable);
        response.setBody(resBody.encode());
        return response;
    }
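
The per-queue decision inside the loop above is small but easy to misread, so here it is as a pure function. `ResetTargetDemo` and `chooseOffset` are hypothetical names; the logic is lifted from the `timeStampOffset < 0` clamp and the `isForce || timeStampOffset < consumerOffset` test.

```java
// Hypothetical sketch of which offset ends up in offsetTable for one queue.
class ResetTargetDemo {
    static long chooseOffset(long consumerOffset, long timeStampOffset, boolean isForce) {
        if (timeStampOffset < 0) {
            timeStampOffset = 0; // an invalid lookup result is clamped to the start
        }
        // Without force, the offset only ever moves backwards.
        return (isForce || timeStampOffset < consumerOffset) ? timeStampOffset : consumerOffset;
    }

    public static void main(String[] args) {
        System.out.println(chooseOffset(100, 40, false));
        System.out.println(chooseOffset(100, 150, false));
        System.out.println(chooseOffset(100, 150, true));
    }
}
```

In other words, a non-forced reset can replay old messages but cannot skip unconsumed ones; skipping ahead requires `isForce`.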

The ConsumeQueue holds no timestamps, so the lookup still has to read them from the CommitLog:

    public long getOffsetInQueueByTime(final long timestamp) {
        MappedFile mappedFile = this.mappedFileQueue.getMappedFileByTime(timestamp);
        if (mappedFile != null) {
            long offset = 0;
            int low = minLogicOffset > mappedFile.getFileFromOffset() ? (int) (minLogicOffset - mappedFile.getFileFromOffset()) : 0;
            int high = 0;
            int midOffset = -1, targetOffset = -1, leftOffset = -1, rightOffset = -1;
            long leftIndexValue = -1L, rightIndexValue = -1L;
            long minPhysicOffset = this.defaultMessageStore.getMinPhyOffset();
            SelectMappedBufferResult sbr = mappedFile.selectMappedBuffer(0);
            if (null != sbr) {
                ByteBuffer byteBuffer = sbr.getByteBuffer();
                high = byteBuffer.limit() - CQ_STORE_UNIT_SIZE;
                try {
                    // binary search over the fixed-size index entries
                    while (high >= low) {
                        midOffset = (low + high) / (2 * CQ_STORE_UNIT_SIZE) * CQ_STORE_UNIT_SIZE;
                        byteBuffer.position(midOffset);
                        long phyOffset = byteBuffer.getLong();
                        int size = byteBuffer.getInt();
                        if (phyOffset < minPhysicOffset) {
                            low = midOffset + CQ_STORE_UNIT_SIZE;
                            leftOffset = midOffset;
                            continue;
                        }
                        // read the store timestamp of the message at this physical offset
                       long storeTime =
                            this.defaultMessageStore.getCommitLog().pickupStoreTimestamp(phyOffset, size);
                        if (storeTime < 0) {
                            return 0;
                        } else if (storeTime == timestamp) {
                            targetOffset = midOffset;
                            break;
                        } else if (storeTime > timestamp) {
                            high = midOffset - CQ_STORE_UNIT_SIZE;
                            rightOffset = midOffset;
                            rightIndexValue = storeTime;
                        } else {
                            low = midOffset + CQ_STORE_UNIT_SIZE;
                            leftOffset = midOffset;
                            leftIndexValue = storeTime;
                        }
                    }

                    if (targetOffset != -1) {

                        offset = targetOffset;
                    } else {
                        if (leftIndexValue == -1) {

                            offset = rightOffset;
                        } else if (rightIndexValue == -1) {

                            offset = leftOffset;
                        } else {
                            offset =
                                Math.abs(timestamp - leftIndexValue) > Math.abs(timestamp
                                    - rightIndexValue) ? rightOffset : leftOffset;
                        }
                    }

                    return (mappedFile.getFileFromOffset() + offset) / CQ_STORE_UNIT_SIZE;
                } finally {
                    sbr.release();
                }
            }
        }
        return 0;
    }
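
Stripped of the file handling, the method above is a binary search that falls back to the nearest neighbour when no entry has exactly the requested timestamp. The sketch below runs the same search over a plain sorted array. `TimeSearchDemo` is hypothetical, and the array stands in for the store timestamps that the real code fetches from the CommitLog entry by entry.

```java
// Hypothetical sketch: nearest-timestamp binary search over sorted store times.
class TimeSearchDemo {
    // Returns the index of the exact match, else the closer of the two
    // neighbours (ties go to the earlier entry); -1 for an empty array.
    static int findByTime(long[] storeTimes, long timestamp) {
        int low = 0;
        int high = storeTimes.length - 1;
        int left = -1;  // last index seen with time < timestamp
        int right = -1; // last index seen with time > timestamp
        while (low <= high) {
            int mid = (low + high) >>> 1;
            long t = storeTimes[mid];
            if (t == timestamp) {
                return mid;
            } else if (t > timestamp) {
                right = mid;
                high = mid - 1;
            } else {
                left = mid;
                low = mid + 1;
            }
        }
        if (left == -1) {
            return right; // everything is newer than the timestamp
        }
        if (right == -1) {
            return left;  // everything is older than the timestamp
        }
        long distLeft = timestamp - storeTimes[left];
        long distRight = storeTimes[right] - timestamp;
        return distLeft > distRight ? right : left;
    }

    public static void main(String[] args) {
        long[] times = {100, 200, 300, 400};
        System.out.println(findByTime(times, 300));
        System.out.println(findByTime(times, 260));
    }
}
```

The real method does the same arithmetic in byte positions (multiples of `CQ_STORE_UNIT_SIZE`) and converts the winning position back to a logical queue offset by dividing by the entry size.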

Next, look at the extra information stored with each message, MessageExt:

    // queue ID
    private int queueId;
    // stored message size
    private int storeSize;
    // offset within the queue
    private long queueOffset;
    // message flag bits
    private int sysFlag;
    // timestamp when the message was created on the client
    private long bornTimestamp;
    // producer address
    private SocketAddress bornHost;
    // timestamp when the message was stored on the server
    private long storeTimestamp;
    // address of the broker that stored the message
    private SocketAddress storeHost;
    // message ID
    private String msgId;
    // CommitLog offset of the message
    private long commitLogOffset;
    // CRC of the message body
    private int bodyCRC;
    // how many times a subscription group has re-consumed this message (counted independently per group)
    private int reconsumeTimes;
Are Broker messages delivered at least once or exactly only once?

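at least once:

Every message must be delivered at least once. A RocketMQ Consumer first pulls messages to the local side and returns an ack to the server only after consumption completes; a message that has not been consumed is never acked. RocketMQ therefore supports this property well.

exactly only once:

(1). In the sending phase, duplicate messages must not be sent.
(2). In the consuming phase, duplicate messages must not be consumed.

Only when both conditions hold can delivery be called "Exactly Only Once", and achieving both in a distributed system inevitably carries a large cost. In pursuit of high performance, RocketMQ does not guarantee this property and instead requires the business side to deduplicate; in other words, message consumption must be idempotent. Although RocketMQ cannot strictly guarantee that no duplicates occur, duplicate sending or consumption is rare under normal conditions; duplicates appear only in abnormal situations such as network failures or Consumer restarts. The root cause is that a network call is uncertain: besides success and failure there is a third state in which the outcome is unknown, and that is what produces duplicate messages.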
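
Since deduplication is left to the business side, a consumer usually remembers a business key per message and skips keys it has already handled. The sketch below keeps those keys in memory purely for illustration. `IdempotentConsumerDemo` is hypothetical, and a real system would more likely rely on a persistent store (for example a unique database key), because an in-memory set is lost on restart.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of idempotent consumption keyed by a business identifier.
class IdempotentConsumerDemo {
    private final Set<String> seenKeys = ConcurrentHashMap.newKeySet();
    private final AtomicInteger applied = new AtomicInteger();

    // Returns true when the message was applied, false when it was a duplicate.
    boolean consume(String businessKey) {
        if (!seenKeys.add(businessKey)) {
            return false; // redelivered message: skip the side effect
        }
        applied.incrementAndGet(); // stands in for the real business logic
        return true;
    }

    int appliedCount() {
        return applied.get();
    }

    public static void main(String[] args) {
        IdempotentConsumerDemo consumer = new IdempotentConsumerDemo();
        System.out.println(consumer.consume("order-1001"));
        System.out.println(consumer.consume("order-1001"));
        System.out.println("applied = " + consumer.appliedCount());
    }
}
```

Note that this sketch records the key before running the business logic; if the logic can fail, the key and the side effect should be committed together so that a failed attempt can still be retried.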
