摘要:與之相關的方法有三個原子性地修改都是類型,可見我們可以進行,來定義的獲取與釋放從而實現我們自定義的同步器。
前言
源碼分析我認為主要有兩個作用:滿足好奇心,我想每一個有追求的人都不會滿足于僅僅做一個API Caller實現功能就好,我們也想知道它到底是怎么實現的;借鑒與升華,當我們明白了一個類的設計原理,在一定的情境下我們可以借鑒其設計哲學,甚至針對我們自己特殊的業務場景對其進行改良與優化。
下面我就以這篇文章開啟我的源碼閱讀之旅。總體而言,我會從這個類基本結構入手,然后分析原理,再看看已有的應用,并進行分析與理解。
我之前一篇文章里提到過java的顯示鎖:ReentrantLock。此外,如果你編寫過并發程序那你一般也應該用過CountDownLatch,Semaphore等等,這些都是同步器,而它們都基于AbstractQueuedSynchronizer(簡稱AQS)實現的,那么我們今天就來看看這個牛逼的AQS是怎么實現這么多功能的。
首先打開IDEA,隨便新建一個類,然后輸入CountDownLatch,在它上面敲下Ctrl+B,就打開了CountDownLatch的源碼,然后發現有一個非常重要的靜態內部類Sync繼承了AbstractQueuedSynchronizer,再次Ctrl+B,我們就打開了AQS的源碼,馬上就可以解開它的神秘面紗了,哼哼。
映入眼簾的首先就是大段大段的文檔,大意就是這個類 提供了一個基于FIFO隊列的實現了阻塞鎖和相關同步器(信號量,事件等)的框架...... 讀完了大概就了解這個類到底是怎么工作的了。下面我們開始分類型研究源碼,當然不可能全部分析一遍,這里只把重點的列出來。
實際代碼分析中,我一般先看看這個結構圖:
然后讀一讀開始的綜述文檔,然后從實例開始,像方法調用那樣依次深入查看,就能依次看到相關的方法、內部類和屬性,還是Ctrl+B大法好啊,這屬于自底向上的源碼分析方法。如果直接從上面那張圖開始,對屬性、方法、內部類挨個分析就屬于自頂向下的分析法了。我覺得對一個陌生的東西要想有清晰的認知最好先自底向上捋一遍,便于搞清楚一個個具體功能的實現機制,然后再自頂向下看一遍,便于把控整體架構,宏觀把握。這樣走兩遍再來總結一下就能比較透徹的掌握該技術了。
一、方法與屬性方法中,protected類型的一般要求具體的同步器子類來實現但是有些也可以直接用,public類型一般都是可以直接使用的當然也可以自己實現,private就是AQS自己的內部實現了,與具體子類無關。
state相關一個private volatile int state;屬性代表了線程之間爭用的資源。與之相關的方法有三個
protected final int getState() protected final void setState(int newState) protected final boolean compareAndSetState(int expect, int update)//CAS原子性地修改state
都是protected類型,可見我們可以進行Override,來定義state的獲取與釋放從而實現我們自定義的同步器。非常簡單就不把全部源碼擺出來了。
同步隊列queue相關這個queue是一個FIFO的隊列,每個節點都是下面的內部類Node類型,等待著state這個資源,主要由兩個屬性決定private transient volatile Node head;和private transient volatile Node tail; 與之相關的方法有:
// 節點node進入隊列,采用CAS的方式,返回其前驅 private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // 隊列為空,先初始化 if (compareAndSetHead(new Node()))//設置頭結點 tail = head; } else {// 隊列不為空 node.prev = t;// 插入節點至隊列尾部 if (compareAndSetTail(t, node)) {//CAS修改隊尾為node,之所以CAS是因為可能有多個線程爭相入隊 t.next = node; return t; } } } } // 將當前線程以mode的方式(EXCLUSIVE或者SHARED)構成新節點并入隊,返回這個新節點 private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // 更快的入隊方式,如果失敗再采用較慢的標準入隊方式enq Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } // 把node設置為新的頭,老的頭出隊 private void setHead(Node node) { head = node; node.thread = null; node.prev = null; }資源獲取與釋放相關
資源獲取分為EXCLUSIVE和SHARED兩種模式,對應acquire與release、acquireShared與releaseShared。
首先是EXCLUSIVE資源獲取:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
這里tryAcquire需要繼承類自己實現(成功true,失敗false),如果tryAcquire成功則直接返回,否則addWaiter將當前線程以獨占節點的方式置于同步隊列尾部等待。acquireQueued使得該節點等待獲取資源,一直獲取到資源才返回,整個等待過程中如果有中斷是不響應的,但是獲取資源后會用selfInterrupt補上。
// 節點獲得資源才能返回否則一直自旋,中斷該線程不會實時響應,但是如果被中斷過會返回true,否則返回false final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) {// node前驅是頭結點,那么便可以嘗試去獲取資源了 setHead(node);// 獲取成功,可以把node設為頭結點,也就是說頭結點是獨占資源的唯一擁有者 p.next = null; // help GC failed = false; return interrupted; } // 走到這里說明獲取失敗,檢查是否應該阻塞和中斷 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node);//如果失敗了,就把waitStatus置為CANCELLED表示取消了 } } // 獲取資源失敗后,當前節點是否應該阻塞 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL)// 前驅pred獲得資源后會通知當前節點node,所以可以放心的阻塞了(waitStatus會在下面內部類解釋) return true; if (ws > 0) {// 前驅取消了資源獲取,那么當前節點就要找到前面最近一個正在等待的節點 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node;//此處 pred.waitStatus < 0,亦即pred 還在等待嘗試獲取資源 } else {// 前驅正在等待,則設置其狀態為SIGNAL,讓他獲取資源后通知本節點, compareAndSetWaitStatus(pred, ws, Node.SIGNAL); // 但是本節點不能馬上阻塞,因為設置不一定能成功,需要下次再次檢查 } return false; } // 阻塞本線程。被喚醒后要返回本線程是否被中斷過。 private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
然后是EXCLUSIVE資源釋放:
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
與上面相對應,這里tryRelease也需要繼承類自己實現(成功true,失敗false),如果釋放成功,則調用unparkSuccessor喚醒后繼節點返回true,否則返回false。
private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0)// 可能需要釋放通知信號,把狀態置零,允許失敗 compareAndSetWaitStatus(node, ws, 0); Node s = node.next;// 找到后繼節點 if (s == null || s.waitStatus > 0) {// 如果后繼節點為空或者已經取消 s = null;// 確保該節點的釋放 for (Node t = tail; t != null && t != node; t = t.prev)// 從隊尾開始找到需要通知的最近的后繼節點 if (t.waitStatus <= 0) s = t; } if (s != null)// 如果需喚醒的后繼節點存在則喚醒之 LockSupport.unpark(s.thread); }
再看SHARED資源獲取:
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
這里tryAcquireShared也需要自己實現(負值說明失敗,非負值表示獲取成功后剩下的可用資源數),如果獲取失敗就調用doAcquireShared進入同步隊列等待。
// 等待獲取共享資源時不響應中斷,但是獲取資源成功后會用selfInterrupt補上 private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED);// 入隊尾 boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) {// 處于隊列第二個位置,可以嘗試獲取資源 int r = tryAcquireShared(arg); if (r >= 0) {// 獲取成功 setHeadAndPropagate(node, r);// 將自己設為隊列頭,并喚醒可能獲取資源的后面幾個節點 p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())// 同acquireQueued的分析 interrupted = true; } } finally { if (failed) cancelAcquire(node); } } private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // 舊的頭 setHead(node); // 設置新的頭 // 如果還有資源,則喚醒下一個,采用保守策略,多喚醒幾次即使沒獲取到資源也無所謂,盡量做到不漏掉資源 if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }
最后SHARED資源釋放:
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
這里tryReleaseShared依然要自己實現(如果可以允許下一個節點獲得資源則返回true,否則false),如果釋放成功則調用doReleaseShared喚醒后繼節點。需要注意的是tryReleaseShared由于可能多個線程并發操作所以一般需要CAS而tryRelease不需要。
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) {// 需要喚醒 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))//設置WaitStatus失敗 continue; unparkSuccessor(h);// 一定要設置成功才喚醒 } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue;// CAS設置失敗則繼續循環 } if (h == head)// 頭變了,不需要繼續喚醒 break; } }
此外,資源獲取除了一直等待的方式之外還有對應的限制等待時間的方法如tryAcquire與tryAcquireNanos,不必多言,釋放就只有一直等而沒有限制等待時間的了。也有響應中斷與不響應的對應,如acquireInterruptibly與acquire,差別不大,不必多言。
二、內部類 Node等待隊列的節點類,等待隊列是CLH(Craig,Landin,Hagersten)鎖隊列的一種變體,CLH鎖通常用來作為自旋鎖。
每個節點主要維護了下面一些狀態
對應的線程thread
等待狀態waitStatus 含,0:初始狀態;CANCELLED 1:被取消;SIGNAL -1:當前線程釋放資源或取消后需要喚醒后繼節點;CONDITION -2:條件等待;PROPAGATE -3:下一個acquireShared操作應該被無條件傳播。實際使用中,一般只關注正負,非負數就意味著節點不需要釋放信號
資源獲取模式有SHARED(默認)和EXCLUSIVE兩個
同步隊列中的前驅后繼節點prev和next
作為同步隊列節點時,nextWaiter有:EXCLUSIVE、SHARED標識當前節點是獨占模式還是共享模式;與ConditionObject搭配使用作為條件等待隊列節點時,nextWaiter保存后繼節點。所以實際上這個Node類是被復用了,既用于同步隊列,也用于條件等待隊列。
ConditionObject這個類實現了Condition接口,主要用來完成常見的條件等待、喚醒等操作。一個ConditionObject 包含一個等待隊列,由firstWaiter和lastWaiter決定。當前線程調用Condition.await()方法時,會被構造成為節點,然后置于條件等待隊列隊尾。
我們看最常用的條件等待方法
// 條件等待,響應中斷 public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException();// 響應中斷 Node node = addConditionWaiter();// 當前線程加入條件等待隊列 int savedState = fullyRelease(node);// 釋放資源,并獲得本節點需要的資源數,以便再次獲取 int interruptMode = 0; while (!isOnSyncQueue(node)) {// 如果當前節點不是在同步隊列,也就是還在等待隊列等待著條件發生 LockSupport.park(this);// 就會一直阻塞 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE)//加入同步隊列等待獲取資源,成功后要檢查中斷響應 interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); } private Node addConditionWaiter() { Node t = lastWaiter; // 如果最后一個條件等待節點是取消的狀態 if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters();// 清理整個鏈路的無效節點 t = lastWaiter; } //以條件等待的方式將當前線程封裝成節點 Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null)//條件等待隊列為空就初始化 firstWaiter = node; else// 隊列不空,插入隊尾 t.nextWaiter = node; lastWaiter = node; return node;// 返回新插入的節點 } final int fullyRelease(Node node) { boolean failed = true; try { int savedState = getState();// 本節點需要的資源數 if (release(savedState)) {// 釋放掉這么多資源 failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } }
然后是信號方法:
public final void signal() { if (!isHeldExclusively())// 要使用該方法必須先是獨占線程 throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first);//找到第一個條件等待節點,并發出信號 } // 去掉條件等待隊列的節點,直到遇上沒取消的或者空節點 private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); } final boolean transferForSignal(Node node) { if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))// 節點已被取消 return false; Node p = enq(node);// 條件等待隊列的第一個節點被加入同步隊列的隊尾 int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread);// 喚醒節點對應線程 return true; }三、已有應用分析
下面用兩個例子來看看AQS的具體使用場景,分別是使用獨占模式的ReentrantLock和共享模式的CountDownLatch 。
一般使用AQS的類,都會用一個內部類Sync來繼承AQS,并實現那幾個protected的方法。
ReentrantLock 有FairSync和NonfairSync兩個類來實現公平鎖和非公平鎖,我們看非公平鎖,主要幾個方法是
lock(),使得NonfairSync調用compareAndSetState把state從0設為1并用setExclusiveOwnerThread把當前線程設為獨占線程(亦即首次獲得鎖),如果失敗則使用acquire(1)調用nonfairTryAcquire。總體流程就是如果state為0,那么就是本線程首次獲得鎖,把state置為1,否則如果當前線程是獨占線程則將state+1(這也是鎖可重入的關鍵),如果都不是就進入acquireQueued流程等待獲得鎖了了
unlock(),調用AQS的release(1)方法,實際上是調用了Sync的tryRelease(1)方法,如果state-1為0,那么返回true,否則返回false。也就是說,重入鎖必須釋放夠重入次數才算真正釋放成功,但是unlock()方法本身不會管這個最終結果,只管釋放
tryLock(),與lock()區別是不等待,立即返回,只有喚醒時就是獨占線程才能返回true,實現方法是nonfairTryAcquire
newCondition()直接返回了了AQS的內部類ConditionObject
isLocked() 如果state為0則表示未加鎖返回false,否則返回true
CountDownLatchCountDownLatch 主要幾個方法是
CountDownLatch(int count),構造方法,設置 AQS 的 state 為 count
await(),調用 AQS 的 acquireSharedInterruptibly(int arg) 方法,然后調用自己覆蓋的tryAcquireShared(int acquires)來獲得state的值是否為0,如果是0就結束等待直接返回了,如果不是0就調用 AQS 的 doAcquireSharedInterruptibly(int arg)方法,該方法會循環等待,直到state為0才返回或者被中斷。
countDown(),調用 AQS 的 releaseShared(int arg) 方法,實際上是調用了自己覆蓋的 tryReleaseShared(int releases) 方法,把 state 減了1,如果此時state為0,則調用 AQS 的doReleaseShared()方法
分析總體而言,AQS提供了一個模板方法模式,將獲得鎖釋放鎖一些必要的流程操作都規定好了,我們只需要填充一些具體的獲得與釋放方法
getState(),setState(int newState),compareAndSetState(int expect,int update):是資源相關操作,保證原子性
tryAcquire(int arg):嘗試獨占獲取資源。成功返回true,失敗返回false。
tryRelease(int arg):嘗試獨占釋放資源。成功返回true,失敗返回false。
tryAcquireShared(int arg):嘗試共享獲取資源。負數表示失敗,非負數表示成功代表剩余可用資源
tryReleaseShared(int arg):嘗試共享釋放資源。如果釋放后可以喚醒后續等待結點返回true,否則返回false。
isHeldExclusively():代表當前線程是否獨占資源,只有用到Condition之時才需要去實現它。
自定義同步器時,一般都是自己寫一個 static class Sync extends AbstractQueuedSynchronizer 靜態內部類來實現具體的方法。
閱讀原文:MageekChiu
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/71090.html
摘要:與之相關的方法有三個原子性地修改都是類型,可見我們可以進行,來定義的獲取與釋放從而實現我們自定義的同步器。 前言 源碼分析我認為主要有兩個作用:滿足好奇心,我想每一個有追求的人都不會滿足于僅僅做一個API Caller實現功能就好,我們也想知道它到底是怎么實現的;借鑒與升華,當我們明白了一個類的設計原理,在一定的情境下我們可以借鑒其設計哲學,甚至針對我們自己特殊的業務場景對其進行改良與...
摘要:與之相關的方法有三個原子性地修改都是類型,可見我們可以進行,來定義的獲取與釋放從而實現我們自定義的同步器。 前言 源碼分析我認為主要有兩個作用:滿足好奇心,我想每一個有追求的人都不會滿足于僅僅做一個API Caller實現功能就好,我們也想知道它到底是怎么實現的;借鑒與升華,當我們明白了一個類的設計原理,在一定的情境下我們可以借鑒其設計哲學,甚至針對我們自己特殊的業務場景對其進行改良與...
摘要:表示的是兩個,當其中任意一個計算完并發編程之是線程安全并且高效的,在并發編程中經常可見它的使用,在開始分析它的高并發實現機制前,先講講廢話,看看它是如何被引入的。電商秒殺和搶購,是兩個比較典型的互聯網高并發場景。 干貨:深度剖析分布式搜索引擎設計 分布式,高可用,和機器學習一樣,最近幾年被提及得最多的名詞,聽名字多牛逼,來,我們一步一步來擊破前兩個名詞,今天我們首先來說說分布式。 探究...
閱讀 3495·2023-04-26 02:44
閱讀 1632·2021-11-25 09:43
閱讀 1523·2021-11-08 13:27
閱讀 1888·2021-09-09 09:33
閱讀 906·2019-08-30 15:53
閱讀 1768·2019-08-30 15:53
閱讀 2780·2019-08-30 15:53
閱讀 3114·2019-08-30 15:44