摘要:當生產者線程調用方法時,如果沒有消費者等待接收元素,則會立即返回。方法方法,用于將指定元素傳遞給消費者線程調用方法。
本文首發于一世流云專欄:https://segmentfault.com/blog...一、LinkedTransferQueue簡介
LinkedTransferQueue是在JDK1.7時,J.U.C包新增的一種比較特殊的阻塞隊列,它除了具備阻塞隊列的常用功能外,還有一個比較特殊的transfer方法。
我們知道,在普通阻塞隊列中,當隊列為空時,消費者線程(調用take或poll方法的線程)一般會阻塞等待生產者線程往隊列中存入元素。而LinkedTransferQueue的transfer方法則比較特殊:
當有消費者線程阻塞等待時,調用transfer方法的生產者線程不會將元素存入隊列,而是直接將元素傳遞給消費者;
如果調用transfer方法的生產者線程發現沒有正在等待的消費者線程,則會將元素入隊,然后會阻塞等待,直到有一個消費者線程來獲取該元素。
TransferQueue接口可以看到,LinkedTransferQueue實現了一個名為TransferQueue的接口,TransferQueue也是JDK1.7時J.U.C包新增的接口,正是該接口提供了上述的transfer方法:
除了transfer方法外,TransferQueue還提供了兩個變種方法:tryTransfer(E e)、tryTransfer(E e, long timeout, TimeUnit unit)。
tryTransfer(E e)
當生產者線程調用tryTransfer方法時,如果沒有消費者等待接收元素,則會立即返回false。該方法和transfer方法的區別就是tryTransfer方法無論消費者是否接收,方法立即返回,而transfer方法必須等到消費者消費后才返回。
tryTransfer(E e, long timeout, TimeUnit unit)
tryTransfer(E e,long timeout,TimeUnit unit)方法則是加上了限時等待功能,如果沒有消費者消費該元素,則等待指定的時間再返回;如果超時還沒消費元素,則返回false,如果在超時時間內消費了元素,則返回true。
TransferQueue接口定義:
LinkedTransferQueue的特點簡要概括如下:
LinkedTransferQueue是一種無界阻塞隊列,底層基于單鏈表實現;
LinkedTransferQueue中的結點有兩種類型:數據結點、請求結點;
LinkedTransferQueue基于無鎖算法實現。
二、LinkedTransferQueue原理 內部結構LinkedTransferQueue提供了兩種構造器,也沒有參數設置隊列初始容量,所以是一種無界隊列:
/** * 隊列結點定義. */ static final class Node { final boolean isData; // true: 數據結點; false: 請求結點 volatile Object item; // 結點值 volatile Node next; // 后驅結點指針 volatile Thread waiter; // 等待線程 // 設置當前結點的后驅結點為val final boolean casNext(Node cmp, Node val) { return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } // 設置當前結點的值為val final boolean casItem(Object cmp, Object val) { // assert cmp == null || cmp.getClass() != Node.class; return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); } Node(Object item, boolean isData) { UNSAFE.putObject(this, itemOffset, item); // relaxed write this.isData = isData; } // 設置當前結點的后驅結點為自身 final void forgetNext() { UNSAFE.putObject(this, nextOffset, this); } /** * 設置當前結點的值為自身. * 設置當前結點的等待線程為null. */ final void forgetContents() { UNSAFE.putObject(this, itemOffset, this); UNSAFE.putObject(this, waiterOffset, null); } /** * 判斷當前結點是否匹配成功. * Node.item == this || (Node.isData == true && Node.item == null) */ final boolean isMatched() { Object x = item; return (x == this) || ((x == null) == isData); } /** * 判斷是否為未匹配的請求結點. * Node.isData == false && Node.item == null */ final boolean isUnmatchedRequest() { return !isData && item == null; } /** * 當該結點(havaData)是未匹配結點, 且與當前的結點類型不同時, 返回true. */ final boolean cannotPrecede(boolean haveData) { boolean d = isData; Object x; return d != haveData && (x = item) != this && (x != null) == d; } /** * 嘗試匹配數據結點. */ final boolean tryMatchData() { // assert isData; 當前結點必須為數據結點 Object x = item; if (x != null && x != this && casItem(x, null)) { LockSupport.unpark(waiter); // 喚醒等待線程 return true; } return false; } // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long itemOffset; private static final long nextOffset; private static final long waiterOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class> k = Node.class; itemOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("item")); nextOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("next")); waiterOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("waiter")); } catch (Exception e) { throw new Error(e); } } }
關于Node結點,有以下幾點需要特別注意:
Node結點有兩種類型:數據結點、請求結點,通過字段isData區分,只有不同類型的結點才能相互匹配;
Node結點的值保存在item字段,匹配前后值會發生變化;
Node結點的狀態變化如下表:
結點/狀態 | 數據結點 | 請求結點 |
---|---|---|
匹配前 | isData = true; item = 數據結點值 | isData = false; item = null |
匹配后 | isData = true; item = null | isData = false; item = this |
從上表也可以看出,對于一個數據結點,當item == null表示匹配成功;對于一個請求結點,當item == this表示匹配成功。歸納起來,匹配成功的結點Node就是滿足(Node.item == this) || ((Node.item == null) == Node.isData)。
LinkedTransferQueue內部的其余字段定義如下,主要就是通過Unsafe類操作字段值,內部定義了很多常量字段,比如自旋,這些都是為了非阻塞算法的鎖優化而定義的:
public class LinkedTransferQueueextends AbstractQueue implements TransferQueue , java.io.Serializable { /** * True如果是多核CPU */ private static final boolean MP = Runtime.getRuntime().availableProcessors() > 1; /** * 線程自旋次數(僅多核CPU時用到). */ private static final int FRONT_SPINS = 1 << 7; /** * 線程自旋次數(僅多核CPU時用到). */ private static final int CHAINED_SPINS = FRONT_SPINS >>> 1; /** * The maximum number of estimated removal failures (sweepVotes) * to tolerate before sweeping through the queue unlinking * cancelled nodes that were not unlinked upon initial * removal. See above for explanation. The value must be at least * two to avoid useless sweeps when removing trailing nodes. */ static final int SWEEP_THRESHOLD = 32; /** * 隊首結點指針. */ transient volatile Node head; /** * 隊尾結點指針. */ private transient volatile Node tail; /** * The number of apparent failures to unsplice removed nodes */ private transient volatile int sweepVotes; // CAS設置隊尾tail指針為val private boolean casTail(Node cmp, Node val) { return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val); } // CAS設置隊首head指針為val private boolean casHead(Node cmp, Node val) { return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val); } private boolean casSweepVotes(int cmp, int val) { return UNSAFE.compareAndSwapInt(this, sweepVotesOffset, cmp, val); } /* * xfer方法的入參, 不同類型的方法內部調用xfer方法時入參不同. */ private static final int NOW = 0; // for untimed poll, tryTransfer private static final int ASYNC = 1; // for offer, put, add private static final int SYNC = 2; // for transfer, take private static final int TIMED = 3; // for timed poll, tryTransfer // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long headOffset; private static final long tailOffset; private static final long sweepVotesOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class> k = LinkedTransferQueue.class; headOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("head")); tailOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("tail")); sweepVotesOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("sweepVotes")); } catch (Exception e) { throw new Error(e); } } //... }
上述比較重要的就是4個常量值的定義:
/* * xfer方法的入參, 不同類型的方法內部調用xfer方法時入參不同. */ private static final int NOW = 0; // for untimed poll, tryTransfer private static final int ASYNC = 1; // for offer, put, add private static final int SYNC = 2; // for transfer, take private static final int TIMED = 3; // for timed poll, tryTransfer
這四個常量值,作為xfer方法的入參,用于標識不同操作類型。其實從常量的命名也可以看出它們對應的操作含義:
NOW表示即時操作(可能失敗),即不會阻塞調用線程:
poll(獲取并移除隊首元素,如果隊列為空,直接返回null);tryTransfer(嘗試將元素傳遞給消費者,如果沒有等待的消費者,則立即返回false,也不會將元素入隊)
ASYNC表示異步操作(必然成功):
offer(插入指定元素至隊尾,由于是無界隊列,所以會立即返回true);put(插入指定元素至隊尾,由于是無界隊列,所以會立即返回);add(插入指定元素至隊尾,由于是無界隊列,所以會立即返回true)
SYNC表示同步操作(阻塞調用線程):
transfer(阻塞直到出現一個消費者線程);take(從隊首移除一個元素,如果隊列為空,則阻塞線程)
TIMED表示限時同步操作(限時阻塞調用線程):
poll(long timeout, TimeUnit unit);tryTransfer(E e, long timeout, TimeUnit unit)
關于xfer方法,它是LinkedTransferQueued的核心內部方法,我們后面會詳細介紹。
transfer方法transfer方法,用于將指定元素e傳遞給消費者線程(調用take/poll方法)。如果有消費者線程正在阻塞等待,則調用transfer方法的線程會直接將元素傳遞給它;如果沒有消費者線程等待獲取元素,則調用transfer方法的線程會將元素插入到隊尾,然后阻塞等待,直到出現一個消費者線程獲取元素:
/** * 將指定元素e傳遞給消費者線程(調用take/poll方法). */ public void transfer(E e) throws InterruptedException { if (xfer(e, true, SYNC, 0) != null) { // 進入到此處, 說明調用線程被中斷了 Thread.interrupted(); // 清除中斷狀態, 然后拋出中斷異常 throw new InterruptedException(); } }
transfer方法的內部實際是調用了xfer方法,入參為SYNC=2:
/** * 入隊/出隊元素的真正實現. * * @param e 入隊操作, e非null; 出隊操作, e為null * @param haveData true表示入隊元素, false表示出隊元素 * @param how NOW, ASYNC, SYNC, TIMED 四種常量定義 * @param nanos 限時模式下使用(納秒) * @return 匹配成功則返回匹配的元素, 否則返回e本身 */ private E xfer(E e, boolean haveData, int how, long nanos) { if (haveData && (e == null)) // 入隊操作, 元素e不能為null throw new NullPointerException(); Node s = null; retry: for (; ; ) { for (Node h = head, p = h; p != null; ) { // 嘗試匹配p指向的結點 boolean isData = p.isData; // 結點類型 Object item = p.item; // 結點值 if (item != p && (item != null) == isData) { // 如果結點還未匹配過 if (isData == haveData) // 同種類型結點不能匹配 break; if (p.casItem(item, e)) { // p指向從隊首開始向后的第一個匹配結點 for (Node q = p; q != h; ) { Node n = q.next; // update by 2 unless singleton if (head == h && casHead(h, n == null ? q : n)) { h.forgetNext(); break; } // advance and retry if ((h = head) == null || (q = h.next) == null || !q.isMatched()) break; // unless slack < 2 } LockSupport.unpark(p.waiter); // 喚醒匹配結點上的等待線程 return LinkedTransferQueue.cast(item); // 返回匹配結點的值 } } Node n = p.next; p = (p != n) ? n : (h = head); // Use head if p offlist } if (how != NOW) { if (s == null) s = new Node(e, haveData); // 創建一個入隊結點, 添加到隊尾 Node pred = tryAppend(s, haveData); // pred指向s的前驅結點或s(隊列中只有一個結點)或null(tryAppend失?。? if (pred == null) continue retry; // 入隊失敗,則重試 if (how != ASYNC) return awaitMatch(s, pred, e, (how == TIMED), nanos); // 等待出隊線程 } return e; } }
我們通過示例看下xfer方法到底做了哪些事:
①隊列初始狀態
②ThreadA線程調用transfer入隊元素“9”
注意,此時入隊一個數據結點,且隊列為空,所以會直接進入xfer中的下述代碼:
if (how != NOW) { if (s == null) s = new Node(e, haveData); // 創建一個入隊結點, 添加到隊尾 Node pred = tryAppend(s, haveData); // pred指向s的前驅結點或s(隊列中只有一個結點)或null(tryAppend失敗) if (pred == null) continue retry; // 入隊失敗,則重試 if (how != ASYNC) return awaitMatch(s, pred, e, (how == TIMED), nanos); // 等待出隊線程 }
上述代碼會插入一個結點至隊尾,然后線程進入阻塞,等待一個出隊線程(消費者)的到來。
隊尾插入結點的方法是tryAppend,由于此時隊列為空,會進入CASE1分支,設置隊首指針head指向新結點,tryAppend方法的返回值有三種情況:
入隊失敗,返回null;
入隊成功且隊列只有一個結點,返回該結點自身;
入隊成功且隊列不止一個結點,返回該入隊結點的前驅結點。
/** * 嘗試將結點s添加到隊尾. * * @param s 待添加的結點 * @param haveData true: 數據結點 * @return 返回null表示失敗; 否則返回s的前驅結點(沒有前驅則返回s自身) */ private Node tryAppend(Node s, boolean haveData) { for (Node t = tail, p = t; ; ) { Node n, u; if (p == null && (p = head) == null) { // CASE1: 隊列為空 if (casHead(null, s)) // 設置隊首指針head return s; } else if (p.cannotPrecede(haveData)) // CASE2: 結點s不能鏈接到結點p return null; else if ((n = p.next) != null) // CASE3: 遍歷至隊尾結點 p = p != t && t != (u = tail) ? (t = u) : // stale tail (p != n) ? n : null; // restart if off list else if (!p.casNext(null, s)) // CASE4: 插入結點s p = p.next; // re-read on CAS failure else { // CASE5: 嘗試進行松弛操作 if (p != t) { // update if slack now >= 2 while ((tail != t || !casTail(t, s)) && (t = tail) != null && (s = t.next) != null && // advance and retry (s = s.next) != null && s != t) ; } return p; } } }
等待出隊線程方法awaitMatch,該方法核心作用就是進行結點匹配:
匹配成功,返回匹配值;
匹配失敗(中斷或限時等待的超時情況),返回原匹配結點的值;
阻塞線程,等待與之匹配的結點的到來。
從awaitMatch方法其實可以看到一種經典的“鎖優化”思路,就是 自旋 -> yield -> 阻塞,線程不會立即進入阻塞,因為線程上下文切換的開銷往往比較大,所以會先自旋一定次數,中途可能伴隨隨機的yield操作,讓出cpu時間片,如果自旋次數用完后,還是沒有匹配線程出現,再真正阻塞線程。
經過上述步驟,ThreadA最終會進入CASE4分支中等待,此時的隊列結構如下:
注意,此時的隊列中tail隊尾指針并不指向結點“9”,這是一種“松弛”策略,后面會講到。
③ThreadB線程調用transfer入隊元素“2”
由于此時隊首head指針不為null,所以會進入transfer方法中的以下循環:
for (Node h = head, p = h; p != null; ) { boolean isData = p.isData; // 結點類型 Object item = p.item; // 結點值 if (item != p && (item != null) == isData) { // 如果結點還未匹配過 if (isData == haveData) // 同種類型結點不能匹配 break; if (p.casItem(item, e)) { // match for (Node q = p; q != h; ) { Node n = q.next; // update by 2 unless singleton if (head == h && casHead(h, n == null ? q : n)) { h.forgetNext(); break; } // advance and retry if ((h = head) == null || (q = h.next) == null || !q.isMatched()) break; // unless slack < 2 } LockSupport.unpark(p.waiter); return LinkedTransferQueue.cast(item); } } Node n = p.next; p = (p != n) ? n : (h = head); // Use head if p offlist }
上述方法會讀取隊首結點,判斷該結點有沒被匹配過(item != p && (item != null) == isData):
如果已經被其它線程匹配過了,則繼續判斷下一個結點(p.next);
如果還沒有被匹配,則判斷下當前的入隊結點類型是否和隊首中的一致;如果一致(isData == haveData)就匹配失敗,跳出循環,否則進行匹配操作。
顯然,目前隊首結點是“數據結點”,ThreadB線程的入隊結點也是“數據結點”,結點類型一致,所以匹配失敗,直接跳過循環,也進入以下代碼塊:
if (how != NOW) { if (s == null) s = new Node(e, haveData); // 創建一個入隊結點, 添加到隊尾 Node pred = tryAppend(s, haveData); // pred指向s的前驅結點或s(隊列中只有一個結點)或null(tryAppend失敗) if (pred == null) continue retry; // 入隊失敗,則重試 if (how != ASYNC) return awaitMatch(s, pred, e, (how == TIMED), nanos); // 等待出隊線程 }
再次調用tryAppend方法, 會在CASE4分支中將元素“2”插入隊尾,然后在CASE5分支中重新設置隊尾指針tail:
/** * 嘗試將結點s添加到隊尾. * * @param s 待添加的結點 * @param haveData true: 數據結點 * @return 返回null表示失敗; 否則返回s的前驅結點(沒有前驅則返回s自身) */ private Node tryAppend(Node s, boolean haveData) { for (Node t = tail, p = t; ; ) { Node n, u; if (p == null && (p = head) == null) { // CASE1: 隊列為空 if (casHead(null, s)) // 設置隊首指針head return s; } else if (p.cannotPrecede(haveData)) // CASE2: 結點s不能鏈接到結點p return null; else if ((n = p.next) != null) // CASE3: 遍歷至隊尾結點 p = p != t && t != (u = tail) ? (t = u) : // stale tail (p != n) ? n : null; // restart if off list else if (!p.casNext(null, s)) // CASE4: 插入結點s p = p.next; // re-read on CAS failure else { // CASE5: 嘗試進行松弛操作 if (p != t) { // update if slack now >= 2 while ((tail != t || !casTail(t, s)) && (t = tail) != null && (s = t.next) != null && // advance and retry (s = s.next) != null && s != t) ; } return p; } } }
此時隊列結構如下:
最終,ThreadB也會在awaitMatch方法中進入阻塞,最終隊列結構如下:
④ThreadC線程調用transfer入隊元素“93”
過程和前幾步幾乎相同,不再贅述,最終隊列結構如下:
可以看到,隊尾指針tail的設置實際是滯后的,這是一種“松弛”策略,用以提升無鎖算法并發修改過程中的性能。
take方法再來看下消費者線程調用的take方法,該方法會從隊首取出一個元素,如果隊列為空,則線程會阻塞:
/** * 從隊首出隊一個元素. */ public E take() throws InterruptedException { E e = xfer(null, false, SYNC, 0); // (e == null && isData=false)表示一個請求結點 if (e != null) // 如果e!=null, 則表示匹配成功, 此時e為與之匹配的數據結點的值 return e; Thread.interrupted(); throw new InterruptedException(); }
內部依然調用了xfer方法,不過此時入參有所不同,由于是消費線程調用,所以入參e == null && hasData == false,表示一個“請求結點”:
/** * 入隊/出隊元素的真正實現. * * @param e 入隊操作, e非null; 出隊操作, e為null * @param haveData true表示入隊元素, false表示出隊元素 * @param how NOW, ASYNC, SYNC, TIMED 四種常量定義 * @param nanos 限時模式下使用(納秒) * @return 匹配成功則返回匹配的元素, 否則返回e本身 */ private E xfer(E e, boolean haveData, int how, long nanos) { if (haveData && (e == null)) // 入隊操作, 元素e不能為null throw new NullPointerException(); Node s = null; retry: for (; ; ) { for (Node h = head, p = h; p != null; ) { // 嘗試匹配p指向的結點 boolean isData = p.isData; // 結點類型 Object item = p.item; // 結點值 if (item != p && (item != null) == isData) { // 如果結點還未匹配過 if (isData == haveData) // 同種類型結點不能匹配 break; if (p.casItem(item, e)) { // p指向從隊首開始向后的第一個匹配結點 for (Node q = p; q != h; ) { Node n = q.next; // update by 2 unless singleton if (head == h && casHead(h, n == null ? q : n)) { h.forgetNext(); break; } // advance and retry if ((h = head) == null || (q = h.next) == null || !q.isMatched()) break; // unless slack < 2 } LockSupport.unpark(p.waiter); // 喚醒匹配結點上的等待線程 return LinkedTransferQueue.cast(item); // 返回匹配結點的值 } } Node n = p.next; p = (p != n) ? n : (h = head); // Use head if p offlist } if (how != NOW) { if (s == null) s = new Node(e, haveData); // 創建一個入隊結點, 添加到隊尾 Node pred = tryAppend(s, haveData); // pred指向s的前驅結點或s(隊列中只有一個結點)或null(tryAppend失敗) if (pred == null) continue retry; // 入隊失敗,則重試 if (how != ASYNC) return awaitMatch(s, pred, e, (how == TIMED), nanos); // 等待出隊線程 } return e; } }
還是通過示例看:
①隊列初始狀態
②ThreadD調用take方法,消費元素
此時,在xfer方法中,會從隊首開始,向后找到第一個匹配結點,并交換元素值,然后喚醒隊列中匹配結點上的等待線程:
/** * 入隊/出隊元素的真正實現. * * @param e 入隊操作, e非null; 出隊操作, e為null * @param haveData true表示入隊元素, false表示出隊元素 * @param how NOW, ASYNC, SYNC, TIMED 四種常量定義 * @param nanos 限時模式下使用(納秒) * @return 匹配成功則返回匹配的元素, 否則返回e本身 */ private E xfer(E e, boolean haveData, int how, long nanos) { if (haveData && (e == null)) // 入隊操作, 元素e不能為null throw new NullPointerException(); Node s = null; retry: for (; ; ) { for (Node h = head, p = h; p != null; ) { // 嘗試匹配p指向的結點 boolean isData = p.isData; // 結點類型 Object item = p.item; // 結點值 if (item != p && (item != null) == isData) { // 如果結點還未匹配過 if (isData == haveData) // 同種類型結點不能匹配 break; if (p.casItem(item, e)) { // p指向從隊首開始向后的第一個匹配結點 for (Node q = p; q != h; ) { Node n = q.next; // update by 2 unless singleton if (head == h && casHead(h, n == null ? q : n)) { h.forgetNext(); break; } // advance and retry if ((h = head) == null || (q = h.next) == null || !q.isMatched()) break; // unless slack < 2 } LockSupport.unpark(p.waiter); // 喚醒匹配結點上的等待線程 return LinkedTransferQueue.cast(item); // 返回匹配結點的值 } } Node n = p.next; p = (p != n) ? n : (h = head); // Use head if p offlist } if (how != NOW) { if (s == null) s = new Node(e, haveData); // 創建一個入隊結點, 添加到隊尾 Node pred = tryAppend(s, haveData); // pred指向s的前驅結點或s(隊列中只有一個結點)或null(tryAppend失?。? if (pred == null) continue retry; // 入隊失敗,則重試 if (how != ASYNC) return awaitMatch(s, pred, e, (how == TIMED), nanos); // 等待出隊線程 } return e; } }
最終隊列結構如下,匹配結點的值被置換為null,ThreadA被喚醒,ThreadD拿到匹配結點上的元素值“9”并返回:
③ThreadA被喚醒后繼續執行
ThreadA被喚醒后,從原阻塞處——繼續向下執行,然后進入下一次自旋,進入CASE1分支:
/** * 自旋/yield/阻塞,直到結點s被匹配. * * @param s 等待被匹配的結點s * @param pred s的前驅結點或s自身(隊列中只有一個結點的情況) * @param e 結點s的值 * @return 匹配值, 或e本身(中斷或超時情況) */ private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) { final long deadline = timed ? System.nanoTime() + nanos : 0L; // 限時等待情況下使用 Thread w = Thread.currentThread(); int spins = -1; // 自旋次數, 鎖優化操作 ThreadLocalRandom randomYields = null; // bound if needed for (; ; ) { Object item = s.item; if (item != e) { // CASE1: 匹配成功 // assert item != s; s.forgetContents(); // avoid garbage return LinkedTransferQueue.cast(item); } if ((w.isInterrupted() || (timed && nanos <= 0)) && s.casItem(e, s)) { // CASE2: 取消(線程被中斷或超時) unsplice(pred, s); return e; } // CASE3: 設置輕量級鎖(自旋 -> yield) if (spins < 0) { // 初始化自旋次數 if ((spins = spinsFor(pred, s.isData)) > 0) randomYields = ThreadLocalRandom.current(); } else if (spins > 0) { // 自選次數減1 --spins; if (randomYields.nextInt(CHAINED_SPINS) == 0) Thread.yield(); // 隨機yield線程 } else if (s.waiter == null) { // waiter保存待阻塞線程 s.waiter = w; } else if (timed) { // 限時等待情況, 計算剩余有效時間 nanos = deadline - System.nanoTime(); if (nanos > 0L) LockSupport.parkNanos(this, nanos); } else { // CASE4: 阻塞線程 LockSupport.park(this); } } }
在CASE1分支中,由于結點的item項已經被替換成了null,所以調用s.forgetContents(),并返回null
/** * 設置當前結點的值為自身. * 設置當前結點的等待線程為null. */ final void forgetContents() { UNSAFE.putObject(this, itemOffset, this); UNSAFE.putObject(this, waiterOffset, null); }
最終隊列結構如下:
④ThreadE調用take方法出隊元素
ThreadE調用take方法出隊元素,過程和步驟②相同,進入xfer方法(e == null,hasData == false),由于head指針指向的元素已經匹配過了,所以
向后繼續查找,找到第一個未匹配過的結點“2”,然后置換結點“2”中的元素值為null,喚醒線程ThreadB,返回匹配結點的元素值“2”:
for (Node h = head, p = h; p != null; ) { // 嘗試匹配p指向的結點 boolean isData = p.isData; // 結點類型 Object item = p.item; // 結點值 if (item != p && (item != null) == isData) { // 如果結點還未匹配過 if (isData == haveData) // 同種類型結點不能匹配 break; if (p.casItem(item, e)) { // p指向從隊首開始向后的第一個匹配結點 for (Node q = p; q != h; ) { Node n = q.next; // update by 2 unless singleton if (head == h && casHead(h, n == null ? q : n)) { h.forgetNext(); break; } // advance and retry if ((h = head) == null || (q = h.next) == null || !q.isMatched()) break; // unless slack < 2 } LockSupport.unpark(p.waiter); // 喚醒匹配結點上的等待線程 return LinkedTransferQueue.cast(item); // 返回匹配結點的值 } } Node n = p.next; p = (p != n) ? n : (h = head); // Use head if p offlist }
此時隊列狀態如下,可以看到,隊首指針head一次性向后跳了2個位置,原來已經匹配過的元素的next指針指向自身,等待被GC回收,這其實就是LinkedTransferQueue的“松弛”策略:
⑤ThreadB被喚醒后繼續執行
過程和步驟③完全相同,在awaitMatch方法中,將結點的item置為this,然后返回匹配結點值——null,最終隊列結構如下:
⑥ThreadF調用take方法出隊元素
ThreadF調用take方法出隊元素,過程和步驟②相同,進入xfer方法(e == null,hasData == false),由于head指針指向的元素此時沒有匹配,所以不用像步驟②那樣向后查找,而是直接置換匹配結點的元素值“93”,然后喚醒ThreadC,返回匹配值“93”。最終隊列結構如下:
⑦ThreadC被喚醒后繼續執行
過程和步驟③完全相同,在awaitMatch方法中,將結點的item置為this,然后返回匹配結點值——null,最終隊列結構如下:
此時的隊列結構,讀者移一定感到非常奇怪,并不嚴格遵守隊列的定義,這其實就是“Dual Queue”算法的實現,為了對自旋優化,做了很多看似別扭的操作,不必奇怪。
假設此時再有一個線程ThreadH調用take方法出隊元素會怎么樣?其實這是隊列已經空了,ThreadH會被阻塞,但是會創建一個“請求結點”入隊:
/** * 嘗試將結點s添加到隊尾. * * @param s 待添加的結點 * @param haveData true: 數據結點 * @return 返回null表示失敗; 否則返回s的前驅結點(沒有前驅則返回s自身) */ private Node tryAppend(Node s, boolean haveData) { for (Node t = tail, p = t; ; ) { Node n, u; if (p == null && (p = head) == null) { // CASE1: 隊列為空 if (casHead(null, s)) // 設置隊首指針head return s; } else if (p.cannotPrecede(haveData)) // CASE2: 結點s不能鏈接到結點p return null; else if ((n = p.next) != null) // CASE3: 遍歷至隊尾結點 p = p != t && t != (u = tail) ? (t = u) : // stale tail (p != n) ? n : null; // restart if off list else if (!p.casNext(null, s)) // CASE4: 插入結點s p = p.next; // re-read on CAS failure else { // CASE5: 嘗試進行松弛操作 if (p != t) { // update if slack now >= 2 while ((tail != t || !casTail(t, s)) && (t = tail) != null && (s = t.next) != null && // advance and retry (s = s.next) != null && s != t) ; } return p; } } }
調用完tryAppend方法后,隊列結構如下,橙色的為“請求結點”—— item==null && isData==false:
然后ThreadH也會進入在awaitMatch方法后進入阻塞,并等待一個入隊線程的到來。最終隊列結構如下:
三、總結截止本篇為止,我們已經學習完了juc-collection框架中的所有阻塞隊列,如下表所示:
隊列特性 | 有界隊列 | 近似無界隊列 | 無界隊列 | 特殊隊列 |
---|---|---|---|---|
有鎖算法 | ArrayBlockingQueue | LinkedBlockingQueue、LinkedBlockingDeque | / | PriorityBlockingQueue、DelayQueue |
無鎖算法 | / | / | LinkedTransferQueue | SynchronousQueue |
可以看到,LinkedTransferQueue其實兼具了SynchronousQueue的特性以及無鎖算法的性能,并且是一種無界隊列:
和SynchronousQueue相比,LinkedTransferQueue可以存儲實際的數據;
和其它阻塞隊列相比,LinkedTransferQueue直接用無鎖算法實現,性能有所提升。
另外,由于LinkedTransferQueue可以存放兩種不同類型的結點,所以稱之為“Dual Queue”:
內部Node結點定義了一個 boolean 型字段——isData,表示該結點是“數據結點”還是“請求結點”。
為了節省 CAS 操作的開銷,LinkedTransferQueue使用了松弛(slack)操作:
在結點被匹配(被刪除)之后,不會立即更新隊列的head、tail,而是當 head、tail結點與最近一個未匹配的結點之間的距離超過“松弛閥值”后才會更新(默認為 2)。這個“松弛閥值”一般為1到3,如果太大會增加沿鏈表查找未匹配結點的時間,太小會增加 CAS 的開銷。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/77196.html
摘要:整個包,按照功能可以大致劃分如下鎖框架原子類框架同步器框架集合框架執行器框架本系列將按上述順序分析,分析所基于的源碼為。后,根據一系列常見的多線程設計模式,設計了并發包,其中包下提供了一系列基礎的鎖工具,用以對等進行補充增強。 showImg(https://segmentfault.com/img/remote/1460000016012623); 本文首發于一世流云專欄:https...
摘要:我們來看下的類繼承圖可以看到,實現了接口,在多線程進階二五之框架中,我們提到過實現了接口,以提供和排序相關的功能,維持元素的有序性,所以就是一種為并發環境設計的有序工具類。唯一的區別是針對的僅僅是鍵值,針對鍵值對進行操作。 showImg(https://segmentfault.com/img/bVbggic?w=960&h=600); 本文首發于一世流云專欄:https://seg...
摘要:僅僅當有多個線程同時進行寫操作時,才會進行同步。可以看到,上述方法返回一個迭代器對象,的迭代是在舊數組上進行的,當創建迭代器的那一刻就確定了,所以迭代過程中不會拋出并發修改異常。另外,迭代器對象也不支持修改方法,全部會拋出異常。 showImg(https://segmentfault.com/img/bVbggij?w=960&h=600); 本文首發于一世流云專欄:https://...
摘要:我們之前已經介紹過了,底層基于跳表實現,其操作平均時間復雜度均為。事實上,內部引用了一個對象,以組合方式,委托對象實現了所有功能。線程安全內存的使用較多迭代是對快照進行的,不會拋出,且迭代過程中不支持修改操作。 showImg(https://segmentfault.com/img/bVbggjf?w=600&h=377); 本文首發于一世流云專欄:https://segmentfa...
摘要:接口截止目前為止,我們介紹的阻塞隊列都是實現了接口。該類在構造時一般需要指定容量,如果不指定,則最大容量為。另外,由于內部通過來保證線程安全,所以的整體實現時比較簡單的。另外,雙端隊列相比普通隊列,主要是多了隊尾出隊元素隊首入隊元素的功能。 showImg(https://segmentfault.com/img/bVbgZ7j?w=770&h=514); 本文首發于一世流云專欄:ht...
閱讀 1444·2023-04-25 16:31
閱讀 2046·2021-11-24 10:33
閱讀 2751·2021-09-23 11:33
閱讀 2537·2021-09-23 11:31
閱讀 2915·2021-09-08 09:45
閱讀 2345·2021-09-06 15:02
閱讀 2652·2019-08-30 14:21
閱讀 2321·2019-08-30 12:56