摘要:所以,在并發(fā)量適中的情況下,一般具有較好的性能。字段指向隊(duì)列頭,指向隊(duì)列尾,通過來操作字段值以及對象的字段值。單線程的情況下,元素入隊(duì)比較好理解,直接線性地在隊(duì)首插入元素即可。
本文首發(fā)于一世流云專欄:https://segmentfault.com/blog...一、ConcurrentLinkedQueue簡介
ConcurrentLinkedQueue是JDK1.5時(shí)隨著J.U.C一起引入的一個(gè)支持并發(fā)環(huán)境的隊(duì)列。從名字就可以看出來,ConcurrentLinkedQueue底層是基于鏈表實(shí)現(xiàn)的。
Doug Lea在實(shí)現(xiàn)ConcurrentLinkedQueue時(shí),并沒有利用鎖或底層同步原語,而是完全基于自旋+CAS的方式實(shí)現(xiàn)了該隊(duì)列。回想一下AQS,AQS內(nèi)部的CLH等待隊(duì)列也是利用了這種方式。
由于是完全基于無鎖算法實(shí)現(xiàn)的,所以當(dāng)出現(xiàn)多個(gè)線程同時(shí)進(jìn)行修改隊(duì)列的操作(比如同時(shí)入隊(duì)),很可能出現(xiàn)CAS修改失敗的情況,那么失敗的線程會(huì)進(jìn)入下一次自旋,再嘗試入隊(duì)操作,直到成功。所以,在并發(fā)量適中的情況下,ConcurrentLinkedQueue一般具有較好的性能。
二、ConcurrentLinkedQueue原理 隊(duì)列結(jié)構(gòu)我們來看下ConcurrentLinkedQueue的內(nèi)部結(jié)構(gòu),:
public class ConcurrentLinkedQueueextends AbstractQueue implements Queue , java.io.Serializable { ? /** * 隊(duì)列頭指針 */ private transient volatile Node head; ? /** * 隊(duì)列尾指針. */ private transient volatile Node tail; ? // Unsafe mechanics ? private static final sun.misc.Unsafe UNSAFE; private static final long headOffset; private static final long tailOffset; ? static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class> k = ConcurrentLinkedQueue.class; headOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("head")); tailOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("tail")); } catch (Exception e) { throw new Error(e); } } ? /** * 隊(duì)列結(jié)點(diǎn)定義 */ private static class Node { volatile E item; // 元素值 volatile Node next; // 后驅(qū)指針 ? Node(E item) { UNSAFE.putObject(this, itemOffset, item); } ? boolean casItem(E cmp, E val) { return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); } ? void lazySetNext(Node val) { UNSAFE.putOrderedObject(this, nextOffset, val); } ? boolean casNext(Node cmp, Node val) { return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } ? // Unsafe mechanics ? private static final sun.misc.Unsafe UNSAFE; private static final long itemOffset; private static final long nextOffset; ? static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class> k = Node.class; itemOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("item")); nextOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("next")); } catch (Exception e) { throw new Error(e); } } } ? //... }
可以看到,ConcurrentLinkedQueue內(nèi)部就是一個(gè)簡單的單鏈表結(jié)構(gòu),每入隊(duì)一個(gè)元素就是插入一個(gè)Node類型的結(jié)點(diǎn)。字段head指向隊(duì)列頭,tail指向隊(duì)列尾,通過Unsafe來CAS操作字段值以及Node對象的字段值。
構(gòu)造器定義ConcurrentLinkedQueue包含兩種構(gòu)造器:
/** * 構(gòu)建一個(gè)空隊(duì)列(head,tail均指向一個(gè)占位結(jié)點(diǎn)). */ public ConcurrentLinkedQueue() { head = tail = new Node(null); }
/** * 根據(jù)已有集合,構(gòu)造隊(duì)列 */ public ConcurrentLinkedQueue(Collection extends E> c) { Nodeh = null, t = null; for (E e : c) { checkNotNull(e); Node newNode = new Node (e); if (h == null) h = t = newNode; else { t.lazySetNext(newNode); t = newNode; } } if (h == null) h = t = new Node (null); head = h; tail = t; }
我們重點(diǎn)看下空構(gòu)造器,通過空構(gòu)造器建立的ConcurrentLinkedQueue對象,其head和tail指針并非指向null,而是指向一個(gè)item值為null的Node結(jié)點(diǎn)——哨兵結(jié)點(diǎn),如下圖:
元素的入隊(duì)是在隊(duì)尾插入元素,關(guān)于隊(duì)列的操作,如果讀者不熟悉,可以參考《Algorithms 4th》或我的這篇博文:https://www.jianshu.com/p/f9b...
ConcurrentLinkedQueue的入隊(duì)代碼很簡單,卻非常精妙:
/** * 入隊(duì)一個(gè)元素. * * @throws NullPointerException 元素不能為null */ public boolean add(E e) { return offer(e); }
/** * 在隊(duì)尾入隊(duì)元素e, 直到成功 */ public boolean offer(E e) { checkNotNull(e); final NodenewNode = new Node (e); for (Node t = tail, p = t; ; ) { // 自旋, 直到插入結(jié)點(diǎn)成功 Node q = p.next; if (q == null) { // CASE1: 正常情況下, 新結(jié)點(diǎn)直接插入到隊(duì)尾 if (p.casNext(null, newNode)) { // CAS競爭插入成功 if (p != t) // CAS競爭失敗的線程會(huì)在下一次自旋中進(jìn)入該邏輯 casTail(t, newNode); // 重新設(shè)置隊(duì)尾指針tail return true; } // CAS競爭插入失敗,則進(jìn)入下一次自旋 ? } else if (p == q) // CASE2: 發(fā)生了出隊(duì)操作 p = (t != (t = tail)) ? t : head; else // 將p重新指向隊(duì)尾結(jié)點(diǎn) p = (p != t && t != (t = tail)) ? t : q; } }
我們來分析下上面offer方法的實(shí)現(xiàn)。單線程的情況下,元素入隊(duì)比較好理解,直接線性地在隊(duì)首插入元素即可。現(xiàn)在我們假設(shè)有兩個(gè)線程ThreadA和ThreadB同時(shí)進(jìn)行入隊(duì)操作:
①ThreadA先多帶帶入隊(duì)兩個(gè)元素9、2
此時(shí)隊(duì)列的結(jié)構(gòu)如下:
②ThreadA入隊(duì)元素“10”,ThreadB入隊(duì)元素“25”
此時(shí)ThreadA和ThreadB若并發(fā)執(zhí)行,我們看下會(huì)發(fā)生什么:
1、ThreadA和ThreadB同時(shí)進(jìn)入自旋中的以下代碼塊:
if (q == null) { // CASE1: 正常情況下, 新結(jié)點(diǎn)直接插入到隊(duì)尾 if (p.casNext(null, newNode)) { // CAS競爭插入成功 if (p != t) // CAS競爭失敗的線程會(huì)在下一次自旋中進(jìn)入該邏輯 casTail(t, newNode); // 重新設(shè)置隊(duì)尾指針tail return true; } // CAS競爭插入失敗,則進(jìn)入下一次自旋 ? }
2、ThreadA執(zhí)行cas操作(p.casNext)成功,插入新結(jié)點(diǎn)“10”
ThreadA執(zhí)行完成后,直接返回true,隊(duì)列結(jié)構(gòu)如下:
3、ThreadB執(zhí)行cas操作(p.casNext)失敗
由于CAS操作同時(shí)修改隊(duì)尾元素,導(dǎo)致ThreadB操作失敗,則ThreadB進(jìn)入下一次自旋;
在下一次自旋中,進(jìn)入以下代碼塊:
else // 將p重新指向隊(duì)尾結(jié)點(diǎn) p = (p != t && t != (t = tail)) ? t : q;
上述分支的作用就是讓p指針重新定位到隊(duì)尾結(jié)點(diǎn),此時(shí)隊(duì)列結(jié)構(gòu)如下:
然后ThreadB會(huì)繼續(xù)下一次自旋,并再次進(jìn)入以下代碼塊:
if (q == null) { // CASE1: 正常情況下, 新結(jié)點(diǎn)直接插入到隊(duì)尾 if (p.casNext(null, newNode)) { // CAS競爭插入成功 if (p != t) // CAS競爭失敗的線程會(huì)在下一次自旋中進(jìn)入該邏輯 casTail(t, newNode); // 重新設(shè)置隊(duì)尾指針tail return true; } // CAS競爭插入失敗,則進(jìn)入下一次自旋 ? }
此時(shí),CAS操作成功,隊(duì)列結(jié)構(gòu)如下:
由于此時(shí)p!=t ,所以會(huì)調(diào)用casTail方法重新設(shè)置隊(duì)尾指針:
casTail(t, newNode); // 重新設(shè)置隊(duì)尾指針tail
最終隊(duì)列如下:
從上面的分析過程可以看到,由于入隊(duì)元素一定是要鏈接到隊(duì)尾的,但并發(fā)情況下隊(duì)尾結(jié)點(diǎn)可能隨時(shí)變化,所以就需要指針定位最新的隊(duì)尾結(jié)點(diǎn),并在入隊(duì)時(shí)判斷隊(duì)尾結(jié)點(diǎn)是否改變了,如果改變了,就需要重新設(shè)置定位指針,然后在下一次自旋中繼續(xù)嘗試入隊(duì)操作。
上面整個(gè)執(zhí)行步驟有一段分支還沒有覆蓋到:
else if (p == q) // CASE2: 發(fā)生了出隊(duì)操作 p = (t != (t = tail)) ? t : head;
這個(gè)分支只有在元素入隊(duì)的同時(shí),針對該元素也發(fā)生了“出隊(duì)”操作才會(huì)執(zhí)行,我們后面會(huì)分析元素的“出隊(duì)”,理解了“出隊(duì)”操作再回頭來看這個(gè)分支就容易理解很多了。
出隊(duì)操作隊(duì)列中元素的“出隊(duì)”是從隊(duì)首移除元素,我們來看下ConcurrentLinkedQueue是如何實(shí)現(xiàn)出隊(duì)的:
/** * 在隊(duì)首出隊(duì)元素, 直到成功 */ public E poll() { restartFromHead: for (; ; ) { for (Nodeh = head, p = h, q; ; ) { E item = p.item; ? if (item != null && p.casItem(item, null)) { // CASE2: 隊(duì)首是非哨兵結(jié)點(diǎn)(item!=null) if (p != h) // hop two nodes at a time updateHead(h, ((q = p.next) != null) ? q : p); return item; } else if ((q = p.next) == null) { // CASE1: 隊(duì)首是一個(gè)哨兵結(jié)點(diǎn)(item==null) updateHead(h, p); return null; } else if (p == q) continue restartFromHead; else p = q; } } }
還是通過示例來看,假設(shè)初始的隊(duì)列結(jié)構(gòu)如下:
①ThreadA先多帶帶進(jìn)行出隊(duì)操作
由于head所指的是item==null的結(jié)點(diǎn),所以ThreadA會(huì)執(zhí)行以下分支:
else p = q;
然后進(jìn)入下一次自旋,在自旋中執(zhí)行以下分支,如果CAS操作成功,則移除首個(gè)有效元素,并重新設(shè)置頭指針:
if (item != null && p.casItem(item, null)) { // CASE2: 隊(duì)首是非哨兵結(jié)點(diǎn)(item!=null) if (p != h) // hop two nodes at a time updateHead(h, ((q = p.next) != null) ? q : p); return item; }
此時(shí)隊(duì)列的結(jié)構(gòu)如下:
如果ThreadA的CAS操作失敗呢?
CAS操作失敗則會(huì)進(jìn)入以下分支,并重新開始自旋:
else if (p == q) continue restartFromHead;
最終前面兩個(gè)null結(jié)點(diǎn)會(huì)被GC回收,隊(duì)列結(jié)構(gòu)如下:
②ThreadA繼續(xù)進(jìn)行出隊(duì)操作
ThreadA繼續(xù)執(zhí)行“出隊(duì)”操作,還是執(zhí)行以下分支:
if (item != null && p.casItem(item, null)) { // CASE2: 隊(duì)首是非哨兵結(jié)點(diǎn)(item!=null) if (p != h) // hop two nodes at a time updateHead(h, ((q = p.next) != null) ? q : p); return item; }
但是此時(shí)p==h,所以僅將頭結(jié)點(diǎn)置null,這其實(shí)是一種“懶刪除”的策略。
出隊(duì)元素“2”:
出隊(duì)元素“10”:
最終隊(duì)列結(jié)果如下:
③ThreadA進(jìn)行出隊(duì),其它線程進(jìn)行入隊(duì)
這是最特殊的一種情況,當(dāng)隊(duì)列中只剩下一個(gè)元素時(shí),如果同時(shí)發(fā)生出隊(duì)和入隊(duì)操作,會(huì)導(dǎo)致隊(duì)列出現(xiàn)下面這種結(jié)構(gòu):(假設(shè)ThreadA進(jìn)行出隊(duì)元素“25”,ThreadB進(jìn)行入隊(duì)元素“11”)
此時(shí)tail.next=tail自身,所以ThreadB在執(zhí)行入隊(duì)時(shí),會(huì)進(jìn)入到offer方法的以下分支:
else if (p == q) // CASE2: 發(fā)生了出隊(duì)操作 p = (t != (t = tail)) ? t : head;三、總結(jié)
ConcurrentLinkedQueue使用了自旋+CAS的非阻塞算法來保證線程并發(fā)訪問時(shí)的數(shù)據(jù)一致性。由于隊(duì)列本身是一種鏈表結(jié)構(gòu),所以雖然算法看起來很簡單,但其實(shí)需要考慮各種并發(fā)的情況,實(shí)現(xiàn)復(fù)雜度較高,并且ConcurrentLinkedQueue不具備實(shí)時(shí)的數(shù)據(jù)一致性,實(shí)際運(yùn)用中,隊(duì)列一般在生產(chǎn)者-消費(fèi)者的場景下使用得較多,所以ConcurrentLinkedQueue的使用場景并不如阻塞隊(duì)列那么多。
另外,關(guān)于ConcurrentLinkedQueue還有以下需要注意的幾點(diǎn):
ConcurrentLinkedQueue的迭代器是弱一致性的,這在并發(fā)容器中是比較普遍的現(xiàn)象,主要是指在一個(gè)線程在遍歷隊(duì)列結(jié)點(diǎn)而另一個(gè)線程嘗試對某個(gè)隊(duì)列結(jié)點(diǎn)進(jìn)行修改的話不會(huì)拋出ConcurrentModificationException,這也就造成在遍歷某個(gè)尚未被修改的結(jié)點(diǎn)時(shí),在next方法返回時(shí)可以看到該結(jié)點(diǎn)的修改,但在遍歷后再對該結(jié)點(diǎn)修改時(shí)就看不到這種變化。
size方法需要遍歷鏈表,所以在并發(fā)情況下,其結(jié)果不一定是準(zhǔn)確的,只能供參考。
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/76960.html
摘要:在之前,除了類外,并沒有其它適合并發(fā)環(huán)境的棧數(shù)據(jù)結(jié)構(gòu)。作為雙端隊(duì)列,可以當(dāng)作棧來使用,并且高效地支持并發(fā)環(huán)境。 showImg(https://segmentfault.com/img/bVbguF7?w=1280&h=853); 本文首發(fā)于一世流云專欄:https://segmentfault.com/blog... 一、引言 在開始講ConcurrentLinkedDeque之前...
摘要:整個(gè)包,按照功能可以大致劃分如下鎖框架原子類框架同步器框架集合框架執(zhí)行器框架本系列將按上述順序分析,分析所基于的源碼為。后,根據(jù)一系列常見的多線程設(shè)計(jì)模式,設(shè)計(jì)了并發(fā)包,其中包下提供了一系列基礎(chǔ)的鎖工具,用以對等進(jìn)行補(bǔ)充增強(qiáng)。 showImg(https://segmentfault.com/img/remote/1460000016012623); 本文首發(fā)于一世流云專欄:https...
摘要:我們來看下的類繼承圖可以看到,實(shí)現(xiàn)了接口,在多線程進(jìn)階二五之框架中,我們提到過實(shí)現(xiàn)了接口,以提供和排序相關(guān)的功能,維持元素的有序性,所以就是一種為并發(fā)環(huán)境設(shè)計(jì)的有序工具類。唯一的區(qū)別是針對的僅僅是鍵值,針對鍵值對進(jìn)行操作。 showImg(https://segmentfault.com/img/bVbggic?w=960&h=600); 本文首發(fā)于一世流云專欄:https://seg...
摘要:僅僅當(dāng)有多個(gè)線程同時(shí)進(jìn)行寫操作時(shí),才會(huì)進(jìn)行同步。可以看到,上述方法返回一個(gè)迭代器對象,的迭代是在舊數(shù)組上進(jìn)行的,當(dāng)創(chuàng)建迭代器的那一刻就確定了,所以迭代過程中不會(huì)拋出并發(fā)修改異常。另外,迭代器對象也不支持修改方法,全部會(huì)拋出異常。 showImg(https://segmentfault.com/img/bVbggij?w=960&h=600); 本文首發(fā)于一世流云專欄:https://...
摘要:我們之前已經(jīng)介紹過了,底層基于跳表實(shí)現(xiàn),其操作平均時(shí)間復(fù)雜度均為。事實(shí)上,內(nèi)部引用了一個(gè)對象,以組合方式,委托對象實(shí)現(xiàn)了所有功能。線程安全內(nèi)存的使用較多迭代是對快照進(jìn)行的,不會(huì)拋出,且迭代過程中不支持修改操作。 showImg(https://segmentfault.com/img/bVbggjf?w=600&h=377); 本文首發(fā)于一世流云專欄:https://segmentfa...
閱讀 2674·2021-11-24 09:38
閱讀 1985·2019-08-30 15:53
閱讀 1246·2019-08-30 15:44
閱讀 3237·2019-08-30 14:10
閱讀 3588·2019-08-29 16:29
閱讀 1808·2019-08-29 16:23
閱讀 1107·2019-08-29 16:20
閱讀 1476·2019-08-29 11:13