国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

CountDownLatch + Callbale+FutureTask 實現(xiàn)異步變同步調(diào)用

張金寶 / 3726人閱讀

摘要:背景通過接口實現(xiàn)調(diào)用發(fā)送數(shù)據(jù),接口返回值為發(fā)送數(shù)據(jù)的對應(yīng)結(jié)果。接口為同步阻塞,為異步回調(diào)方式。接收數(shù)據(jù)回調(diào)接收到數(shù)據(jù)后,通過閉鎖釋放阻塞的線程,同時設(shè)置結(jié)果返回給調(diào)用者

背景

通過HTTP接口實現(xiàn)調(diào)用MQTT Client發(fā)送數(shù)據(jù),HTTP接口返回值為MQTT Client發(fā)送數(shù)據(jù)的對應(yīng)結(jié)果。 HTTP接口為同步阻塞,MQTT Client 為異步回調(diào)方式。
如何實現(xiàn)在HTTP接口中調(diào)用MQTT Client發(fā)送數(shù)據(jù)后,能夠阻塞等待MQTT返回結(jié)果,然后將結(jié)果返回?

解決方法

CountDownLatch + Callbale+FutureTask

1.CountDownLatch作用

CountDownLatch實現(xiàn)在MQTT Client 發(fā)送數(shù)據(jù)后 到接收數(shù)據(jù)后這段時間的阻塞。
HTTP每次請求,新建一個CountDownLatch,然后將CountDownLatch作為值和deviceId作為KEY保存到Map中,
調(diào)用MQTT Client 發(fā)送數(shù)據(jù)后,countDownLatch.await(),進行同步等待
在MQTT Client接收數(shù)據(jù)的回調(diào)方法中更加deviceId取出CountDwonLatch然后計數(shù)減一


2.Callbale+FutureTask作用

將調(diào)用MQTT Client發(fā)送數(shù)據(jù)的過程,封裝成Callable,投遞發(fā)送任務(wù)時,通過返回的FutureTask的get()方法,
同步阻塞,直到結(jié)果返回。

關(guān)鍵代碼

1.Map保存CountDownObj用于同步阻塞等待MQTT Client返回結(jié)果,以及將返回結(jié)果傳遞個FutureTask

    private final static ConcurrentMap countDownLatchMap = new ConcurrentHashMap<>();
    //線程池
    private final ThreadPoolExecutor threadPoolExecutor = 
            new ThreadPoolExecutor(3, 5, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100), runnable -> {
        Thread thread = new Thread(runnable, "mqtt thread");
        return thread;
    });

2.HTTP API 調(diào)用的發(fā)送MQTT 消息數(shù)據(jù)的接口

    /**
     * HTTP API 調(diào)用的發(fā)送MQTT 消息數(shù)據(jù)的接口
     * 同步阻塞
     */
    public Integer send(Long packageId, String deviceId) throws Exception {
        ......
       FutureTask futureTask = sendTask(publishDto));
       return futureTask.get()
    }

3.投遞發(fā)送MQTT指令的task方法

   /**
     * 投遞MQTT發(fā)送指令任務(wù)
     * 同步阻塞
     */ 
   private FutureTask sendTask(PublishDto publishDto) throws Exception {
        FutureTask futureTask = new FutureTask<>(new GetDatapointValueCallable(publishDto));
        threadPoolExecutor.execute(futureTask);
        //阻塞線程
        return futureTask;
    } 

4.封裝CountDownLatch 和 Integer的對象,用于CountDownLatch阻塞控制和返回結(jié)果

    /**
     * 封裝CountDownLatch 和 Integer
     * 用于CountDownLatch阻塞控制和返回結(jié)果
     */
    private class CountDownObj {
        private final CountDownLatch countDownLatch;
        private volatile Integer value;

        private CountDownObj(CountDownLatch countDownLatch) {
            this.countDownLatch = countDownLatch;
        }

        public CountDownLatch getCountDownLatch() {
            return countDownLatch;
        }

        public Integer getValue() {
            return value;
        }

        public void setValue(Integer value) {
            this.value = value;
        }
    }

5.具體發(fā)送MQTT數(shù)據(jù)的Callbale線程Task,會新建CountDownLatch,并通過CountDownLatch.await()方法阻塞,直到MQTT回調(diào)接收到數(shù)據(jù)或者超時。

    /**
     * 發(fā)送MQTT消息的任務(wù)Callable
     */
    private class GetDatapointValueCallable implements Callable {
        private final PublishDto publishDto;

        GetDatapointValueCallable(PublishDto publishDto) {
            this.publishDto = publishDto;
        }

        @Override
        public Integer call() throws Exception {
            //mqtt client 發(fā)送數(shù)據(jù),此處具體代碼省略
            ......
            
            CountDownLatch countDownLatch = new CountDownLatch(1);
            countDownLatchMap.putIfAbsent(publishDto.getDeviceId(), new CountDownObj(countDownLatch));
            //阻塞,超時時間3s
            countDownLatch.await(3, TimeUnit.SECONDS);
            //返回mqtt指令對應(yīng)的結(jié)果或者null
            return countDownLatchMap.remove(publishDto.getDeviceId()).getValue();
        }

    }

