摘要:在隊尾插入指定元素,如果隊列已滿,則阻塞線程加鎖隊列已滿。這里必須用,防止虛假喚醒在隊列上等待之所以這樣做,是防止線程被意外喚醒,不經再次判斷就直接調用方法。
本文首發于一世流云專欄:https://segmentfault.com/blog...一、ArrayBlockingQueue簡介
ArrayBlockingQueue是在JDK1.5時,隨著J.U.C包引入的一種阻塞隊列,它實現了BlockingQueue接口,底層基于數組實現:
ArrayBlockingQueue是一種有界阻塞隊列,在初始構造的時候需要指定隊列的容量。具有如下特點:
隊列的容量一旦在構造時指定,后續不能改變;
插入元素時,在隊尾進行;刪除元素時,在隊首進行;
隊列滿時,調用特定方法插入元素會阻塞線程;隊列空時,刪除元素也會阻塞線程;
支持公平/非公平策略,默認為非公平策略。
這里的公平策略,是指當線程從阻塞到喚醒后,以最初請求的順序(FIFO)來添加或刪除元素;非公平策略指線程被喚醒后,誰先搶占到鎖,誰就能往隊列中添加/刪除順序,是隨機的。二、ArrayBlockingQueue原理 構造
ArrayBlockingQueue提供了三種構造器:
/** * 指定隊列初始容量的構造器. */ public ArrayBlockingQueue(int capacity) { this(capacity, false); }
/** * 指定隊列初始容量和公平/非公平策略的構造器. */ public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); // 利用獨占鎖的策略 notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
/** * 根據已有集合構造隊列 */ public ArrayBlockingQueue(int capacity, boolean fair, Collection extends E> c) { this(capacity, fair); final ReentrantLock lock = this.lock; lock.lock(); // 這里加鎖是用于保證items數組的可見性 try { int i = 0; try { for (E e : c) { checkNotNull(e); // 不能有null元素 items[i++] = e; } } catch (ArrayIndexOutOfBoundsException ex) { throw new IllegalArgumentException(); } count = i; putIndex = (i == capacity) ? 0 : i; // 如果隊列已滿,則重置puIndex索引為0 } finally { lock.unlock(); } }
核心就是第二種構造器,從構造器也可以看出,ArrayBlockingQueue在構造時就指定了內部數組的大小,并通過ReentrantLock來保證并發環境下的線程安全。
ArrayBlockingQueue的公平/非公平策略其實就是內部ReentrantLock對象的策略,此外構造時還創建了兩個Condition對象。在隊列滿時,插入線程需要在notFull上等待;當隊列空時,刪除線程會在notEmpty上等待:
public class ArrayBlockingQueue核心方法extends AbstractQueue implements BlockingQueue , java.io.Serializable { /** * 內部數組 */ final Object[] items; /** * 下一個待刪除位置的索引: take, poll, peek, remove方法使用 */ int takeIndex; /** * 下一個待插入位置的索引: put, offer, add方法使用 */ int putIndex; /** * 隊列中的元素個數 */ int count; /** * 全局鎖 */ final ReentrantLock lock; /** * 非空條件隊列:當隊列空時,線程在該隊列等待獲取 */ private final Condition notEmpty; /** * 非滿條件隊列:當隊列滿時,線程在該隊列等待插入 */ private final Condition notFull; //... }
ArrayBlockingQueue會阻塞線程的方法一共4個:put(E e)、offer(e, time, unit)和take()、poll(time, unit),我們先來看插入元素的方法。
插入元素——put(E e)
插入元素的邏輯很簡單,用ReentrantLock來保證線程安全,當隊列滿時,則調用線程會在notFull條件隊列上等待,否則就調用enqueue方法入隊。
/** * 在隊尾插入指定元素,如果隊列已滿,則阻塞線程. */ public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); // 加鎖 try { while (count == items.length) // 隊列已滿。這里必須用while,防止虛假喚醒 notFull.await(); // 在notFull隊列上等待 enqueue(e); // 隊列未滿, 直接入隊 } finally { lock.unlock(); } }
這里需要注意一點,隊列已滿的時候,是通過while循環判斷的,這其實是多線程設計模式中的Guarded Suspension模式:
while (count == items.length) // 隊列已滿。這里必須用while,防止虛假喚醒 notFull.await(); // 在notFull隊列上等待
之所以這樣做,是防止線程被意外喚醒,不經再次判斷就直接調用enqueue方法。
enqueue方法:
private void enqueue(E x) { final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) // 隊列已滿,則重置索引為0 putIndex = 0; count++; // 元素個數+1 notEmpty.signal(); // 喚醒一個notEmpty上的等待線程(可以來隊列取元素了) }
刪除元素——take()
刪除元素的邏輯和插入元素類似,區別就是:刪除元素時,如果隊列空了,則線程需要在notEmpty條件隊列上等待。
/** * 從隊首刪除一個元素, 如果隊列為空, 則阻塞線程 */ public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) // 隊列為空, 則線程在notEmpty條件隊列等待 notEmpty.await(); return dequeue(); // 隊列非空,則出隊一個元素 } finally { lock.unlock(); } }
隊列非空時,調用dequeue方法出隊一個元素:
private E dequeue() { final Object[] items = this.items; E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) // 如果隊列已空 takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal(); // 喚醒一個notFull上的等待線程(可以插入元素到隊列了) return x; }環形隊列
從上面的入隊/出隊操作,可以看出,ArrayBlockingQueue的內部數組其實是一種環形結構。
假設ArrayBlockingQueue的容量大小為6,我們來看下整個入隊過程:
①初始時
②插入元素“9”
③插入元素“2”、“10”、“25”、“93”
④插入元素“90”
注意,此時再插入一個元素“90”,則putIndex變成6,等于隊列容量6,由于是循環隊列,所以會將tableIndex重置為0:
這是隊列已經滿了(count==6),如果再有線程嘗試插入元素,并不會覆蓋原有值,而是被阻塞。
我們再來看下出隊過程:
①出隊元素“9”
②出隊元素“2”、“10”、“25”、“93”
③出隊元素“90”
注意,此時再出隊一個元素“90”,則tabeIndex變成6,等于隊列容量6,由于是循環隊列,所以會將tableIndex重置為0:
這是隊列已經空了(count==0),如果再有線程嘗試出隊元素,則會被阻塞。
三、總結ArrayBlockingQueue利用了ReentrantLock來保證線程的安全性,針對隊列的修改都需要加全局鎖。在一般的應用場景下已經足夠。對于超高并發的環境,由于生產者-消息者共用一把鎖,可能出現性能瓶頸。
另外,由于ArrayBlockingQueue是有界的,且在初始時指定隊列大小,所以如果初始時需要限定消息隊列的大小,則ArrayBlockingQueue 比較合適。后續,我們會介紹另一種基于單鏈表實現的阻塞隊列——LinkedBlockingQueue,該隊列的最大特點是使用了“兩把鎖”,以提升吞吐量。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/77023.html
摘要:整個包,按照功能可以大致劃分如下鎖框架原子類框架同步器框架集合框架執行器框架本系列將按上述順序分析,分析所基于的源碼為。后,根據一系列常見的多線程設計模式,設計了并發包,其中包下提供了一系列基礎的鎖工具,用以對等進行補充增強。 showImg(https://segmentfault.com/img/remote/1460000016012623); 本文首發于一世流云專欄:https...
摘要:我們來看下的類繼承圖可以看到,實現了接口,在多線程進階二五之框架中,我們提到過實現了接口,以提供和排序相關的功能,維持元素的有序性,所以就是一種為并發環境設計的有序工具類。唯一的區別是針對的僅僅是鍵值,針對鍵值對進行操作。 showImg(https://segmentfault.com/img/bVbggic?w=960&h=600); 本文首發于一世流云專欄:https://seg...
摘要:僅僅當有多個線程同時進行寫操作時,才會進行同步。可以看到,上述方法返回一個迭代器對象,的迭代是在舊數組上進行的,當創建迭代器的那一刻就確定了,所以迭代過程中不會拋出并發修改異常。另外,迭代器對象也不支持修改方法,全部會拋出異常。 showImg(https://segmentfault.com/img/bVbggij?w=960&h=600); 本文首發于一世流云專欄:https://...
摘要:我們之前已經介紹過了,底層基于跳表實現,其操作平均時間復雜度均為。事實上,內部引用了一個對象,以組合方式,委托對象實現了所有功能。線程安全內存的使用較多迭代是對快照進行的,不會拋出,且迭代過程中不支持修改操作。 showImg(https://segmentfault.com/img/bVbggjf?w=600&h=377); 本文首發于一世流云專欄:https://segmentfa...
摘要:接口截止目前為止,我們介紹的阻塞隊列都是實現了接口。該類在構造時一般需要指定容量,如果不指定,則最大容量為。另外,由于內部通過來保證線程安全,所以的整體實現時比較簡單的。另外,雙端隊列相比普通隊列,主要是多了隊尾出隊元素隊首入隊元素的功能。 showImg(https://segmentfault.com/img/bVbgZ7j?w=770&h=514); 本文首發于一世流云專欄:ht...
閱讀 3303·2021-11-24 09:39
閱讀 2815·2021-10-12 10:20
閱讀 1918·2019-08-30 15:53
閱讀 3083·2019-08-30 14:14
閱讀 2612·2019-08-29 15:36
閱讀 1130·2019-08-29 14:11
閱讀 1961·2019-08-26 13:51
閱讀 3415·2019-08-26 13:23