摘要:三總結(jié)主要用于線程之間的數(shù)據(jù)交換,由于采用無鎖算法,其性能一般比單純的其它阻塞隊列要高。它的最大特點時不存儲實際元素,而是在內(nèi)部通過棧或隊列結(jié)構(gòu)保存阻塞線程。
本文首發(fā)于一世流云專欄:https://segmentfault.com/blog...一、SynchronousQueue簡介
SynchronousQueue是JDK1.5時,隨著J.U.C包一起引入的一種阻塞隊列,它實現(xiàn)了BlockingQueue接口,底層基于棧和隊列實現(xiàn):
沒有看錯,SynchronousQueue的底層實現(xiàn)包含兩種數(shù)據(jù)結(jié)構(gòu)——棧和隊列。這是一種非常特殊的阻塞隊列,它的特點簡要概括如下:
入隊線程和出隊線程必須一一匹配,否則任意先到達的線程會阻塞。比如ThreadA進行入隊操作,在有其它線程執(zhí)行出隊操作之前,ThreadA會一直等待,反之亦然;
SynchronousQueue內(nèi)部不保存任何元素,也就是說它的容量為0,數(shù)據(jù)直接在配對的生產(chǎn)者和消費者線程之間傳遞,不會將數(shù)據(jù)緩沖到隊列中。
SynchronousQueue支持公平/非公平策略。其中非公平模式,基于內(nèi)部數(shù)據(jù)結(jié)構(gòu)——“棧”來實現(xiàn),公平模式,基于內(nèi)部數(shù)據(jù)結(jié)構(gòu)——“隊列”來實現(xiàn);
SynchronousQueue基于一種名為“Dual stack and Dual queue”的無鎖算法實現(xiàn)。
注意:上述的特點1,和我們之前介紹的Exchanger其實非常相似,可以類比Exchanger的功能來理解。二、SynchronousQueue原理 構(gòu)造
之前提到,SynchronousQueue根據(jù)公平/非公平訪問策略的不同,內(nèi)部使用了兩種不同的數(shù)據(jù)結(jié)構(gòu):棧和隊列。我們先來看下對象的構(gòu)造,SynchronousQueue只有2種構(gòu)造器:
/** * 默認(rèn)構(gòu)造器. * 默認(rèn)使用非公平策略. */ public SynchronousQueue() { this(false); }
/** * 指定策略的構(gòu)造器. */ public SynchronousQueue(boolean fair) { transferer = fair ? new TransferQueue() : new TransferStack (); }
可以看到,對于公平策略,內(nèi)部構(gòu)造了一個TransferQueue對象,而非公平策略則是構(gòu)造了TransferStack對象。這兩個類都繼承了內(nèi)部類Transferer,SynchronousQueue中的所有方法,其實都是委托調(diào)用了TransferQueue/TransferStack的方法:
public class SynchronousQueue棧結(jié)構(gòu)extends AbstractQueue implements BlockingQueue , java.io.Serializable { ? /** * tranferer對象, 構(gòu)造時根據(jù)策略類型確定. */ private transient volatile Transferer transferer; ? /** * Shared internal API for dual stacks and queues. */ abstract static class Transferer { /** * Performs a put or take. * * @param e 非null表示 生產(chǎn)者 -> 消費者; * null表示, 消費者 -> 生產(chǎn)者. * @return 非null表示傳遞的數(shù)據(jù); null表示傳遞失敗(超時或中斷). */ abstract E transfer(E e, boolean timed, long nanos); } ? /** * Dual stack(雙棧結(jié)構(gòu)). * 非公平策略時使用. */ static final class TransferStack extends Transferer { // ... } ? /** * Dual Queue(雙端隊列). * 公平策略時使用. */ static final class TransferQueue extends Transferer { // ... } ? // ... }
非公平策略由TransferStack類實現(xiàn),既然TransferStack是棧,那就有結(jié)點。TransferStack內(nèi)部定義了名為SNode的結(jié)點:
static final class SNode { volatile SNode next; volatile SNode match; // 與當(dāng)前結(jié)點配對的結(jié)點 volatile Thread waiter; // 當(dāng)前結(jié)點對應(yīng)的線程 Object item; // 實際數(shù)據(jù)或null int mode; // 結(jié)點類型 ? SNode(Object item) { this.item = item; } ?? // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long matchOffset; private static final long nextOffset; ? static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class> k = SNode.class; matchOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("match")); nextOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("next")); } catch (Exception e) { throw new Error(e); } } // ... }
上述SNode結(jié)點的定義中有個mode字段,表示結(jié)點的類型。TransferStack一共定義了三種結(jié)點類型,任何線程對TransferStack的操作都會創(chuàng)建下述三種類型的某種結(jié)點:
REQUEST:表示未配對的消費者(當(dāng)線程進行出隊操作時,會創(chuàng)建一個mode值為REQUEST的SNode結(jié)點 )
DATA:表示未配對的生產(chǎn)者(當(dāng)線程進行入隊操作時,會創(chuàng)建一個mode值為DATA的SNode結(jié)點 )
FULFILLING:表示配對成功的消費者/生產(chǎn)者
static final class TransferStack核心操作——put/takeextends Transferer { ? /** * 未配對的消費者 */ static final int REQUEST = 0; /** * 未配對的生產(chǎn)者 */ static final int DATA = 1; /** * 配對成功的消費者/生產(chǎn)者 */ static final int FULFILLING = 2; ? volatile SNode head; ? // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long headOffset; ? static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class> k = TransferStack.class; headOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("head")); } catch (Exception e) { throw new Error(e); } } ? // ... }
SynchronousQueue的入隊操作調(diào)用了put方法:
/** * 入隊指定元素e. * 如果沒有另一個線程進行出隊操作, 則阻塞該入隊線程. */ public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); if (transferer.transfer(e, false, 0) == null) { Thread.interrupted(); throw new InterruptedException(); } }
SynchronousQueue的出隊操作調(diào)用了take方法:
/** * 出隊一個元素. * 如果沒有另一個線程進行出隊操作, 則阻塞該入隊線程. */ public E take() throws InterruptedException { E e = transferer.transfer(null, false, 0); if (e != null) return e; Thread.interrupted(); throw new InterruptedException(); }
可以看到,SynchronousQueue一樣不支持null元素,實際的入隊/出隊操作都是委托給了transfer方法,該方法返回null表示出/入隊失敗(通常是線程被中斷或超時):
/** * 入隊/出隊一個元素. */ E transfer(E e, boolean timed, long nanos) { SNode s = null; // s表示新創(chuàng)建的結(jié)點 // 入?yún)==null, 說明當(dāng)前是出隊線程(消費者), 否則是入隊線程(生產(chǎn)者) // 入隊線程創(chuàng)建一個DATA結(jié)點, 出隊線程創(chuàng)建一個REQUEST結(jié)點 int mode = (e == null) ? REQUEST : DATA; for (; ; ) { // 自旋 SNode h = head; if (h == null || h.mode == mode) { // CASE1: 棧為空 或 棧頂結(jié)點類型與當(dāng)前mode相同 if (timed && nanos <= 0) { // case1.1: 限時等待的情況 if (h != null && h.isCancelled()) casHead(h, h.next); else return null; } else if (casHead(h, s = snode(s, e, h, mode))) { // case1.2 將當(dāng)前結(jié)點壓入棧 SNode m = awaitFulfill(s, timed, nanos); // 阻塞當(dāng)前調(diào)用線程 if (m == s) { // 阻塞過程中被中斷 clean(s); return null; } // 此時m為配對結(jié)點 if ((h = head) != null && h.next == s) casHead(h, s.next); // 入隊線程null, 出隊線程返回配對結(jié)點的值 return (E) ((mode == REQUEST) ? m.item : s.item); } // 執(zhí)行到此處說明入棧失敗(多個線程同時入棧導(dǎo)致CAS操作head失敗),則進入下一次自旋繼續(xù)執(zhí)行 } else if (!isFulfilling(h.mode)) { // CASE2: 棧頂結(jié)點還未配對成功 if (h.isCancelled()) // case2.1: 元素取消情況(因中斷或超時)的處理 casHead(h, h.next); else if (casHead(h, s = snode(s, e, h, FULFILLING | mode))) { // case2.2: 將當(dāng)前結(jié)點壓入棧中 for (; ; ) { SNode m = s.next; // s.next指向原棧頂結(jié)點(也就是與當(dāng)前結(jié)點匹配的結(jié)點) if (m == null) { // m==null說明被其它線程搶先匹配了, 則跳出循環(huán), 重新下一次自旋 casHead(s, null); s = null; break; } SNode mn = m.next; if (m.tryMatch(s)) { // 進行結(jié)點匹配 casHead(s, mn); // 匹配成功, 將匹配的兩個結(jié)點全部彈出棧 return (E) ((mode == REQUEST) ? m.item : s.item); // 返回匹配值 } else // 匹配失敗 s.casNext(m, mn); // 移除原待匹配結(jié)點 } } } else { // CASE3: 其它線程正在匹配 SNode m = h.next; if (m == null) // 棧頂?shù)膎ext==null, 則直接彈出, 重新進入下一次自旋 casHead(h, null); else { // 嘗試和其它線程競爭匹配 SNode mn = m.next; if (m.tryMatch(h)) casHead(h, mn); // 匹配成功 else h.casNext(m, mn); // 匹配失敗(被其它線程搶先匹配成功了) } } } }
整個transfer方法考慮了限時等待的情況,且入隊/出隊其實都是調(diào)用了同一個方法,其主干邏輯就是在一個自旋中完成以下三種情況之一的操作,直到成功,或者被中斷或超時取消:
棧為空,或棧頂結(jié)點類型與當(dāng)前入隊結(jié)點相同。這種情況,調(diào)用線程會阻塞;
棧頂結(jié)點還未配對成功,且與當(dāng)前入隊結(jié)點可以配對。這種情況,直接進行配對操作;
棧頂結(jié)點正在配對中。這種情況,直接進行下一個結(jié)點的配對。
出/入隊示例講解為了便于理解,我們來看下面這個調(diào)用示例(假設(shè)不考慮限時等待的情況),假設(shè)一共有三個線程ThreadA、ThreadB、ThreadC:
①初始棧結(jié)構(gòu)
初始棧為空,head為棧頂指針,始終指向棧頂結(jié)點:
②ThreadA(生產(chǎn)者)執(zhí)行入隊操作
由于此時棧為空,所以ThreadA會進入CASE1,創(chuàng)建一個類型為DATA的結(jié)點:
if (h == null || h.mode == mode) { // CASE1: 棧為空 或 棧頂結(jié)點類型與當(dāng)前mode相同 if (timed && nanos <= 0) { // case1.1: 限時等待的情況 if (h != null && h.isCancelled()) casHead(h, h.next); else return null; } else if (casHead(h, s = snode(s, e, h, mode))) { // case1.2 將當(dāng)前結(jié)點壓入棧 SNode m = awaitFulfill(s, timed, nanos); // 阻塞當(dāng)前調(diào)用線程 if (m == s) { // 阻塞過程中被中斷 clean(s); return null; } // 此時m為配對結(jié)點 if ((h = head) != null && h.next == s) casHead(h, s.next); // 入隊線程null, 出隊線程返回配對結(jié)點的值 return (E) ((mode == REQUEST) ? m.item : s.item); } // 執(zhí)行到此處說明入棧失敗(多個線程同時入棧導(dǎo)致CAS操作head失敗),則進入下一次自旋繼續(xù)執(zhí)行 }
CASE1分支中,將結(jié)點壓入棧后,會調(diào)用awaitFulfill方法,該方法會阻塞調(diào)用線程:
/** * 阻塞當(dāng)前調(diào)用線程, 并將線程信息記錄在s.waiter字段上. * * @param s 等待的結(jié)點 * @return 返回配對的結(jié)點 或 當(dāng)前結(jié)點(說明線程被中斷了) */ SNode awaitFulfill(SNode s, boolean timed, long nanos) { final long deadline = timed ? System.nanoTime() + nanos : 0L; Thread w = Thread.currentThread(); // 性能優(yōu)化操作(計算自旋次數(shù)) int spins = (shouldSpin(s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0); for (; ; ) { if (w.isInterrupted()) s.tryCancel(); /** * s.match保存當(dāng)前結(jié)點的匹配結(jié)點. * s.match==null說明還沒有匹配結(jié)點 * s.match==s說明當(dāng)前結(jié)點s對應(yīng)的線程被中斷了 */ SNode m = s.match; if (m != null) return m; if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { s.tryCancel(); continue; } } if (spins > 0) spins = shouldSpin(s) ? (spins - 1) : 0; else if (s.waiter == null) // 還沒有匹配結(jié)點, 則保存當(dāng)前線程 s.waiter = w; // s.waiter保存當(dāng)前阻塞線程 else if (!timed) LockSupport.park(this); // 阻塞當(dāng)前線程 else if (nanos > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanos); } }
此時棧結(jié)構(gòu)如下,結(jié)點的waiter字段保存著創(chuàng)建該結(jié)點的線程ThreadA,ThreadA等待著被配對消費者線程喚醒:
③ThreadB(生產(chǎn)者)執(zhí)行入隊操作
此時棧頂結(jié)點的類型和ThreadB創(chuàng)建的結(jié)點相同(都是DATA類型的結(jié)點),所以依然走CASE1分支,直接將結(jié)點壓入棧:
④ThreadC(消費者)執(zhí)行出隊操作
此時棧頂結(jié)點的類型和ThreadC創(chuàng)建的結(jié)點匹配(棧頂DATA類型,ThreadC創(chuàng)建的是REQUEST類型),所以走CASE2分支,該分支會將匹配的兩個結(jié)點彈出棧:
else if (!isFulfilling(h.mode)) { // CASE2: 棧頂結(jié)點還未配對成功 if (h.isCancelled()) // case2.1: 元素取消情況(因中斷或超時)的處理 casHead(h, h.next); else if (casHead(h, s = snode(s, e, h, FULFILLING | mode))) { // case2.2: 將當(dāng)前結(jié)點壓入棧中 for (; ; ) { SNode m = s.next; // s.next指向原棧頂結(jié)點(也就是與當(dāng)前結(jié)點匹配的結(jié)點) if (m == null) { // m==null說明被其它線程搶先匹配了, 則跳出循環(huán), 重新下一次自旋 casHead(s, null); s = null; break; } SNode mn = m.next; if (m.tryMatch(s)) { // 進行結(jié)點匹配 casHead(s, mn); // 匹配成功, 將匹配的兩個結(jié)點全部彈出棧 return (E) ((mode == REQUEST) ? m.item : s.item); // 返回匹配值 } else // 匹配失敗 s.casNext(m, mn); // 移除原待匹配結(jié)點 } } }
上述isFulfilling方法就是判斷結(jié)點是否匹配:
/** * 判斷m是否已經(jīng)配對成功. */ static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }
ThreadC創(chuàng)建結(jié)點并壓入棧后,棧的結(jié)構(gòu)如下:
此時,ThreadC會調(diào)用tryMatch方法進行匹配,該方法的主要作用有兩點:
將待結(jié)點的match字段置為與當(dāng)前配對的結(jié)點(如上圖中,結(jié)點m是待配對結(jié)點,最終m.math == s)
喚醒待配對結(jié)點中的線程(如上圖中,喚醒結(jié)點m中ThreadB線程)
/** * 嘗試將當(dāng)前結(jié)點和s結(jié)點配對. */ boolean tryMatch(SNode s) { if (match == null && UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) { Thread w = waiter; if (w != null) { // 喚醒當(dāng)前結(jié)點對應(yīng)的線程 waiter = null; LockSupport.unpark(w); } return true; } return match == s; // 配對成功返回true }
匹配完成后,會將匹配的兩個結(jié)點彈出棧,并返回匹配值:
if (m.tryMatch(s)) { // 進行結(jié)點匹配 casHead(s, mn); // 匹配成功, 將匹配的兩個結(jié)點全部彈出棧 return (E) ((mode == REQUEST) ? m.item : s.item); // 返回匹配值 }
最終,ThreadC拿到了等待配對結(jié)點中的數(shù)據(jù)并返回,此時棧的結(jié)構(gòu)如下:
注意: CASE2分支中ThreadC創(chuàng)建的結(jié)點的mode值并不是REQUEST,其mode值為FULFILLING | mode,FULFILLING | mode的主要作用就是給棧頂結(jié)點置一個標(biāo)識(二進制為11或10),表示當(dāng)前有線程正在對棧頂匹配,這時如果有其它線程進入自旋(并發(fā)情況),則CASE2一定失敗,因為isFulfilling的結(jié)果必然為true,所以會進入CASE3分支——跳過棧頂結(jié)點進行匹配。
casHead(h, s = snode(s, e, h, FULFILLING | mode))
⑤ThreadB(生產(chǎn)者)喚醒后繼續(xù)執(zhí)行
ThreadB被喚醒后,會從原阻塞處繼續(xù)執(zhí)行,并進入下一次自旋,在下一次自旋中,由于結(jié)點的match字段已經(jīng)有了匹配結(jié)點,所以直接返回配對結(jié)點:
/** * 阻塞當(dāng)前調(diào)用線程, 并將線程信息記錄在s.waiter字段上. * * @param s 等待的結(jié)點 * @return 返回配對的結(jié)點 或 當(dāng)前結(jié)點(說明線程被中斷了) */ SNode awaitFulfill(SNode s, boolean timed, long nanos) { final long deadline = timed ? System.nanoTime() + nanos : 0L; Thread w = Thread.currentThread(); // 性能優(yōu)化操作(計算自旋次數(shù)) int spins = (shouldSpin(s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0); for (; ; ) { if (w.isInterrupted()) s.tryCancel(); /** * s.match保存當(dāng)前結(jié)點的匹配結(jié)點. * s.match==null說明還沒有匹配結(jié)點 * s.match==s說明當(dāng)前結(jié)點s對應(yīng)的線程被中斷了 */ SNode m = s.match; if (m != null) return m; if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { s.tryCancel(); continue; } } if (spins > 0) spins = shouldSpin(s) ? (spins - 1) : 0; else if (s.waiter == null) // 還沒有匹配結(jié)點, 則保存當(dāng)前線程 s.waiter = w; // s.waiter保存當(dāng)前阻塞線程 else if (!timed) LockSupport.park(this); // 阻塞當(dāng)前線程 else if (nanos > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanos); } }
最終,在下面分支中返回:
else if (casHead(h, s = snode(s, e, h, mode))) { // case1.2 將當(dāng)前結(jié)點壓入棧 SNode m = awaitFulfill(s, timed, nanos); // 阻塞當(dāng)前調(diào)用線程 if (m == s) { // 阻塞過程中被中斷 clean(s); return null; } // 此時m為配對結(jié)點 if ((h = head) != null && h.next == s) casHead(h, s.next); // 入隊線程null, 出隊線程返回配對結(jié)點的值 return (E) ((mode == REQUEST) ? m.item : s.item); }
注意:對于入隊線程(生產(chǎn)者),返回的是它入隊時攜帶的原有元素值。隊列結(jié)構(gòu)
SynchronousQueue的公平策略由TransferQueue類實現(xiàn),TransferQueue內(nèi)部定義了名為QNode的結(jié)點,一個head隊首指針,一個tail隊尾指針:
/** * Dual Queue(雙端隊列). * 公平策略時使用. */ static final class TransferQueueextends Transferer { /** * Head of queue */ transient volatile QNode head; /** * Tail of queue */ transient volatile QNode tail; /** * Reference to a cancelled node that might not yet have been * unlinked from queue because it was the last inserted node * when it was cancelled. */ transient volatile QNode cleanMe; /** * 隊列結(jié)點定義. */ static final class QNode { volatile QNode next; // next node in queue volatile Object item; // CAS"ed to or from null volatile Thread waiter; // to control park/unpark final boolean isData; // ... } // ... }
關(guān)于TransferQueue的transfer方法就不再贅述了,其思路和TransferStack大致相同,總之就是入隊/出隊必須一一匹配,否則任意一方就會加入隊列并等待匹配線程喚醒。讀者可以自行閱讀TransferQueued的源碼。三、總結(jié)
TransferQueue主要用于線程之間的數(shù)據(jù)交換,由于采用無鎖算法,其性能一般比單純的其它阻塞隊列要高。它的最大特點時不存儲實際元素,而是在內(nèi)部通過棧或隊列結(jié)構(gòu)保存阻塞線程。后面我們講JUC線程池框架的時候,還會再次看到它的身影。
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/77078.html
摘要:整個包,按照功能可以大致劃分如下鎖框架原子類框架同步器框架集合框架執(zhí)行器框架本系列將按上述順序分析,分析所基于的源碼為。后,根據(jù)一系列常見的多線程設(shè)計模式,設(shè)計了并發(fā)包,其中包下提供了一系列基礎(chǔ)的鎖工具,用以對等進行補充增強。 showImg(https://segmentfault.com/img/remote/1460000016012623); 本文首發(fā)于一世流云專欄:https...
摘要:我們來看下的類繼承圖可以看到,實現(xiàn)了接口,在多線程進階二五之框架中,我們提到過實現(xiàn)了接口,以提供和排序相關(guān)的功能,維持元素的有序性,所以就是一種為并發(fā)環(huán)境設(shè)計的有序工具類。唯一的區(qū)別是針對的僅僅是鍵值,針對鍵值對進行操作。 showImg(https://segmentfault.com/img/bVbggic?w=960&h=600); 本文首發(fā)于一世流云專欄:https://seg...
摘要:僅僅當(dāng)有多個線程同時進行寫操作時,才會進行同步。可以看到,上述方法返回一個迭代器對象,的迭代是在舊數(shù)組上進行的,當(dāng)創(chuàng)建迭代器的那一刻就確定了,所以迭代過程中不會拋出并發(fā)修改異常。另外,迭代器對象也不支持修改方法,全部會拋出異常。 showImg(https://segmentfault.com/img/bVbggij?w=960&h=600); 本文首發(fā)于一世流云專欄:https://...
摘要:我們之前已經(jīng)介紹過了,底層基于跳表實現(xiàn),其操作平均時間復(fù)雜度均為。事實上,內(nèi)部引用了一個對象,以組合方式,委托對象實現(xiàn)了所有功能。線程安全內(nèi)存的使用較多迭代是對快照進行的,不會拋出,且迭代過程中不支持修改操作。 showImg(https://segmentfault.com/img/bVbggjf?w=600&h=377); 本文首發(fā)于一世流云專欄:https://segmentfa...
摘要:接口截止目前為止,我們介紹的阻塞隊列都是實現(xiàn)了接口。該類在構(gòu)造時一般需要指定容量,如果不指定,則最大容量為。另外,由于內(nèi)部通過來保證線程安全,所以的整體實現(xiàn)時比較簡單的。另外,雙端隊列相比普通隊列,主要是多了隊尾出隊元素隊首入隊元素的功能。 showImg(https://segmentfault.com/img/bVbgZ7j?w=770&h=514); 本文首發(fā)于一世流云專欄:ht...
閱讀 3687·2021-09-22 15:28
閱讀 1303·2021-09-03 10:35
閱讀 885·2021-09-02 15:21
閱讀 3487·2019-08-30 15:53
閱讀 3501·2019-08-29 17:25
閱讀 577·2019-08-29 13:22
閱讀 1563·2019-08-28 18:15
閱讀 2294·2019-08-26 13:57