6.MQTT接收數(shù)據(jù)回調(diào),這里通過deviceId從MAP里面取到CountDownObj,釋放閉鎖(結(jié)束callable線程的等待)和設(shè)置MQTT返回的結(jié)果(即callable中call()返回的結(jié)果,也就是FutureTask的get()方法返回的結(jié)果)。

    /**
     * MQTT 接收數(shù)據(jù)回調(diào)
     */
    void mqttReceiveCallback(String deviceId, String datapointId, String value) {
        ......
        
        //接收到數(shù)據(jù)后,通過閉鎖釋放阻塞的線程,同時設(shè)置結(jié)果返回給調(diào)用者
        CountDownObj countDownObj=countDownLatchMap.get(deviceId);
        if(countDownObj!=null) {
            countDownObj.setValue(Integer.parseInt(value));
            countDownObj.getCountDownLatch().countDown();
        }
        
        .......
    }

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/68941.html

相關(guān)文章

  • Java多線程&高并發(fā)

    摘要:線程啟動規(guī)則對象的方法先行發(fā)生于此線程的每一個動作。所以局部變量是不被多個線程所共享的,也就不會出現(xiàn)并發(fā)問題。通過獲取到數(shù)據(jù),放入當前線程處理完之后將當前線程中的信息移除。主線程必須在啟動其他線程后立即調(diào)用方法。 一、線程安全性 定義:當多個線程訪問某個類時,不管運行時環(huán)境采用何種調(diào)度方式,或者這些線程將如何交替執(zhí)行,并且在主調(diào)代碼中不需要任何額外的同步或協(xié)同,這個類都能表現(xiàn)出正確的行...

    SQC 評論0 收藏0
  • 40道阿里巴巴JAVA研發(fā)崗多線程面試題詳解,你能答出多少

    摘要:但是單核我們還是要應(yīng)用多線程,就是為了防止阻塞。多線程可以防止這個問題,多條線程同時運行,哪怕一條線程的代碼執(zhí)行讀取數(shù)據(jù)阻塞,也不會影響其它任務(wù)的執(zhí)行。 1、多線程有什么用?一個可能在很多人看來很扯淡的一個問題:我會用多線程就好了,還管它有什么用?在我看來,這個回答更扯淡。所謂知其然知其所以然,會用只是知其然,為什么用才是知其所以然,只有達到知其然知其所以然的程度才可以說是把一個知識點...

    lpjustdoit 評論0 收藏0
  • bat等大公司常考java多線程面試題

    摘要:典型地,和被用在等待另一個線程產(chǎn)生的結(jié)果的情形測試發(fā)現(xiàn)結(jié)果還沒有產(chǎn)生后,讓線程阻塞,另一個線程產(chǎn)生了結(jié)果后,調(diào)用使其恢復(fù)。使當前線程放棄當前已經(jīng)分得的時間,但不使當前線程阻塞,即線程仍處于可執(zhí)行狀態(tài),隨時可能再次分得時間。 1、說說進程,線程,協(xié)程之間的區(qū)別 簡而言之,進程是程序運行和資源分配的基本單位,一個程序至少有一個進程,一個進程至少有一個線程.進程在執(zhí)行過程中擁有獨立的內(nèi)存單元...

    Charlie_Jade 評論0 收藏0
  • 想進大廠?50個多線程面試題,你會多少?【后25題】(二)

    摘要:大多數(shù)待遇豐厚的開發(fā)職位都要求開發(fā)者精通多線程技術(shù)并且有豐富的程序開發(fā)調(diào)試優(yōu)化經(jīng)驗,所以線程相關(guān)的問題在面試中經(jīng)常會被提到。掌握了這些技巧,你就可以輕松應(yīng)對多線程和并發(fā)面試了。進入等待通行準許時,所提供的對象。 最近看到網(wǎng)上流傳著,各種面試經(jīng)驗及面試題,往往都是一大堆技術(shù)題目貼上去,而沒有答案。 不管你是新程序員還是老手,你一定在面試中遇到過有關(guān)線程的問題。Java語言一個重要的特點就...

    caozhijian 評論0 收藏0
  • java并發(fā)編程學習10--同步器--倒計時門栓

    摘要:每個工作線程在結(jié)束前將門栓計數(shù)器減一,門栓的計數(shù)變?yōu)榫捅砻鞴ぷ魍瓿伞3S梅椒ㄟf減鎖存器的計數(shù),如果計數(shù)到達零,則釋放所有等待的線程。使當前線程在鎖存器倒計數(shù)至零之前一直等待,除非線程被中斷或超出了指定的等待時間。 【同步器 java.util.concurrent包包含幾個能幫助人們管理相互合作的線程集的類。這些機制具有為線程直間的共用集結(jié)點模式提供的‘預(yù)制功能’。如果有一個相互合作的...

    stackfing 評論0 收藏0

發(fā)表評論

0條評論

張金寶

|高級講師

TA的文章

閱讀更多
最新活動
閱讀需要支付1元查看
<