摘要:如何優雅的使用和理解線程池線程池中你不容錯過的一些細節由于篇幅限制,本次可能會分為上下兩篇。不接受新的任務,同時等待現有任務執行完畢后退出線程池。慎用方法關閉線程池,會導致任務丟失除非業務允許。
前言
原以為線程池還挺簡單的(平時常用,也分析過原理),這次是想自己動手寫一個線程池來更加深入的了解它;但在動手寫的過程中落地到細節時發現并沒想的那么容易。結合源碼對比后確實不得不佩服 Doug Lea 。
我覺得大部分人直接去看 java.util.concurrent.ThreadPoolExecutor 的源碼時都是看一個大概,因為其中涉及到了許多細節處理,還有部分 AQS 的內容,所以想要理清楚具體細節并不是那么容易。
與其挨個分析源碼不如自己實現一個簡版,當然簡版并不意味著功能缺失,需要保證核心邏輯一致。
所以也是本篇文章的目的:
自己動手寫一個五臟俱全的線程池,同時會了解到線程池的工作原理,以及如何在工作中合理的利用線程池。
再開始之前建議對線程池不是很熟悉的朋友看看這幾篇:
這里我截取了部分內容,也許可以埋個伏筆(坑)。
具體請看這兩個鏈接。
如何優雅的使用和理解線程池
線程池中你不容錯過的一些細節
由于篇幅限制,本次可能會分為上下兩篇。
創建線程池現在進入正題,新建了一個 CustomThreadPool 類,它的工作原理如下:
簡單來說就是往線程池里邊丟任務,丟的任務會緩沖到隊列里;線程池里存儲的其實就是一個個的 Thread ,他們會一直不停的從剛才緩沖的隊列里獲取任務執行。
流程還是挺簡單。
先來看看我們這個自創的線程池的效果如何吧:
初始化了一個核心為3、最大線程數為5、隊列大小為 4 的線程池。
先往其中丟了 10 個任務,由于阻塞隊列的大小為 4 ,最大線程數為 5 ,所以由于隊列里緩沖不了最終會創建 5 個線程(上限)。
過段時間沒有任務提交后(sleep)則會自動縮容到三個線程(保證不會小于核心線程數)。
構造函數來看看具體是如何實現的。
下面則是這個線程池的構造函數:
會有以下幾個核心參數:
miniSize 最小線程數,等效于 ThreadPool 中的核心線程數。
maxSize 最大線程數。
keepAliveTime 線程保活時間。
workQueue 阻塞隊列。
notify 通知接口。
大致上都和 ThreadPool 中的參數相同,并且作用也是類似的。
需要注意的是其中初始化了一個 workers 成員變量:
/** * 存放線程池 */ private volatile Setworkers; public CustomThreadPool(int miniSize, int maxSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, Notify notify) { workers = new ConcurrentHashSet<>(); }
workers 是最終存放線程池中運行的線程,在 j.u.c 源碼中是一個 HashSet 所以對他所有的操作都是需要加鎖。
我這里為了簡便起見就自己定義了一個線程安全的 Set 稱為 ConcurrentHashSet。
其實原理也非常簡單,和 HashSet 類似也是借助于 HashMap 來存放數據,利用其 key 不可重復的特性來實現 set ,只是這里的 HashMap 是用并發安全的 ConcurrentHashMap 來實現的。
這樣就能保證對它的寫入、刪除都是線程安全的。
不過由于 ConcurrentHashMap 的 size() 函數并不準確,所以我這里多帶帶利用了一個 AtomicInteger 來統計容器大小。
創建核心線程往線程池中丟一個任務的時候其實要做的事情還蠻多的,最重要的事情莫過于創建線程存放到線程池中了。
當然我們不能無限制的創建線程,不然拿線程池來就沒任何意義了。于是 miniSize maxSize 這兩個參數就有了它的意義。
但這兩個參數再哪一步的時候才起到作用呢?這就是首先需要明確的。
從這個流程圖可以看出第一步是需要判斷是否大于核心線程數,如果沒有則創建。
結合代碼可以發現在執行任務的時候會判斷是否大于核心線程數,從而創建線程。
worker.startTask() 執行任務部分放到后面分析。
這里的 miniSize 由于會在多線程場景下使用,所以也用 volatile 關鍵字來保證可見性。
隊列緩沖結合上面的流程圖,第二步自然是要判斷隊列是否可以存放任務(是否已滿)。
優先會往隊列里存放。
上至封頂一旦寫入失敗則會判斷當前線程池的大小是否大于最大線程數,如果沒有則繼續創建線程執行。
不然則執行會嘗試阻塞寫入隊列(j.u.c 會在這里執行拒絕策略)
以上的步驟和剛才那張流程圖是一樣的,這樣大家是否有看出什么坑嘛?
時刻小心從上面流程圖的這兩步可以看出會直接創建新的線程。
這個過程相對于中間直接寫入阻塞隊列的開銷是非常大的,主要有以下兩個原因:
創建線程會加鎖,雖說最終用的是 ConcurrentHashMap 的寫入函數,但依然存在加鎖的可能。
會創建新的線程,創建線程還需要調用操作系統的 API 開銷較大。
所以理想情況下我們應該避免這兩步,盡量讓丟入線程池中的任務進入阻塞隊列中。執行任務
任務是添加進來了,那是如何執行的?
在創建任務的時候提到過 worker.startTask() 函數:
/** * 添加任務,需要加鎖 * @param runnable 任務 */ private void addWorker(Runnable runnable) { Worker worker = new Worker(runnable, true); worker.startTask(); workers.add(worker); }
也就是在創建線程執行任務的時候會創建 Worker 對象,利用它的 startTask() 方法來執行任務。
所以先來看看 Worker 對象是長啥樣的:
其實他本身也是一個線程,將接收到需要執行的任務存放到成員變量 task 處。
而其中最為關鍵的則是執行任務 worker.startTask() 這一步驟。
public void startTask() { thread.start(); }
其實就是運行了 worker 線程自己,下面來看 run 方法。
第一步是將創建線程時傳過來的任務執行(task.run),接著會一直不停的從隊列里獲取任務執行,直到獲取不到新任務了。
任務執行完畢后將內置的計數器 -1 ,方便后面任務全部執行完畢進行通知。
worker 線程獲取不到任務后退出,需要將自己從線程池中釋放掉(workers.remove(this))。
從隊列里獲取任務其實 getTask 也是非常關鍵的一個方法,它封裝了從隊列中獲取任務,同時對不需要保活的線程進行回收。
很明顯,核心作用就是從隊列里獲取任務;但有兩個地方需要注意:
當線程數超過核心線程數時,在獲取任務的時候需要通過保活時間從隊列里獲取任務;一旦獲取不到任務則隊列肯定是空的,這樣返回 null 之后在上文的 run() 中就會退出這個線程;從而達到了回收線程的目的,也就是我們之前演示的效果
這里需要加鎖,加鎖的原因是這里肯定會出現并發情況,不加鎖會導致 workers.size() > miniSize 條件多次執行,從而導致線程被全部回收完畢。
關閉線程池最后來談談線程關閉的事;
還是以剛才那段測試代碼為例,如果提交任務后我們沒有關閉線程,會發現即便是任務執行完畢后程序也不會退出。
從剛才的源碼里其實也很容易看出來,不退出的原因是 Worker 線程一定還會一直阻塞在 task = workQueue.take(); 處,即便是線程縮容了也不會小于核心線程數。
通過堆棧也能證明:
恰好剩下三個線程阻塞于此處。
而關閉線程通常又有以下兩種:
立即關閉:執行關閉方法后不管現在線程池的運行狀況,直接一刀切全部停掉,這樣會導致任務丟失。
不接受新的任務,同時等待現有任務執行完畢后退出線程池。
立即關閉我們先來看第一種立即關閉:
/** * 立即關閉線程池,會造成任務丟失 */ public void shutDownNow() { isShutDown.set(true); tryClose(false); } /** * 關閉線程池 * * @param isTry true 嘗試關閉 --> 會等待所有任務執行完畢 * false 立即關閉線程池--> 任務有丟失的可能 */ private void tryClose(boolean isTry) { if (!isTry) { closeAllTask(); } else { if (isShutDown.get() && totalTask.get() == 0) { closeAllTask(); } } } /** * 關閉所有任務 */ private void closeAllTask() { for (Worker worker : workers) { //LOGGER.info("開始關閉"); worker.close(); } } public void close() { thread.interrupt(); }
很容易看出,最終就是遍歷線程池里所有的 worker 線程挨個執行他們的中斷函數。
我們來測試一下:
可以發現后面丟進去的三個任務其實是沒有被執行的。
完事后關閉而正常關閉則不一樣:
/** * 任務執行完畢后關閉線程池 */ public void shutdown() { isShutDown.set(true); tryClose(true); }
他會在這里多了一個判斷,需要所有任務都執行完畢之后才會去中斷線程。
同時在線程需要回收時都會嘗試關閉線程:
來看看實際效果:
回收線程上文或多或少提到了線程回收的事情,其實總結就是以下兩點:
一旦執行了 shutdown/shutdownNow 方法都會將線程池的狀態置為關閉狀態,這樣只要 worker 線程嘗試從隊列里獲取任務時就會直接返回空,導致 worker 線程被回收。
一旦線程池大小超過了核心線程數就會使用保活時間來從隊列里獲取任務,所以一旦獲取不到返回 null 時就會觸發回收。
但如果我們的隊列足夠大,導致線程數都不會超過核心線程數,這樣是不會觸發回收的。
比如這里我將隊列大小調為 10 ,這樣任務就會累計在隊列里,不會創建五個 worker 線程。
所以一直都是 Thread-1~3 這三個線程在反復調度任務。
總結本次實現了線程池里大部分核心功能,我相信只要看完并動手敲一遍一定會對線程池有不一樣的理解。
結合目前的內容來總結下:
線程池、隊列大小要設計的合理,盡量的讓任務從隊列中獲取執行。
慎用 shutdownNow() 方法關閉線程池,會導致任務丟失(除非業務允許)。
如果任務多,線程執行時間短可以調大 keepalive 值,使得線程盡量不被回收從而可以復用線程。
同時下次會分享一些線程池的新特性,如:
執行帶有返回值的線程。
異常處理怎么辦?
所有任務執行完怎么通知我?
本文所有源碼:
https://github.com/crossoverJie/JCSprout/blob/master/src/main/java/com/crossoverjie/concurrent/CustomThreadPool.java
你的點贊與分享是對我最大的支持
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/74567.html
摘要:前言前段時間寫過一篇線程池沒你想的那么簡單,和大家一起擼了一個基本的線程池,具備線程池基本調度功能。線程池自動擴容縮容。回調以上就是線程池的構造函數以及接口的定義。所以我們在使用線程池時,其中的任務一定要做好異常處理。線程異常捕獲的重要性。 showImg(https://segmentfault.com/img/remote/1460000019403163?w=1904&h=108...
摘要:如何優雅的使用和理解線程池線程池中你不容錯過的一些細節由于篇幅限制,本次可能會分為上下兩篇。不接受新的任務,同時等待現有任務執行完畢后退出線程池。慎用方法關閉線程池,會導致任務丟失除非業務允許。前言 原以為線程池還挺簡單的(平時常用,也分析過原理),這次是想自己動手寫一個線程池來更加深入的了解它;但在動手寫的過程中落地到細節時發現并沒想的那么容易。結合源碼對比后確實不得不佩服 Doug Le...
摘要:列入全國計算機二級取代,部分城市試點,引入高中。建議通過視頻學習,這樣不但節省時間,而且效果很好。能否回憶起那個陡峭的學習曲線問題越多,學的越快。出報告每完成一個項目,總結報告,必不可少。結構化學習,才是你我需要真正培養的能力。 編程就如同你學習開車,即使,你可以一口氣,說出一輛車的全部零部件,以及內燃機進氣、壓縮、做功和排氣過程,但你就是不去練如何開車,怎么上路。你確定,你敢開嗎?你...
摘要:但我認為談不上的毛病,而是編程模型和之間的一種模式差異。相比類,更貼近編程模型,使得這種差異更加突出。聲明本文采用循序漸進的示例來解釋問題。本文假設讀者已經使用超過一個小時。這是通過組件生命周期上綁定與的組合完成的。 本文由云+社區發表作者:Dan Abramov 接觸 React Hooks 一定時間的你,也許會碰到一個神奇的問題: setInterval 用起來沒你想的簡單。 R...
閱讀 1587·2021-10-18 13:35
閱讀 2365·2021-10-09 09:44
閱讀 819·2021-10-08 10:05
閱讀 2719·2021-09-26 09:47
閱讀 3571·2021-09-22 15:22
閱讀 435·2019-08-29 12:24
閱讀 2002·2019-08-29 11:06
閱讀 2860·2019-08-26 12:23