摘要:的前位數用來表示線程的數量,后面三位用來表示線程池的狀態。線程池的狀態有五種,分別是,根據單詞就能猜出大概。并且為了考慮性能問題,線程池的設計沒有使用悲觀鎖關鍵字,而是大量使用了和機制。
零 前期準備 0 FBI WARNING
文章異常啰嗦且繞彎。
1 版本JDK 版本 : OpenJDK 11.0.1
IDE : idea 2018.3
2 ThreadPoolExecutor 簡介ThreadPoolExecutor 是 jdk4 中加入的工具,被封裝在 jdk 自帶的 Executors 框架中,是 java 中最經典的線程池技術。
ThreadPoolExecutor 類在 concurrent 包下,和其它線程工具類一樣都由 Doug Lea 大神操刀完成。
[ 在看完 Spring ioc 和 Gson 之后有點乏了,換換口味看一些 jdk 的源碼 ]
3 Demoimport java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class ThreadPoolDemo { public static void main(String[] args){ //創建線程池 //這里使用固定線程數的線程池,線程數為 5 ExecutorService executorService = Executors.newFixedThreadPool(5); for(int i = 0 ; i < 100 ; i ++){ final int ii = i; //創建 Runnable 作為線程池的任務 Runnable r = () -> System.out.println(ii); //執行 executorService.execute(r); } } }一 線程池的初始化
線程池的初始化調用的 Executors 框架的靜態方法:
//Executors.class public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); }
繼續追蹤這個構造方法:
//ThreadPoolExecutor.class public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueueworkQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
繼續追蹤:
//ThreadPoolExecutor.class public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue二 WorkerworkQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { //驗證參數的有效性 if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); //本例中不涉及權限 this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); //線程數 this.corePoolSize = corePoolSize; //最大線程數 //本例中使用固定線程數的線程池,所以線程數和最大線程數相等 this.maximumPoolSize = maximumPoolSize; //用于存儲任務的隊列 //此處使用 LinkedBlockingQueue 來儲存任務,其線程安全 this.workQueue = workQueue; //keepAliveTime 參數用于表示: //對于超出線程和隊列緩存總和的任務,是否要臨時增加線程來處理 //超出的線程的存在時間是多少 //這里使用的是定長線程池,所以 keepAliveTime = 0,即不增加線程 this.keepAliveTime = unit.toNanos(keepAliveTime); //用于創建線程的工廠類 this.threadFactory = threadFactory; //handler 用來處理 task 太多時候的拒絕策略 //此例中使用的是默認的,即定義在 ThreadPoolExecutor 中的 defaultHandler 對象 this.handler = handler; }
Worker 是 ThreadPoolExecutor 的內部類,可以看做是 Runnable 的代理類:
//ThreadPoolExecutor.class private final class Worker extends AbstractQueuedSynchronizer implements Runnable{ private static final long serialVersionUID = 6138294804551838833L; final Thread thread; Runnable firstTask; //完成 task 數量的計數器 volatile long completedTasks; Worker(Runnable firstTask) { //這個方法是 AbstractQueuedSynchronizer 中的方法,功能相當于加鎖 //-1 的意思是后續的任務會處于阻塞狀態,即為已經加鎖 setState(-1); //在創建的時候存入一個要處理的 task //需要注意的是每個 worker 對象被創建出來之后是可以重復利用來處理多個 task 的 this.firstTask = firstTask; //worker 會用自身作為 Runnable 對象去創建一個線程 //這里調用線程工廠進行線程創建 this.thread = getThreadFactory().newThread(this); } //對于線程變量來說,其啟動的就是 worker 的 run() 方法 public void run() { //runWorker(...) 方法在 ThreadPoolExecutor 里 runWorker(this); } //獲取鎖的狀態 protected boolean isHeldExclusively() { return getState() != 0; } //重寫了 AbstractQueuedSynchronizer 中的 tryAcquire(...) 方法 //嘗試加鎖 protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } //重寫了 AbstractQueuedSynchronizer 中的 tryRelease(...) 方法 //嘗試釋放鎖 protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } //真正的加鎖方法 public void lock() { acquire(1); } //嘗試加鎖 public boolean tryLock() { return tryAcquire(1); } //真正的釋放鎖方法 public void unlock() { release(1); } //判斷是否在鎖中 public boolean isLocked() { return isHeldExclusively(); } //中斷線程 void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
追蹤一下 runWorker(...) 方法:
//ThreadPoolExecutor.class final void runWorker(Worker w) { //獲取當前所在的線程的實例對象 Thread wt = Thread.currentThread(); //獲取 task Runnable task = w.firstTask; //取出來之后把 task 置空 w.firstTask = null; //此處釋放鎖 w.unlock(); //指示器,此變量為 true 的時候確認該方法已經執行完畢 boolean completedAbruptly = true; try { //此處為一個 while 循環,用于不斷的執行 task //getTask() 方法會從隊列里不斷抓取 task 并進行執行 //當 task 為 null,且隊列里已經沒有更多 task 的時候,就會終止循環 while (task != null || (task = getTask()) != null) { //加鎖,獨占線程 w.lock(); //在這里會判斷線程的狀態,如果存在符合中斷的情況,就會直接中斷掉 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { //beforeExecute(...) 和 afterExecute(...) 方法在 ThreadPoolExecutor 中并沒有實現 //是預留出來給使用者重寫,以達到業務需求的方法 beforeExecute(wt, task); try { //此處執行 task task.run(); afterExecute(task, null); } catch (Throwable ex) { afterExecute(task, ex); throw ex; } } finally { //將執行的 task 置空 task = null; //每完成一個 task 就會加 1 w.completedTasks++; //釋放鎖 w.unlock(); } } completedAbruptly = false; } finally { //這個方法會銷毀掉 worker //同時如果檢測到有新的 task 又會重新創建 Worker processWorkerExit(w, completedAbruptly); } }
Worker 是線程池中真正起完成業務邏輯的組件,是任務和線程的封裝。
三 線程池的狀態控制線程池的狀態主要由 ctl 變量來進行控制:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
ctl 是一個 AtomicInteger 類型的變量,其實可以簡單理解為一個 int 值,AtomicInteger 只是能夠適應高并發的原子化操作的需要。
ctl 的前 29 位數用來表示線程(Worker)的數量,后面三位用來表示線程池的狀態。
線程池的狀態有五種,分別是 Running、Shutdown、Stop、Tidying、Terminate,根據單詞就能猜出大概。
注意的是,這五種狀態在線程池中都以 int 變量的形式存在,從前到后依次變大,對狀態的比較有一系列方法:
//ThreadPoolExecutor.class private static boolean runStateLessThan(int c, int s) { //c 的狀態值要小于 s return c < s; } //ThreadPoolExecutor.class private static boolean runStateAtLeast(int c, int s) { //c 的狀態值要大于或等于 s return c >= s; } //ThreadPoolExecutor.class private static boolean isRunning(int c) { //狀態里只有 RUNNING 是小于 SHUTDOWN 的 return c < SHUTDOWN; }
在這些方法里,傳入的參數 c 一般指的是當前線程池狀態,s 是用來對比的參照狀態。
四 線程池的執行該 part 的起點:
executorService.execute(r);
來追蹤 execute(...) 方法:
public void execute(Runnable command) { //有效性驗證 if (command == null) throw new NullPointerException(); //ctl 是一個 AtomicInteger 類型的變量,用來記錄線程池的狀態 int c = ctl.get(); //workerCountOf(...) 方法會返回當前運行的 Worker 的數量 if (workerCountOf(c) < corePoolSize) { //Worker 的數量小于線程池容量的情況下 //直接增加 Worker 并取出 task 去運行 if (addWorker(command, true)) return; //如果 Worker 已經順利執行了 task,應該會直接返回掉 //如果執行中出現了其它情況,則會繼續往下走 //此處刷新狀態 c = ctl.get(); } //當 Worker 數量已經達到線程池的指定數量,或者添加 Worker 的時候出問題的時候,會進入此判斷語句 //先判斷線程池是否處于活躍狀態,且 task 是否已經被成功添加到隊列中 //如果不滿足,會進入 else 語句中,先最后嘗試一次 addWorker(...) 方法,如果不成功就拒絕 task //reject(...) 方法會調用 handler 的拒絕策略 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); }else if (!addWorker(command, false)) reject(command); }1 reject
這里先提及一下 reject(...) 方法:
//ThreadPoolExecutor.class final void reject(Runnable command) { handler.rejectedExecution(command, this); }
本質是調用了 handler 對象的相關方法。在本例中,handler 對象指向了 defaultHandler:
//ThreadPoolExecutor.class private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
defaultHandler 是一個 AbortPolicy 類型的對象,而 AbortPolicy 是 ThreadPoolExecutor 的靜態內部類。
AbortPolicy 起作用的方法為 rejectedExecution(...) 方法:
//AbortPolicy.class public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); }
也就是說,在 task 過多的情況下,AbortPolicy 的應對策略是拋出異常。
2 addWorker來看一下核心方法 addWorker(...):
//ThreadPoolExecutor.class private boolean addWorker(Runnable firstTask, boolean core) { //先標記這個 for 循環,方便退出循環 retry: //在每一次循環開始之前會刷新一次狀態標識 for (int c = ctl.get();;) { //這里先進行判斷,如果線程池已經關閉了,或者沒有 task 了,就會返回 false if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || firstTask != null || workQueue.isEmpty())) return false; for (;;) { //如果 Worker 數量已經超出了最大值就會直接返回 false if (workerCountOf(c) >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK)) return false; //將 ctl 變量的值加 1,如果成功了就會跳出循環 if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); //在狀態值比 SHUTDOWN 大的時候會直接跳到最外頭的循環里 //需要注意的是最外面的 for 循環會判斷狀態值是否大于 SHUTDOWN //如果大于 SHUTDOWN 的話就返回 false 了 if (runStateAtLeast(c, SHUTDOWN)) continue retry; } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { //創建一個 Worker w = new Worker(firstTask); //獲取線程對象 final Thread t = w.thread; if (t != null) { //加鎖,此處加的是一把全局的鎖 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int c = ctl.get(); //如果狀態值 c 是 RUNNING,或者 [c 是 RUNNING 或者 SHUTDOWN 且 firstTask 是 null] 就會進入這個判斷語句 // if (isRunning(c) || (runStateLessThan(c, STOP) && firstTask == null)) { //如果這個線程已經處于運作狀態,會拋出異常 if (t.isAlive()) throw new IllegalThreadStateException(); //workers 是一個列表,用于存儲 Worker 對象 workers.add(w); //獲取 Worker 的數量 int s = workers.size(); //largestPoolSize 用來記錄線程池達到過的最大線程數 if (s > largestPoolSize) largestPoolSize = s; //標記 Worker 已經被添加 workerAdded = true; } } finally { //釋放鎖 mainLock.unlock(); } //先判斷 Worker 是否已經被添加到 workers 內了 if (workerAdded) { //這是該方法核心的啟動線程方法 t.start(); //標記 Worker 已經開始運行了 workerStarted = true; } } } finally { //如果沒有標記 Worker 已經開始工作,會在這里銷毀掉 Worker if (!workerStarted) addWorkerFailed(w); } return workerStarted; }五 一點嘮叨
先總結一下線程池的業務邏輯:
1 接收到 task (即實現了 Runnable 接口的實例對象) [execute(...) 方法] 2 用 task 去嘗試創建一個 Worker 實例 [execute(...) 方法] 2.1 如果 Worker 數量沒有達到線程池的指定最大值 -> 新建 2.2 如果 Worker 數量達到了線程池的指定最大值 -> 不會再創建,而是把 task 儲存起來等待空閑的 Worker 去提取 2.3 如果 task 隊列也已經滿了,無法再添加 -> 觸發拒絕機制(handler) 3 Worker 在執行的時候調用其內部的 Thread 實例對象的 start() 方法 [addWorker(...) 方法] 4 該 start() 方法會調用到 Worker 的 run() 方法 [Worker.class 內的 run() 方法] 5 Worker 的 run() 方法本質上是封裝了 task 的 run() 方法 [runWorker(...) 方法]
主線業務邏輯不算復雜,比較艱難的是為了保證數據的一致性,線程池代碼中充斥著大量的狀態判斷和鎖機制。
并且為了考慮性能問題,線程池的設計沒有使用悲觀鎖(synchronized 關鍵字),而是大量使用了 ASQ 和 ReetrentLock 機制。
本文僅為個人的學習筆記,可能存在錯誤或者表述不清的地方,有緣補充
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/73156.html
摘要:零前期準備文章異常啰嗦且繞彎。版本版本簡介是中默認的實現類,常與結合進行多線程并發操作。所以方法的主體其實就是去喚醒被阻塞的線程。本文僅為個人的學習筆記,可能存在錯誤或者表述不清的地方,有緣補充 零 前期準備 0 FBI WARNING 文章異常啰嗦且繞彎。 1 版本 JDK 版本 : OpenJDK 11.0.1 IDE : idea 2018.3 2 ThreadLocal 簡介 ...
摘要:那么線程池到底是怎么利用類來實現持續不斷地接收提交的任務并執行的呢接下來,我們通過的源代碼來一步一步抽絲剝繭,揭開線程池運行模型的神秘面紗。 在上一篇文章《從0到1玩轉線程池》中,我們了解了線程池的使用方法,以及向線程池中提交任務的完整流程和ThreadPoolExecutor.execute方法的源代碼。在這篇文章中,我們將會從頭閱讀線程池ThreadPoolExecutor類的源代...
摘要:將線程池狀態置為并不會立即停止,停止接收外部的任務,內部正在跑的任務和隊列里等待的任務,會執行完,才真正停止。將線程池狀態置為。 在Java中,我們經常使用的線程池就是ThreadPoolExecutor,此外還有定時的線程池ScheduledExecutorService(),但是需要注意的是Executors.newCachedThreadPool()的線程是沒有上屆的,在使用時,...
摘要:創建一個線程池,具有固定線程數,運行在共享的無界隊列中。固定線程數源碼如下是的實現類。線程池中允許最大的線程數。如果線程數超過了核心線程數,過量的線程在關閉前等待新任務的最大時間。處理因為線程邊界和隊列容量導致的堵塞。 1.Executors.newFixedThreadPool(int nThreads):創建一個線程池,具有固定線程數,運行在共享的無界隊列中。在大多數時候,線程會主...
摘要:當活動線程核心線程非核心線程達到這個數值后,后續任務將會根據來進行拒絕策略處理。線程池工作原則當線程池中線程數量小于則創建線程,并處理請求。當線程池中的數量等于最大線程數時默默丟棄不能執行的新加任務,不報任何異常。 spring-cache使用記錄 spring-cache的使用記錄,坑點記錄以及采用的解決方案 深入分析 java 線程池的實現原理 在這篇文章中,作者有條不紊的將 ja...
閱讀 689·2021-09-30 09:47
閱讀 2878·2021-09-04 16:40
閱讀 866·2019-08-30 13:18
閱讀 3458·2019-08-29 16:22
閱讀 1564·2019-08-29 12:36
閱讀 595·2019-08-29 11:11
閱讀 1483·2019-08-26 13:47
閱讀 1136·2019-08-26 13:32