摘要:在上一篇文章從到實現自己的阻塞隊列上中,我們已經實現了一個可以使用的阻塞隊列版本。插入鎖隊列未滿的條件變量彈出鎖隊列非空的條件變量最后我們要對和方法中的調用做出一些調整。
在上一篇文章《從0到1實現自己的阻塞隊列(上)》中,我們已經實現了一個可以使用的阻塞隊列版本。在這篇文章中,我們可以繼續我們的冒險之旅,將我們的阻塞隊列提升到接近JDK版本的水平上。
更進一步優化效率我們一直使用的都是Object.notifyAll()或者condition.signalAll()這樣會喚醒所有線程的方法,那么如果只有一個線程能夠順利執行,但是其他線程都要再次回到等待狀態繼續休眠,那不是非常的浪費嗎?比如如果有N個消費者線程在等待隊列中出現元素,那么當一個元素被插入以后所有N個消費者線程都被全部喚醒,最后也只會有一個消費者線程能夠真正拿到元素并執行完成,其他線程不是都被白白喚醒了嗎?我們為什么不用只會喚醒一個線程的Object.notify()和condition.signal()方法呢?
拆分條件變量在阻塞隊列中,我們可以使用Object.notify()或者condition.signal()這樣只喚醒一個線程的方法,但是會有一些前提條件:
首先,在一個條件變量上等待的線程必須是同一類型的。比如一個條件變量上只有消費者線程在等待,另一個條件變量上只有生產者線程在等待。這么做的目的就是防止發生我們在插入時想喚醒的是消費者線程,但是喚醒了一個生產者線程,這個生產者線程又因為隊列已滿又進入了等待狀態,這樣我們需要喚醒的消費者線程就永遠不會被喚醒了。
另外還有一點就是這個條件變量上等待的線程只能互斥執行,如果N個生產者線程可以同時執行,我們也就不需要一個一個喚醒了,這樣反而會讓效率降低。當然,在我們的阻塞隊列當中,不管是插入還是彈出操作同一時間都只能有一個線程在執行,所以自然就滿足這個要求了。
所以,我們只需要滿足第一個要求讓不同類型的線程在不同的條件變量上等待就可以了。那么具體要怎么做呢?
首先,我們自然是要把原來的一個條件變量condition給拆分成兩個實例變量notFull和notEmpty,這兩個條件變量雖然對應于同一互斥鎖,但是兩個條件變量的等待和喚醒操作是完全隔離的。這兩個條件變量分別代表隊列未滿和隊列非空兩個條件,消費者線程因為是被隊列為空的情況所阻塞的,所以就應該等待隊列非空條件得到滿足;而生產者線程因為是被隊列已滿的情況所阻塞的,自然就要等待隊列未滿條件的成立。
/** 隊列未滿的條件變量 */ private final Condition notFull = lock.newCondition(); /** 隊列非空的條件變量 */ private final Condition notEmpty = lock.newCondition();
所以在put()和take()方法中,我們就需要把take()方法中原來的condition.await()修改為等待隊列非空條件,即notEmpty.await();而put()方法中的condition.await()自然是要修改為等待隊列未滿條件成立,即notFull.await()。既然我們把等待條件變量的語句都改了,那么喚醒的語句也要做同樣的修改,put()操作要喚醒等待的消費者線程,所以是notEmpty.signal();take()操作要喚醒的生產者線程,所以是notFull.signal()。修改完成后的代碼如下,大家可以參考一下:
/** * 將指定元素插入隊列 * * @param e 待插入的對象 */ public void put(Object e) throws InterruptedException { lock.lockInterruptibly(); try { while (count == items.length) { // 隊列已滿時進入休眠 // 等待隊列未滿條件得到滿足 notFull.await(); } // 執行入隊操作,將對象e實際放入隊列中 enqueue(e); // 插入元素后喚醒一個等待隊列非空條件成立的線程 notEmpty.signal(); } finally { lock.unlock(); } } /** * 從隊列中彈出一個元素 * * @return 被彈出的元素 */ public Object take() throws InterruptedException { lock.lockInterruptibly(); try { while (count == 0) { // 隊列為空時進入休眠 // 等待隊列非空條件得到滿足 notEmpty.await(); } // 執行出隊操作,將隊列中的第一個元素彈出 Object e = dequeue(); // 彈出元素后喚醒一個等待隊列未滿條件成立的線程 notFull.signal(); return e; } finally { lock.unlock(); } }驗證程序的效率
既然我們對阻塞隊列做了效率上的改進,那么就讓我們來實際檢驗一下吧。我們還是之前已經提供的檢驗程序,但是不同的是,為了明顯地看出效率上的變化,我們需要修改一下程序中的兩個變量。首先,我們需要把檢驗程序中運行的線程數threads增加到400,然后我們需要把每個線程執行的次數改為100次,就像下面這樣:
// 創建400個線程 final int threads = 400; // 每個線程執行100次 final int times = 100;
最后我們分別使用改進前和改進后的版本來執行這個這個阻塞隊列,在我的電腦上,改進前的版本耗時為7.80s,改進后的版本耗時為1.35s。看起來我們對阻塞隊列的效率做了一個非常大的提升,非常好,那我們還有沒有辦法再加快一點呢?
還能不能更快?在上面的阻塞隊列實現中,我們主要使用的就是put()和take()兩個操作。而因為有互斥鎖ReentrantLock的保護,所以這兩個方法在同一時間只能有一個線程調用。也就是說,生產者線程在操作隊列時同樣會阻塞消費者線程。不過從我們的代碼中看,實際上put()方法和take()方法之間需要有互斥鎖保護的共享數據訪問只發生在入隊操作enqueue方法和出隊操作dequeue方法之中。在這兩個方法里,對于putIndex和takeIndex的訪問是完全隔離的,enqueue只使用putIndex,而dequeue只使用takeIndex,那么線程間的競爭性數據就只剩下count了。這樣的話,如果我們能解決count的更新問題是不是就可以把鎖lock拆分為兩個互斥鎖,分別讓生產者線程和消費者線程使用了呢?這樣的話生產者線程在操作時就只會阻塞生產者線程而不會阻塞消費者線程了,消費者線程也是一樣的道理。
拆分鎖這時候就要請出我們很熟悉的一種同步工具CAS了,CAS是一個原子操作,它會接收兩個參數,一個是當前值,一個是目標值,如果當前值已經發生了改變,那么就會返回失敗,而如果當前值沒有變化,就會將這個變量修改為目標值。在Java中,我們一般會通過java.util.concurrent中的AtomicInteger來執行CAS操作。在AtomicInteger類上有原子性的增加與減少方法,每次調用都可以保證對指定的對象進行增加或減少,并且即使有多個線程同時執行這些操作,它們的結果也仍然是正確的。
首先,為了保證入隊和出隊操作之間的互斥特性移除后兩個方法能夠并發執行,那么我們就要保證對count的更新是線程安全的。因此,我們首先需要把實例變量count的類型從int修改為AtomicInteger,而AtomicInteger類就提供了我們需要的原子性的增加與減少接口。
/** 隊列中的元素總數 */ private AtomicInteger count = new AtomicInteger(0);
然后對應地,我們需要將入隊方法中的count++和出隊方法中的count--分別改為Atomic原子性的加1方法getAndIncrement與減1方法getAndDecrement。
/** * 入隊操作 * * @param e 待插入的對象 */ private void enqueue(Object e) { // 將對象e放入putIndex指向的位置 items[putIndex] = e; // putIndex向后移一位,如果已到末尾則返回隊列開頭(位置0) if (++putIndex == items.length) putIndex = 0; // 增加元素總數 count.getAndIncrement(); } /** * 出隊操作 * * @return 被彈出的元素 */ private Object dequeue() { // 取出takeIndex指向位置中的元素 // 并將該位置清空 Object e = items[takeIndex]; items[takeIndex] = null; // takeIndex向后移一位,如果已到末尾則返回隊列開頭(位置0) if (++takeIndex == items.length) takeIndex = 0; // 減少元素總數 count.getAndDecrement(); // 返回之前代碼中取出的元素e return e; }
到這里,我們就已經解決了put()和take()方法之間的數據競爭問題,兩個方法現在就可以分別用兩個鎖來控制了。雖然相同類型的線程仍然是互斥的,例如生產者和生產者之間同一時間只能有一個生產者線程在操作隊列。但是在生產者線程和消費者線程之間將不用再繼續互斥,一個生產者線程和一個消費者線程可以在同一時間操作同一阻塞隊列了。所以,我們在這里可以將互斥鎖lock拆為兩個,分別保證生產者線程和消費者線程的互斥性,我們將它們命名為插入鎖putLock和彈出鎖takeLock。同時,原來的條件變量也要分別對應于不同的互斥鎖了,notFull要對應于putLock,因為插入元素的生產者線程需要等待隊列未滿條件,那么notEmpyt自然就要對應于takeLock了。
/** 插入鎖 */ private final ReentrantLock putLock = new ReentrantLock(); /** 隊列未滿的條件變量 */ private final Condition notFull = putLock.newCondition(); /** 彈出鎖 */ private final ReentrantLock takeLock = new ReentrantLock(); /** 隊列非空的條件變量 */ private final Condition notEmpty = takeLock.newCondition();
最后我們要對put()和take()方法中的signal()調用做出一些調整。因為在上文中提到的,在使用條件變量時一定要先持有條件變量所對應的互斥鎖,而在put()和take()方法中,使用signal()方法喚醒的都是另一種類型的線程,例如生產者線程喚醒消費者,消費者線程喚醒生產者。這樣我們調用signal()方法的條件變量就和try語句中持有的鎖不一致了,所以我們必須將直接的xxx.signal()調用替換為一個私有方法調用。而在私有方法中,我們會先獲取與條件變量對應的鎖,然后再調用條件變量的signal()方法。比如在下面的signalNotEmpty()方法中,我們就要先獲取takeLock才能調用notEmpty.signal();而在signalNotFull()方法中,我們就要先獲取putLock才能調用notFull.signal()。
/** * 喚醒等待隊列非空條件的線程 */ private void signalNotEmpty() { // 為了喚醒等待隊列非空條件的線程,需要先獲取對應的takeLock takeLock.lock(); try { // 喚醒一個等待非空條件的線程 notEmpty.signal(); } finally { takeLock.unlock(); } } /** * 喚醒等待隊列未滿條件的線程 */ private void signalNotFull() { // 為了喚醒等待隊列未滿條件的線程,需要先獲取對應的putLock putLock.lock(); try { // 喚醒一個等待隊列未滿條件的線程 notFull.signal(); } finally { putLock.unlock(); } }解決死鎖問題
但直接把notFull.signal()換成signalNotFull(),把notEmpty.signal()換成signalNotEmpty()還不夠,因為在我們的代碼中,原來的notFull.signal()和notEmpty.signal()都是在持有鎖的try語句塊當中的。一旦我們做了調用私有方法的替換,那么put()和take()方法就會以相反的順序同時獲取putLock和takeLock兩個鎖。有一些讀者可能已經意識到這樣會產生死鎖問題了,那么我們應該怎么解決它呢?
最好的方法就是不要同時加兩個鎖,我們完全可以在釋放前一個之后再使用signal()方法來喚醒另一種類型的線程。就像下面的put()與take()方法中所做的一樣,我們可以在執行完入隊操作之后就釋放插入鎖putLock,然后才運行signalNotEmpty()方法去獲取takeLock并調用與其對應的條件變量notEmpty的signal()方法,在take()方法中也是一樣的道理。
/** * 將指定元素插入隊列 * * @param e 待插入的對象 */ public void put(Object e) throws InterruptedException { putLock.lockInterruptibly(); try { while (count.get() == items.length) { // 隊列已滿時進入休眠 // 等待隊列未滿條件得到滿足 notFull.await(); } // 執行入隊操作,將對象e實際放入隊列中 enqueue(e); } finally { putLock.unlock(); } // 喚醒等待隊列非空條件的線程 // 為了防止死鎖,不能在釋放putLock之前獲取takeLock signalNotEmpty(); } /** * 從隊列中彈出一個元素 * * @return 被彈出的元素 */ public Object take() throws InterruptedException { Object e; takeLock.lockInterruptibly(); try { while (count.get() == 0) { // 隊列為空時進入休眠 // 等待隊列非空條件得到滿足 notEmpty.await(); } // 執行出隊操作,將隊列中的第一個元素彈出 e = dequeue(); } finally { takeLock.unlock(); } // 喚醒等待隊列未滿條件的線程 // 為了防止死鎖,不能在釋放takeLock之前獲取putLock signalNotFull(); return e; }
到了這里我們就順利地把原來單一的一個lock鎖拆分為了插入鎖putLock和takeLock,這樣生產者線程和消費者線程就可以同時運行了。
最后的細節優化——優化喚醒其他線程的效率啊?我們的阻塞隊列到了這里還能再繼續優化嗎?其實我們做的優化已經足夠多了,基本上影響比較大的優化我們都做了,但是還有一些細節是可以最后完善一下的。比如說如果隊列并沒有為空或者已滿時,我們插入或者彈出了元素其實都是不需要喚醒任何線程的,多余的喚醒操作需要先獲取ReentrantLock鎖才能調用對應的條件變量的signal()方法,而獲取鎖是一個成本比較大的操作。所以我們最好是能在隊列真的為空或者已滿以后,成功插入或彈出元素時,再去獲取鎖并喚醒等待的線程。
也就是說我們會將signalNotEmpty();修改為if (c == 0) signalNotEmpty();,而把signalNotFull();修改為if (c == items.length) signalNotFull();,也就是只有在必要的時候才去喚醒另一種類型的線程。但是這種修改又會引入另外一種問題,例如有N個消費者線程在等待隊列非空,這時有兩個生產者線程插入了兩個元素,但是這兩個插入操作是連續發生的,也就是說只有第一個生產者線程在插入元素之后調用了signalNotEmpty(),第二個線程看到隊列原本是非空的就不會調用喚醒方法。在這種情況下,實際就只有一個消費者線程被喚醒了,而實際上隊列中還有一個元素可供消費。那么我們如何解決這個問題呢?
比較簡單的一種方法就是,生產者線程和消費者線程不止會喚醒另一種類型的線程,而且也會喚醒同類型的線程。比如在生產者線程中如果插入元素之后發現隊列還未滿,那么就可以調用notFull.signal()方法來喚醒其他可能存在的等待狀態的生產者線程,對于消費者線程所使用的take()方法也是類似的處理方式。相對來說signal方法較低,而互斥鎖的lock方法成本較高,而且會影響到另一種類型線程的運行。所以通過這種方式盡可能地少調用signalNotEmpty()和signalNotFull()方法會是一種還不錯的優化手段。
優化后的put()和take()方法如下:
/** * 將指定元素插入隊列 * * @param e 待插入的對象 */ public void put(Object e) throws InterruptedException { int c = -1; putLock.lockInterruptibly(); try { while (count.get() == items.length) { // 隊列已滿時進入休眠 // 等待隊列未滿條件得到滿足 notFull.await(); } // 執行入隊操作,將對象e實際放入隊列中 enqueue(e); // 增加元素總數 c = count.getAndIncrement(); // 如果在插入后隊列仍然沒滿,則喚醒其他等待插入的線程 if (c + 1 < items.length) notFull.signal(); } finally { putLock.unlock(); } // 如果插入之前隊列為空,才喚醒等待彈出元素的線程 // 為了防止死鎖,不能在釋放putLock之前獲取takeLock if (c == 0) signalNotEmpty(); } /** * 從隊列中彈出一個元素 * * @return 被彈出的元素 */ public Object take() throws InterruptedException { Object e; int c = -1; takeLock.lockInterruptibly(); try { while (count.get() == 0) { // 隊列為空時進入休眠 // 等待隊列非空條件得到滿足 notEmpty.await(); } // 執行出隊操作,將隊列中的第一個元素彈出 e = dequeue(); // 減少元素總數 c = count.getAndDecrement(); // 如果隊列在彈出一個元素后仍然非空,則喚醒其他等待隊列非空的線程 if (c - 1 > 0) notEmpty.signal(); } finally { takeLock.unlock(); } // 只有在彈出之前隊列已滿的情況下才喚醒等待插入元素的線程 // 為了防止死鎖,不能在釋放takeLock之前獲取putLock if (c == items.length) signalNotFull(); return e; }成品出爐!
恭喜大家,經過一番漫長的探索,我們終于徹底完成了我們的阻塞隊列實現之旅。如果你能堅持到這里,我相信你已經對多線程編程的實踐方法有了非常深刻的理解。最后讓我們來看一看我們最終完成的成品代碼——一個完整的阻塞隊列實現吧。
完整的阻塞隊列代碼public class BlockingQueue { /** 存放元素的數組 */ private final Object[] items; /** 彈出元素的位置 */ private int takeIndex; /** 插入元素的位置 */ private int putIndex; /** 隊列中的元素總數 */ private AtomicInteger count = new AtomicInteger(0); /** 插入鎖 */ private final ReentrantLock putLock = new ReentrantLock(); /** 隊列未滿的條件變量 */ private final Condition notFull = putLock.newCondition(); /** 彈出鎖 */ private final ReentrantLock takeLock = new ReentrantLock(); /** 隊列非空的條件變量 */ private final Condition notEmpty = takeLock.newCondition(); /** * 指定隊列大小的構造器 * * @param capacity 隊列大小 */ public BlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); items = new Object[capacity]; } /** * 入隊操作 * * @param e 待插入的對象 */ private void enqueue(Object e) { // 將對象e放入putIndex指向的位置 items[putIndex] = e; // putIndex向后移一位,如果已到末尾則返回隊列開頭(位置0) if (++putIndex == items.length) putIndex = 0; } /** * 出隊操作 * * @return 被彈出的元素 */ private Object dequeue() { // 取出takeIndex指向位置中的元素 // 并將該位置清空 Object e = items[takeIndex]; items[takeIndex] = null; // takeIndex向后移一位,如果已到末尾則返回隊列開頭(位置0) if (++takeIndex == items.length) takeIndex = 0; // 返回之前代碼中取出的元素e return e; } /** * 將指定元素插入隊列 * * @param e 待插入的對象 */ public void put(Object e) throws InterruptedException { int c = -1; putLock.lockInterruptibly(); try { while (count.get() == items.length) { // 隊列已滿時進入休眠 // 等待隊列未滿條件得到滿足 notFull.await(); } // 執行入隊操作,將對象e實際放入隊列中 enqueue(e); // 增加元素總數 c = count.getAndIncrement(); // 如果在插入后隊列仍然沒滿,則喚醒其他等待插入的線程 if (c + 1 < items.length) notFull.signal(); } finally { putLock.unlock(); } // 如果插入之前隊列為空,才喚醒等待彈出元素的線程 // 為了防止死鎖,不能在釋放putLock之前獲取takeLock if (c == 0) signalNotEmpty(); } /** * 喚醒等待隊列非空條件的線程 */ private void signalNotEmpty() { // 為了喚醒等待隊列非空條件的線程,需要先獲取對應的takeLock takeLock.lock(); try { // 喚醒一個等待非空條件的線程 notEmpty.signal(); } finally { takeLock.unlock(); } } /** * 從隊列中彈出一個元素 * * @return 被彈出的元素 */ public Object take() throws InterruptedException { Object e; int c = -1; takeLock.lockInterruptibly(); try { while (count.get() == 0) { // 隊列為空時進入休眠 // 等待隊列非空條件得到滿足 notEmpty.await(); } // 執行出隊操作,將隊列中的第一個元素彈出 e = dequeue(); // 減少元素總數 c = count.getAndDecrement(); // 如果隊列在彈出一個元素后仍然非空,則喚醒其他等待隊列非空的線程 if (c - 1 > 0) notEmpty.signal(); } finally { takeLock.unlock(); } // 只有在彈出之前隊列已滿的情況下才喚醒等待插入元素的線程 // 為了防止死鎖,不能在釋放takeLock之前獲取putLock if (c == items.length) signalNotFull(); return e; } /** * 喚醒等待隊列未滿條件的線程 */ private void signalNotFull() { // 為了喚醒等待隊列未滿條件的線程,需要先獲取對應的putLock putLock.lock(); try { // 喚醒一個等待隊列未滿條件的線程 notFull.signal(); } finally { putLock.unlock(); } } }
有興趣的讀者可以把我們完成的這個阻塞隊列類和JDK中的java.util.concurrent.LinkedBlockingQueue類做一個比較,相信大家可以發現這兩個類非常的相似,這足以說明我們費勁千辛萬苦實現的這個阻塞隊列類已經非常接近JDK中的阻塞隊列類的質量了。
總結恭喜大家終于完整地讀完了這篇文章!在這篇文章中,我們從一個最簡單的阻塞隊列版本開始,一路解決了各種問題,最終得到了一個完整、高質量的阻塞隊列實現。我們一起來回憶一下我們解決的問題吧。從最簡單的阻塞隊列開始,我們首先用互斥鎖synchronized關鍵字解決了并發控制問題,保證了隊列在多線程訪問情況下的正確性。然后我們用條件變量Object.wati()、Object.notifyAll()解決了休眠喚醒問題,使隊列的效率得到了飛躍性地提高。為了保障隊列的安全性,不讓外部代碼可以訪問到我們所使用的對象鎖和條件變量,所以我們使用了顯式鎖ReentrantLock,并通過鎖對象lock的newCondition()方法創建了與其相對應的條件變量對象。最后,我們對隊列中的條件變量和互斥鎖分別做了拆分,使隊列的效率得到了進一步的提高。當然,最后我們還加上了一點對喚醒操作的有條件調用優化,使整個阻塞隊列的實現變得更加完善。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/74358.html
摘要:而且在大多數經典的多線程編程資料中,阻塞隊列都是其中非常重要的一個實踐案例。甚至可以說只有自己動手實現了一個阻塞隊列才能真正掌握多線程相關的。為什么會發生這種情況呢原因就是在我們實現的這個阻塞隊列中完全沒有線程同步機制,所以同時并發進行的個 阻塞隊列不止是一道熱門的面試題,同時也是許多并發處理模型的基礎,比如常用的線程池類ThreadPoolExecutor內部就使用了阻塞隊列來保存等...
摘要:實現阻塞隊列在自己實現之前先搞清楚阻塞隊列的幾個特點基本隊列特性先進先出。消費隊列空時會阻塞直到寫入線程寫入了隊列數據后喚醒消費線程。 實現Java 阻塞隊列 在自己實現之前先搞清楚阻塞隊列的幾個特點:基本隊列特性:先進先出。寫入隊列空間不可用時會阻塞。獲取隊列數據時當隊列為空時將阻塞。 實現隊列的方式多種,總的來說就是數組和鏈表;其實我們只需要搞清楚其中一個即可,不同的特性主要表現為...
摘要:自己實現在自己實現之前先搞清楚阻塞隊列的幾個特點基本隊列特性先進先出。消費隊列空時會阻塞直到寫入線程寫入了隊列數據后喚醒消費線程。最終的隊列大小為,可見線程也是安全的。 showImg(https://segmentfault.com/img/remote/1460000018811340); 前言 較長一段時間以來我都發現不少開發者對 jdk 中的 J.U.C(java.util.c...
摘要:最后,我們會通過對源代碼的剖析深入了解線程池的運行過程和具體設計,真正達到知其然而知其所以然的水平。創建線程池既然線程池是一個類,那么最直接的使用方法一定是一個類的對象,例如。單線程線程池單線程線程 我們一般不會選擇直接使用線程類Thread進行多線程編程,而是使用更方便的線程池來進行任務的調度和管理。線程池就像共享單車,我們只要在我們有需要的時候去獲取就可以了。甚至可以說線程池更棒,...
閱讀 1313·2021-11-22 09:34
閱讀 2175·2021-10-08 10:18
閱讀 1737·2021-09-29 09:35
閱讀 2467·2019-08-29 17:20
閱讀 2148·2019-08-29 15:36
閱讀 3410·2019-08-29 13:52
閱讀 789·2019-08-29 12:29
閱讀 1195·2019-08-28 18:10