摘要:對于,我們僅僅需要關心兩個方法,一個是方法,另一個是方法。首先,我們來看方法,它代表線程阻塞,等待的值減為。首先,的源碼實現和大相徑庭,基于的共享模式的使用,而基于來實現。
前言
本文先用 CountDownLatch 將共享模式說清楚,然后順著把其他 AQS 相關的類 CyclicBarrier、Semaphore 的源碼一起過一下。
CountDownLatchCountDownLatch 這個類是比較典型的 AQS 的共享模式的使用,這是一個高頻使用的類。latch 的中文意思是門栓、柵欄,具體怎么解釋我就不廢話了,大家隨意,看兩個例子就知道在哪里用、怎么用了。
使用例子
我們看下 Doug Lea 在 java doc 中給出的例子,這個例子非常實用,我們經常會寫這個代碼。
假設我們有 N ( N > 0 ) 個任務,那么我們會用 N 來初始化一個 CountDownLatch,然后將這個 latch 的引用傳遞到各個線程中,在每個線程完成了任務后,調用 latch.countDown() 代表完成了一個任務。
調用 latch.await() 的方法的線程會阻塞,直到所有的任務完成。
class Driver2 { // ... void main() throws InterruptedException { CountDownLatch doneSignal = new CountDownLatch(N); Executor e = Executors.newFixedThreadPool(8); // 創(chuàng)建 N 個任務,提交給線程池來執(zhí)行 for (int i = 0; i < N; ++i) // create and start threads e.execute(new WorkerRunnable(doneSignal, i)); // 等待所有的任務完成,這個方法才會返回 doneSignal.await(); // wait for all to finish } } class WorkerRunnable implements Runnable { private final CountDownLatch doneSignal; private final int i; WorkerRunnable(CountDownLatch doneSignal, int i) { this.doneSignal = doneSignal; this.i = i; } public void run() { try { doWork(i); // 這個線程的任務完成了,調用 countDown 方法 doneSignal.countDown(); } catch (InterruptedException ex) { } // return; } void doWork() { ...} }
所以說 CountDownLatch 非常實用,我們常常會將一個比較大的任務進行拆分,然后開啟多個線程來執(zhí)行,等所有線程都執(zhí)行完了以后,再往下執(zhí)行其他操作。這里例子中,只有 main 線程調用了 await 方法。
我們再來看另一個例子,這個例子很典型,用了兩個 CountDownLatch:
class Driver { // ... void main() throws InterruptedException { CountDownLatch startSignal = new CountDownLatch(1); CountDownLatch doneSignal = new CountDownLatch(N); for (int i = 0; i < N; ++i) // create and start threads new Thread(new Worker(startSignal, doneSignal)).start(); // 這邊插入一些代碼,確保上面的每個線程先啟動起來,才執(zhí)行下面的代碼。 doSomethingElse(); // don"t let run yet // 因為這里 N == 1,所以,只要調用一次,那么所有的 await 方法都可以通過 startSignal.countDown(); // let all threads proceed doSomethingElse(); // 等待所有任務結束 doneSignal.await(); // wait for all to finish } } class Worker implements Runnable { private final CountDownLatch startSignal; private final CountDownLatch doneSignal; Worker(CountDownLatch startSignal, CountDownLatch doneSignal) { this.startSignal = startSignal; this.doneSignal = doneSignal; } public void run() { try { // 為了讓所有線程同時開始任務,我們讓所有線程先阻塞在這里 // 等大家都準備好了,再打開這個門栓 startSignal.await(); doWork(); doneSignal.countDown(); } catch (InterruptedException ex) { } // return; } void doWork() { ...} }
這個例子中,doneSignal 同第一個例子的使用,我們說說這里的 startSignal。N 個新開啟的線程都調用了startSignal.await() 進行阻塞等待,它們阻塞在柵欄上,只有當條件滿足的時候(startSignal.countDown()),它們才能同時通過這個柵欄。如果始終只有一個線程調用 await 方法等待任務完成,那么 CountDownLatch 就會簡單很多,所以之后的源碼分析讀者一定要在腦海中構建出這么一個場景:有 m 個線程是做任務的,有 n 個線程在某個柵欄上等待這 m 個線程做完任務,直到所有 m 個任務完成后,n 個線程同時通過柵欄。
源碼分析
public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } // 老套路了,內部封裝一個 Sync 類繼承自 AQS private static final class Sync extends AbstractQueuedSynchronizer { Sync(int count) { // 這樣就 state == count 了 setState(count); } ... }
代碼都是套路,先分析套路:AQS 里面的 state 是一個整數值,這邊用一個 int count 參數其實初始化就是設置了這個值,所有調用了 await 方法的等待線程會掛起,然后有其他一些線程會做 state = state - 1 操作,當 state 減到 0 的同時,那個線程會負責喚醒調用了 await 方法的所有線程。都是套路啊,只是 Doug Lea 的套路很深,代碼很巧妙,不然我們也沒有要分析源碼的必要。
對于 CountDownLatch,我們僅僅需要關心兩個方法,一個是 countDown() 方法,另一個是 await() 方法。countDown() 方法每次調用都會將 state 減 1,直到 state 的值為 0;而 await 是一個阻塞方法,當 state 減為 0 的時候,await 方法才會返回。await 可以被多個線程調用,讀者這個時候腦子里要有個圖:所有調用了 await 方法的線程阻塞在 AQS 的阻塞隊列中,等待條件滿足(state == 0),將線程從隊列中一個個喚醒過來。
我們用以下程序來分析源碼,t1 和 t2 負責調用 countDown() 方法,t3 和 t4 調用 await 方法阻塞:
public class CountDownLatchDemo { public static void main(String[] args) { CountDownLatch latch = new CountDownLatch(2); Thread t1 = new Thread(new Runnable() { @Override public void run() { try { Thread.sleep(5000); } catch (InterruptedException ignore) { } // 休息 5 秒后(模擬線程工作了 5 秒),調用 countDown() latch.countDown(); } }, "t1"); Thread t2 = new Thread(new Runnable() { @Override public void run() { try { Thread.sleep(10000); } catch (InterruptedException ignore) { } // 休息 10 秒后(模擬線程工作了 10 秒),調用 countDown() latch.countDown(); } }, "t2"); t1.start(); t2.start(); Thread t3 = new Thread(new Runnable() { @Override public void run() { try { // 阻塞,等待 state 減為 0 latch.await(); System.out.println("線程 t3 從 await 中返回了"); } catch (InterruptedException e) { System.out.println("線程 t3 await 被中斷"); Thread.currentThread().interrupt(); } } }, "t3"); Thread t4 = new Thread(new Runnable() { @Override public void run() { try { // 阻塞,等待 state 減為 0 latch.await(); System.out.println("線程 t4 從 await 中返回了"); } catch (InterruptedException e) { System.out.println("線程 t4 await 被中斷"); Thread.currentThread().interrupt(); } } }, "t4"); t3.start(); t4.start(); } }
上述程序,大概在過了 10 秒左右的時候,會輸出:
線程 t3 從 await 中返回了 線程 t4 從 await 中返回了 // 這兩條輸出,順序不是絕對的 // 后面的分析,我們假設 t3 先進入阻塞隊列
接下來,我們按照流程一步一步走:先 await 等待,然后被喚醒,await 方法返回。
首先,我們來看 await() 方法,它代表線程阻塞,等待 state 的值減為 0。
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public final void acquireSharedInterruptibly(int arg) throws InterruptedException { // 這也是老套路了,我在第二篇的中斷那一節(jié)說過了 if (Thread.interrupted()) throw new InterruptedException(); // t3 和 t4 調用 await 的時候,state 都大于 0。 // 也就是說,這個 if 返回 true,然后往里看 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } // 只有當 state == 0 的時候,這個方法才會返回 1 protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }
從方法名我們就可以看出,這個方法是獲取共享鎖,并且此方法是可中斷的(中斷的時候拋出 InterruptedException 退出這個方法)。
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { // 1. 入隊 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { // 同上,只要 state 不等于 0,那么這個方法返回 -1 int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } // 2 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
我們再一步步看具體的流程。首先,我們看 countDown() 方法:
public void countDown() { sync.releaseShared(1); } public final boolean releaseShared(int arg) { // 只有當 state 減為 0 的時候,tryReleaseShared 才返回 true // 否則只是簡單的 state = state - 1 那么 countDown 方法就結束了 if (tryReleaseShared(arg)) { // 喚醒 await 的線程 doReleaseShared(); return true; } return false; } // 這個方法很簡單,用自旋的方法實現 state 減 1 protected boolean tryReleaseShared(int releases) { for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } }
countDown 方法就是每次調用都將 state 值減 1,如果 state 減到 0 了,那么就調用下面的方法進行喚醒阻塞隊列中的線程:
// 調用這個方法的時候,state == 0 // 這個方法先不要看所有的代碼,按照思路往下到我寫注釋的地方,其他的之后還會仔細分析 private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; // t3 入隊的時候,已經將頭節(jié)點的 waitStatus 設置為 Node.SIGNAL(-1) 了 if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases // 就是這里,喚醒 head 的后繼節(jié)點,也就是阻塞隊列中的第一個節(jié)點 // 在這里,也就是喚醒 t3 unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // todo continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
一旦 t3 被喚醒后,我們繼續(xù)回到 await 的這段代碼,parkAndCheckInterrupt 返回,我們先不考慮中斷的情況:
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); // 2. 這里是下一步 p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && // 1. 喚醒后這個方法返回 parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
接下來,t3 會進到 setHeadAndPropagate(node, r) 這個方法,先把 head 給占了,然后喚醒隊列中其他的線程:
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); // 下面說的是,喚醒當前 node 之后的節(jié)點,即 t3 已經醒了,馬上喚醒 t4 // 類似的,如果 t4 后面還有 t5,那么 t4 醒了以后,馬上將 t5 給喚醒了 if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) // 又是這個方法,只是現在的 head 已經不是原來的空節(jié)點了,是 t3 的節(jié)點了 doReleaseShared(); } }
又回到這個方法了,那么接下來,我們好好分析 doReleaseShared 這個方法,我們根據流程,頭節(jié)點 head 此時是 t3 節(jié)點了:
// 調用這個方法的時候,state == 0 private void doReleaseShared() { for (;;) { Node h = head; // 1. h == null: 說明阻塞隊列為空 // 2. h == tail: 說明頭結點可能是剛剛初始化的頭節(jié)點, // 或者是普通線程節(jié)點,但是此節(jié)點既然是頭節(jié)點了,那么代表已經被喚醒了,阻塞隊列沒有其他節(jié)點了 // 所以這兩種情況不需要進行喚醒后繼節(jié)點 if (h != null && h != tail) { int ws = h.waitStatus; // t4 將頭節(jié)點(此時是 t3)的 waitStatus 設置為 Node.SIGNAL(-1) 了 if (ws == Node.SIGNAL) { // 這里 CAS 失敗的場景請看下面的解讀 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases // 就是這里,喚醒 head 的后繼節(jié)點,也就是阻塞隊列中的第一個節(jié)點 // 在這里,也就是喚醒 t4 unparkSuccessor(h); } else if (ws == 0 && // 這個 CAS 失敗的場景是:執(zhí)行到這里的時候,剛好有一個節(jié)點入隊,入隊會將這個 ws 設置為 -1 !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } // 如果到這里的時候,前面喚醒的線程已經占領了 head,那么再循環(huán) // 否則,就是 head 沒變,那么退出循環(huán), // 退出循環(huán)是不是意味著阻塞隊列中的其他節(jié)點就不喚醒了?當然不是,喚醒的線程之后還是會調用這個方法的 if (h == head) // loop if head changed break; } }
我們分析下最后一個 if 語句,然后才能解釋第一個 CAS 為什么可能會失敗:
h == head:說明頭節(jié)點還沒有被剛剛用 unparkSuccessor 喚醒的線程(這里可以理解為 t4)占有,此時 break 退出循環(huán)。
h != head:頭節(jié)點被剛剛喚醒的線程(這里可以理解為 t4)占有,那么這里重新進入下一輪循環(huán),喚醒下一個節(jié)點(這里是 t4 )。我們知道,等到 t4 被喚醒后,其實是會主動喚醒 t5、t6、t7...,那為什么這里要進行下一個循環(huán)來喚醒 t5 呢?我覺得是出于吞吐量的考慮。
滿足上面的 2 的場景,那么我們就能知道為什么上面的 CAS 操作 compareAndSetWaitStatus(h, Node.SIGNAL, 0) 會失敗了?
因為當前進行 for 循環(huán)的線程到這里的時候,可能剛剛喚醒的線程 t4 也剛剛好到這里了,那么就有可能 CAS 失敗了。
for 循環(huán)第一輪的時候會喚醒 t4,t4 醒后會將自己設置為頭節(jié)點,如果在 t4 設置頭節(jié)點后,for 循環(huán)才跑到 if (h == head),那么此時會返回 false,for 循環(huán)會進入下一輪。t4 喚醒后也會進入到這個方法里面,那么 for 循環(huán)第二輪和 t4 就有可能在這個 CAS 相遇,那么就只會有一個成功了。
CyclicBarrier字面意思是“可重復使用的柵欄”,CyclicBarrier 相比 CountDownLatch 來說,要簡單很多,其源碼沒有什么高深的地方,它是 ReentrantLock 和 Condition 的組合使用。看如下示意圖,CyclicBarrier 和 CountDownLatch 是不是很像,只是 CyclicBarrier 可以有不止一個柵欄,因為它的柵欄(Barrier)可以重復使用(Cyclic)。
首先,CyclicBarrier 的源碼實現和 CountDownLatch 大相徑庭,CountDownLatch 基于 AQS 的共享模式的使用,而 CyclicBarrier 基于 Condition 來實現。
因為 CyclicBarrier 的源碼相對來說簡單許多,讀者只要熟悉了前面關于 Condition 的分析,那么這里的源碼是毫無壓力的,就是幾個特殊概念罷了。
廢話結束,先上基本屬性和構造方法:
public class CyclicBarrier { // 我們說了,CyclicBarrier 是可以重復使用的,我們把每次從開始使用到穿過柵欄當做"一代" private static class Generation { boolean broken = false; } /** The lock for guarding barrier entry */ private final ReentrantLock lock = new ReentrantLock(); // CyclicBarrier 是基于 Condition 的 // Condition 是“條件”的意思,CyclicBarrier 的等待線程通過 barrier 的“條件”是大家都到了柵欄上 private final Condition trip = lock.newCondition(); // 參與的線程數 private final int parties; // 如果設置了這個,代表越過柵欄之前,要執(zhí)行相應的操作 private final Runnable barrierCommand; // 當前所處的“代” private Generation generation = new Generation(); // 還沒有到柵欄的線程數,這個值初始為 parties,然后遞減 // 還沒有到柵欄的線程數 = parties - 已經到柵欄的數量 private int count; public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; } public CyclicBarrier(int parties) { this(parties, null); }
我用一圖來描繪下 CyclicBarrier 里面的一些概念:
看圖我們也知道了,CyclicBarrier 的源碼最重要的就是 await() 方法了。
首先,先看怎么開啟新的一代:
// 開啟新的一代,當最后一個線程到達柵欄上的時候,調用這個方法來喚醒其他線程,同時初始化“下一代” private void nextGeneration() { // 首先,需要喚醒所有的在柵欄上等待的線程 trip.signalAll(); // 更新 count 的值 count = parties; // 重新生成“新一代” generation = new Generation(); }
看看怎么打破一個柵欄:
private void breakBarrier() { // 設置狀態(tài) broken 為 true generation.broken = true; // 重置 count 為初始值 parties count = parties; // 喚醒所有已經在等待的線程 trip.signalAll(); }
這兩個方法之后用得到,現在開始分析最重要的等待通過柵欄方法 await 方法:
// 不帶超時機制 public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } } // 帶超時機制,如果超時拋出 TimeoutException 異常 public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException { return dowait(true, unit.toNanos(timeout)); }
繼續(xù)往里看:
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; // 先要獲取到鎖,然后在 finally 中要記得釋放鎖 // 如果記得 Condition 部分的話,我們知道 condition 的 await 會釋放鎖,signal 的時候需要重新獲取鎖 lock.lock(); try { final Generation g = generation; // 檢查柵欄是否被打破,如果被打破,拋出 BrokenBarrierException 異常 if (g.broken) throw new BrokenBarrierException(); // 檢查中斷狀態(tài),如果中斷了,拋出 InterruptedException 異常 if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } // index 是這個 await 方法的返回值 // 注意到這里,這個是從 count 遞減后得到的值 int index = --count; // 如果等于 0,說明所有的線程都到柵欄上了,準備通過 if (index == 0) { // tripped boolean ranAction = false; try { // 如果在初始化的時候,指定了通過柵欄前需要執(zhí)行的操作,在這里會得到執(zhí)行 final Runnable command = barrierCommand; if (command != null) command.run(); // 如果 ranAction 為 true,說明執(zhí)行 command.run() 的時候,沒有發(fā)生異常退出的情況 ranAction = true; // 喚醒等待的線程,然后開啟新的一代 nextGeneration(); return 0; } finally { if (!ranAction) // 進到這里,說明執(zhí)行指定操作的時候,發(fā)生了異常,那么需要打破柵欄 // 之前我們說了,打破柵欄意味著喚醒所有等待的線程,設置 broken 為 true,重置 count 為 parties breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out // 如果是最后一個線程調用 await,那么上面就返回了 // 下面的操作是給那些不是最后一個到達柵欄的線程執(zhí)行的 for (;;) { try { // 如果帶有超時機制,調用帶超時的 Condition 的 await 方法等待,直到最后一個線程調用 await if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { // 如果到這里,說明等待的線程在 await(是 Condition 的 await)的時候被中斷 if (g == generation && ! g.broken) { // 打破柵欄 breakBarrier(); // 打破柵欄后,重新拋出這個 InterruptedException 異常給外層調用的方法 throw ie; } else { // 到這里,說明 g != generation, 說明新的一代已經產生,即最后一個線程 await 執(zhí)行完成, // 那么此時沒有必要再拋出 InterruptedException 異常,記錄下來這個中斷信息即可 // 或者是柵欄已經被打破了,那么也不應該拋出 InterruptedException 異常, // 而是之后拋出 BrokenBarrierException 異常 Thread.currentThread().interrupt(); } } // 喚醒后,檢查柵欄是否是“破的” if (g.broken) throw new BrokenBarrierException(); // 這個 for 循環(huán)除了異常,就是要從這里退出了 // 我們要清楚,最后一個線程在執(zhí)行完指定任務(如果有的話),會調用 nextGeneration 來開啟一個新的代 // 然后釋放掉鎖,其他線程從 Condition 的 await 方法中得到鎖并返回,然后到這里的時候,其實就會滿足 g != generation 的 // 那什么時候不滿足呢?barrierCommand 執(zhí)行過程中拋出了異常,那么會執(zhí)行打破柵欄操作, // 設置 broken 為true,然后喚醒這些線程。這些線程會從上面的 if (g.broken) 這個分支拋 BrokenBarrierException 異常返回 // 當然,還有最后一種可能,那就是 await 超時,此種情況不會從上面的 if 分支異常返回,也不會從這里返回,會執(zhí)行后面的代碼 if (g != generation) return index; // 如果醒來發(fā)現超時了,打破柵欄,拋出異常 if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } }
好了,我想我應該講清楚了吧,我好像幾乎沒有漏掉任何一行代碼吧?
下面開始收尾工作。
首先,我們看看怎么得到有多少個線程到了柵欄上,處于等待狀態(tài):
public int getNumberWaiting() { final ReentrantLock lock = this.lock; lock.lock(); try { return parties - count; } finally { lock.unlock(); } }
判斷一個柵欄是否被打破了,這個很簡單,直接看 broken 的值即可:
public boolean isBroken() { final ReentrantLock lock = this.lock; lock.lock(); try { return generation.broken; } finally { lock.unlock(); } }
前面我們在說 await 的時候也幾乎說清楚了,什么時候柵欄會被打破,總結如下:
1.中斷,我們說了,如果某個等待的線程發(fā)生了中斷,那么會打破柵欄,同時拋出 InterruptedException 異常;
2.超時,打破柵欄,同時拋出 TimeoutException 異常;
3.指定執(zhí)行的操作拋出了異常,這個我們前面也說過。
最后,我們來看看怎么重置一個柵欄:
public void reset() { final ReentrantLock lock = this.lock; lock.lock(); try { breakBarrier(); // break the current generation nextGeneration(); // start a new generation } finally { lock.unlock(); } }
我們設想一下,如果初始化時,指定了線程 parties = 4,前面有 3 個線程調用了 await 等待,在第 4 個線程調用 await 之前,我們調用 reset 方法,那么會發(fā)生什么?
首先,打破柵欄,那意味著所有等待的線程(3個等待的線程)會喚醒,await 方法會通過拋出 BrokenBarrierException 異常返回。然后開啟新的一代,重置了 count 和 generation,相當于一切歸零了。
怎么樣,CyclicBarrier 源碼很簡單吧。
Semaphore有了 CountDownLatch 的基礎后,分析 Semaphore 會簡單很多。Semaphore 是什么呢?它類似一個資源池(讀者可以類比線程池),每個線程需要調用 acquire() 方法獲取資源,然后才能執(zhí)行,執(zhí)行完后,需要 release 資源,讓給其他的線程用。
大概大家也可以猜到,Semaphore 其實也是 AQS 中共享鎖的使用,因為每個線程共享一個池嘛。
套路解讀:創(chuàng)建 Semaphore 實例的時候,需要一個參數 permits,這個基本上可以確定是設置給 AQS 的 state 的,然后每個線程調用 acquire 的時候,執(zhí)行 state = state - 1,release 的時候執(zhí)行 state = state + 1,當然,acquire 的時候,如果 state = 0,說明沒有資源了,需要等待其他線程 release。
構造方法:
public Semaphore(int permits) { sync = new NonfairSync(permits); } public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }
這里和 ReentrantLock 類似,用了公平策略和非公平策略。
看 acquire 方法:
public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public void acquireUninterruptibly() { sync.acquireShared(1); } public void acquire(int permits) throws InterruptedException { if (permits < 0) throw new IllegalArgumentException(); sync.acquireSharedInterruptibly(permits); } public void acquireUninterruptibly(int permits) { if (permits < 0) throw new IllegalArgumentException(); sync.acquireShared(permits); }
這幾個方法也是老套路了,大家基本都懂了吧,這邊多了兩個可以傳參的 acquire 方法,不過大家也都懂的吧,如果我們需要一次獲取超過一個的資源,會用得著這個的。
我們接下來看不拋出 InterruptedException 異常的 acquireUninterruptibly() 方法吧:
public void acquireUninterruptibly() { sync.acquireShared(1); } public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
前面說了,Semaphore 分公平策略和非公平策略,我們對比一下兩個 tryAcquireShared 方法:
// 公平策略: protected int tryAcquireShared(int acquires) { for (;;) { // 區(qū)別就在于是不是會先判斷是否有線程在排隊,然后才進行 CAS 減操作 if (hasQueuedPredecessors()) return -1; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } // 非公平策略: protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
也是老套路了,所以從源碼分析角度的話,我們其實不太需要關心是不是公平策略還是非公平策略,它們的區(qū)別往往就那么一兩行。
我們再回到 acquireShared 方法,
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
由于 tryAcquireShared(arg) 返回小于 0 的時候,說明 state 已經小于 0 了(沒資源了),此時 acquire 不能立馬拿到資源,需要進入到阻塞隊列等待,雖然貼了很多代碼,不在乎多這點了:
private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
這個方法我就不介紹了,線程掛起后等待有資源被 release 出來。接下來,我們就要看 release 的方法了:
// 任務介紹,釋放一個資源 public void release() { sync.releaseShared(1); } public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; // 溢出,當然,我們一般也不會用這么大的數 if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; } }
tryReleaseShared 方法總是會返回 true,然后是 doReleaseShared,這個也是我們熟悉的方法了,我就貼下代碼,不分析了,這個方法用于喚醒所有的等待線程:
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
Semphore 的源碼確實很簡單,基本上都是分析過的老代碼的組合使用了。
總結寫到這里,終于把 AbstractQueuedSynchronizer 基本上說完了,對于 Java 并發(fā),Doug Lea 真的是神一樣的存在。日后我們還會接觸到很多 Doug Lea 的代碼,希望我們大家都可以朝著大神的方向不斷打磨自己的技術,少一些高大上的架構,多一些實實在在的優(yōu)秀代碼吧。
(全文完)
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規(guī)行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/72339.html
摘要:所有示例代碼請見下載于基本概念并發(fā)同時擁有兩個或者多個線程,如果程序在單核處理器上運行多個線程將交替地換入或者換出內存這些線程是同時存在的,每個線程都處于執(zhí)行過程中的某個狀態(tài),如果運行在多核處理器上此時,程序中的每個線程都 所有示例代碼,請見/下載于 https://github.com/Wasabi1234... showImg(https://upload-images.jians...
摘要:將屏障重置為其初始狀態(tài)。注意,在由于其他原因造成損壞之后,實行重置可能會變得很復雜此時需要使用其他方式重新同步線程,并選擇其中一個線程來執(zhí)行重置。 安全共享對象策略 1.線程限制 : 一個被線程限制的對象,由線程獨占,并且只能被占有它的線程修改2.共享只讀 : 一個共享只讀的對象,在沒有額外同步的情況下,可以被多個線程并發(fā)訪問,但是任何線程都不能修改它3.線程安全對象 : 一個線程安全...
摘要:線程啟動規(guī)則對象的方法先行發(fā)生于此線程的每一個動作。所以局部變量是不被多個線程所共享的,也就不會出現并發(fā)問題。通過獲取到數據,放入當前線程處理完之后將當前線程中的信息移除。主線程必須在啟動其他線程后立即調用方法。 一、線程安全性 定義:當多個線程訪問某個類時,不管運行時環(huán)境采用何種調度方式,或者這些線程將如何交替執(zhí)行,并且在主調代碼中不需要任何額外的同步或協同,這個類都能表現出正確的行...
摘要:今天給大家總結一下,面試中出鏡率很高的幾個多線程面試題,希望對大家學習和面試都能有所幫助。指令重排在單線程環(huán)境下不會出先問題,但是在多線程環(huán)境下會導致一個線程獲得還沒有初始化的實例。使用可以禁止的指令重排,保證在多線程環(huán)境下也能正常運行。 下面最近發(fā)的一些并發(fā)編程的文章匯總,通過閱讀這些文章大家再看大廠面試中的并發(fā)編程問題就沒有那么頭疼了。今天給大家總結一下,面試中出鏡率很高的幾個多線...
摘要:在創(chuàng)建對象時,需要轉入一個值,用于初始化的成員變量,該成員變量表示屏障攔截的線程數。當到達屏障的線程數小于時,這些線程都會被阻塞住。當所有線程到達屏障后,將會被更新,表示進入新一輪的運行輪次中。 1.簡介 在分析完AbstractQueuedSynchronizer(以下簡稱 AQS)和ReentrantLock的原理后,本文將分析 java.util.concurrent 包下的兩個...
閱讀 967·2021-11-24 09:39
閱讀 3396·2021-10-27 14:20
閱讀 2326·2019-08-30 14:08
閱讀 3368·2019-08-29 16:34
閱讀 2182·2019-08-26 12:14
閱讀 2110·2019-08-26 11:54
閱讀 2779·2019-08-26 11:44
閱讀 2480·2019-08-26 11:38