摘要:自己實現在自己實現之前先搞清楚阻塞隊列的幾個特點基本隊列特性先進先出。消費隊列空時會阻塞直到寫入線程寫入了隊列數據后喚醒消費線程。最終的隊列大小為,可見線程也是安全的。
前言
較長一段時間以來我都發現不少開發者對 jdk 中的 J.U.C(java.util.concurrent)也就是 Java 并發包的使用甚少,更別談對它的理解了;但這卻也是我們進階的必備關卡。
之前或多或少也分享過相關內容,但都不成體系;于是便想整理一套與并發包相關的系列文章。
其中的內容主要包含以下幾個部分:
根據定義自己實現一個并發工具。
JDK 的標準實現。
實踐案例。
基于這三點我相信大家對這部分內容不至于一問三不知。
既然開了一個新坑,就不想做的太差;所以我打算將這個列表下的大部分類都講到。
所以本次重點討論 ArrayBlockingQueue。
自己實現在自己實現之前先搞清楚阻塞隊列的幾個特點:
基本隊列特性:先進先出。
寫入隊列空間不可用時會阻塞。
獲取隊列數據時當隊列為空時將阻塞。
實現隊列的方式多種,總的來說就是數組和鏈表;其實我們只需要搞清楚其中一個即可,不同的特性主要表現為數組和鏈表的區別。
這里的 ArrayBlockingQueue 看名字很明顯是由數組實現。
我們先根據它這三個特性嘗試自己實現試試。
初始化隊列我這里自定義了一個類:ArrayQueue,它的構造函數如下:
public ArrayQueue(int size) { items = new Object[size]; }
很明顯這里的 items 就是存放數據的數組;在初始化時需要根據大小創建數組。
寫入隊列寫入隊列比較簡單,只需要依次把數據存放到這個數組中即可,如下圖:
但還是有幾個需要注意的點:
隊列滿的時候,寫入的線程需要被阻塞。
寫入過隊列的數量大于隊列大小時需要從第一個下標開始寫。
先看第一個隊列滿的時候,寫入的線程需要被阻塞,先來考慮下如何才能使一個線程被阻塞,看起來的表象線程卡住啥事也做不了。
有幾種方案可以實現這個效果:
Thread.sleep(timeout)線程休眠。
object.wait() 讓線程進入 waiting 狀態。
當然還有一些 join、LockSupport.part 等不在本次的討論范圍。
阻塞隊列還有一個非常重要的特性是:當隊列空間可用時(取出隊列),寫入線程需要被喚醒讓數據可以寫入進去。
所以很明顯Thread.sleep(timeout)不合適,它在到達超時時間之后便會繼續運行;達不到空間可用時才喚醒繼續運行這個特點。
其實這樣的一個特點很容易讓我們想到 Java 的等待通知機制來實現線程間通信;更多線程見通信的方案可以參考這里:深入理解線程通信
所以我這里的做法是,一旦隊列滿時就將寫入線程調用 object.wait() 進入 waiting 狀態,直到空間可用時再進行喚醒。
/** * 隊列滿時的阻塞鎖 */ private Object full = new Object(); /** * 隊列空時的阻塞鎖 */ private Object empty = new Object();
所以這里聲明了兩個對象用于隊列滿、空情況下的互相通知作用。
在寫入數據成功后需要使用 empty.notify(),這樣的目的是當獲取隊列為空時,一旦寫入數據成功就可以把消費隊列的線程喚醒。
這里的 wait 和 notify 操作都需要對各自的對象使用 synchronized 方法塊,這是因為 wait 和 notify 都需要獲取到各自的鎖。消費隊列
上文也提到了:當隊列為空時,獲取隊列的線程需要被阻塞,直到隊列中有數據時才被喚醒。
代碼和寫入的非常類似,也很好理解;只是這里的等待、喚醒恰好是相反的,通過下面這張圖可以很好理解:
總的來說就是:
寫入隊列滿時會阻塞直到獲取線程消費了隊列數據后喚醒寫入線程。
消費隊列空時會阻塞直到寫入線程寫入了隊列數據后喚醒消費線程。
測試先來一個基本的測試:單線程的寫入和消費。
3 123 1234 12345
通過結果來看沒什么問題。
當寫入的數據超過隊列的大小時,就只能消費之后才能接著寫入。
2019-04-09 16:24:41.040 [Thread-0] INFO c.c.concurrent.ArrayQueueTest - [Thread-0]123 2019-04-09 16:24:41.040 [main] INFO c.c.concurrent.ArrayQueueTest - size=3 2019-04-09 16:24:41.047 [main] INFO c.c.concurrent.ArrayQueueTest - 1234 2019-04-09 16:24:41.048 [main] INFO c.c.concurrent.ArrayQueueTest - 12345 2019-04-09 16:24:41.048 [main] INFO c.c.concurrent.ArrayQueueTest - 123456
從運行結果也能看出只有當消費數據后才能接著往隊列里寫入數據。
而當沒有消費時,再往隊列里寫數據則會導致寫入線程被阻塞。
并發測試三個線程并發寫入300條數據,其中一個線程消費一條。
=====0 299
最終的隊列大小為 299,可見線程也是安全的。
由于不管是寫入還是獲取方法里的操作都需要獲取鎖才能操作,所以整個隊列是線程安全的。ArrayBlockingQueue
下面來看看 JDK 標準的 ArrayBlockingQueue 的實現,有了上面的基礎會更好理解。
初始化隊列看似要復雜些,但其實逐步拆分后也很好理解:
第一步其實和我們自己寫的一樣,初始化一個隊列大小的數組。
第二步初始化了一個重入鎖,這里其實就和我們之前使用的 synchronized 作用一致的;
只是這里在初始化重入鎖的時候默認是非公平鎖,當然也可以指定為 true 使用公平鎖;這樣就會按照隊列的順序進行寫入和消費。
更多關于 ReentrantLock 的使用和原理請參考這里:ReentrantLock 實現原理
三四兩步則是創建了 notEmpty notFull 這兩個條件,他的作用于用法和之前使用的 object.wait/notify 類似。
這就是整個初始化的內容,其實和我們自己實現的非常類似。
寫入隊列其實會發現阻塞寫入的原理都是差不多的,只是這里使用的是 Lock 來顯式獲取和釋放鎖。
同時其中的 notFull.await();notEmpty.signal(); 和我們之前使用的 object.wait/notify 的用法和作用也是一樣的。
當然它還是實現了超時阻塞的 API。
也是比較簡單,使用了一個具有超時時間的等待方法。
消費隊列再看消費隊列:
也是差不多的,一看就懂。
而其中的超時 API 也是使用了 notEmpty.awaitNanos(nanos) 來實現超時返回的,就不具體說了。
實際案例說了這么多,來看一個隊列的實際案例吧。
背景是這樣的:
有一個定時任務會按照一定的間隔時間從數據庫中讀取一批數據,需要對這些數據做校驗同時調用一個遠程接口。
簡單的做法就是由這個定時任務的線程去完成讀取數據、消息校驗、調用接口等整個全流程;但這樣會有一個問題:
假設調用外部接口出現了異常、網絡不穩導致耗時增加就會造成整個任務的效率降低,因為他都是串行會互相影響。
所以我們改進了方案:
其實就是一個典型的生產者消費者模型:
生產線程從數據庫中讀取消息丟到隊列里。
消費線程從隊列里獲取數據做業務邏輯。
這樣兩個線程就可以通過這個隊列來進行解耦,互相不影響,同時這個隊列也能起到緩沖的作用。
但在使用過程中也有一些小細節值得注意。
因為這個外部接口是支持批量執行的,所以在消費線程取出數據后會在內存中做一個累加,一旦達到閾值或者是累計了一個時間段便將這批累計的數據處理掉。
但由于開發者的大意,在消費的時候使用的是 queue.take() 這個阻塞的 API;正常運行沒啥問題。
可一旦原始的數據源,也就是 DB 中沒數據了,導致隊列里的數據也被消費完后這個消費線程便會被阻塞。
這樣上一輪積累在內存中的數據便一直沒機會使用,直到數據源又有數據了,一旦中間間隔較長時便可能會導致嚴重的業務異常。
所以我們最好是使用 queue.poll(timeout) 這樣帶超時時間的 api,除非業務上有明確的要求需要阻塞。
這個習慣同樣適用于其他場景,比如調用 http、rpc 接口等都需要設置合理的超時時間。
總結關于 ArrayBlockingQueue 的相關分享便到此結束,接著會繼續更新其他并發容器及并發工具。
對本文有任何相關問題都可以留言討論。
本文涉及到的所有源碼:
https://github.com/crossoverJ...
你的點贊與分享是對我最大的支持
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/77511.html
摘要:所以也很容易想到可以利用等待通知機制來實現,和上文的并發包入坑指北之阻塞隊列的類似。 showImg(https://segmentfault.com/img/remote/1460000019021474?w=2785&h=2785); 前言 在面試過程中聊到并發相關的內容時,不少面試官都喜歡問這類問題: 當 N 個線程同時完成某項任務時,如何知道他們都已經執行完畢了。 這也是本次討...
摘要:整個包,按照功能可以大致劃分如下鎖框架原子類框架同步器框架集合框架執行器框架本系列將按上述順序分析,分析所基于的源碼為。后,根據一系列常見的多線程設計模式,設計了并發包,其中包下提供了一系列基礎的鎖工具,用以對等進行補充增強。 showImg(https://segmentfault.com/img/remote/1460000016012623); 本文首發于一世流云專欄:https...
摘要:最近業務需要抽離,抽離出來的應用需要做成第三方包的形式,可以在任何也沒那么神奇,例如有些版本就沒測試版本項目中,直接安裝使用,所以這里還是需要發包到。第一次發包我是先發到環境,看下發包還是不是符合我的預期,畢竟很長時間沒發過包。 最近業務需要抽離,抽離出來的應用需要做成 Django 第三方包的形式,可以在任何 Django(也沒那么神奇,例如有些版本就沒測試)版本項目中,直接安裝使用...
摘要:一和并發包中的和主要解決的是線程的互斥和同步問題,這兩者的配合使用,相當于的使用。寫鎖與讀鎖之間互斥,一個線程在寫時,不允許讀操作。的注意事項不支持重入,即不可反復獲取同一把鎖。沒有返回值,也就是說無法獲取執行結果。 一、Lock 和 Condition Java 并發包中的 Lock 和 Condition 主要解決的是線程的互斥和同步問題,這兩者的配合使用,相當于 synchron...
閱讀 2021·2021-09-30 09:53
閱讀 1860·2021-09-24 09:48
閱讀 1768·2019-08-30 14:01
閱讀 2180·2019-08-29 18:35
閱讀 1259·2019-08-26 18:27
閱讀 2993·2019-08-26 12:12
閱讀 960·2019-08-23 17:16
閱讀 954·2019-08-23 15:31