国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

moquette改造筆記(二):優(yōu)化BrokerInterceptor notifyTopicPu

liangzai_cool / 3789人閱讀

摘要:優(yōu)化邏輯優(yōu)化方向向啟動(dòng)方法一樣,每次調(diào)用的方法都是在線程池中新建一個(gè)任務(wù)具體代碼解釋新建一個(gè)用來實(shí)現(xiàn)調(diào)用方法。改造筆記三優(yōu)化中的線程池

發(fā)現(xiàn)問題

下面部分是io.moquette.spi.impl.BrokerInterceptor.java部分源碼

@Override
    public void notifyClientConnected(final MqttConnectMessage msg) {
        for (final InterceptHandler handler : this.handlers.get(InterceptConnectMessage.class)) {
            LOG.debug("Sending MQTT CONNECT message to interceptor. CId={}, interceptorId={}",
                    msg.payload().clientIdentifier(), handler.getID());
            executor.execute(() -> handler.onConnect(new InterceptConnectMessage(msg)));
        }
    }

    @Override
    public void notifyClientDisconnected(final String clientID, final String username) {
        for (final InterceptHandler handler : this.handlers.get(InterceptDisconnectMessage.class)) {
            LOG.debug("Notifying MQTT client disconnection to interceptor. CId={}, username={}, interceptorId={}",
                clientID, username, handler.getID());
            executor.execute(() -> handler.onDisconnect(new InterceptDisconnectMessage(clientID, username)));
        }
    }

    @Override
    public void notifyClientConnectionLost(final String clientID, final String username) {
        for (final InterceptHandler handler : this.handlers.get(InterceptConnectionLostMessage.class)) {
            LOG.debug("Notifying unexpected MQTT client disconnection to interceptor CId={}, username={}, " +
                "interceptorId={}", clientID, username, handler.getID());
            executor.execute(() -> handler.onConnectionLost(new InterceptConnectionLostMessage(clientID, username)));
        }
    }

    @Override
    public void notifyTopicPublished(final MqttPublishMessage msg, final String clientID, final String username) {
        msg.retain();

        executor.execute(() -> {
                try {
                    int messageId = msg.variableHeader().messageId();
                    String topic = msg.variableHeader().topicName();
                    for (InterceptHandler handler : handlers.get(InterceptPublishMessage.class)) {
                        LOG.debug("Notifying MQTT PUBLISH message to interceptor. CId={}, messageId={}, topic={}, "
                                + "interceptorId={}", clientID, messageId, topic, handler.getID());
                        handler.onPublish(new InterceptPublishMessage(msg, clientID, username));
                    }
                } finally {
                    ReferenceCountUtil.release(msg);
                }
        });
    }

    @Override
    public void notifyTopicSubscribed(final Subscription sub, final String username) {
        for (final InterceptHandler handler : this.handlers.get(InterceptSubscribeMessage.class)) {
            LOG.debug("Notifying MQTT SUBSCRIBE message to interceptor. CId={}, topicFilter={}, interceptorId={}",
                sub.getClientId(), sub.getTopicFilter(), handler.getID());
            executor.execute(() -> handler.onSubscribe(new InterceptSubscribeMessage(sub, username)));
        }
    }

可以發(fā)現(xiàn)在除了notifyTopicPublished(),其它方法中,在for循環(huán)中每次for循環(huán),對于InterceptHandler的調(diào)用都是在線程池中每次都是新執(zhí)行一個(gè)任務(wù),但是在notifyTopicPublished()方法中是在一個(gè)線程中for循環(huán)依次調(diào)用,這樣處理首先沒有用到線程池的多線程,其次是一旦某個(gè)InterceptHandler的notifyTopicPublished方法是阻塞的,那么后面的InterceptHandler的notifyTopicPublished()都會(huì)被阻塞。

優(yōu)化邏輯

優(yōu)化方向:向啟動(dòng)方法一樣,每次調(diào)用InterceptHandler的notifyTopicPublished方法都是在線程池中新建一個(gè)任務(wù)

具體代碼:

public class PublishTask implements Runnable {
        final MqttPublishMessage msg;
        final InterceptHandler interceptHandler;
        final String clientId;
        final String username;

        PublishTask(MqttPublishMessage msg, InterceptHandler interceptHandler, String clientId, String username) {
            this.msg = msg;
            this.interceptHandler = interceptHandler;
            this.clientId = clientId;
            this.username = username;
        }

        @Override
        public void run() {
            try {
                interceptHandler.onPublish(new InterceptPublishMessage(msg, clientId, username));
            } finally {
                ReferenceCountUtil.release(msg);
            }
        }

        @Override
        public String toString() {
            return "PublishTask{" +
                    "msg=" + msg +
                    ", interceptHandler=" + interceptHandler +
                    ", clientId="" + clientId + """ +
                    ", username="" + username + """ +
                    "}";
        }

