摘要:而對于共享鎖而言,由于鎖是可以被共享的,因此它可以被多個線程同時持有。換句話說,如果一個線程成功獲取了共享鎖,那么其他等待在這個共享鎖上的線程就也可以嘗試去獲取鎖,并且極有可能獲取成功。
前言
前面兩篇我們以ReentrantLock為例了解了AQS獨占鎖的獲取與釋放,本篇我們來看看共享鎖。由于AQS對于共享鎖與獨占鎖的實現框架比較類似,因此如果你搞定了前面的獨占鎖模式,則共享鎖也就很容易弄懂了。
系列文章目錄
共享鎖與獨占鎖的區別共享鎖與獨占鎖最大的區別在于,獨占鎖是獨占的,排他的,因此在獨占鎖中有一個exclusiveOwnerThread屬性,用來記錄當前持有鎖的線程。當獨占鎖已經被某個線程持有時,其他線程只能等待它被釋放后,才能去爭鎖,并且同一時刻只有一個線程能爭鎖成功。
而對于共享鎖而言,由于鎖是可以被共享的,因此它可以被多個線程同時持有。換句話說,如果一個線程成功獲取了共享鎖,那么其他等待在這個共享鎖上的線程就也可以嘗試去獲取鎖,并且極有可能獲取成功。
共享鎖的實現和獨占鎖是對應的,我們可以從下面這張表中看出:
獨占鎖 | 共享鎖 |
---|---|
tryAcquire(int arg) | tryAcquireShared(int arg) |
tryAcquireNanos(int arg, long nanosTimeout) | tryAcquireSharedNanos(int arg, long nanosTimeout) |
acquire(int arg) | acquireShared(int arg) |
acquireQueued(final Node node, int arg) | doAcquireShared(int arg) |
acquireInterruptibly(int arg) | acquireSharedInterruptibly(int arg) |
doAcquireInterruptibly(int arg) | doAcquireSharedInterruptibly(int arg) |
doAcquireNanos(int arg, long nanosTimeout) | doAcquireSharedNanos(int arg, long nanosTimeout) |
release(int arg) | releaseShared(int arg) |
tryRelease(int arg) | tryReleaseShared(int arg) |
- | doReleaseShared() |
可以看出,除了最后一個屬于共享鎖的doReleaseShared()方法沒有對應外,其他的方法,獨占鎖和共享鎖都是一一對應的。
事實上,其實與doReleaseShared()對應的獨占鎖的方法應當是unparkSuccessor(h),只是doReleaseShared()邏輯不僅僅包含了unparkSuccessor(h),還包含了其他操作,這一點我們下面分析源碼的時候再看。
另外,尤其需要注意的是,在獨占鎖模式中,我們只有在獲取了獨占鎖的節點釋放鎖時,才會喚醒后繼節點——這是合理的,因為獨占鎖只能被一個線程持有,如果它還沒有被釋放,就沒有必要去喚醒它的后繼節點。
然而,在共享鎖模式下,當一個節點獲取到了共享鎖,我們在獲取成功后就可以喚醒后繼節點了,而不需要等到該節點釋放鎖的時候,這是因為共享鎖可以被多個線程同時持有,一個鎖獲取到了,則后繼的節點都可以直接來獲取。因此,在共享鎖模式下,在獲取鎖和釋放鎖結束時,都會喚醒后繼節點。 這一點也是doReleaseShared()方法與unparkSuccessor(h)方法無法直接對應的根本原因所在。
共享鎖的獲取public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
我們拿它和獨占鎖模式對比一下:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
這兩者的結構看上去似乎有點差別,但事實上是一樣的,只不過是共享鎖模式下,將與addWaiter(Node.EXCLUSIVE)對應的addWaiter(Node.SHARED),以及selfInterrupt()操作全部移到了doAcquireShared方法內部,這一點我們在下面分析doAcquireShared方法時就一目了然了。
不過這里先插一句,相對于獨占的鎖的tryAcquire(int arg)返回boolean類型的值,共享鎖的tryAcquireShared(int acquires)返回的是一個整型值:
如果該值小于0,則代表當前線程獲取共享鎖失敗
如果該值大于0,則代表當前線程獲取共享鎖成功,并且接下來其他線程嘗試獲取共享鎖的行為很可能成功
如果該值等于0,則代表當前線程獲取共享鎖成功,但是接下來其他線程嘗試獲取共享鎖的行為會失敗
因此,只要該返回值大于等于0,就表示獲取共享鎖成功。
acquireShared中的tryAcquireShared方法由具體的子類負責實現,這里我們暫且不表。
接下來我們看看doAcquireShared方法,它對應于獨占鎖的acquireQueued,兩者其實很類似,我們把它們相同的部分注釋掉,只看不同的部分:
private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); /*boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor();*/ if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } /*if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); }*/ }
關于上面的if部分,獨占鎖對應的acquireQueued方法為:
if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; }
因此,綜合來看,這兩者的邏輯僅有兩處不同:
addWaiter(Node.EXCLUSIVE) -> addWaiter(Node.SHARED)
setHead(node) -> setHeadAndPropagate(node, r)
這里第一點不同就是獨占鎖的acquireQueued調用的是addWaiter(Node.EXCLUSIVE),而共享鎖調用的是addWaiter(Node.SHARED),表明了該節點處于共享模式,這兩種模式的定義為:
/** Marker to indicate a node is waiting in shared mode */ static final Node SHARED = new Node(); /** Marker to indicate a node is waiting in exclusive mode */ static final Node EXCLUSIVE = null;
該模式被賦值給了節點的nextWaiter屬性:
Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; }
我們知道,在條件隊列中,nextWaiter是指向條件隊列中的下一個節點的,它將條件隊列中的節點串起來,構成了單鏈表。但是在sync queue隊列中,我們只用prev,next屬性來串聯節點,形成雙向鏈表,nextWaiter屬性在這里只起到一個標記作用,不會串聯節點,這里不要被Node SHARED = new Node()所指向的空節點迷惑,這個空節點并不屬于sync queue,不代表任何線程,它只起到標記作用,僅僅用作判斷節點是否處于共享模式的依據:
// Node#isShard() final boolean isShared() { return nextWaiter == SHARED; }
這里的第二點不同就在于獲取鎖成功后的行為,對于獨占鎖而言,是直接調用了setHead(node)方法,而共享鎖調用的是setHeadAndPropagate(node, r):
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }
在該方法內部我們不僅調用了setHead(node),還在一定條件下調用了doReleaseShared()來喚醒后繼的節點。這是因為在共享鎖模式下,鎖可以被多個線程所共同持有,既然當前線程已經拿到共享鎖了,那么就可以直接通知后繼節點來拿鎖,而不必等待鎖被釋放的時候再通知。
關于這個doReleaseShared方法,我們到下面分析鎖釋放的時候再看。
共享鎖的釋放我們使用releaseShared(int arg)方法來釋放共享鎖:
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
該方法對應于獨占鎖的release(int arg)方法:
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
在獨占鎖模式下,由于頭節點就是持有獨占鎖的節點,在它釋放獨占鎖后,如果發現自己的waitStatus不為0,則它將負責喚醒它的后繼節點。
在共享鎖模式下,頭節點就是持有共享鎖的節點,在它釋放共享鎖后,它也應該喚醒它的后繼節點,但是值得注意的是,我們在之前的setHeadAndPropagate方法中可能已經調用過該方法了,也就是說它可能會被同一個頭節點調用兩次,也有可能在我們從releaseShared方法中調用它時,當前的頭節點已經易主了,下面我們就來詳細看看這個方法:
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
該方法可能是共享鎖模式最難理解的方法了,在看該方法時,我們需要明確以下幾個問題:
(1) 該方法有幾處調用?
該方法有兩處調用,一處在acquireShared方法的末尾,當線程成功獲取到共享鎖后,在一定條件下調用該方法;一處在releaseShared方法中,當線程釋放共享鎖的時候調用。
(2) 調用該方法的線程是誰?
在獨占鎖中,只有獲取了鎖的線程才能調用release釋放鎖,因此調用unparkSuccessor(h)喚醒后繼節點的必然是持有鎖的線程,該線程可看做是當前的頭節點(雖然在setHead方法中已經將頭節點的thread屬性設為了null,但是這個頭節點曾經代表的就是這個線程)
在共享鎖中,持有共享鎖的線程可以有多個,這些線程都可以調用releaseShared方法釋放鎖;而這些線程想要獲得共享鎖,則它們必然曾經成為過頭節點,或者就是現在的頭節點。因此,如果是在releaseShared方法中調用的doReleaseShared,可能此時調用方法的線程已經不是頭節點所代表的線程了,頭節點可能已經被易主好幾次了。
(3) 調用該方法的目的是什么?
無論是在acquireShared中調用,還是在releaseShared方法中調用,該方法的目的都是在當前共享鎖是可獲取的狀態時,喚醒head節點的下一個節點。這一點看上去和獨占鎖似乎一樣,但是它們的一個重要的差別是——在共享鎖中,當頭節點發生變化時,是會回到循環中再立即喚醒head節點的下一個節點的。也就是說,在當前節點完成喚醒后繼節點的任務之后將要退出時,如果發現被喚醒后繼節點已經成為了新的頭節點,則會立即觸發喚醒head節點的下一個節點的操作,如此周而復始。
(4) 退出該方法的條件是什么
該方法是一個自旋操作(for(;;)),退出該方法的唯一辦法是走最后的break語句:
if (h == head) // loop if head changed break;
即,只有在當前head沒有易主時,才會退出,否則繼續循環。
這個怎么理解呢?
為了說明問題,這里我們假設目前sync queue隊列中依次排列有
dummy node -> A -> B -> C -> D
現在假設A已經拿到了共享鎖,則它將成為新的dummy node,
dummy node (A) -> B -> C -> D
此時,A線程會調用doReleaseShared,我們寫做doReleaseShared[A],在該方法中將喚醒后繼的節點B,它很快獲得了共享鎖,成為了新的頭節點:
dummy node (B) -> C -> D
此時,B線程也會調用doReleaseShared,我們寫做doReleaseShared[B],在該方法中將喚醒后繼的節點C,但是別忘了,在doReleaseShared[B]調用的時候,doReleaseShared[A]還沒運行結束呢,當它運行到if(h == head)時,發現頭節點現在已經變了,所以它將繼續回到for循環中,與此同時,doReleaseShared[B]也沒閑著,它在執行過程中也進入到了for循環中。。。
由此可見,我們這里形成了一個doReleaseShared的“調用風暴”,大量的線程在同時執行doReleaseShared,這極大地加速了喚醒后繼節點的速度,提升了效率,同時該方法內部的CAS操作又保證了多個線程同時喚醒一個節點時,只有一個線程能操作成功。
那如果這里doReleaseShared[A]執行結束時,節點B還沒有成為新的頭節點時,doReleaseShared[A]方法不就退出了嗎?是的,但即使這樣也沒有關系,因為它已經成功喚醒了線程B,即使doReleaseShared[A]退出了,當B線程成為新的頭節點時,doReleaseShared[B]就開始執行了,它也會負責喚醒后繼節點的,這樣即使變成這種每個節點只喚醒自己后繼節點的模式,從功能上講,最終也可以實現喚醒所有等待共享鎖的節點的目的,只是效率上沒有之前的“調用風暴”快。
由此我們知道,這里的“調用風暴”事實上是一個優化操作,因為在我們執行到該方法的末尾的時候,unparkSuccessor基本上已經被調用過了,而由于現在是共享鎖模式,所以被喚醒的后繼節點極有可能已經獲取到了共享鎖,成為了新的head節點,當它成為新的head節點后,它可能還是要在setHeadAndPropagate方法中調用doReleaseShared喚醒它的后繼節點。
明確了上面幾個問題后,我們再來詳細分析這個方法,它最重要的部分就是下面這兩個if語句:
if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS
第一個if很好理解,如果當前ws值為Node.SIGNAL,則說明后繼節點需要喚醒,這里采用CAS操作先將Node.SIGNAL狀態改為0,這是因為前面講過,可能有大量的doReleaseShared方法在同時執行,我們只需要其中一個執行unparkSuccessor(h)操作就行了,這里通過CAS操作保證了unparkSuccessor(h)只被執行一次。
比較難理解的是第二個else if,首先我們要弄清楚ws啥時候為0,一種是上面的compareAndSetWaitStatus(h, Node.SIGNAL, 0)會導致ws為0,但是很明顯,如果是因為這個原因,則它是不會進入到else if語句塊的。所以這里的ws為0是指當前隊列的最后一個節點成為了頭節點。為什么是最后一個節點呢,因為每次新的節點加進來,在掛起前一定會將自己的前驅節點的waitStatus修改成Node.SIGNAL的。(對這一點不理解的詳細看這里)
其次,compareAndSetWaitStatus(h, 0, Node.PROPAGATE)這個操作什么時候會失敗?既然這個操作失敗,說明就在執行這個操作的瞬間,ws此時已經不為0了,說明有新的節點入隊了,ws的值被改為了Node.SIGNAL,此時我們將調用continue,在下次循環中直接將這個剛剛新入隊但準備掛起的線程喚醒。
其實,如果我們再結合外部的整體條件,就很容易理解這種情況所針對的場景,不要忘了,進入上面這段還有一個條件是
if (h != null && h != tail)
它處于最外層:
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { // 注意這里說明了隊列至少有兩個節點 int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; } if (h == head) break; } }
這個條件意味著,隊列中至少有兩個節點。
結合上面的分析,我們可以看出,這個
else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
描述了一個極其嚴苛且短暫的狀態:
首先,大前提是隊列里至少有兩個節點
其次,要執行到else if語句,說明我們跳過了前面的if條件,說明頭節點是剛剛成為頭節點的,它的waitStatus值還為0,尾節點是在這之后剛剛加進來的,它需要執行shouldParkAfterFailedAcquire,將它的前驅節點(即頭節點)的waitStatus值修改為Node.SIGNAL,但是目前這個修改操作還沒有來的及執行。這種情況使我們得以進入else if的前半部分else if (ws == 0 &&
緊接著,要滿足!compareAndSetWaitStatus(h, 0, Node.PROPAGATE)這一條件,說明此時頭節點的waitStatus已經不是0了,這說明之前那個沒有來得及執行的 在shouldParkAfterFailedAcquire將前驅節點的的waitStatus值修改為Node.SIGNAL的操作現在執行完了。
由此可見,else if 的 && 連接了兩個不一致的狀態,分別對應了shouldParkAfterFailedAcquire的compareAndSetWaitStatus(pred, ws, Node.SIGNAL)執行成功前和執行成功后,因為doReleaseShared和
shouldParkAfterFailedAcquire是可以并發執行的,所以這一條件是有可能滿足的,只是滿足的條件非常嚴苛,可能只是一瞬間的事。
這里不得不說,如果以上的分析沒有錯的話,那作者對于AQS性能的優化已經到了“令人發指”的地步!!!雖說這種短暫的瞬間確實存在,也確實有必要重新回到for循環中再次去喚醒后繼節點,但是這種優化也太太太~~~過于精細了吧!
我們來看看如果不加入這個精細的控制條件有什么后果呢?
這里我們復習一下新節點入隊的過程,前面說過,在發現新節點的前驅不是head節點的時候,它將調用shouldParkAfterFailedAcquire:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don"t park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
由于前驅節點的ws值現在還為0,新節點將會把它改為Node.SIGNAL,
但修改后,該方法返回的是false,也就是說線程不會立即掛起,而是回到上層再嘗試一次搶鎖:
private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } // shouldParkAfterFailedAcquire的返回處 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
當我們再次回到for(;;)循環中,由于此時當前節點的前驅節點已經成為了新的head,所以它可以參與搶鎖,由于它搶的是共享鎖,所以大概率它是搶的到的,所以極有可能它不會被掛起。這有可能導致在上面的doReleaseShared調用unparkSuccessor方法unpark了一個并沒有被park的線程。然而,這一操作是被允許的,當我們unpark一個并沒有被park的線程時,該線程在下一次調用park方法時就不會被掛起,而這一行為是符合我們的場景的——因為當前的共享鎖處于可獲取的狀態,后繼的線程應該直接來獲取鎖,不應該被掛起。
事實上,我個人認為:
else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS
這一段其實也可以省略,當然有了這一段肯定會加速喚醒后繼節點的過程,作者針對上面那種極其短暫的情況進行了優化可以說是和它之前“調用風暴”的設計一脈相承,可能也正是由于作者對于性能的極致追求才使得AQS如此之優秀吧。
總結共享鎖的調用框架和獨占鎖很相似,它們最大的不同在于獲取鎖的邏輯——共享鎖可以被多個線程同時持有,而獨占鎖同一時刻只能被一個線程持有。
由于共享鎖同一時刻可以被多個線程持有,因此當頭節點獲取到共享鎖時,可以立即喚醒后繼節點來爭鎖,而不必等到釋放鎖的時候。因此,共享鎖觸發喚醒后繼節點的行為可能有兩處,一處在當前節點成功獲得共享鎖后,一處在當前節點釋放共享鎖后。
(完)
系列文章目錄
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/77189.html
摘要:為了避免一篇文章的篇幅過長,于是一些比較大的主題就都分成幾篇來講了,這篇文章是筆者所有文章的目錄,將會持續更新,以給大家一個查看系列文章的入口。 前言 大家好,筆者是今年才開始寫博客的,寫作的初衷主要是想記錄和分享自己的學習經歷。因為寫作的時候發現,為了弄懂一個知識,不得不先去了解另外一些知識,這樣以來,為了說明一個問題,就要把一系列知識都了解一遍,寫出來的文章就特別長。 為了避免一篇...
摘要:相較于方法,提供了超時等待機制注意,在方法中,我們用到了的返回值,如果該方法因為超時而退出時,則將返回。的這個返回值有助于我們理解該方法究竟是因為獲取到了鎖而返回,還是因為超時時間到了而返回。 前言 系列文章目錄 CountDownLatch是一個很有用的工具,latch是門閂的意思,該工具是為了解決某些操作只能在一組操作全部執行完成后才能執行的情景。例如,小組早上開會,只有等所有人...
摘要:我們知道,這個函數將返回當前正在執行的線程的中斷狀態,并清除它。注意,中斷對線程來說只是一個建議,一個線程被中斷只是其中斷狀態被設為線程可以選擇忽略這個中斷,中斷一個線程并不會影響線程的執行。 前言 系列文章目錄 上一篇文章 我們逐行分析了獨占鎖的獲取操作, 本篇文章我們來看看獨占鎖的釋放。如果前面的鎖的獲取流程你已經趟過一遍了, 那鎖的釋放部分就很簡單了, 這篇文章我們直接開始看...
摘要:本篇我們將以的公平鎖為例來詳細看看使用獲取獨占鎖的流程。本文中的源碼基于。由于本篇我們分析的是獨占鎖,同一時刻,鎖只能被一個線程所持有。由于在整個搶鎖過程中,我們都是不響應中斷的。 前言 AQS(AbstractQueuedSynchronizer)是JAVA中眾多鎖以及并發工具的基礎,其底層采用樂觀鎖,大量使用了CAS操作, 并且在沖突時,采用自旋方式重試,以實現輕量級和高效地獲取鎖...
閱讀 3205·2023-04-26 01:39
閱讀 3358·2023-04-25 18:09
閱讀 1627·2021-10-08 10:05
閱讀 3241·2021-09-22 15:45
閱讀 2794·2019-08-30 15:55
閱讀 2402·2019-08-30 15:54
閱讀 3174·2019-08-30 15:53
閱讀 1336·2019-08-29 12:32