摘要:深入理解線程池線程池初探所謂線程池,就是將多個線程放在一個池子里面所謂池化技術(shù),然后需要線程的時候不是創(chuàng)建一個線程,而是從線程池里面獲取一個可用的線程,然后執(zhí)行我們的任務(wù)。最后的的意思是需要確保線程池已經(jīng)被啟動起來了。
深入理解Java線程池 線程池初探
?所謂線程池,就是將多個線程放在一個池子里面(所謂池化技術(shù)),然后需要線程的時候不是創(chuàng)建一個線程,而是從線程池里面獲取一個可用的線程,然后執(zhí)行我們的任務(wù)。線程池的關(guān)鍵在于它為我們管理了多個線程,我們不需要關(guān)心如何創(chuàng)建線程,我們只需要關(guān)系我們的核心業(yè)務(wù),然后需要線程來執(zhí)行任務(wù)的時候從線程池中獲取線程。任務(wù)執(zhí)行完之后線程不會被銷毀,而是會被重新放到池子里面,等待機(jī)會去執(zhí)行任務(wù)。
?我們?yōu)槭裁葱枰€程池呢?首先一點(diǎn)是線程池為我們提高了一種簡易的多線程編程方案,我們不需要投入太多的精力去管理多個線程,線程池會自動幫我們管理好,它知道什么時候該做什么事情,我們只要在需要的時候去獲取就可以了。其次,我們使用線程池很大程度上歸咎于創(chuàng)建和銷毀線程的代價是非常昂貴的,甚至我們創(chuàng)建和銷毀線程的資源要比我們實(shí)際執(zhí)行的任務(wù)所花費(fèi)的時間還要長,這顯然是不科學(xué)也是不合理的,而且如果沒有一個合理的管理者,可能會出現(xiàn)創(chuàng)建了過多的線程的情況,也就是在JVM中存活的線程過多,而存活著的線程也是需要銷毀資源的,另外一點(diǎn),過多的線程可能會造成線程過度切換的尷尬境地。
對線程池有了一個初步的認(rèn)識之后,我們來看看如何使用線程池。
// 創(chuàng)建一個線程池 ExecutorService executorService = Executors.newFixedThreadPool(1); // 提交任務(wù) executorService.submit(() -> System.out.println("run")); FuturestringFuture = executorService.submit(() -> "run"); // 創(chuàng)建一個調(diào)度線程池 ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1); // 提交一個周期性執(zhí)行的任務(wù) scheduledExecutorService.scheduleAtFixedRate(() -> System.out.println("schedule"),0,1, TimeUnit.SECONDS); // shutdown executorService.shutdown(); scheduledExecutorService.shutdown();
可以發(fā)現(xiàn)使用線程池非常簡單,只需要極少的代碼就可以創(chuàng)建出我們需要的線程池,然后將我們的任務(wù)提交到線程池中去。我們只需要在結(jié)束之時記得關(guān)閉線程池就可以了。本文的重點(diǎn)并非在于如何使用線程池,而是試圖剖析線程池的實(shí)現(xiàn),比如一個調(diào)度線程池是怎么實(shí)現(xiàn)的?是靠什么實(shí)現(xiàn)的?為什么能這樣實(shí)現(xiàn)等等問題。
Java線程池實(shí)現(xiàn)架構(gòu)Java中與線程池相關(guān)的類都在java.util.concurrent包下,如下展示了一些:
Executor
ExecutorService
ScheduledExecutorService
ThreadPoolExecutor
ScheduledThreadPoolExecutor
Executors
?通過上面一節(jié)中的使用示例,可以發(fā)現(xiàn)Executors類是一個創(chuàng)建線程池的有用的類,事實(shí)上,Executors類的角色也就是創(chuàng)建線程池,它是一個工廠類,可以產(chǎn)生不同類型的線程池。而Executor是線程池的鼻祖類,它有兩個子類是ExecutorService和ScheduledExecutorService,而ThreadPoolExecutor和ScheduledThreadPoolExecutor則是真正的線程池,我們的任務(wù)將被這兩個類交由其所管理者的線程池運(yùn)行,可以發(fā)現(xiàn),ScheduledThreadPoolExecutor是一個萬千寵愛于一身的類,下面我們可以看看它的類關(guān)系圖:
ScheduledThreadPoolExecutor繼承了ThreadPoolExecutor,ThreadPoolExecutor實(shí)現(xiàn)了一般的線程池,沒有調(diào)度功能,而ScheduledThreadPoolExecutor繼承了ThreadPoolExecutor的實(shí)現(xiàn),然后增加了調(diào)度功能。
最為原始的Executor只有一個方法execute,它接受一個Runnable類型的參數(shù),意思是使用線程池來執(zhí)行這個Runnable,可以發(fā)現(xiàn)Executor不提供有返回值的任務(wù)。ExecutorService繼承了Executor,并且極大的增強(qiáng)了Executor的功能,不僅支持有返回值的任務(wù)執(zhí)行,而且還有很多十分有用的方法來為你提供服務(wù),下面展示了ExecutorService提供的方法:
ScheduledExecutorService繼承了ExecutorService,并且增加了特有的調(diào)度(schedule)功能。關(guān)于Executor、ExecutorService和ScheduledExecutorService的關(guān)系,可以見下圖:
ThreadPoolExecutor解析總結(jié)一下,經(jīng)過我們的調(diào)研,可以發(fā)現(xiàn)其實(shí)對于我們編寫多線程代碼來說,最為核心的是Executors類,根據(jù)我們是需要ExecutorService類型的線程池還是ScheduledExecutorService類型的線程池調(diào)用相應(yīng)的工廠方法就可以了,而ExecutorService的實(shí)現(xiàn)表現(xiàn)在ThreadPoolExecutor上,ScheduledExecutorService的實(shí)現(xiàn)則表現(xiàn)在ScheduledThreadPoolExecutor上,下文將分別剖析這兩者,嘗試弄清楚線程池的原理。
?上文中描述了Java中線程池相關(guān)的架構(gòu),了解了這些內(nèi)容其實(shí)我們就可以使用java的線程池為我們工作了,使用其提供的線程池我們可以很方便的寫出高質(zhì)量的多線程代碼,本節(jié)將分析ThreadPoolExecutor的實(shí)現(xiàn),來探索線程池的運(yùn)行原理。下面的圖片展示了ThreadPoolExecutor的類圖:
private final BlockingQueueworkQueue; // 任務(wù)隊(duì)列,我們的任務(wù)會添加到該隊(duì)列里面,線程將從該隊(duì)列獲取任務(wù)來執(zhí)行 private final HashSet workers = new HashSet ();//所有工作線程的集合,來消費(fèi)workQueue里面的任務(wù) private volatile ThreadFactory threadFactory;//線程工廠 private volatile RejectedExecutionHandler handler;//拒絕策略,默認(rèn)會拋出異常,還要其他幾種拒絕策略如下: 1、CallerRunsPolicy:在調(diào)用者線程里面運(yùn)行該任務(wù) 2、DiscardPolicy:丟棄任務(wù) 3、DiscardOldestPolicy:丟棄workQueue的頭部任務(wù) private volatile int corePoolSize;//最下保活work數(shù)量 private volatile int maximumPoolSize;//work上限
我們嘗試執(zhí)行submit方法,下面是執(zhí)行的關(guān)鍵路徑,總結(jié)起來就是:如果Worker數(shù)量還沒達(dá)到上限則繼續(xù)創(chuàng)建,否則提交任務(wù)到workQueue,然后讓worker來調(diào)度運(yùn)行任務(wù)。
step 1:Future> submit(Runnable task); step 2: public Future> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture ftask = newTaskFor(task, null); execute(ftask); return ftask; } step 3: void execute(Runnable command); step 4: public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn"t, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { //提交我們的任務(wù)到workQueue int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) //使用maximumPoolSize作為邊界 reject(command); //還不行?拒絕提交的任務(wù) } step 5: private boolean addWorker(Runnable firstTask, boolean core) step 6: w = new Worker(firstTask); //包裝任務(wù) final Thread t = w.thread; //獲取線程(包含任務(wù)) workers.add(w); // 任務(wù)被放到works中 t.start(); //執(zhí)行任務(wù)
上面的流程是高度概括的,實(shí)際情況遠(yuǎn)比這復(fù)雜得多,但是我們關(guān)心的是怎么打通整個流程,所以這樣分析問題是沒有太大的問題的。觀察上面的流程,我們發(fā)現(xiàn)其實(shí)關(guān)鍵的地方在于Worker,如果弄明白它是如何工作的,那么我們也就大概明白了線程池是怎么工作的了。下面分析一下Worker類。
上面的圖片展示了Worker的類關(guān)系圖,關(guān)鍵在于他實(shí)現(xiàn)了Runnable接口,所以問題的關(guān)鍵就在于run方法上。在這之前,我們來看一下Worker類里面的關(guān)鍵成員:
/** Thread this worker is running in. Null if factory fails. */ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; // 我們提交的任務(wù),可能被立刻執(zhí)行,也可能被放到隊(duì)列里面
thread是Worker的工作線程,上面的分析我們也發(fā)現(xiàn)了在addWorker中會獲取worker里面的thread然后start,也就是這個線程的執(zhí)行,而Worker實(shí)現(xiàn)了Runnable接口,所以在構(gòu)造thread的時候Worker將自己傳遞給了構(gòu)造函數(shù),thread.start執(zhí)行的其實(shí)就是Worker的run方法。下面是run方法的內(nèi)容:
/** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); } final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
我們來分析一下runWorker這個方法,這就是整個線程池的核心。首先獲取到了我們剛提交的任務(wù)firstTask,然后會循環(huán)從workQueue里面獲取任務(wù)來執(zhí)行,獲取任務(wù)的方法如下:
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
其實(shí)核心也就一句:
Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
?我們再回頭看一下execute,其實(shí)我們上面只走了一條邏輯,在execute的時候,我們的worker的數(shù)量還沒有到達(dá)我們設(shè)定的corePoolSize的時候,會走上面我們分析的邏輯,而如果達(dá)到了我們設(shè)定的閾值之后,execute中會嘗試去提交任務(wù),如果提交成功了就結(jié)束,否則會拒絕任務(wù)的提交。我們上面還提到一個成員:maximumPoolSize,其實(shí)線程池的最大的Worker數(shù)量應(yīng)該是maximumPoolSize,但是我們上面的分析是corePoolSize,這是因?yàn)槲覀兊膒rivate boolean addWorker(Runnable firstTask, boolean core)的參數(shù)core的值來控制的,core為true則使用corePoolSize來設(shè)定邊界,否則使用maximumPoolSize來設(shè)定邊界。直觀的解釋一下,當(dāng)線程池里面的Worker數(shù)量還沒有到corePoolSize,那么新添加的任務(wù)會伴隨著產(chǎn)生一個新的worker,如果Worker的數(shù)量達(dá)到了corePoolSize,那么就將任務(wù)存放在阻塞隊(duì)列中等待Worker來獲取執(zhí)行,如果沒有辦法再向阻塞隊(duì)列放任務(wù)了,那么這個時候maximumPoolSize就變得有用了,新的任務(wù)將會伴隨著產(chǎn)生一個新的Worker,如果線程池里面的Worker已經(jīng)達(dá)到了maximumPoolSize,那么接下來提交的任務(wù)只能被拒絕策略拒絕了。可以參考下面的描述來理解:
* When a new task is submitted in method {@link #execute(Runnable)}, * and fewer than corePoolSize threads are running, a new thread is * created to handle the request, even if other worker threads are * idle. If there are more than corePoolSize but less than * maximumPoolSize threads running, a new thread will be created only * if the queue is full. By setting corePoolSize and maximumPoolSize * the same, you create a fixed-size thread pool. By setting * maximumPoolSize to an essentially unbounded value such as {@code * Integer.MAX_VALUE}, you allow the pool to accommodate an arbitrary * number of concurrent tasks. Most typically, core and maximum pool * sizes are set only upon construction, but they may also be changed * dynamically using {@link #setCorePoolSize} and {@link * #setMaximumPoolSize}.
在此需要說明一點(diǎn),有一個重要的成員:keepAliveTime,當(dāng)線程池里面的線程數(shù)量超過corePoolSize了,那么超出的線程將會在空閑keepAliveTime之后被terminated。可以參考下面的文檔:
* If the pool currently has more than corePoolSize threads, * excess threads will be terminated if they have been idle for more * than the keepAliveTime (see {@link #getKeepAliveTime(TimeUnit)}).ScheduledThreadPoolExecutor解析
?ScheduledThreadPoolExecutor適用于延時執(zhí)行,或者周期性執(zhí)行的任務(wù)調(diào)度,ScheduledThreadPoolExecutor在實(shí)現(xiàn)上繼承了ThreadPoolExecutor,所以你依然可以將ScheduledThreadPoolExecutor當(dāng)成ThreadPoolExecutor來使用,但是ScheduledThreadPoolExecutor的功能要強(qiáng)大得多,因?yàn)镾cheduledThreadPoolExecutor可以根據(jù)設(shè)定的參數(shù)來周期性調(diào)度運(yùn)行,下面的圖片展示了四個和周期性相關(guān)的方法:
如果你想延時一段時間然后運(yùn)行一個Callable,那么使用的第一個方法
如果你想延時一段時間之后運(yùn)行一個Runnable,那么使用第二個方法;
如果你想要延時一段時間,然后根據(jù)設(shè)定的參數(shù)周期執(zhí)行Runnable,那么可以選擇第三個和第四個方法,第三個方法和第四個方法的區(qū)別在于:第三個方法嚴(yán)格按照規(guī)劃的時間路徑來執(zhí)行,比如周期為2,延時為0,那么執(zhí)行的序列為0,2,4,6,8....,而第四個方法將基于上次執(zhí)行時間來規(guī)劃下次的執(zhí)行,也就是在上次執(zhí)行完成之后再次執(zhí)行。比如上面的執(zhí)行序列0,2,4,6,8...,如果第2秒沒有被調(diào)度執(zhí)行,而在第三秒的時候才被調(diào)度,那么下次執(zhí)行的時間不是4,而是5,以此類推。
下面來看一下這四個方法的一些細(xì)節(jié):
public ScheduledFuture> schedule(Runnable command, long delay, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); RunnableScheduledFuture> t = decorateTask(command, new ScheduledFutureTask(command, null, triggerTime(delay, unit))); delayedExecute(t); return t; } public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { if (callable == null || unit == null) throw new NullPointerException(); RunnableScheduledFuture t = decorateTask(callable, new ScheduledFutureTask (callable, triggerTime(delay, unit))); delayedExecute(t); return t; } public ScheduledFuture> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (period <= 0) throw new IllegalArgumentException(); ScheduledFutureTask sft = new ScheduledFutureTask (command, null, triggerTime(initialDelay, unit), unit.toNanos(period)); RunnableScheduledFuture t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t; } public ScheduledFuture> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (delay <= 0) throw new IllegalArgumentException(); ScheduledFutureTask sft = new ScheduledFutureTask (command, null, triggerTime(initialDelay, unit), unit.toNanos(-delay)); RunnableScheduledFuture t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t; }
通過上面的代碼我們可以發(fā)現(xiàn),前兩個方法是類似的,后兩個方法也是類似的。前兩個方法屬于一次性調(diào)度,所以period都為0,區(qū)別在于參數(shù)不同,一個是Runnable,而一個是Callable,可笑的是,最后都變?yōu)榱薈allable了,見下面的構(gòu)造函數(shù):
public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable }
對于后兩個方法,區(qū)別僅僅在于period的,scheduleWithFixedDelay對參數(shù)進(jìn)行了操作,將原來的時間變?yōu)樨?fù)數(shù)了,而后面在計(jì)算下次被調(diào)度的時間的時候會根據(jù)這個參數(shù)的正負(fù)值來分別處理,正數(shù)代表scheduleAtFixedRate,而負(fù)數(shù)代表了scheduleWithFixedDelay。
一個需要被我們注意的細(xì)節(jié)是,以上四個方法最后都會調(diào)用一個方法: delayedExecute(t),下面看一下這個方法:
private void delayedExecute(RunnableScheduledFuture> task) { if (isShutdown()) reject(task); else { super.getQueue().add(task); if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); else ensurePrestart(); } }
大概的意思就是先判斷線程池是否被關(guān)閉了,如果被關(guān)閉了,則拒絕任務(wù)的提交,否則將任務(wù)加入到任務(wù)隊(duì)列中去等待被調(diào)度執(zhí)行。最后的ensurePrestart的意思是需要確保線程池已經(jīng)被啟動起來了。下面是這個方法:
void ensurePrestart() { int wc = workerCountOf(ctl.get()); if (wc < corePoolSize) addWorker(null, true); else if (wc == 0) addWorker(null, false); }
主要是增加了一個沒有任務(wù)的worker,有什么用呢?我們還記得Worker的邏輯嗎?addWorker方法的執(zhí)行,會觸發(fā)Worker的run方法的執(zhí)行,然后runWorker方法就會被執(zhí)行,而runWorker方法是循環(huán)從workQueue中取任務(wù)執(zhí)行的,所以確保線程池被啟動起來是重要的,而只需要簡單的執(zhí)行addWorker便會觸發(fā)線程池的啟動流程。對于調(diào)度線程池來說,只要執(zhí)行了addWorker方法,那么線程池就會一直在后臺周期性的調(diào)度執(zhí)行任務(wù)。
到此,似乎我們還是沒有鬧明白ScheduledThreadPoolExecutor是如何實(shí)現(xiàn)周期性的,上面講到四個scheduled方法時,我們沒有提一個重要的類:ScheduledFutureTask,對,所有神奇的事情將會發(fā)生在這個類中,下面來分析一下這個類。
看上面的類圖,貌似這個類非常復(fù)雜,還好,我們發(fā)現(xiàn)他實(shí)現(xiàn)了Runnable接口,那么必然會有一個run方法,而這個run方法必然是整個類的核心,下面來看一下這個run方法的內(nèi)容:
public void run() { boolean periodic = isPeriodic(); if (!canRunInCurrentRunState(periodic)) cancel(false); else if (!periodic) ScheduledFutureTask.super.run(); else if (ScheduledFutureTask.super.runAndReset()) { setNextRunTime(); reExecutePeriodic(outerTask); } }
首先,判斷是否是周期性的任務(wù),如果不是,則直接執(zhí)行(一次性),否則執(zhí)行后,然后設(shè)置下次執(zhí)行的時間,然后重新調(diào)度,等待下次執(zhí)行。這里有一個方法需要注意,也就是setNextRunTime,上面我們提到scheduleAtFixedRate和scheduleWithFixedDelay在傳遞參數(shù)時不一樣,后者將delay值變?yōu)榱素?fù)數(shù),所以下面的處理正好印證了前文所述。
/** * Sets the next time to run for a periodic task. */ private void setNextRunTime() { long p = period; if (p > 0) time += p; else time = triggerTime(-p); }
下面來看一下reExecutePeriodic方法是如何做的,他的目標(biāo)是將任務(wù)再次被調(diào)度執(zhí)行,下面的代碼展示了這個功能的實(shí)現(xiàn):
void reExecutePeriodic(RunnableScheduledFuture> task) { if (canRunInCurrentRunState(true)) { super.getQueue().add(task); if (!canRunInCurrentRunState(true) && remove(task)) task.cancel(false); else ensurePrestart(); } }
可以看到,這個方法就是將我們的任務(wù)再次放到了workQueue里面,那這個參數(shù)是什么?在上面的run方法中我們調(diào)用了reExecutePeriodic方法,參數(shù)為outerTask,而這個變量是什么?看下面的代碼:
/** The actual task to be re-enqueued by reExecutePeriodic */ RunnableScheduledFutureouterTask = this;
這個變量指向了自己,而this的類型是什么?是ScheduledFutureTask,也就是可以被調(diào)度的task,這樣就實(shí)現(xiàn)了循環(huán)執(zhí)行任務(wù)了。
上面的分析已經(jīng)到了循環(huán)執(zhí)行,但是ScheduledThreadPoolExecutor的功能是周期性執(zhí)行,所以我們接著分析ScheduledThreadPoolExecutor是如何根據(jù)我們的參數(shù)走走停停的。這個時候,是應(yīng)該看一下ScheduledThreadPoolExecutor的構(gòu)造函數(shù)了,我們來看一個最簡單的構(gòu)造函數(shù):
public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }
我們知道ScheduledThreadPoolExecutor的父類是ThreadPoolExecutor,所以這里的super其實(shí)是ThreadPoolExecutor的構(gòu)造函數(shù),我們發(fā)現(xiàn)其中有一個參數(shù)DelayedWorkQueue,看名字貌似是一個延遲隊(duì)列的樣子,進(jìn)一步跟蹤代碼,發(fā)現(xiàn)了下面的一行代碼(構(gòu)造函數(shù)中):
this.workQueue = workQueue;
所以在ScheduledThreadPoolExecutor中,workQueue是一個DelayedWorkQueue類型的隊(duì)列,我們暫且認(rèn)為DelayedWorkQueue是一種具備延遲功能的隊(duì)列吧,那么,到此我們便可以想明白了,上面的分析我們明白了ScheduledThreadPoolExecutor是如何循環(huán)執(zhí)行任務(wù)的,而這里我們明白了ScheduledThreadPoolExecutor使用DelayedWorkQueue來達(dá)到延遲的目標(biāo),所以組合起來,就可以實(shí)現(xiàn)ScheduledThreadPoolExecutor周期性執(zhí)行的目標(biāo)。下面我們來看一下DelayedWorkQueue是如何做到延遲的吧,上文中提到一個方法:getTask,這個方法的作用是從workQueue中取出任務(wù)來執(zhí)行,而在ScheduledThreadPoolExecutor里面,getTask方法是從DelayedWorkQueue中取任務(wù)的,而取任務(wù)無非兩個方法:poll或者take,下面我們對DelayedWorkQueue的take方法來分析一下:
public RunnableScheduledFuture> take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { RunnableScheduledFuture> first = queue[0]; if (first == null) available.await(); else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0) return finishPoll(first); first = null; // don"t retain ref while waiting if (leader != null) available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && queue[0] != null) available.signal(); lock.unlock(); } }
在for循環(huán)里面,首先從queue中獲取第一個任務(wù),然后從任務(wù)中取出延遲時間,而后使用available變量來實(shí)現(xiàn)延遲效果。這里面需要幾個點(diǎn)需要探索一下:
這個queue是什么東西?
延遲時間的來龍去脈?
available變量的來龍去脈?
對于第一個問題,看下面的代碼:
`
private RunnableScheduledFuture>[] queue = new RunnableScheduledFuture>[INITIAL_CAPACITY];`
它是一個RunnableScheduledFuture類型的數(shù)組,下面是RunnableScheduledFuture類的類關(guān)系圖:
數(shù)組里面保存了我們的RunnableScheduledFuture,對queue的操作,主要來看一下增加元素和消費(fèi)元素的操作。首先,假設(shè)使用add方法來增加RunnableScheduledFuture到queue,調(diào)用的鏈路如下:
public boolean add(Runnable e) { return offer(e); } public boolean offer(Runnable x) { if (x == null) throw new NullPointerException(); RunnableScheduledFuture> e = (RunnableScheduledFuture>)x; final ReentrantLock lock = this.lock; lock.lock(); try { int i = size; if (i >= queue.length) grow(); size = i + 1; if (i == 0) { queue[0] = e; setIndex(e, 0); } else { siftUp(i, e); } if (queue[0] == e) { leader = null; available.signal(); } } finally { lock.unlock(); } return true; }
解釋一下,add方法直接轉(zhuǎn)到了offer方法,該方法中,首先判斷數(shù)組的容量是否足夠,如果不夠則grow,增長的策略如下:
int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
每次增長50%。增長完成后,如果這是第一個元素,則放在坐標(biāo)為0的位置,否則,使用siftUp操作,下面是該方法的內(nèi)容:
private void siftUp(int k, RunnableScheduledFuture> key) { while (k > 0) { int parent = (k - 1) >>> 1; RunnableScheduledFuture> e = queue[parent]; if (key.compareTo(e) >= 0) break; queue[k] = e; setIndex(e, k); k = parent; } queue[k] = key; setIndex(key, k); }
這個數(shù)組實(shí)現(xiàn)了堆這種數(shù)據(jù)結(jié)構(gòu),使用對象比較將最需要被調(diào)度執(zhí)行的RunnableScheduledFuture放到數(shù)組的前面,而這得力于compareTo方法,下面是RunnableScheduledFuture類的compareTo方法的實(shí)現(xiàn),主要是通過延遲時間來做比較。
public int compareTo(Delayed other) { if (other == this) // compare zero if same object return 0; if (other instanceof ScheduledFutureTask) { ScheduledFutureTask> x = (ScheduledFutureTask>)other; long diff = time - x.time; if (diff < 0) return -1; else if (diff > 0) return 1; else if (sequenceNumber < x.sequenceNumber) return -1; else return 1; } long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS); return (diff < 0) ? -1 : (diff > 0) ? 1 : 0; }
上面是生產(chǎn)元素,下面來看一下消費(fèi)數(shù)據(jù)。在上面我們提到的take方法中,使用了一個方法如下:
private RunnableScheduledFuture> finishPoll(RunnableScheduledFuture> f) { int s = --size; RunnableScheduledFuture> x = queue[s]; queue[s] = null; if (s != 0) siftDown(0, x); setIndex(f, -1); return f; }
這個方法中調(diào)用了一個方法siftDown,這個方法如下:
private void siftDown(int k, RunnableScheduledFuture> key) { int half = size >>> 1; while (k < half) { int child = (k << 1) + 1; RunnableScheduledFuture> c = queue[child]; int right = child + 1; if (right < size && c.compareTo(queue[right]) > 0) c = queue[child = right]; if (key.compareTo(c) <= 0) break; queue[k] = c; setIndex(c, k); k = child; } queue[k] = key; setIndex(key, k); }
對其的解釋就是:
Replaces first element with last and sifts it down. Call only when holding lock.
總結(jié)一下,當(dāng)我們向queue插入任務(wù)的時候,會發(fā)生siftUp方法的執(zhí)行,這個時候會把任務(wù)盡量往根部移動,而當(dāng)我們完成任務(wù)調(diào)度之后,會發(fā)生siftDown方法的執(zhí)行,與siftUp相反,siftDown方法會將任務(wù)盡量移動到queue的末尾。總之,大概的意思就是queue通過compareTo實(shí)現(xiàn)了類似于優(yōu)先級隊(duì)列的功能。
下面我們來看一下第二個問題:延遲時間的來龍去脈。在上面的take方法里面,首先獲取了delay,然后再使用available來做延遲效果,那這個delay從哪里來的呢?通過上面的類圖RunnableScheduledFuture的類圖我們知道,RunnableScheduledFuture類實(shí)現(xiàn)了Delayed接口,而Delayed接口里面的唯一方法是getDelay,我們到RunnableScheduledFuture里面看一下這個方法的具體實(shí)現(xiàn):
public long getDelay(TimeUnit unit) { return unit.convert(time - now(), NANOSECONDS); }
time是我們設(shè)定的下次執(zhí)行的時間,所以延遲就是(time - now()),沒毛病!
第三個問題:available變量的來龍去脈,至于這個問題,我們看下面的代碼:
/** * Condition signalled when a newer task becomes available at the * head of the queue or a new thread may need to become leader. */ private final Condition available = lock.newCondition();
這是一個條件變量,take方法里面使用這個變量來做延遲效果。Condition可以在多個線程間做同步協(xié)調(diào)工作,更為具體細(xì)致的關(guān)于Condition的內(nèi)容,可以參考更多的資料來學(xué)習(xí),本文對此知識點(diǎn)點(diǎn)到為止。
到此為止,我們梳理了ScheduledThreadPoolExecutor是如何實(shí)現(xiàn)周期性調(diào)度的,首先分析了它的循環(huán)性,然后分析了它的延遲效果。
本文到此也就結(jié)束了,對于線程池的學(xué)習(xí)現(xiàn)在才剛剛起步,需要更多更專業(yè)的知識類幫我理解更為底層的內(nèi)容,當(dāng)然,為了更進(jìn)一步理解線程池的實(shí)現(xiàn)細(xì)節(jié),首先需要對線程間通信有足夠的把握,其次是要對各種數(shù)據(jù)結(jié)構(gòu)有清晰的認(rèn)識,比如隊(duì)列、優(yōu)先級隊(duì)列、堆等高級的數(shù)據(jù)結(jié)構(gòu),以及java語言對于這些數(shù)據(jù)結(jié)構(gòu)的實(shí)現(xiàn),更為重要的是要結(jié)合實(shí)際情況分析問題,在工作和平時的學(xué)習(xí)中不斷總結(jié),不斷迭代對于線程、線程池的認(rèn)知。
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/67663.html
摘要:虛擬機(jī)運(yùn)行時數(shù)據(jù)區(qū)分為以下幾個部分。程序計(jì)數(shù)器也是在虛擬機(jī)規(guī)范中唯一沒有規(guī)定任何異常情況的區(qū)域。在方法運(yùn)行期間不會改變局部變量表的大小。長度在位和位的虛擬機(jī)中,分別為官方稱它為。 Java虛擬機(jī)運(yùn)行時數(shù)據(jù)區(qū) 詳解 2.1 概述 本文參考的是周志明的 《深入理解Java虛擬機(jī)》第二章 ,為了整理思路,簡單記錄一下,方便后期查閱。 2.2 運(yùn)行時數(shù)據(jù)區(qū)域 Java虛擬機(jī)在Java程序運(yùn)行時...
摘要:運(yùn)行時數(shù)據(jù)區(qū)域的學(xué)習(xí),是學(xué)習(xí)以及機(jī)制的基礎(chǔ),也是深入理解對象創(chuàng)建及運(yùn)行過程的前提。了解內(nèi)存區(qū)域劃分,是學(xué)習(xí)概念的前提。 Java 運(yùn)行時數(shù)據(jù)區(qū)域的學(xué)習(xí),是學(xué)習(xí) jvm 以及 GC 機(jī)制的基礎(chǔ),也是深入理解 java 對象創(chuàng)建及運(yùn)行過程的前提。廢話不多說,直接進(jìn)入正題: 一張圖總結(jié) showImg(https://segmentfault.com/img/bVOMAn?w=685&h=5...
摘要:從使用到原理學(xué)習(xí)線程池關(guān)于線程池的使用,及原理分析分析角度新穎面向切面編程的基本用法基于注解的實(shí)現(xiàn)在軟件開發(fā)中,分散于應(yīng)用中多出的功能被稱為橫切關(guān)注點(diǎn)如事務(wù)安全緩存等。 Java 程序媛手把手教你設(shè)計(jì)模式中的撩妹神技 -- 上篇 遇一人白首,擇一城終老,是多么美好的人生境界,她和他歷經(jīng)風(fēng)雨慢慢變老,回首走過的點(diǎn)點(diǎn)滴滴,依然清楚的記得當(dāng)初愛情萌芽的模樣…… Java 進(jìn)階面試問題列表 -...
閱讀 1864·2023-04-25 23:28
閱讀 572·2023-04-25 22:49
閱讀 2253·2021-09-27 13:34
閱讀 5210·2021-09-22 15:09
閱讀 3615·2019-08-30 12:52
閱讀 2746·2019-08-29 15:26
閱讀 664·2019-08-29 11:12
閱讀 2198·2019-08-26 12:24