摘要:修改的實現(xiàn),實現(xiàn)接口在改造筆記一整合到中修改實現(xiàn),添加對的實現(xiàn)如下負(fù)載過大,處理不過來時,會回調(diào)該方法例如可以發(fā)生郵件通知相關(guān)人員改造筆記四解決中的調(diào)用兩次
發(fā)現(xiàn)問題
在io.moquette.spi.impl.BrokerInterceptor的構(gòu)造函數(shù)中,新建了一個線程池,代碼如下:
private BrokerInterceptor(int poolSize, Listhandlers) { LOG.info("Initializing broker interceptor. InterceptorIds={}", getInterceptorIds(handlers)); this.handlers = new HashMap<>(); for (Class> messageType : InterceptHandler.ALL_MESSAGE_TYPES) { this.handlers.put(messageType, new CopyOnWriteArrayList ()); } for (InterceptHandler handler : handlers) { this.addInterceptHandler(handler); } executor = Executors.newFixedThreadPool(poolSize); }
executor = Executors.newFixedThreadPool(poolSize);這句代碼雖然創(chuàng)建了一個固定線程數(shù)量的線程池,但是線程池的任務(wù)隊列并沒有做限制,一旦某個InterceptHandler中的某個方法進(jìn)行了耗時處理,在高并發(fā)的情況下,會很容易導(dǎo)致線程池的隊列堆積大量待處理的任務(wù),進(jìn)而可能造成內(nèi)存溢出。
解決問題分別添加以下類和接口
public class ThreadPoolHelper { public static ExecutorService createFixedExecutor(int threadNum,int capacity,String threadFactoryName) { return new ThreadPoolExecutor( threadNum, threadNum, 30, TimeUnit.SECONDS, new LinkedBlockingDeque(capacity), new SimpleThreadFactory(threadFactoryName), new LogDiscardRejectPolicy() ); } } public class SimpleThreadFactory implements ThreadFactory { private static final String NAME_FORMAT = "%s-%s"; private String threadNamePrefix; public SimpleThreadFactory(String threadNamePrefix) { this.threadNamePrefix = threadNamePrefix; } @Override public Thread newThread(Runnable runnable) { Thread thread = new Thread(runnable); thread.setName(String.format(NAME_FORMAT, threadNamePrefix, System.currentTimeMillis())); return thread; } } public class LogDiscardRejectPolicy implements RejectedExecutionHandler { private static final Logger LOG = LoggerFactory.getLogger(LogDiscardRejectPolicy.class); @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { LOG.error("executor:{} task queue has full, runnable:{} discarded",executor,r); if (!(r instanceof PublishTask)) { return; } PublishTask publishTask = (PublishTask) r; InterceptHandler interceptHandler = publishTask.getInterceptHandler(); if (!(interceptHandler instanceof RejectHandler)) { return; } ((RejectHandler)interceptHandler).rejectedExecution(r,executor); } } public interface RejectHandler { public void rejectedExecution(Runnable r, ThreadPoolExecutor executor); }
BrokerInterceptor 創(chuàng)建線程池的邏輯改為
private BrokerInterceptor(int poolSize, int capacity, Listhandlers) { LOG.info("Initializing broker interceptor. InterceptorIds={}", getInterceptorIds(handlers)); this.handlers = new HashMap<>(); for (Class> messageType : InterceptHandler.ALL_MESSAGE_TYPES) { this.handlers.put(messageType, new CopyOnWriteArrayList ()); } for (InterceptHandler handler : handlers) { this.addInterceptHandler(handler); } /** modify by liuhh */ executor = ThreadPoolHelper.createFixedExecutor(poolSize, capacity, THREAD_POOL_NAME); //executor = Executors.newFixedThreadPool(poolSize); }
解釋:
(1)ThreadPoolHelper中的createFixedExecutor()方法為新建的線程池指定任務(wù)隊列大小和拒絕策略LogDiscardRejectPolicy
(2)在LogDiscardRejectPolicy中,首先將被拒絕的任務(wù)log一遍,對于PublishTask(moquette改造筆記(二):優(yōu)化BrokerInterceptor notifyTopicPublished()邏輯)做特殊處理,會交給實現(xiàn)RejectHandler的InterceptHandler處理,由業(yè)務(wù)邏輯決定,出現(xiàn)任務(wù)太多處理不完被遺棄的任務(wù)該如何處理。
在 moquette改造筆記(一):整合到SpringBoot 中修改SafetyInterceptHandler實現(xiàn),添加對RejectHandler的實現(xiàn)如下
@Slf4j @Component public class SafetyInterceptHandler extends AbstractInterceptHandler{ @Override public String getID() { return SafetyInterceptHandler.class.getName(); } @Override public void onConnect(InterceptConnectMessage msg) { } @Override public void onConnectionLost(InterceptConnectionLostMessage msg) { } @Override public void onPublish(InterceptPublishMessage msg) { } @Override public void onMessageAcknowledged(InterceptAcknowledgedMessage msg) { } @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { /**MQTT SERVICE 負(fù)載過大,處理不過來時,會回調(diào)該方法*/ //例如可以發(fā)生郵件通知相關(guān)人員 } }
moquette改造筆記(四):解決InterceptHandler中的onConnectionLost()調(diào)用兩次
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/77202.html
摘要:優(yōu)化邏輯優(yōu)化方向向啟動方法一樣,每次調(diào)用的方法都是在線程池中新建一個任務(wù)具體代碼解釋新建一個用來實現(xiàn)調(diào)用方法。改造筆記三優(yōu)化中的線程池 發(fā)現(xiàn)問題 下面部分是io.moquette.spi.impl.BrokerInterceptor.java部分源碼 @Override public void notifyClientConnected(final MqttConnectMes...
摘要:整合到本文更加注重代碼實踐,對于配置相關(guān)的知識會一筆帶過,不做過多的詳解。筆者是上傳到私服,然后通過導(dǎo)入。接口是預(yù)留給開發(fā)者根據(jù)不同事件處理業(yè)務(wù)邏輯的接口。改造筆記二優(yōu)化邏輯 Moquette簡介 Mqtt作為物聯(lián)網(wǎng)比較流行的協(xié)議現(xiàn)在已經(jīng)被大范圍使用,其中也有很多開源的MQTT BROKEN。Moquette是用java基于netty實現(xiàn)的輕量級的MQTT BROKEN. Moquet...
摘要:發(fā)現(xiàn)問題在使用中發(fā)現(xiàn)在設(shè)備頻繁上下線和兩個設(shè)備一樣相互頂替連接的情況下,的和的方法調(diào)用沒有先后順序,如果在這兩個方法里面來記錄設(shè)備上下線狀態(tài),會造成狀態(tài)不對。因為相互頂替的情況并不多見,因此兩個也可以接受,在性能上并不會造成多大影響。 發(fā)現(xiàn)問題 在moquette使用中發(fā)現(xiàn)在設(shè)備頻繁上下線和兩個設(shè)備ClientId一樣相互頂替連接的情況下,InterceptHandler的onConn...
摘要:發(fā)現(xiàn)問題在使用中設(shè)備異常斷開中的。在中事件都是在鏈中依次傳遞的。事件最后傳遞到。解決方法添加會導(dǎo)致調(diào)用兩次解釋會在該從鏈中移除掉時被調(diào)用,一般的話沒有手動從鏈中刪除時,會在連接斷開后回調(diào)該方法。 發(fā)現(xiàn)問題 在使用中設(shè)備異常斷開,InterceptHandler中的onConnectionLost()。經(jīng)過調(diào)試發(fā)現(xiàn)是MoquetteIdleTimeoutHandler中的代碼導(dǎo)致的,代碼...
摘要:常見標(biāo)高線程上下文切換頻繁線程太多鎖競爭激烈標(biāo)高如果的占用很高,排查涉及到的程序,比如把改造成。抖動問題原因字節(jié)碼轉(zhuǎn)為機(jī)器碼需要占用時間片,大量的在執(zhí)行字節(jié)碼時,導(dǎo)致長期處于高位現(xiàn)象,占用率最高解決辦法保證編譯線程的占比。 一、并發(fā) Unable to create new native thread …… 問題1:Java中創(chuàng)建一個線程消耗多少內(nèi)存? 每個線程有獨(dú)自的棧內(nèi)存,共享堆內(nèi)...
閱讀 2003·2021-11-24 10:45
閱讀 1860·2021-10-09 09:43
閱讀 1298·2021-09-22 15:38
閱讀 1229·2021-08-18 10:19
閱讀 2844·2019-08-30 15:55
閱讀 3068·2019-08-30 12:45
閱讀 2971·2019-08-30 11:25
閱讀 362·2019-08-29 11:30