摘要:并且,線程池在某些情況下還能動態調整工作線程的數量,以平衡資源消耗和工作效率。同時線程池還提供了對池中工作線程進行統一的管理的相關方法。
概述開發中經常會遇到各種池(如:連接池,線程池),它們的作用就是為了提高性能及減少開銷,在JDK1.5以后的java.util.concurrent包中內置了很多不同使用場景的線程池,為了更好的理解它們,自己手寫一個線程池,加深印象。
1.什么是池
它的基本思想是一種對象池,程序初始化的時候開辟一塊內存空間,里面存放若干個線程對象,池中線程執行調度由池管理器來處理。當有線程任務時,從池中取一個,執行完成后線程對象歸池,這樣可以避免反復創建線程對象所帶來的性能開銷,節省系統的資源。
2.使用線程池的好處
合理的使用線程池可以重復利用已創建的線程,這樣就可以減少在創建線程和銷毀線程上花費的時間和資源。并且,線程池在某些情況下還能動態調整工作線程的數量,以平衡資源消耗和工作效率。同時線程池還提供了對池中工作線程進行統一的管理的相關方法。這樣就相當于我們一次創建,就可以多次使用,大量的節省了系統頻繁的創建和銷毀線程所需要的資源。
簡易版實現包含功能:
1.創建線程池,銷毀線程池,添加新任務
2.沒有任務進入等待,有任務則處理掉
3.動態伸縮,擴容
4.拒絕策略
介紹了線程池的原理以及主要組件之后,就讓我們來手動實現一個自己的線程池,以加深理解和深入學習。因為自己實現的簡易版本所以不建議生產中使用,生產中使用java.util.concurrent會更加健壯和優雅(后續文章會介紹)
代碼以下線程池相關代碼均在SimpleThreadPoolExecutor.java中,由于為了便于解讀因此以代碼塊的形式呈現
維護一個內部枚舉類,用來標記當前任務線程狀態,在Thread中其實也有.
private enum TaskState { FREE, RUNNABLE, BLOCKED, TERMINATED; }
定義拒絕策略接口,以及默認實現
static class DiscardException extends RuntimeException { private static final long serialVersionUID = 8827362380544575914L; DiscardException(String message) { super(message); } } interface DiscardPolicy {//拒絕策略接口 void discard() throws DiscardException; }
任務線程具體實現
1.繼承Thread,重寫run方法。
2.this.taskState == TaskState.FREE && TASK_QUEUE.isEmpty() 如果當前線程處于空閑狀態且沒有任何任務了就將它wait住,讓出CPU執行權
3.如果有任務就去執行FIFO(先進先出)策略
4.定義close方法,關閉線程,當然這里不能暴力關閉,所以這里有需要借助interrupt
public static class WorkerTask extends Thread { // 線程狀態 private TaskState taskState; // 線程編號 private static int threadInitNumber; /** * 生成線程名,參考Thread.nextThreadNum(); * * @return */ private static synchronized String nextThreadName() { return THREAD_NAME_PREFIX + (++threadInitNumber); } WorkerTask() { super(THREAD_GROUP, nextThreadName()); } @Override public void run() { Runnable target; //說明該線程處于空閑狀態 OUTER: while (this.taskState != TaskState.TERMINATED) { synchronized (TASK_QUEUE) { while (this.taskState == TaskState.FREE && TASK_QUEUE.isEmpty()) { try { this.taskState = TaskState.BLOCKED;//此處標記 //沒有任務就wait住,讓出CPU執行權 TASK_QUEUE.wait(); //如果被打斷說明當前線程執行了 shutdown() 方法 線程狀態為 TERMINATED 直接跳到 while 便于退出 } catch (InterruptedException e) { break OUTER; } } target = TASK_QUEUE.removeFirst();//遵循FIFO策略 } if (target != null) { this.taskState = TaskState.RUNNABLE; target.run();//開始任務了 this.taskState = TaskState.FREE; } } } void close() {//優雅關閉線程 this.taskState = TaskState.TERMINATED; this.interrupt(); } }
簡易版線程池,主要就是維護了一個任務隊列和線程集,為了動態擴容,自己也繼承了Thread去做監聽操作,對外提供submit()提交執行任務、shutdown()等待所有任務工作完畢,關閉線程池
public class SimpleThreadPoolExecutor extends Thread { // 線程池大小 private int threadPoolSize; // 最大接收任務 private int queueSize; // 拒絕策略 private DiscardPolicy discardPolicy; // 是否被銷毀 private volatile boolean destroy = false; private final static int DEFAULT_MIN_THREAD_SIZE = 2;// 默認最小線程數 private final static int DEFAULT_ACTIVE_THREAD_SIZE = 5;// 活躍線程 private final static int DEFAULT_MAX_THREAD_SIZE = 10;// 最大線程 private final static int DEFAULT_WORKER_QUEUE_SIZE = 100;// 最多執行多少任務 private final static String THREAD_NAME_PREFIX = "MY-THREAD-NAME-";//線程名前綴 private final static String THREAD_POOL_NAME = "SIMPLE-POOL";//線程組的名稱 private final static ThreadGroup THREAD_GROUP = new ThreadGroup(THREAD_POOL_NAME);//線程組 private final static List測試一把WORKER_TASKS = new ArrayList<>();// 線程容器 // 任務隊列容器,也可以用Queue 遵循 FIFO 規則 private final static LinkedList TASK_QUEUE = new LinkedList<>(); // 拒絕策略 private final static DiscardPolicy DEFAULT_DISCARD_POLICY = () -> { throw new DiscardException("[拒絕執行] - [任務隊列溢出...]"); }; private int minSize;//最小線程 private int maxSize;//最大線程 private int activeSize;//活躍線程 SimpleThreadPoolExecutor() { this(DEFAULT_MIN_THREAD_SIZE, DEFAULT_ACTIVE_THREAD_SIZE, DEFAULT_MAX_THREAD_SIZE, DEFAULT_WORKER_QUEUE_SIZE, DEFAULT_DISCARD_POLICY); } SimpleThreadPoolExecutor(int minSize, int activeSize, int maxSize, int queueSize, DiscardPolicy discardPolicy){ this.minSize = minSize; this.activeSize = activeSize; this.maxSize = maxSize; this.queueSize = queueSize; this.discardPolicy = discardPolicy; initPool(); } void submit(Runnable runnable) { if (destroy) { throw new IllegalStateException("線程池已銷毀..."); } synchronized (TASK_QUEUE) { if (TASK_QUEUE.size() > queueSize) {//如果當前任務隊超出隊列限制,后續任務拒絕執行 discardPolicy.discard(); } // 1.將任務添加到隊列 TASK_QUEUE.addLast(runnable); // 2.喚醒等待的線程去執行任務 TASK_QUEUE.notifyAll(); } } void shutdown() throws InterruptedException { int activeCount = THREAD_GROUP.activeCount(); while (!TASK_QUEUE.isEmpty() && activeCount > 0) { // 如果還有任務,那就休息一會 Thread.sleep(100); } int intVal = WORKER_TASKS.size();//如果線程池中沒有線程,那就不用關了 while (intVal > 0) { for (WorkerTask task : WORKER_TASKS) { //當任務隊列為空的時候,線程狀態才會為 BLOCKED ,所以可以打斷掉,相反等任務執行完在關閉 if (task.taskState == TaskState.BLOCKED) { task.close(); intVal--; } else { Thread.sleep(50); } } } this.destroy = true; //資源回收 TASK_QUEUE.clear(); WORKER_TASKS.clear(); this.interrupt(); System.out.println("線程關閉"); } private void createWorkerTask() { WorkerTask task = new WorkerTask(); //剛創建出來的線程應該是未使用的 task.taskState = TaskState.FREE; WORKER_TASKS.add(task); task.start(); } /** * 初始化操作 */ private void initPool() { for (int i = 0; i < this.minSize; i++) { this.createWorkerTask(); } this.threadPoolSize = minSize; this.start();//自己啟動自己 } @Override public void run() { while (!destroy) { try { Thread.sleep(5_000L); if (TASK_QUEUE.size() > activeSize && threadPoolSize < activeSize) { // 第一次擴容到 activeSize 大小 for (int i = threadPoolSize; i < activeSize; i++) { createWorkerTask(); } this.threadPoolSize = activeSize; System.out.println("[初次擴充] - [" + toString() + "]"); } else if (TASK_QUEUE.size() > maxSize && threadPoolSize < maxSize) {// 第二次擴容到最大線程 System.out.println(); for (int i = threadPoolSize; i < maxSize; i++) { createWorkerTask(); } this.threadPoolSize = maxSize; System.out.println("[再次擴充] - [" + toString() + "]"); } else { //防止線程在submit的時候,其他線程獲取到鎖干壞事 synchronized (WORKER_TASKS) { int releaseSize = threadPoolSize - activeSize; Iterator iterator = WORKER_TASKS.iterator();// List不允許在for中刪除集合元素,所以這里需要使用迭代器 while (iterator.hasNext()) { if (releaseSize <= 0) { break; } WorkerTask task = iterator.next(); //不能回收正在運行的線程,只回收空閑線程 if (task.taskState == TaskState.FREE) { task.close(); iterator.remove(); releaseSize--; } } System.out.println("[資源回收] - [" + toString() + "]"); } threadPoolSize = activeSize; } } catch (InterruptedException e) { System.out.println("資源釋放"); } } } @Override public String toString() { return "SimpleThreadPoolExecutor{" + "threadPoolSize=" + threadPoolSize + ", taskQueueSize=" + TASK_QUEUE.size() + ", minSize=" + minSize + ", maxSize=" + maxSize + ", activeSize=" + activeSize + "}"; } }
創建一個測試類
public class SimpleExecutorTest { public static void main(String[] args) throws InterruptedException { SimpleThreadPoolExecutor executor = new SimpleThreadPoolExecutor(); IntStream.range(0, 30).forEach(i -> executor.submit(() -> { System.out.printf("[線程] - [%s] 開始工作... ", Thread.currentThread().getName()); try { Thread.sleep(2_000L); } catch (InterruptedException e) { e.printStackTrace(); } System.out.printf("[線程] - [%s] 工作完畢... ", Thread.currentThread().getName()); }) ); //executor.shutdown();如果放開注釋即會執行完所有任務關閉線程池 } }
日志分析: 從日志中可以看到,初始化的時候是2個線程在工作,執行速度較為緩慢,當經過第一次擴容后,會觀察到線程池里線程個數增加了,執行任務的速度就越來越快了,本文一共擴容了2次,第一次是擴容到activeSize的大小,第二次是擴容到maxSize,在執行任務的過程中,當線程數過多的時候就會觸發回收機制...
[線程] - [MY-THREAD-NAME-1] 開始工作... [線程] - [MY-THREAD-NAME-2] 開始工作... [線程] - [MY-THREAD-NAME-1] 工作完畢... [線程] - [MY-THREAD-NAME-1] 開始工作... [線程] - [MY-THREAD-NAME-2] 工作完畢... [線程] - [MY-THREAD-NAME-2] 開始工作... [線程] - [MY-THREAD-NAME-1] 工作完畢... [線程] - [MY-THREAD-NAME-1] 開始工作... [線程] - [MY-THREAD-NAME-2] 工作完畢... [線程] - [MY-THREAD-NAME-2] 開始工作... [初次擴充] - [SimpleThreadPoolExecutor{threadPoolSize=5, taskQueueSize=44, minSize=2, maxSize=10, activeSize=5}] [線程] - [MY-THREAD-NAME-3] 開始工作... ... [線程] - [MY-THREAD-NAME-6] 開始工作... [線程] - [MY-THREAD-NAME-7] 開始工作... [再次擴充] - [SimpleThreadPoolExecutor{threadPoolSize=10, taskQueueSize=30, minSize=2, maxSize=10, activeSize=5}] [線程] - [MY-THREAD-NAME-10] 開始工作... ... [線程] - [MY-THREAD-NAME-5] 開始工作... [資源回收] - [SimpleThreadPoolExecutor{threadPoolSize=10, taskQueueSize=4, minSize=2, maxSize=10, activeSize=5}] [線程] - [MY-THREAD-NAME-4] 工作完畢... ... [線程] - [MY-THREAD-NAME-7] 工作完畢... [資源回收] - [SimpleThreadPoolExecutor{threadPoolSize=5, taskQueueSize=0, minSize=2, maxSize=10, activeSize=5}] [資源回收] - [SimpleThreadPoolExecutor{threadPoolSize=5, taskQueueSize=0, minSize=2, maxSize=10, activeSize=5}]總結
通過本文,大致可以了解線程池的工作原理和實現方式,學習的過程中,就是要知其然知其所以然。這樣才能更好地駕馭它,更好地去理解和使用,也能更好地幫助我們觸類旁通,后面的文章中會詳細介紹java.util.concurrent中的線程池。
- 說點什么全文代碼:https://gitee.com/battcn/battcn-concurent/tree/master/Chapter1-1/battcn-thread/src/main/java/com/battcn/chapter12
個人QQ:1837307557
battcn開源群(適合新手):391619659
微信公眾號:battcn(歡迎調戲)
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/67758.html
摘要:在之前,不能為線程單獨設置或指定一個默認的,為了設置,需要繼承并覆寫方法。幸運的是后線程提供了一個方法,用來捕獲并處理因線程中拋出的未知異常,以避免程序終止。 在單線程的開發過程中,通常采用try-catch的方式進行異常捕獲,但是這種方式在多線程環境中會顯得無能為力,而且還有可能導致一些問題的出現,比如發生異常的時候不能及時回收系統資源,或者無法及時關閉當前的連接... 概述 Ja...
摘要:如果有其它線程調用了相同對象的方法,那么處于該對象的等待池中的線程就會全部進入該對象的鎖池中,從新爭奪鎖的擁有權。 wait,notify 和 notifyAll,這些在多線程中被經常用到的保留關鍵字,在實際開發的時候很多時候卻并沒有被大家重視,而本文則是對這些關鍵字的使用進行描述。 存在即合理 在java中,每個對象都有兩個池,鎖池(monitor)和等待池(waitset),每個...
摘要:是所有線程池實現的父類,我們先看看構造函數構造參數線程核心數最大線程數線程空閑后,存活的時間,只有線程數大于的時候生效存活時間的單位任務的阻塞隊列創建線程的工程,給線程起名字當線程池滿了,選擇新加入的任務應該使用什么策略,比如拋異常丟棄當前 ThreadPoolExecutor ThreadPoolExecutor是所有線程池實現的父類,我們先看看構造函數 構造參數 corePool...
摘要:的作用是為其他線程的運行提供服務,比如說線程。在某些平臺上,指定一個較高的參數值可能使線程在拋出之前達到較大的遞歸深度。參數的值與最大遞歸深度和并發程度之間的關系細節與平臺有關。 今天研究了下Java線程基礎知識,發現以前太多知識知識略略帶過了,比較說Java的線程機制,在Java中有兩類線程:User Thread(用戶線程)、Daemon Thread(守護線程),以及構造器中的s...
閱讀 1771·2023-04-26 00:20
閱讀 1819·2021-11-08 13:21
閱讀 2010·2021-09-10 10:51
閱讀 1577·2021-09-10 10:50
閱讀 3310·2019-08-30 15:54
閱讀 2142·2019-08-30 14:22
閱讀 1438·2019-08-29 16:10
閱讀 3098·2019-08-26 11:50