摘要:在多線程中可以使用關(guān)鍵字來實(shí)現(xiàn)多線程之間同步互斥但在中新增加了類也能達(dá)到同樣的效果并且在擴(kuò)展功能上也更加強(qiáng)大比如具有嗅探鎖定多路分支通知公平鎖和非公平鎖等默認(rèn)功能而且在使用上也比更加的靈活使用實(shí)現(xiàn)同步調(diào)用對(duì)象的方法獲取鎖調(diào)用方法釋放鎖從運(yùn)行
在 Java 多線程中, 可以使用 synchronized 關(guān)鍵字來實(shí)現(xiàn)多線程之間同步互斥, 但在 JDK 1.5 中新增加了 ReentrantLock 類也能達(dá)到同樣的效果, 并且在擴(kuò)展功能上也更加強(qiáng)大, 比如具有嗅探鎖定, 多路分支通知, 公平鎖和非公平鎖等(默認(rèn))功能, 而且在使用上也比 synchronized 更加的靈活.
使用 ReentrantLock 實(shí)現(xiàn)同步public class MyService { private Lock lock = new ReentrantLock(); public void testMethod() { lock.lock(); for (int i = 0; i < 10; i++){ System.out.println("ThreadName=" + Thread.currentThread().getName() + (" " + (i + 1))); } lock.unlock(); } }
public class MyThread extends Thread { private MyService myService; public MyThread(MyService myService) { this.myService = myService; } @Override public void run() { myService.testMethod(); } }
public static void main(String[] args) throws IOException, InterruptedException { MyService myService = new MyService(); MyThread myThreadA = new MyThread(myService); MyThread myThreadB = new MyThread(myService); MyThread myThreadC = new MyThread(myService); MyThread myThreadD = new MyThread(myService); MyThread myThreadE = new MyThread(myService); myThreadA.start(); myThreadB.start(); myThreadC.start(); myThreadD.start(); myThreadE.start(); }
調(diào)用 ReentrantLock 對(duì)象的 lock() 方法獲取鎖, 調(diào)用 unLock() 方法釋放鎖.
從運(yùn)行結(jié)果來看, 當(dāng)前線程打印完畢之后將鎖進(jìn)行釋放, 其他的線程才可以繼續(xù)打印. 線程打印的數(shù)據(jù)是分組打印, 因?yàn)楫?dāng)前線程已經(jīng)持有鎖, 但線程之間打印的順序是隨機(jī)的.
使用 Condition 實(shí)現(xiàn)等待/通知關(guān)鍵字 synchronized 與 wait() 和 notify() / notifyall() 方法結(jié)合可以實(shí)現(xiàn)等待/通知模式, 只不過在使用時(shí), 調(diào)用 notify() 方法 JVM 會(huì)隨機(jī)選擇一個(gè) WAITNG 狀態(tài)的線程來執(zhí)行.
而使用 Condition 則可以更加靈活, 可以實(shí)現(xiàn) "選擇性通知", 可以指定的選擇喚醒哪些線程, 哪些線程繼續(xù)等待.
public class MyService { private Lock lock = new ReentrantLock(); public Condition conditionA = lock.newCondition(); public Condition conditionB = lock.newCondition(); public void awaitA() throws InterruptedException { lock.lock(); System.out.println("begin awaitA 時(shí)間" + System.currentTimeMillis() + "ThreadName=" + Thread.currentThread().getName()); conditionA.await(); System.out.println("end awaitA 時(shí)間" + System.currentTimeMillis() + "ThreadName=" + Thread.currentThread().getName()); lock.unlock(); } public void awaitB() throws InterruptedException { lock.lock(); System.out.println("begin awaitB 時(shí)間" + System.currentTimeMillis() + "ThreadName=" + Thread.currentThread().getName()); conditionB.await(); System.out.println("end awaitB 時(shí)間" + System.currentTimeMillis() + "ThreadName=" + Thread.currentThread().getName()); lock.unlock(); } public void signalAll_A() throws InterruptedException { lock.lock(); System.out.println("begin signalAll_A 時(shí)間" + System.currentTimeMillis() + "ThreadName=" + Thread.currentThread().getName()); conditionA.signalAll(); lock.unlock(); } public void signalAll_B() throws InterruptedException { lock.lock(); System.out.println("begin signalAll_B 時(shí)間" + System.currentTimeMillis() + "ThreadName=" + Thread.currentThread().getName()); conditionB.signalAll(); lock.unlock(); } }
public class ThreadA extends Thread { private MyService myService; public ThreadA(MyService myService) { this.myService = myService; } @Override public void run() { try { myService.awaitA(); } catch (InterruptedException e) { e.printStackTrace(); } } }
public class ThreadB extends Thread { private MyService myService; public ThreadB(MyService myService) { this.myService = myService; } @Override public void run() { try { myService.awaitB(); } catch (InterruptedException e) { e.printStackTrace(); } } }
public static void main(String[] args) throws IOException, InterruptedException { MyService myService = new MyService(); ThreadA threadA = new ThreadA(myService); threadA.setName("a"); threadA.start(); ThreadB threadB = new ThreadB(myService); threadB.setName("b"); threadB.start(); Thread.sleep(3000); myService.signalAll_A(); }
Object 類中的 wait() 方法相當(dāng)于 Condition 類中的 await() 方法.
Object 類中的 wait(long timeout) 方法相當(dāng)于 Condition 類中的 await(long time, TimeUnit unit) 方法.
Object 類中的 notify() 方法相當(dāng)于 Condition 類中的 signal() 方法.
Object 類中的 notifyAll() 方法相當(dāng)于 Condition 類中的 signalAll() 方法.
從執(zhí)行結(jié)果來看, a 和 b 線程被暫停, 當(dāng)執(zhí)行 myService.signalAll_A() 方法時(shí), a 線程繼續(xù)執(zhí)行, 而 b 線程仍然是等待狀態(tài).
源碼ReentrantLock 類實(shí)現(xiàn)了 Lock, java.io.Serializable
public ReentrantLock() { sync = new NonfairSync(); } public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
兩個(gè)構(gòu)造方法, 默認(rèn)是非公平鎖, 如果為 true 表明是公平鎖.
可以看到 NonfairSync 和 FairSync 都是繼承了 Sync 這個(gè)抽象類, 而 Sync 則繼承了AQS. Sync、NonfairSync、FairSync 都是 ReentrantLock 的靜態(tài)內(nèi)部類, ReentrantLock 的許多方法都是Sync類代為實(shí)現(xiàn).
AbstractQueuedSynchronizer 核心方法AQS最核心的數(shù)據(jù)結(jié)構(gòu)是一個(gè) volatile int state 和 一個(gè) FIFO 線程等待對(duì)列.
state 代表共享資源的數(shù)量, 如果是互斥訪問, 一般設(shè)置為1, 而如果是共享訪問, 可以設(shè)置為N(N為可共享線程的個(gè)數(shù));
而線程等待隊(duì)列是一個(gè)雙向鏈表, 無法立即獲得鎖而進(jìn)入阻塞狀態(tài)的線程會(huì)加入隊(duì)列的尾部. 當(dāng)然對(duì) state 以及隊(duì)列的操作都是采用了 volatile + CAS + 自旋 的操作方式, 采用的是樂觀鎖的概念.
acquire 方法此方法是獨(dú)占模式下線程獲取共享資源的頂層入口.
public void lock() { sync.acquire(1); }
public final void acquire(int arg) { if (!tryAcquire(arg) && //嘗試獲取鎖,若獲取成功,則state減1,返回true acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //若獲取鎖不成功,調(diào)用addWaiter方法使線程進(jìn)入等待隊(duì)列,acquireQueued方法讓線程進(jìn)入阻塞狀態(tài) selfInterrupt(); //檢查在等待過程中是否有中斷,若有中斷,則在此時(shí)再響應(yīng) }tryAcquire方法
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
為什么要拋出異常而不是聲明為抽象類呢?
因?yàn)锳QS是可選模式的, 我們選擇的是獨(dú)占模式, 就不需要去重寫 tryAcquireShared 方法, 如果我們選的是共享模式, 也不需要重寫 tryAcquire 方法, 因此AQS雖然是抽象類, 但是沒有抽象方法, 而是用拋出異常的方式代替.
addWaiter方法的主要是把當(dāng)前線程加入到FIFO等待隊(duì)列隊(duì)尾.
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode);//創(chuàng)建節(jié)點(diǎn) // 首先嘗試快速插入隊(duì)尾 Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) {//CAS操作 pred.next = node; return node; } } enq(node);//若不成功,則嘗試以自旋方式插入隊(duì)尾 return node; }
private Node enq(final Node node) { for (;;) {//自旋方式不斷嘗試插入隊(duì)尾,直至成功為止 Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }acquireQueued 方法
acquireQueued 方法, 主要是讓加入隊(duì)尾的線程進(jìn)入等待狀態(tài), 等到前面的進(jìn)程執(zhí)行完了, 再喚醒該線程, 去執(zhí)行同步代碼在這里是檢測(cè)是否應(yīng)該park()(park是一個(gè)Unsafe包中的native方法), 以及檢測(cè)在隊(duì)列的等待過程中是否有中斷, 在等待過程中是不響應(yīng)中斷的, 等到等待結(jié)束被喚醒時(shí), 才去向上傳遞是否中斷過的值.
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) {//自旋 final Node p = node.predecessor();//獲取前驅(qū)節(jié)點(diǎn) if (p == head && tryAcquire(arg)) {//若前驅(qū)節(jié)點(diǎn)是頭節(jié)點(diǎn),便可以嘗試去獲取資源,若獲取到資源,則進(jìn)行下面的隊(duì)列修改 setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && //檢測(cè)是否需要等待及找到一個(gè)前驅(qū)未放棄的節(jié)點(diǎn),連接在后面 parkAndCheckInterrupt()) //等待,并且等到等待結(jié)束,返回是否被中斷過 interrupted = true; } } finally { if (failed) cancelAcquire(node); } }常用方法 ReentrantLock 類
int getHoldCount() 查詢調(diào)用 lock() 方法的次數(shù).
final int getQueueLength() 估計(jì)等待鎖的線程數(shù). 比如有5個(gè)線程, 1個(gè)線程首先執(zhí)行 await() 方法, 那么在調(diào)用此方法后返回值是4, 說明有4個(gè)線程同時(shí)在等待lock的釋放.
int getWaitQueueLength(Condition condition) 返回與此鎖相關(guān)聯(lián)給定條件等待的線程數(shù)的估計(jì). 比如有5個(gè)線程, 每個(gè)線程都執(zhí)行了同一個(gè) condition 對(duì)象的 await() 方法, 則調(diào)用此方法時(shí)返回的值是5.
final boolean hasQueuedThreads() 判斷是否有線程等待此鎖.
final boolean hasQueuedThread(Thread thread) 判斷指定線程是否等待獲取此鎖.
boolean hasWaiters(Condition condition) 判斷線程有沒有調(diào)用 await() 方法.
void lockInterruptibly() throws InterruptedException 獲取鎖, 除非當(dāng)前線程為interrupted.
Condition 類void awaitUninterruptibly() 和 await() 區(qū)別就是當(dāng)調(diào)用 interrupt() 方法時(shí)不會(huì)拋出 InterrputedException 異常.
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://m.specialneedsforspecialkids.com/yun/74007.html
摘要:公平策略在多個(gè)線程爭(zhēng)用鎖的情況下,公平策略傾向于將訪問權(quán)授予等待時(shí)間最長的線程。使用方式的典型調(diào)用方式如下二類原理的源碼非常簡單,它通過內(nèi)部類實(shí)現(xiàn)了框架,接口的實(shí)現(xiàn)僅僅是對(duì)的的簡單封裝,參見原理多線程進(jìn)階七鎖框架獨(dú)占功能剖析 showImg(https://segmentfault.com/img/remote/1460000016012582); 本文首發(fā)于一世流云的專欄:https...
摘要:的主要功能和關(guān)鍵字一致,均是用于多線程的同步。而僅支持通過查詢當(dāng)前線程是否持有鎖。由于和使用的是同一把可重入鎖,所以線程可以進(jìn)入方法,并再次獲得鎖,而不會(huì)被阻塞住。公平與非公平公平與非公平指的是線程獲取鎖的方式。 1.簡介 可重入鎖ReentrantLock自 JDK 1.5 被引入,功能上與synchronized關(guān)鍵字類似。所謂的可重入是指,線程可對(duì)同一把鎖進(jìn)行重復(fù)加鎖,而不會(huì)被阻...
摘要:二什么是重入鎖可重入鎖,顧名思義,支持重新進(jìn)入的鎖,其表示該鎖能支持一個(gè)線程對(duì)資源的重復(fù)加鎖。將由最近成功獲得鎖,并且還沒有釋放該鎖的線程所擁有。可以使用和方法來檢查此情況是否發(fā)生。 一、寫在前面 前幾篇我們具體的聊了AQS原理以及底層源碼的實(shí)現(xiàn),具體參見 《J.U.C|一文搞懂AQS》《J.U.C|同步隊(duì)列(CLH)》《J.U.C|AQS獨(dú)占式源碼分析》《J.U.C|AQS共享式源...
摘要:前言回顧前面多線程三分鐘就可以入個(gè)門了源碼剖析多線程基礎(chǔ)必要知識(shí)點(diǎn)看了學(xué)習(xí)多線程事半功倍鎖機(jī)制了解一下簡簡單單過一遍只有光頭才能變強(qiáng)上一篇已經(jīng)將鎖的基礎(chǔ)簡單地過了一遍了,因此本篇主要是講解鎖主要的兩個(gè)子類那么接下來我們就開始吧一鎖首先我們來 前言 回顧前面: 多線程三分鐘就可以入個(gè)門了! Thread源碼剖析 多線程基礎(chǔ)必要知識(shí)點(diǎn)!看了學(xué)習(xí)多線程事半功倍 Java鎖機(jī)制了解一下 AQ...
摘要:在多線程編程中我們會(huì)遇到很多需要使用線程同步機(jī)制去解決的并發(fā)問題,而這些同步機(jī)制就是多線程編程中影響正確性和運(yùn)行效率的重中之重。這五個(gè)方法之所以能指定同步器的行為,則是因?yàn)橹械钠渌椒ň褪峭ㄟ^對(duì)這五個(gè)方法的調(diào)用來實(shí)現(xiàn)的。 在多線程編程中我們會(huì)遇到很多需要使用線程同步機(jī)制去解決的并發(fā)問題,而這些同步機(jī)制就是多線程編程中影響正確性和運(yùn)行效率的重中之重。這不禁讓我感到好奇,這些同步機(jī)制是如何...
閱讀 2563·2023-04-26 01:44
閱讀 2571·2021-09-10 10:50
閱讀 1419·2019-08-30 15:56
閱讀 2276·2019-08-30 15:44
閱讀 520·2019-08-29 11:14
閱讀 3425·2019-08-26 11:56
閱讀 3024·2019-08-26 11:52
閱讀 916·2019-08-26 10:27