摘要:所以也很容易想到可以利用等待通知機制來實現,和上文的并發包入坑指北之阻塞隊列的類似。
前言
在面試過程中聊到并發相關的內容時,不少面試官都喜歡問這類問題:
當 N 個線程同時完成某項任務時,如何知道他們都已經執行完畢了。
這也是本次討論的話題之一,所以本篇為『并發包入坑指北』的第二篇;來聊聊常見的并發工具。
自己實現其實這類問題的核心論點都是:如何在一個線程中得知其他線程是否執行完畢。
假設現在有 3 個線程在運行,需要在主線程中得知他們的運行結果;可以分為以下幾步:
定義一個計數器為 3。
每個線程完成任務后計數減一。
一旦計數器減為 0 則通知等待的線程。
所以也很容易想到可以利用等待通知機制來實現,和上文的『并發包入坑指北』之阻塞隊列的類似。
按照這個思路自定義了一個 MultipleThreadCountDownKit 工具,構造函數如下:
考慮到并發的前提,這個計數器自然需要保證線程安全,所以采用了 AtomicInteger。
所以在初始化時需要根據線程數量來構建對象。
計數器減一當其中一個業務線程完成后需要將這個計數器減一,直到減為0為止。
/** * 線程完成后計數 -1 */ public void countDown(){ if (counter.get() <= 0){ return; } int count = this.counter.decrementAndGet(); if (count < 0){ throw new RuntimeException("concurrent error") ; } if (count == 0){ synchronized (notify){ notify.notify(); } } }
利用 counter.decrementAndGet() 來保證多線程的原子性,當減為 0 時則利用等待通知機制來 notify 其他線程。
等待所有線程完成而需要知道業務線程執行完畢的其他線程則需要在未完成之前一直處于等待狀態,直到上文提到的在計數器變為 0 時得到通知。
/** * 等待所有的線程完成 * @throws InterruptedException */ public void await() throws InterruptedException { synchronized (notify){ while (counter.get() > 0){ notify.wait(); } if (notifyListen != null){ notifyListen.notifyListen(); } } }
原理也很簡單,一旦計數器還存在時則會利用 notify 對象進行等待,直到被業務線程喚醒。
同時這里新增了一個通知接口可以自定義實現喚醒后的一些業務邏輯,后文會做演示。
并發測試主要就是這兩個函數,下面來做一個演示。
初始化了三個計數器的并發工具 MultipleThreadCountDownKit
創建了三個線程分別執行業務邏輯,完畢后執行 countDown()。
線程 3 休眠了 2s 用于模擬業務耗時。
主線程執行 await() 等待他們三個線程執行完畢。
通過執行結果可以看出主線程會等待最后一個線程完成后才會退出;從而達到了主線程等待其余線程的效果。
MultipleThreadCountDownKit multipleThreadKit = new MultipleThreadCountDownKit(3); multipleThreadKit.setNotify(() -> LOGGER.info("三個線程完成了任務"));
也可以在初始化的時候指定一個回調接口,用于接收業務線程執行完畢后的通知。
當然和在主線程中執行這段邏輯效果是一樣的(和執行 await() 方法處于同一個線程)。
CountDownLatch當然我們自己實現的代碼沒有經過大量生產環境的驗證,所以主要的目的還是嘗試窺探官方的實現原理。
所以我們現在來看看 juc 下的 CountDownLatch 是如何實現的。
通過構造函數會發現有一個 內部類 Sync,他是繼承于 AbstractQueuedSynchronizer ;這是 Java 并發包中的基礎框架,都可以多帶帶拿來講了,所以這次重點不是它,今后我們再著重介紹。
這里就可以把他簡單理解為提供了和上文類似的一個計數器及線程通知工具就行了。countDown
其實他的核心邏輯和我們自己實現的區別不大。
public void countDown() { sync.releaseShared(1); } public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
利用這個內部類的 releaseShared 方法,我們可以理解為他想要將計數器減一。
看到這里有沒有似曾相識的感覺。
沒錯,在 JDK1.7 中的 AtomicInteger 自減就是這樣實現的(利用 CAS 保證了線程安全)。
只是一旦計數器減為 0 時則會執行 doReleaseShared 喚醒其他的線程。
這里我們只需要關心紅框部分(其他的暫時不用關心,這里涉及到了 AQS 中的隊列相關),最終會調用 LockSupport.unpark 來喚醒線程;就相當于上文調用 object.notify()。
所以其實本質上還是相同的。
await其中的 await() 也是借用 Sync 對象的方法實現的。
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); //判斷計數器是否還未完成 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }
一旦還存在未完成的線程時,則會調用 doAcquireSharedInterruptibly 進入阻塞狀態。
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
同樣的由于這也是 AQS 中的方法,我們只需要關心紅框部分;其實最終就是調用了 LockSupport.park 方法,也就相當于執行了 object.wait() 。
所有的業務線程執行完畢后會在計數器減為 0 時調用 LockSupport.unpark 來喚醒線程。
等待線程一旦計數器 > 0 時則會利用 LockSupport.park 來等待喚醒。
這樣整個流程也就串起來了,它的使用方法也和上文的類似。
就不做過多介紹了。
實際案例同樣的來看一個實際案例。
在上一篇《一次分表踩坑實踐的探討》提到了對于全表掃描的情況下,需要利用多線程來提高查詢效率。
比如我們這里分為了 64 張表,計劃利用 8 個線程來分別處理這些表的數據,偽代碼如下:
CountDownLatch count = new CountDownLatch(64); ConcurrentHashMap total = new ConcurrentHashMap(); for(Integer i=0;i<=63;i++){ executor.execute(new Runnable(){ @Override public void run(){ List value = queryTable(i); total.put(value,NULL); count.countDown(); } }) ; } count.await(); System.out.println("查詢完畢");
這樣就可以實現所有數據都查詢完畢后再做統一匯總;代碼挺簡單,也好理解(當然也可以使用線程池的 API)。
總結CountDownLatch 算是 juc 中一個高頻使用的工具,學會和理解他的使用會幫助我們更容易編寫并發應用。
文中涉及到的源碼:
https://github.com/crossoverJie/JCSprout/blob/master/src/main/java/com/crossoverjie/concurrent/communication/MultipleThreadCountDownKit.java
你的點贊與分享是對我最大的支持
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/74357.html
摘要:自己實現在自己實現之前先搞清楚阻塞隊列的幾個特點基本隊列特性先進先出。消費隊列空時會阻塞直到寫入線程寫入了隊列數據后喚醒消費線程。最終的隊列大小為,可見線程也是安全的。 showImg(https://segmentfault.com/img/remote/1460000018811340); 前言 較長一段時間以來我都發現不少開發者對 jdk 中的 J.U.C(java.util.c...
摘要:注意版本的是普通的超集,包含了所有正常版的功能,可以理解為。因為識別的還是之前的版本。安裝好以后就可以愉快地使用各種庫了。 寫在前面 之前搞樹莓派,opencv的contrib版本死活裝不上,最后用C++版本四線程編譯了一天, 浪費生命的玩意兒我明明記得之前,pip install opencv-contrib是可以安裝的......,年級大了,老了最近終于找到了一篇推文,原來是pip...
摘要:最近業務需要抽離,抽離出來的應用需要做成第三方包的形式,可以在任何也沒那么神奇,例如有些版本就沒測試版本項目中,直接安裝使用,所以這里還是需要發包到。第一次發包我是先發到環境,看下發包還是不是符合我的預期,畢竟很長時間沒發過包。 最近業務需要抽離,抽離出來的應用需要做成 Django 第三方包的形式,可以在任何 Django(也沒那么神奇,例如有些版本就沒測試)版本項目中,直接安裝使用...
摘要:模塊什么是模塊什么是模塊化玩過游戲的朋友應該知道,一把裝配完整的步槍,一般是槍身消音器倍鏡握把槍托。更重要的是,其它大部分語言都支持模塊化。這一點與規范完全不同。模塊輸出的是值的緩存,不存在動態更新。 1.模塊 1.1 什么是模塊?什么是模塊化? 玩過FPS游戲的朋友應該知道,一把裝配完整的M4步槍,一般是槍身+消音器+倍鏡+握把+槍托。 如果把M4步槍看成是一個頁面的話,那么我們可以...
閱讀 2474·2021-11-19 09:59
閱讀 1995·2019-08-30 15:55
閱讀 936·2019-08-29 13:30
閱讀 1339·2019-08-26 10:18
閱讀 3088·2019-08-23 18:36
閱讀 2390·2019-08-23 18:25
閱讀 1164·2019-08-23 18:07
閱讀 440·2019-08-23 17:15