国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

moquette改造筆記(三):優(yōu)化BrokerInterceptor 中的線程池

cfanr / 2668人閱讀

摘要:修改的實現(xiàn),實現(xiàn)接口在改造筆記一整合到中修改實現(xiàn),添加對的實現(xiàn)如下負(fù)載過大,處理不過來時,會回調(diào)該方法例如可以發(fā)生郵件通知相關(guān)人員改造筆記四解決中的調(diào)用兩次

發(fā)現(xiàn)問題

在io.moquette.spi.impl.BrokerInterceptor的構(gòu)造函數(shù)中,新建了一個線程池,代碼如下:

private BrokerInterceptor(int poolSize, List handlers) {
        LOG.info("Initializing broker interceptor. InterceptorIds={}", getInterceptorIds(handlers));
        this.handlers = new HashMap<>();
        for (Class messageType : InterceptHandler.ALL_MESSAGE_TYPES) {
            this.handlers.put(messageType, new CopyOnWriteArrayList());
        }
        for (InterceptHandler handler : handlers) {
            this.addInterceptHandler(handler);
        }
        executor = Executors.newFixedThreadPool(poolSize);
    }

executor = Executors.newFixedThreadPool(poolSize);這句代碼雖然創(chuàng)建了一個固定線程數(shù)量的線程池,但是線程池的任務(wù)隊列并沒有做限制,一旦某個InterceptHandler中的某個方法進(jìn)行了耗時處理,在高并發(fā)的情況下,會很容易導(dǎo)致線程池的隊列堆積大量待處理的任務(wù),進(jìn)而可能造成內(nèi)存溢出。

解決問題

分別添加以下類和接口

public class ThreadPoolHelper {
    public static ExecutorService createFixedExecutor(int threadNum,int capacity,String threadFactoryName) {
        return new ThreadPoolExecutor(
                threadNum,
                threadNum,
                30,
                TimeUnit.SECONDS,
                new LinkedBlockingDeque(capacity),
                new SimpleThreadFactory(threadFactoryName),
                new LogDiscardRejectPolicy()
        );
    }
}

public class SimpleThreadFactory implements ThreadFactory {
    private static final String NAME_FORMAT = "%s-%s";
    private String threadNamePrefix;

    public SimpleThreadFactory(String threadNamePrefix) {
        this.threadNamePrefix = threadNamePrefix;
    }

    @Override
    public Thread newThread(Runnable runnable) {
        Thread thread = new Thread(runnable);
        thread.setName(String.format(NAME_FORMAT, threadNamePrefix, System.currentTimeMillis()));
        return thread;
    }
}


public class LogDiscardRejectPolicy implements RejectedExecutionHandler {
    private static final Logger LOG = LoggerFactory.getLogger(LogDiscardRejectPolicy.class);

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        LOG.error("executor:{} task queue has full, runnable:{} discarded",executor,r);
        if (!(r instanceof PublishTask)) {
            return;
        }

        PublishTask publishTask = (PublishTask) r;
        InterceptHandler interceptHandler = publishTask.getInterceptHandler();
        if (!(interceptHandler instanceof RejectHandler)) {
            return;
        }

        ((RejectHandler)interceptHandler).rejectedExecution(r,executor);
    }
}


public interface RejectHandler {
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

BrokerInterceptor 創(chuàng)建線程池的邏輯改為

private BrokerInterceptor(int poolSize, int capacity, List handlers) {
        LOG.info("Initializing broker interceptor. InterceptorIds={}", getInterceptorIds(handlers));
        this.handlers = new HashMap<>();
        for (Class messageType : InterceptHandler.ALL_MESSAGE_TYPES) {
            this.handlers.put(messageType, new CopyOnWriteArrayList());
        }
        for (InterceptHandler handler : handlers) {
            this.addInterceptHandler(handler);
        }

        /** modify by liuhh */
        executor = ThreadPoolHelper.createFixedExecutor(poolSize, capacity, THREAD_POOL_NAME);
        //executor = Executors.newFixedThreadPool(poolSize);

    }

解釋:
(1)ThreadPoolHelper中的createFixedExecutor()方法為新建的線程池指定任務(wù)隊列大小和拒絕策略LogDiscardRejectPolicy
(2)在LogDiscardRejectPolicy中,首先將被拒絕的任務(wù)log一遍,對于PublishTask(moquette改造筆記(二):優(yōu)化BrokerInterceptor notifyTopicPublished()邏輯)做特殊處理,會交給實現(xiàn)RejectHandler的InterceptHandler處理,由業(yè)務(wù)邏輯決定,出現(xiàn)任務(wù)太多處理不完被遺棄的任務(wù)該如何處理

修改InterceptHandler的實現(xiàn),實現(xiàn)RejectHandler接口

在 moquette改造筆記(一):整合到SpringBoot 中修改SafetyInterceptHandler實現(xiàn),添加對RejectHandler的實現(xiàn)如下

@Slf4j
@Component
public class SafetyInterceptHandler extends AbstractInterceptHandler{

