摘要:與最大線程池比較。如果加入成功,需要二次檢查線程池的狀態(tài)如果線程池沒(méi)有處于,則從移除任務(wù),啟動(dòng)拒絕策略。如果線程池處于狀態(tài),則檢查工作線程是否為。線程池將如何工作這個(gè)問(wèn)題應(yīng)該就不難回答了。
原文地址:https://www.xilidou.com/2018/02/09/thread-corepoolsize/
最近在看《Java并發(fā)編程的藝術(shù)》回顧線程池的原理和參數(shù)的時(shí)候發(fā)現(xiàn)一個(gè)問(wèn)題,如果 corePoolSize = 0 且 阻塞隊(duì)列是無(wú)界的。線程池將如何工作?
我們先回顧一下書(shū)里面描述線程池execute()工作的邏輯:
如果當(dāng)前運(yùn)行的線程,少于corePoolSize,則創(chuàng)建一個(gè)新的線程來(lái)執(zhí)行任務(wù)。
如果運(yùn)行的線程等于或多于 corePoolSize,將任務(wù)加入 BlockingQueue。
如果 BlockingQueue 內(nèi)的任務(wù)超過(guò)上限,則創(chuàng)建新的線程來(lái)處理任務(wù)。
如果創(chuàng)建的線程數(shù)是單錢運(yùn)行的線程超出 maximumPoolSize,任務(wù)將被拒絕策略拒絕。
看了這四個(gè)步驟,其實(shí)描述上是有一個(gè)漏洞的。如果核心線程數(shù)是0,阻塞隊(duì)列也是無(wú)界的,會(huì)怎樣?如果按照上文的邏輯,應(yīng)該沒(méi)有線程會(huì)被運(yùn)行,然后線程無(wú)限的增加到隊(duì)列里面。然后呢?
于是我做了一下試驗(yàn)看看到底會(huì)怎樣?
public class threadTest { private final static ThreadPoolExecutor executor = new ThreadPoolExecutor(0,1,0, TimeUnit.MILLISECONDS,new LinkedBlockingQueue()); public static void main(String[] args) { AtomicInteger atomicInteger = new AtomicInteger(); while (true) { executor.execute(() -> { System.out.println(atomicInteger.getAndAdd(1)); }); } } }
結(jié)果里面的System.out.println(atomicInteger.getAndAdd(1));語(yǔ)句執(zhí)行了,與上面的描述矛盾了。到底發(fā)生了什么?線程池創(chuàng)建線程的邏輯是什么?我們還是從源碼來(lái)看看到底線程池的邏輯是什么?
ctl要了解線程池,我們首先要了解的線程池里面的狀態(tài)控制的參數(shù) ctl。
線程池的ctl是一個(gè)原子的 AtomicInteger。
這個(gè)ctl包含兩個(gè)參數(shù) :
workerCount 激活的線程數(shù)
runState 當(dāng)前線程池的狀態(tài)
它的低29位用于存放當(dāng)前的線程數(shù), 因此一個(gè)線程池在理論上最大的線程數(shù)是 536870911; 高 3 位是用于表示當(dāng)前線程池的狀態(tài), 其中高三位的值和狀態(tài)對(duì)應(yīng)如下:
111: RUNNING
000: SHUTDOWN
001: STOP
010: TIDYING
110: TERMINATED
為了能夠使用 ctl 線程池提供了三個(gè)方法:
// Packing and unpacking ctl // 獲取線程池的狀態(tài) private static int runStateOf(int c) { return c & ~CAPACITY; } // 獲取線程池的工作線程數(shù) private static int workerCountOf(int c) { return c & CAPACITY; } // 根據(jù)工作線程數(shù)和線程池狀態(tài)獲取 ctl private static int ctlOf(int rs, int wc) { return rs | wc; }execute
外界通過(guò) execute 這個(gè)方法來(lái)向線程池提交任務(wù)。
先看代碼:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); //如果工作線程數(shù)小于核心線程數(shù), if (workerCountOf(c) < corePoolSize) { //執(zhí)行addWork,提交為核心線程,提交成功return。提交失敗重新獲取ctl if (addWorker(command, true)) return; c = ctl.get(); } //如果工作線程數(shù)大于核心線程數(shù),則檢查線程池狀態(tài)是否是正在運(yùn)行,且將新線程向阻塞隊(duì)列提交。 if (isRunning(c) && workQueue.offer(command)) { //recheck 需要再次檢查,主要目的是判斷加入到阻塞隊(duì)里中的線程是否可以被執(zhí)行 int recheck = ctl.get(); //如果線程池狀態(tài)不為running,將任務(wù)從阻塞隊(duì)列里面移除,啟用拒絕策略 if (! isRunning(recheck) && remove(command)) reject(command); // 如果線程池的工作線程為零,則調(diào)用addWoker提交任務(wù) else if (workerCountOf(recheck) == 0) addWorker(null, false); } //添加非核心線程失敗,拒絕 else if (!addWorker(command, false)) reject(command); }addWorker
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); //獲取線程池狀態(tài) int rs = runStateOf(c); // Check if queue empty only if necessary. // 判斷是否可以添加任務(wù)。 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { //獲取工作線程數(shù)量 int wc = workerCountOf(c); //是否大于線程池上限,是否大于核心線程數(shù),或者最大線程數(shù) if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; //CAS 增加工作線程數(shù) if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl //如果線程池狀態(tài)改變,回到開(kāi)始重新來(lái) if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; //上面的邏輯是考慮是否能夠添加線程,如果可以就cas的增加工作線程數(shù)量 //下面正式啟動(dòng)線程 try { //新建worker w = new Worker(firstTask); //獲取當(dāng)前線程 final Thread t = w.thread; if (t != null) { //獲取可重入鎖 final ReentrantLock mainLock = this.mainLock; //鎖住 mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); // rs < SHUTDOWN ==> 線程處于RUNNING狀態(tài) // 或者線程處于SHUTDOWN狀態(tài),且firstTask == null(可能是workQueue中仍有未執(zhí)行完成的任務(wù),創(chuàng)建沒(méi)有初始任務(wù)的worker線程執(zhí)行) if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { // 當(dāng)前線程已經(jīng)啟動(dòng),拋出異常 if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); //workers 是一個(gè) HashSet 必須在 lock的情況下操作。 workers.add(w); int s = workers.size(); //設(shè)置 largeestPoolSize 標(biāo)記workAdded if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } //如果添加成功,啟動(dòng)線程 if (workerAdded) { t.start(); workerStarted = true; } } } finally { //啟動(dòng)線程失敗,回滾。 if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
先看看 addWork() 的兩個(gè)參數(shù),第一個(gè)是需要提交的線程 Runnable firstTask,第二個(gè)參數(shù)是 boolean 類型,表示是否為核心線程。
execute() 中有三處調(diào)用了 addWork() 我們逐一分析。
第一次,條件 if (workerCountOf(c) < corePoolSize) 這個(gè)很好理解,工作線程數(shù)少于核心線程數(shù),提交任務(wù)。所以 addWorker(command, true)。
第二次,如果 workerCountOf(recheck) == 0 如果worker的數(shù)量為0,那就 addWorker(null,false)。為什么這里是 null ?之前已經(jīng)把 command 提交到阻塞隊(duì)列了 workQueue.offer(command) 。所以提交一個(gè)空線程,直接從阻塞隊(duì)列里面取就可以了。
第三次,如果線程池沒(méi)有 RUNNING 或者 offer 阻塞隊(duì)列失敗,addWorker(command,false),很好理解,對(duì)應(yīng)的就是,阻塞隊(duì)列滿了,將任務(wù)提交到,非核心線程池。與最大線程池比較。
至此,重新歸納execute()的邏輯應(yīng)該是:
如果當(dāng)前運(yùn)行的線程,少于corePoolSize,則創(chuàng)建一個(gè)新的線程來(lái)執(zhí)行任務(wù)。
如果運(yùn)行的線程等于或多于 corePoolSize,將任務(wù)加入 BlockingQueue。
如果加入 BlockingQueue 成功,需要二次檢查線程池的狀態(tài)如果線程池沒(méi)有處于 Running,則從 BlockingQueue 移除任務(wù),啟動(dòng)拒絕策略。
如果線程池處于 Running狀態(tài),則檢查工作線程(worker)是否為0。如果為0,則創(chuàng)建新的線程來(lái)處理任務(wù)。如果啟動(dòng)線程數(shù)大于maximumPoolSize,任務(wù)將被拒絕策略拒絕。
如果加入 BlockingQueue 。失敗,則創(chuàng)建新的線程來(lái)處理任務(wù)。
如果啟動(dòng)線程數(shù)大于maximumPoolSize,任務(wù)將被拒絕策略拒絕。
總結(jié)回顧我開(kāi)始提出的問(wèn)題:
如果 corePoolSize = 0 且 阻塞隊(duì)列是無(wú)界的。線程池將如何工作?
這個(gè)問(wèn)題應(yīng)該就不難回答了。
最后《Java并發(fā)編程的藝術(shù)》是一本學(xué)習(xí) java 并發(fā)編程的好書(shū),在這里推薦給大家。
同時(shí),希望大家在閱讀技術(shù)數(shù)據(jù)的時(shí)候要仔細(xì)思考,結(jié)合源碼,發(fā)現(xiàn),提出問(wèn)題,解決問(wèn)題。這樣的學(xué)習(xí)才能高效且透徹。
歡迎關(guān)注我的微信公眾號(hào)
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://m.specialneedsforspecialkids.com/yun/68556.html
摘要:異步任務(wù)的構(gòu)造方法主要用于初始化線程池先關(guān)的成員變量創(chuàng)建一個(gè)新的異步任務(wù)。所以,我們是必須確保在銷毀活動(dòng)之前取消任務(wù)。 目錄介紹 01.先看下AsyncTask用法 02.AsyncTask源碼深入分析 2.1 構(gòu)造方法源碼分析 2.2 看execute(Params... params)方法 2.3 mWorker和mFuture的創(chuàng)建過(guò)程 03.異步機(jī)制的實(shí)現(xiàn) 04.不同...
摘要:使用線程池的好處通過(guò)線程在自己的線程池中隔離的好處是該應(yīng)用程序完全可以不受失控的客戶端庫(kù)的威脅。簡(jiǎn)而言之,由線程池提供的隔離功能可以使客戶端庫(kù)和子系統(tǒng)性能特性的不斷變化和動(dòng)態(tài)組合得到優(yōu)雅的處理,而不會(huì)造成中斷。 ? 工作流程圖 下面的流程圖展示了當(dāng)使用Hystrix的依賴請(qǐng)求,Hystrix是如何工作的。showImg(https://segmentfault.com/img/bV0...
摘要:提高線程的可管理性線程池可以統(tǒng)一管理分配調(diào)優(yōu)和監(jiān)控。線程池的初始化狀態(tài)是。調(diào)用線程池的接口時(shí),線程池由。當(dāng)所有的任務(wù)已終止,記錄的任務(wù)數(shù)量為,阻塞隊(duì)列為空,線程池會(huì)變?yōu)闋顟B(tài)。線程池徹底終止,就變成狀態(tài)。 序言 我們知道,線程池幫我們重復(fù)管理線程,避免創(chuàng)建大量的線程增加開(kāi)銷。合理的使用線程池能夠帶來(lái)3個(gè)很明顯的好處:1.降低資源消耗:通過(guò)重用已經(jīng)創(chuàng)建的線程來(lái)降低線程創(chuàng)建和銷毀的消耗2.提...
摘要:介紹線程池一般包含三個(gè)主要部分調(diào)度器決定由哪個(gè)線程來(lái)執(zhí)行任務(wù)執(zhí)行任務(wù)所能夠的最大耗時(shí)等線程隊(duì)列存放并管理著一系列線程這些線程都處于阻塞狀態(tài)或休眠狀態(tài)任務(wù)隊(duì)列存放著用戶提交的需要被執(zhí)行的任務(wù)一般任務(wù)的執(zhí)行的即先提交的任務(wù)先被執(zhí)行調(diào)度器并非是必 介紹 線程池一般包含三個(gè)主要部分: 調(diào)度器: 決定由哪個(gè)線程來(lái)執(zhí)行任務(wù), 執(zhí)行任務(wù)所能夠的最大耗時(shí)等 線程隊(duì)列: 存放并管理著一系列線程, 這些...
閱讀 1380·2021-11-25 09:43
閱讀 3594·2021-11-10 11:48
閱讀 5156·2021-09-23 11:21
閱讀 1605·2019-08-30 15:55
閱讀 3516·2019-08-30 13:53
閱讀 1243·2019-08-30 10:51
閱讀 876·2019-08-29 14:20
閱讀 1981·2019-08-29 13:11