摘要:是所有線程池實(shí)現(xiàn)的父類,我們先看看構(gòu)造函數(shù)構(gòu)造參數(shù)線程核心數(shù)最大線程數(shù)線程空閑后,存活的時(shí)間,只有線程數(shù)大于的時(shí)候生效存活時(shí)間的單位任務(wù)的阻塞隊(duì)列創(chuàng)建線程的工程,給線程起名字當(dāng)線程池滿了,選擇新加入的任務(wù)應(yīng)該使用什么策略,比如拋異常丟棄當(dāng)前
ThreadPoolExecutor
ThreadPoolExecutor是所有線程池實(shí)現(xiàn)的父類,我們先看看構(gòu)造函數(shù)
構(gòu)造參數(shù)corePoolSize:線程核心數(shù)
maximumPoolSize:最大線程數(shù)
keepAliveTime:線程空閑后,存活的時(shí)間,只有線程數(shù)大于corePoolSize的時(shí)候生效
unit:存活時(shí)間的單位
workQueue:任務(wù)的阻塞隊(duì)列
threadFactory:創(chuàng)建線程的工程,給線程起名字
handler:當(dāng)線程池滿了,選擇新加入的任務(wù)應(yīng)該使用什么策略,比如拋異常、丟棄當(dāng)前任務(wù)、丟棄阻塞隊(duì)列的最老任務(wù)等,也可以自定義。
流程判斷是否超過線程核心數(shù)corePoolSize,沒超過創(chuàng)建線程
超過線程核心數(shù),則判斷隊(duì)列是否已滿,沒有滿,放入隊(duì)列
隊(duì)列也滿了,判斷是否超過maximumPoolSize,沒有就創(chuàng)建線程
超過了,根據(jù)策略執(zhí)行
源碼解析//32為,前3位作為線程池的狀態(tài),后三位是線程數(shù) private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3;//28 private static final int CAPACITY = (1 << COUNT_BITS) - 1;00011111 11111111 11111111 11111110 //-1的二進(jìn)制是11111111 11111111 11111111 11111111 private static final int RUNNING = -1 << COUNT_BITS;//-1如上,左移28位后,就是111000000 00000000 00000000 00000000 private static final int SHUTDOWN = 0 << COUNT_BITS;//0左移28位,還是0,00000000 00000000 00000000 00000000 private static final int STOP = 1 << COUNT_BITS;//00100000 00000000 00000000 00000000 private static final int TIDYING = 2 << COUNT_BITS;//01000000 00000000 00000000 00000000 private static final int TERMINATED = 3 << COUNT_BITS;//01100000 00000000 00000000 00000000 private static int runStateOf(int c) { return c & ~CAPACITY; }//~CAPACITY為11100000000000000000000000000000,與完就是線程的狀態(tài) private static int workerCountOf(int c) { return c & CAPACITY; }//與完,是線程的數(shù)量 private static int ctlOf(int rs, int wc) { return rs | wc; } private static boolean isRunning(int c) { return c < SHUTDOWN;//小于0,說明是RUNNING,RUNNING=-1 }execute方法
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) {//如果線程數(shù)少于線程核心數(shù) if (addWorker(command, true))//增加任務(wù)成功,返回true,沒成功,繼續(xù)往下 return; c = ctl.get(); } //判斷隊(duì)列 if (isRunning(c) && workQueue.offer(command)) {//如果線程池還在跑,并且可以插入隊(duì)列 int recheck = ctl.get(); if (! isRunning(recheck) && remove(command))//線程池不是運(yùn)行狀態(tài),就移除剛剛插入的任務(wù) reject(command);//執(zhí)行策略 else if (workerCountOf(recheck) == 0)// addWorker(null, false); } //隊(duì)列也滿了,判斷最大線程數(shù) else if (!addWorker(command, false)) reject(command);//執(zhí)行策略 }addWorker方法
private boolean addWorker(Runnable firstTask, boolean core) {//core為true,使用corePoolSize判斷,否則使用maximumPoolSize retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c);//獲取當(dāng)前線程狀態(tài) // Check if queue empty only if necessary. if (rs >= SHUTDOWN && // 就是STOP、TIDYING、TERMINATED,此時(shí)不讓任務(wù)進(jìn)來 ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))// return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false;//超過了線程核心數(shù)或最大線程數(shù),不讓新增 if (compareAndIncrementWorkerCount(c))//返回true,說明成功了,跳出retry循環(huán) break retry; //失敗了,說明被其他符號條件的線程占了,就再判斷線程狀態(tài)是否跟之前一樣,不一樣重新獲取,跳到retry c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock();//獲取鎖 try { int rs = runStateOf(ctl.get());//獲取線程池的狀態(tài) if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // 沒通過start來啟動run的 throw new IllegalThreadStateException(); workers.add(w);//加點(diǎn)hashset int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s;//更新當(dāng)前最大值 workerAdded = true;//增加成功 } } finally { mainLock.unlock(); } if (workerAdded) { t.start();//啟動線程 workerStarted = true;//啟動成功 } } } finally { if (! workerStarted) addWorkerFailed(w);//失敗,線程數(shù)-1,從hashset移除,并嘗試Terminate } return workerStarted; }runWorker方法
上面執(zhí)行 t.start();的時(shí)候,就會通過run方法調(diào)用下面的方法
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) {//任務(wù)不為空或者獲取的任務(wù)也不為空 w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run();//調(diào)用run方法,這里沒有通過start,也就是說沒有啟動新線程 } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++;//完成任務(wù)數(shù)加1 w.unlock();//釋放 } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly);//移除w,在task為空的時(shí)候,比如線程池狀態(tài)停止或者啟動的線程太多 } }
getTask方法
當(dāng)Worker第一次啟動的時(shí)候,調(diào)用run方法,后面就一直從隊(duì)列里獲取任務(wù)
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c);//獲取當(dāng)前線程池狀態(tài) // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {// decrementWorkerCount();//線程數(shù)量-1 return null; } int wc = workerCountOf(c);//線程數(shù) //allowCoreThreadTimeOut為true,說明線程數(shù)要根據(jù)是否超過核心線程數(shù)判斷keepAliveTime boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;//是否超過核心線程數(shù) if ((wc > maximumPoolSize || (timed && timedOut))//超過了最大線程數(shù) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c))//線程數(shù)-1 return null;//返回空 continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();//獲取任務(wù) if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/75696.html
摘要:系統(tǒng)預(yù)定了幾個(gè)線程池,不過建議手動創(chuàng)建,以防止錯(cuò)誤創(chuàng)建消耗資源,比如創(chuàng)建太多線程或者固定線程數(shù)量,無界隊(duì)列固定線程數(shù)量,數(shù)量為,無界隊(duì)列,會按順序執(zhí)行不限制線程數(shù)量,使用隊(duì)列,使用于短任務(wù)基于用于周期性執(zhí)行任務(wù)示例第一個(gè)是,第二個(gè)是第一 系統(tǒng)預(yù)定了幾個(gè)線程池,不過建議手動創(chuàng)建,以防止錯(cuò)誤創(chuàng)建消耗資源,比如創(chuàng)建太多線程或者OOM FixedThreadPool 固定線程數(shù)量,無界隊(duì)列 p...
摘要:接口用于提交任務(wù)接口繼承了接口設(shè)置線程的狀態(tài),還沒執(zhí)行的線程會被中斷設(shè)置線程的狀態(tài),嘗試停止正在進(jìn)行的線程當(dāng)調(diào)用或方法后返回為當(dāng)調(diào)用方法后,并且所有提交的任務(wù)完成后返回為當(dāng)調(diào)用方法后,成功停止后返回為當(dāng)前線程阻塞,直到線程執(zhí)行完時(shí)間到被中斷 Executor接口 void execute(Runnable command)//用于提交command任務(wù) ExecutorService接...
摘要:抽象類,實(shí)現(xiàn)了的接口。將任務(wù)封裝成提交任務(wù)主要方法在任務(wù)是否超時(shí)超時(shí)時(shí)間任務(wù)書用于存放結(jié)果的,先完成的放前面。 AbstractExecutorService抽象類,實(shí)現(xiàn)了ExecutorService的接口。 newTaskFor 將任務(wù)封裝成FutureTask protected RunnableFuture newTaskFor(Runnable runnable, T va...
摘要:方法作用讓當(dāng)前的線程狀態(tài)從運(yùn)行狀態(tài)轉(zhuǎn)到就緒狀態(tài),然后和其他就緒狀態(tài)的同相同優(yōu)先級的其他線程競爭的執(zhí)行權(quán)。也就是說,這個(gè)線程,還是有機(jī)會繼續(xù)再次執(zhí)行的。 方法作用 讓當(dāng)前的線程狀態(tài)從運(yùn)行狀態(tài)轉(zhuǎn)到就緒狀態(tài),然后和其他就緒狀態(tài)的同相同優(yōu)先級的其他線程競爭CPU的執(zhí)行權(quán)。也就是說,這個(gè)線程,還是有機(jī)會繼續(xù)再次執(zhí)行的。 優(yōu)先權(quán) 優(yōu)先級范圍是1~10,數(shù)字越大,優(yōu)先級越高,默認(rèn)為5,但是由于操作系...
摘要:高并發(fā)系列第篇文章。簡單的說,在使用了線程池之后,創(chuàng)建線程變成了從線程池中獲取一個(gè)空閑的線程,然后使用,關(guān)閉線程變成了將線程歸還到線程池。如果調(diào)用了線程池的方法,線程池會提前把核心線程都創(chuàng)造好,并啟動線程池允許創(chuàng)建的最大線程數(shù)。 java高并發(fā)系列第18篇文章。 本文主要內(nèi)容 什么是線程池 線程池實(shí)現(xiàn)原理 線程池中常見的各種隊(duì)列 自定義線程創(chuàng)建的工廠 常見的飽和策略 自定義飽和策略 ...
閱讀 3178·2021-11-22 15:25
閱讀 3854·2021-11-17 09:33
閱讀 3369·2021-11-08 13:15
閱讀 3051·2021-09-22 10:56
閱讀 542·2021-08-31 09:45
閱讀 2754·2019-08-30 13:49
閱讀 3081·2019-08-30 12:52
閱讀 1145·2019-08-29 17:05