摘要:在創建對象時,需要轉入一個值,用于初始化的成員變量,該成員變量表示屏障攔截的線程數。當到達屏障的線程數小于時,這些線程都會被阻塞住。當所有線程到達屏障后,將會被更新,表示進入新一輪的運行輪次中。
1.簡介
在分析完AbstractQueuedSynchronizer(以下簡稱 AQS)和ReentrantLock的原理后,本文將分析 java.util.concurrent 包下的兩個線程同步組件CountDownLatch和CyclicBarrier。這兩個同步組件比較常用,也經常被放在一起對比。通過分析這兩個同步組件,可使我們對 Java 線程間協同有更深入的了解。同時通過分析其原理,也可使我們做到知其然,并知其所以然。
這里首先來介紹一下 CountDownLatch 的用途,CountDownLatch 允許一個或一組線程等待其他線程完成后再恢復運行。線程可通過調用await方法進入等待狀態,在其他線程調用countDown方法將計數器減為0后,處于等待狀態的線程即可恢復運行。CyclicBarrier (可循環使用的屏障)則與此不同,CyclicBarrier 允許一組線程到達屏障后阻塞住,直到最后一個線程進入到達屏障,所有線程才恢復運行。它們之間主要的區別在于喚醒等待線程的時機。CountDownLatch 是在計數器減為0后,喚醒等待線程。CyclicBarrier 是在計數器(等待線程數)增長到指定數量后,再喚醒等待線程。除此之外,兩種之間還有一些其他的差異,這個將會在后面進行說明。
在下一章中,我將會介紹一下兩者的實現原理,繼續往下看吧。
2.原理 2.1 CountDownLatch 的實現原理CountDownLatch 的同步功能是基于 AQS 實現的,CountDownLatch 使用 AQS 中的 state 成員變量作為計數器。在 state 不為0的情況下,凡是調用 await 方法的線程將會被阻塞,并被放入 AQS 所維護的同步隊列中進行等待。大致示意圖如下:
每個阻塞的線程都會被封裝成節點對象,節點之間通過 prev 和 next 指針形成同步隊列。初始情況下,隊列的頭結點是一個虛擬節點。該節點僅是一個占位符,沒什么特別的意義。每當有一個線程調用 countDown 方法,就將計數器 state--。當 state 被減至0時,隊列中的節點就會按照 FIFO 順序被喚醒,被阻塞的線程即可恢復運行。
CountDownLatch 本身的原理并不難理解,不過如果大家想深入理解 CountDownLatch 的實現細節,那么需要先去學習一下 AQS 的相關原理。CountDownLatch 是基于 AQS 實現的,所以理解 AQS 是學習 CountDownLatch 的前置條件。我在之前寫過一篇關于 AQS 的文章 Java 重入鎖 ReentrantLock 原理分析,有興趣的朋友可以去讀一讀。
2.2 CyclicBarrier 的實現原理與 CountDownLatch 的實現方式不同,CyclicBarrier 并沒有直接通過 AQS 實現同步功能,而是在重入鎖 ReentrantLock 的基礎上實現的。在 CyclicBarrier 中,線程訪問 await 方法需先獲取鎖才能訪問。在最后一個線程訪問 await 方法前,其他線程進入 await 方法中后,會調用 Condition 的 await 方法進入等待狀態。在最后一個線程進入 CyclicBarrier await 方法后,該線程將會調用 Condition 的 signalAll 方法喚醒所有處于等待狀態中的線程。同時,最后一個進入 await 的線程還會重置 CyclicBarrier 的狀態,使其可以重復使用。
在創建 CyclicBarrier 對象時,需要轉入一個值,用于初始化 CyclicBarrier 的成員變量 parties,該成員變量表示屏障攔截的線程數。當到達屏障的線程數小于 parties 時,這些線程都會被阻塞住。當最后一個線程到達屏障后,此前被阻塞的線程才會被喚醒。
3.源碼分析通過前面簡單的分析,相信大家對 CountDownLatch 和 CyclicBarrier 的原理有一定的了解了。那么接下來趁熱打鐵,我們一起探索一下這兩個同步組件的具體實現吧。
3.1 CountDownLatch 源碼分析CountDownLatch 的原理不是很復雜,所以在具體的實現上,也不是很復雜。當然,前面說過 CountDownLatch 是基于 AQS 實現的,AQS 的實現則要復雜的多。不過這里僅要求大家掌握 AQS 的基本原理,知道它內部維護了一個同步隊列,同步隊列中的線程會按照 FIFO 依次獲取同步狀態就行了。好了,下面我們一起去看一下 CountDownLatch 的源碼吧。
3.1.1 源碼結構CountDownLatch 的代碼量不大,加上注釋也不過300多行,所以它的代碼結構也會比較簡單。如下:
如上圖,CountDownLatch 源碼包含一個構造方法和一個私有成員變量,以及數個普通方法和一個重要的靜態內部類 Sync。CountDownLatch 的主要邏輯都是封裝在 Sync 和其父類 AQS 里的。所以分析 CountDownLatch 的源碼,本質上是分析 Sync 和 AQS 的原理。相關的分析,將會在下一節中展開,本節先說到這。
3.1.2 構造方法及成員變量本節來分析一下 CountDownLatch 的構造方法和其 Sync 類型的成員變量實現,如下:
public class CountDownLatch { private final Sync sync; /** CountDownLatch 的構造方法,該方法要求傳入大于0的整型數值作為計數器 */ public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); // 初始化 Sync this.sync = new Sync(count); } /** CountDownLatch 的同步控制器,繼承自 AQS */ private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { // 設置 AQS state setState(count); } int getCount() { return getState(); } /** 嘗試在共享狀態下獲取同步狀態,該方法在 AQS 中是抽象方法,這里進行了覆寫 */ protected int tryAcquireShared(int acquires) { /* * 如果 state = 0,則返回1,表明可獲取同步狀態, * 此時線程調用 await 方法時就不會被阻塞。 */ return (getState() == 0) ? 1 : -1; } /** 嘗試在共享狀態下釋放同步狀態,該方法在 AQS 中也是抽象方法 */ protected boolean tryReleaseShared(int releases) { /* * 下面的邏輯是將 state--,state 減至0時,調用 await 等待的線程會被喚醒。 * 這里使用循環 + CAS,表明會存在競爭的情況,也就是多個線程可能會同時調用 * countDown 方法。在 state 不為0的情況下,線程調用 countDown 是必須要完 * 成 state-- 這個操作。所以這里使用了循環 + CAS,確保 countDown 方法可正 * 常運行。 */ for (;;) { // 獲取 state int c = getState(); if (c == 0) return false; int nextc = c-1; // 使用 CAS 設置新的 state 值 if (compareAndSetState(c, nextc)) return nextc == 0; } } } }
需要說明的是,Sync 中的 tryAcquireShared 和 tryReleaseShared 方法并不是直接給 await 和 countDown 方法調用了的,這兩個方法以“try”開頭的方法最終會在 AQS 中被調用。
3.1.3 awaitCountDownLatch中有兩個版本的 await 方法,一個響應中斷,另一個在此基礎上增加了超時功能。本節將分析無超時功能的 await,如下:
/** * 該方法會使線程進入等待狀態,直到計數器減至0,或者線程被中斷。當計數器為0時,調用 * 此方法將會立即返回,不會被阻塞住。 */ public void await() throws InterruptedException { // 調用 AQS 中的 acquireSharedInterruptibly 方法 sync.acquireSharedInterruptibly(1); } /** 帶有超時功能的 await */ public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } +--- AbstractQueuedSynchronizer public final void acquireSharedInterruptibly(int arg) throws InterruptedException { // 若線程被中斷,則直接拋出中斷異常 if (Thread.interrupted()) throw new InterruptedException(); // 調用 Sync 中覆寫的 tryAcquireShared 方法,嘗試獲取同步狀態 if (tryAcquireShared(arg) < 0) /* * 若 tryAcquireShared 小于0,則表示獲取同步狀態失敗, * 此時將線程放入 AQS 的同步隊列中進行等待。 */ doAcquireSharedInterruptibly(arg); }
從上面的代碼中可以看出,CountDownLatch await 方法實際上調用的是 AQS 的 acquireSharedInterruptibly 方法。該方法會在內部調用 Sync 所覆寫的 tryAcquireShared 方法。在 state != 0時,tryAcquireShared 返回值 -1。此時線程將進入 doAcquireSharedInterruptibly 方法中,在此方法中,線程會被放入同步隊列中進行等待。若 state = 0,此時 tryAcquireShared 返回1,acquireSharedInterruptibly 會直接返回。此時調用 await 的線程也不會被阻塞住。
3.1.4 countDown與 await 方法一樣,countDown 實際上也是對 AQS 方法的一層封裝。具體的實現如下:
/** 該方法的作用是將計數器進行自減操作,當計數器為0時,喚醒正在同步隊列中等待的線程 */ public void countDown() { // 調用 AQS 中的 releaseShared 方法 sync.releaseShared(1); } +--- AbstractQueuedSynchronizer public final boolean releaseShared(int arg) { // 調用 Sync 中的 tryReleaseShared 嘗試釋放同步狀態 if (tryReleaseShared(arg)) { /* * tryReleaseShared 返回 true 時,表明 state = 0,即計數器為0。此時調用 * doReleaseShared 方法喚醒正在同步隊列中等待的線程 */ doReleaseShared(); return true; } return false; }
以上就是 countDown 的源碼分析,不是很難懂,這里就不啰嗦了。
3.2 CyclicBarrier 源碼分析 3.2.1 源碼結構如前面所說,CyclicBarrier 是基于重入鎖 ReentrantLock 實現相關邏輯的。所以要弄懂 CyclicBarrier 的源碼,僅需有 ReentrantLock 相關的背景知識即可。關于重入鎖 ReentrantLock 方面的知識,有興趣的朋友可以參考我之前寫的文章 Java 重入鎖 ReentrantLock 原理分析。下面看一下 CyclicBarrier 的代碼結構吧,如下:
從上圖可以看出,CyclicBarrier 包含了一個靜態內部類Generation、數個方法和一些成員變量。結構上比 CountDownLatch 略為復雜一些,但總體仍比較簡單。好了,接下來進入源碼分析部分吧。
3.2.2 構造方法及成員變量CyclicBarrier 包含兩個有參構造方法,分別如下:
/** 創建一個允許 parties 個線程通行的屏障 */ public CyclicBarrier(int parties) { this(parties, null); } /** * 創建一個允許 parties 個線程通行的屏障,若 barrierAction 回調對象不為 null, * 則在最后一個線程到達屏障后,執行相應的回調邏輯 */ public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; }
上面的第二個構造方法初始化了一些成員變量,下面我們就來說明一下這些成員變量的作用。
成員變量 | 作用 |
---|---|
parties | 線程數,即當?parties 個線程到達屏障后,屏障才會放行 |
count | 計數器,當 count > 0 時,到達屏障的線程會進入等待狀態。當最后一個線程到達屏障后,count 自減至0。最后一個到達的線程會執行回調方法,并喚醒其他處于等待狀態中的線程。 |
barrierCommand | 回調對象,如果不為 null,會在第?parties 個線程到達屏障后被執行 |
除了上面幾個成員變量,還有一個成員變量需要說明一下,如下:
/** * CyclicBarrier 是可循環使用的屏障,這里使用 Generation 記錄當前輪次 CyclicBarrier * 的運行狀態。當所有線程到達屏障后,generation 將會被更新,表示 CyclicBarrier 進入新一 * 輪的運行輪次中。 */ private Generation generation = new Generation(); private static class Generation { // 用于記錄屏障有沒有被破壞 boolean broken = false; }3.2.3 await
上一節所提到的幾個成員變量,在 await 方法中將會悉數登場。下面就來分析一下 await 方法的試下,如下:
public int await() throws InterruptedException, BrokenBarrierException { try { // await 的邏輯封裝在 dowait 中 return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } } private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; // 加鎖 lock.lock(); try { final Generation g = generation; // 如果 g.broken = true,表明屏障被破壞了,這里直接拋出異常 if (g.broken) throw new BrokenBarrierException(); // 如果線程中斷,則調用 breakBarrier 破壞屏障 if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } /* * index 表示線程到達屏障的順序,index = parties - 1 表明當前線程是第一個 * 到達屏障的。index = 0,表明當前線程是最有一個到達屏障的。 */ int index = --count; // 當 index = 0 時,喚醒所有處于等待狀態的線程 if (index == 0) { // tripped boolean ranAction = false; try { final Runnable command = barrierCommand; // 如果回調對象不為 null,則執行回調 if (command != null) command.run(); ranAction = true; // 重置屏障狀態,使其進入新一輪的運行過程中 nextGeneration(); return 0; } finally { // 若執行回調的過程中發生異常,此時調用 breakBarrier 破壞屏障 if (!ranAction) breakBarrier(); } } // 線程運行到此處的線程都會被屏障擋住,并進入等待狀態。 for (;;) { try { if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { /* * 若下面的條件成立,則表明本輪運行還未結束。此時調用 breakBarrier * 破壞屏障,喚醒其他線程,并拋出異常 */ if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { /* * 若上面的條件不成立,則有兩種可能: * 1. g != generation * 此種情況下,表明循環屏障的第 g 輪次的運行已經結束,屏障已經 * 進入了新的一輪運行輪次中。當前線程在稍后返回 到達屏障 的順序即可 * * 2. g = generation 但 g.broken = true * 此種情況下,表明已經有線程執行過 breakBarrier 方法了,當前 * 線程則會在稍后拋出 BrokenBarrierException */ Thread.currentThread().interrupt(); } } // 屏障被破壞,則拋出 BrokenBarrierException 異常 if (g.broken) throw new BrokenBarrierException(); // 屏障進入新的運行輪次,此時返回線程在上一輪次到達屏障的順序 if (g != generation) return index; // 超時判斷 if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } } /** 開啟新的一輪運行過程 */ private void nextGeneration() { // 喚醒所有處于等待狀態中的線程 trip.signalAll(); // 重置 count count = parties; // 重新創建 Generation,表明進入循環屏障進入新的一輪運行輪次中 generation = new Generation(); } /** 破壞屏障 */ private void breakBarrier() { // 設置屏障是否被破壞標志 generation.broken = true; // 重置 count count = parties; // 喚醒所有處于等待狀態中的線程 trip.signalAll(); }3.2.4 reset
reset 方法用于強制重置屏障,使屏障進入新一輪的運行過程中。代碼如下:
public void reset() { final ReentrantLock lock = this.lock; lock.lock(); try { // 破壞屏障 breakBarrier(); // break the current generation // 開啟新一輪的運行過程 nextGeneration(); // start a new generation } finally { lock.unlock(); } }
reset 方法并不復雜,沒什么好講的。CyclicBarrier 中還有其他一些方法,均不復雜,這里就不一一分析了。
4.兩者區別看完上面的分析,相信大家對著兩個同步組件有了更深入的認識。那么下面趁熱打鐵,簡單對比一下兩者之間的區別。這里用一個表格列舉一下:
差異點 | CountDownLatch | CyclicBarrier |
---|---|---|
等待線程喚醒時機 | 計數器減至0時,喚醒等待線程 | 到達屏障的線程數達到 parties 時,喚醒等待線程 |
是否可循環使用 | 否 | 是 |
是否可設置回調 | 否 | 是 |
除了上面列舉的差異點,還有一些其他方面的差異,這里就不一一列舉了。
5.總結分析完 CountDownLatch 和 CyclicBarrier,不知道大家有什么感覺。我個人的感覺是這兩個類的源碼并不復雜,比較好理解。當然,前提是建立在對 AQS 以及 ReentrantLock 有較深的理解之上。所以在學習這兩個類的源碼時,還是建議大家先看看前置知識。
好了,本文到這里就結束了。謝謝閱讀,再見。
本文在知識共享許可協議 4.0 下發布,轉載需在明顯位置處注明出處
作者:coolblog
本文同步發布在我的個人博客:http://www.coolblog.xyz
本作品采用知識共享署名-非商業性使用-禁止演繹 4.0 國際許可協議進行許可。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/69338.html
摘要:今天給大家總結一下,面試中出鏡率很高的幾個多線程面試題,希望對大家學習和面試都能有所幫助。指令重排在單線程環境下不會出先問題,但是在多線程環境下會導致一個線程獲得還沒有初始化的實例。使用可以禁止的指令重排,保證在多線程環境下也能正常運行。 下面最近發的一些并發編程的文章匯總,通過閱讀這些文章大家再看大廠面試中的并發編程問題就沒有那么頭疼了。今天給大家總結一下,面試中出鏡率很高的幾個多線...
摘要:當位玩家角色都選擇完畢后,開始進入游戲。進入游戲時需要加載相關的數據,待全部玩家都加載完畢后正式開始游戲。 showImg(https://segmentfault.com/img/remote/1460000016414941?w=640&h=338); CyclicBarrier是java.util.concurrent包下面的一個工具類,字面意思是可循環使用(Cyclic)的屏障...
摘要:前言之前學多線程的時候沒有學習線程的同步工具類輔助類。而其它線程完成自己的操作后,調用使計數器減。信號量控制一組線程同時執行。 前言 之前學多線程的時候沒有學習線程的同步工具類(輔助類)。ps:當時覺得暫時用不上,認為是挺高深的知識點就沒去管了.. 在前幾天,朋友發了一篇比較好的Semaphore文章過來,然后在瀏覽博客的時候又發現面試還會考,那還是挺重要的知識點。于是花了點時間去了解...
摘要:倒計時鎖,線程中調用使進程進入阻塞狀態,當達成指定次數后通過繼續執行每個線程中剩余的內容。實現分階段的的功能測試代碼拿客網站群三產創建于年月日。 同步器 為每種特定的同步問題提供了解決方案 Semaphore Semaphore【信號標;旗語】,通過計數器控制對共享資源的訪問。 測試類: package concurrent; import concurrent.th...
閱讀 2649·2019-08-30 15:52
閱讀 3596·2019-08-29 17:02
閱讀 1844·2019-08-29 13:00
閱讀 922·2019-08-29 11:07
閱讀 3238·2019-08-27 10:53
閱讀 1770·2019-08-26 13:43
閱讀 1016·2019-08-26 10:22
閱讀 1332·2019-08-23 18:06