摘要:接口截止目前為止,我們介紹的阻塞隊列都是實現了接口。該類在構造時一般需要指定容量,如果不指定,則最大容量為。另外,由于內部通過來保證線程安全,所以的整體實現時比較簡單的。另外,雙端隊列相比普通隊列,主要是多了隊尾出隊元素隊首入隊元素的功能。
本文首發于一世流云專欄:https://segmentfault.com/blog...一、LinkedBlockingDeque簡介
LinkedBlockingDeque和ConcurrentLinkedDeque類似,都是一種雙端隊列的結構,只不過LinkedBlockingDeque同時也是一種阻塞隊列,它是在JDK1.5時隨著J.U.C包引入的,實現了BlockingDueue接口,底層基于雙鏈表實現:
注意:LinkedBlockingDeque底層利用ReentrantLock實現同步,并不像ConcurrentLinkedDeque那樣采用無鎖算法。
另外,LinkedBlockingDeque是一種近似有界阻塞隊列,為什么說近似?因為LinkedBlockingDeque既可以在初始構造時就指定隊列的容量,也可以不指定,如果不指定,那么它的容量大小默認為Integer.MAX_VALUE。
BlockingDeque接口截止目前為止,我們介紹的阻塞隊列都是實現了BlockingQueue接口。和普通雙端隊列接口——Deque一樣,J.U.C中也有一種阻塞的雙端隊列接口——BlockingDeque。BlockingDeque是JDK1.6時,J.U.C包新增的一個接口:
BlockingDeque的類繼承關系圖:
我們知道,BlockingQueue中阻塞方法一共有4個:put(e)、take();offer(e, time, unit)、poll(time, unit),忽略限時等待的阻塞方法,一共就兩個:
隊尾入隊:put(e)
隊首出隊:take()
BlockingDeque相對于BlockingQueue,最大的特點就是增加了在隊首入隊/隊尾出隊的阻塞方法。下面是兩個接口的比較:
阻塞方法 | BlockingQueue | BlockingDeque |
---|---|---|
隊首入隊 | / | putFirst(e) |
隊首出隊 | take() | takeFirst() |
隊尾入隊 | put(e) | putLast(e) |
隊尾出隊 | / | takeLast() |
LinkedBlockingDeque一共三種構造器,不指定容量時,默認為Integer.MAX_VALUE:
/** * 默認構造器. */ public LinkedBlockingDeque() { this(Integer.MAX_VALUE); }
/** * 指定容量的構造器. */ public LinkedBlockingDeque(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; }
/** * 從已有集合構造隊列. */ public LinkedBlockingDeque(Collection extends E> c) { this(Integer.MAX_VALUE); final ReentrantLock lock = this.lock; lock.lock(); // Never contended, but necessary for visibility try { for (E e : c) { if (e == null) throw new NullPointerException(); if (!linkLast(new Node內部結構(e))) throw new IllegalStateException("Deque full"); } } finally { lock.unlock(); } }
LinkedBlockingDeque內部是雙鏈表的結構,結點Node的定義如下:
/** * 雙鏈表結點定義 */ static final class Node{ /** * 結點值, null表示該結點已被移除. */ E item; /** * 前驅結點指針. */ Node prev; /** * 后驅結點指針. */ Node next; Node(E x) { item = x; } }
字段first指向隊首結點,字段last指向隊尾結點。另外LinkedBlockingDeque利用ReentrantLock來保證線程安全,所有對隊列的修改操作都需要先獲取這把全局鎖:
public class LinkedBlockingDeque隊尾入隊——putextends AbstractQueue implements BlockingDeque , java.io.Serializable { /** * 隊首結點指針. */ transient Node first; /** * 隊尾結點指針. */ transient Node last; /** * 隊列元素個數. */ private transient int count; /** * 隊列容量. */ private final int capacity; /** * 全局鎖 */ final ReentrantLock lock = new ReentrantLock(); /** * 出隊線程條件隊列(隊列為空時,出隊線程在此等待) */ private final Condition notEmpty = lock.newCondition(); /** * 入隊線程條件隊列(隊列為滿時,入隊線程在此等待) */ private final Condition notFull = lock.newCondition(); //... }
先來看下,LinkedBlockingDeque是如何實現正常的從隊尾入隊的:
/** * 在隊尾入隊元素e. * 如果隊列已滿, 則阻塞線程. */ public void put(E e) throws InterruptedException { putLast(e); } public void putLast(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); // 隊列不能包含null元素 Nodenode = new Node (e); // 創建入隊結點 final ReentrantLock lock = this.lock; lock.lock(); try { while (!linkLast(node)) // 隊列已滿, 則阻塞線程 notFull.await(); } finally { lock.unlock(); } }
put方法內部調用了putLast方法,這是Deque接口獨有的方法。上述入隊操作的關鍵是linkLast方法:
/** * 將結點node鏈接到隊尾, 如果失敗, 則返回false. */ private boolean linkLast(Nodenode) { // assert lock.isHeldByCurrentThread(); if (count >= capacity) // 隊列已滿, 直接返回 return false; // 以下是雙鏈表的"尾插"操作 Node l = last; node.prev = l; last = node; if (first == null) first = node; else l.next = node; ++count; // 隊列元素加1 notEmpty.signal(); // 喚醒一個等待的出隊線程 return true; }
linkLast方法在隊尾插入一個結點,插入失敗(隊列已滿的情況)則返回false。插入成功,則喚醒一個正在等待的出隊線程:
初始:
隊尾插入結點node:
隊首入隊就是雙鏈表的“頭插法”插入一個結點,如果隊列已滿,則阻塞調用線程:
/** * 在隊首入隊元素e. * 如果隊列已滿, 則阻塞線程. */ public void putFirst(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); Nodenode = new Node (e); final ReentrantLock lock = this.lock; lock.lock(); try { while (!linkFirst(node)) // 隊列已滿, 則阻塞線程 notFull.await(); } finally { lock.unlock(); } }
/** * 在隊首插入一個結點, 插入失敗則返回null. */ private boolean linkFirst(Nodenode) { // assert lock.isHeldByCurrentThread(); if (count >= capacity) // 隊列已滿 return false; // 以下是雙鏈表的“頭插”操作 Node f = first; node.next = f; first = node; if (last == null) last = node; else f.prev = node; ++count; // 隊列元素數量加1 notEmpty.signal(); // 喚醒一個等待的出隊線程 return true; }
初始:
隊首插入結點node:
隊首出隊的邏輯很簡單,如果隊列為空,則阻塞調用線程:
/** * 從隊首出隊一個元素, 如果隊列為空, 則阻塞線程. */ public E take() throws InterruptedException { return takeFirst(); } public E takeFirst() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lock(); try { E x; while ((x = unlinkFirst()) == null) // 隊列為空, 則阻塞線程 notEmpty.await(); return x; } finally { lock.unlock(); } }
實際的出隊由unlinkFirst方法執行:
/** * 從隊首刪除一個元素, 失敗則返回null. */ private E unlinkFirst() { // assert lock.isHeldByCurrentThread(); Nodef = first; if (f == null) // 隊列為空 return null; // 以下是雙鏈表的頭部刪除過程 Node n = f.next; E item = f.item; f.item = null; f.next = f; // help GC first = n; if (n == null) last = null; else n.prev = null; --count; // 隊列元素個數減1 notFull.signal(); // 喚醒一個等待的入隊線程 return item; }
初始:
刪除隊首結點:
隊尾出隊的邏輯很簡單,如果隊列為空,則阻塞調用線程:
/** * 從隊尾出隊一個元素, 如果隊列為空, 則阻塞線程. */ public E takeLast() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lock(); try { E x; while ((x = unlinkLast()) == null) // 隊列為空, 阻塞線程 notEmpty.await(); return x; } finally { lock.unlock(); } }
實際的出隊由unlinkLast方法執行:
/** * 刪除隊尾元素, 如果失敗, 則返回null. */ private E unlinkLast() { // assert lock.isHeldByCurrentThread(); Nodel = last; if (l == null) // 隊列為空 return null; // 以下為雙鏈表的尾部刪除過程 Node p = l.prev; E item = l.item; l.item = null; l.prev = l; // help GC last = p; if (p == null) first = null; else p.next = null; --count; // 隊列元素個數減1 notFull.signal(); // 喚醒一個等待的入隊線程 return item; }
初始:
刪除隊尾結點:
LinkedBlockingDeque作為一種阻塞雙端隊列,提供了隊尾刪除元素和隊首插入元素的阻塞方法。該類在構造時一般需要指定容量,如果不指定,則最大容量為Integer.MAX_VALUE。另外,由于內部通過ReentrantLock來保證線程安全,所以LinkedBlockingDeque的整體實現時比較簡單的。
另外,雙端隊列相比普通隊列,主要是多了【隊尾出隊元素】/【隊首入隊元素】的功能。
阻塞隊列我們知道一般用于“生產者-消費者”模式,而雙端阻塞隊列在“生產者-消費者”就可以利用“雙端”的特性,從隊尾出隊元素。
考慮下面這樣一種場景:有多個消費者,每個消費者有自己的一個消息隊列,生產者不斷的生產數據扔到隊列中,消費者消費數據有快又慢。為了提升效率,速度快的消費者可以從其它消費者隊列的隊尾出隊元素放到自己的消息隊列中,由于是從其它隊列的隊尾出隊,這樣可以減少并發沖突(其它消費者從隊首出隊元素),又能提升整個系統的吞吐量。這其實是一種“工作竊取算法”的思路。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/77132.html
摘要:在章節中,我們說過,維護了一把全局鎖,無論是出隊還是入隊,都共用這把鎖,這就導致任一時間點只有一個線程能夠執行。入隊鎖對應的是條件隊列,出隊鎖對應的是條件隊列,所以每入隊一個元素,應當立即去喚醒可能阻塞的其它入隊線程。 showImg(https://segmentfault.com/img/bVbgCD9?w=1920&h=1080); 本文首發于一世流云專欄:https://seg...
摘要:整個包,按照功能可以大致劃分如下鎖框架原子類框架同步器框架集合框架執行器框架本系列將按上述順序分析,分析所基于的源碼為。后,根據一系列常見的多線程設計模式,設計了并發包,其中包下提供了一系列基礎的鎖工具,用以對等進行補充增強。 showImg(https://segmentfault.com/img/remote/1460000016012623); 本文首發于一世流云專欄:https...
摘要:我們來看下的類繼承圖可以看到,實現了接口,在多線程進階二五之框架中,我們提到過實現了接口,以提供和排序相關的功能,維持元素的有序性,所以就是一種為并發環境設計的有序工具類。唯一的區別是針對的僅僅是鍵值,針對鍵值對進行操作。 showImg(https://segmentfault.com/img/bVbggic?w=960&h=600); 本文首發于一世流云專欄:https://seg...
摘要:僅僅當有多個線程同時進行寫操作時,才會進行同步。可以看到,上述方法返回一個迭代器對象,的迭代是在舊數組上進行的,當創建迭代器的那一刻就確定了,所以迭代過程中不會拋出并發修改異常。另外,迭代器對象也不支持修改方法,全部會拋出異常。 showImg(https://segmentfault.com/img/bVbggij?w=960&h=600); 本文首發于一世流云專欄:https://...
摘要:我們之前已經介紹過了,底層基于跳表實現,其操作平均時間復雜度均為。事實上,內部引用了一個對象,以組合方式,委托對象實現了所有功能。線程安全內存的使用較多迭代是對快照進行的,不會拋出,且迭代過程中不支持修改操作。 showImg(https://segmentfault.com/img/bVbggjf?w=600&h=377); 本文首發于一世流云專欄:https://segmentfa...
閱讀 1274·2021-11-17 09:33
閱讀 1742·2021-09-09 11:53
閱讀 3208·2021-09-04 16:45
閱讀 1382·2021-08-17 10:12
閱讀 2383·2019-08-30 15:55
閱讀 1779·2019-08-30 15:53
閱讀 2404·2019-08-30 15:52
閱讀 2558·2019-08-29 18:41