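Summary: Each priority level can be represented by a different topic; when sending a message, you pick the topic that stands for its priority. This solves the vast majority of priority problems, at the cost of some precision in business-level priorities. Scheduled messages are supported, but not with arbitrary time precision; only specific levels are supported, for example delays of 5s, 10s, or 1m.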
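Producer
This round of source reading is driven by questions. The list is:
How does the Producer send synchronous messages?
How does the Producer interact with the NameServer?
How does the Producer send asynchronous messages?
Are both P2P and Pub/Sub supported?
How does the Producer guarantee ordered messages?
How does the Producer do load balancing?
What is a Producer Group?
How does the Producer set message priority?
How are the Producer's transactional messages implemented?
How are the Producer's scheduled messages implemented?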
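How does the Producer send synchronous messages?
Message body: Message
// topic
private String topic;
// flag
private int flag;
// properties
private Map<String, String> properties;
// message body
private byte[] body;
Execution logic
1. Look up the publish info for the topic
2. Select one MessageQueue in the topic
3. Send the message to that MessageQueue via Netty
DefaultMQProducer.send -> DefaultMQProducerImpl.send -> DefaultMQProducerImpl.sendDefaultImpl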
private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    this.makeSureStateOK();
    Validators.checkMessage(msg, this.defaultMQProducer);
    final long invokeID = random.nextLong();
    long beginTimestampFirst = System.currentTimeMillis();
    long beginTimestampPrev = beginTimestampFirst;
    long endTimestamp = beginTimestampFirst;
    // Fetch the publish info for the topic
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        MessageQueue mq = null;
        Exception exception = null;
        SendResult sendResult = null;
        int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
        int times = 0;
        String[] brokersSent = new String[timesTotal];
        for (; times < timesTotal; times++) {
            String lastBrokerName = null == mq ? null : mq.getBrokerName();
            // Select one MessageQueue in the topic (comparable to a Kafka partition)
            MessageQueue tmpmq = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
            if (tmpmq != null) {
                mq = tmpmq;
                brokersSent[times] = mq.getBrokerName();
                try {
                    beginTimestampPrev = System.currentTimeMillis();
                    // The method that actually sends the data
                    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                    switch (communicationMode) {
                        case ASYNC:
                            return null;
                        case ONEWAY:
                            return null;
                        case SYNC:
                            if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                    continue;
                                }
                            }
                            return sendResult;
                        default:
                            break;
                    }
                } // the catch blocks are omitted in this excerpt
            }
        }
        // the rest of the method is omitted in this excerpt
Here we focus on how the message body is sent.
DefaultMQProducerImpl.sendDefaultImpl -> DefaultMQProducerImpl.sendKernelImpl
private SendResult sendKernelImpl(final Message msg,
        final MessageQueue mq,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final TopicPublishInfo topicPublishInfo,
        final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    if (null == brokerAddr) {
        tryToFindTopicPublishInfo(mq.getTopic());
        brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    }
    SendMessageContext context = null;
    if (brokerAddr != null) {
        brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
        byte[] prevBody = msg.getBody();
        try {
            // for MessageBatch, ID has been set in the generating process
            if (!(msg instanceof MessageBatch)) {
                MessageClientIDSetter.setUniqID(msg);
            }
            int sysFlag = 0;
            if (this.tryToCompressMessage(msg)) {
                sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
            }
            final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
            if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
                sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
            }
            if (hasCheckForbiddenHook()) {
                CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
                checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
                checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
                checkForbiddenContext.setCommunicationMode(communicationMode);
                checkForbiddenContext.setBrokerAddr(brokerAddr);
                checkForbiddenContext.setMessage(msg);
                checkForbiddenContext.setMq(mq);
                checkForbiddenContext.setUnitMode(this.isUnitMode());
                this.executeCheckForbiddenHook(checkForbiddenContext);
            }
            if (this.hasSendMessageHook()) {
                context = new SendMessageContext();
                context.setProducer(this);
                context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                context.setCommunicationMode(communicationMode);
                context.setBornHost(this.defaultMQProducer.getClientIP());
                context.setBrokerAddr(brokerAddr);
                context.setMessage(msg);
                context.setMq(mq);
                String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                if (isTrans != null && isTrans.equals("true")) {
                    context.setMsgType(MessageType.Trans_Msg_Half);
                }
                if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
                    context.setMsgType(MessageType.Delay_Msg);
                }
                this.executeSendMessageHookBefore(context);
            }
            // Build the request header
            SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
            requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
            requestHeader.setTopic(msg.getTopic());
            requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
            requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
            requestHeader.setQueueId(mq.getQueueId());
            requestHeader.setSysFlag(sysFlag);
            requestHeader.setBornTimestamp(System.currentTimeMillis());
            requestHeader.setFlag(msg.getFlag());
            requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
            requestHeader.setReconsumeTimes(0);
            requestHeader.setUnitMode(this.isUnitMode());
            requestHeader.setBatch(msg instanceof MessageBatch);
            if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                if (reconsumeTimes != null) {
                    requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
                    MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
                }
                String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
                if (maxReconsumeTimes != null) {
                    requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
                    MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
                }
            }
            SendResult sendResult = null;
            switch (communicationMode) {
                // Asynchronous send
                case ASYNC:
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                        brokerAddr, // 1
                        mq.getBrokerName(), // 2
                        msg, // 3
                        requestHeader, // 4
                        timeout, // 5
                        communicationMode, // 6
                        sendCallback, // 7
                        topicPublishInfo, // 8
                        this.mQClientFactory, // 9
                        this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(), // 10
                        context,
                        this);
                    break;
                case ONEWAY:
                // Synchronous send
                case SYNC:
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                        brokerAddr,
                        mq.getBrokerName(),
                        msg,
                        requestHeader,
                        timeout,
                        communicationMode,
                        context,
                        this);
                    break;
                default:
                    assert false;
                    break;
            }
            if (this.hasSendMessageHook()) {
                context.setSendResult(sendResult);
                this.executeSendMessageHookAfter(context);
            }
            return sendResult;
        } catch (RemotingException e) {
            if (this.hasSendMessageHook()) {
                context.setException(e);
                this.executeSendMessageHookAfter(context);
            }
            throw e;
        } catch (MQBrokerException e) {
            if (this.hasSendMessageHook()) {
                context.setException(e);
                this.executeSendMessageHookAfter(context);
            }
            throw e;
        } catch (InterruptedException e) {
            if (this.hasSendMessageHook()) {
                context.setException(e);
                this.executeSendMessageHookAfter(context);
            }
            throw e;
        } finally {
            msg.setBody(prevBody);
        }
    }
    throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
What follows is the low-level message sending utility, shared across all of RocketMQ.
DefaultMQProducerImpl.sendKernelImpl -> MQClientAPIImpl.sendMessage
public SendResult sendMessage(
        final String addr, // 1
        final String brokerName, // 2
        final Message msg, // 3
        final SendMessageRequestHeader requestHeader, // 4
        final long timeoutMillis, // 5
        final CommunicationMode communicationMode, // 6
        final SendCallback sendCallback, // 7
        final TopicPublishInfo topicPublishInfo, // 8
        final MQClientInstance instance, // 9
        final int retryTimesWhenSendFailed, // 10
        final SendMessageContext context, // 11
        final DefaultMQProducerImpl producer // 12
) throws RemotingException, MQBrokerException, InterruptedException {
    RemotingCommand request = null;
    if (sendSmartMsg || msg instanceof MessageBatch) {
        SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
        request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
    } else {
        request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
    }
    request.setBody(msg.getBody());
    switch (communicationMode) {
        case ONEWAY:
            this.remotingClient.invokeOneway(addr, request, timeoutMillis);
            return null;
        case ASYNC:
            final AtomicInteger times = new AtomicInteger();
            this.sendMessageAsync(addr, brokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, retryTimesWhenSendFailed, times, context, producer);
            return null;
        case SYNC:
            return this.sendMessageSync(addr, brokerName, msg, timeoutMillis, request);
        default:
            assert false;
            break;
    }
    return null;
}
MQClientAPIImpl.sendMessage -> MQClientAPIImpl.sendMessageSync
Delegates to Netty
private SendResult sendMessageSync(
        final String addr,
        final String brokerName,
        final Message msg,
        final long timeoutMillis,
        final RemotingCommand request
) throws RemotingException, MQBrokerException, InterruptedException {
    RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
    assert response != null;
    return this.processSendResponse(brokerName, msg, response);
}
MQClientAPIImpl.sendMessageSync -> NettyRemotingClient.invokeSync
Hooks fire before and after the call; the real work is still further down.
@Override
public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
        throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
    final Channel channel = this.getAndCreateChannel(addr);
    if (channel != null && channel.isActive()) {
        try {
            if (this.rpcHook != null) {
                this.rpcHook.doBeforeRequest(addr, request);
            }
            RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis);
            if (this.rpcHook != null) {
                this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
            }
            return response;
        } catch (RemotingSendRequestException e) {
            log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
            this.closeChannel(addr, channel);
            throw e;
        } catch (RemotingTimeoutException e) {
            if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
                this.closeChannel(addr, channel);
                log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
            }
            log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
            throw e;
        }
    } else {
        this.closeChannel(addr, channel);
        throw new RemotingConnectException(addr);
    }
}
NettyRemotingClient.invokeSync -> NettyRemotingClient.invokeSyncImpl
Finally we reach the point where Netty actually sends the data.
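The invokeSyncImpl method in this part of the walkthrough turns an asynchronous write into a blocking call: it registers a future under the request's opaque id, writes the request, then waits for the reply or a timeout. The following is a minimal JDK-only sketch of that pattern. PendingResponse and SyncOverAsyncDemo are hypothetical names invented for illustration, and replies are plain strings rather than RemotingCommand objects; it is not RocketMQ's implementation.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a response future: one latch, one reply slot.
final class PendingResponse {
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile String response;

    void put(String response) {
        this.response = response;
        latch.countDown();
    }

    // Returns the reply, or null if none arrived within the timeout.
    String await(long timeoutMillis) {
        try {
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return response;
    }
}

final class SyncOverAsyncDemo {
    // Pending requests keyed by request id (the "opaque" value).
    static final Map<Integer, PendingResponse> TABLE = new ConcurrentHashMap<>();

    // Called by the (simulated) I/O thread when a reply with this id arrives.
    static void onReply(int opaque, String body) {
        PendingResponse pending = TABLE.get(opaque);
        if (pending != null) {
            pending.put(body);
        }
    }

    // Registers the pending entry, triggers the send, then blocks for the reply.
    static String invokeSync(int opaque, Runnable send, long timeoutMillis) {
        PendingResponse pending = new PendingResponse();
        TABLE.put(opaque, pending);
        try {
            send.run();
            return pending.await(timeoutMillis);
        } finally {
            TABLE.remove(opaque); // always clean up, as the original does in finally
        }
    }

    public static void main(String[] args) {
        String reply = invokeSync(7, () -> new Thread(() -> onReply(7, "SEND_OK")).start(), 1000);
        System.out.println(reply);
    }
}
```

A null result here plays the role of the timeout branch; the original code additionally distinguishes "write failed" from "no reply in time" through isSendRequestOK().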
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
        throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
    final int opaque = request.getOpaque();
    try {
        final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, null, null);
        this.responseTable.put(opaque, responseFuture);
        final SocketAddress addr = channel.remoteAddress();
        channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture f) throws Exception {
                if (f.isSuccess()) {
                    responseFuture.setSendRequestOK(true);
                    return;
                } else {
                    responseFuture.setSendRequestOK(false);
                }
                responseTable.remove(opaque);
                responseFuture.setCause(f.cause());
                responseFuture.putResponse(null);
                PLOG.warn("send a request command to channel <" + addr + "> failed.");
            }
        });
        RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
        if (null == responseCommand) {
            if (responseFuture.isSendRequestOK()) {
                throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis, responseFuture.getCause());
            } else {
                throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
            }
        }
        return responseCommand;
    } finally {
        this.responseTable.remove(opaque);
    }
}
How does the Producer interact with the NameServer?
TopicPublishInfo
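private boolean orderTopic = false;
private boolean haveTopicRouterInfo = false;
// MessageQueue info for the topic
private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
// Index used by the load-balancing strategy
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
// Route info for the topic
private TopicRouteData topicRouteData;
Execution logic
1. When sending a message, look up the publish info for the topic
2. If it is not found, look it up through the NameServer
3. Query the NameServer over Netty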
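The three steps above amount to a local cache with a remote fallback. Here is a minimal sketch of that shape using only the JDK. RouteCache, its String route values, and the injected lookup function are all hypothetical placeholders for topicPublishInfoTable and the NameServer request; the real client also handles locking, default-topic fallback, and periodic refresh.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical cache-then-fetch lookup; routes are modelled as plain strings.
final class RouteCache {
    private final Map<String, String> table = new ConcurrentHashMap<>();
    private final Function<String, String> nameServerLookup; // stands in for the remote call
    int remoteCalls = 0; // counts how often the fallback was used (demo only)

    RouteCache(Function<String, String> nameServerLookup) {
        this.nameServerLookup = nameServerLookup;
    }

    String find(String topic) {
        String route = table.get(topic);          // step 1: local lookup
        if (route == null) {
            remoteCalls++;
            route = nameServerLookup.apply(topic); // steps 2 and 3: ask the name server
            if (route != null) {
                table.put(topic, route);
            }
        }
        return route;
    }

    public static void main(String[] args) {
        RouteCache cache = new RouteCache(topic -> "broker-a");
        cache.find("TopicTest");
        cache.find("TopicTest");
        System.out.println("remote calls: " + cache.remoteCalls);
    }
}
```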
DefaultMQProducerImpl
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
    TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
    if (null == topicPublishInfo || !topicPublishInfo.ok()) {
        this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
        // Refresh from the NameServer
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
    }
    if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
        return topicPublishInfo;
    } else {
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
        return topicPublishInfo;
    }
}
MQClientInstance
public boolean updateTopicRouteInfoFromNameServer(final String topic) {
    return updateTopicRouteInfoFromNameServer(topic, false, null);
}
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault, DefaultMQProducer defaultMQProducer) {
    try {
        if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
            try {
                TopicRouteData topicRouteData;
                if (isDefault && defaultMQProducer != null) {
                    topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(), 1000 * 3);
                    if (topicRouteData != null) {
                        for (QueueData data : topicRouteData.getQueueDatas()) {
                            int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
                            data.setReadQueueNums(queueNums);
                            data.setWriteQueueNums(queueNums);
                        }
                    }
                } else {
                    topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
                }
            }
            // the rest of the method is omitted in this excerpt
public TopicRouteData getDefaultTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis)
        throws RemotingException, MQClientException, InterruptedException {
    GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
    requestHeader.setTopic(topic);
    // Build the request to the NameServer
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader);
    // Netty request
    RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
    assert response != null;
    switch (response.getCode()) {
        case ResponseCode.TOPIC_NOT_EXIST: {
            // TODO LOG
            break;
        }
        case ResponseCode.SUCCESS: {
            byte[] body = response.getBody();
            if (body != null) {
                return TopicRouteData.decode(body, TopicRouteData.class);
            }
        }
        default:
            break;
    }
    throw new MQClientException(response.getCode(), response.getRemark());
}
How does the Producer send asynchronous messages?
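Asynchronous:
The difference from a synchronous send is that the caller does not wait; follow-up logic runs through a callback.
The asynchronous send method takes an extra SendCallback parameter and has no return value.
public void send(Message msg, SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
    this.defaultMQProducerImpl.send(msg, sendCallback);
}
SendCallback is the callback invoked after the send succeeds or fails.
public interface SendCallback {
    public void onSuccess(final SendResult sendResult);
    public void onException(final Throwable e);
}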
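To make the "return immediately, finish in a callback" shape concrete, here is a small JDK-only sketch. AsyncSendDemo, its nested Callback interface, and the string results are hypothetical; they only mirror the two-method shape of SendCallback and do not touch the RocketMQ client.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class AsyncSendDemo {
    // Hypothetical mirror of the SendCallback shape, using plain strings.
    interface Callback {
        void onSuccess(String result);
        void onException(Throwable e);
    }

    // Returns at once; the outcome is delivered later on the executor's thread.
    static void sendAsync(ExecutorService io, String body, Callback callback) {
        io.submit(() -> {
            try {
                if (body.isEmpty()) {
                    throw new IllegalArgumentException("empty body");
                }
                callback.onSuccess("SEND_OK:" + body);
            } catch (Throwable e) {
                callback.onException(e);
            }
        });
    }

    // Demo helper: blocks until one of the two callbacks has fired.
    static String sendAndWait(String body) {
        ExecutorService io = Executors.newSingleThreadExecutor();
        CountDownLatch done = new CountDownLatch(1);
        String[] outcome = new String[1];
        sendAsync(io, body, new Callback() {
            @Override
            public void onSuccess(String result) {
                outcome[0] = result;
                done.countDown();
            }

            @Override
            public void onException(Throwable e) {
                outcome[0] = "FAILED:" + e.getMessage();
                done.countDown();
            }
        });
        try {
            done.await(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        io.shutdown();
        return outcome[0];
    }

    public static void main(String[] args) {
        System.out.println(sendAndWait("hello"));
        System.out.println(sendAndWait(""));
    }
}
```

The sendAndWait helper exists only so the demo can observe the result; a real asynchronous caller would carry on with other work instead of blocking.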
The earlier steps are the same as in the synchronous path, so we skip them and go straight to:
private void sendMessageAsync(
        final String addr,
        final String brokerName,
        final Message msg,
        final long timeoutMillis,
        final RemotingCommand request,
        final SendCallback sendCallback,
        final TopicPublishInfo topicPublishInfo,
        final MQClientInstance instance,
        final int retryTimesWhenSendFailed,
        final AtomicInteger times,
        final SendMessageContext context,
        final DefaultMQProducerImpl producer
) throws InterruptedException, RemotingException {
    this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
        @Override
        public void operationComplete(ResponseFuture responseFuture) {
            RemotingCommand response = responseFuture.getResponseCommand();
            if (null == sendCallback && response != null) {
                try {
                    SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response);
                    if (context != null && sendResult != null) {
                        context.setSendResult(sendResult);
                        context.getProducer().executeSendMessageHookAfter(context);
                    }
                } catch (Throwable e) {
                    //
                }
                producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
                return;
            }
            if (response != null) {
                try {
                    SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response);
                    assert sendResult != null;
                    if (context != null) {
                        context.setSendResult(sendResult);
                        context.getProducer().executeSendMessageHookAfter(context);
                    }
                    // This repeats the branch above; the sendCallback null check belongs here
                    try {
                        // Success callback for the asynchronous send
                        sendCallback.onSuccess(sendResult);
                    } catch (Throwable e) {
                    }
                    producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
                } catch (Exception e) {
                    producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
                    // Failure callback
                    onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance, retryTimesWhenSendFailed, times, e, context, false, producer);
                }
            } else {
                producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
                if (!responseFuture.isSendRequestOK()) {
                    MQClientException ex = new MQClientException("send request failed", responseFuture.getCause());
                    onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance, retryTimesWhenSendFailed, times, ex, context, true, producer);
                } else if (responseFuture.isTimeout()) {
                    MQClientException ex = new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms", responseFuture.getCause());
                    onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance, retryTimesWhenSendFailed, times, ex, context, true, producer);
                } else {
                    MQClientException ex = new MQClientException("unknow reseaon", responseFuture.getCause());
                    onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance, retryTimesWhenSendFailed, times, ex, context, true, producer);
                }
            }
        }
    });
}
@Override
public void invokeAsync(String addr, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback)
        throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
    final Channel channel = this.getAndCreateChannel(addr);
    if (channel != null && channel.isActive()) {
        try {
            if (this.rpcHook != null) {
                this.rpcHook.doBeforeRequest(addr, request);
            }
            this.invokeAsyncImpl(channel, request, timeoutMillis, invokeCallback);
        } catch (RemotingSendRequestException e) {
            log.warn("invokeAsync: send request exception, so close the channel[{}]", addr);
            this.closeChannel(addr, channel);
            throw e;
        }
    } else {
        this.closeChannel(addr, channel);
        throw new RemotingConnectException(addr);
    }
}
public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis, final InvokeCallback invokeCallback)
        throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
    final int opaque = request.getOpaque();
    boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
    if (acquired) {
        final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
        final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, invokeCallback, once);
        this.responseTable.put(opaque, responseFuture);
        try {
            channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture f) throws Exception {
                    if (f.isSuccess()) {
                        responseFuture.setSendRequestOK(true);
                        return;
                    } else {
                        responseFuture.setSendRequestOK(false);
                    }
                    responseFuture.putResponse(null);
                    responseTable.remove(opaque);
                    try {
                        // Run the callback
                        executeInvokeCallback(responseFuture);
                    } catch (Throwable e) {
                        PLOG.warn("excute callback in writeAndFlush addListener, and callback throw", e);
                    } finally {
                        responseFuture.release();
                    }
                    PLOG.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
                }
            });
        } catch (Exception e) {
            responseFuture.release();
            PLOG.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);
            throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
        }
    } else {
        if (timeoutMillis <= 0) {
            throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast");
        } else {
            String info = String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
                timeoutMillis,
                this.semaphoreAsync.getQueueLength(),
                this.semaphoreAsync.availablePermits()
            );
            PLOG.warn(info);
            throw new RemotingTimeoutException(info);
        }
    }
}
Are both P2P and Pub/Sub supported?
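JMS systems have two models, P2P and Pub/Sub. When a message can be consumed only once, that is P2P; in RocketMQ this corresponds to clustering consumption, where each message goes to one consumer within a consumer group. RocketMQ's broadcast mode is the Pub/Sub model.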
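How does the Producer guarantee ordered messages?
Ordered messages:
Messages must be consumed in the same order in which they were sent. In RocketMQ this mainly means partial ordering: for a class of messages to be ordered, the Producer must send them sequentially from a single thread and to the same queue, so that the Consumer can consume them in the order the Producer sent them.
Normal ordered messages:
A kind of ordered message. Under normal conditions full ordering is guaranteed, but when a communication failure occurs or a Broker restarts, the total number of queues changes, so the queue chosen by hash-modulo changes too, and message order is briefly inconsistent. If the business can tolerate brief disorder while the cluster is in an abnormal state (for example, a Broker crashes or restarts), normal ordering is the appropriate choice.
Strictly ordered messages:
A kind of ordered message. Order is guaranteed in both normal and abnormal conditions, but the distributed failover property is sacrificed: as soon as one machine in the Broker cluster is unavailable, the whole cluster is unavailable, which greatly reduces service availability. If the servers are deployed in synchronous double-write mode, this drawback can be avoided by automatically promoting the standby to master, although the service will still be unavailable for a few minutes. (This depends on synchronous double write and automatic master/standby switchover; automatic switchover is not implemented yet.) The only known application that strictly depends on strictly ordered messages is database binlog synchronization. The vast majority of other applications can tolerate brief disorder, so normal ordered messages are recommended.
A simple and workable way to achieve strict ordering is:
Keep producer, MQServer, and consumer in a one-to-one-to-one relationship.
For more on ordered messages, the recommended reading is "分布式開放消息系統(RocketMQ)的原理與實踐" (Principles and Practice of the Distributed Open Messaging System RocketMQ).
On the Producer side, guaranteeing order means sending messages of the same kind to the same MessageQueue. By implementing a specific MessageQueueSelector, you can choose the MessageQueue based on the message's content.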
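The idea of "same kind of message, same queue" can be sketched without the RocketMQ client. In the sketch below, OrderedQueueSelector and the string queue names are hypothetical; the selection rule (hash of an ordering key modulo the queue count) is one common choice, assumed here for illustration rather than taken from the source.

```java
import java.util.Arrays;
import java.util.List;

final class OrderedQueueSelector {
    // Same key -> same queue, as long as the queue list does not change.
    static <Q> Q select(List<Q> queues, Object orderKey) {
        // floorMod keeps the index non-negative even for negative hash codes
        int index = Math.floorMod(orderKey.hashCode(), queues.size());
        return queues.get(index);
    }

    public static void main(String[] args) {
        List<String> queues = Arrays.asList("q0", "q1", "q2", "q3");
        int orderId = 10; // hypothetical ordering key, e.g. an order id
        System.out.println(select(queues, orderId)); // every message for order 10
        System.out.println(select(queues, orderId)); // lands on the same queue
    }
}
```

Because the index depends on queues.size(), a change in the number of queues remaps keys, which is exactly the brief disorder described for normal ordered messages.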
public SendResult send(Message msg, MessageQueueSelector selector, Object arg)
        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    return send(msg, selector, arg, this.defaultMQProducer.getSendMsgTimeout());
}
public interface MessageQueueSelector {
    MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
}
How does the Producer do load balancing?
In the synchronous send path there is a method that picks the MessageQueue:
MessageQueue tmpmq = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
}
Keeping latency low
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    // This branch runs when the lowest send latency is wanted
    // Suggestion: turn this into a strategy pattern
    if (this.sendLatencyFaultEnable) {
        try {
            int index = tpInfo.getSendWhichQueue().getAndIncrement();
            // Round-robin over the queues looking for one on an available (low-latency) broker
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                    if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                        return mq;
                }
            }
            // None found: fall back to a broker chosen by pickOneAtLeast()
            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
            int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
            if (writeQueueNums > 0) {
                final MessageQueue mq = tpInfo.selectOneMessageQueue();
                if (notBestBroker != null) {
                    mq.setBrokerName(notBestBroker);
                    mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                }
                return mq;
            } else {
                latencyFaultTolerance.remove(notBestBroker);
            }
        } catch (Exception e) {
            log.error("Error occurred when selecting message queue", e);
        }
        return tpInfo.selectOneMessageQueue();
    }
    // Otherwise use plain round-robin
    return tpInfo.selectOneMessageQueue(lastBrokerName);
}
Pick a low-latency broker, based on each FaultItem's availability, latency, and start timestamp.
@Override
public String pickOneAtLeast() {
    final Enumeration<FaultItem> elements = this.faultItemTable.elements();
    List<FaultItem> tmpList = new LinkedList<FaultItem>();
    while (elements.hasMoreElements()) {
        final FaultItem faultItem = elements.nextElement();
        tmpList.add(faultItem);
    }
    if (!tmpList.isEmpty()) {
        Collections.shuffle(tmpList);
        // Sort using FaultItem.compareTo (availability, then latency, then start timestamp)
        Collections.sort(tmpList);
        final int half = tmpList.size() / 2;
        if (half <= 0) {
            return tmpList.get(0).getName();
        } else {
            // Rotate through the front (better) half of the sorted list
            final int i = this.whichItemWorst.getAndIncrement() % half;
            return tmpList.get(i).getName();
        }
    }
    return null;
}
FaultItem ordering
@Override
public int compareTo(final FaultItem other) {
    if (this.isAvailable() != other.isAvailable()) {
        if (this.isAvailable())
            return -1;
        if (other.isAvailable())
            return 1;
    }
    if (this.currentLatency < other.currentLatency)
        return -1;
    else if (this.currentLatency > other.currentLatency) {
        return 1;
    }
    if (this.startTimestamp < other.startTimestamp)
        return -1;
    else if (this.startTimestamp > other.startTimestamp) {
        return 1;
    }
    return 0;
}
Round-robin mode:
Round-robin to a MessageQueue whose broker is not lastBrokerName.
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
    if (lastBrokerName == null) {
        return selectOneMessageQueue();
    } else {
        int index = this.sendWhichQueue.getAndIncrement();
        for (int i = 0; i < this.messageQueueList.size(); i++) {
            int pos = Math.abs(index++) % this.messageQueueList.size();
            if (pos < 0)
                pos = 0;
            MessageQueue mq = this.messageQueueList.get(pos);
            if (!mq.getBrokerName().equals(lastBrokerName)) {
                return mq;
            }
        }
        return selectOneMessageQueue();
    }
}
When there is no lastBrokerName, use plain round-robin.
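The round-robin code in this section computes Math.abs(index) % size and then guards with if (pos < 0) pos = 0. The guard is needed because Math.abs(Integer.MIN_VALUE) is still negative in Java, which the counter reaches once it overflows. A small sketch of just that arithmetic, with RoundRobinIndex as a hypothetical helper name:

```java
final class RoundRobinIndex {
    // Maps an ever-increasing (and eventually overflowing) counter to a queue position.
    static int position(int counter, int queueCount) {
        int pos = Math.abs(counter) % queueCount;
        if (pos < 0) {
            pos = 0; // only reachable when counter == Integer.MIN_VALUE
        }
        return pos;
    }

    public static void main(String[] args) {
        System.out.println(position(5, 4));
        System.out.println(position(-3, 4));
        System.out.println(position(Integer.MIN_VALUE, 3));
    }
}
```

Math.floorMod(counter, queueCount) would avoid the special case entirely; the guard form is shown here because it is what the quoted source uses.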
public MessageQueue selectOneMessageQueue() {
    int index = this.sendWhichQueue.getAndIncrement();
    int pos = Math.abs(index) % this.messageQueueList.size();
    if (pos < 0)
        pos = 0;
    return this.messageQueueList.get(pos);
}
What is a Producer Group?
The name for a set of Producers of one kind. Such Producers usually send the same kind of message, with the same sending logic.
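How does the Producer set message priority?
Priority:
Every message has a priority, usually described by an integer, and higher-priority messages are delivered first. If all messages sit in a single in-memory queue, they can be sorted by priority before delivery so that the higher-priority ones go first. Because all RocketMQ messages are persisted, sorting by priority would be very expensive, so RocketMQ does not specifically support message priority. A workaround achieves a similar effect: configure one dedicated high-priority queue and one normal-priority queue, and send messages of different priorities to different queues.
Priority problems fall into two categories:
The goal is only to achieve the effect of priority, not priority in the strict sense. Priorities are usually divided into high, medium, and low, or a few more levels. Each priority can be represented by a different topic; when sending a message, you pick the topic that stands for its priority. This solves the vast majority of priority problems, at the cost of some precision in business-level priorities.
Strict priority, where the priority is an integer such as 0~65535. Using a different topic per priority is a very poor fit for this case. Having the MQ solve it would hurt MQ performance badly. The point to settle first is whether the business really needs strict priority: if the priorities were compressed into a few levels, how much would the business be affected?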
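As a rough illustration of the first category, the sketch below compresses a strict integer priority into three levels and maps each level to its own topic name. Everything in it is an assumption made for the example: the PriorityTopics class, the thresholds, the "higher number means higher priority" convention, and the topic naming scheme.

```java
final class PriorityTopics {
    enum Priority { HIGH, NORMAL, LOW }

    // Hypothetical naming scheme: one topic per priority level.
    static String topicFor(String baseTopic, Priority priority) {
        return baseTopic + "_" + priority.name();
    }

    // Hypothetical thresholds that squeeze 0..65535 into three levels.
    static Priority bucket(int strictPriority) {
        if (strictPriority >= 40000) {
            return Priority.HIGH;
        }
        if (strictPriority >= 20000) {
            return Priority.NORMAL;
        }
        return Priority.LOW;
    }

    public static void main(String[] args) {
        System.out.println(topicFor("ORDER", bucket(50000)));
        System.out.println(topicFor("ORDER", bucket(100)));
    }
}
```

Consumers would then give the high-priority topic more threads or poll it first; that part depends on the application and is not shown.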
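How are the Producer's transactional messages implemented?
Logic:
1. Send the prepare message
2. Run the producer's business logic (the business code must implement LocalTransactionExecuter)
3. End the transactional message (commit, rollback, notType)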
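The three steps reduce to a small decision: the outcome of the prepare send and of the local transaction determines which end state is reported to the broker. The sketch below mirrors the switch in sendMessageInTransaction as quoted in this section, using hypothetical JDK-only types (TxDecision and its two enums) in place of the RocketMQ classes.

```java
import java.util.function.Supplier;

final class TxDecision {
    enum SendStatus { SEND_OK, FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT, SLAVE_NOT_AVAILABLE }

    enum LocalState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    // Decides the state that step 3 reports, given the result of steps 1 and 2.
    static LocalState decide(SendStatus prepareStatus, Supplier<LocalState> localTransaction) {
        if (prepareStatus != SendStatus.SEND_OK) {
            return LocalState.ROLLBACK_MESSAGE; // prepare message was not stored cleanly
        }
        try {
            LocalState state = localTransaction.get(); // step 2: business logic
            return state == null ? LocalState.UNKNOW : state;
        } catch (RuntimeException e) {
            return LocalState.UNKNOW; // an exception leaves the state undecided
        }
    }

    public static void main(String[] args) {
        System.out.println(decide(SendStatus.SEND_OK, () -> LocalState.COMMIT_MESSAGE));
        System.out.println(decide(SendStatus.FLUSH_DISK_TIMEOUT, () -> LocalState.COMMIT_MESSAGE));
    }
}
```

An UNKNOW result is what later leads the broker to ask the producer again through a transaction check.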
TransactionMQProducer.sendMessageInTransaction -> DefaultMQProducerImpl.sendMessageInTransaction
public TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter tranExecuter, final Object arg)
        throws MQClientException {
    if (null == tranExecuter) {
        throw new MQClientException("tranExecutor is null", null);
    }
    Validators.checkMessage(msg, this.defaultMQProducer);
    SendResult sendResult = null;
    // Set the transactional message properties
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
    try {
        // 1. Send the message, same as an ordinary message
        sendResult = this.send(msg);
    } catch (Exception e) {
        throw new MQClientException("send message Exception", e);
    }
    LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
    Throwable localException = null;
    switch (sendResult.getSendStatus()) {
        case SEND_OK: {
            try {
                if (sendResult.getTransactionId() != null) {
                    msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                }
                // 2. Run the producer's business logic
                localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg);
                if (null == localTransactionState) {
                    localTransactionState = LocalTransactionState.UNKNOW;
                }
                if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                    log.info("executeLocalTransactionBranch return {}", localTransactionState);
                    log.info(msg.toString());
                }
            } catch (Throwable e) {
                log.info("executeLocalTransactionBranch exception", e);
                log.info(msg.toString());
                localException = e;
            }
        }
        break;
        // If step 1 did not send successfully, roll back
        case FLUSH_DISK_TIMEOUT:
        case FLUSH_SLAVE_TIMEOUT:
        case SLAVE_NOT_AVAILABLE:
            localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
            break;
        default:
            break;
    }
    try {
        // 3. End the transaction according to the outcome of the local business logic
        this.endTransaction(sendResult, localTransactionState, localException);
    } catch (Exception e) {
        log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
    }
    TransactionSendResult transactionSendResult = new TransactionSendResult();
    transactionSendResult.setSendStatus(sendResult.getSendStatus());
    transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
    transactionSendResult.setMsgId(sendResult.getMsgId());
    transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
    transactionSendResult.setTransactionId(sendResult.getTransactionId());
    transactionSendResult.setLocalTransactionState(localTransactionState);
    return transactionSendResult;
}
public void endTransaction(
        final SendResult sendResult,
        final LocalTransactionState localTransactionState,
        final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
    final MessageId id;
    if (sendResult.getOffsetMsgId() != null) {
        id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
    } else {
        id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
    }
    String transactionId = sendResult.getTransactionId();
    final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
    EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
    requestHeader.setTransactionId(transactionId);
    requestHeader.setCommitLogOffset(id.getOffset());
    switch (localTransactionState) {
        // Local execution succeeded: commit
        case COMMIT_MESSAGE:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
            break;
        // Local execution failed: roll back
        case ROLLBACK_MESSAGE:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
            break;
        // Local state unknown: report it as unknown
        case UNKNOW:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
            break;
        default:
            break;
    }
    requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
    requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
    requestHeader.setMsgId(sendResult.getMsgId());
    String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
    this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark, this.defaultMQProducer.getSendMsgTimeout());
}
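Special case: what happens if the end-of-transaction message fails to send?
The broker checks; if the message is still in the prepared state, it sends a CheckTransaction request to the Producer.
How are the Producer's scheduled messages implemented?
Scheduled messages:
A scheduled message is one that, after reaching the Broker, cannot be consumed by the Consumer right away. It becomes consumable only at a specific point in time or after a specific delay.
Supporting arbitrary time precision would require the Broker to sort messages, and once persistence is involved, that sorting inevitably carries a huge performance cost.
RocketMQ supports scheduled messages, but not with arbitrary time precision. It supports specific levels, for example delays of 5s, 10s, or 1m.
Implementation:
Set the delay level on the Message, and leave the rest to the Broker.
public void setDelayTimeLevel(int level) {
    this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level));
}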
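To show how a level number relates to an actual delay, the sketch below parses a level table and converts a level to milliseconds. It assumes the broker keeps its default level list of "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h" with levels numbered from 1; a broker configured differently would give different results. DelayLevels is a hypothetical helper, not part of the client API.

```java
final class DelayLevels {
    // Assumed broker default; the real table is whatever the broker is configured with.
    static final String DEFAULT_LEVELS = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    // Converts a 1-based delay level to its delay in milliseconds.
    static long toMillis(int level) {
        String[] parts = DEFAULT_LEVELS.split(" ");
        if (level < 1 || level > parts.length) {
            throw new IllegalArgumentException("unknown delay level: " + level);
        }
        String spec = parts[level - 1];
        long value = Long.parseLong(spec.substring(0, spec.length() - 1));
        switch (spec.charAt(spec.length() - 1)) {
            case 's':
                return value * 1000L;
            case 'm':
                return value * 60_000L;
            case 'h':
                return value * 3_600_000L;
            default:
                throw new IllegalStateException("bad level spec: " + spec);
        }
    }

    public static void main(String[] args) {
        // Under the assumed table, level 3 would be passed to setDelayTimeLevel for a 10s delay.
        System.out.println(toMillis(3));
    }
}
```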