摘要:背景通過接口實現(xiàn)調(diào)用發(fā)送數(shù)據(jù),接口返回值為發(fā)送數(shù)據(jù)的對應(yīng)結(jié)果。接口為同步阻塞,為異步回調(diào)方式。接收數(shù)據(jù)回調(diào)接收到數(shù)據(jù)后,通過閉鎖釋放阻塞的線程,同時設(shè)置結(jié)果返回給調(diào)用者
背景
通過HTTP接口實現(xiàn)調(diào)用MQTT Client發(fā)送數(shù)據(jù),HTTP接口返回值為MQTT Client發(fā)送數(shù)據(jù)的對應(yīng)結(jié)果。 HTTP接口為同步阻塞,MQTT Client 為異步回調(diào)方式。
如何實現(xiàn)在HTTP接口中調(diào)用MQTT Client發(fā)送數(shù)據(jù)后,能夠阻塞等待MQTT返回結(jié)果,然后將結(jié)果返回?
CountDownLatch + Callbale+FutureTask
1.CountDownLatch作用
CountDownLatch實現(xiàn)在MQTT Client 發(fā)送數(shù)據(jù)后 到接收數(shù)據(jù)后這段時間的阻塞。 HTTP每次請求,新建一個CountDownLatch,然后將CountDownLatch作為值和deviceId作為KEY保存到Map中, 調(diào)用MQTT Client 發(fā)送數(shù)據(jù)后,countDownLatch.await(),進行同步等待 在MQTT Client接收數(shù)據(jù)的回調(diào)方法中更加deviceId取出CountDwonLatch然后計數(shù)減一
2.Callbale+FutureTask作用
將調(diào)用MQTT Client發(fā)送數(shù)據(jù)的過程,封裝成Callable,投遞發(fā)送任務(wù)時,通過返回的FutureTask的get()方法, 同步阻塞,直到結(jié)果返回。關(guān)鍵代碼
1.Map保存CountDownObj用于同步阻塞等待MQTT Client返回結(jié)果,以及將返回結(jié)果傳遞個FutureTask
private final static ConcurrentMapcountDownLatchMap = new ConcurrentHashMap<>(); //線程池 private final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(3, 5, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100), runnable -> { Thread thread = new Thread(runnable, "mqtt thread"); return thread; });
2.HTTP API 調(diào)用的發(fā)送MQTT 消息數(shù)據(jù)的接口
/** * HTTP API 調(diào)用的發(fā)送MQTT 消息數(shù)據(jù)的接口 * 同步阻塞 */ public Integer send(Long packageId, String deviceId) throws Exception { ...... FutureTaskfutureTask = sendTask(publishDto)); return futureTask.get() }
3.投遞發(fā)送MQTT指令的task方法
/** * 投遞MQTT發(fā)送指令任務(wù) * 同步阻塞 */ private FutureTasksendTask(PublishDto publishDto) throws Exception { FutureTask futureTask = new FutureTask<>(new GetDatapointValueCallable(publishDto)); threadPoolExecutor.execute(futureTask); //阻塞線程 return futureTask; }
4.封裝CountDownLatch 和 Integer的對象,用于CountDownLatch阻塞控制和返回結(jié)果
/** * 封裝CountDownLatch 和 Integer * 用于CountDownLatch阻塞控制和返回結(jié)果 */ private class CountDownObj { private final CountDownLatch countDownLatch; private volatile Integer value; private CountDownObj(CountDownLatch countDownLatch) { this.countDownLatch = countDownLatch; } public CountDownLatch getCountDownLatch() { return countDownLatch; } public Integer getValue() { return value; } public void setValue(Integer value) { this.value = value; } }
5.具體發(fā)送MQTT數(shù)據(jù)的Callbale線程Task,會新建CountDownLatch,并通過CountDownLatch.await()方法阻塞,直到MQTT回調(diào)接收到數(shù)據(jù)或者超時。
/** * 發(fā)送MQTT消息的任務(wù)Callable */ private class GetDatapointValueCallable implements Callable{ private final PublishDto publishDto; GetDatapointValueCallable(PublishDto publishDto) { this.publishDto = publishDto; } @Override public Integer call() throws Exception { //mqtt client 發(fā)送數(shù)據(jù),此處具體代碼省略 ...... CountDownLatch countDownLatch = new CountDownLatch(1); countDownLatchMap.putIfAbsent(publishDto.getDeviceId(), new CountDownObj(countDownLatch)); //阻塞,超時時間3s countDownLatch.await(3, TimeUnit.SECONDS); //返回mqtt指令對應(yīng)的結(jié)果或者null return countDownLatchMap.remove(publishDto.getDeviceId()).getValue(); } }
6.MQTT接收數(shù)據(jù)回調(diào),這里通過deviceId從MAP里面取到CountDownObj,釋放閉鎖(結(jié)束callable線程的等待)和設(shè)置MQTT返回的結(jié)果(即callable中call()返回的結(jié)果,也就是FutureTask的get()方法返回的結(jié)果)。
/** * MQTT 接收數(shù)據(jù)回調(diào) */ void mqttReceiveCallback(String deviceId, String datapointId, String value) { ...... //接收到數(shù)據(jù)后,通過閉鎖釋放阻塞的線程,同時設(shè)置結(jié)果返回給調(diào)用者 CountDownObj countDownObj=countDownLatchMap.get(deviceId); if(countDownObj!=null) { countDownObj.setValue(Integer.parseInt(value)); countDownObj.getCountDownLatch().countDown(); } ....... }
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/68941.html
摘要:線程啟動規(guī)則對象的方法先行發(fā)生于此線程的每一個動作。所以局部變量是不被多個線程所共享的,也就不會出現(xiàn)并發(fā)問題。通過獲取到數(shù)據(jù),放入當前線程處理完之后將當前線程中的信息移除。主線程必須在啟動其他線程后立即調(diào)用方法。 一、線程安全性 定義:當多個線程訪問某個類時,不管運行時環(huán)境采用何種調(diào)度方式,或者這些線程將如何交替執(zhí)行,并且在主調(diào)代碼中不需要任何額外的同步或協(xié)同,這個類都能表現(xiàn)出正確的行...
摘要:但是單核我們還是要應(yīng)用多線程,就是為了防止阻塞。多線程可以防止這個問題,多條線程同時運行,哪怕一條線程的代碼執(zhí)行讀取數(shù)據(jù)阻塞,也不會影響其它任務(wù)的執(zhí)行。 1、多線程有什么用?一個可能在很多人看來很扯淡的一個問題:我會用多線程就好了,還管它有什么用?在我看來,這個回答更扯淡。所謂知其然知其所以然,會用只是知其然,為什么用才是知其所以然,只有達到知其然知其所以然的程度才可以說是把一個知識點...
摘要:典型地,和被用在等待另一個線程產(chǎn)生的結(jié)果的情形測試發(fā)現(xiàn)結(jié)果還沒有產(chǎn)生后,讓線程阻塞,另一個線程產(chǎn)生了結(jié)果后,調(diào)用使其恢復(fù)。使當前線程放棄當前已經(jīng)分得的時間,但不使當前線程阻塞,即線程仍處于可執(zhí)行狀態(tài),隨時可能再次分得時間。 1、說說進程,線程,協(xié)程之間的區(qū)別 簡而言之,進程是程序運行和資源分配的基本單位,一個程序至少有一個進程,一個進程至少有一個線程.進程在執(zhí)行過程中擁有獨立的內(nèi)存單元...
摘要:大多數(shù)待遇豐厚的開發(fā)職位都要求開發(fā)者精通多線程技術(shù)并且有豐富的程序開發(fā)調(diào)試優(yōu)化經(jīng)驗,所以線程相關(guān)的問題在面試中經(jīng)常會被提到。掌握了這些技巧,你就可以輕松應(yīng)對多線程和并發(fā)面試了。進入等待通行準許時,所提供的對象。 最近看到網(wǎng)上流傳著,各種面試經(jīng)驗及面試題,往往都是一大堆技術(shù)題目貼上去,而沒有答案。 不管你是新程序員還是老手,你一定在面試中遇到過有關(guān)線程的問題。Java語言一個重要的特點就...
摘要:每個工作線程在結(jié)束前將門栓計數(shù)器減一,門栓的計數(shù)變?yōu)榫捅砻鞴ぷ魍瓿伞3S梅椒ㄟf減鎖存器的計數(shù),如果計數(shù)到達零,則釋放所有等待的線程。使當前線程在鎖存器倒計數(shù)至零之前一直等待,除非線程被中斷或超出了指定的等待時間。 【同步器 java.util.concurrent包包含幾個能幫助人們管理相互合作的線程集的類。這些機制具有為線程直間的共用集結(jié)點模式提供的‘預(yù)制功能’。如果有一個相互合作的...
閱讀 2060·2019-08-30 15:52
閱讀 2450·2019-08-29 18:37
閱讀 806·2019-08-29 12:33
閱讀 2850·2019-08-29 11:04
閱讀 1546·2019-08-27 10:57
閱讀 2104·2019-08-26 13:38
閱讀 2772·2019-08-26 12:25
閱讀 2460·2019-08-26 12:23