摘要:當(dāng)?shù)竭_(dá)柵欄后,由于沒有滿足總數(shù)的要求,所以會(huì)一直等待,當(dāng)線程到達(dá)后,柵欄才會(huì)放行。任務(wù)其實(shí)就是當(dāng)最后一個(gè)線程到達(dá)柵欄時(shí),后續(xù)立即要執(zhí)行的任務(wù)。
本文首發(fā)于一世流云專欄:https://segmentfault.com/blog...一、CyclicBarrier簡介
CyclicBarrier是一個(gè)輔助同步器類,在JDK1.5時(shí)隨著J.U.C一起引入。
這個(gè)類的功能和我們之前介紹的CountDownLatch有些類似。我們知道,CountDownLatch是一個(gè)倒數(shù)計(jì)數(shù)器,在計(jì)數(shù)器不為0時(shí),所有調(diào)用await的線程都會(huì)等待,當(dāng)計(jì)數(shù)器降為0,線程才會(huì)繼續(xù)執(zhí)行,且計(jì)數(shù)器一旦變?yōu)?,就不能再重置了。
CyclicBarrier可以認(rèn)為是一個(gè)柵欄,柵欄的作用是什么?就是阻擋前行。
顧名思義,CyclicBarrier是一個(gè)可以循環(huán)使用的柵欄,它做的事情就是:
讓線程到達(dá)柵欄時(shí)被阻塞(調(diào)用await方法),直到到達(dá)柵欄的線程數(shù)滿足指定數(shù)量要求時(shí),柵欄才會(huì)打開放行。
這其實(shí)有點(diǎn)像軍訓(xùn)報(bào)數(shù),報(bào)數(shù)總?cè)藬?shù)滿足教官認(rèn)為的總數(shù)時(shí),教官才會(huì)安排后面的訓(xùn)練。
可以看下面這個(gè)圖來理解下:
一共4個(gè)線程A、B、C、D,它們到達(dá)柵欄的順序可能各不相同。當(dāng)A、B、C到達(dá)柵欄后,由于沒有滿足總數(shù)【4】的要求,所以會(huì)一直等待,當(dāng)線程D到達(dá)后,柵欄才會(huì)放行。
從CyclicBarrier的構(gòu)造器,我們也可以看出關(guān)于這個(gè)類的一些端倪,CyclicBarrier有兩個(gè)構(gòu)造器:
構(gòu)造器一:
這個(gè)構(gòu)造器的參數(shù)parties就是之前說的需要滿足的計(jì)數(shù)總數(shù)。
構(gòu)造器二:
這個(gè)構(gòu)造器稍微特殊一些,除了指定了計(jì)數(shù)總數(shù)外,傳入了一個(gè)Runnable任務(wù)。
Runnable任務(wù)其實(shí)就是當(dāng)最后一個(gè)線程到達(dá)柵欄時(shí),后續(xù)立即要執(zhí)行的任務(wù)。
比如,軍訓(xùn)報(bào)數(shù)完畢后,總?cè)藬?shù)滿足了要求,教官就會(huì)開始命令大家執(zhí)行下一個(gè)任務(wù),這個(gè)【下一個(gè)任務(wù)】就是這里的Runnable。二、CyclicBarrier示例
我們來看一個(gè)CyclicBarrier的示例,來理解下它的功能。
假設(shè)現(xiàn)在有這樣一個(gè)場景:
5個(gè)運(yùn)動(dòng)員準(zhǔn)備跑步比賽,運(yùn)動(dòng)員在賽跑前會(huì)準(zhǔn)備一段時(shí)間,當(dāng)裁判發(fā)現(xiàn)所有運(yùn)動(dòng)員準(zhǔn)備完畢后,就舉起發(fā)令槍,比賽開始。
這里的起跑線就是屏障,運(yùn)動(dòng)員必須在起跑線等待其他運(yùn)動(dòng)員準(zhǔn)備完畢。
public class CyclicBarrierTest { public static void main(String[] args) { int N = 5; // 運(yùn)動(dòng)員數(shù) CyclicBarrier cb = new CyclicBarrier(N, new Runnable() { @Override public void run() { System.out.println("****** 所有運(yùn)動(dòng)員已準(zhǔn)備完畢,發(fā)令槍:跑!******"); } }); for (int i = 0; i < N; i++) { Thread t = new Thread(new PrepareWork(cb), "運(yùn)動(dòng)員[" + i + "]"); t.start(); } } private static class PrepareWork implements Runnable { private CyclicBarrier cb; PrepareWork(CyclicBarrier cb) { this.cb = cb; } @Override public void run() { try { Thread.sleep(500); System.out.println(Thread.currentThread().getName() + ": 準(zhǔn)備完成"); cb.await(); // 在柵欄等待 } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } } }
執(zhí)行上面的程序,可能的輸出結(jié)果如下:
運(yùn)動(dòng)員[3]: 準(zhǔn)備完成 運(yùn)動(dòng)員[1]: 準(zhǔn)備完成 運(yùn)動(dòng)員[0]: 準(zhǔn)備完成 運(yùn)動(dòng)員[2]: 準(zhǔn)備完成 運(yùn)動(dòng)員[4]: 準(zhǔn)備完成 ****** 所有運(yùn)動(dòng)員已準(zhǔn)備完畢,發(fā)令槍:跑!******
從輸出可以看到,線程到達(dá)柵欄時(shí)會(huì)被阻塞(調(diào)用await方法),直到到達(dá)柵欄的線程數(shù)滿足指定數(shù)量要求時(shí),柵欄才會(huì)打開放行。
CyclicBarrier對(duì)異常的處理我們知道,線程在阻塞過程中,可能被中斷,那么既然CyclicBarrier放行的條件是等待的線程數(shù)達(dá)到指定數(shù)目,萬一線程被中斷導(dǎo)致最終的等待線程數(shù)達(dá)不到柵欄的要求怎么辦?
CyclicBarrier一定有考慮到這種異常情況,不然其它所有等待線程都會(huì)無限制地等待下去。
那么CyclicBarrier是如何處理的呢?
我們看下CyclicBarrier的await()方法:
public int await() throws InterruptedException, BrokenBarrierException { //... }
可以看到,這個(gè)方法除了拋出InterruptedException異常外,還會(huì)拋出BrokenBarrierException。
BrokenBarrierException表示當(dāng)前的CyclicBarrier已經(jīng)損壞了,可能等不到所有線程都到達(dá)柵欄了,所以已經(jīng)在等待的線程也沒必要再等了,可以散伙了。
出現(xiàn)以下幾種情況之一時(shí),當(dāng)前等待線程會(huì)拋出BrokenBarrierException異常:
其它某個(gè)正在await等待的線程被中斷了
其它某個(gè)正在await等待的線程超時(shí)了
某個(gè)線程重置了CyclicBarrier(調(diào)用了reset方法,后面會(huì)講到)
另外,只要正在Barrier上等待的任一線程拋出了異常,那么Barrier就會(huì)認(rèn)為肯定是湊不齊所有線程了,就會(huì)將柵欄置為損壞(Broken)狀態(tài),并傳播BrokenBarrierException給其它所有正在等待(await)的線程。
我們來對(duì)上面的例子做個(gè)改造,模擬下異常情況:
public class CyclicBarrierTest { public static void main(String[] args) throws InterruptedException { int N = 5; // 運(yùn)動(dòng)員數(shù) CyclicBarrier cb = new CyclicBarrier(N, new Runnable() { @Override public void run() { System.out.println("****** 所有運(yùn)動(dòng)員已準(zhǔn)備完畢,發(fā)令槍:跑!******"); } }); Listlist = new ArrayList<>(); for (int i = 0; i < N; i++) { Thread t = new Thread(new PrepareWork(cb), "運(yùn)動(dòng)員[" + i + "]"); list.add(t); t.start(); if (i == 3) { t.interrupt(); // 運(yùn)動(dòng)員[3]置中斷標(biāo)志位 } } Thread.sleep(2000); System.out.println("Barrier是否損壞:" + cb.isBroken()); } private static class PrepareWork implements Runnable { private CyclicBarrier cb; PrepareWork(CyclicBarrier cb) { this.cb = cb; } @Override public void run() { try { System.out.println(Thread.currentThread().getName() + ": 準(zhǔn)備完成"); cb.await(); } catch (InterruptedException e) { System.out.println(Thread.currentThread().getName() + ": 被中斷"); } catch (BrokenBarrierException e) { System.out.println(Thread.currentThread().getName() + ": 拋出BrokenBarrierException"); } } } }
可能的輸出結(jié)果:
運(yùn)動(dòng)員[0]: 準(zhǔn)備完成 運(yùn)動(dòng)員[2]: 準(zhǔn)備完成 運(yùn)動(dòng)員[1]: 準(zhǔn)備完成 運(yùn)動(dòng)員[3]: 準(zhǔn)備完成 運(yùn)動(dòng)員[3]: 被中斷 運(yùn)動(dòng)員[4]: 準(zhǔn)備完成 運(yùn)動(dòng)員[4]: 拋出BrokenBarrierException 運(yùn)動(dòng)員[0]: 拋出BrokenBarrierException 運(yùn)動(dòng)員[1]: 拋出BrokenBarrierException 運(yùn)動(dòng)員[2]: 拋出BrokenBarrierException Barrier是否損壞:true
這段代碼,模擬了中斷線程3的情況,從輸出可以看到,線程0、1、2首先到達(dá)Brrier等待。
然后線程3到達(dá),由于之前設(shè)置了中斷標(biāo)志位,所以線程3拋出中斷異常,導(dǎo)致Barrier損壞,此時(shí)所有已經(jīng)在柵欄等待的線程(0、1、2)都會(huì)拋出BrokenBarrierException異常。
此時(shí),即使再有其它線程到達(dá)柵欄(線程4),都會(huì)拋出BrokenBarrierException異常。
注意:使用CyclicBarrier時(shí),對(duì)異常的處理一定要小心,比如線程在到達(dá)柵欄前就拋出異常,此時(shí)如果沒有重試機(jī)制,其它已經(jīng)到達(dá)柵欄的線程會(huì)一直等待(因?yàn)闆]有還沒有滿足總數(shù)),最終導(dǎo)致程序無法繼續(xù)向下執(zhí)行。三、CyclicBarrier原理 CyclicBarrier的構(gòu)造
CyclicBarrier有兩個(gè)構(gòu)造器:
CyclicBarrier cb = new CyclicBarrier(10);
構(gòu)造器內(nèi)部的各個(gè)字段含義如下:
字段名 | 作用 |
---|---|
parties | 柵欄開啟需要的到達(dá)線程總數(shù) |
count | 剩余未到達(dá)的線程總數(shù) |
barrierCommand | 最后一個(gè)線程到達(dá)后執(zhí)行的任務(wù) |
CyclicBarrier 并沒有自己去實(shí)現(xiàn)AQS框架的API,而是利用了ReentrantLock和Condition。
public class CyclicBarrier { private final ReentrantLock lock = new ReentrantLock(); private final Condition trip = lock.newCondition(); // 柵欄開啟需要的到達(dá)線程總數(shù) private final int parties; // 最后一個(gè)線程到達(dá)后執(zhí)行的任務(wù) private final Runnable barrierCommand; // 剩余未到達(dá)的線程總數(shù) private int count; // 當(dāng)前輪次的運(yùn)行狀態(tài) private Generation generation = new Generation(); // ... }
需要注意的是generation這個(gè)字段:
我們知道,CyclicBarrier 是可以循環(huán)復(fù)用的,所以CyclicBarrier 的每一輪任務(wù)都需要對(duì)應(yīng)一個(gè)generation 對(duì)象。
generation 對(duì)象內(nèi)部有個(gè)broken字段,用來標(biāo)識(shí)當(dāng)前輪次的CyclicBarrier 是否已經(jīng)損壞。
nextGeneration方法用來創(chuàng)建一個(gè)新的generation 對(duì)象,并喚醒所有等待線程,重置內(nèi)部參數(shù)。
我們先來看下await方法:
可以看到,無論有沒有超時(shí)功能,內(nèi)部都是調(diào)了dowait這個(gè)方法:
dowait方法并不復(fù)雜,一共有3部分:
判斷柵欄是否已經(jīng)損壞或當(dāng)前線程已經(jīng)被中斷,如果是會(huì)分別拋出異常;
如果當(dāng)前線程是最后一個(gè)到達(dá)的線程,會(huì)嘗試執(zhí)行最終任務(wù)(如果構(gòu)造CyclicBarrier對(duì)象時(shí)有傳入Runnable的話),執(zhí)行成功即返回,失敗會(huì)破壞柵欄;
對(duì)于不是最后一個(gè)到達(dá)的線程,會(huì)在Condition隊(duì)列上等待,為了防止被意外喚醒,這里用了一個(gè)自旋操作。
破壞柵欄用的是breakBarrier方法:
再來看下CyclicBarrier的reset方法:
該方法先破壞柵欄,然后開始下一輪(新建一個(gè)generation對(duì)象)。
四、CyclicBarrier接口/類聲明類聲明:
構(gòu)造器聲明:
接口聲明:
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://m.specialneedsforspecialkids.com/yun/76618.html
摘要:整個(gè)包,按照功能可以大致劃分如下鎖框架原子類框架同步器框架集合框架執(zhí)行器框架本系列將按上述順序分析,分析所基于的源碼為。后,根據(jù)一系列常見的多線程設(shè)計(jì)模式,設(shè)計(jì)了并發(fā)包,其中包下提供了一系列基礎(chǔ)的鎖工具,用以對(duì)等進(jìn)行補(bǔ)充增強(qiáng)。 showImg(https://segmentfault.com/img/remote/1460000016012623); 本文首發(fā)于一世流云專欄:https...
摘要:線程可以調(diào)用的方法進(jìn)入阻塞,當(dāng)計(jì)數(shù)值降到時(shí),所有之前調(diào)用阻塞的線程都會(huì)釋放。注意的初始計(jì)數(shù)值一旦降到,無法重置。 showImg(https://segmentfault.com/img/remote/1460000016012041); 本文首發(fā)于一世流云的專欄:https://segmentfault.com/blog... 一、CountDownLatch簡介 CountDow...
摘要:分層支持分層一種樹形結(jié)構(gòu),通過構(gòu)造函數(shù)可以指定當(dāng)前待構(gòu)造的對(duì)象的父結(jié)點(diǎn)。當(dāng)一個(gè)的參與者數(shù)量變成時(shí),如果有該有父結(jié)點(diǎn),就會(huì)將它從父結(jié)點(diǎn)中溢移除。當(dāng)首次將某個(gè)結(jié)點(diǎn)鏈接到樹中時(shí),會(huì)同時(shí)向該結(jié)點(diǎn)的父結(jié)點(diǎn)注冊(cè)一個(gè)參與者。 showImg(https://segmentfault.com/img/remote/1460000016010947); 本文首發(fā)于一世流云專欄:https://segme...
摘要:在時(shí),引入了包,該包中的大多數(shù)同步器都是基于來構(gòu)建的。框架提供了一套通用的機(jī)制來管理同步狀態(tài)阻塞喚醒線程管理等待隊(duì)列。指針用于在結(jié)點(diǎn)線程被取消時(shí),讓當(dāng)前結(jié)點(diǎn)的前驅(qū)直接指向當(dāng)前結(jié)點(diǎn)的后驅(qū)完成出隊(duì)動(dòng)作。 showImg(https://segmentfault.com/img/remote/1460000016012438); 本文首發(fā)于一世流云的專欄:https://segmentfau...
摘要:無限期等待另一個(gè)線程執(zhí)行特定操作。線程安全基本版請(qǐng)說明以及的區(qū)別值都不能為空數(shù)組結(jié)構(gòu)上,通過數(shù)組和鏈表實(shí)現(xiàn)。優(yōu)先考慮響應(yīng)中斷,而不是響應(yīng)鎖的普通獲取或重入獲取。只是在最后獲取鎖成功后再把當(dāng)前線程置為狀態(tài)然后再中斷線程。 前段時(shí)間在慕課網(wǎng)直播上聽小馬哥面試勸退(面試虐我千百遍,Java 并發(fā)真討厭),發(fā)現(xiàn)講得東西比自己拿到offer還要高興,于是自己在線下做了一點(diǎn)小筆記,供各位參考。 課...
閱讀 1593·2021-09-02 15:41
閱讀 998·2021-09-02 15:11
閱讀 1280·2021-07-28 00:15
閱讀 2309·2019-08-30 15:55
閱讀 1145·2019-08-30 15:54
閱讀 1694·2019-08-30 15:54
閱讀 2975·2019-08-30 14:02
閱讀 2524·2019-08-29 16:57