摘要:同步器擁有三個成員變量隊列的頭結點隊列的尾節點和狀態。對于同步器維護的狀態,多個線程對其的獲取將會產生一個鏈式的結構。使用將當前線程,關于后續會詳細介紹。
簡介
下面通過一個排它鎖的例子來深入理解一下同步器的工作原理,而只有掌握同步器的工作原理才能夠更加深入了解其他的并發組件。
排他鎖的實現,一次只能一個線程獲取到鎖。
class Mutex implements Lock, java.io.Serializable { // 內部類,自定義同步器 private static class Sync extends AbstractQueuedSynchronizer { // 是否處于占用狀態 protected boolean isHeldExclusively() { return getState() == 1; } // 當狀態為0的時候獲取鎖 public boolean tryAcquire(int acquires) { assert acquires == 1; // Otherwise unused if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } // 釋放鎖,將狀態設置為0 protected boolean tryRelease(int releases) { assert releases == 1; // Otherwise unused if (getState() == 0) throw new IllegalMonitorStateException(); setExclusiveOwnerThread(null); setState(0); return true; } // 返回一個Condition,每個condition都包含了一個condition隊列 Condition newCondition() { return new ConditionObject(); } } // 僅需要將操作代理到Sync上即可 private final Sync sync = new Sync(); public void lock() { sync.acquire(1); } public boolean tryLock() { return sync.tryAcquire(1); } public void unlock() { sync.release(1); } public Condition newCondition() { return sync.newCondition(); } public boolean isLocked() { return sync.isHeldExclusively(); } public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); } public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); } }
可以看到Mutex將Lock接口均代理給了同步器的實現。
使用方將Mutex構造出來之后,調用lock獲取鎖,調用unlock進行解鎖。下面以Mutex為例子,詳細分析以下同步器的實現邏輯。
實現分析
如果獲取不到,將當前線程構造成節點Node并加入sync隊列;
進入隊列的每個線程都是一個節點Node,從而形成了一個雙向隊列,類似CLH隊列,這樣做的目的是線程間的通信會被限制在較小規模(也就是兩個節點左右)。
再次嘗試獲取,如果沒有獲取到那么將當前線程從線程調度器上摘下,進入等待狀態。
使用LockSupport將當前線程unpark,關于LockSupport后續會詳細介紹。
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // 快速嘗試在尾部添加 Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } }
上述邏輯主要包括:
使用當前線程構造Node;
對于一個節點需要做的是將當節點前驅節點指向尾節點(current.prev = tail),尾節點指向它(tail = current),原有的尾節點的后繼節點指向它(t.next = current)而這些操作要求是原子的。上面的操作是利用尾節點的設置來保證的,也就是compareAndSetTail來完成的。
先行嘗試在隊尾添加;
如果尾節點已經有了,然后做如下操作:
分配引用T指向尾節點;
將節點的前驅節點更新為尾節點(current.prev = tail);
如果尾節點是T,那么將當尾節點設置為該節點(tail = current,原子更新);
T的后繼節點指向當前節點(T.next = current)。
注意第3點是要求原子的。
這樣可以以最短路徑O(1)的效果來完成線程入隊,是最大化減少開銷的一種方式。
如果隊尾添加失敗或者是第一個入隊的節點。
如果是第1個節點,也就是sync隊列沒有初始化,那么會進入到enq這個方法,進入的線程可能有多個,或者說在addWaiter中沒有成功入隊的線程都將進入enq這個方法。
可以看到enq的邏輯是確保進入的Node都會有機會順序的添加到sync隊列中,而加入的步驟如下:
如果尾節點為空,那么原子化的分配一個頭節點,并將尾節點指向頭節點,這一步是初始化;
然后是重復在addWaiter中做的工作,但是在一個while(true)的循環中,直到當前節點入隊為止。
進入sync隊列之后,接下來就是要進行鎖的獲取,或者說是訪問控制了,只有一個線程能夠在同一時刻繼續的運行,而其他的進入等待狀態。而每個線程都是一個獨立的個體,它們自省的觀察,當條件滿足的時候(自己的前驅是頭結點并且原子性的獲取了狀態),那么這個線程能夠繼續運行。
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
上述邏輯主要包括:
獲取當前節點的前驅節點;
需要獲取當前節點的前驅節點,而頭結點所對應的含義是當前站有鎖且正在運行。
當前驅節點是頭結點并且能夠獲取狀態,代表該當前節點占有鎖;
如果滿足上述條件,那么代表能夠占有鎖,根據節點對鎖占有的含義,設置頭結點為當前節點。
否則進入等待狀態。
如果沒有輪到當前節點運行,那么將當前線程從線程調度器上摘下,也就是進入等待狀態。
這里針對acquire做一下總結:
狀態的維護;
需要在鎖定時,需要維護一個狀態(int類型),而對狀態的操作是原子和非阻塞的,通過同步器提供的對狀態訪問的方法對狀態進行操縱,并且利用compareAndSet來確保原子性的修改。
狀態的獲取;
一旦成功的修改了狀態,當前線程或者說節點,就被設置為頭節點。
sync隊列的維護。
在獲取資源未果的過程中條件不符合的情況下(不該自己,前驅節點不是頭節點或者沒有獲取到資源)進入睡眠狀態,停止線程調度器對當前節點線程的調度。
這時引入的一個釋放的問題,也就是說使睡眠中的Node或者說線程獲得通知的關鍵,就是前驅節點的通知,而這一個過程就是釋放,釋放會通知它的后繼節點從睡眠中返回準備運行。
下面的流程圖基本描述了一次acquire所需要經歷的過程:
如上圖所示,其中的判定退出隊列的條件,判定條件是否滿足和休眠當前線程就是完成了自旋spin的過程。
public final boolean release(int arg)在unlock方法的實現中,使用了同步器的release方法。相對于在之前的acquire方法中可以得出調用acquire,保證能夠獲取到鎖(成功獲取狀態),而release則表示將狀態設置回去,也就是將資源釋放,或者說將鎖釋放。
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
上述邏輯主要包括:
嘗試釋放狀態;
tryRelease能夠保證原子化的將狀態設置回去,當然需要使用compareAndSet來保證。如果釋放狀態成功過之后,將會進入后繼節點的喚醒過程。
喚醒當前節點的后繼節點所包含的線程。
通過LockSupport的unpark方法將休眠中的線程喚醒,讓其繼續acquire狀態。
private void unparkSuccessor(Node node) { // 將狀態設置為同步狀態 int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); // 獲取當前節點的后繼節點,如果滿足狀態,那么進行喚醒操作 // 如果沒有滿足狀態,從尾部開始找尋符合要求的節點并將其喚醒 Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
上述邏輯主要包括,該方法取出了當前節點的next引用,然后對其線程(Node)進行了喚醒,這時就只有一個或合理個數的線程被喚醒,被喚醒的線程繼續進行對資源的獲取與爭奪。
回顧整個資源的獲取和釋放過程:
在獲取時,維護了一個sync隊列,每個節點都是一個線程在進行自旋,而依據就是自己是否是首節點的后繼并且能夠獲取資源; 在釋放時,僅僅需要將資源還回去,然后通知一下后繼節點并將其喚醒。
這里需要注意,隊列的維護(首節點的更換)是依靠消費者(獲取時)來完成的,也就是說在滿足了自旋退出的條件時的一刻,這個節點就會被設置成為首節點。
protected boolean tryAcquire(int arg)tryAcquire是自定義同步器需要實現的方法,也就是自定義同步器非阻塞原子化的獲取狀態,如果鎖該方法一般用于Lock的tryLock實現中,這個特性是synchronized無法提供的。
public final void acquireInterruptibly(int arg)該方法提供獲取狀態能力,當然在無法獲取狀態的情況下會進入sync隊列進行排隊,這類似acquire,但是和acquire不同的地方在于它能夠在外界對當前線程進行中斷的時候提前結束獲取狀態的操作,換句話說,就是在類似synchronized獲取鎖時,外界能夠對當前線程進行中斷,并且獲取鎖的這個操作能夠響應中斷并提前返回。一個線程處于synchronized塊中或者進行同步I/O操作時,對該線程進行中斷操作,這時該線程的中斷標識位被設置為true,但是線程依舊繼續運行。
如果在獲取一個通過網絡交互實現的鎖時,這個鎖資源突然進行了銷毀,那么使用acquireInterruptibly的獲取方式就能夠讓該時刻嘗試獲取鎖的線程提前返回。而同步器的這個特性被實現Lock接口中的lockInterruptibly方法。根據Lock的語義,在被中斷時,lockInterruptibly將會拋出InterruptedException來告知使用者。
public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); } private void doAcquireInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return; } // 檢測中斷標志位 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
上述邏輯主要包括:
檢測當前線程是否被中斷;
判斷當前線程的中斷標志位,如果已經被中斷了,那么直接拋出異常并將中斷標志位設置為false。
嘗試獲取狀態;
調用tryAcquire獲取狀態,如果順利會獲取成功并返回。
構造節點并加入sync隊列;
獲取狀態失敗后,將當前線程引用構造為節點并加入到sync隊列中。退出隊列的方式在沒有中斷的場景下和acquireQueued類似,當頭結點是自己的前驅節點并且能夠獲取到狀態時,即可以運行,當然要將本節點設置為頭結點,表示正在運行。
中斷檢測。
在每次被喚醒時,進行中斷檢測,如果發現當前線程被中斷,那么拋出InterruptedException并退出循環。
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException該方法提供了具備有超時功能的獲取狀態的調用,如果在指定的nanosTimeout內沒有獲取到狀態,那么返回false,反之返回true。可以將該方法看做acquireInterruptibly的升級版,也就是在判斷是否被中斷的基礎上增加了超時控制。
針對超時控制這部分的實現,主要需要計算出睡眠的delta,也就是間隔值。間隔可以表示為nanosTimeout = 原有nanosTimeout – now(當前時間)+
lastTime(睡眠之前記錄的時間)。如果nanosTimeout大于0,那么還需要使當前線程睡眠,反之則返回false。
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { long lastTime = System.nanoTime(); final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return true; } if (nanosTimeout <= 0) return false; if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); long now = System.nanoTime(); //計算時間,當前時間減去睡眠之前的時間得到睡眠的時間,然后被 //原有超時時間減去,得到了還應該睡眠的時間 nanosTimeout -= now - lastTime; lastTime = now; if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
上述邏輯主要包括:
加入sync隊列;
將當前線程構造成為節點Node加入到sync隊列中。
條件滿足直接返回;
退出條件判斷,如果前驅節點是頭結點并且成功獲取到狀態,那么設置自己為頭結點并退出,返回true,也就是在指定的nanosTimeout之前獲取了鎖。
獲取狀態失敗休眠一段時間;
通過LockSupport.unpark來指定當前線程休眠一段時間。
計算再次休眠的時間;
喚醒后的線程,計算仍需要休眠的時間,該時間表示為nanosTimeout = 原有nanosTimeout – now(當前時間)+ lastTime(睡眠之前記錄的時間)。其中now
– lastTime表示這次睡眠所持續的時間。
休眠時間的判定。
喚醒后的線程,計算仍需要休眠的時間,并無阻塞的嘗試再獲取狀態,如果失敗后查看其nanosTimeout是否大于0,如果小于0,那么返回完全超時,沒有獲取到鎖。 如果nanosTimeout小于等于1000L納秒,則進入快速的自旋過程。那么快速自旋會造成處理器資源緊張嗎?結果是不會,經過測算,開銷看起來很小,幾乎微乎其微。Doug Lea應該測算了在線程調度器上的切換造成的額外開銷,因此在短時1000納秒內就讓當前線程進入快速自旋狀態,如果這時再休眠相反會讓nanosTimeout的獲取時間變得更加不精確。
上述過程可以如下圖所示:
上述這個圖中可以理解為在類似獲取狀態需要排隊的基礎上增加了一個超時控制的邏輯。每次超時的時間就是當前超時剩余的時間減去睡眠的時間,而在這個超時時間的基礎上進行了判斷,如果大于0那么繼續睡眠(等待),可以看出這個超時版本的獲取狀態只是一個近似超時的獲取狀態,因此任何含有超時的調用基本結果就是近似于給定超時。
public final void acquireShared(int arg)調用該方法能夠以共享模式獲取狀態,共享模式和之前的獨占模式有所區別。以文件的查看為例,如果一個程序在對其進行讀取操作,那么這一時刻,對這個文件的寫操作就被阻塞,相反,這一時刻另一個程序對其進行同樣的讀操作是可以進行的。如果一個程序在對其進行寫操作,那么所有的讀與寫操作在這一時刻就被阻塞,直到這個程序完成寫操作。
以讀寫場景為例,描述共享和獨占的訪問模式,如下圖所示:
上圖中,紅色代表被阻塞,綠色代表可以通過。
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
上述邏輯主要包括:
嘗試獲取共享狀態;
調用tryAcquireShared來獲取共享狀態,該方法是非阻塞的,如果獲取成功則立刻返回,也就表示獲取共享鎖成功。
獲取失敗進入sync隊列;
在獲取共享狀態失敗后,當前時刻有可能是獨占鎖被其他線程所把持,那么將當前線程構造成為節點(共享模式)加入到sync隊列中。
循環內判斷退出隊列條件;
如果當前節點的前驅節點是頭結點并且獲取共享狀態成功,這里和獨占鎖acquire的退出隊列條件類似。
獲取共享狀態成功;
在退出隊列的條件上,和獨占鎖之間的主要區別在于獲取共享狀態成功之后的行為,而如果共享狀態獲取成功之后會判斷后繼節點是否是共享模式,如果是共享模式,那么就直接對其進行喚醒操作,也就是同時激發多個線程并發的運行。
獲取共享狀態失敗。
通過使用LockSupport將當前線程從線程調度器上摘下,進入休眠狀態。
對于上述邏輯中,節點之間的通知過程如下圖所示:
上圖中,綠色表示共享節點,它們之間的通知和喚醒操作是在前驅節點獲取狀態時就進行的,紅色表示獨占節點,它的被喚醒必須取決于前驅節點的釋放,也就是release操作,可以看出來圖中的獨占節點如果要運行,必須等待前面的共享節點均釋放了狀態才可以。而獨占節點如果獲取了狀態,那么后續的獨占式獲取和共享式獲取均被阻塞。
public final boolean releaseShared(int arg)調用該方法釋放共享狀態,每次獲取共享狀態acquireShared都會操作狀態,同樣在共享鎖釋放的時候,也需要將狀態釋放。比如說,一個限定一定數量訪問的同步工具,每次獲取都是共享的,但是如果超過了一定的數量,將會阻塞后續的獲取操作,只有當之前獲取的消費者將狀態釋放才可以使阻塞的獲取操作得以運行。
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
上述邏輯主要就是調用同步器的tryReleaseShared方法來釋放狀態,并同時在doReleaseShared方法中喚醒其后繼節點。
一個例子
在上述對同步器AbstractQueuedSynchronizer進行了實現層面的分析之后,我們通過一個例子來加深對同步器的理解:
設計一個同步工具,該工具在同一時刻,只能有兩個線程能夠并行訪問,超過限制的其他線程進入阻塞狀態。
對于這個需求,可以利用同步器完成一個這樣的設定,定義一個初始狀態,為2,一個線程進行獲取那么減1,一個線程釋放那么加1,狀態正確的范圍在[0,1,2]三個之間,當在0時,代表再有新的線程對資源進行獲取時只能進入阻塞狀態(注意在任何時候進行狀態變更的時候均需要以CAS作為原子性保障)。
public class TwinsLock implements Lock { private static final Sync sync = new Sync(); private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = -7889272986162341211L; { setState(2); } protected boolean tryAcquire(int arg) { if (arg != 1) { return false; } int currentStats = getState(); if (currentStats <= 0) { return false; } if (compareAndSetState(currentStats, currentStats - 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int arg) { if (arg != 1) { return false; } for(;;) { int currentStats = getState(); if (compareAndSetState(currentStats, currentStats + 1)) { setExclusiveOwnerThread(null); return true; } } } protected boolean isHeldExclusively() { return getState() < 2; } } public void lock() { sync.acquire(1); } public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } public boolean tryLock() { return sync.tryAcquire(1); } public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(time)); } public void unlock() { sync.release(1); } }
這里我們編寫一個測試來驗證TwinsLock是否能夠正常工作并達到預期。
public class TwinsLockTest { @Test public void test() { final Lock lock = new TwinsLock(); class Worker extends Thread { public void run() { while (true) { lock.lock(); try { Thread.sleep(1000L); System.out.println(Thread.currentThread()); Thread.sleep(1000L); } catch (Exception ex) { } finally { lock.unlock(); } } } } for (int i = 0; i < 10; i++) { Worker w = new Worker(); w.start(); } new Thread() { public void run() { while (true) { try { Thread.sleep(200L); System.out.println(); } catch (Exception ex) { } } } }.start(); try { Thread.sleep(20000L); } catch (InterruptedException e) { e.printStackTrace(); } } }
上述測試用例的邏輯主要包括:
打印線程
Worker在兩次睡眠之間打印自身線程,如果一個時刻只能有兩個線程同時訪問,那么打印出來的內容將是成對出現。
分隔線程
不停的打印換行,能讓Worker的輸出看起來更加直觀。
該測試的結果是在一個時刻,僅有兩個線程能夠獲得到鎖,并完成打印,而表象就是打印的內容成對出現。
by 魏鵬 via ifeve
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/64024.html
摘要:表示的是兩個,當其中任意一個計算完并發編程之是線程安全并且高效的,在并發編程中經常可見它的使用,在開始分析它的高并發實現機制前,先講講廢話,看看它是如何被引入的。電商秒殺和搶購,是兩個比較典型的互聯網高并發場景。 干貨:深度剖析分布式搜索引擎設計 分布式,高可用,和機器學習一樣,最近幾年被提及得最多的名詞,聽名字多牛逼,來,我們一步一步來擊破前兩個名詞,今天我們首先來說說分布式。 探究...
摘要:的主要功能和關鍵字一致,均是用于多線程的同步。而僅支持通過查詢當前線程是否持有鎖。由于和使用的是同一把可重入鎖,所以線程可以進入方法,并再次獲得鎖,而不會被阻塞住。公平與非公平公平與非公平指的是線程獲取鎖的方式。 1.簡介 可重入鎖ReentrantLock自 JDK 1.5 被引入,功能上與synchronized關鍵字類似。所謂的可重入是指,線程可對同一把鎖進行重復加鎖,而不會被阻...
摘要:簡介抽象隊列同步器,以下簡稱出現在中,由大師所創作。獲取成功則返回,獲取失敗,線程進入同步隊列等待。響應中斷版的超時響應中斷版的共享式獲取同步狀態,同一時刻可能會有多個線程獲得同步狀態。 1.簡介 AbstractQueuedSynchronizer (抽象隊列同步器,以下簡稱 AQS)出現在 JDK 1.5 中,由大師 Doug Lea 所創作。AQS 是很多同步器的基礎框架,比如 ...
閱讀 1460·2023-04-25 17:18
閱讀 1893·2021-10-27 14:18
閱讀 2132·2021-09-09 09:33
閱讀 1848·2019-08-30 15:55
閱讀 2023·2019-08-30 15:53
閱讀 3446·2019-08-29 16:17
閱讀 3434·2019-08-26 13:57
閱讀 1738·2019-08-26 13:46