摘要:初始狀態對應二叉樹結構將頂點與最后一個結點調換即將頂點與最后一個結點交換,然后將索引為止置。
本文首發于一世流云專欄:https://segmentfault.com/blog...一、PriorityBlockingQueue簡介
PriorityBlockingQueue,是在JDK1.5時,隨著J.U.C包引入的一種阻塞隊列,它實現了BlockingQueue接口,底層基于堆實現:
PriorityBlockingQueue是一種無界阻塞隊列,在構造的時候可以指定隊列的初始容量。具有如下特點:
PriorityBlockingQueue與之前介紹的阻塞隊列最大的不同之處就是:它是一種優先級隊列,也就是說元素并不是以FIFO的方式出/入隊,而是以按照權重大小的順序出隊;
PriorityBlockingQueue是真正的無界隊列(僅受內存大小限制),它不像ArrayBlockingQueue那樣構造時必須指定最大容量,也不像LinkedBlockingQueue默認最大容量為Integer.MAX_VALUE;
由于PriorityBlockingQueue是按照元素的權重進入排序,所以隊列中的元素必須是可以比較的,也就是說元素必須實現Comparable接口;
由于PriorityBlockingQueue無界隊列,所以插入元素永遠不會阻塞線程;
PriorityBlockingQueue底層是一種基于數組實現的堆結構。
關于堆,如果讀者不了解,可以參考下我的這篇博文預熱下——優先級隊列。
注意:堆分為“大頂堆”和“小頂堆”,PriorityBlockingQueue會依據元素的比較方式選擇構建大頂堆或小頂堆。比如:如果元素是Integer這種引用類型,那么默認就是“小頂堆”,也就是每次出隊都會是當前隊列最小的元素。二、PriorityBlockingQueue原理 構造
PriorityBlockingQueue提供了四種構造器:
/** * 默認構造器. * 默認初始容量11, 以元素自然順序比較(元素必須實現Comparable接口) */ public PriorityBlockingQueue() { this(DEFAULT_INITIAL_CAPACITY, null); }
/** * 指定初始容量的構造器. * 以元素自然順序比較(元素必須實現Comparable接口) */ public PriorityBlockingQueue(int initialCapacity) { this(initialCapacity, null); }
/** * 指定初始容量和比較器的構造器. */ public PriorityBlockingQueue(int initialCapacity, Comparator super E> comparator) { if (initialCapacity < 1) throw new IllegalArgumentException(); this.lock = new ReentrantLock(); this.notEmpty = lock.newCondition(); this.comparator = comparator; this.queue = new Object[initialCapacity]; }
/** * 從已有集合構造隊列. * 如果已經集合是SortedSet或者PriorityBlockingQueue, 則保持原來的元素順序 */ public PriorityBlockingQueue(Collection extends E> c) { this.lock = new ReentrantLock(); this.notEmpty = lock.newCondition(); boolean heapify = true; // true if not known to be in heap order boolean screen = true; // true if must screen for nulls ? if (c instanceof SortedSet>) { // 如果是有序集合 SortedSet extends E> ss = (SortedSet extends E>) c; this.comparator = (Comparator super E>) ss.comparator(); heapify = false; } else if (c instanceof PriorityBlockingQueue>) { // 如果是優先級隊列 PriorityBlockingQueue extends E> pq = (PriorityBlockingQueue extends E>) c; this.comparator = (Comparator super E>) pq.comparator(); screen = false; if (pq.getClass() == PriorityBlockingQueue.class) // exact match heapify = false; } ? Object[] a = c.toArray(); int n = a.length; if (a.getClass() != Object[].class) a = Arrays.copyOf(a, n, Object[].class); if (screen && (n == 1 || this.comparator != null)) { // 校驗是否存在null元素 for (int i = 0; i < n; ++i) if (a[i] == null) throw new NullPointerException(); } this.queue = a; this.size = n; if (heapify) // 堆排序 heapify(); }
重點是第三種構造器,可以看到,PriorityBlockingQueue內部也是利用了ReentrantLock來保證并發訪問時的線程安全。
PriorityBlockingQueue如果不指定容量,默認容量為11,內部數組queue其實是一種二叉樹,后續我們會詳細介紹。
需要注意的是,PriorityBlockingQueue只有一個條件等待隊列——notEmpty,因為構造時不會限制最大容量且會自動擴容,所以插入元素并不會阻塞,僅當隊列為空時,才可能阻塞“出隊”線程。
public class PriorityBlockingQueue插入元素——put(E e)extends AbstractQueue implements BlockingQueue , java.io.Serializable { ? /** * 默認容量. */ private static final int DEFAULT_INITIAL_CAPACITY = 11; ? /** * 最大容量. */ private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; ? /** * 內部堆數組, 保存實際數據, 可以看成一顆二叉樹: * 對于頂點queue[n], queue[2*n+1]表示左子結點, queue[2*(n+1)]表示右子結點. */ private transient Object[] queue; ? /** * 隊列中的元素個數. */ private transient int size; ? /** * 比較器, 如果為null, 表示以元素自身的自然順序進行比較(元素必須實現Comparable接口). */ private transient Comparator super E> comparator; ? /** * 全局鎖. */ private final ReentrantLock lock; ? /** * 當隊列為空時,出隊線程在該條件隊列上等待. */ private final Condition notEmpty; ? // ... }
PriorityBlockingQueue插入元素不會阻塞線程,put(E e)方法內部其實是調用了offer(E e)方法:
首先獲取全局鎖(對于隊列的修改都要獲取這把鎖),然后判斷下隊列是否已經滿了,如果滿了就先進行一次內部數組的擴容(關于擴容,我們后面會專門講):
/** * 向隊列中插入指定元素. * 由于隊列是無界的,所以不會阻塞線程. */ public void put(E e) { offer(e); // never need to block } ? public boolean offer(E e) { if (e == null) throw new NullPointerException(); ? final ReentrantLock lock = this.lock; // 加鎖 lock.lock(); ? int n, cap; Object[] array; while ((n = size) >= (cap = (array = queue).length)) // 隊列已滿, 則進行擴容 tryGrow(array, cap); ? try { Comparator super E> cmp = comparator; if (cmp == null) // 比較器為空, 則按照元素的自然順序進行堆調整 siftUpComparable(n, e, array); else // 比較器非空, 則按照比較器進行堆調整 siftUpUsingComparator(n, e, array, cmp); size = n + 1; // 隊列元素總數+1 notEmpty.signal(); // 喚醒一個可能正在等待的"出隊線程" } finally { lock.unlock(); } return true; }
上面最關鍵的是siftUpComparable和siftUpUsingComparator方法,這兩個方法內部幾乎一樣,只不過前者是一個根據元素的自然順序比較,后者則根據外部比較器比較,我們重點看下siftUpComparable方法:
/** * 將元素x插入到array[k]的位置. * 然后按照元素的自然順序進行堆調整——"上浮",以維持"堆"有序. * 最終的結果是一個"小頂堆". */ private staticvoid siftUpComparable(int k, T x, Object[] array) { Comparable super T> key = (Comparable super T>) x; while (k > 0) { int parent = (k - 1) >>> 1; // 相當于(k-1)除2, 就是求k結點的父結點索引parent Object e = array[parent]; if (key.compareTo((T) e) >= 0) // 如果插入的結點值大于父結點, 則退出 break; ? // 否則,交換父結點和當前結點的值 array[k] = e; k = parent; } array[k] = key; }
siftUpComparable方法的作用其實就是堆的“上浮調整”,可以把堆可以想象成一棵完全二叉樹,每次插入元素都鏈接到二叉樹的最右下方,然后將插入的元素與其父結點比較,如果父結點大,則交換元素,直到沒有父結點比插入的結點大為止。這樣就保證了堆頂(二叉樹的根結點)一定是最小的元素。(注:以上僅針對“小頂堆”)
堆的“上浮”調整我們通過示例來理解下入隊的整個過程:假設初始構造的隊列大小為6,依次插入9、2、93、10、25、90。
①初始隊列情況
②插入元素9(索引0處)
將上述數組想象成一棵完全二叉樹,其實就是下面的結構:
③插入元素2(索引1處)
對應的二叉樹:
由于結點2的父結點為9,所以要進行“上浮調整”,最終隊列結構如下:
④插入元素93(索引2處)
⑤插入元素10(索引3處)
⑥插入元素25(索引4處)
⑦插入元素90(索引5處)
此時,堆不滿足有序條件,因為“90”的父結點“93”大于它,所以需要“上浮調整”:
最終,堆的結構如上,可以看到,經過調整后,堆頂元素一定是最小的。
擴容在入隊過程中,如果隊列內部的queue數組已經滿了,就需要進行擴容:
public boolean offer(E e) { ? // ... while ((n = size) >= (cap = (array = queue).length)) // 隊列已滿, 則進行擴容 tryGrow(array, cap); ? // ... }
我們來看下tryGrow方法:
private void tryGrow(Object[] array, int oldCap) { lock.unlock(); // 擴容和入隊/出隊可以同時進行, 所以先釋放全局鎖 Object[] newArray = null; if (allocationSpinLock == 0 && UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) { // allocationSpinLock置1表示正在擴容 try { // 計算新的數組大小 int newCap = oldCap + ((oldCap < 64) ? (oldCap + 2) : (oldCap >> 1)); if (newCap - MAX_ARRAY_SIZE > 0) { // 溢出判斷 int minCap = oldCap + 1; if (minCap < 0 || minCap > MAX_ARRAY_SIZE) throw new OutOfMemoryError(); newCap = MAX_ARRAY_SIZE; } if (newCap > oldCap && queue == array) newArray = new Object[newCap]; // 分配新數組 } finally { allocationSpinLock = 0; } } if (newArray == null) // 擴容失敗(可能有其它線程正在擴容,導致allocationSpinLock競爭失敗) Thread.yield(); lock.lock(); // 獲取全局鎖(因為要修改內部數組queue) if (newArray != null && queue == array) { queue = newArray; // 指向新的內部數組 System.arraycopy(array, 0, newArray, 0, oldCap); } }
上述整個過程還是比較清晰的,由于調用tryGrow的方法一定會先獲取全局鎖,所以先釋放鎖,因為可能有線程正在出隊,擴容/出隊是可以并發執行的(擴容的前半部分只是新建一個內部數組,不會對出隊產生影響)。擴容后的內部數組大小一般為原來的2倍。
上述需要注意的是allocationSpinLock字段,該字段通過CAS操作,置1表示有線程正在進行擴容。
刪除元素——take()刪除元素(出隊)的整個過程比較簡單,也是先獲取全局鎖,然后判斷隊列狀態,如果是空,則阻塞線程,否則調用dequeue方法出隊:
/** * 出隊一個元素. * 如果隊列為空, 則阻塞線程. */ public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); // 獲取全局鎖 E result; try { while ((result = dequeue()) == null) // 隊列為空 notEmpty.await(); // 線程在noEmpty條件隊列等待 } finally { lock.unlock(); } return result; } ? private E dequeue() { int n = size - 1; // n表示出隊后的剩余元素個數 if (n < 0) // 隊列為空, 則返回null return null; else { Object[] array = queue; E result = (E) array[0]; // array[0]是堆頂結點, 每次出隊都刪除堆頂結點 E x = (E) array[n]; // array[n]是堆的最后一個結點, 也就是二叉樹的最右下結點 array[n] = null; Comparator super E> cmp = comparator; if (cmp == null) siftDownComparable(0, x, array, n); else siftDownUsingComparator(0, x, array, n, cmp); size = n; return result; } }
從dequeue方法可以看出,每次出隊的元素都是“堆頂結點”,對于“小頂堆”就是隊列中的最小值,對于“大頂堆”就是隊列中的最大值。
我們看下siftDownComparable方法如何實現堆頂點的刪除:
/** * 堆的"下沉"調整. * 刪除array[k]對應的結點,并重新調整堆使其有序. * * @param k 待刪除的位置 * @param x 待比較的健 * @param array 堆數組 * @param n 堆的大小 */ private staticvoid siftDownComparable(int k, T x, Object[] array, int n) { if (n > 0) { Comparable super T> key = (Comparable super T>) x; int half = n >>> 1; // 相當于n除2, 即找到索引n對應結點的父結點 while (k < half) { /** * 下述代碼中: * c保存k的左右子結點中的較小結點值 * child保存較小結點對應的索引 */ int child = (k << 1) + 1; // k的左子結點 Object c = array[child]; ? int right = child + 1; // k的右子結點 if (right < n && ((Comparable super T>) c).compareTo((T) array[right]) > 0) c = array[child = right]; if (key.compareTo((T) c) <= 0) break; array[k] = c; k = child; } array[k] = key; } }
上述代碼其實是經典的堆“下沉”操作,對堆中某個頂點下沉,步驟如下:
找到該頂點的左右子結點中較小的那個;
與當前結點交換;
重復前2步直到當前結點沒有左右子結點或比左右子結點都小。
堆的“下沉”調整來看個示例,假設堆的初始結構如下,現在出隊一個元素(索引0位置的元素2)。
①初始狀態
對應二叉樹結構:
②將頂點與最后一個結點調換
即將頂點“2”與最后一個結點“93”交換,然后將索引5為止置null。
注意:為了提升效率(比如siftDownComparable的源碼所示)并不一定要真正交換,可以用一個變量保存索引5處的結點值,在整個下沉操作完成后再替換。但是為了理解這一過程,示例圖中全是以交換進行的。
③下沉索引0處結點
比較元素“93”和左右子結點中的最小者,發現“93”大于“9”,違反了“小頂堆”的規則,所以交換“93”和“9”,這一過程稱為siftdown(下沉):
④繼續下沉索引1處結點
比較元素“93”和左右子結點中的最小者,發現“93”大于“10”,違反了“小頂堆”的規則,所以交換“93”和“10”:
⑤比較結束
由于“93”已經沒有左右子結點了,所以下沉結束,可以看到,此時堆恢復了有序狀態,最終隊列結構如下:
三、總結PriorityBlockingQueue屬于比較特殊的阻塞隊列,適用于有元素優先級要求的場景。它的內部和ArrayBlockingQueue一樣,使用一個了全局獨占鎖來控制同時只有一個線程可以進行入隊和出隊,另外由于該隊列是無界隊列,所以入隊線程并不會阻塞。
PriorityBlockingQueue始終保證出隊的元素是優先級最高的元素,并且可以定制優先級的規則,內部通過使用堆(數組形式)來維護元素順序,它的內部數組是可擴容的,擴容和出/入隊可以并發進行。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/77084.html
摘要:整個包,按照功能可以大致劃分如下鎖框架原子類框架同步器框架集合框架執行器框架本系列將按上述順序分析,分析所基于的源碼為。后,根據一系列常見的多線程設計模式,設計了并發包,其中包下提供了一系列基礎的鎖工具,用以對等進行補充增強。 showImg(https://segmentfault.com/img/remote/1460000016012623); 本文首發于一世流云專欄:https...
摘要:我們來看下的類繼承圖可以看到,實現了接口,在多線程進階二五之框架中,我們提到過實現了接口,以提供和排序相關的功能,維持元素的有序性,所以就是一種為并發環境設計的有序工具類。唯一的區別是針對的僅僅是鍵值,針對鍵值對進行操作。 showImg(https://segmentfault.com/img/bVbggic?w=960&h=600); 本文首發于一世流云專欄:https://seg...
摘要:僅僅當有多個線程同時進行寫操作時,才會進行同步。可以看到,上述方法返回一個迭代器對象,的迭代是在舊數組上進行的,當創建迭代器的那一刻就確定了,所以迭代過程中不會拋出并發修改異常。另外,迭代器對象也不支持修改方法,全部會拋出異常。 showImg(https://segmentfault.com/img/bVbggij?w=960&h=600); 本文首發于一世流云專欄:https://...
摘要:我們之前已經介紹過了,底層基于跳表實現,其操作平均時間復雜度均為。事實上,內部引用了一個對象,以組合方式,委托對象實現了所有功能。線程安全內存的使用較多迭代是對快照進行的,不會拋出,且迭代過程中不支持修改操作。 showImg(https://segmentfault.com/img/bVbggjf?w=600&h=377); 本文首發于一世流云專欄:https://segmentfa...
摘要:接口截止目前為止,我們介紹的阻塞隊列都是實現了接口。該類在構造時一般需要指定容量,如果不指定,則最大容量為。另外,由于內部通過來保證線程安全,所以的整體實現時比較簡單的。另外,雙端隊列相比普通隊列,主要是多了隊尾出隊元素隊首入隊元素的功能。 showImg(https://segmentfault.com/img/bVbgZ7j?w=770&h=514); 本文首發于一世流云專欄:ht...
閱讀 1773·2021-11-11 16:55
閱讀 2575·2021-08-27 13:11
閱讀 3632·2019-08-30 15:53
閱讀 2307·2019-08-30 15:44
閱讀 1396·2019-08-30 11:20
閱讀 1045·2019-08-30 10:55
閱讀 950·2019-08-29 18:40
閱讀 3042·2019-08-29 16:13