摘要:支持通過調(diào)整構(gòu)造參數(shù)來配置不同的處理策略,本文主要介紹常用的策略配置方法以及應(yīng)用場景。對于這種場景,我們可以設(shè)置使用帶有長度限制的隊列以及限定最大線程個數(shù)的線程池,同時通過設(shè)置處理任務(wù)被拒絕的情況。
ThreadPoolExecutor 是用來處理異步任務(wù)的一個接口,可以將其理解成為一個線程池和一個任務(wù)隊列,提交到 ExecutorService 對象的任務(wù)會被放入任務(wù)隊或者直接被線程池中的線程執(zhí)行。ThreadPoolExecutor 支持通過調(diào)整構(gòu)造參數(shù)來配置不同的處理策略,本文主要介紹常用的策略配置方法以及應(yīng)用場景。
ThreadPoolExecutor 的處理邏輯首先看一下 ThreadPoolExecutor 構(gòu)造函數(shù)的定義:
public ThreadPoolExecutor(int corePoolSize, //線程池核心線程數(shù)量 int maximumPoolSize, //線程池最大線程數(shù)量 long keepAliveTime, //線程KeepAlive時間,當(dāng)線程池數(shù)量超過核心線程數(shù)量以后,idle時間超過這個值的線程會被終止 TimeUnit unit, //線程KeepAlive時間單位 BlockingQueueworkQueue, //任務(wù)隊列 ThreadFactory threadFactory, //創(chuàng)建線程的工廠對象 RejectedExecutionHandler handler) //任務(wù)被拒絕后調(diào)用的handler
ThreadPoolExecutor 對線程池和隊列的使用方式如下:
從線程池中獲取可用線程執(zhí)行任務(wù),如果沒有可用線程則使用ThreadFactory創(chuàng)建新的線程,直到線程數(shù)達(dá)到corePoolSize限制
線程池線程數(shù)達(dá)到corePoolSize以后,新的任務(wù)將被放入隊列,直到隊列不能再容納更多的任務(wù)
當(dāng)隊列不能再容納更多的任務(wù)以后,會創(chuàng)建新的線程,直到線程數(shù)達(dá)到maxinumPoolSize限制
線程數(shù)達(dá)到maxinumPoolSize限制以后新任務(wù)會被拒絕執(zhí)行,調(diào)用 RejectedExecutionHandler 進(jìn)行處理
三種常用的 ThreadPoolExecutorExecutors 是提供了一組工廠方法用于創(chuàng)建常用的 ExecutorService ,分別是 FixedThreadPool,CachedThreadPool 以及 SingleThreadExecutor。這三種ThreadPoolExecutor都是調(diào)用 ThreadPoolExecutor 構(gòu)造函數(shù)進(jìn)行創(chuàng)建,區(qū)別在于參數(shù)不同。
FixedThreadPool - 線程池大小固定,任務(wù)隊列無界下面是 Executors 類 newFixedThreadPool 方法的源碼:
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); }
可以看到 corePoolSize 和 maximumPoolSize 設(shè)置成了相同的值,此時不存在線程數(shù)量大于核心線程數(shù)量的情況,所以KeepAlive時間設(shè)置不會生效。任務(wù)隊列使用的是不限制大小的 LinkedBlockingQueue ,由于是無界隊列所以容納的任務(wù)數(shù)量沒有上限。
因此,F(xiàn)ixedThreadPool的行為如下:
從線程池中獲取可用線程執(zhí)行任務(wù),如果沒有可用線程則使用ThreadFactory創(chuàng)建新的線程,直到線程數(shù)達(dá)到nThreads
線程池線程數(shù)達(dá)到nThreads以后,新的任務(wù)將被放入隊列
FixedThreadPool的優(yōu)點是能夠保證所有的任務(wù)都被執(zhí)行,永遠(yuǎn)不會拒絕新的任務(wù);同時缺點是隊列數(shù)量沒有限制,在任務(wù)執(zhí)行時間無限延長的這種極端情況下會造成內(nèi)存問題。
SingleThreadExecutor - 線程池大小固定為1,任務(wù)隊列無界public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue())); }
這個工廠方法中使用無界LinkedBlockingQueue,并的將線程數(shù)設(shè)置成1,除此以外還使用FinalizableDelegatedExecutorService類進(jìn)行了包裝。這個包裝類的主要目的是為了屏蔽ThreadPoolExecutor中動態(tài)修改線程數(shù)量的功能,僅保留ExecutorService中提供的方法。雖然是單線程處理,一旦線程因為處理異常等原因終止的時候,ThreadPoolExecutor會自動創(chuàng)建一個新的線程繼續(xù)進(jìn)行工作。
SingleThreadExecutor 適用于在邏輯上需要單線程處理任務(wù)的場景,同時無界的LinkedBlockingQueue保證新任務(wù)都能夠放入隊列,不會被拒絕;缺點和FixedThreadPool相同,當(dāng)處理任務(wù)無限等待的時候會造成內(nèi)存問題。
CachedThreadPool - 線程池?zé)o限大(MAX INT),等待隊列長度為1public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue()); }
SynchronousQueue是一個只有1個元素的隊列,入隊的任務(wù)需要一直等待直到隊列中的元素被移出。核心線程數(shù)是0,意味著所有任務(wù)會先入隊列;最大線程數(shù)是Integer.MAX_VALUE,可以認(rèn)為線程數(shù)量是沒有限制的。KeepAlive時間被設(shè)置成60秒,意味著在沒有任務(wù)的時候線程等待60秒以后退出。CachedThreadPool對任務(wù)的處理策略是提交的任務(wù)會立即分配一個線程進(jìn)行執(zhí)行,線程池中線程數(shù)量會隨著任務(wù)數(shù)的變化自動擴張和縮減,在任務(wù)執(zhí)行時間無限延長的極端情況下會創(chuàng)建過多的線程。
三種ExecutorService特性總結(jié)類型 | 核心線程數(shù) | 最大線程數(shù) | Keep Alive 時間 | 任務(wù)隊列 | 任務(wù)處理策略 |
---|---|---|---|---|---|
FixedThreadPool | 固定大小 | 固定大小(與核心線程數(shù)相同) | 0 | LinkedBlockingQueue | 線程池大小固定,沒有可用線程的時候任務(wù)會放入隊列等待,隊列長度無限制 |
SingleThreadExecutor | 1 | 1 | 0 | LinkedBlockingQueue | 與 FixedThreadPool 相同,區(qū)別在于線程池的大小為1,適用于業(yè)務(wù)邏輯上只允許1個線程進(jìn)行處理的場景 |
CachedThreadPool | 0 | Integer.MAX_VALUE | 1分鐘 | SynchronousQueue | 線程池的數(shù)量無限大,新任務(wù)會直接分配或者創(chuàng)建一個線程進(jìn)行執(zhí)行 |
我們也可以通過修改 ThreadPoolExecutor 的構(gòu)造函數(shù)來自定義任務(wù)處理策略。例如面對的業(yè)務(wù)是將數(shù)據(jù)異步寫入HBase,當(dāng)HBase嚴(yán)重超時的時候允許寫入失敗并記錄日志以便事后補寫。對于這種應(yīng)用場景,如果使用FixedThreadPool,在HBase服務(wù)嚴(yán)重超時的時候會導(dǎo)致隊列無限增長,引發(fā)內(nèi)存問題;如果使用CachedThreadPool,會導(dǎo)致線程數(shù)量無限增長。對于這種場景,我們可以設(shè)置ExecutorService使用帶有長度限制的隊列以及限定最大線程個數(shù)的線程池,同時通過設(shè)置RejectedExecutionHandler處理任務(wù)被拒絕的情況。
首先定義 RejectedExecutionHandler:
public class MyRejectedExecutionHandler implements RejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { // 處理任務(wù)被拒絕的情況,例如記錄日志等 } }
創(chuàng)建 ThreadPoolExecutor:
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( 10, //核心線程數(shù)設(shè)置成10 30, //線程池最大線程數(shù)為30 30, TimeUnit.SECONDS, //超過核心線程數(shù)量的線程idle 30秒之后會退出 new ArrayBlockingQueue(100), //隊列長度為100 new MyRejectedExecutionHandler() //任務(wù)被拒絕以后的處理類 );
這樣設(shè)置以后,如果任務(wù)處理函數(shù)出現(xiàn)長時間掛起的情況,會依次發(fā)生下列現(xiàn)象:
線程池線程數(shù)量達(dá)到核心線程數(shù),向ArrayBlockingQueue中放入任務(wù)
ArrayBlockingQueue達(dá)到上限,創(chuàng)建新的線程進(jìn)行處理
線程池中的線程數(shù)量達(dá)到30個,調(diào)用MyRejectedExecutionHandler處理新提交的任務(wù)
總結(jié)對于需要保證所有提交的任務(wù)都要被執(zhí)行的情況,使用FixedThreadPool
如果限定只能使用一個線程進(jìn)行任務(wù)處理,使用SingleThreadExecutor
如果希望提交的任務(wù)盡快分配線程執(zhí)行,使用CachedThreadPool
如果業(yè)務(wù)上允許任務(wù)執(zhí)行失敗,或者任務(wù)執(zhí)行過程可能出現(xiàn)執(zhí)行時間過長進(jìn)而影響其他業(yè)務(wù)的應(yīng)用場景,可以通過使用限定線程數(shù)量的線程池以及限定長度的隊列進(jìn)行容錯處理。
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/69864.html
摘要:限制線程池運行線程以及等待線程數(shù)量的策略對于所提供的,可以保證可以在內(nèi)存中有固定數(shù)量的線程數(shù)運行。指的是當(dāng)線程池拒絕該任務(wù)的時候,線程在本地線程直接。由此限制了線程池的等待線程數(shù)與執(zhí)行線程數(shù) 限制Java線程池運行線程以及等待線程數(shù)量的策略 對于java.util.concurrent.Executors所提供的FixedThreadPool,可以保證可以在內(nèi)存中有固定數(shù)量的線程數(shù)運行...
摘要:提交任務(wù)當(dāng)創(chuàng)建了一個線程池之后我們就可以將任務(wù)提交到線程池中執(zhí)行了。提交任務(wù)到線程池中相當(dāng)簡單,我們只要把原來傳入類構(gòu)造器的對象傳入線程池的方法或者方法就可以了。 我們一般不會選擇直接使用線程類Thread進(jìn)行多線程編程,而是使用更方便的線程池來進(jìn)行任務(wù)的調(diào)度和管理。線程池就像共享單車,我們只要在我們有需要的時候去獲取就可以了。甚至可以說線程池更棒,我們只需要把任務(wù)提交給它,它就會在合...
摘要:去美團(tuán)面試,問到了什么是線程池,如何使用,為什么要用以下做個總結(jié)。二線程池線程池的作用線程池作用就是限制系統(tǒng)中執(zhí)行線程的數(shù)量。真正的線程池接口是。創(chuàng)建固定大小的線程池。此線程池支持定時以及周期性執(zhí)行任務(wù)的需求。 去美團(tuán)面試,問到了什么是線程池,如何使用,為什么要用,以下做個總結(jié)。關(guān)于線程之前也寫過一篇文章《高級面試題總結(jié)—線程池還能這么玩?》 1、什么是線程池:? java.util...
摘要:去美團(tuán)面試,問到了什么是線程池,如何使用,為什么要用以下做個總結(jié)。二線程池線程池的作用線程池作用就是限制系統(tǒng)中執(zhí)行線程的數(shù)量。真正的線程池接口是。創(chuàng)建固定大小的線程池。此線程池支持定時以及周期性執(zhí)行任務(wù)的需求。 去美團(tuán)面試,問到了什么是線程池,如何使用,為什么要用,以下做個總結(jié)。關(guān)于線程之前也寫過一篇文章《高級面試題總結(jié)—線程池還能這么玩?》 1、什么是線程池:? java.util...
閱讀 2389·2021-11-24 10:31
閱讀 3440·2021-11-23 09:51
閱讀 2254·2021-11-15 18:11
閱讀 2402·2021-09-02 15:15
閱讀 2464·2019-08-29 17:02
閱讀 2296·2019-08-29 15:04
閱讀 845·2019-08-29 12:27
閱讀 2869·2019-08-28 18:15