摘要:如果停止了版本更新,可使用方法來解除所有因而阻塞的線程,包括指定版本號的。如果自己維護(hù)版本號,則應(yīng)該保證遞增。
前言
相比上一篇而言,本文不需要太多的準(zhǔn)備知識,但技巧性更強(qiáng)一些。因為分析、設(shè)計的過程比較復(fù)雜繁瑣,也限于篇幅,所以,主要展示如何解決這些需求,和講解代碼。另外,所講的內(nèi)容也是后一篇實戰(zhàn)中需要用到的一個工具類。
需求介紹我需要編寫一個同步工具,它需要提供這樣幾個方法:await、pass、cancel。某個線程調(diào)用await時,會被阻塞;當(dāng)調(diào)用pass方法時,之前因為await而阻塞的線程將全部被解除阻塞,之后調(diào)用await的線程繼續(xù)被阻塞,直到下一次調(diào)用pass。
該工具同時還維護(hù)一個版本號,await方法可以帶一個目標(biāo)版本號,如果當(dāng)前的版本號比目標(biāo)版本號新或相同,則直接通過,否則,阻塞本線程,直到到達(dá)或超過目標(biāo)版本。調(diào)用pass的時候,更新版本號。
如果停止了版本更新,可使用cancel方法來解除所有因await而阻塞的線程,包括指定版本號的。此方法用于避免無謂地等待。若await發(fā)生在cancel之后,則仍將被阻塞。
因為CountDownLatch不允許重復(fù)使用,CyclicBarrier只支持固定個數(shù)的線程,并且都沒有維護(hù)一個版本號,所以沒有已有的類能實現(xiàn)上面的需求,需要自己實現(xiàn)。
問題分析簡單分析可知,應(yīng)該維護(hù)一個隊列,來保存當(dāng)前被阻塞的線程,用于在pass時對它們一一解除阻塞,pass時應(yīng)該使用一個新的隊列,否則不方便正確處理pass前和pass后調(diào)用await的線程。
至此,問題的關(guān)鍵就明了了:如何將隊列的替換和版本號的更新這兩個操作做成原子的。
解決方案以前在《JAVA并發(fā)編程實踐》曾看到過這樣一個小技巧,如果要原子地更新兩個變量,那么可以創(chuàng)建一個新的類將它們封裝起來,將這兩個變量當(dāng)定義成類成員變量,更新時,用CAS更新這個類的引用即可。
因為較為復(fù)雜,下面先給出完整的代碼,再講解其中的關(guān)鍵。
注意:上面所說pass,在代碼中的具體實現(xiàn)為nextCycle,有兩個版本,一個自動維護(hù)版本號,一個由調(diào)用者維護(hù)版本號。
/** * @author trytocatch@163.com * @time 2013-1-31 */ public class BoundlessCyclicBarrier { protected final AtomicReference代碼分析waitQueueRef; public BoundlessCyclicBarrier() { this(0); } public BoundlessCyclicBarrier(int startVersion) { waitQueueRef = new AtomicReference (new VersionQueue(startVersion)); } public final void awaitWithAssignedVersion(int myVersion) throws InterruptedException { awaitImpl(true, myVersion, 0); } /** * * @param myVersion * @param nanosTimeout * @return if timeout, or be canceled and doesn"t reach myVersion, returns false * @throws InterruptedException */ public final boolean awaitWithAssignedVersion(int myVersion, long nanosTimeout) throws InterruptedException { return awaitImpl(true, myVersion, nanosTimeout); } public final void await() throws InterruptedException { awaitImpl(false, 0, 0); } /** * * @param nanosTimeout * @return if and only if timeout, returns false * @throws InterruptedException */ public final boolean await(long nanosTimeout) throws InterruptedException { return awaitImpl(false, 0, nanosTimeout); } /** * pass and version++(some threads may not be unparked when awaitImpl is in process, but it"s OK in this Barrier) * @return old queue version */ public int nextCycle() { VersionQueue oldQueue = waitQueueRef.get(); VersionQueue newQueue = new VersionQueue(oldQueue.version + 1); for(;;){ if (waitQueueRef.compareAndSet(oldQueue, newQueue)) { for (Thread t : oldQueue.queue) LockSupport.unpark(t); break; } oldQueue = waitQueueRef.get(); newQueue.version = oldQueue.version + 1; } return oldQueue.version; } /** * pass and assign the next cycle version(caller should make sure that the newAssignVersion is right) * @param newAssignVersion */ public void nextCycle(int newAssignVersion) { VersionQueue oldQueue = waitQueueRef.getAndSet(new VersionQueue(newAssignVersion)); for (Thread t : oldQueue.queue) LockSupport.unpark(t); } /** * if version update has stopped, invoke this to awake all threads */ public void cancel(){ VersionQueue oldQueue = waitQueueRef.get(); if (waitQueueRef.compareAndSet(oldQueue, new VersionQueue(oldQueue.version, true))) { for (Thread t : oldQueue.queue) LockSupport.unpark(t); } public final int getVersion() { return waitQueueRef.get().version; } private static final class VersionQueue { final private ConcurrentLinkedQueue queue; int version; final boolean isCancelQueue; VersionQueue(int curVersion){ this(curVersion, false); } VersionQueue(int curVersion, boolean isCancelQueue) { this.version = curVersion; this.isCancelQueue = isCancelQueue; queue = new ConcurrentLinkedQueue(); } } /** * * @param assignVersion is myVersion available * @param myVersion wait for this version * @param nanosTimeout wait time(nanosTimeout <=0 means that nanosTimeout is invalid) * @return if timeout, or be canceled and doesn"t reach myVersion, returns false * @throws InterruptedException */ protected boolean awaitImpl(boolean assignVersion, int myVersion, long nanosTimeout) throws InterruptedException { boolean timeOutEnable = nanosTimeout > 0; long lastTime = System.nanoTime(); VersionQueue newQueue = waitQueueRef.get();//A if (assignVersion && newQueue.version - myVersion >= 0) return true; while (true) { VersionQueue submitQueue = newQueue;//B submitQueue.queue.add(Thread.currentThread());//C while (true) { newQueue = waitQueueRef.get();//D if (newQueue != submitQueue){//E: it"s a new cycle if(assignVersion == false) return true; else if(newQueue.version - myVersion >= 0) return true; else if (newQueue.isCancelQueue)//F: be canceled return false; else//just like invoking awaitImpl again break; } if (timeOutEnable) { if (nanosTimeout <= 0) return false; LockSupport.parkNanos(this, nanosTimeout); long now = System.nanoTime(); nanosTimeout -= now - lastTime; lastTime = now; } else LockSupport.park(this); if (Thread.interrupted()) throw new InterruptedException(); } } } }
先分析一下awaitImpl方法,A和D是該方法的關(guān)鍵點,決定著它屬于哪一個批次,對應(yīng)哪一個版本。這里有個小細(xì)節(jié),在nexeCycle,cancel解除阻塞時,該線程可能并不在隊列中,因為插入隊列發(fā)生在C處,這在A和D之后(雖然看起來C在D之前,但D取到的queue要在下一次循環(huán)時才被當(dāng)作submitQueue),所以,在E處再進(jìn)行了一次判斷,開始解除阻塞時,舊隊列肯定被新隊列所替換,newQueue != submitQueue一定為真,就會不調(diào)用park進(jìn)行阻塞了,也就不需要解除阻塞,所以即使解除阻塞時,該線程不在隊列中也是沒問題的。
再看E處,當(dāng)進(jìn)入一個新的cycle時(當(dāng)前隊列與提交的隊列不同),a)如果沒指定版本,或者到達(dá)或超過了指定版本,則返回true;b)如果當(dāng)前調(diào)用了cancel,則當(dāng)前隊列的isCancelQueue將為true,則不繼續(xù)傻等,返回false;c)或者還未到達(dá)指定版本,break,插入到當(dāng)前隊列中,繼續(xù)等待指定版本的到達(dá)。
如果沒有進(jìn)入E處的IF內(nèi),則當(dāng)前線程會被阻塞,直到超時,然后返回false;或被中斷,然后拋出InterruptedException;或被解除阻塞,重新進(jìn)行E處的判定。
這里還有個小細(xì)節(jié),既然cancel時,把當(dāng)前的隊列設(shè)置了isCancelQueue,那么之后指定版本的await會不會也直接返回了呢?其實不會的,因為它若要執(zhí)行F處的判斷,則先必需通過E處的判定,這意味著,當(dāng)前隊列已經(jīng)不是提交時的那個設(shè)置了isCancelQueue的隊列了。
代碼中對于cancel的處理,其實并不保證cancel后,之前的await都會被解除阻塞并返回,如果cancel后,緊接著又調(diào)用了nextCycle,那么可能某線程感知不到cancel的調(diào)用,喚醒后又繼續(xù)等待指定的版本。cancel的目的是在于不讓線程傻等,既然恢復(fù)版本更新了,那就繼續(xù)等待吧。
如果自己維護(hù)版本號,則應(yīng)該保證遞增。另外,版本號的設(shè)計,考慮到了int溢出的情況,版本的前后判斷,我不是使用newVersion>=oldVersion,而是newVersion-oldVersion>=0,這樣,版本號就相當(dāng)于循環(huán)使用了,只要兩個比較的版本號的差不超過int的最大值,那么都是正確的,int的最大值可是20多億,幾乎不可能出現(xiàn)跨度這么大的兩個版本號的比較,所以,認(rèn)為它是正確的。
小結(jié)本文講到了一個非阻塞同步算法設(shè)計時的小技巧,如果多個變量之間要維護(hù)某種特定關(guān)系,那么可以將它們封裝到一個類中,再用CAS更新這個類的引用,這樣就達(dá)到了:要么都被更新,要么都沒被更新,保持了多個變量之間的一致性。同時需要注意的是,每次更新都必需創(chuàng)建新的包裝對象,假如有其它更好的辦法,應(yīng)該避免使用該方法。
via ifeve.com
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/64034.html
摘要:黑色的線表示,可在任意狀態(tài)下發(fā)起主動取消,進(jìn)入該狀態(tài)。所以當(dāng)線程阻塞時,可能處于停止?fàn)顟B(tài)或者主動取消狀態(tài)。非阻塞同步相對于鎖同步而言,由代碼塊,轉(zhuǎn)為了點,是另一種思考方式。 前言 閱讀本文前,需要讀者對happens-before比較熟悉,了解非阻塞同步的一些基本概念。本文主要為happens-before法則的靈活運用,和一些解決問題的小技巧,分析問題的方式。 背景介紹 原始需...
摘要:前言學(xué)習(xí)情況記錄時間子目標(biāo)多線程記錄在學(xué)習(xí)線程安全知識點中,關(guān)于的有關(guān)知識點。對于資源競爭嚴(yán)重線程沖突嚴(yán)重的情況,自旋的概率會比較大,從而浪費更多的資源,效率低于。 前言 學(xué)習(xí)情況記錄 時間:week 1 SMART子目標(biāo) :Java 多線程 記錄在學(xué)習(xí)線程安全知識點中,關(guān)于CAS的有關(guān)知識點。 線程安全是指:多個線程不管以何種方式訪問某個類,并且在主調(diào)代碼中不需要進(jìn)行同步,都能表...
摘要:注意這里指的不是當(dāng)次而是之后,所以如果我們使用隊列的方法返回,就知道隊列是否為空,但是不知道之后是否為空,并且,當(dāng)關(guān)注的操作發(fā)生時,在插入或取出操作的返回值里告知此信息,來指導(dǎo)是否繼續(xù)注冊寫操作。 前言 本文寫給對ConcurrentLinkedQueue的實現(xiàn)和非阻塞同步算法的實現(xiàn)原理有一定了解,但缺少實踐經(jīng)驗的朋友,文中包括了實戰(zhàn)中的嘗試、所走的彎路,經(jīng)驗和教訓(xùn)。 背景介紹 ...
摘要:如問到是否使用某框架,實際是是問該框架的使用場景,有什么特點,和同類可框架對比一系列的問題。這兩個方向的區(qū)分點在于工作方向的側(cè)重點不同。 [TOC] 這是一份來自嗶哩嗶哩的Java面試Java面試 32個核心必考點完全解析(完) 課程預(yù)習(xí) 1.1 課程內(nèi)容分為三個模塊 基礎(chǔ)模塊: 技術(shù)崗位與面試 計算機(jī)基礎(chǔ) JVM原理 多線程 設(shè)計模式 數(shù)據(jù)結(jié)構(gòu)與算法 應(yīng)用模塊: 常用工具集 ...
摘要:相比與其他操作系統(tǒng)包括其他類系統(tǒng)有很多的優(yōu)點,其中有一項就是,其上下文切換和模式切換的時間消耗非常少。因為多線程競爭鎖時會引起上下文切換。減少線程的使用。很多編程語言中都有協(xié)程。所以如何避免死鎖的產(chǎn)生,在我們使用并發(fā)編程時至關(guān)重要。 系列文章傳送門: Java多線程學(xué)習(xí)(一)Java多線程入門 Java多線程學(xué)習(xí)(二)synchronized關(guān)鍵字(1) java多線程學(xué)習(xí)(二)syn...
閱讀 1936·2023-04-26 01:56
閱讀 3119·2021-11-18 10:02
閱讀 3066·2021-09-09 11:35
閱讀 1304·2021-09-03 10:28
閱讀 3426·2019-08-29 18:36
閱讀 2855·2019-08-29 17:14
閱讀 838·2019-08-29 16:10
閱讀 1622·2019-08-26 13:45