    @Override
    public String getID() {
        return SafetyInterceptHandler.class.getName();
    }

    @Override
    public void onConnect(InterceptConnectMessage msg) {
        
    }

    @Override
    public void onConnectionLost(InterceptConnectionLostMessage msg) {
       
    }


    @Override
    public void onPublish(InterceptPublishMessage msg) {
       
    }


    @Override
    public void onMessageAcknowledged(InterceptAcknowledgedMessage msg) {
        
    }
    
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        /**MQTT SERVICE 負(fù)載過大,處理不過來時,會回調(diào)該方法*/
        //例如可以發(fā)生郵件通知相關(guān)人員
    }

}

moquette改造筆記(四):解決InterceptHandler中的onConnectionLost()調(diào)用兩次

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/77202.html

相關(guān)文章

  • moquette改造筆記(二):優(yōu)化BrokerInterceptor notifyTopicPu

    摘要:優(yōu)化邏輯優(yōu)化方向向啟動方法一樣,每次調(diào)用的方法都是在線程池中新建一個任務(wù)具體代碼解釋新建一個用來實現(xiàn)調(diào)用方法。改造筆記三優(yōu)化中的線程池 發(fā)現(xiàn)問題 下面部分是io.moquette.spi.impl.BrokerInterceptor.java部分源碼 @Override public void notifyClientConnected(final MqttConnectMes...

    liangzai_cool 評論0 收藏0
  • moquette改造筆記(一):整合到SpringBoot

    摘要:整合到本文更加注重代碼實踐,對于配置相關(guān)的知識會一筆帶過,不做過多的詳解。筆者是上傳到私服,然后通過導(dǎo)入。接口是預(yù)留給開發(fā)者根據(jù)不同事件處理業(yè)務(wù)邏輯的接口。改造筆記二優(yōu)化邏輯 Moquette簡介 Mqtt作為物聯(lián)網(wǎng)比較流行的協(xié)議現(xiàn)在已經(jīng)被大范圍使用,其中也有很多開源的MQTT BROKEN。Moquette是用java基于netty實現(xiàn)的輕量級的MQTT BROKEN. Moquet...

    young.li 評論0 收藏0
  • moquette改造筆記(五):設(shè)備連接頻繁上下線或者相互頂替出現(xiàn)的設(shè)備上下線狀態(tài)錯亂問題

    摘要:發(fā)現(xiàn)問題在使用中發(fā)現(xiàn)在設(shè)備頻繁上下線和兩個設(shè)備一樣相互頂替連接的情況下,的和的方法調(diào)用沒有先后順序,如果在這兩個方法里面來記錄設(shè)備上下線狀態(tài),會造成狀態(tài)不對。因為相互頂替的情況并不多見,因此兩個也可以接受,在性能上并不會造成多大影響。 發(fā)現(xiàn)問題 在moquette使用中發(fā)現(xiàn)在設(shè)備頻繁上下線和兩個設(shè)備ClientId一樣相互頂替連接的情況下,InterceptHandler的onConn...

    betacat 評論0 收藏0
  • moquette改造筆記(四):解決InterceptHandler中的onConnectionLo

    摘要:發(fā)現(xiàn)問題在使用中設(shè)備異常斷開中的。在中事件都是在鏈中依次傳遞的。事件最后傳遞到。解決方法添加會導(dǎo)致調(diào)用兩次解釋會在該從鏈中移除掉時被調(diào)用,一般的話沒有手動從鏈中刪除時,會在連接斷開后回調(diào)該方法。 發(fā)現(xiàn)問題 在使用中設(shè)備異常斷開,InterceptHandler中的onConnectionLost()。經(jīng)過調(diào)試發(fā)現(xiàn)是MoquetteIdleTimeoutHandler中的代碼導(dǎo)致的,代碼...

    joyqi 評論0 收藏0
  • 程序員筆記|如何編寫高性能的Java代碼

    摘要:常見標(biāo)高線程上下文切換頻繁線程太多鎖競爭激烈標(biāo)高如果的占用很高,排查涉及到的程序,比如把改造成。抖動問題原因字節(jié)碼轉(zhuǎn)為機(jī)器碼需要占用時間片,大量的在執(zhí)行字節(jié)碼時,導(dǎo)致長期處于高位現(xiàn)象,占用率最高解決辦法保證編譯線程的占比。 一、并發(fā) Unable to create new native thread …… 問題1:Java中創(chuàng)建一個線程消耗多少內(nèi)存? 每個線程有獨(dú)自的棧內(nèi)存,共享堆內(nèi)...

    ky0ncheng 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<