摘要:如果節(jié)點(diǎn)不為說(shuō)明已經(jīng)有其他線程進(jìn)行操作將節(jié)點(diǎn)替換為節(jié)點(diǎn)等待有消費(fèi)者消費(fèi)線程。如果頭節(jié)點(diǎn)下一個(gè)節(jié)點(diǎn)是當(dāng)前節(jié)點(diǎn)以防止其他線程已經(jīng)修改了節(jié)點(diǎn)則運(yùn)算,否則直接返回。
一、介紹
SynchronousQueue是一個(gè)雙棧雙隊(duì)列算法,無(wú)空間的隊(duì)列或棧,任何一個(gè)對(duì)SynchronousQueue寫(xiě)需要等到一個(gè)對(duì)SynchronousQueue的讀操作,反之亦然。一個(gè)讀操作需要等待一個(gè)寫(xiě)操作,相當(dāng)于是交換通道,提供者和消費(fèi)者是需要組隊(duì)完成工作,缺少一個(gè)將會(huì)阻塞線程,知道等到配對(duì)為止。
SynchronousQueue是一個(gè)隊(duì)列和棧算法實(shí)現(xiàn),在SynchronousQueue中雙隊(duì)列FIFO提供公平模式,而雙棧LIFO提供的則是非公平模式。
對(duì)于SynchronousQueue來(lái)說(shuō),他的put方法和take方法都被抽象成統(tǒng)一方法來(lái)進(jìn)行操作,通過(guò)抽象出內(nèi)部類(lèi)Transferer,來(lái)實(shí)現(xiàn)不同的操作。
注意事項(xiàng):本文分析主要是針對(duì)jdk1.8的版本進(jìn)行分析,下面的代碼中的線程執(zhí)行順序可能并不能完全保證順序性,執(zhí)行時(shí)間比較短,所以暫且認(rèn)定有序執(zhí)行。約定:圖片中以Reference-開(kāi)頭的代表對(duì)象的引用地址,通過(guò)箭頭方式進(jìn)行引用對(duì)象。
Transferer.transfer方法主要介紹如下所示:
abstract static class Transferer{ /** * 執(zhí)行put和take方法. * * @param e 非空時(shí),表示這個(gè)元素要傳遞給消費(fèi)者(提供者-put); * 為空時(shí), 則表示當(dāng)前操作要請(qǐng)求消費(fèi)一個(gè)數(shù)據(jù)(消費(fèi)者-take)。 * offered by producer. * @param timed 決定是否存在timeout時(shí)間。 * @param nanos 超時(shí)時(shí)長(zhǎng)。 * @return 如果返回非空, 代表數(shù)據(jù)已經(jīng)被消費(fèi)或者正常提供; 如果為空, * 則表示由于超時(shí)或中斷導(dǎo)致失敗。可通過(guò)Thread.interrupted來(lái)檢查是那種。 */ abstract E transfer(E e, boolean timed, long nanos); }
接下來(lái)看一下SynchronousQueue的字段信息:
/** CPU數(shù)量 */ static final int NCPUS = Runtime.getRuntime().availableProcessors(); /** * 自旋次數(shù),如果transfer指定了timeout時(shí)間,則使用maxTimeSpins,如果CPU數(shù)量小于2則自旋次數(shù)為0,否則為32 * 此值為經(jīng)驗(yàn)值,不隨CPU數(shù)量增加而變化,這里只是個(gè)常量。 */ static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32; /** * 自旋次數(shù),如果沒(méi)有指定時(shí)間設(shè)置,則使用maxUntimedSpins。如果NCPUS數(shù)量大于等于2則設(shè)定為為32*16,否則為0; */ static final int maxUntimedSpins = maxTimedSpins * 16; /** * The number of nanoseconds for which it is faster to spin * rather than to use timed park. A rough estimate suffices. */ static final long spinForTimeoutThreshold = 1000L;
NCPUS:代表CPU的數(shù)量
maxTimedSpins:自旋次數(shù),如果transfer指定了timeout時(shí)間,則使用maxTimeSpins,如果CPU數(shù)量小于2則自旋次數(shù)為0,否則為32,此值為經(jīng)驗(yàn)值,不隨CPU數(shù)量增加而變化,這里只是個(gè)常量。
maxUntimedSpins:自旋次數(shù),如果沒(méi)有指定時(shí)間設(shè)置,則使用maxUntimedSpins。如果NCPUS數(shù)量大于等于2則設(shè)定為為32*16,否則為0;
spinForTimeoutThreshold:為了防止自定義的時(shí)間限過(guò)長(zhǎng),而設(shè)置的,如果設(shè)置的時(shí)間限長(zhǎng)于這個(gè)值則取這個(gè)spinForTimeoutThreshold 為時(shí)間限。這是為了優(yōu)化而考慮的。這個(gè)的單位為納秒。
公平模式-TransferQueueTransferQueue內(nèi)部是如何進(jìn)行工作的,這里先大致講解下,隊(duì)列采用了互補(bǔ)模式進(jìn)行等待,QNode中有一個(gè)字段是isData,如果模式相同或空隊(duì)列時(shí)進(jìn)行等待操作,互補(bǔ)的情況下就進(jìn)行消費(fèi)操作。
入隊(duì)操作相同模式
不同模式時(shí)進(jìn)行出隊(duì)列操作:
這時(shí)候來(lái)了一個(gè)isData=false的互補(bǔ)模式,隊(duì)列就會(huì)變成如下?tīng)顟B(tài):
TransferQueue繼承自Transferer抽象類(lèi),并且實(shí)現(xiàn)了transfer方法,它主要包含以下內(nèi)容:
代表隊(duì)列中的節(jié)點(diǎn)元素,它內(nèi)部包含以下字段信息:
字段信息描述
字段 | 描述 | 類(lèi)型 |
---|---|---|
next | 下一個(gè)節(jié)點(diǎn) | QNode |
item | 元素信息 | Object |
waiter | 當(dāng)前等待的線程 | Thread |
isData | 是否是數(shù)據(jù) | boolean |
方法信息描述
方法 | 描述 |
---|---|
casNext | 替換當(dāng)前節(jié)點(diǎn)的next節(jié)點(diǎn) |
casItem | 替換當(dāng)前節(jié)點(diǎn)的item數(shù)據(jù) |
tryCancel | 取消當(dāng)前操作,將當(dāng)前item賦值為this(當(dāng)前QNode節(jié)點(diǎn)) |
isCancelled | 如果item是this(當(dāng)前QNode節(jié)點(diǎn))的話就返回true,反之返回false |
isOffList | 如果已知此節(jié)點(diǎn)離隊(duì)列,判斷next節(jié)點(diǎn)是不是為this,則返回true,因?yàn)橛捎? advanceHead操作而忘記了其下一個(gè)指針。 |
E transfer(E e, boolean timed, long nanos) { /* Basic algorithm is to loop trying to take either of * two actions: * * 1. If queue apparently empty or holding same-mode nodes, * try to add node to queue of waiters, wait to be * fulfilled (or cancelled) and return matching item. * * 2. If queue apparently contains waiting items, and this * call is of complementary mode, try to fulfill by CAS"ing * item field of waiting node and dequeuing it, and then * returning matching item. * * In each case, along the way, check for and try to help * advance head and tail on behalf of other stalled/slow * threads. * * The loop starts off with a null check guarding against * seeing uninitialized head or tail values. This never * happens in current SynchronousQueue, but could if * callers held non-volatile/final ref to the * transferer. The check is here anyway because it places * null checks at top of loop, which is usually faster * than having them implicitly interspersed. */ QNode s = null; // constructed/reused as needed // 分為兩種狀態(tài)1.有數(shù)據(jù)=true 2.無(wú)數(shù)據(jù)=false boolean isData = (e != null); // 循環(huán)內(nèi)容 for (;;) { // 尾部節(jié)點(diǎn)。 QNode t = tail; // 頭部節(jié)點(diǎn)。 QNode h = head; // 判斷頭部和尾部如果有一個(gè)為null則自旋轉(zhuǎn)。 if (t == null || h == null) // 還未進(jìn)行初始化的值。 continue; // 自旋 // 頭結(jié)點(diǎn)和尾節(jié)點(diǎn)相同或者尾節(jié)點(diǎn)的模式和當(dāng)前節(jié)點(diǎn)模式相同。 if (h == t || t.isData == isData) { // 空或同模式。 // tn為尾節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)信息。 QNode tn = t.next; // 這里我認(rèn)為是閱讀不一致,原因是當(dāng)前線程還沒(méi)有阻塞的時(shí)候其他線程已經(jīng)修改了尾節(jié)點(diǎn)tail會(huì)導(dǎo)致當(dāng)前線程的tail節(jié)點(diǎn)不一致。 if (t != tail) // inconsistent read continue; if (tn != null) { // lagging tail advanceTail(t, tn); continue; } if (timed && nanos <= 0) // 這里如果指定timed判斷時(shí)間小于等于0直接返回。 return null; // 判斷新增節(jié)點(diǎn)是否為null,為null直接構(gòu)建新節(jié)點(diǎn)。 if (s == null) s = new QNode(e, isData); if (!t.casNext(null, s)) // 如果next節(jié)點(diǎn)不為null說(shuō)明已經(jīng)有其他線程進(jìn)行tail操作 continue; // 將t節(jié)點(diǎn)替換為s節(jié)點(diǎn) advanceTail(t, s); // 等待有消費(fèi)者消費(fèi)線程。 Object x = awaitFulfill(s, e, timed, nanos); // 如果返回的x,指的是s.item,如果s.item指向自己的話清除操作。 if (x == s) { clean(t, s); return null; } // 如果沒(méi)有取消聯(lián)系 if (!s.isOffList()) { // 將當(dāng)前節(jié)點(diǎn)替換頭結(jié)點(diǎn) advanceHead(t, s); // unlink if head if (x != null) // 取消item值,這里是take方法時(shí)會(huì)進(jìn)行item賦值為this s.item = s; // 將等待線程設(shè)置為null s.waiter = null; } return (x != null) ? (E)x : e; } else { // complementary-mode // 獲取頭結(jié)點(diǎn)下一個(gè)節(jié)點(diǎn) QNode m = h.next; // node to fulfill // 如果當(dāng)前線程尾節(jié)點(diǎn)和全局尾節(jié)點(diǎn)不一致,重新開(kāi)始 // 頭結(jié)點(diǎn)的next節(jié)點(diǎn)為空,代表無(wú)下一個(gè)節(jié)點(diǎn),則重新開(kāi)始, // 當(dāng)前線程頭結(jié)點(diǎn)和全局頭結(jié)點(diǎn)不相等,則重新開(kāi)始 if (t != tail || m == null || h != head) continue; // inconsistent read Object x = m.item; if (isData == (x != null) || // 如果x=null說(shuō)明已經(jīng)被讀取了。 x == m || // x節(jié)點(diǎn)和m節(jié)點(diǎn)相等說(shuō)明被中斷操作,被取消操作了。 !m.casItem(x, e)) { // 這里是將item值設(shè)置為null advanceHead(h, m); // 移動(dòng)頭結(jié)點(diǎn)到頭結(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn) continue; } advanceHead(h, m); // successfully fulfilled LockSupport.unpark(m.waiter); return (x != null) ? (E)x : e; } } }
我們來(lái)看一下awaitFulfill方法內(nèi)容:
Object awaitFulfill(QNode s, E e, boolean timed, long nanos) { // 如果指定了timed則為System.nanoTime() + nanos,反之為0。 final long deadline = timed ? System.nanoTime() + nanos : 0L; // 獲取當(dāng)前線程。 Thread w = Thread.currentThread(); // 如果頭節(jié)點(diǎn)下一個(gè)節(jié)點(diǎn)是當(dāng)前s節(jié)點(diǎn)(以防止其他線程已經(jīng)修改了head節(jié)點(diǎn)) // 則運(yùn)算(timed ? maxTimedSpins : maxUntimedSpins),否則直接返回。 // 指定了timed則使用maxTimedSpins,反之使用maxUntimedSpins int spins = ((head.next == s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0); // 自旋 for (;;) { // 判斷是否已經(jīng)被中斷。 if (w.isInterrupted()) //嘗試取消,將當(dāng)前節(jié)點(diǎn)的item修改為當(dāng)前節(jié)點(diǎn)(this)。 s.tryCancel(e); // 獲取當(dāng)前節(jié)點(diǎn)內(nèi)容。 Object x = s.item; // 判斷當(dāng)前值和節(jié)點(diǎn)值不相同是返回,因?yàn)閺棾鰰r(shí)會(huì)將item值賦值為null。 if (x != e) return x; if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { s.tryCancel(e); continue; } } if (spins > 0) --spins; else if (s.waiter == null) s.waiter = w; else if (!timed) LockSupport.park(this); else if (nanos > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanos); } }
首先先判斷有沒(méi)有被中斷,如果被中斷則取消本次操作,將當(dāng)前節(jié)點(diǎn)的item內(nèi)容賦值為當(dāng)前節(jié)點(diǎn)。
判斷當(dāng)前節(jié)點(diǎn)和節(jié)點(diǎn)值不相同是返回
將當(dāng)前線程賦值給當(dāng)前節(jié)點(diǎn)
自旋,如果指定了timed則使用LockSupport.parkNanos(this, nanos);,如果沒(méi)有指定則使用LockSupport.park(this);。
中斷相應(yīng)是在下次才能被執(zhí)行。
通過(guò)上面源碼分析我們這里做出簡(jiǎn)單的示例代碼演示一下put操作和take操作是如何進(jìn)行運(yùn)作的,首先看一下示例代碼,如下所示:
/** * SynchronousQueue進(jìn)行put和take操作。 * * @author battleheart */ public class SynchronousQueueDemo { public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newFixedThreadPool(3); SynchronousQueuequeue = new SynchronousQueue<>(true); Thread thread1 = new Thread(() -> { try { queue.put(1); } catch (InterruptedException e) { e.printStackTrace(); } }); thread1.start(); Thread.sleep(2000); Thread thread2 = new Thread(() -> { try { queue.put(2); } catch (InterruptedException e) { e.printStackTrace(); } }); thread2.start(); Thread.sleep(10000); Thread thread3 = new Thread(() -> { try { System.out.println(queue.take()); } catch (InterruptedException e) { e.printStackTrace(); } }); thread3.start(); } }
首先上來(lái)之后進(jìn)行的是兩次put操作,然后再take操作,默認(rèn)隊(duì)列上來(lái)會(huì)進(jìn)行初始化,初始化的內(nèi)容如下代碼所示:
TransferQueue() { QNode h = new QNode(null, false); // initialize to dummy node. head = h; tail = h; }
初始化后隊(duì)列的狀態(tài)如下圖所示:
當(dāng)線程1執(zhí)行put操作時(shí),來(lái)分析下代碼:
QNode t = tail; QNode h = head; if (t == null || h == null) // saw uninitialized value continue;
首先執(zhí)行局部變量t代表隊(duì)尾指針,h代表隊(duì)頭指針,判斷隊(duì)頭和隊(duì)尾不為空則進(jìn)行下面的操作,接下來(lái)是if…else語(yǔ)句這里是分水嶺,當(dāng)相同模式操作的時(shí)候執(zhí)行if語(yǔ)句,當(dāng)進(jìn)行不同模式操作時(shí)執(zhí)行的是else語(yǔ)句,程序是如何控制這樣的操作的呢?接下來(lái)我們慢慢分析一下:
if (h == t || t.isData == isData) { // 隊(duì)列為空或者模式相同時(shí)進(jìn)行if語(yǔ)句 QNode tn = t.next; if (t != tail) // 判斷t是否是隊(duì)尾,不是則重新循環(huán)。 continue; if (tn != null) { // tn是隊(duì)尾的下個(gè)節(jié)點(diǎn),如果tn有內(nèi)容則將隊(duì)尾更換為tn,并且重新循環(huán)操作。 advanceTail(t, tn); continue; } if (timed && nanos <= 0) // 如果指定了timed并且延時(shí)時(shí)間用盡則直接返回空,這里操作主要是offer操作時(shí),因?yàn)殛?duì)列無(wú)存儲(chǔ)空間的當(dāng)offer時(shí)不允許插入。 return null; if (s == null) // 這里是新節(jié)點(diǎn)生成。 s = new QNode(e, isData); if (!t.casNext(null, s)) // 將尾節(jié)點(diǎn)的next節(jié)點(diǎn)修改為當(dāng)前節(jié)點(diǎn)。 continue; advanceTail(t, s); // 隊(duì)尾移動(dòng) Object x = awaitFulfill(s, e, timed, nanos); //自旋并且設(shè)置線程。 if (x == s) { // wait was cancelled clean(t, s); return null; } if (!s.isOffList()) { // not already unlinked advanceHead(t, s); // unlink if head if (x != null) // and forget fields s.item = s; s.waiter = null; } return (x != null) ? (E)x : e; }
上面代碼是if語(yǔ)句中的內(nèi)容,進(jìn)入到if語(yǔ)句中的判斷是如果頭結(jié)點(diǎn)和尾節(jié)點(diǎn)相等代表隊(duì)列為空,并沒(méi)有元素所有要進(jìn)行插入隊(duì)列的操作,或者是隊(duì)尾的節(jié)點(diǎn)的isData標(biāo)志和當(dāng)前操作的節(jié)點(diǎn)的類(lèi)型一樣時(shí),會(huì)進(jìn)行入隊(duì)操作,isData標(biāo)識(shí)當(dāng)前元素是否是數(shù)據(jù),如果為true代表是數(shù)據(jù),如果為false則代表不是數(shù)據(jù),換句話說(shuō)只有模式相同的時(shí)候才會(huì)往隊(duì)列中存放,如果不是模式相同的時(shí)候則代表互補(bǔ)模式,就不走if語(yǔ)句了,而是走了else語(yǔ)句,上面代碼中做有注釋講解,下面看一下這里:
if (s == null) // 這里是新節(jié)點(diǎn)生成。 s = new QNode(e, isData); if (!t.casNext(null, s)) // 將尾節(jié)點(diǎn)的next節(jié)點(diǎn)修改為當(dāng)前節(jié)點(diǎn)。 continue;
當(dāng)執(zhí)行上面代碼后,隊(duì)列的情況如下圖所示:(這里視為插入第一個(gè)元素圖,方便下面的引用)
接下來(lái)執(zhí)行這段代碼:
advanceTail(t, s); // 隊(duì)尾移動(dòng)
修改了tail節(jié)點(diǎn)后,這時(shí)候就需要進(jìn)行自旋操作,并且設(shè)置QNode的waiter等待線程,并且將線程等待,等到喚醒線程進(jìn)行喚醒操作
Object x = awaitFulfill(s, e, timed, nanos); //自旋并且設(shè)置線程。
方法內(nèi)部分析局部?jī)?nèi)容,上面已經(jīng)全部?jī)?nèi)容的分析:
if (spins > 0) --spins; else if (s.waiter == null) s.waiter = w; else if (!timed) LockSupport.park(this); else if (nanos > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanos);
如果自旋時(shí)間spins還有則進(jìn)行循環(huán)遞減操作,接下來(lái)判斷如果當(dāng)前節(jié)點(diǎn)的waiter是空則價(jià)格當(dāng)前線程賦值給waiter,上圖中顯然是為空的所以會(huì)把當(dāng)前線程進(jìn)行賦值給我waiter,接下來(lái)就是等待操作了。
上面線程則處于等待狀態(tài),接下來(lái)是線程二進(jìn)行操作,這里不進(jìn)行重復(fù)進(jìn)行,插入第二個(gè)元素隊(duì)列的狀況,此時(shí)線程二也處于等待狀態(tài)。
上面的主要是put了兩次操作后隊(duì)列的情況,接下來(lái)分析一下take操作時(shí)又是如何進(jìn)行操作的,當(dāng)take操作時(shí),isData為false,而隊(duì)尾的isData為true兩個(gè)不相等,所以不會(huì)進(jìn)入到if語(yǔ)句,而是進(jìn)入到了else語(yǔ)句
} else { // 互補(bǔ)模式 QNode m = h.next; // 獲取頭結(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn),進(jìn)行互補(bǔ)操作。 if (t != tail || m == null || h != head) continue; // 這里就是為了防止閱讀不一致的問(wèn)題 Object x = m.item; if (isData == (x != null) || // 如果x=null說(shuō)明已經(jīng)被讀取了。 x == m || // x節(jié)點(diǎn)和m節(jié)點(diǎn)相等說(shuō)明被中斷操作,被取消操作了。 !m.casItem(x, e)) { // 這里是將item值設(shè)置為null advanceHead(h, m); // 移動(dòng)頭結(jié)點(diǎn)到頭結(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn) continue; } advanceHead(h, m); // successfully fulfilled LockSupport.unpark(m.waiter); return (x != null) ? (E)x : e; }
首先獲取頭結(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)用于互補(bǔ)操作,也就是take操作,接下來(lái)進(jìn)行閱讀不一致的判斷,防止其他線程進(jìn)行了閱讀操作,接下來(lái)獲取需要彈出內(nèi)容x=1,首先進(jìn)行判斷節(jié)點(diǎn)內(nèi)容是不是已經(jīng)被消費(fèi)了,節(jié)點(diǎn)內(nèi)容為null時(shí)則代表被消費(fèi)了,接下來(lái)判斷節(jié)點(diǎn)的item值是不是和本身相等如果相等話說(shuō)明節(jié)點(diǎn)被取消了或者被中斷了,然后移動(dòng)頭結(jié)點(diǎn)到下一個(gè)節(jié)點(diǎn)上,然后將refenrence-715的item值修改為null,至于為什么修改為null這里留下一個(gè)懸念,這里還是比較重要的,大家看到這里的時(shí)候需要注意下,顯然這些都不會(huì)成立,所以if語(yǔ)句中內(nèi)容不會(huì)被執(zhí)行,接下來(lái)的隊(duì)列的狀態(tài)是是這個(gè)樣子的:
OK,接下來(lái)就開(kāi)始移動(dòng)隊(duì)頭head了,將head移動(dòng)到m節(jié)點(diǎn)上,執(zhí)行代碼如下所示:
advanceHead(h, m);
此時(shí)隊(duì)列的狀態(tài)是這個(gè)樣子的:
LockSupport.unpark(m.waiter); return (x != null) ? (E)x : e;
接下來(lái)將執(zhí)行喚醒被等待的線程,也就是thread-0,然后返回獲取item值1,take方法結(jié)束,但是這里并沒(méi)有結(jié)束,因?yàn)閱拘蚜藀ut的線程,此時(shí)會(huì)切換到put方法中,這時(shí)候線程喚醒后會(huì)執(zhí)行awaitFulfill方法,此時(shí)循環(huán)時(shí),有與item值修改為null則直接返回內(nèi)容。
Object x = s.item; if (x != e) return x;
這里的代碼我們可以對(duì)照插入第一個(gè)元素圖,s節(jié)點(diǎn)也就是當(dāng)前m節(jié)點(diǎn),獲取值得時(shí)候已經(jīng)修改為null,但是當(dāng)時(shí)插入的值時(shí)1,所以兩個(gè)不想等了,則直接返回null值。
Object x = awaitFulfill(s, e, timed, nanos); if (x == s) { // wait was cancelled clean(t, s); return null; } if (!s.isOffList()) { // not already unlinked advanceHead(t, s); // unlink if head if (x != null) // and forget fields s.item = s; s.waiter = null; } return (x != null) ? (E)x : e;
又返回到了transfer方法的if語(yǔ)句中,此時(shí)x和s并不相等所以不用進(jìn)行clean操作,首先判斷s節(jié)點(diǎn)是否已經(jīng)離隊(duì)了,顯然并沒(méi)有進(jìn)行離隊(duì)操作,advanceHead(t, s); 操作不會(huì)被執(zhí)行因?yàn)樯厦嬉呀鼘㈩^節(jié)點(diǎn)修改了,但是第一次插入的時(shí)候頭結(jié)點(diǎn)還是reference-716,此時(shí)已經(jīng)是reference-715,而t節(jié)點(diǎn)的引用地址是reference-716,所以不會(huì)操作,接下來(lái)就是將waiter設(shè)置為null,也就是忘記掉等待的線程。
分析了正常的take和put操作,接下來(lái)分析下中斷操作,由于中斷相應(yīng)后,會(huì)被執(zhí)行if(w.isInterrupted())這段代碼,它會(huì)執(zhí)行s.tryCancel(e)方法,這個(gè)方法的作用的是將QNode節(jié)點(diǎn)的item節(jié)點(diǎn)賦值為當(dāng)前QNode,這時(shí)候x和e值就不相等了( if (x != e)),x的值是s.item,則為當(dāng)前QNode,而e的值是用戶指定的值,這時(shí)候返回x(s.item)。返回到函數(shù)調(diào)用地方transfer中,這時(shí)候要執(zhí)行下面語(yǔ)句:
if (x == s) { clean(t, s); return null; }
進(jìn)入到clean方法執(zhí)行清理當(dāng)前節(jié)點(diǎn),下面是方法clean代碼:
/** * Gets rid of cancelled node s with original predecessor pred. */ void clean(QNode pred, QNode s) { s.waiter = null; // forget thread /* * At any given time, exactly one node on list cannot be * deleted -- the last inserted node. To accommodate this, * if we cannot delete s, we save its predecessor as * "cleanMe", deleting the previously saved version * first. At least one of node s or the node previously * saved can always be deleted, so this always terminates. */ while (pred.next == s) { // Return early if already unlinked QNode h = head; QNode hn = h.next; // Absorb cancelled first node as head if (hn != null && hn.isCancelled()) { advanceHead(h, hn); continue; } QNode t = tail; // Ensure consistent read for tail if (t == h) return; QNode tn = t.next; // 判斷現(xiàn)在的t是不是末尾節(jié)點(diǎn),可能其他線程插入了內(nèi)容導(dǎo)致不是最后的節(jié)點(diǎn)。 if (t != tail) continue; // 如果不是最后節(jié)點(diǎn)的話將其現(xiàn)在t.next節(jié)點(diǎn)作為tail尾節(jié)點(diǎn)。 if (tn != null) { advanceTail(t, tn); continue; } // 如果當(dāng)前節(jié)點(diǎn)不是尾節(jié)點(diǎn)進(jìn)入到這里面。 if (s != t) { // If not tail, try to unsplice // 獲取當(dāng)前節(jié)點(diǎn)(被取消的節(jié)點(diǎn))的下一個(gè)節(jié)點(diǎn)。 QNode sn = s.next; // 修改上一個(gè)節(jié)點(diǎn)的next(下一個(gè))元素為下下個(gè)節(jié)點(diǎn)。 if (sn == s || pred.casNext(s, sn)) //返回。 return; } QNode dp = cleanMe; if (dp != null) { // 嘗試清除上一個(gè)標(biāo)記為清除的節(jié)點(diǎn)。 QNode d = dp.next; //1.獲取要被清除的節(jié)點(diǎn) QNode dn; if (d == null || // 被清除節(jié)點(diǎn)不為空 d == dp || // 被清除節(jié)點(diǎn)已經(jīng)離隊(duì) !d.isCancelled() || // 被清除節(jié)點(diǎn)是標(biāo)記為Cancel狀態(tài)的。 (d != t && // 被清除節(jié)點(diǎn)不是尾節(jié)點(diǎn) (dn = d.next) != null && // 被清除節(jié)點(diǎn)下一個(gè)節(jié)點(diǎn)不為null dn != d && // that is on list dp.casNext(d, dn))) // 將被清除的節(jié)點(diǎn)的前一個(gè)節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)修改為被清除節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)。 casCleanMe(dp, null); // 清空cleanMe節(jié)點(diǎn)。 if (dp == pred) return; // s is already saved node } else if (casCleanMe(null, pred)) // 這里將上一個(gè)節(jié)點(diǎn)標(biāo)記為被清除操作,但是其實(shí)要操作的是下一個(gè)節(jié)點(diǎn)。 return; // Postpone cleaning s } }
如果節(jié)點(diǎn)中取消的頭結(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn),只需要移動(dòng)當(dāng)前head節(jié)點(diǎn)到下一個(gè)節(jié)點(diǎn)即可。
如果取消的是中間的節(jié)點(diǎn),則將當(dāng)前節(jié)點(diǎn)next節(jié)點(diǎn)修改為下下個(gè)節(jié)點(diǎn)。
如果修改為末尾的節(jié)點(diǎn),則將當(dāng)前節(jié)點(diǎn)放入到QNode的clearMe中,等待有內(nèi)容進(jìn)來(lái)之后下一次進(jìn)行清除操作。
實(shí)例一:清除頭結(jié)點(diǎn)下一個(gè)節(jié)點(diǎn),下面是實(shí)例代碼進(jìn)行講解:
/** * 清除頭結(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)實(shí)例代碼。 * * @author battleheart */ public class SynchronousQueueDemo { public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newFixedThreadPool(3); SynchronousQueuequeue = new SynchronousQueue<>(true); AtomicInteger atomicInteger = new AtomicInteger(0); Thread thread1 = new Thread(() -> { try { queue.put(1); } catch (InterruptedException e) { e.printStackTrace(); } }); thread1.start(); Thread.sleep(200); Thread thread2 = new Thread(() -> { try { queue.put(2); } catch (InterruptedException e) { e.printStackTrace(); } }); thread2.start(); Thread.sleep(2000); thread1.interrupt(); } }
上面例子說(shuō)明我們啟動(dòng)了兩個(gè)線程,分別向SynchronousQueue隊(duì)列中添加了元素1和元素2,添加成功之后的,讓主線程休眠一會(huì),然后將第一個(gè)線程進(jìn)行中斷操作,添加兩個(gè)元素后節(jié)點(diǎn)所處在的狀態(tài)為下圖所示:
當(dāng)我們調(diào)用thread1.interrupt時(shí),此時(shí)線程1等待的消費(fèi)操作將被終止,會(huì)相應(yīng)上面awaitFulfill方法,該方法會(huì)運(yùn)行下面代碼:
if (w.isInterrupted()) //嘗試取消,將當(dāng)前節(jié)點(diǎn)的item修改為當(dāng)前節(jié)點(diǎn)(this)。 s.tryCancel(e); // 獲取當(dāng)前節(jié)點(diǎn)內(nèi)容。 Object x = s.item; // 判斷當(dāng)前值和節(jié)點(diǎn)值不相同是返回,因?yàn)閺棾鰰r(shí)會(huì)將item值賦值為null。 if (x != e) return x;
首先上來(lái)現(xiàn)將s節(jié)點(diǎn)(上圖中的Reference-715引用對(duì)象)的item節(jié)點(diǎn)設(shè)置為當(dāng)前節(jié)點(diǎn)引用(Reference-715引用對(duì)象),所以s節(jié)點(diǎn)和e=1不相等則直接返回,此時(shí)節(jié)點(diǎn)的狀態(tài)變化如下所示:
退出awaitFulfill并且返回的是s節(jié)點(diǎn)內(nèi)容(實(shí)際上返回的就是s節(jié)點(diǎn)),接下來(lái)返回到調(diào)用awaitFulfill的方法transfer方法中
Object x = awaitFulfill(s, e, timed, nanos); if (x == s) { // 是否是被取消了 clean(t, s); return null; }
首先判斷的事x節(jié)點(diǎn)和s節(jié)點(diǎn)是否相等,上面我們也說(shuō)了明顯是相等的所以這里會(huì)進(jìn)入到clean方法中,clean(QNode pred, QNode s)clean方法一個(gè)是前節(jié)點(diǎn),一個(gè)是當(dāng)前被取消的節(jié)點(diǎn),也就是當(dāng)前s節(jié)點(diǎn)的前節(jié)點(diǎn)是head節(jié)點(diǎn),接下來(lái)我們一步一步的分析代碼:
s.waiter = null; // 刪除等待的線程。
進(jìn)入到方法體之后首先先進(jìn)行的是將當(dāng)前節(jié)點(diǎn)的等待線程刪除,如下圖所示:
接下來(lái)進(jìn)入while循環(huán),循環(huán)內(nèi)容時(shí)pred.next == s如果不是則表示已經(jīng)移除了節(jié)點(diǎn),反之還在隊(duì)列中,則進(jìn)行下面的操作:
QNode h = head; QNode hn = h.next; // 如果取消的是第一個(gè)節(jié)點(diǎn)則進(jìn)入下面語(yǔ)句 if (hn != null && hn.isCancelled()) { advanceHead(h, hn); continue; }
可以看到首先h節(jié)點(diǎn)為head節(jié)點(diǎn),hn為頭結(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn),在進(jìn)行判斷頭結(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)不為空并且頭結(jié)點(diǎn)下一個(gè)節(jié)點(diǎn)是被中斷的節(jié)點(diǎn)(取消的節(jié)點(diǎn)),則進(jìn)入到if語(yǔ)句中,if語(yǔ)句其實(shí)也很簡(jiǎn)單就是將頭結(jié)點(diǎn)修改為頭結(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)(s節(jié)點(diǎn),別取消節(jié)點(diǎn),并且將前節(jié)點(diǎn)的next節(jié)點(diǎn)修改為自己,也就是移除了之前的節(jié)點(diǎn),我們看下advanceHead方法:
void advanceHead(QNode h, QNode nh) { if (h == head && UNSAFE.compareAndSwapObject(this, headOffset, h, nh)) h.next = h; // forget old next }
首先上來(lái)先進(jìn)行CAS移動(dòng)頭結(jié)點(diǎn),再講原來(lái)頭結(jié)點(diǎn)h的next節(jié)點(diǎn)修改為自己(h),為什么這樣做呢?因?yàn)樯厦孢M(jìn)行advanceHead之后并沒(méi)有退出循環(huán),是進(jìn)行continue操作,也就是它并沒(méi)有跳出while循環(huán),他還會(huì)循環(huán)一次prev.next此時(shí)已經(jīng)不能等于s所以退出循環(huán),如下圖所示:
實(shí)例二:清除中間的節(jié)點(diǎn)
/** * SynchronousQueue實(shí)例二,清除中間的節(jié)點(diǎn)。 * * @author battleheart */ public class SynchronousQueueDemo { public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newFixedThreadPool(3); SynchronousQueuequeue = new SynchronousQueue<>(true); AtomicInteger atomicInteger = new AtomicInteger(0); Thread thread1 = new Thread(() -> { try { queue.put(1); } catch (InterruptedException e) { e.printStackTrace(); } }); thread1.start(); //休眠一會(huì)。 Thread.sleep(200); Thread thread2 = new Thread(() -> { try { queue.put(2); } catch (InterruptedException e) { e.printStackTrace(); } }); thread2.start(); //休眠一會(huì)。 Thread.sleep(200); Thread thread3 = new Thread(() -> { try { queue.put(3); } catch (InterruptedException e) { e.printStackTrace(); } }); thread3.start(); //休眠一會(huì)。 Thread.sleep(10000); thread2.interrupt(); } }
看上面例子,首先先進(jìn)行put操作三次,也就是入隊(duì)3條數(shù)據(jù),分別是整型值1,整型值2,整型值3,然后將當(dāng)前線程休眠一下,對(duì)中間線程進(jìn)行中斷操作,通過(guò)讓主線程休眠一會(huì)保證線程執(zhí)行順序性(當(dāng)然上面線程不一定能保證執(zhí)行順序,因?yàn)閜ut操作一下子就執(zhí)行完了所以這點(diǎn)時(shí)間是可以的),此時(shí)隊(duì)列所處的狀態(tài)來(lái)看一下下圖:
當(dāng)休眠一會(huì)之后,進(jìn)入到threa2進(jìn)行中斷操作,目前上圖中表示Reference-723被中斷操作,此時(shí)也會(huì)進(jìn)入到awaitFulfill方法中,將Reference-723的item節(jié)點(diǎn)修改為當(dāng)前節(jié)點(diǎn),如下圖所示:
進(jìn)入到clear方法中此時(shí)的prev節(jié)點(diǎn)為Reference-715,s節(jié)點(diǎn)是被清除節(jié)點(diǎn),還是首先進(jìn)入clear方法中先將waiter設(shè)置為null,取消當(dāng)前線程內(nèi)容,如下圖所示:
接下來(lái)進(jìn)入到循環(huán)中,進(jìn)行下面處理
QNode h = head; QNode hn = h.next; // Absorb cancelled first node as head if (hn != null && hn.isCancelled()) { advanceHead(h, hn); continue; } QNode t = tail; // Ensure consistent read for tail if (t == h) return; QNode tn = t.next; if (t != tail) continue; if (tn != null) { advanceTail(t, tn); continue; } if (s != t) { // If not tail, try to unsplice QNode sn = s.next; if (sn == s || pred.casNext(s, sn)) return; }
第一個(gè)if語(yǔ)句已經(jīng)分析過(guò)了所以說(shuō)這里不會(huì)進(jìn)入到里面去,接下來(lái)是進(jìn)行尾節(jié)點(diǎn)t是否是等于head節(jié)點(diǎn)如果相等則代表沒(méi)有元素,在判斷當(dāng)前方法的t尾節(jié)點(diǎn)是不是真正的尾節(jié)點(diǎn)tail如果不是則進(jìn)行修改尾節(jié)點(diǎn),先來(lái)看一下現(xiàn)在的狀態(tài):
tn != null判斷如果tn不是尾節(jié)點(diǎn),則將tn作為尾節(jié)點(diǎn)處理,如果處理之后還不是尾節(jié)點(diǎn)還會(huì)進(jìn)行處理直到tail是尾節(jié)點(diǎn)未知,我們現(xiàn)在這個(gè)是尾節(jié)點(diǎn)所以跳過(guò)這段代碼。s != t通過(guò)上圖可以看到s節(jié)點(diǎn)是被清除節(jié)點(diǎn),并不是尾節(jié)點(diǎn)所以進(jìn)入到循環(huán)中:
if (s != t) { // If not tail, try to unsplice QNode sn = s.next; if (sn == s || pred.casNext(s, sn)) return; }
首先獲取的s節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn),上圖中表示Reference-725節(jié)點(diǎn),判斷sn是都等于當(dāng)前節(jié)點(diǎn)顯然這一條不成立,pred節(jié)點(diǎn)為Reference-715節(jié)點(diǎn),將715節(jié)點(diǎn)的next節(jié)點(diǎn)變成Reference-725節(jié)點(diǎn),這里就將原來(lái)的節(jié)點(diǎn)清理出去了,現(xiàn)在的狀態(tài)如下所示:
實(shí)例三:刪除的節(jié)點(diǎn)是尾節(jié)點(diǎn)
/** * SynchronousQueue實(shí)例三,刪除的節(jié)點(diǎn)為尾節(jié)點(diǎn) * * @author battleheart */ public class SynchronousQueueDemo { public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newFixedThreadPool(3); SynchronousQueuequeue = new SynchronousQueue<>(true); AtomicInteger atomicInteger = new AtomicInteger(0); Thread thread1 = new Thread(() -> { try { queue.put(1); } catch (InterruptedException e) { e.printStackTrace(); } }); thread1.start(); Thread thread2 = new Thread(() -> { try { queue.put(2); } catch (InterruptedException e) { e.printStackTrace(); } }); thread2.start(); Thread.sleep(10000); thread2.interrupt(); Thread.sleep(10000); Thread thread3 = new Thread(() -> { try { queue.put(3); } catch (InterruptedException e) { e.printStackTrace(); } }); thread3.start(); Thread.sleep(10000); thread3.interrupt(); } }
該例子主要說(shuō)明一個(gè)問(wèn)題就是刪除的節(jié)點(diǎn)如果是末尾節(jié)點(diǎn)的話,clear方法又是如何處理的,首先啟動(dòng)了三個(gè)線程其中主線程休眠了一會(huì),為了能讓插入的順序保持線程1,線程2,線程3這樣子,啟動(dòng)第二個(gè)線程后,又將第二個(gè)線程中斷,這是第二個(gè)線程插入的節(jié)點(diǎn)為尾節(jié)點(diǎn),然后再啟動(dòng)第三個(gè)節(jié)點(diǎn)插入值,再中斷了第三個(gè)節(jié)點(diǎn)末尾節(jié)點(diǎn),說(shuō)一下為啥這樣操作,因?yàn)楫?dāng)清除尾節(jié)點(diǎn)時(shí),并不是直接移除當(dāng)前節(jié)點(diǎn),而是將被清除的節(jié)點(diǎn)的前節(jié)點(diǎn)設(shè)置到QNode的CleanMe中,等待下次clear方法時(shí)進(jìn)行清除上次保存在CleanMe的節(jié)點(diǎn),然后再處理當(dāng)前被中斷節(jié)點(diǎn),將新的被清理的節(jié)點(diǎn)prev設(shè)置為cleanMe當(dāng)中,等待下次進(jìn)行處理,接下來(lái)一步一步分析,首先我們先來(lái)看一下第二個(gè)線程啟動(dòng)后節(jié)點(diǎn)的狀態(tài)。
此時(shí)運(yùn)行thread2.interrupt();將第二個(gè)線程中斷,這時(shí)候會(huì)進(jìn)入到clear方法中,前面的代碼都不會(huì)被返回,會(huì)執(zhí)行下面的語(yǔ)句:
QNode dp = cleanMe; if (dp != null) { // Try unlinking previous cancelled node QNode d = dp.next; QNode dn; if (d == null || // d is gone or d == dp || // d is off list or !d.isCancelled() || // d not cancelled or (d != t && // d not tail and (dn = d.next) != null && // has successor dn != d && // that is on list dp.casNext(d, dn))) // d unspliced casCleanMe(dp, null); if (dp == pred) return; // s is already saved node } else if (casCleanMe(null, pred)) return;
首先獲得TransferQueue當(dāng)中cleanMe節(jié)點(diǎn),此時(shí)獲取的為null,當(dāng)判斷dp!=null時(shí)就會(huì)被跳過(guò),直接執(zhí)行
casCleanMe(null, pred)此時(shí)pred傳入的值時(shí)t節(jié)點(diǎn)指向的內(nèi)容,也就是當(dāng)前節(jié)點(diǎn)的上一個(gè)節(jié)點(diǎn),它會(huì)被標(biāo)記為清除操作節(jié)點(diǎn)(其實(shí)并不清楚它而是清除它下一個(gè)節(jié)點(diǎn),也就是說(shuō)item=this的節(jié)點(diǎn)),此時(shí)看一下節(jié)點(diǎn)狀態(tài)為下圖所示:
接下來(lái)第三個(gè)線程啟動(dòng)了這時(shí)候又往隊(duì)列中添加了元素3,此時(shí)隊(duì)列的狀況如下圖所示:
此時(shí)thread3也被中斷操作了,這時(shí)候還是運(yùn)行上面的代碼,但是這次不同的點(diǎn)在于cleanMe已經(jīng)不是空值,是有內(nèi)容的,首先獲取的是cleanMe的下一個(gè)節(jié)點(diǎn)(d),然我來(lái)把變量標(biāo)記在圖上然后看起來(lái)好分析一些,如下圖所示:
dp表示d節(jié)點(diǎn)的前一個(gè)pred節(jié)點(diǎn),dn表示d節(jié)點(diǎn)的next節(jié)點(diǎn),主要邏輯在這里:
if (d == null || // d is gone or d == dp || // d is off list or !d.isCancelled() || // d not cancelled or (d != t && // d not tail and (dn = d.next) != null && // has successor dn != d && // that is on list dp.casNext(d, dn))) // d unspliced casCleanMe(dp, null); if (dp == pred) return; // s
首先判斷d節(jié)點(diǎn)是不是為null,如果d節(jié)點(diǎn)為null代表已經(jīng)清除掉了,如果cleanMe節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)和自己相等,說(shuō)明需要清除的節(jié)點(diǎn)已經(jīng)離隊(duì)了,判斷下個(gè)節(jié)點(diǎn)是不是需要被清除的節(jié)點(diǎn),目前看d節(jié)點(diǎn)是被清除的節(jié)點(diǎn),然后就將被清除的節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)賦值給dn并且判斷d節(jié)點(diǎn)是不是末尾節(jié)點(diǎn),如果不是末尾節(jié)點(diǎn)則進(jìn)行dp.casNext方法,這個(gè)地方是關(guān)鍵點(diǎn),它將被清除節(jié)點(diǎn)d的前節(jié)點(diǎn)的next節(jié)點(diǎn)修改為被清除節(jié)點(diǎn)d的后面節(jié)點(diǎn)dn,然后調(diào)用caseCleanMe將TransferQueue中的cleanMe節(jié)點(diǎn)清空,此時(shí)節(jié)點(diǎn)的內(nèi)容如下所示:
可以看出將上一次標(biāo)記為清除的節(jié)點(diǎn)清除了隊(duì)列中,清除完了就完事兒?那這次的怎么弄呢?因?yàn)楝F(xiàn)在運(yùn)行的是thread3的中斷程序,所以上面并沒(méi)有退出,而是再次進(jìn)入循環(huán),循環(huán)之后發(fā)現(xiàn)dp為null則會(huì)運(yùn)行casCleanMe(null, pred),此時(shí)當(dāng)前節(jié)點(diǎn)s的前一個(gè)節(jié)點(diǎn)已經(jīng)被清除隊(duì)列,但是并不影響后續(xù)的清除操作,因?yàn)榍肮?jié)點(diǎn)的next節(jié)點(diǎn)還在維護(hù)中,也是前節(jié)點(diǎn)的next指向還是reference-725,如下圖所示:
就此分析完畢如果有不正確的地方請(qǐng)指正。
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://m.specialneedsforspecialkids.com/yun/74510.html
摘要:開(kāi)篇說(shuō)明本文分析采用的是約定下面內(nèi)容中代表的是引用地址,引用對(duì)應(yīng)的節(jié)點(diǎn)前面已經(jīng)講解了公平模式的內(nèi)容,今天來(lái)講解下關(guān)于非公平模式下的是如何進(jìn)行工作的,在源碼分析的時(shí)候,先來(lái)簡(jiǎn)單看一下非公平模式的簡(jiǎn)單原理,它采用的棧這種先進(jìn)后出的方式進(jìn)行非公 開(kāi)篇 說(shuō)明:本文分析采用的是jdk1.8約定:下面內(nèi)容中Ref-xxx代表的是引用地址,引用對(duì)應(yīng)的節(jié)點(diǎn) 前面已經(jīng)講解了公平模式的內(nèi)容,今天來(lái)講解下...
摘要:概述前面已經(jīng)講解了關(guān)于的非公平鎖模式,關(guān)于非公平鎖,內(nèi)部其實(shí)告訴我們誰(shuí)先爭(zhēng)搶到鎖誰(shuí)就先獲得資源,下面就來(lái)分析一下公平鎖內(nèi)部是如何實(shí)現(xiàn)公平的如果沒(méi)有看過(guò)非公平鎖的先去了解下非公平鎖,因?yàn)檫@篇文章前面不會(huì)講太多內(nèi)部結(jié)構(gòu),直接會(huì)對(duì)源碼進(jìn)行分析前文 概述 前面已經(jīng)講解了關(guān)于AQS的非公平鎖模式,關(guān)于NonfairSync非公平鎖,內(nèi)部其實(shí)告訴我們誰(shuí)先爭(zhēng)搶到鎖誰(shuí)就先獲得資源,下面就來(lái)分析一下公平...
摘要:三總結(jié)主要用于線程之間的數(shù)據(jù)交換,由于采用無(wú)鎖算法,其性能一般比單純的其它阻塞隊(duì)列要高。它的最大特點(diǎn)時(shí)不存儲(chǔ)實(shí)際元素,而是在內(nèi)部通過(guò)棧或隊(duì)列結(jié)構(gòu)保存阻塞線程。 showImg(https://segmentfault.com/img/bVbgOsh?w=900&h=900); 本文首發(fā)于一世流云專(zhuān)欄:https://segmentfault.com/blog... 一、Synchro...
摘要:內(nèi)部提供了兩種的實(shí)現(xiàn),一種公平模式,一種是非公平模式,如果沒(méi)有特別指定在構(gòu)造器中,默認(rèn)是非公平的模式,我們可以看一下無(wú)參的構(gòu)造函數(shù)。 概述 并發(fā)編程中,ReentrantLock的使用是比較多的,包括之前講的LinkedBlockingQueue和ArrayBlockQueue的內(nèi)部都是使用的ReentrantLock,談到它又不能的不說(shuō)AQS,AQS的全稱(chēng)是AbstractQueue...
摘要:引言在包中,很好的解決了在多線程中,如何高效安全傳輸數(shù)據(jù)的問(wèn)題。同時(shí),也用于自帶線程池的緩沖隊(duì)列中,了解也有助于理解線程池的工作模型。 引言 在java.util.Concurrent包中,BlockingQueue很好的解決了在多線程中,如何高效安全傳輸數(shù)據(jù)的問(wèn)題。通過(guò)這些高效并且線程安全的隊(duì)列類(lèi),為我們快速搭建高質(zhì)量的多線程程序帶來(lái)極大的便利。同時(shí),BlockingQueue也用于...
閱讀 3204·2021-11-25 09:43
閱讀 3415·2021-11-11 16:54
閱讀 842·2021-11-02 14:42
閱讀 3760·2021-09-30 09:58
閱讀 3670·2021-09-29 09:44
閱讀 1287·2019-08-30 15:56
閱讀 2105·2019-08-30 15:54
閱讀 2993·2019-08-30 15:43