摘要:今天在群上拋出來(lái)一個(gè)問(wèn)題,如下我以自帶的數(shù)據(jù)結(jié)構(gòu)為例,用源碼的形式說(shuō)明,如何阻塞線程通知線程的。一以可重入鎖和兩個(gè)對(duì)象來(lái)控制并發(fā)。四使用來(lái)控制并發(fā),同時(shí)也使用的對(duì)象來(lái)與線程交互。
今天在QQ群上拋出來(lái)一個(gè)問(wèn)題,如下
我以Java自帶的數(shù)據(jù)結(jié)構(gòu)為例,用源碼的形式說(shuō)明,如何阻塞線程、通知線程的。
一、Lock & Condition
ArrayBlockingQueue以可重入鎖和兩個(gè)Condition對(duì)象來(lái)控制并發(fā)。
/* * Concurrency control uses the classic two-condition algorithm * found in any textbook. */ /** Main lock guarding all access */ private final ReentrantLock lock; /** Condition for waiting takes */ private final Condition notEmpty; /** Condition for waiting puts */ private final Condition notFull;
構(gòu)造函數(shù)中初始化了notEmpty和notFull.
/** * Creates an ArrayBlockingQueue with the given (fixed) * capacity and the specified access policy. * @param capacity the capacity of this queue * @param fair if true then queue accesses for threads blocked * on insertion or removal, are processed in FIFO order; * if false the access order is unspecified. * @throws IllegalArgumentException if capacity is less than 1 */ public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
二、線程阻塞
當(dāng)ArrayBlockingQueue存儲(chǔ)的元素是0個(gè)的時(shí)候,take()方法會(huì)阻塞.
public Object take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { try { while (count == 0) notEmpty.await(); } catch (InterruptedException ie) { notEmpty.signal(); // propagate to non-interrupted thread throw ie; } Object x = extract(); return x; } finally { lock.unlock(); } }
這里take方法首先獲得可重入鎖lock,然后判斷如果元素為空就執(zhí)行notEmpty.await(); 這個(gè)時(shí)候線程掛起。
三、通知線程
比如使用put放入一個(gè)新元素,
/** * Inserts the specified element at the tail of this queue, waiting * for space to become available if the queue is full. * * @throws InterruptedException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } }
在enqueue方法中,
/** * Inserts element at current put position, advances, and signals. * Call only when holding lock. */ private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); }
對(duì)剛才的notEmptyCondition進(jìn)行通知。
四、ReentrantLock vs AbstractQueuedSynchronizer
ArrayBlockingQueue使用ReentrantLock來(lái)控制并發(fā),同時(shí)也使用ArrayBlockingQueue的Condition對(duì)象來(lái)與線程交互。notEmpty和notFull都是由
ReentrantLock的成員變量sync生成的,
public Condition newCondition() { return sync.newCondition(); }
sync可以認(rèn)為是一個(gè)抽象類類型,Sync,它是在ReentrantLock內(nèi)部定義的靜態(tài)抽象類,抽象類實(shí)現(xiàn)了newCondition方法,
final ConditionObject newCondition() { return new ConditionObject(); }
返回的類型是實(shí)現(xiàn)了Condition接口的ConditionObject類,這是在AbstractQueuedSynchronizer內(nèi)部定義的類。在ArrayBlockingQueue中的notEmpty就是ConditionObject實(shí)例。
阻塞:
當(dāng)ArrayBlockingQueue為空時(shí),notEmpty.await()將自己掛起,如ConditionObject的await方法,
/** * Implements interruptible condition wait. **
*/ public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }- If current thread is interrupted, throw InterruptedException. *
- Save lock state returned by {@link #getState}. *
- Invoke {@link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. *
- Block until signalled or interrupted. *
- Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. *
- If interrupted while blocked in step 4, throw InterruptedException. *
addConditionWaiter是將當(dāng)前線程作為一個(gè)node加入到ConditionObject的隊(duì)列中,隊(duì)列是用鏈表實(shí)現(xiàn)的。
如果是初次加入隊(duì)列的情況,node.waitStatus == Node.CONDITION成立,方法isOnSyncQueue返回false,那么就將線程park。
while (!isOnSyncQueue(node)) { LockSupport.park(this); .... }
至此線程被掛起,LockSupport.park(this);這里this是指ConditionObject,是notEmpty.
通知:
當(dāng)新的元素put進(jìn)入ArrayBlockingQueue后,notEmpty.signal()通知在這上面等待的線程,如ConditionObject的signal方法,
/** * Moves the longest-waiting thread, if one exists, from the * wait queue for this condition to the wait queue for the * owning lock. * * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); }
doSignal方法,
/** * Removes and transfers nodes until hit non-cancelled one or * null. Split out from signal in part to encourage compilers * to inline the case of no waiters. * @param first (non-null) the first node on condition queue */ private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); }
doSignal一開(kāi)始接收到的參數(shù)就是firstWaiter這個(gè)參數(shù),在內(nèi)部實(shí)現(xiàn)中用了do..while的形式,首先將first的的nextWaiter找出來(lái)保存到firstWaiter此時(shí)(first和firstWaiter不是一回事),在while的比較條件中可調(diào)用了transferForSignal方法,
整個(gè)while比較條件可以看著短路邏輯,如果transferForSignal結(jié)果為true,后面的first = firstWaiter就不執(zhí)行了,整個(gè)while循環(huán)就結(jié)束了。
參照注釋,看
transferForSignal方法,
/** * Transfers a node from a condition queue onto sync queue. * Returns true if successful. * @param node the node * @return true if successfully transferred (else the node was * cancelled before signal) */ final boolean transferForSignal(Node node) { /* * If cannot change waitStatus, the node has been cancelled. */ if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; /* * Splice onto queue and try to set waitStatus of predecessor to * indicate that thread is (probably) waiting. If cancelled or * attempt to set waitStatus fails, wake up to resync (in which * case the waitStatus can be transiently and harmlessly wrong). */ Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
首先確保想要被signal的等待node還是處于Node.CONDITION狀態(tài),然后調(diào)整狀態(tài)為Node.SIGNAL,這兩個(gè)都是采用CAS方法,最后調(diào)用的是
LockSupport.unpark(node.thread);
五、LockSupport
至此,我們已經(jīng)知道了線程的掛起和通知都是使用LockSupport來(lái)完成的,并發(fā)數(shù)據(jù)結(jié)構(gòu)與線程直接的交互最終也是需要LockSupport。那么關(guān)于LockSupport,我們又可以了解多少呢?
Ref:
Java中的ReentrantLock和synchronized兩種鎖定機(jī)制的對(duì)比
Java的LockSupport.park()實(shí)現(xiàn)分析
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://m.specialneedsforspecialkids.com/yun/68083.html
摘要:線程安全的線程安全的,在讀多寫少的場(chǎng)合性能非常好,遠(yuǎn)遠(yuǎn)好于高效的并發(fā)隊(duì)列,使用鏈表實(shí)現(xiàn)。這樣帶來(lái)的好處是在高并發(fā)的情況下,你會(huì)需要一個(gè)全局鎖來(lái)保證整個(gè)平衡樹(shù)的線程安全。 該文已加入開(kāi)源項(xiàng)目:JavaGuide(一份涵蓋大部分Java程序員所需要掌握的核心知識(shí)的文檔類項(xiàng)目,Star 數(shù)接近 14 k)。地址:https://github.com/Snailclimb... 一 JDK ...
摘要:本人郵箱歡迎轉(zhuǎn)載轉(zhuǎn)載請(qǐng)注明網(wǎng)址代碼已經(jīng)全部托管有需要的同學(xué)自行下載引言做的同學(xué)們或多或少的接觸過(guò)集合框架在集合框架中大多的集合類是線程不安全的比如我們常用的等等我們寫一個(gè)例子看為什么說(shuō)是不安全的例子證明是線程不安全的我們開(kāi)啟個(gè)線程每個(gè)線程向 本人郵箱: 歡迎轉(zhuǎn)載,轉(zhuǎn)載請(qǐng)注明網(wǎng)址 http://blog.csdn.net/tianshi_kcogithub: https://github...
摘要:如果隊(duì)列已滿,這個(gè)時(shí)候?qū)懖僮鞯木€程進(jìn)入到寫線程隊(duì)列排隊(duì),等待讀線程將隊(duì)列元素移除騰出空間,然后喚醒寫線程隊(duì)列的第一個(gè)等待線程。數(shù)據(jù)必須從某個(gè)寫線程交給某個(gè)讀線程,而不是寫到某個(gè)隊(duì)列中等待被消費(fèi)。 前言 本文直接參考 Doug Lea 寫的 Java doc 和注釋,這也是我們?cè)趯W(xué)習(xí) java 并發(fā)包時(shí)最好的材料了。希望大家能有所思、有所悟,學(xué)習(xí) Doug Lea 的代碼風(fēng)格,并將其優(yōu)雅...
摘要:序本文主要簡(jiǎn)單介紹下與。有界無(wú)界有界,適合已知最大存儲(chǔ)容量的場(chǎng)景可有界可以無(wú)界吞吐量在大多數(shù)并發(fā)的場(chǎng)景下吞吐量比,但是性能不穩(wěn)定。測(cè)試結(jié)果表明,的可伸縮性要高于。 序 本文主要簡(jiǎn)單介紹下ArrayBlockingQueue與LinkedBlockingQueue。 對(duì)比 queue 阻塞與否 是否有界 線程安全保障 適用場(chǎng)景 注意事項(xiàng) ArrayBlockingQueue 阻...
摘要:自己實(shí)現(xiàn)在自己實(shí)現(xiàn)之前先搞清楚阻塞隊(duì)列的幾個(gè)特點(diǎn)基本隊(duì)列特性先進(jìn)先出。消費(fèi)隊(duì)列空時(shí)會(huì)阻塞直到寫入線程寫入了隊(duì)列數(shù)據(jù)后喚醒消費(fèi)線程。最終的隊(duì)列大小為,可見(jiàn)線程也是安全的。 showImg(https://segmentfault.com/img/remote/1460000018811340); 前言 較長(zhǎng)一段時(shí)間以來(lái)我都發(fā)現(xiàn)不少開(kāi)發(fā)者對(duì) jdk 中的 J.U.C(java.util.c...
閱讀 2251·2021-11-23 09:51
閱讀 1080·2021-11-22 15:35
閱讀 4868·2021-11-22 09:34
閱讀 1610·2021-10-08 10:13
閱讀 3026·2021-07-22 17:35
閱讀 2547·2019-08-30 15:56
閱讀 3088·2019-08-29 18:44
閱讀 3100·2019-08-29 15:32