摘要:類則扮演線程池工廠角色,通過可以取得一個具有特定功能的線程池。返回一個可根據實際情況調整線程數量的線程池,線程數量不確定,若有空閑,則會有限復用線程。所有線程在當前任務執行完后,將返回線程池待復用。
前言
多線程的軟件設計方案確實可以最大限度地發揮現代多核處理器的計算能力,提高生產系列的吞吐量和性能。但是,若不加控制和管理的隨意使用線程,對系統的性能反而會產生不利的影響。最容易想到的后果就是線程過多導致CPU忙于切換而無力執行其中的工作。
為了避免系統頻繁地創建和銷毀線程,我們可以讓創建的線程進行復用。如果有同學有過數據庫開發的經驗,對數據庫連接池這個概念應該不會陌生。為了避免每次數據庫查詢都重新建立和銷毀數據庫連接,我們可以使用數據庫連接池維護一些數據庫連接,使其長期保持在一個激活的狀態。當系統需要使用數據庫時,并不是創建一個新的連接,而是從連接池中獲得一個可用的連接即可。反之,當需要關閉連接時,并不真的把連接關閉,而是將這個連接“還”給連接池即可。通過此方法,通過調節線程池的基本大小和存活時間,可以幫助線程池回收空閑線程占有的資源,從而使得這些資源可以用于執行其他的工作。
為了更好地控制多線程,JDK提供了一套Executor框架。核心成員如下圖所示
以上成員均在java.util.concurrent包中,是JDK并發包的核心類。其中ThreadPoolExecutor表示一個線程池。Executors類則扮演線程池工廠角色,通過Executors可以取得一個具有特定功能的線程池。從UML圖中亦可知,ThreadPoolExecutor實現了Executor接口,因此通過這個接口,任何Runnable對象都可以被ThreadPoolExecutor線程池調度。
Java提供了ExecutorService的三種實現:
ThraedPoolExecutor:標準線程池
ScheduledThreadPoolExecutor:支持延時任務的線程池
ForkJoinPool:類似于ThraedPoolExecutor,但是使用work-stealing模式,其會為線程池中的每個線程創建一個隊列,從而使用work-stealing(任務竊取)算法使得線程可以從其他線程隊列里竊取任務來執行。即如果自己的任務處理完成了,則可以去忙碌的工作線程那里去竊取任務執行。
在本文,將會主要以ThraedPoolExecutor作為講解例子。
線程池的基本大小(Core POOL SIZE) ,較大大小(Maximum pool size) 以及存活時間等因素共同負責線程的創建和銷毀 。 基本大小也就是線程池的目標大小,即在沒有任務執行是的線程池的大小,并且只有在工作隊列滿了的情況下才會創建超過這個數量的線程。線程池的較大大小表示可同時活動的線程數量的上限,如并且果某個線程的空閑時間超過了存活時間,那么將被標記為可回收的,并且當線程池的當前大小超過了基本大小時,這個線程將被終止。
newFixedThreadPool 工廠方法將線程池的基本大小和較大大小設置為參數中的執行值,而且創建的線程池不會超時。newCachedThreadPool工廠方法將線程池的較大大小設置為Integer.MAX_VALUE,而將其基本大小設置為0,并將超時時間設置為1分鐘,這種方法創建的線程池可以無限擴展,并且當需求降低時會自動收縮,其他形式的線程池可以通過顯示的ThreadPoolExecutor構造函數來溝通。
Executor框架提供了各種類型的線程池,主要有以下工廠方法。
public static ExecutorService newFixedThreadPool(int nThreads) public static ExecutorService newSingleThreadExecutor() public static ExecutorService newCachedThreadPool() public static ScheduledExecutorService newSingleThreadScheduledExecutor() public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
以上方法返回了具有不同工作特性的線程池,具體說明如下:
newFixedThreadPool返回一個固定數量的線程池。當一個新任務提交時,如果有空閑線程,則執行。否則新任務暫存在一個任務隊列中,待有空閑時,便處理在任務隊列中的任務。
newSingleThreadExecutor返回一個線程的線程池。當多余一個新任務提交時,會暫存在一個任務隊列中,待有空閑時,按先入先出的順序處理在任務隊列中的任務。
newCachedThreadPool返回一個可根據實際情況調整線程數量的線程池,線程數量不確定,若有空閑,則會有限復用線程。否則創建新線程處理任務。所有線程在當前任務執行完后,將返回線程池待復用。
newSingleThreadScheduledExecutor返回一個ScheduledExecutorService對象,線程池大小為1。ScheduledExecutorService在Executor接口之上擴展了在給定時間執行某任務的功能。如果在某個固定的延時之后執行,或周期性執行某個任務。可以用這個工廠。
newScheduledThreadPool,返回一個ScheduledExecutorService對象,但該線程可以指定線程數量。
固定大小的線程池public class ExecutorExample { public static class MyTask implements Runnable{ @Override public void run() { System.out.println(System.currentTimeMillis()+":Thread ID:"+Thread.currentThread().getId() ); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] a ){ MyTask myTask = new MyTask(); //創建固定大小線程池 ExecutorService es = Executors.newFixedThreadPool(5); for (int i =0;i<10;i++){ es.submit(myTask); } es.shutdown(); //總的來說就是5個線程去執行10個任務,因此你能看到每個ID都被打印了2遍。 } }任務 執行單位
在此之前,我們得先了解線程池中最基本的執行單位——Runable和Callable。
Executor使用Runnable作為其基本的任務表示形式。Runnable是有一種很大局限的抽象,雖然run能寫入到日志或者將結果放入某個共享的數據結構,但它不能返回一個值或者拋出一個受檢查的異常。那么Callble則可以彌補這些缺陷。
Runnable和Callable描述的都是抽象的計算任務。這些任務通常都是有范圍的,即都有一個明確的起始點,并且最終會結束。Executor執行的任務有4個生命周期階段:
創建
提交
開始
完成
由于有些任務可能要執行很長時間,因此通常能夠希望取消這些任務。在Executor框架中,已提交但尚未開始的任務可以取消,但對于那些已經開始執行的任務,只有當它們能夠響應中斷,才能取消。取消一個已完成的任務不會有任何影響。
Future則表示一個任務的生命周期,并提供了相應的方法來判斷是否已經完成或取消,以及獲取任務的結果和取消的任務等。在Future規范中包含的隱含含義是,任務的生命周期只能前進,不能后退,就像ExecuteService的生命周期一樣,當某個任務完成后,它就永遠停留在“完成”狀態上。
Future.get方法的行為取決于任務的狀態(尚未開始、已經運行、已完成)。如果任務已經完成,那么get會立即返回或者拋出一個Exception,如果任務沒有完成,那么get將阻塞并直到任務完成。如果任務拋出了異常,那么get將異常封裝為ExecutionException并重新拋出。如果任務被取消了,那么get將拋出CancellationException。如果get拋出ExecutionException,那么可以通過getCause來獲得被封裝的初始異常。
計劃任務newScheduledThreadPool返回一個ScheduledExecutorService對象,可以根據實際對線程進行調度。
//在給定的時間,對任務進行一次調度 public ScheduledFuture> schedule(Runnable command,long delay, TimeUnit unit); //用于對任務進行周期性調度,任務調度的頻率是一定的,它以上一個任務開始執行時間為起點,之后的period時間后調度下一次任務。如果任務的執行時間大于調度時間,那么任務就會在上一個任務結束后,立即被調用。 public ScheduledFuture> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit); //對任務進行周期性調度,在上一個任務結束后,再經過delay長的時間進行任務調度。 public ScheduledFuture> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit);
ScheduledExecutorService不會立即安排執行任務,它類似Linux中的crontab工具。如果任務遇到異常,則后續的所有子任務都會停止執行。因此,必須保證異常被及時處理,為周期性任務的穩定調度提供條件。
public class ScheduledExecutorExample { public static void main(String[] a){ ScheduledExecutorService ses = Executors.newScheduledThreadPool(10); //如果前面的任務沒有完成,則調度也不會啟動 ses.scheduleAtFixedRate(new Runnable(){ @Override public void run() { long s = System.currentTimeMillis(); try { System.out.println(Thread.currentThread().getId() + " 號線程開始工作..."); //模擬處理事務 Thread.sleep(1000); long e = System.currentTimeMillis(); System.out.println(Thread.currentThread().getId() + " 號線程結束工作...用時:" +( (e -s)/1000) +"s"); } catch (InterruptedException e) { e.printStackTrace(); } } },0,2, TimeUnit.SECONDS); } }核心線程池的內部實現
對于核心的幾個線程池,無論是newFixedThreadPool()、newSingleThreadExecutor()還是newCacheThreadPool方法,雖然看起來創建的線程具有完全不同的功能特點,但其內部均使用了ThreadPoolExecutor實現。
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue()); } public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue ())); } public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,new SynchronousQueue ()); }
由以上線程池的實現可以看到,它們都只是ThreadPoolExecutor類的封裝。我們看下ThreadPoolExecutor最重要的構造函數:
public ThreadPoolExecutor( //指定了線程池中的線程數量 int corePoolSize, //指定了線程池中的最大線程數量 int maximumPoolSize, //當前線程池數量超過corePoolSize時,多余的空閑線程的存活時間,即多次時間內會被銷毀。 long keepAliveTime, //keepAliveTime的單位 TimeUnit unit, //任務隊列,被提交但尚未被執行的任務。 BlockingQueueworkQueue, //線程工廠,用于創建線程,一般用默認的即可 ThreadFactory threadFactory, //拒絕策略,當任務太多來不及處理,如何拒絕任務。 RejectedExecutionHandler handler)
在這里面,大多數的參數都是較好理解的,但是workQueue和handler需要進行詳細說明。
WorkQueueworkQueue指提交但未執行的任務隊列,它是一個BlockingQueue接口的對象,僅用于存放Runnable對象,根據隊列功能分類,在ThreadPoolExecutor的構造函數中可使用以下幾種BlockingQueue。
直接提交的隊列:
該功能由synchronousQueue對象提供,synchronousQueue對象是一個特殊的BlockingQueue。synchronousQueue沒有容量,每一個插入操作都要等待一個響應的刪除操作,反之每一個刪除操作都要等待對應的插入操作。如果使用synchronousQueue,提交的任務不會被真實的保存,而總是將新任務提交給線程執行,如果沒有空閑線程,則嘗試創建線程,如果線程數量已經達到了最大值,則執行拒絕策略,因此,使用synchronousQueue隊列,通常要設置很大的maximumPoolSize值,否則很容易執行拒絕策略。
有界的任務隊列:
有界任務隊列可以使用ArrayBlockingQueue實現。ArrayBlockingQueue構造函數必須帶有一個容量參數,表示隊列的最大容量。
public ArrayBlockingQueue(int capacity)
當使用有界任務隊列時,若有新任務需要執行時,如果線程池的實際線程數量小于corePoolSize,則會優先創建線程。若大于corePoolSize,則會將新任務加入等待隊列。若等待隊列已滿,無法加入,則在總線程數不大于maximumPoolSize的前提下,創建新的線程執行任務。若大于maximumPoolSize,則執行拒絕策略。可見有界隊列僅當在任務隊列裝滿后,才可能將線程數量提升到corePoolSize以上,換言之,除非系統非常繁忙,否則確保核心線程數維持在corePoolSize。
無界的任務隊列:
無界隊列可以通過LinkedBlockingQueue類實現。與有界隊列相比,除非系統資源耗盡,無界隊列的任務隊列不存在任務入隊失敗的情況。若有新任務需要執行時,如果線程池的實際線程數量小于corePoolSize,則會優先創建線程執行。但當系統的線程數量達到corePoolSize后就不再創建了,這里和有界任務隊列是有明顯區別的。若后續還有新任務加入,而又沒有空閑線程資源,則任務直接進入隊列等待。若任務創建和處理的速度差異很大,無界隊列會保持快速增長,知道耗盡系統內存。
優先任務隊列:
帶有優先級別的隊列,它通過PriorityBlokingQueue實現,可以控制任務執行的優先順序。它是一個特殊的無界隊列。無論是ArrayBlockingQueue還是LinkedBlockingQueue實現的隊列,都是按照先進先出的算法處理任務,而PriorityBlokingQueue根據任務自身優先級順序先后執行,在確保系統性能同時,也能很好的質量保證(總是確保高優先級的任務優先執行)。
開發人員以免有時會將線程池的基本大小設置為零,從而最終銷毀工作者線程以免阻礙JVM的退出。然而,如果在線程池中沒有使用SynchronousQueue作為其工作隊列(例如在newCachedThreadPool中就是如此,它的核心池設為0,但它的任務隊列使用的是SynchronousQueue),那么這種方式將產生一些奇怪的行為。如果線程池中的線程數量等于線程池的基本大小,那么僅當在工作隊列已滿的情況下ThreadPoolExecutor才會創建新的線程。因此,如果線程池的基本大小為零并且其工作隊列有一定的容量,那么當把任務提交給該線程池時,只有當線程池的工作隊列被填滿后,才會開始執行任務,而這種行為通常不是我們所希望的。在Java6中,可以通過allowCoreThreadTimeOut來使線程池中的所有線程超時。對于一個大小有限的線程池并且在該線程池中包含了一個工作隊列,如果希望 這個線程池在沒有任務的情況下能銷毀所有的線程 ,那么可以啟用這個特性并將基本大小設置為零。
調度邏輯可以總結為
飽和策略線程池中的線程已經用完了,無法繼續為新任務服務,同時,等待隊列也已經排滿了,再也塞不下新任務了。這時候我們就需要拒絕策略機制合理的處理這個問題。
JDK內置的拒絕策略如下:
AbortPolicy : 默認策略。直接拋出異常DiscardExecutionException,調用者可以考慮捕獲這個異常,編寫自己的處理代碼。
CallerRunsPolicy : 該策略既不會拋棄任務,也不會拋出異常,而是將某些任務回退到調用者。
DiscardOldestPolicy : 丟棄最老的一個請求,也就是即將被執行的一個任務,并嘗試再次提交當前任務。
DiscardPolicy : 該策略默默地丟棄無法處理的任務,不予任何處理。如果允許任務丟失,這是最好的一種方案。
以上內置拒絕策略均實現了RejectedExecutionHandler接口,若以上策略仍無法滿足實際需要,完全可以自己擴展RejectedExecutionHandler接口。RejectedExecutionHandler的定義如下。
public interface RejectedExecutionHandler { /** * @param r 請求執行的任務 * @param executor 當前線程池 **/ void rejectedExecution(Runnable r, ThreadPoolExecutor executor); }
那么接下來看一個簡單地演示了自定義線程池和拒絕策略的使用:
public class RejectThreadPoolDemo { public static class MyTask implements Runnable { public void run() { System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId()); try { Thread.sleep(100); } catch (Exception e) { e.printStackTrace(); } } } public static void main(String[] args) throws InterruptedException { MyTask task = new MyTask(); ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(10), new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println(r.toString() + "is discard"); } }); for (int i = 0; i < Integer.MAX_VALUE; i++) { es.submit(task); Thread.sleep(10); } } }
這個例子定義一個線程池,里面有5個常駐線程,并且最大線程數也是5個。這和固定大小的線程池是一樣的。但是它卻擁有一個只有10個容量的等待隊列。那么必定會有在大量的任務被直接丟棄。
自定義線程創建:ThreadFactory線程池中的線程從何而來?來自ThreadFactory。
ThreadFactory是一個接口,它只有一個方法,用來創建線程:
Thread newThread(Runnable r);
當線程池需要新建線程時,就會調用這個方法。
自定義線程池可以幫我們做不少事情。我們可以跟蹤線程池在何時創建了多少線程,也可以自定義線程的名稱、組以及優先級等信息,甚至可以任性地將所有的線程設置為守護線程。總之,使用自定義線程可以讓我們更加自由地設置池中所有的線程的狀態。下面的案例使用自定義ThreadFactory,一方面記錄了線程的創建,另一方面將所有的線程都設置為守護線程,這樣,當主線程退出后,將會強制銷毀線程池。
public class ThreadFactoryExample { public static class MyTask implements Runnable{ @Override public void run() { System.out.println(Thread.currentThread().getName() + " coming..."); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] a ) throws InterruptedException { MyTask myTask = new MyTask(); ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue擴展線程池(10) , new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setName("T " + t.getId() + "_" +System.currentTimeMillis()); t.setDaemon(true); System.out.println("Create a Thread Name is : "+t.getName()); return t; } }); for (int i=0;i<10;i++){ es.submit(myTask); } Thread.sleep(2000); } }
ThreadPoolExecutor是可擴展的,它提供了幾個“鉤子”方法可以在子類化中改寫:beforeExecute、afterExecute和terminated,這些方法可以用于擴展ThreadPoolExecutor的行為。
在執行任務的線程中將調用beforeExecute和afterExecute等方法,在這些方法中還可以添加日志、計時、監視或統計信息收集的功能。無論任務是從run中正常返回,還是拋出一個異常而返回,afterExecute都會被調用。(如果任務在完成后帶有一個Error,那么就不會調用afterExecute。)如果beforeExecute拋出了一個RuntimeException,那么任務將不會被執行,并且afterExecute也不會被調用。
栗子:
public class ExtThreadPool { public static class MyTask implements Runnable { private String name; public MyTask(String name) { this.name = name; } @Override public void run() { try { System.out.println("現在向控制臺走來的是線程" + Thread.currentThread().getId() + "號" + "名字為:" + name); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] a) throws InterruptedException { ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.SECONDS, new LinkedBlockingDeque分治思想:Fork/Join框架()) { /** * 創建ThreadPoolExecutor的匿名內部類的子類 * * @param t * the thread that will run task 將要運行任務的線程 * @param r * the task that will be executed 將要執行的任務 **/ @Override protected void beforeExecute(Thread t, Runnable r) { System.out.println("start execute .." + ((MyTask) r).name); } @Override protected void afterExecute(Runnable r, Throwable t) { System.out.println("after execute .." + ((MyTask) r).name); } @Override protected void terminated() { System.out.println("exit execute .."); } }; for (int i = 0; i < 10; i++) { MyTask myTask = new MyTask("T_" + i); es.execute(myTask);// execute 和 submit 的區別在future模式中再說 Thread.sleep(100); } /** * 不會暴力的關閉,而會等待所有線程執行完后關閉線程 可以簡單的理解為shutdown只是發送一個關閉信號, * 但在shutdown之后,線程就不能再接受其他任務了. **/ es.shutdown(); } }
分治思想經常在一些經典的算法中能看到,算是一個非常有效地處理大量數據的方法。
在Linux平臺中,函數fork()用來創建子線程,使得系統進程可以多一個執行分支。
而join這個方法相信了解java多線程的同學一定不會陌生,它表示等待。也就是使用fork()后系統多一個執行分支(線程),所以需要等待這個執行分支執行完畢,才有可能得到最終的結果,因此join()就表示等待。
在實際使用中,如果毫無顧忌的使用fork()開啟線程進行處理,那么很有可能導致系統開啟過多的線程而嚴重影響性能。所以,在JDK中給出一個ForkJoinPool線程池,對于fork()方法并不著急開啟線程,而是提交給ForkJoinPool線程池進行處理,以節省系統資源。使用Fork/Join進行數據處理時候的總體結構如下圖。
由于線程池的優化,提交的任務和線程數量并不是一對一的關系。在絕大多數情況下,一個物理線程實際上是需要處理多個邏輯任務的。因此,每個線程必然需要擁有一個任務隊列。在實際執行過程中,可能遇到這么一種情況:線程A執行完了自己的所有任務,而線程B還有一堆任務等著處理。此時,線程A就會“幫助”線程B,從線程B的任務列表中拿一個任務過來處理,盡可能達到平衡。值得注意的是,當線程視圖幫助別的線程時,總是從任務隊列的底部開始拿數據,而線程視圖執行自己的任務時,則是從相反的頂部開始拿。因此這種行為也十分有利于避免數據競爭。
接下來看一下ForkJoinPool的一個重要的接口:
publicForkJoinPool submit(ForkJoinPoolTask task)
你可以向ForkJoinPool線程池提交一個ForkJoinTask任務。所謂ForkJoinTask任務就是支持fork()分解以及join等待的任務。ForkJoinTask有兩個最重要的子類,RecursiveAction和RecursiveTask。它們分別表示沒有返回值的任務和可以攜帶返回值的任務。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/76404.html
摘要:并發編程實戰水平很高,然而并不是本好書。一是多線程的控制,二是并發同步的管理。最后,使用和來關閉線程池,停止其中的線程。當線程調用或等阻塞時,對這個線程調用會使線程醒來,并受到,且線程的中斷標記被設置。 《Java并發編程實戰》水平很高,然而并不是本好書。組織混亂、長篇大論、難以消化,中文翻譯也較死板。這里是一篇批評此書的帖子,很是貼切。俗話說:看到有這么多人罵你,我就放心了。 然而知...
摘要:基礎問題的的性能及原理之區別詳解備忘筆記深入理解流水線抽象關鍵字修飾符知識點總結必看篇中的關鍵字解析回調機制解讀抽象類與三大特征時間和時間戳的相互轉換為什么要使用內部類對象鎖和類鎖的區別,,優缺點及比較提高篇八詳解內部類單例模式和 Java基礎問題 String的+的性能及原理 java之yield(),sleep(),wait()區別詳解-備忘筆記 深入理解Java Stream流水...
摘要:基礎問題的的性能及原理之區別詳解備忘筆記深入理解流水線抽象關鍵字修飾符知識點總結必看篇中的關鍵字解析回調機制解讀抽象類與三大特征時間和時間戳的相互轉換為什么要使用內部類對象鎖和類鎖的區別,,優缺點及比較提高篇八詳解內部類單例模式和 Java基礎問題 String的+的性能及原理 java之yield(),sleep(),wait()區別詳解-備忘筆記 深入理解Java Stream流水...
摘要:基礎問題的的性能及原理之區別詳解備忘筆記深入理解流水線抽象關鍵字修飾符知識點總結必看篇中的關鍵字解析回調機制解讀抽象類與三大特征時間和時間戳的相互轉換為什么要使用內部類對象鎖和類鎖的區別,,優缺點及比較提高篇八詳解內部類單例模式和 Java基礎問題 String的+的性能及原理 java之yield(),sleep(),wait()區別詳解-備忘筆記 深入理解Java Stream流水...
閱讀 2834·2023-04-26 01:00
閱讀 763·2021-10-11 10:59
閱讀 2987·2019-08-30 11:18
閱讀 2687·2019-08-29 11:18
閱讀 1023·2019-08-28 18:28
閱讀 3021·2019-08-26 18:36
閱讀 2140·2019-08-23 18:16
閱讀 1072·2019-08-23 15:56