摘要:簡介抽象隊列同步器,以下簡稱出現(xiàn)在中,由大師所創(chuàng)作。獲取成功則返回,獲取失敗,線程進入同步隊列等待。響應中斷版的超時響應中斷版的共享式獲取同步狀態(tài),同一時刻可能會有多個線程獲得同步狀態(tài)。
1.簡介
AbstractQueuedSynchronizer (抽象隊列同步器,以下簡稱 AQS)出現(xiàn)在 JDK 1.5 中,由大師 Doug Lea 所創(chuàng)作。AQS 是很多同步器的基礎框架,比如 ReentrantLock、CountDownLatch 和 Semaphore 等都是基于 AQS 實現(xiàn)的。除此之外,我們還可以基于 AQS,定制出我們所需要的同步器。
AQS 的使用方式通常都是通過內部類繼承 AQS 實現(xiàn)同步功能,通過繼承 AQS,可以簡化同步器的實現(xiàn)。如前面所說,AQS 是很多同步器實現(xiàn)的基礎框架。弄懂 AQS 對理解 Java 并發(fā)包里的組件大有裨益,這也是我學習 AQS 并寫出這篇文章的緣由。另外,需要說明的是,AQS 本身并不是很好理解,細節(jié)很多。在看的過程中藥有一定的耐心,做好看多遍的準備。好了,其他的就不多說了,開始進入正題吧。
2.原理概述在 AQS 內部,通過維護一個FIFO 隊列來管理多線程的排隊工作。在公平競爭的情況下,無法獲取同步狀態(tài)的線程將會被封裝成一個節(jié)點,置于隊列尾部。入隊的線程將會通過自旋的方式獲取同步狀態(tài),若在有限次的嘗試后,仍未獲取成功,線程則會被阻塞住。大致示意圖如下:
當頭結點釋放同步狀態(tài)后,且后繼節(jié)點對應的線程被阻塞,此時頭結點
線程將會去喚醒后繼節(jié)點線程。后繼節(jié)點線程恢復運行并獲取同步狀態(tài)后,會將舊的頭結點從隊列中移除,并將自己設為頭結點。大致示意圖如下:
3.重要方法介紹本節(jié)將介紹三組重要的方法,通過使用這三組方法即可實現(xiàn)一個同步組件。
第一組方法是用于訪問/設置同步狀態(tài)的,如下:
方法 | 說明 |
---|---|
int getState() | 獲取同步狀態(tài) |
void setState() | 設置同步狀態(tài) |
boolean compareAndSetState(int expect, int update) | 通過 CAS 設置同步狀態(tài) |
第二組方需要由同步組件覆寫。如下:
方法 | 說明 |
---|---|
boolean tryAcquire(int arg) | 獨占式獲取同步狀態(tài) |
boolean tryRelease(int arg) | 獨占式釋放同步狀態(tài) |
int tryAcquireShared(int arg) | 共享式獲取同步狀態(tài) |
boolean tryReleaseShared(int arg) | 共享式私房同步狀態(tài) |
boolean isHeldExclusively() | 檢測當前線程是否獲取獨占鎖 |
第三組方法是一組模板方法,同步組件可直接調用。如下:
方法 | 說明 |
---|---|
void acquire(int arg) | 獨占式獲取同步狀態(tài),該方法將會調用 tryAcquire 嘗試獲取同步狀態(tài)。獲取成功則返回,獲取失敗,線程進入同步隊列等待。 |
void acquireInterruptibly(int arg) | 響應中斷版的 acquire |
boolean tryAcquireNanos(int arg,long nanos) | 超時+響應中斷版的?acquire |
void acquireShared(int arg) | 共享式獲取同步狀態(tài),同一時刻可能會有多個線程獲得同步狀態(tài)。比如讀寫鎖的讀鎖就是就是調用這個方法獲取同步狀態(tài)的。 |
void acquireSharedInterruptibly(int arg) | 響應中斷版的?acquireShared |
boolean tryAcquireSharedNanos(int arg,long nanos) | 超時+響應中斷版的 acquireShared |
boolean release(int arg) | 獨占式釋放同步狀態(tài) |
boolean releaseShared(int arg) | 共享式釋放同步狀態(tài) |
上面列舉了一堆方法,看似繁雜。但稍微理一下,就會發(fā)現(xiàn)上面諸多方法無非就兩大類:一類是獨占式獲取和釋放共享狀態(tài),另一類是共享式獲取和釋放同步狀態(tài)。至于這兩類方法的實現(xiàn)細節(jié),我會在接下來的章節(jié)中講到,繼續(xù)往下看吧。
4.源碼分析 4.1 節(jié)點結構在并發(fā)的情況下,AQS 會將未獲取同步狀態(tài)的線程將會封裝成節(jié)點,并將其放入同步隊列尾部。同步隊列中的節(jié)點除了要保存線程,還要保存等待狀態(tài)。不管是獨占式還是共享式,在獲取狀態(tài)失敗時都會用到節(jié)點類。所以這里我們要先看一下節(jié)點類的實現(xiàn),為后面的源碼分析進行簡單鋪墊。源碼如下:
static final class Node { /** 共享類型節(jié)點,標記節(jié)點在共享模式下等待 */ static final Node SHARED = new Node(); /** 獨占類型節(jié)點,標記節(jié)點在獨占模式下等待 */ static final Node EXCLUSIVE = null; /** 等待狀態(tài) - 取消 */ static final int CANCELLED = 1; /** * 等待狀態(tài) - 通知。某個節(jié)點是處于該狀態(tài),當該節(jié)點釋放同步狀態(tài)后, * 會通知后繼節(jié)點線程,使之可以恢復運行 */ static final int SIGNAL = -1; /** 等待狀態(tài) - 條件等待。表明節(jié)點等待在 Condition 上 */ static final int CONDITION = -2; /** * 等待狀態(tài) - 傳播。表示無條件向后傳播喚醒動作,詳細分析請看第五章 */ static final int PROPAGATE = -3; /** * 等待狀態(tài),取值如下: * SIGNAL, * CANCELLED, * CONDITION, * PROPAGATE, * 0 * * 初始情況下,waitStatus = 0 */ volatile int waitStatus; /** * 前驅節(jié)點 */ volatile Node prev; /** * 后繼節(jié)點 */ volatile Node next; /** * 對應的線程 */ volatile Thread thread; /** * 下一個等待節(jié)點,用在 ConditionObject 中 */ Node nextWaiter; /** * 判斷節(jié)點是否是共享節(jié)點 */ final boolean isShared() { return nextWaiter == SHARED; } /** * 獲取前驅節(jié)點 */ final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { // Used to establish initial head or SHARED marker } /** addWaiter 方法會調用該構造方法 */ Node(Thread thread, Node mode) { this.nextWaiter = mode; this.thread = thread; } /** Condition 中會用到此構造方法 */ Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } }4.2 獨占模式分析 4.2.1 獲取同步狀態(tài)
獨占式獲取同步狀態(tài)時通過 acquire 進行的,下面來分析一下該方法的源碼。如下:
/** * 該方法將會調用子類復寫的 tryAcquire 方法獲取同步狀態(tài), * - 獲取成功:直接返回 * - 獲取失敗:將線程封裝在節(jié)點中,并將節(jié)點置于同步隊列尾部, * 通過自旋嘗試獲取同步狀態(tài)。如果在有限次內仍無法獲取同步狀態(tài), * 該線程將會被 LockSupport.park 方法阻塞住,直到被前驅節(jié)點喚醒 */ public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } /** 向同步隊列尾部添加一個節(jié)點 */ private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // 嘗試以快速方式將節(jié)點添加到隊列尾部 Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 快速插入節(jié)點失敗,調用 enq 方法,不停的嘗試插入節(jié)點 enq(node); return node; } /** * 通過 CAS + 自旋的方式插入節(jié)點到隊尾 */ private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize // 設置頭結點,初始情況下,頭結點是一個空節(jié)點 if (compareAndSetHead(new Node())) tail = head; } else { /* * 將節(jié)點插入隊列尾部。這里是先將新節(jié)點的前驅設為尾節(jié)點,之后在嘗試將新節(jié)點設為尾節(jié) * 點,最后再將原尾節(jié)點的后繼節(jié)點指向新的尾節(jié)點。除了這種方式,我們還先設置尾節(jié)點, * 之后再設置前驅和后繼,即: * * if (compareAndSetTail(t, node)) { * node.prev = t; * t.next = node; * } * * 但但如果是這樣做,會導致一個問題,即短時內,隊列結構會遭到破壞。考慮這種情況, * 某個線程在調用 compareAndSetTail(t, node)成功后,該線程被 CPU 切換了。此時 * 設置前驅和后繼的代碼還沒帶的及執(zhí)行,但尾節(jié)點指針卻設置成功,導致隊列結構短時內會 * 出現(xiàn)如下情況: * * +------+ prev +-----+ +-----+ * head | | <---- | | | | tail * | | ----> | | | | * +------+ next +-----+ +-----+ * * tail 節(jié)點完全脫離了隊列,這樣導致一些隊列遍歷代碼出錯。如果先設置 * 前驅,在設置尾節(jié)點。及時線程被切換,隊列結構短時可能如下: * * +------+ prev +-----+ prev +-----+ * head | | <---- | | <---- | | tail * | | ----> | | | | * +------+ next +-----+ +-----+ * * 這樣并不會影響從后向前遍歷,不會導致遍歷邏輯出錯。 * * 參考: * https://www.cnblogs.com/micrari/p/6937995.html */ node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } /** * 同步隊列中的線程在此方法中以循環(huán)嘗試獲取同步狀態(tài),在有限次的嘗試后, * 若仍未獲取鎖,線程將會被阻塞,直至被前驅節(jié)點的線程喚醒。 */ final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; // 循環(huán)獲取同步狀態(tài) for (;;) { final Node p = node.predecessor(); /* * 前驅節(jié)點如果是頭結點,表明前驅節(jié)點已經獲取了同步狀態(tài)。前驅節(jié)點釋放同步狀態(tài)后, * 在不出異常的情況下, tryAcquire(arg) 應返回 true。此時節(jié)點就成功獲取了同 * 步狀態(tài),并將自己設為頭節(jié)點,原頭節(jié)點出隊。 */ if (p == head && tryAcquire(arg)) { // 成功獲取同步狀態(tài),設置自己為頭節(jié)點 setHead(node); p.next = null; // help GC failed = false; return interrupted; } /* * 如果獲取同步狀態(tài)失敗,則根據(jù)條件判斷是否應該阻塞自己。 * 如果不阻塞,CPU 就會處于忙等狀態(tài),這樣會浪費 CPU 資源 */ if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { /* * 如果在獲取同步狀態(tài)中出現(xiàn)異常,failed = true,cancelAcquire 方法會被執(zhí)行。 * tryAcquire 需同步組件開發(fā)者覆寫,難免不了會出現(xiàn)異常。 */ if (failed) cancelAcquire(node); } } /** 設置頭節(jié)點 */ private void setHead(Node node) { // 僅有一個線程可以成功獲取同步狀態(tài),所以這里不需要進行同步控制 head = node; node.thread = null; node.prev = null; } /** * 該方法主要用途是,當線程在獲取同步狀態(tài)失敗時,根據(jù)前驅節(jié)點的等待狀態(tài),決定后續(xù)的動作。比如前驅 * 節(jié)點等待狀態(tài)為 SIGNAL,表明當前節(jié)點線程應該被阻塞住了。不能老是嘗試,避免 CPU 忙等。 * ————————————————————————————————————————————————————————————————— * | 前驅節(jié)點等待狀態(tài) | 相應動作 | * ————————————————————————————————————————————————————————————————— * | SIGNAL | 阻塞 | * | CANCELLED | 向前遍歷, 移除前面所有為該狀態(tài)的節(jié)點 | * | waitStatus < 0 | 將前驅節(jié)點狀態(tài)設為 SIGNAL, 并再次嘗試獲取同步狀態(tài) | * ————————————————————————————————————————————————————————————————— */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; /* * 前驅節(jié)點等待狀態(tài)為 SIGNAL,表示當前線程應該被阻塞。 * 線程阻塞后,會在前驅節(jié)點釋放同步狀態(tài)后被前驅節(jié)點線程喚醒 */ if (ws == Node.SIGNAL) return true; /* * 前驅節(jié)點等待狀態(tài)為 CANCELLED,則以前驅節(jié)點為起點向前遍歷, * 移除其他等待狀態(tài)為 CANCELLED 的節(jié)點。 */ if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * 等待狀態(tài)為 0 或 PROPAGATE,設置前驅節(jié)點等待狀態(tài)為 SIGNAL, * 并再次嘗試獲取同步狀態(tài)。 */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } private final boolean parkAndCheckInterrupt() { // 調用 LockSupport.park 阻塞自己 LockSupport.park(this); return Thread.interrupted(); } /** * 取消獲取同步狀態(tài) */ private void cancelAcquire(Node node) { if (node == null) return; node.thread = null; // 前驅節(jié)點等待狀態(tài)為 CANCELLED,則向前遍歷并移除其他為該狀態(tài)的節(jié)點 Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; // 記錄 pred 的后繼節(jié)點,后面會用到 Node predNext = pred.next; // 將當前節(jié)點等待狀態(tài)設為 CANCELLED node.waitStatus = Node.CANCELLED; /* * 如果當前節(jié)點是尾節(jié)點,則通過 CAS 設置前驅節(jié)點 prev 為尾節(jié)點。設置成功后,再利用 CAS 將 * prev 的 next 引用置空,斷開與后繼節(jié)點的聯(lián)系,完成清理工作。 */ if (node == tail && compareAndSetTail(node, pred)) { /* * 執(zhí)行到這里,表明 pred 節(jié)點被成功設為了尾節(jié)點,這里通過 CAS 將 pred 節(jié)點的后繼節(jié)點 * 設為 null。注意這里的 CAS 即使失敗了,也沒關系。失敗了,表明 pred 的后繼節(jié)點更新 * 了。pred 此時已經是尾節(jié)點了,若后繼節(jié)點被更新,則是有新節(jié)點入隊了。這種情況下,CAS * 會失敗,但失敗不會影響同步隊列的結構。 */ compareAndSetNext(pred, predNext, null); } else { int ws; // 根據(jù)條件判斷是喚醒后繼節(jié)點,還是將前驅節(jié)點和后繼節(jié)點連接到一起 if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) /* * 這里使用 CAS 設置 pred 的 next,表明多個線程同時在取消,這里存在競爭。 * 不過此處沒針對 compareAndSetNext 方法失敗后做一些處理,表明即使失敗了也 * 沒關系。實際上,多個線程同時設置 pred 的 next 引用時,只要有一個能設置成 * 功即可。 */ compareAndSetNext(pred, predNext, next); } else { /* * 喚醒后繼節(jié)點對應的線程。這里簡單講一下為什么要喚醒后繼線程,考慮下面一種情況: * head node1 node2 tail * ws=0 ws=1 ws=-1 ws=0 * +------+ prev +-----+ prev +-----+ prev +-----+ * | | <---- | | <---- | | <---- | | * | | ----> | | ----> | | ----> | | * +------+ next +-----+ next +-----+ next +-----+ * * 頭結點初始狀態(tài)為 0,node1、node2 和 tail 節(jié)點依次入隊。node1 自旋過程中調用 * tryAcquire 出現(xiàn)異常,進入 cancelAcquire。head 節(jié)點此時等待狀態(tài)仍然是 0,它 * 會認為后繼節(jié)點還在運行中,所它在釋放同步狀態(tài)后,不會去喚醒后繼等待狀態(tài)為非取消的 * 節(jié)點 node2。如果 node1 再不喚醒 node2 的線程,該線程面臨無法被喚醒的情況。此 * 時,整個同步隊列就回全部阻塞住。 */ unparkSuccessor(node); } node.next = node; // help GC } } private void unparkSuccessor(Node node) { int ws = node.waitStatus; /* * 通過 CAS 將等待狀態(tài)設為 0,讓后繼節(jié)點線程多一次 * 嘗試獲取同步狀態(tài)的機會 */ if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; /* * 這里如果 s == null 處理,是不是表明 node 是尾節(jié)點?答案是不一定。原因之前在分析 * enq 方法時說過。這里再啰嗦一遍,新節(jié)點入隊時,隊列瞬時結構可能如下: * node1 node2 * +------+ prev +-----+ prev +-----+ * head | | <---- | | <---- | | tail * | | ----> | | | | * +------+ next +-----+ +-----+ * * node2 節(jié)點為新入隊節(jié)點,此時 tail 已經指向了它,但 node1 后繼引用還未設置。 * 這里 node1 就是 node 參數(shù),s = node1.next = null,但此時 node1 并不是尾 * 節(jié)點。所以這里不能從前向后遍歷同步隊列,應該從后向前。 */ for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) // 喚醒 node 的后繼節(jié)點線程 LockSupport.unpark(s.thread); }
到這里,獨占式獲取同步狀態(tài)的分析就講完了。如果僅分析獲取同步狀態(tài)的大致流程,那么這個流程并不難。但若深入到細節(jié)之中,還是需要思考思考。這里對獨占式獲取同步狀態(tài)的大致流程做個總結,如下:
調用 tryAcquire 方法嘗試獲取同步狀態(tài)
獲取成功,直接返回
獲取失敗,將線程封裝到節(jié)點中,并將節(jié)點入隊
入隊節(jié)點在 acquireQueued 方法中自旋獲取同步狀態(tài)
若節(jié)點的前驅節(jié)點是頭節(jié)點,則再次調用 tryAcquire 嘗試獲取同步狀態(tài)
獲取成功,當前節(jié)點將自己設為頭節(jié)點并返回
獲取失敗,可能再次嘗試,也可能會被阻塞。這里簡單認為會被阻塞。
上面的步驟對應下面的流程圖:
上面流程圖參考自《Java并發(fā)編程》第128頁圖 5-5,這里進行了重新繪制,并做了一定的修改。
4.2.2 釋放同步狀態(tài)相對于獲取同步狀態(tài),釋放同步狀態(tài)的過程則要簡單的多,這里簡單羅列一下步驟:
調用 tryRelease(arg) 嘗試釋放同步狀態(tài)
根據(jù)條件判斷是否應該喚醒后繼線程
就兩個步驟,下面看一下源碼分析。
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; /* * 這里簡單列舉條件分支的可能性,如下: * 1. head = null * head 還未初始化。初始情況下,head = null,當?shù)谝粋€節(jié)點入隊后,head 會被初始 * 為一個虛擬(dummy)節(jié)點。這里,如果還沒節(jié)點入隊就調用 release 釋放同步狀態(tài), * 就會出現(xiàn) h = null 的情況。 * * 2. head != null && waitStatus = 0 * 表明后繼節(jié)點對應的線程仍在運行中,不需要喚醒 * * 3. head != null && waitStatus < 0 * 后繼節(jié)點對應的線程可能被阻塞了,需要喚醒 */ if (h != null && h.waitStatus != 0) // 喚醒后繼節(jié)點,上面分析過了,這里不再贅述 unparkSuccessor(h); return true; } return false; }4.3 共享模式分析
與獨占模式不同,共享模式下,同一時刻會有多個線程獲取共享同步狀態(tài)。共享模式是實現(xiàn)讀寫鎖中的讀鎖、CountDownLatch 和 Semaphore 等同步組件的基礎,搞懂了,再去理解一些共享同步組件就不難了。
4.3.1 獲取同步狀態(tài)共享類型的節(jié)點獲取共享同步狀態(tài)后,如果后繼節(jié)點也是共享類型節(jié)點,當前節(jié)點則會喚醒后繼節(jié)點。這樣,多個節(jié)點線程即可同時獲取共享同步狀態(tài)。
public final void acquireShared(int arg) { // 嘗試獲取共享同步狀態(tài),tryAcquireShared 返回的是整型 if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; // 這里和前面一樣,也是通過有限次自旋的方式獲取同步狀態(tài) for (;;) { final Node p = node.predecessor(); /* * 前驅是頭結點,其類型可能是 EXCLUSIVE,也可能是 SHARED. * 如果是 EXCLUSIVE,線程無法獲取共享同步狀態(tài)。 * 如果是 SHARED,線程則可獲取共享同步狀態(tài)。 * 能不能獲取共享同步狀態(tài)要看 tryAcquireShared 具體的實現(xiàn)。比如多個線程競爭讀寫 * 鎖的中的讀鎖時,均能成功獲取讀鎖。但多個線程同時競爭信號量時,可能就會有一部分線 * 程因無法競爭到信號量資源而阻塞。 */ if (p == head) { // 嘗試獲取共享同步狀態(tài) int r = tryAcquireShared(arg); if (r >= 0) { // 設置頭結點,如果后繼節(jié)點是共享類型,喚醒后繼節(jié)點 setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } /** * 這個方法做了兩件事情: * 1. 設置自身為頭結點 * 2. 根據(jù)條件判斷是否要喚醒后繼節(jié)點 */ private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // 設置頭結點 setHead(node); /* * 這個條件分支由 propagate > 0 和 h.waitStatus < 0 兩部分組成。 * h.waitStatus < 0 時,waitStatus = SIGNAL 或 PROPAGATE。這里僅依賴 * 條件 propagate > 0 判斷是否喚醒后繼節(jié)點是不充分的,至于原因請參考第五章 */ if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; /* * 節(jié)點 s 如果是共享類型節(jié)點,則應該喚醒該節(jié)點 * 至于 s == null 的情況前面分析過,這里不在贅述。 */ if (s == null || s.isShared()) doReleaseShared(); } } /** * 該方法用于在 acquires/releases 存在競爭的情況下,確保喚醒動作向后傳播。 */ private void doReleaseShared() { /* * 下面的循環(huán)在 head 節(jié)點存在后繼節(jié)點的情況下,做了兩件事情: * 1. 如果 head 節(jié)點等待狀態(tài)為 SIGNAL,則將 head 節(jié)點狀態(tài)設為 0,并喚醒后繼節(jié)點 * 2. 如果 head 節(jié)點等待狀態(tài)為 0,則將 head 節(jié)點狀態(tài)設為 PROPAGATE,保證喚醒能夠正 * 常傳播下去。關于 PROPAGATE 狀態(tài)的細節(jié)分析,后面會講到。 */ for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } /* * ws = 0 的情況下,這里要嘗試將狀態(tài)從 0 設為 PROPAGATE,保證喚醒向后 * 傳播。setHeadAndPropagate 在讀到 h.waitStatus < 0 時,可以繼續(xù)喚醒 * 后面的節(jié)點。 */ else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
到這里,共享模式下獲取同步狀態(tài)的邏輯就分析完了,不過我這里只做了簡單分析。相對于獨占式獲取同步狀態(tài),共享式的情況更為復雜。獨占模式下,只有一個節(jié)點線程可以成功獲取同步狀態(tài),也只有獲取已同步狀態(tài)節(jié)點線程才可以釋放同步狀態(tài)。但在共享模式下,多個共享節(jié)點線程可以同時獲得同步狀態(tài),在一些線程獲取同步狀態(tài)的同時,可能還會有另外一些線程正在釋放同步狀態(tài)。所以,共享模式更為復雜。這里我的腦力跟不上了,沒法面面俱到的分析,見諒。
最后說一下共享模式下獲取同步狀態(tài)的大致流程,如下:
獲取共享同步狀態(tài)
若獲取失敗,則生成節(jié)點,并入隊
如果前驅為頭結點,再次嘗試獲取共享同步狀態(tài)
獲取成功則將自己設為頭結點,如果后繼節(jié)點是共享類型的,則喚醒
若失敗,將節(jié)點狀態(tài)設為 SIGNAL,再次嘗試。若再次失敗,線程進入等待狀態(tài)
4.3.2 釋放共享狀態(tài)釋放共享狀態(tài)主要邏輯在 doReleaseShared 中,doReleaseShared 上節(jié)已經分析過,這里就不贅述了。共享節(jié)點線程在獲取同步狀態(tài)和釋放同步狀態(tài)時都會調用 doReleaseShared,所以 doReleaseShared 是多線程競爭集中的地方。
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }5.PROPAGATE 狀態(tài)存在的意義
AQS 的節(jié)點有幾種不同的狀態(tài),這個在 4.1 節(jié)介紹過。在這幾個狀態(tài)中,PROPAGATE 的用途可能是最不好理解的。網上包括一些書籍關于該狀態(tài)的敘述基本都是一句帶過,也就是 PROPAGATE 字面意義,即向后傳播喚醒動作。至于怎么傳播,鮮有資料說明過。不過,好在最終我還是找到了一篇詳細敘述了 PROPAGATE 狀態(tài)的文章。在博客園上,博友 活在夢裡 在他的文章 AbstractQueuedSynchronizer源碼解讀 對 PROPAGATE,以及其他的一些細節(jié)進行了說明,很有深度。在欽佩之余,不由得感嘆作者思考的很深入。在征得他的同意后,我將在本節(jié)中引用他文章中對 PROPAGATE 狀態(tài)說明的部分,并進行一定的補充說明。這里感謝作者 活在夢裡 的精彩分享,若不參考他的文章,我的這篇文章內容會比較空洞。好了,其他的不多說了,繼續(xù)往下分析。
在本節(jié)中,將會說明兩個個問題,如下:
PROPAGATE 狀態(tài)用在哪里,以及怎樣向后傳播喚醒動作的?
引入 PROPAGATE 狀態(tài)是為了解決什么問題?
這兩個問題將會在下面兩節(jié)中分別進行說明。
5.1 利用 PROPAGATE 傳播喚醒動作PROPAGATE 狀態(tài)是用來傳播喚醒動作的,那么它是在哪里進行傳播的呢?答案是在setHeadAndPropagate方法中,這里再來看看 setHeadAndPropagate 方法的實現(xiàn):
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }
大家注意看 setHeadAndPropagate 方法中那個長長的判斷語句,其中有一個條件是h.waitStatus < 0,當 h.waitStatus = SIGNAL(-1) 或 PROPAGATE(-3) 是,這個條件就會成立。那么 PROPAGATE 狀態(tài)是在何時被設置的呢?答案是在doReleaseShared方法中,如下:
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) {...} // 如果 ws = 0,則將 h 狀態(tài)設為 PROPAGATE else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } ... } }
再回到 setHeadAndPropagate 的實現(xiàn),該方法既然引入了h.waitStatus < 0這個條件,就意味著僅靠條件propagate > 0判斷是否喚醒后繼節(jié)點線程的機制是不充分的。至于為啥不充分,請繼續(xù)往看下看。
5.2 引入 PROPAGATE 所解決的問題PROPAGATE 的引入是為了解決一個 BUG -- JDK-6801020,復現(xiàn)這個 BUG 的代碼如下:
import java.util.concurrent.Semaphore; public class TestSemaphore { private static Semaphore sem = new Semaphore(0); private static class Thread1 extends Thread { @Override public void run() { sem.acquireUninterruptibly(); } } private static class Thread2 extends Thread { @Override public void run() { sem.release(); } } public static void main(String[] args) throws InterruptedException { for (int i = 0; i < 10000000; i++) { Thread t1 = new Thread1(); Thread t2 = new Thread1(); Thread t3 = new Thread2(); Thread t4 = new Thread2(); t1.start(); t2.start(); t3.start(); t4.start(); t1.join(); t2.join(); t3.join(); t4.join(); System.out.println(i); } } }
根據(jù) BUG 的描述消息可知 JDK 6u11,6u17 兩個版本受到影響。那么,接下來再來看看引起這個 BUG 的代碼 -- JDK 6u17 中 setHeadAndPropagate 和 releaseShared 兩個方法源碼,如下:
private void setHeadAndPropagate(Node node, int propagate) { setHead(node); if (propagate > 0 && node.waitStatus != 0) { /* * Don"t bother fully figuring out successor. If it * looks null, call unparkSuccessor anyway to be safe. */ Node s = node.next; if (s == null || s.isShared()) unparkSuccessor(node); } } // 和 release 方法的源碼基本一樣 public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
下面來簡單說明 TestSemaphore 這個類的邏輯。這個類持有一個數(shù)值為 0 的信號量對象,并創(chuàng)建了4個線程,線程 t1 和 t2 用于獲取信號量,t3 和 t4 則是調用 release() 方法釋放信號量。在一般情況下,TestSemaphore 這個類的代碼都可以正常執(zhí)行。但當有極端情況出現(xiàn)時,可能會導致同步隊列掛掉。這里演繹一下這個極端情況,考慮某次循環(huán)時,隊列結構如下:
時刻1:線程 t3 調用 unparkSuccessor 方法,head 節(jié)點狀態(tài)由 SIGNAL(-1) 變?yōu)?b>0,并喚醒線程 t1。此時信號量數(shù)值為1。
時刻2:線程 t1 恢復運行,t1 調用 Semaphore.NonfairSync 的 tryAcquireShared,返回0。然后線程 t1 被切換,暫停運行。
時刻3:線程 t4 調用 releaseShared 方法,因 head 的狀態(tài)為0,所以 t4 不會調用 unparkSuccessor 方法。
時刻4:線程 t1 恢復運行,t1 成功獲取信號量,調用 setHeadAndPropagate。但因為 propagate = 0,線程 t1 無法調用 unparkSuccessor 喚醒線程 t2,t2 面臨無線程喚醒的情況。因為 t2 無法退出等待狀態(tài),所以 t2.join 會阻塞主線程,導致程序掛住。
下面再來看一下修復 BUG 后的代碼,根據(jù) BUG 詳情頁顯示,該 BUG 在 JDK 1.7 中被修復。這里找一個 JDK 7 較早版本(JDK 7u10)的代碼看一下,如下:
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } } public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
在按照上面的代碼演繹一下邏輯,如下:
時刻1:線程 t3 調用 unparkSuccessor 方法,head 節(jié)點狀態(tài)由 SIGNAL(-1) 變?yōu)?b>0,并喚醒線程t1。此時信號量數(shù)值為1。
時刻2:線程 t1 恢復運行,t1 調用 Semaphore.NonfairSync 的 tryAcquireShared,返回0。然后線程 t1 被切換,暫停運行。
時刻3:線程 t4 調用 releaseShared 方法,檢測到h.waitStatus = 0,t4 將頭節(jié)點等待狀態(tài)由0設為PROPAGATE(-3)。
時刻4:線程 t1 恢復運行,t1 成功獲取信號量,調用 setHeadAndPropagate。因 propagate = 0,propagate > 0 條件不滿足。而 h.waitStatus = PROPAGATE(-3),所以條件h.waitStatus < 0成立。進而,線程 t1 可以喚醒線程 t2,完成喚醒動作的傳播。
到這里關于狀態(tài) PROPAGATE 的內容就講完了。最后,簡單總結一下本章開頭提的兩個問題。
問題一:PROPAGATE 狀態(tài)用在哪里,以及怎樣向后傳播喚醒動作的?
答:PROPAGATE 狀態(tài)用在 setHeadAndPropagate。當頭節(jié)點狀態(tài)被設為 PROPAGATE 后,后繼節(jié)點成為新的頭結點后。若 propagate > 0 條件不成立,則根據(jù)條件h.waitStatus < 0成立與否,來決定是否喚醒后繼節(jié)點,即向后傳播喚醒動作。
問題二:引入 PROPAGATE 狀態(tài)是為了解決什么問題?
答:引入 PROPAGATE 狀態(tài)是為了解決并發(fā)釋放信號量所導致部分請求信號量的線程無法被喚醒的問題。
聲明:
本章內容是在博友 活在夢裡 的文章 AbstractQueuedSynchronizer源碼解讀 基礎上,進行了一定的補充說明。本章所參考的觀點已經過原作者同意,為避免抄襲嫌疑,特此聲明。
到這里,本文就差不多結束了。如果大家從頭看到尾,到這里就可以放松一下了。寫到這里,我也可以放松一下了。這篇文章總共花費了我12天的空閑時間,確實不容易。本來我只打算講一下基本原理,但知道后來看到本文多次推薦的那篇文章。那篇文章給我的第一感覺是,作者很厲害。第二感覺是,我也要寫出一篇較為深入的 AQS 分析文章。雖然寫出來也不能怎么樣,水平也不會因此提高多少,也不會去造個類似的輪子。但是寫完后,確實感覺很有成就感。本文的最后,來說一下如何學習 AQS 原理。AQS 的大致原理不是很難理解,所以一開始不建議糾結細節(jié),應該先弄懂它的大致原理。在此基礎上,再去分析一些細節(jié),分析細節(jié)時,要從多線程的角度去考慮。比如,有點地方 CAS 失敗后要重試,有的不用重試。總體來說 AQS 的大致原理容易理解,細節(jié)部分比較復雜。很多細節(jié)要在腦子里演繹一遍,好好思考才能想通,有點燒腦。另外因為文章篇幅的問題,關于 AQS ConditionObject 部分的分析將會放在下一篇文章中進行。
最后,再向 AQS 的作者 Doug Lea 致以崇高的敬意。僅盡力弄懂 AQS 的原理都很難了,可想而知,實現(xiàn) AQS 的難度有多大。
限于本人的能力,加之深入分析 AQS 本身就比較有難度。所以文中難免會有錯誤出現(xiàn),如果不慎翻車,請見諒。也歡迎在評論區(qū)指明這些錯誤,感謝。
參考??AbstractQueuedSynchronizer源碼解讀 - 活在夢裡
《Java并發(fā)編程的藝術》 - 方騰飛 / 魏鵬 / 程曉明
本文在知識共享許可協(xié)議 4.0 下發(fā)布,轉載需在明顯位置處注明出處
作者:coolblog
本文同步發(fā)布在我的個人博客:http://www.coolblog.xyz
本作品采用知識共享署名-非商業(yè)性使用-禁止演繹 4.0 國際許可協(xié)議進行許可。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/76386.html
摘要:簡介抽象隊列同步器,以下簡稱出現(xiàn)在中,由大師所創(chuàng)作。獲取成功則返回,獲取失敗,線程進入同步隊列等待。響應中斷版的超時響應中斷版的共享式獲取同步狀態(tài),同一時刻可能會有多個線程獲得同步狀態(tài)。 1.簡介 AbstractQueuedSynchronizer (抽象隊列同步器,以下簡稱 AQS)出現(xiàn)在 JDK 1.5 中,由大師 Doug Lea 所創(chuàng)作。AQS 是很多同步器的基礎框架,比如 ...
摘要:同步器擁有三個成員變量隊列的頭結點隊列的尾節(jié)點和狀態(tài)。對于同步器維護的狀態(tài),多個線程對其的獲取將會產生一個鏈式的結構。使用將當前線程,關于后續(xù)會詳細介紹。 簡介提供了一個基于FIFO隊列,可以用于構建鎖或者其他相關同步裝置的基礎框架。該同步器(以下簡稱同步器)利用了一個int來表示狀態(tài),期望它能夠成為實現(xiàn)大部分同步需求的基礎。使用的方法是繼承,子類通過繼承同步器并需要實現(xiàn)它的方法來管理...
摘要:當前節(jié)點擁有的線程。方法返回值表示在線程等待過程中,是否有另一個線程調用該線程的方法,發(fā)起中斷。如果前一個節(jié)點狀態(tài)是,那么直接返回,阻塞當前線程如果前一個節(jié)點狀態(tài)是大于就是,表示前一個 AQS是JUC鎖框架中最重要的類,通過它來實現(xiàn)獨占鎖和共享鎖的。本章是對AbstractQueuedSynchronizer源碼的完全解析,分為四個部分介紹: CLH隊列即同步隊列:儲存著所有等待鎖...
摘要:當前節(jié)點擁有的線程。方法返回值表示在線程等待過程中,是否有另一個線程調用該線程的方法,發(fā)起中斷。如果前一個節(jié)點狀態(tài)是,那么直接返回,阻塞當前線程如果前一個節(jié)點狀態(tài)是大于就是,表示前一個 AQS是JUC鎖框架中最重要的類,通過它來實現(xiàn)獨占鎖和共享鎖的。本章是對AbstractQueuedSynchronizer源碼的完全解析,分為四個部分介紹: CLH隊列即同步隊列:儲存著所有等待鎖...
摘要:當前節(jié)點擁有的線程。方法返回值表示在線程等待過程中,是否有另一個線程調用該線程的方法,發(fā)起中斷。如果前一個節(jié)點狀態(tài)是,那么直接返回,阻塞當前線程如果前一個節(jié)點狀態(tài)是大于就是,表示前一個 AQS是JUC鎖框架中最重要的類,通過它來實現(xiàn)獨占鎖和共享鎖的。本章是對AbstractQueuedSynchronizer源碼的完全解析,分為四個部分介紹: CLH隊列即同步隊列:儲存著所有等待鎖...
閱讀 3367·2021-11-04 16:10
閱讀 3871·2021-09-29 09:43
閱讀 2706·2021-09-24 10:24
閱讀 3362·2021-09-01 10:46
閱讀 2514·2019-08-30 15:54
閱讀 594·2019-08-30 13:19
閱讀 3241·2019-08-29 17:19
閱讀 1062·2019-08-29 16:40