摘要:源碼分析文章轉自源碼分析前段時間學習的源碼,學習線程池這一塊的時候發現了一篇不錯的文章,就記錄下來。這個方法在任何可能導致線程池終止的動作后執行比如減少或狀態下從隊列中移除任務。
threadpoolexecutor源碼分析
文章轉自:threadpoolexecutor源碼分析
前段時間學習java.util.concurrent的源碼,學習線程池這一塊的時候發現了一篇不錯的文章,就記錄下來。同時,文章之中加入了自己的一些見解。廢話不多說,直接開始。
ThreadPoolExecutor作為Java.util.concurrent包中核心的類,先看下類型的結構:
核心的接口其實是Executor,它只有一個execute方法抽象為對任務(Runnable接口)的執行, ExecutorService接口在Executor的基礎上提供了對任務執行的生命周期的管理,主要是submit和shutdown方法, AbstractExecutorService對ExecutorService一些方法做了默認的實現,主要是submit和invoke方法,而真正的任務執行 的Executor接口execute方法是由子類實現,就是ThreadPoolExecutor,它實現了基于線程池的任務執行框架,所以要了解 JDK的線程池,那么就得先看這個類。
再看execute方法之前需要先介幾個變量或類。
ctlprivate final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
這個變量是整個類的核心,AtomicInteger保證了對這個變量的操作是原子的,通過巧妙的操作,ThreadPoolExecutor用這一個變量保存了兩個內容:
所有有效線程的數量
各個線程的狀態(runState)
低29位存線程數,高3位存runState,這樣runState有5個值:
RUNNING:-536870912
SHUTDOWN:0
STOP:536870912
TIDYING:1073741824
TERMINATED:1610612736
線程池中各個狀態間的轉換比較復雜,主要記住下面內容就可以了:
RUNNING狀態:線程池正常運行,可以接受新的任務并處理隊列中的任務;
SHUTDOWN狀態:不再接受新的任務,但是會執行隊列中的任務;
STOP狀態:不再接受新任務,不處理隊列中的任務
圍繞rtc有一些操作和變量:
/** * 這個方法用于取出runState的值 因為CAPACITY值為:00011111111111111111111111111111 * ~為按位取反操作,則~CAPACITY值為:11100000000000000000000000000000 * 再同參數做&操作,就將低29位置0了,而高3位還是保持原先的值,也就是runState的值 * * @param c * 該參數為存儲runState和workerCount的int值 * @return runState的值 */ private static int runStateOf(int c) { return c & ~CAPACITY; } /** * 這個方法用于取出workerCount的值 * 因為CAPACITY值為:00011111111111111111111111111111,所以&操作將參數的高3位置0了 * 保留參數的低29位,也就是workerCount的值 * * @param c * ctl, 存儲runState和workerCount的int值 * @return workerCount的值 */ private static int workerCountOf(int c) { return c & CAPACITY; } /** * 將runState和workerCount存到同一個int中 * “|”運算的意思是,假設rs的值是101000,wc的值是000111,則他們位或運算的值為101111 * * @param rs * runState移位過后的值,負責填充返回值的高3位 * @param wc * workerCount移位過后的值,負責填充返回值的低29位 * @return 兩者或運算過后的值 */ private static int ctlOf(int rs, int wc) { return rs | wc; } // 只有RUNNING狀態會小于0 private static boolean isRunning(int c) { return c < SHUTDOWN; }corePoolSize
核心線程池大小,活動線程小于corePoolSize則直接創建,大于等于則先加到workQueue中,隊列滿了才創建新的線程。
keepAliveTime線程從隊列中獲取任務的超時時間,也就是說如果線程空閑超過這個時間就會終止。
Workerprivate final class Worker extends AbstractQueuedSynchronizer implements Runnable ...
內部類Worker是對任務的封裝,所有submit的Runnable都被封裝成了Worker,它本身也是一個Runnable, 然后利用AQS框架(關于AQS可以看我這篇文章)實現了一個簡單的非重入的互斥鎖, 實現互斥鎖主要目的是為了中斷的時候判斷線程是在空閑還是運行,可以看后面shutdown和shutdownNow方法的分析。
// state只有0和1,互斥 protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true;// 成功獲得鎖 } // 線程進入等待隊列 return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; }
之所以不用ReentrantLock是為了避免任務執行的代碼中修改線程池的變量,如setCorePoolSize,因為ReentrantLock是可重入的。
executeexecute方法主要三個步驟:
活動線程小于corePoolSize的時候創建新的線程;
活動線程大于corePoolSize時都是先加入到任務隊列當中;
任務隊列滿了再去啟動新的線程,如果線程數達到最大值就拒絕任務。
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); // 活動線程數 < corePoolSize if (workerCountOf(c) < corePoolSize) { // 直接啟動新的線程。第二個參數true:addWorker中會重新檢查workerCount是否小于corePoolSize if (addWorker(command, true)) // 添加成功返回 return; c = ctl.get(); } // 活動線程數 >= corePoolSize // runState為RUNNING && 隊列未滿 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // double check // 非RUNNING狀態 則從workQueue中移除任務并拒絕 if (!isRunning(recheck) && remove(command)) reject(command);// 采用線程池指定的策略拒絕任務 // 線程池處于RUNNING狀態 || 線程池處于非RUNNING狀態但是任務移除失敗 else if (workerCountOf(recheck) == 0) // 這行代碼是為了SHUTDOWN狀態下沒有活動線程了,但是隊列里還有任務沒執行這種特殊情況。 // 添加一個null任務是因為SHUTDOWN狀態下,線程池不再接受新任務 addWorker(null, false); // 兩種情況: // 1.非RUNNING狀態拒絕新的任務 // 2.隊列滿了啟動新的線程失敗(workCount > maximumPoolSize) } else if (!addWorker(command, false)) reject(command); }
其中比較難理解的應該是addWorker(null, false);這一行,這要結合addWorker一起來看。 主要目的是防止HUTDOWN狀態下沒有活動線程了,但是隊列里還有任務沒執行這種特殊情況。
addWorker/** * @param firstTask:新增一個線程并執行這個任務,可空,增加的線程從隊列獲取任務; * * @param core:是否使用corePoolSize作為上限,否則使用maxmunPoolSize **/ private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c);// 當前線程池狀態 // Check if queue empty only if necessary. // 這條語句等價:rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null || // workQueue.isEmpty()) // 滿足下列調價則直接返回false,線程創建失敗: // rs > SHUTDOWN:STOP || TIDYING || TERMINATED 此時不再接受新的任務,且所有任務執行結束 // rs = SHUTDOWN:firtTask != null 此時不再接受任務,但是仍然會執行隊列中的任務 // rs = SHUTDOWN:firtTask == null見execute方法的addWorker(null, // false),任務為null && 隊列為空 // 最后一種情況也就是說SHUTDONW狀態下,如果隊列不為空還得接著往下執行,為什么?add一個null任務目的到底是什么? // 看execute方法只有workCount==0的時候firstTask才會為null結合這里的條件就是線程池SHUTDOWN了不再接受新任務 // 但是此時隊列不為空,那么還得創建線程把任務給執行完才行。 if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) return false; // 走到這的情形: // 1.線程池狀態為RUNNING // 2.SHUTDOWN狀態,但隊列中還有任務需要執行 for (;;) { int wc = workerCountOf(c); //判斷條件有點難理解,其實是非運行狀態下(>=SHUTDOWN)或者SHUTDOWN狀態下任務非空(新提交任務)、任務隊列為空, //就不可以再新增線程了(return false),即SHUTDOWN狀態是可以新增線程去執行隊列中的任務; if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c))// 原子操作遞增workCount break retry;// 操作成功跳出的重試的循環 c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs)// 如果線程池的狀態發生變化則重試 continue retry; // else CAS failed due to workerCount change; retry inner loop } } // wokerCount遞增成功 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { final ReentrantLock mainLock = this.mainLock; w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { // 并發的訪問線程池workers對象必須加鎖 mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int c = ctl.get(); int rs = runStateOf(c); // RUNNING狀態 || SHUTDONW狀態下清理隊列中剩余的任務 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); // 將新啟動的線程添加到線程池中 workers.add(w); // 更新largestPoolSize int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } // 啟動新添加的線程,這個線程首先執行firstTask,然后不停的從隊列中取任務執行 // 當等待keepAlieTime還沒有任務執行則該線程結束。見runWoker和getTask方法的代碼。 if (workerAdded) { t.start();// 最終執行的是ThreadPoolExecutor的runWoker方法 workerStarted = true; } } } finally { // 線程啟動失敗,則從wokers中移除w并遞減wokerCount if (!workerStarted) // 遞減wokerCount會觸發tryTerminate方法 addWorkerFailed(w); } return workerStarted; }runWorker
任務添加成功后實際執行的是runWorker這個方法,這個方法非常重要,簡單來說它做的就是:
第一次啟動會執行初始化傳進來的任務firstTask;
然后會從workQueue中取任務執行,如果隊列為空則等待keepAliveTime這么長時間。
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; // Worker的構造函數中抑制了線程中斷setState(-1),所以這里需要unlock從而允許中斷 w.unlock(); // 用于標識是否異常終止,finally中processWorkerExit的方法會有不同邏輯 // 為true的情況:1.執行任務拋出異常;2.被中斷。 boolean completedAbruptly = true; try { // 如果getTask返回null那么getTask中會將workerCount遞減,如果異常了這個遞減操作會在processWorkerExit中處理 while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { // 任務執行前可以插入一些處理,子類重載該方法 beforeExecute(wt, task); Throwable thrown = null; try { task.run();// 執行用戶任務 } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { // 和beforeExecute一樣,留給子類去重載 afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { // 結束線程的一些清理工作 processWorkerExit(w, completedAbruptly); } }getTask
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. // 1.rs > SHUTDOWN 所以rs至少等于STOP,這時不再處理隊列中的任務 // 2.rs = SHUTDOWN 所以rs>=STOP肯定不成立,這時還需要處理隊列中的任務除非隊列為空 // 這兩種情況都會返回null讓runWoker退出while循環也就是當前線程結束了,所以必須要decrement // wokerCount if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { // 遞減workerCount值 decrementWorkerCount(); return null; } // 標記從隊列中取任務時是否設置超時時間 boolean timed; // Are workers subject to culling? // 1.RUNING狀態 // 2.SHUTDOWN狀態,但隊列中還有任務需要執行 for (;;) { int wc = workerCountOf(c); // 1.core thread允許被超時,那么超過corePoolSize的的線程必定有超時 // 2.allowCoreThreadTimeOut == false && wc > // corePoolSize時,一般都是這種情況,core thread即使空閑也不會被回收,只要超過的線程才會 timed = allowCoreThreadTimeOut || wc > corePoolSize; // 從addWorker可以看到一般wc不會大于maximumPoolSize,所以更關心后面半句的情形: // 1. timedOut == false 第一次執行循環, 從隊列中取出任務不為null方法返回 或者 // poll出異常了重試 // 2.timeOut == true && timed == // false:看后面的代碼workerQueue.poll超時時timeOut才為true, // 并且timed要為false,這兩個條件相悖不可能同時成立(既然有超時那么timed肯定為true) // 所以超時不會繼續執行而是return null結束線程。(重點:線程是如何超時的???) if (wc <= maximumPoolSize && !(timedOut && timed)) break; // workerCount遞減,結束當前thread if (compareAndDecrementWorkerCount(c)) return null; c = ctl.get(); // Re-read ctl // 需要重新檢查線程池狀態,因為上述操作過程中線程池可能被SHUTDOWN if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } try { // 1.以指定的超時時間從隊列中取任務 // 2.core thread沒有超時 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true;// 超時 } catch (InterruptedException retry) { timedOut = false;// 線程被中斷重試 } } }processWorkerExit
線程退出會執行這個方法做一些清理工作。
private void processWorkerExit(Worker w, boolean completedAbruptly) { // 正常的話再runWorker的getTask方法workerCount已經被減一了 if (completedAbruptly) decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 累加線程的completedTasks completedTaskCount += w.completedTasks; // 從線程池中移除超時或者出現異常的線程 workers.remove(w); } finally { mainLock.unlock(); } // 嘗試停止線程池 tryTerminate(); int c = ctl.get(); // runState為RUNNING或SHUTDOWN if (runStateLessThan(c, STOP)) { // 線程不是異常結束 if (!completedAbruptly) { // 線程池最小空閑數,允許core thread超時就是0,否則就是corePoolSize int min = allowCoreThreadTimeOut ? 0 : corePoolSize; // 如果min == 0但是隊列不為空要保證有1個線程來執行隊列中的任務 if (min == 0 && !workQueue.isEmpty()) min = 1; // 線程池還不為空那就不用擔心了 if (workerCountOf(c) >= min) return; // replacement not needed } // 1.線程異常退出 // 2.線程池為空,但是隊列中還有任務沒執行,看addWoker方法對這種情況的處理 addWorker(null, false); } }tryTerminate
processWorkerExit方法中會嘗試調用tryTerminate來終止線程池。這個方法在任何可能導致線程池終止的動作后執行:比如減少wokerCount或SHUTDOWN狀態下從隊列中移除任務。
final void tryTerminate() { for (;;) { int c = ctl.get(); // 以下狀態直接返回: // 1.線程池還處于RUNNING狀態 // 2.SHUTDOWN狀態但是任務隊列非空 // 3.runState >= TIDYING 線程池已經停止了或在停止了 if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty())) return; // 只能是以下情形會繼續下面的邏輯:結束線程池。 // 1.SHUTDOWN狀態,這時不再接受新任務而且任務隊列也空了 // 2.STOP狀態,當調用了shutdownNow方法 // workerCount不為0則還不能停止線程池,而且這時線程都處于空閑等待的狀態 // 需要中斷讓線程“醒”過來,醒過來的線程才能繼續處理shutdown的信號。 if (workerCountOf(c) != 0) { // Eligible to terminate // runWoker方法中w.unlock就是為了可以被中斷,getTask方法也處理了中斷。 // ONLY_ONE:這里只需要中斷1個線程去處理shutdown信號就可以了。 interruptIdleWorkers(ONLY_ONE); return; } final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 進入TIDYING狀態 if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { // 子類重載:一些資源清理工作 terminated(); } finally { // TERMINATED狀態 ctl.set(ctlOf(TERMINATED, 0)); // 繼續awaitTermination termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }shutdown和shutdownNow
shutdown這個方法會將runState置為SHUTDOWN,會終止所有空閑的線程。
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); // 線程池狀態設為SHUTDOWN,如果已經至少是這個狀態那么則直接返回 advanceRunState(SHUTDOWN); // 注意這里是中斷所有空閑的線程:runWorker中等待的線程被中斷 → 進入processWorkerExit → // tryTerminate方法中會保證隊列中剩余的任務得到執行。 interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate(); }
shutdownNow方法將runState置為STOP。和shutdown方法的區別,這個方法會終止所有的線程。
public ListshutdownNow() { List tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); // STOP狀態:不再接受新任務且不再執行隊列中的任務。 advanceRunState(STOP); // 中斷所有線程 interruptWorkers(); // 返回隊列中還沒有被執行的任務。 tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; }
主要區別在于shutdown調用的是interruptIdleWorkers這個方法,而shutdownNow實際調用的是Worker類的interruptIfStarted方法:
private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; // w.tryLock能獲取到鎖,說明該線程沒有在運行,因為runWorker中執行任務會先lock, // 因此保證了中斷的肯定是空閑的線程。 if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } }
void interruptIfStarted() { Thread t; // 初始化時state == -1 if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } }
這就是前面提到的Woker類實現AQS的主要作用。
注意:shutdown方法可能會在finalize被隱式的調用。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/67477.html
摘要:源碼分析創建可緩沖的線程池。源碼分析使用創建線程池源碼分析的構造函數構造函數參數核心線程數大小,當線程數,會創建線程執行最大線程數,當線程數的時候,會把放入中保持存活時間,當線程數大于的空閑線程能保持的最大時間。 之前創建線程的時候都是用的 newCachedThreadPoo,newFixedThreadPool,newScheduledThreadPool,newSingleThr...
摘要:當活動線程核心線程非核心線程達到這個數值后,后續任務將會根據來進行拒絕策略處理。線程池工作原則當線程池中線程數量小于則創建線程,并處理請求。當線程池中的數量等于最大線程數時默默丟棄不能執行的新加任務,不報任何異常。 spring-cache使用記錄 spring-cache的使用記錄,坑點記錄以及采用的解決方案 深入分析 java 線程池的實現原理 在這篇文章中,作者有條不紊的將 ja...
摘要:任務性質不同的任務可以用不同規模的線程池分開處理。線程池在運行過程中已完成的任務數量。如等于線程池的最大大小,則表示線程池曾經滿了。線程池的線程數量。獲取活動的線程數。通過擴展線程池進行監控。框架包括線程池,,,,,,等。 Java線程池 [toc] 什么是線程池 線程池就是有N個子線程共同在運行的線程組合。 舉個容易理解的例子:有個線程組合(即線程池,咱可以比喻為一個公司),里面有3...
摘要:當面試官問線程池時,你應該知道些什么一執行流程與不同,向中提交任務的時候,任務被包裝成對象加入延遲隊列并啟動一個線程。當我們創建出一個調度線程池以后,就可以開始提交任務了。 最近新接手的項目里大量使用了ScheduledThreadPoolExecutor類去執行一些定時任務,之前一直沒有機會研究這個類的源碼,這次趁著機會好好研讀一下。 原文地址:http://www.jianshu....
摘要:當面試官問線程池時,你應該知道些什么一執行流程與不同,向中提交任務的時候,任務被包裝成對象加入延遲隊列并啟動一個線程。當我們創建出一個調度線程池以后,就可以開始提交任務了。 最近新接手的項目里大量使用了ScheduledThreadPoolExecutor類去執行一些定時任務,之前一直沒有機會研究這個類的源碼,這次趁著機會好好研讀一下。 原文地址:http://www.jianshu....
閱讀 2347·2019-08-30 15:44
閱讀 1272·2019-08-30 13:01
閱讀 3314·2019-08-30 11:22
閱讀 3099·2019-08-29 15:23
閱讀 1621·2019-08-29 12:22
閱讀 3380·2019-08-26 13:58
閱讀 3448·2019-08-26 12:17
閱讀 3486·2019-08-26 12:16