        public InterceptHandler getInterceptHandler() {
            return interceptHandler;
        }
    }


    @Override
    public void notifyTopicPublished(final MqttPublishMessage msg, final String clientID, final String username) {

        msg.retain();

        try {
            for (InterceptHandler handler : handlers.get(InterceptPublishMessage.class)) {
                executor.execute(new PublishTask(msg.retainedDuplicate(), handler, clientID, username));
            }
        } finally {
            ReferenceCountUtil.release(msg);
        }


//        executor.execute(() -> {
//                try {
//                    int messageId = msg.variableHeader().messageId();
//                    String topic = msg.variableHeader().topicName();
//                    for (InterceptHandler handler : handlers.get(InterceptPublishMessage.class)) {
//                        LOG.debug("Notifying MQTT PUBLISH message to interceptor. CId={}, messageId={}, topic={}, "
//                                + "interceptorId={}", clientID, messageId, topic, handler.getID());
//                        handler.onPublish(new InterceptPublishMessage(msg, clientID, username));
//                    }
//                } finally {
//                    ReferenceCountUtil.release(msg);
//                }
//        });
    }

解釋:
(1)新建一個(gè)PublishTask用來實(shí)現(xiàn)調(diào)用handler.onPublish()方法。其中要注意
new PublishTask(msg.retainedDuplicate(), handler, clientID, username)中的msg.retainedDuplicate()
還要主要在兩個(gè)finally中ReferenceCountUtil.release(msg);

(2)PublishTask的toString()和getInterceptHandler()可以先不用管,會(huì)在其它地方用到,下一篇文章會(huì)講到。
moquette改造筆記(三):優(yōu)化BrokerInterceptor 中的線程池

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/77204.html

相關(guān)文章

  • moquette改造筆記(三):優(yōu)化BrokerInterceptor 中的線程池

    摘要:修改的實(shí)現(xiàn),實(shí)現(xiàn)接口在改造筆記一整合到中修改實(shí)現(xiàn),添加對的實(shí)現(xiàn)如下負(fù)載過大,處理不過來時(shí),會(huì)回調(diào)該方法例如可以發(fā)生郵件通知相關(guān)人員改造筆記四解決中的調(diào)用兩次 發(fā)現(xiàn)問題 在io.moquette.spi.impl.BrokerInterceptor的構(gòu)造函數(shù)中,新建了一個(gè)線程池,代碼如下: private BrokerInterceptor(int poolSize, List hand...

    cfanr 評論0 收藏0
  • moquette改造筆記(一):整合到SpringBoot

    摘要:整合到本文更加注重代碼實(shí)踐,對于配置相關(guān)的知識會(huì)一筆帶過,不做過多的詳解。筆者是上傳到私服,然后通過導(dǎo)入。接口是預(yù)留給開發(fā)者根據(jù)不同事件處理業(yè)務(wù)邏輯的接口。改造筆記二優(yōu)化邏輯 Moquette簡介 Mqtt作為物聯(lián)網(wǎng)比較流行的協(xié)議現(xiàn)在已經(jīng)被大范圍使用,其中也有很多開源的MQTT BROKEN。Moquette是用java基于netty實(shí)現(xiàn)的輕量級的MQTT BROKEN. Moquet...

    young.li 評論0 收藏0
  • moquette改造筆記(五):設(shè)備連接頻繁上下線或者相互頂替出現(xiàn)的設(shè)備上下線狀態(tài)錯(cuò)亂問題

    摘要:發(fā)現(xiàn)問題在使用中發(fā)現(xiàn)在設(shè)備頻繁上下線和兩個(gè)設(shè)備一樣相互頂替連接的情況下,的和的方法調(diào)用沒有先后順序,如果在這兩個(gè)方法里面來記錄設(shè)備上下線狀態(tài),會(huì)造成狀態(tài)不對。因?yàn)橄嗷ロ斕娴那闆r并不多見,因此兩個(gè)也可以接受,在性能上并不會(huì)造成多大影響。 發(fā)現(xiàn)問題 在moquette使用中發(fā)現(xiàn)在設(shè)備頻繁上下線和兩個(gè)設(shè)備ClientId一樣相互頂替連接的情況下,InterceptHandler的onConn...

    betacat 評論0 收藏0
  • moquette改造筆記(四):解決InterceptHandler中的onConnectionLo

    摘要:發(fā)現(xiàn)問題在使用中設(shè)備異常斷開中的。在中事件都是在鏈中依次傳遞的。事件最后傳遞到。解決方法添加會(huì)導(dǎo)致調(diào)用兩次解釋會(huì)在該從鏈中移除掉時(shí)被調(diào)用,一般的話沒有手動(dòng)從鏈中刪除時(shí),會(huì)在連接斷開后回調(diào)該方法。 發(fā)現(xiàn)問題 在使用中設(shè)備異常斷開,InterceptHandler中的onConnectionLost()。經(jīng)過調(diào)試發(fā)現(xiàn)是MoquetteIdleTimeoutHandler中的代碼導(dǎo)致的,代碼...

    joyqi 評論0 收藏0

發(fā)表評論

0條評論

最新活動(dòng)
閱讀需要支付1元查看
<