摘要:所以就有了讀寫鎖。只要沒有,讀取鎖可以由多個線程同時保持。其讀寫鎖為兩個內部類都實現了接口。讀寫鎖同樣依賴自定義同步器來實現同步狀態的,而讀寫狀態就是其自定義同步器的狀態。判斷申請寫鎖數量是否超標超標則直接異常,反之則設置共享狀態。
一、寫在前面
在上篇我們聊到了可重入鎖(排它鎖)ReentrantLcok ,具體參見《J.U.C|可重入鎖ReentrantLock》
ReentrantLcok 屬于排它鎖,本章我們再來一起聊聊另一個我們工作中出鏡率很高的讀-寫鎖。
二、簡介重入鎖ReentrantLock是排他鎖(互斥鎖),排他鎖在同一時刻僅有一個線程可訪問,但是在大多數場景下,大部分時間都是提供讀服務的,而寫服務占用極少的時間,然而讀服務不存在數據競爭的問題,如果一個線程在讀時禁止其他線程讀勢必會降低性能。所以就有了讀寫鎖。
讀寫鎖內部維護著一對鎖,一個讀鎖和一個寫鎖。通過分離讀鎖和寫鎖,使得并發性比一般排他鎖有著顯著的提升。
讀寫鎖在同一時間可以允許多個讀線程同時訪問,但是寫線程訪問時,所有的讀線程和寫線程都會阻塞。
主要有以下特征:
公平性選擇:支持非公平(默認)和公平的鎖獲取方式,吞吐量還是非公平優于公平。
重進入:該鎖支持重進入,以讀寫線程為列,讀線程在獲取到讀鎖之后,能再次獲取讀鎖。而寫線程在獲取寫鎖后能夠再次獲取寫鎖,同時也可以獲取讀鎖。
鎖降級:遵循獲取寫鎖、讀鎖再釋放寫鎖的次序,寫鎖能夠降級成為讀鎖。
讀寫鎖最多支持65535個遞歸寫入鎖和65535個遞歸讀取鎖。 鎖降級:遵循獲取寫鎖、獲取讀鎖在釋放寫鎖的次序,寫鎖能夠降級成為讀鎖 讀寫鎖ReentrantReadWriteLock實現接口ReadWriteLock,該接口維護了一對相關的鎖,一個用于只讀操作,另一個用于寫入操作。只要沒有 writer,讀取鎖可以由多個 reader 線程同時保持。三、主要方法介紹
讀寫鎖ReentrantReadWriteLock 實現了ReadWriteLock 接口,該接口維護一對相關的鎖即讀鎖和寫鎖。
public interface ReadWriteLock { /** * Returns the lock used for reading. * * @return the lock used for reading */ Lock readLock(); /** * Returns the lock used for writing. * * @return the lock used for writing */ Lock writeLock(); }
ReadWriteLock定義了兩個方法。readLock()返回用于讀操作的鎖,writeLock()返回用于寫操作的鎖。ReentrantReadWriteLock定義如下:
/** 內部類 讀鎖*/ private final ReentrantReadWriteLock.ReadLock readerLock; /** 內部類 寫鎖*/ private final ReentrantReadWriteLock.WriteLock writerLock; /** 執行所有同步機制 */ final Sync sync; // 默認實現非公平鎖 public ReentrantReadWriteLock() { this(false); } // 利用給定的公平策略初始化ReentrantReadWriteLock public ReentrantReadWriteLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); readerLock = new ReadLock(this); writerLock = new WriteLock(this); } // 返回寫鎖 public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; } //返回讀鎖 public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; } // 實現同步器,也是實現鎖的核心 abstract static class Sync extends AbstractQueuedSynchronizer { // 省略實現代碼 } // 公平鎖的實現 static final class FairSync extends Sync { // 省略實現代碼 } // 非公平鎖的實現 static final class NonfairSync extends Sync { // 省略實現代碼 } // 讀鎖實現 public static class ReadLock implements Lock, java.io.Serializable { // 省略實現代碼 } // 寫鎖實現 public static class WriteLock implements Lock, java.io.Serializable { // 省略實現代碼 }
ReentrantReadWriteLock 和 ReentrantLock 其實都一樣,鎖核心都是Sync, 讀鎖和寫鎖都是基于Sync來實現的。從這分析其實ReentrantReadWriteLock就是一個鎖,只不過內部根據不同的場景設計了兩個不同的實現方式。其讀寫鎖為兩個內部類: ReadLock、WriteLock 都實現了Lock 接口。
讀寫鎖同樣依賴自定義同步器來實現同步狀態的, 而讀寫狀態就是其自定義同步器的狀態?;叵隦eentantLock 中自定義同步器的實現,同步狀態表示鎖被一個線程重復獲取的次數,而讀寫鎖中的自定義同步器需要在一個同步狀態(一個整型變量)上維護多個讀線程和寫線程的狀況,而該狀態的設計成為關鍵。
如何在一個整型上維護多種狀態,那就需要‘按位切割使用’這個變量,讀寫鎖將變量切割成兩部分,高16位表示讀,低16位表示寫。
分割之后,讀寫鎖是如何迅速確定讀鎖和寫鎖的狀態呢?通過位運算,假如當前同步狀態為S,那么寫狀態等于 S & 0x0000FFFF(將高16位全部抹去),讀狀態等于S >>> 16(無符號補0右移16位)。代碼如下:
static final int SHARED_SHIFT = 16; static final int SHARED_UNIT = (1 << SHARED_SHIFT); static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; /** Returns the number of shared holds represented in count */ static int sharedCount(int c) { return c >>> SHARED_SHIFT; } /** Returns the number of exclusive holds represented in count */ static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }四、寫鎖的獲取與釋放
寫鎖是一個支持重入的排他鎖,如果當前線程已經獲取了寫鎖,則增加寫狀態。如果當前線程獲取寫鎖時讀鎖已經被獲取或者該線程不是已經獲取寫鎖的線程,則當前線程進入等待狀態。
寫鎖的獲取
寫鎖的獲取入口通過WriteLock的lock方法
public void lock() { sync.acquire(1); }
Sync的acquire(1)方法 定義在AQS中
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
tryAcquire(arg) 方法除了重入方法外,還增加了是否存在讀鎖的判斷,如果讀鎖存在、則不能獲取寫鎖。原因在于寫操作要對所有的讀操作的可見性。
protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); // 獲取同步狀態 int c = getState(); // 獲取寫鎖的獲取次數 int w = exclusiveCount(c); // 已有線程獲取鎖 if (c != 0) { // (Note: if c != 0 and w == 0 then shared count != 0) /** * w == 0 表示存在讀鎖(同步狀態不等于0說明已有線程獲取到鎖(讀/寫) * 而寫鎖狀態為0則說明不存在寫鎖,所以只能是讀鎖了) * current != getExclusiveOwnerThread()) 不是自己獲取的寫鎖 * * 如果存在讀鎖或者持有寫鎖的線程不是自己,直接返回false */ if (w == 0 || current != getExclusiveOwnerThread()) return false; // 如果獲取寫鎖的數量超過最大值65535 ,直接異常 if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // Reentrant acquire // 設置共享狀態 setState(c + acquires); return true; } /** * writerShouldBlock() 是否需要阻塞寫鎖,這里直接返回的是false * compareAndSetState(c, c + acquires) 設置寫鎖的狀態 */ if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; setExclusiveOwnerThread(current); return true; }
寫鎖的獲取基本和RenntrantLock 類似
判斷當前是否有線程持有寫鎖(寫鎖的狀態是否為0)
寫鎖的狀態不為0,如果存在讀鎖或者寫鎖不是自己持有則直接返回fasle。
判斷申請寫鎖數量是否超標(> 65535),超標則直接異常,反之則設置共享狀態。
寫鎖狀態為0,如果寫鎖需要阻塞或者CAS設置共享狀態失敗,則直接返回false,否則獲取鎖成功,設置持鎖線程為自己。
來張圖加深下理解
寫鎖的釋放
寫鎖的釋放和ReentrantLock 極為相似, 每次釋放就是狀態減1 ,當狀態為0表示釋放成功。
寫鎖釋放的入口WriteLock中的unlock方法
public void unlock() { sync.release(1) }
Sync 中release方法由AQS中實現的
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
tryRelease(arg) 方法釋放共享狀態,非常簡單就是共享狀態減1,為0表示釋放成功
protected final boolean tryRelease(int releases) { // 判斷鎖持有者是否是自己 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); // 共享狀態值 - release int nextc = getState() - releases; // 判斷寫鎖數量是否為0 boolean free = exclusiveCount(nextc) == 0; if (free) setExclusiveOwnerThread(null); setState(nextc); return free; }
寫鎖的釋放很簡單
首先判斷鎖持有者不是自己則直接異常
是自己則將共享狀態 -1
判斷寫鎖數量是否為0,如果為0將持有鎖線程變量設為null
設置共享狀態
來張圖加深下理解
五、讀鎖的獲取與釋放讀鎖為一個可重入的共享鎖,它能夠被多個線程同時持有,在沒有其他寫線程訪問時,讀鎖總是獲取成功,所需要的也就是(線程安全的)增加讀狀態。
讀鎖的獲取
讀鎖的獲取可以通過ReadLock.lock()方法。
public void lock() { //讀鎖是一個可重入共享鎖,委托給內部類Sync實現 sync.acquireShared(1); }
Sync的acquireShared(1)方法定義在AQS中
public final void acquireShared(int arg) { // AQS 中 嘗試獲取共享狀態,如果共享狀態大于等于0則說明獲取鎖成功,否則加入同步隊列。 if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
tryAcquireShared(int unused)方法中,如果其他線程獲取了寫鎖,則讀鎖獲取失敗線程將進入等待狀態,如果當前線程獲取寫鎖或者寫鎖未被獲取則利用CAS(線程安全的)增加同步狀態,成功則獲取鎖。
protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); // 獲取共享狀態 int c = getState(); // 判斷是否有寫鎖 && 持有寫鎖的線程是否是自己,為true直接返回-1 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; //獲取共享資源的數量 int r = sharedCount(c); /** * readerShouldBlock():判斷鎖是否需要等待(公平鎖原則) * r < MAX_COUNT:判斷鎖的數量是否超過最大值65535 * compareAndSetState(c, c + SHARED_UNIT): 設置共享狀態(讀鎖狀態) */ if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { // r==0 :當前沒有任何線程獲取讀鎖 if (r == 0) { // 設置當前線程為第一個獲取讀鎖的線程 firstReader = current; // 計數設置為1 firstReaderHoldCount = 1; } else if (firstReader == current) { // 表示重入鎖,在計數其上+1 firstReaderHoldCount++; } else { /** * HoldCounter 主要是一個類來記錄線程獲取鎖的數量 * cachedHoldCounter 緩存的是最后一個獲取鎖線程的HoldCounter對象 */ HoldCounter rh = cachedHoldCounter; // 如果緩存不存在,或者線程不是自己 if (rh == null || rh.tid != getThreadId(current)) // 從當前線程本地變量ThreadLocalHoldCounter 中獲取HoldCounter 并賦值給 cachedHoldCounter, rh cachedHoldCounter = rh = readHolds.get(); // 如果緩存的HoldCounter 是當前的線程的,且計數為0 else if (rh.count == 0) // 將rh 存到ThreadLocalHoldCounter 中,將計數+1 readHolds.set(rh); rh.count++; } return 1; } /** * 進入fullTryAcquireShared(current) 條件 * 1: readerShouldBlock() = true * 2: r < MAX_COUNT = false 讀鎖達到最大 * 3: 設置共享狀態失敗 return fullTryAcquireShared(current); }
NonfairSync 中的 readerShouldBlock() 方法判斷當前申請讀鎖的線程是否需要阻塞
final boolean readerShouldBlock() { return apparentlyFirstQueuedIsExclusive(); }
apparentlyFirstQueuedIsExclusive() 判斷同步隊列中老二節點是否是獨占式(獲取寫鎖請求)是返回ture 否則返回false
final boolean apparentlyFirstQueuedIsExclusive() { Node h, s; // 主要條件判斷下一個節點是否是獲取寫鎖線程在排隊 return (h = head) != null && (s = h.next) != null && !s.isShared() && s.thread != null; }
自旋來獲取讀鎖,個人感覺對tryAcquireShared(int unused) 方法獲取讀鎖失敗的一種補救,其實現邏輯基本相同。
final int fullTryAcquireShared(Thread current) { // 線程內部計數器 HoldCounter rh = null; // 自旋 for (;;) { // 獲取共享狀態 int c = getState(); /** * exclusiveCount(c) !=0:存在獨占鎖(寫鎖) * getExclusiveOwnerThread() != current 判斷是否是自己持有寫鎖 * 再次是寫鎖是否是自己 */ if (exclusiveCount(c) != 0) { if (getExclusiveOwnerThread() != current) return -1; } else if (readerShouldBlock()) {//判斷讀鎖是否需要阻塞 // 如果需要阻塞,表示除了當前線程持有寫鎖外,還有其他線程在等待獲取寫鎖,故,即使申請讀鎖的線程已經持有寫鎖(寫鎖內部再次申請讀鎖,俗稱鎖降級)還是會失敗,因為有其他線程也在申請寫鎖,此時,只能結束本次申請讀鎖的請求,轉而去排隊,否則,將造成死鎖 if (firstReader == current) { // assert firstReaderHoldCount > 0; } else { // 到這里其實就寫鎖的一個讓步, 清楚HoldCounter 緩存 if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); if (rh.count == 0) readHolds.remove(); } } if (rh.count == 0) return -1; } } // 下面邏輯和tryAcquireShared(int unused) 基本相同不再解釋了 if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); if (compareAndSetState(c, c + SHARED_UNIT)) { if (sharedCount(c) == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } return 1; } } }
讀鎖的獲取稍微有點復雜,整個過程如下
如果其他線程獲取了寫鎖、則獲取讀鎖失敗。
如果當前線程獲取到了寫鎖或者寫鎖未被獲取則利用CAS(線程安全的)增加讀鎖狀態
否則 fullTryAcquireShared(Thread current) 自旋方式再次來嘗試獲取。
讀鎖獲取流程圖如下
讀鎖的釋放
讀鎖的釋放通過ReadLock的unlock()方式釋放的。
public void unlock() { sync.releaseShared(1); }
Sync的releaseShared(1)同樣定義在AQS中
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
調用tryReleaseShared(int unused) 方法來釋放共享狀態。
protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); //判斷當前線程釋放是第一個獲取讀鎖的線程 if (firstReader == current) { // assert firstReaderHoldCount > 0; // 判斷獲取鎖的次數釋放為1,如果為1說明沒有重入情況,直接釋放firstReader = null;否則將該線程持有鎖的數量 -1 if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; } else { // 如果當前線程不是第一個獲取讀鎖的線程。 // 獲取緩存中的HoldCounter HoldCounter rh = cachedHoldCounter; // 如果緩存中的HoldCounter 不屬于當前線程則獲取當前線程的HoldCounter。 if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { // 如果線程持有鎖的數量小于等1 直接刪除HoldCounter readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } // 持有鎖數量大于1 則執行 - 1操作 --rh.count; } // 自旋釋放同步狀態 for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) // Releasing the read lock has no effect on readers, // but it may allow waiting writers to proceed if // both read and write locks are now free. return nextc == 0; } }
鎖的釋放比較簡單,
首先看當前線程是否是第一個獲取讀鎖的線程,如果是并且沒有發生重入,則將首次獲取讀鎖變量設為null, 如果發生重入,則將首次獲取讀鎖計數器 -1
其次 查看緩存中計數器是否為空或者是否是當前線程,如果為空或者不是則獲取當前線程的計數器,如果計數器個數小于等1, 從ThreadLocl 中刪除計數器,并計數器值-1,如果小于等于0異常 。
最后自旋修改同步狀態。
讀鎖釋放流程圖如下
六、總結通過上面的源碼分析,我們來總結下:
在線程持有讀鎖的情況下,該線程不能取得寫鎖(為了保證寫操作對后續所有的讀操作保持可見性)。
在線程持有寫鎖的情況下,該線程可以繼續獲取讀鎖(獲取讀鎖時如果發現寫鎖被占用,只有寫鎖沒有被當前線程占用的情況才會獲取失敗)。
仔細想想,這個設計是合理的:因為當線程獲取讀鎖的時候,可能有其他線程同時也在持有讀鎖,因此不能把獲取讀鎖的線程“升級”為寫鎖;而對于獲得寫鎖的線程,它一定獨占了讀寫鎖,因此可以繼續讓它獲取讀鎖,當它同時獲取了寫鎖和讀鎖后,還可以先釋放寫鎖繼續持有讀鎖,這樣一個寫鎖就“降級”為了讀
一個線程要想同時持有寫鎖和讀鎖,必須先獲取寫鎖再獲取讀鎖;寫鎖可以“降級”為讀鎖;讀鎖不能“升級”為寫鎖。
因技術水平有限,如有不對的地方,歡迎拍磚
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/74375.html
摘要:所以就有了讀寫鎖。只要沒有,讀取鎖可以由多個線程同時保持。其讀寫鎖為兩個內部類都實現了接口。讀寫鎖同樣依賴自定義同步器來實現同步狀態的,而讀寫狀態就是其自定義同步器的狀態。判斷申請寫鎖數量是否超標超標則直接異常,反之則設置共享狀態。 一、寫在前面 在上篇我們聊到了可重入鎖(排它鎖)ReentrantLcok ,具體參見《J.U.C|可重入鎖ReentrantLock》 Reentra...
摘要:關于,最后有兩點規律需要注意當的等待隊列隊首結點是共享結點,說明當前寫鎖被占用,當寫鎖釋放時,會以傳播的方式喚醒頭結點之后緊鄰的各個共享結點。當的等待隊列隊首結點是獨占結點,說明當前讀鎖被使用,當讀鎖釋放歸零后,會喚醒隊首的獨占結點。 showImg(https://segmentfault.com/img/remote/1460000016012293); 本文首發于一世流云的專欄:...
摘要:不同的是它還多了內部類和內部類,以及讀寫對應的成員變量和方法。另外是給和內部類使用的。內部類前面說到的操作是分配到里面執行的。他們都是接口的實現,所以其實最像應該是這個兩個內部類。而且大體上也沒什么差異,也是用的內部類。 之前講了《AQS源碼閱讀》和《ReentrantLock源碼閱讀》,本次將延續閱讀下ReentrantReadWriteLock,建議沒看過之前兩篇文章的,先大概了解...
摘要:類顧名思義是一種讀寫鎖它是接口的直接實現該類在內部實現了具體獨占鎖特點的寫鎖以及具有共享鎖特點的讀鎖和一樣類也是通過定義內部類實現框架的來實現獨占共享的功能屬于排他鎖這些鎖在同一時刻只允許一個線程進行訪問但是在大多數場景下大部分時間都是提供 ReentrantReadWriteLock 類, 顧名思義, 是一種讀寫鎖, 它是 ReadWriteLock 接口的直接實現, 該類在內部實現...
摘要:我們知道,的作用其實是對類的和的增強,是為了讓線程在指定對象上等待,是一種線程之間進行協調的工具。當線程調用對象的方法時,必須拿到和這個對象關聯的鎖。 showImg(https://segmentfault.com/img/remote/1460000016012566); 本文首發于一世流云的專欄:https://segmentfault.com/blog... 一、Reentr...
閱讀 1216·2021-11-22 12:05
閱讀 1343·2021-09-29 09:35
閱讀 640·2019-08-30 15:55
閱讀 3133·2019-08-30 14:12
閱讀 960·2019-08-30 14:11
閱讀 2881·2019-08-30 13:10
閱讀 2406·2019-08-29 16:33
閱讀 3335·2019-08-29 11:02