摘要:表示一個異步任務的結果,就是向線程池提交一個任務后,它會返回對應的對象。它們分別提供兩個重要的功能阻塞當前線程等待一段時間直到完成或者異常終止取消任務。此時,線程從中返回,然后檢查當前的狀態已經被改變,隨后退出循環。
0 引言
前段時間需要把一個C++的項目port到Java中,因此時隔三年后重新熟悉了下Java。由于需要一個通用的線程池,自然而然就想到了Executors。
用了后,感覺很爽... 于是忍不住摳了下源碼。因此就有了這篇學習筆記。
言歸正傳,Java Executor是一個功能豐富,接口設計很好的,基于生產者-消費者模式的通用線程池。這種線程池的設計思想也在很多地方被應用。
在這篇文章中,我并不打算介紹java線程池的使用,生產者-消費者模式,并發編程基本概念等。
通常來說,一個線程池的實現包括四個部分:
執行任務的線程
用于封裝任務的task對象
存儲任務的數據結構
線程池本身
1 ThreadThread 并不是concurrent包的一部分。Thread包含著name, priority等成員和對應的操作方法。
它是繼承自runable的,也就是說線程的入口函數是run。它的繼承體系和重要操作函數如下圖:
它實現了一系列包括sleep, yield等靜態方法。以及獲取當前線程的靜態方法currentThread()。這些都是native方法。
值得注意的是它的中斷機制(雖然它也實現了suspend和resume方法,但是這兩個方法已被棄用):
通過調用interrupt來觸發一個中斷
isInterrupted() 用來查詢線程的中斷狀態
interrupted() 用來查詢并清除線程的中斷狀態
public void interrupt() { if (this != Thread.currentThread()) checkAccess(); synchronized (blockerLock) { Interruptible b = blocker; if (b != null) { interrupt0(); // Just to set the interrupt flag b.interrupt(this); return; } } interrupt0(); }
在默認的情況下,blocker (Interruptible 成員變量)的值為null, 這時調用interrupt,僅僅是調用interrupt0設置一個標志位。
而如果blocker的值不為null,則會調用其interrupt方法實現真正的中斷。
(關于blocker值何時被設置,在后面會看到一個使用場景。)
當線程處于可中斷的阻塞狀態時,比如說阻塞在sleep, wait, join,select等操作時,調用interrupt方法會讓線程從阻塞狀態退出,并拋出InterruptedException。
值得注意的一點是:interrupt讓我們從阻塞的方法中退出,但線程的中斷狀態卻并不會被設置!
try { Thread.sleep(10); } catch (InterruptedException e) { System.out.println("IsInterrupted: " + Thread.currentThread().isInterrupted()); }
如上述示例代碼,此時你得到的輸出是: IsInterrupted : false 。這是一個有點令人意外的地方。
上述代碼并不是一個好的示例,因為interrupt被我們“吃”掉了!除非你明確的知道這是你想要的。否則的話請考慮在異常捕獲中(catch段中)加上:
Thread.currentThread.interrupt();2. Task
Java可執行的接口類有兩種,Runnable和Callable,它們的區別是Callable可以帶返回值,一個需要實現Run()方法,另一個需要實現帶返回值的Call() 方法。
在java.util.concurret中還有另外一個接口類Future。
Future表示一個異步任務的結果,就是user code向線程池提交一個任務后,它會返回對應的 Future對象。用以觀察任務執行的狀態(isCancelled, isDone),取消任務(Cancel)或者等待任務執行(get, timeout get)。
如上圖,RunnableFuture是一個中間類,它將Runnable和Future的功能糅合到一起。FutureTask 則是真正的實現。
FutureTaskFutureTask可以從一個Runnable和Callable構造,當通過Runnable構造時,它會調用Excutors.callable接口將其轉為Callable對象保存起來。
從上面的類圖中可以看出,FutureTask除了簡單的狀態查詢等接口外,還具有兩個重要的接口:get() 和 get(long timeout, TimeUnit unit)), cancel(bool mayInterruptIfRunning)。
它們分別提供兩個重要的功能:阻塞(當前線程)等待(一段時間)直到task完成或者異常終止;取消任務。
任務取消一個任務具有三種狀態:尚未運行,正在運行,已經執行完畢。
在調用cancel后,如果任務處于已經執行完畢了,則不需要做任何事情直接返回;
如果任務尚未運行,將其狀態設為cancelled;
如果任務正在執行,而且user以cancel(true)的方式取消這個任務。那么FutureTask會通過調用Thread.interrupt來終止當前任務。
public boolean cancel(boolean mayInterruptIfRunning) { // 任務已經完成或者被中斷等其他狀態 if (state != NEW) return false; if (mayInterruptIfRunning) { // 正在運行,或者尚未運行 if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING)) return false; Thread t = runner; if (t != null) t.interrupt(); UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state } // 設置cancel標志位 else if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED)) return false; finishCompletion(); return true; }
注意到: FutureTask并沒有一個RUNNING的狀態來標識該任務正在執行。正常的情況下,任務從開始創建直到運行完畢,這段過程的狀態都是NEW。
阻塞等待user code可以調用get() 接口等待任務完成或者調用get(long, TimeUnit)等待一段時間。但get()接口被調用,當前的線程將被掛起,直到條件滿足(任務完成或者異常退出)。
在前文中我們了解到,Thread并沒有提供掛起和阻塞的方法。在這里,Java利用LockSupport類來實現目的。(我猜測其中用了類似條件變量的方法來實現)。
parkLockSupport也屬于concurrent。FutureTask利用它的park (parkNanos)和unpark方法來實現線程的掛起和恢復:
public static void park(Object blocker) { Thread t = Thread.currentThread(); setBlocker(t, blocker); unsafe.park(false, 0L); setBlocker(t, null); } public static void unpark(Thread thread) { if (thread != null) unsafe.unpark(thread); }
其中parkNanos跟park方法并無本質區別,只是多了一個timeout參數。FutureTask分別用它們來實現get和timeout的get。
注意到上面的setBlocker方法了嗎?沒錯,它就是給在上文Thread.interrupt方法中出現過的Thread成員變量blocker賦值。從這我們可以看出,它是可中斷的。
而它真正實現掛起的則是依賴unsafe類。unsafe類在concurrent中頻繁出現,但sun去并不建議使用它。
它除了提供park,unpark方法外,還提供了一些內存和同步原語。比如CAS等。
多個等待者調用get()的線程可以是一個,也可以是多個。為了能夠在恰當的時機將它們一一恢復,FutureTask內部需要維護一個鏈表來記錄所有的等待線程:waiters.
static final class WaitNode { volatile Thread thread; volatile WaitNode next; WaitNode() { thread = Thread.currentThread(); } }get 全貌
至此,我們終于了解get的全貌了。get會調用awaitDone方法來實現阻塞。當然,只有兩個狀態需要處理:NEW, COMPLETING。
NEW的狀態在前文已經有介紹過。COMPLETING狀態通常持續較短,在FutureTask 內部的callable 的call方法調用完畢后,會需要將call的返回值設置到outcome這個成員變量。隨后將狀態設為NORMAL。這期間的狀態就是COMPLETING。
顯而易見,對于這種狀態,我們只需要調用yield讓出線程資源,使得FutureTask完成這一過程即可。
private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; if (s > COMPLETING) { // 1 if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // cannot time out yet 2 Thread.yield(); else if (q == null) // 3 q = new WaitNode(); else if (!queued) // 4 queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { // 5 nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else // 6 LockSupport.park(this); } }
當任務處于NEW狀態正在被執行時,其他線程調用get而進入awaitdone函數。
此時的流程是 3 -> 4 -> 5 或者 3 -> 4 -> 6。
它會首先分配一個WaitNode對象 --> 把它插入到waiters鏈表的表頭 --> 然后開始等待。那么park函數何時返回呢?
對應的unpark被調用(或者在這之前已經被調用)
如果設置了timeout的,會在時間到達后退出。
被中斷。
其他異常。
等待線程恢復當任務執行完畢(或者被cancel)時,FutureTask會調用最終調用finishcompletion,改函數會改變FutureTask狀態,并調用LockSupport.unpark方法。
此時,awaitDone線程從park中返回,然后檢查當前的狀態已經被改變,隨后退出for循環。
線程安全FutureTask是會被多個線程訪問的,涉及到臨界區的保護,但是其內部卻并沒有任何的鎖操作。而在該類定義的末尾,有這樣的代碼。
private static final sun.misc.Unsafe UNSAFE; private static final long stateOffset; private static final long runnerOffset; private static final long waitersOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class> k = FutureTask.class; stateOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("state")); runnerOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("runner")); waitersOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("waiters")); } catch (Exception e) { throw new Error(e); } }
這段代碼會在類被加載時執行一次。注意到它利用getDeclaredField反射機制來保存了三個offset:
stateOffset,runnerOffset,waitersOffset分別對應著state,runner,waiters這三個成員的偏移量。
FutureTask真是對這三個成員變量進行CAS操作來保證原子性和無鎖化的。實現CAS的類正是上文出現過的sun.misc.Unsafe類。
UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())
第一個參數是對象指針,第二個是偏移量,第三個是舊值,最后一個是新值。詳細可參考Unsafe文檔。
3. BlockingQueuejava實現了生產者-消費者模式的隊列。由于隊列的容量有限,因此涉及到在隊列為空的時候取task和在隊列已滿的時候存task的策略,連同一系列的查詢函數一起,BlockingQueue包含著11個靜態方法。
BlockingQueue只是一個interface,它的實現類包括鏈表方式的LinkedBlockingQueue 、數組方式的ArrayBlockingQueue以及PriorityBlockingQueue等。
LinkedBlockingQueue下面以LinkedBlockingQueue為例來了解一下它的實現。
LinkedBlockingQueue是一個FIFO的隊列,它真正用來存儲元素的節點類型是Node :
static class Node{ E item; Node next; Node(E x) { item = x; } }
對應的,在LinkedBlockingQueue中保存了頭節點和尾節點 :
/** * Head of linked list. * Invariant: head.item == null */ private transient Nodehead; /** * Tail of linked list. * Invariant: last.next == null */ private transient Node last;
在LinkedBlockingQueue中,Java使用了雙鎖機制,分別對頭節點和尾節點加鎖。這樣取和存的操作就可以同時進行了。
/** Lock held by take, poll, etc */ private final ReentrantLock takeLock = new ReentrantLock(); /** Wait queue for waiting takes */ private final Condition notEmpty = takeLock.newCondition(); /** Lock held by put, offer, etc */ private final ReentrantLock putLock = new ReentrantLock(); /** Wait queue for waiting puts */ private final Condition notFull = putLock.newCondition();
以take為例,獲取并移除此隊列的頭部,在元素變得可用之前一直等待(可被打斷)。
public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0) { notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
它將會一直阻塞在notEmpty.await()上,直到信號到達或者被中斷。注意到它只需要對takeLock加鎖,而無需對putLock加鎖。
相應的,put操作也只需要鎖上putLock就可以了。
有的操作則需要兩個鎖都鎖上,比如說remove,因為我們不確定要刪除的元素的位置。
public boolean remove(Object o) { if (o == null) return false; fullyLock(); try { for (Nodetrail = head, p = trail.next; p != null; trail = p, p = p.next) { if (o.equals(p.item)) { unlink(p, trail); return true; } } return false; } finally { fullyUnlock(); } }
可以看到LinkedBlockingQueue 并沒有直接調用lock,而是通過fullyLock和fullyUnLock來加解鎖以保證一致性,避免死鎖:
/** * Lock to prevent both puts and takes. */ void fullyLock() { putLock.lock(); takeLock.lock(); } /** * Unlock to allow both puts and takes. */ void fullyUnlock() { takeLock.unlock(); putLock.unlock(); }
當然,雙鎖隊列在插入第一個元素和最后一個元素出隊的時候會有沖突。這里的解決辦法是加了一個哨兵,開始的時候,頭尾節點都指向這個哨兵,在隨后的操作中,頭結點始終指向哨兵,而尾節點指向真正有效的值。
4. Executors 類結構有了前面這些零件,我們就可以開始組裝線程池對象了。java里面Executors的真正實現類主要包括兩個ThreadPollExecutors和ScheduledThreadPoolExecutor。其中ScheduledThreadPoolExecutor通過實現其基類ScheduledExecutorService擴展了ThreadPoolExecutor類。
SheduledExecutorsService主要用于執行周期性的或者定時的任務。其他情況下我們更多使用ThreadPoolExecutor。
ThreadPoolExecutorThreadPoolExecutor總共有七個構造參數:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueueworkQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
從其注釋和參數名不難猜測各個參數的用途。唯一有點麻煩的是corePoolSize, maximumPoolSize這兩個參數的區別。你可以參考這里或者這里。
但大多數情況我們并不需要直接調用它的構造函數,在Executors里面定義了一系列的靜態方法供我們使用。包括newFixedThreadPool、newSingleThreadExecutor等。
由于ThreadPoolExecutor是一個通用的線程池,因此它需要為各種各樣的情況預留足夠的接口。ThreadPoolExecutor除了提供豐富的接口外,還提供了一些“什么都不做”的函數,為user預留接口。
比如每個任務在執行之前會調用beforeExecute,執行完畢后又會調用afterExecute。又比如terminate用來通知用戶代碼該線程將要結束。
這些接口java都提供了及其豐富的文檔。
Executor接口設計的目的或許也在于此,為簡單的情況提供盡量簡單的使用方法,同時為復雜的情況或者說高級用戶提供足夠多的接口。
一個不用擔心的問題在最初使用ThreadPoolExecutor 時候,用到FutrueTask的cancel接口,我總是擔心一個問題:
由于cancel是依賴線程的interrupt方法來實現的,也就是說cancel的狀態保持在線程中而不是task中。那么當這個線程執行下一個task會不會被影響?為了驗證這一點,我做了個小小的實驗:
public class InterruptTest { public static class MyTask implements Runnable { @Override public void run() { System.out.println(Thread.currentThread()); System.out.println("before interrupt " + Thread.currentThread().isInterrupted()); Thread.currentThread().interrupt(); System.out.println("after interrupt " + Thread.currentThread().isInterrupted()); } } public static void main(String[] str) { ExecutorService service = Executors.newFixedThreadPool(1); // MyTask task1 = new MyTask(); Future> future1 = service.submit(new InterruptTest.MyTask()); Future> future2 = service.submit(new InterruptTest.MyTask()); } }
輸出結果說明,我的擔心是多余的:
Thread[pool-1-thread-1,5,main] before interrupt false after interrupt true Thread[pool-1-thread-1,5,main] before interrupt false after interrupt true
其關鍵代碼就在ThreadPoolExecutor.runWorker 方法中,線程的中斷狀態會被清除(shutDown例外)。
final void runWorker(Worker w) { ... // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); ... }
參見 SO 的提問
其中Executors還有很多的東西,但是看看文章的長度,我決定把那些關于Executors的筆記先“藏”起來。
如果感興趣的可以翻看源碼: ThreadFactory, RejectHandler, worker, task, shutDown策略,鎖機制... 看看ThreadPoolExecutor 把這些積木堆成一個房子的吧。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/65307.html
摘要:源碼分析創建可緩沖的線程池。源碼分析使用創建線程池源碼分析的構造函數構造函數參數核心線程數大小,當線程數,會創建線程執行最大線程數,當線程數的時候,會把放入中保持存活時間,當線程數大于的空閑線程能保持的最大時間。 之前創建線程的時候都是用的 newCachedThreadPoo,newFixedThreadPool,newScheduledThreadPool,newSingleThr...
摘要:并不會為每個任務都創建工作線程,而是根據實際情況構造線程池時的參數確定是喚醒已有空閑工作線程,還是新建工作線程。 showImg(https://segmentfault.com/img/bVbiYSP?w=1071&h=707); 本文首發于一世流云的專欄:https://segmentfault.com/blog... 一、引言 前一章——Fork/Join框架(1) 原理,我們...
摘要:整個包,按照功能可以大致劃分如下鎖框架原子類框架同步器框架集合框架執行器框架本系列將按上述順序分析,分析所基于的源碼為。后,根據一系列常見的多線程設計模式,設計了并發包,其中包下提供了一系列基礎的鎖工具,用以對等進行補充增強。 showImg(https://segmentfault.com/img/remote/1460000016012623); 本文首發于一世流云專欄:https...
摘要:線程池常見實現線程池一般包含三個主要部分調度器決定由哪個線程來執行任務執行任務所能夠的最大耗時等線程隊列存放并管理著一系列線程這些線程都處于阻塞狀態或休眠狀態任務隊列存放著用戶提交的需要被執行的任務一般任務的執行的即先提交的任務先被執行調度 線程池常見實現 線程池一般包含三個主要部分: 調度器: 決定由哪個線程來執行任務, 執行任務所能夠的最大耗時等 線程隊列: 存放并管理著一系列線...
引言 本文是源起netty專欄的第4篇文章,很明顯前3篇文章已經在偏離主題的道路上越來越遠。于是乎,我決定:繼續保持…… 使用 首先看看源碼類注釋中的示例(未改變官方示例邏輯,只是增加了print輸出和注釋) import java.time.LocalTime; import java.util.concurrent.Executors; import java.util.concurrent....
閱讀 963·2021-11-17 09:33
閱讀 422·2019-08-30 11:16
閱讀 2476·2019-08-29 16:05
閱讀 3356·2019-08-29 15:28
閱讀 1401·2019-08-29 11:29
閱讀 1956·2019-08-26 13:51
閱讀 3393·2019-08-26 11:55
閱讀 1213·2019-08-26 11:31