摘要:本文首發于一世流云的專欄一模式簡介模式是多線程設計模式中的一種常見模式,它的主要作用就是異步地執行任務,并在需要的時候獲取結果。二中的模式在多線程基礎之模式中,我們曾經給出過模式的通用類關系圖。
本文首發于一世流云的專欄:https://segmentfault.com/blog...一、Future模式簡介
Future模式是Java多線程設計模式中的一種常見模式,它的主要作用就是異步地執行任務,并在需要的時候獲取結果。我們知道,一般調用一個函數,需要等待函數執行完成,調用線程才會繼續往下執行,如果是一些計算密集型任務,需要等待的時間可能就會比較長。
筆者在讀書期間曾參與過一個國家電網的復雜水電系統的聯合優化調度項目,需要在工業上可接受的時間內計算出整個云南地區近40座大型水電站的日發電計劃。
在電力系統中日發電計劃的制定非常重要,同時又涉及水利學、經濟學、電氣工程、政治政策等諸多復雜約束條件,工業上基本都是通過混合整數規劃、動態規劃再結合其它數學規劃方法建模求解,模型涉及的變量基本都是百萬級以上。
試想一下,這種復雜的計算模型,假設我把它封裝到一個函數中,由調用方進行單線程調用,需要等待多少時間?如果將模型集成到UI,用戶在界面上點擊一下計算,那可能用戶基本就認為應用假設死崩潰了。
在Java中,一種解決辦法是由調用線程新建一個線程執行該任務,比如下面這樣:
public void calculate(){ Thread t = new Thread(new Runnable() { @Override public void run() { model.calculate(); } }); t.start(); }
但是,這樣有一個問題,我拿不到計算結果,也不知道任務到底什么時候計算結束。我們來看下Future模式是如何來解決的。
Future模式,可以讓調用方立即返回,然后它自己會在后面慢慢處理,此時調用者拿到的僅僅是一個憑證,調用者可以先去處理其它任務,在真正需要用到調用結果的場合,再使用憑證去獲取調用結果。這個憑證就是這里的Future。
我們看下泳道圖來理解下兩者的區別:
傳統的數據獲取方式:
Future模式下的數據獲取:
如果讀者對經濟學有些了解,或是了解金融衍生品的話,對Future這個單詞應該不會陌生,Future在經濟學中出現的頻率相當之高,比如關于現金流的折算,其中的終值,英文就是Future value。常見的金融衍生品,期貨、遠期的英文分別是Futures、Financial future。
我們之前說了,Future模式可以理解為一種憑證,拿著該憑證在將來的某個時間點可以取到我想要的東西,這其實就和期貨、遠期有點類似了,期貨、遠期也是雙方制定協議或合同,然后在約定的某個時間點,拿著合同進行資金或實物的交割。可見,Future模式的命名是很有深意且很恰當的。
二、J.U.C中的Future模式在Java多線程基礎之Future模式中,我們曾經給出過Future模式的通用類關系圖。本章中,我不想教科書般得再貼一遍該圖,而是希望能循序漸進地帶領讀者去真正理解Future模式中的各個組件,去思考為什么Future模式的類關系圖是那樣,為什么一定就是那么幾個組件?
真實的任務類首先來思考下,我們需要執行的是一個任務,那么在Java中,一般需要實現Runnable接口,比如像下面這樣:
public class Task implements Runnable { @Override public void run() { // do something } }
但是,如果我需要任務的返回結果呢,從Runnable的接口定義來看,并不能滿足我們的要求,Runnable一般僅僅用于定義一個可以被線程執行的任務,它的run方法沒有返回值:
public interface Runnable { public abstract void run(); }
于是,JDK提供了另一個接口——Callable,表示一個具有返回結果的任務:
public interface Callable{ V call() throws Exception; }
所以,最終我們自定義的任務類一般都是實現了Callable接口。以下定義了一個具有復雜計算過程的任務,最終返回一個Double值:
public class ComplexTask implements Callable憑證{ @Override public Double call() { // complex calculating... return ThreadLocalRandom.current().nextDouble(); } }
第一節講到,Future模式可以讓調用方獲取任務的一個憑證,以便將來拿著憑證去獲取任務結果,憑證需要具有以下特點:
在將來某個時間點,可以通過憑證獲取任務的結果;
可以支持取消。
從以上兩點來看,我們首先想到的方式就是對Callable任務進行包裝,包裝成一個憑證,然后返回給調用方。
J.U.C提供了Future接口和它的實現類——FutureTask來滿足我們的需求,我們可以像下面這樣對之前定義的ComplexTask包裝:
ComplexTask task = new ComplexTask(); Futurefuture = new FutureTask (task);
上面的FutureTask就是真實的“憑證”,Future則是該憑證的接口(從面向對象的角度來講,調用方應面向接口操作)。
Future接口的定義:
public interface Future{ ? boolean cancel(boolean mayInterruptIfRunning); ? boolean isCancelled(); ? boolean isDone(); ? V get() throws InterruptedException, ExecutionException; ? V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
Future接口很簡單,提供了isCancelled和isDone兩個方法監控任務的執行狀態,一個cancel方法用于取消任務的執行。兩個get方法用于獲取任務的執行結果,如果任務未執行完成,除非設置超時,否則調用線程將會阻塞。
此外,為了能夠被線程或線程池執行任務,憑證還需要實現Runnable接口,所以J.U.C還提供了一個RunnableFuture接口,其實就是組合了Runnable和Future接口:
public interface RunnableFutureextends Runnable, Future { void run(); }
上面提到的FutureTask,其實就是實現了RunnableFuture接口的“憑證”:
public class FutureTaskimplements RunnableFuture { ? public FutureTask(Callable callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; // ... } ? public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); // ... } }
從構造函數可以看到,FutureTask既可以包裝Callable任務,也可以包裝Runnable任務,但最終都是將Runnable轉換成Callable任務,其實是一個適配過程。
調用方最終,調用方可以以下面這種方式使用Future模式,異步地獲取任務的執行結果。
public class Client {
public static void main(String[] args) throws ExecutionException, InterruptedException { ComplexTask task = new ComplexTask(); Futurefuture = new FutureTask (task); // time passed... Double result = future.get(); }
}
通過上面的分析,可以看到,整個Future模式其實就三個核心組件:
真實任務/數據類(通常任務執行比較慢,或數據構造需要較長時間),即示例中的ComplexTask
Future接口(調用方使用該憑證獲取真實任務/數據的結果),即Future接口
Future實現類(用于對真實任務/數據進行包裝),即FutureTask實現類
三、FutureTask原理在J.U.C提供的Future模式中,最重要的就是FutureTask類,FutureTask是在JDK1.5時,隨著J.U.C一起引入的,它代表著一個異步任務,這個任務一般提交給Executor執行,當然也可以由調用方直接調用run方法運行。
既然是任務,就有狀態,FutureTask一共給任務定義了7種狀態:
NEW:表示任務的初始化狀態;
COMPLETING:表示任務已執行完成(正常完成或異常完成),但任務結果或異常原因還未設置完成,屬于中間狀態;
NORMAL:表示任務已經執行完成(正常完成),且任務結果已設置完成,屬于最終狀態;
EXCEPTIONAL:表示任務已經執行完成(異常完成),且任務異常已設置完成,屬于最終狀態;
CANCELLED:表示任務還沒開始執行就被取消(非中斷方式),屬于最終狀態;
INTERRUPTING:表示任務還沒開始執行就被取消(中斷方式),正式被中斷前的過渡狀態,屬于中間狀態;
INTERRUPTED:表示任務還沒開始執行就被取消(中斷方式),且已被中斷,屬于最終狀態。
各個狀態之間的狀態轉換圖如下:
上圖需要注意的是兩點:
FutureTask雖然支持任務的取消(cancel方法),但是只有當任務是初始化(NEW狀態)時才有效,否則cancel方法直接返回false;
當執行任務時(run方法),無論成功或異常,都會先過渡到COMPLETING狀態,直到任務結果設置完成后,才會進入響應的終態。
JDK1.7之前,FutureTask通過內部類實現了AQS框架來實現功能。 JDK1.7及以后,則改變為直接通過Unsafe類CAS操作state狀態字段來進行同步。構造
FutureTask在構造時可以接受Runnable或Callable任務,如果是Runnable,則最終包裝成Callable:
public FutureTask(Callablecallable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; } public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; }
上述的Executors.callable()方法我們在executors框架概述提到過,其實就是對Runnable對象做了適配,返回Callable適配對象——RunnableAdapter:
public staticCallable callable(Runnable task, T result) { if (task == null) throw new NullPointerException(); return new RunnableAdapter (task, result); }
static final class RunnableAdapterimplements Callable { final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call() { task.run(); return result; } }
FutureTask的字段定義非常簡單,State標識任務的當前狀態,狀態之間的轉換通過Unsafe來操作,所有操作都基于自旋+CAS完成:
private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6; ? private Callablecallable; // 真正的任務 private volatile Thread runner; // 保存正在執行任務的線程 ? /** * 記錄結果或異常 */ private Object outcome; ? /** * 無鎖棧(Treiber stack) * 保存等待線程 */ private volatile WaitNode waiters;
注意waiters這個字段,waiters指向一個“無鎖棧”,該棧保存著所有等待線程,我們知道當調用FutureTask的get方法時,如果任務沒有完成,則調用線程會被阻塞,其實就是將線程包裝成WaitNode結點保存到waiters指向的棧中:
static final class WaitNode { volatile Thread thread; volatile WaitNode next; ? WaitNode() { thread = Thread.currentThread(); } }任務的運行
FutureTask的運行就是調用了run方法:
public void run() { // 僅當任務為NEW狀態時, 才能執行任務 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callablec = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { runner = null; int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
上述方法,首先判斷當前任務的state是否等于NEW,如果不為NEW則說明任務或者已經執行過,或者已經被取消,直接返回。
正常執行完成后,會調用set方法設置任務執行結果:
protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } }
如果任務執行過程中拋出異常,則調用setException設置異常信息:
protected void setException(Throwable t) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); } }任務的取消
cancel方法用于取消任務,參數mayInterruptIfRunning如果為true,表示中斷正在執行任務的線程,否則僅僅是將任務狀態置為CANCELLED :
public boolean cancel(boolean mayInterruptIfRunning) { // 僅NEW狀態下可以取消任務 if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; try { if (mayInterruptIfRunning) { // 中斷任務 try { Thread t = runner; if (t != null) t.interrupt(); } finally { // final state UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { finishCompletion(); } return true; }
任務取消后,最終調用finishCompletion方法,釋放所有在棧上等待的線程:
/** * 喚醒棧上的所有等待線程. */ private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null; ) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (; ; ) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } ? done(); // 鉤子方法 callable = null; // to reduce footprint }結果獲取
FutureTask可以通過get方法獲取任務結果,如果需要限時等待,可以調用get(long timeout, TimeUnit unit)。
public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); // 映射任務執行結果 }
可以看到,如果當前任務的狀態是NEW或COMPLETING,會調用awaitDone阻塞線程。否則會認為任務已經完成,直接通過report方法映射結果:
/** * 將同步狀態映射為執行結果. */ private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V) x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable) x); }
report會根據任務的狀態進行映射,如果任務是Normal狀態,說明正常執行完成,則返回任務結果;如果任務被取消(CANCELLED或INTERRUPTED),則拋出CancellationException;其它情況則拋出ExecutionException。
四、ScheduledFutureTask在ScheduledThreadPoolExecutor一節中,我們曾經介紹過另一種FutureTask——ScheduledFutureTask,ScheduledFutureTask是ScheduledThreadPoolExecutor這個線程池的默認調度任務類。
ScheduledFutureTask在普通FutureTask的基礎上增加了周期執行/延遲執行的功能。通過下面的類圖可以看到,它其實是通過繼承FutureTask和Delayed接口來實現周期/延遲功能的。
ScheduledFutureTask(Callablecallable, long ns) { super(callable); this.time = ns; this.period = 0; this.sequenceNumber = sequencer.getAndIncrement(); }
ScheduledFutureTask的源碼非常簡單,基本都是委托FutureTask來實現的,關鍵是看下運行任務的方法:
public void run() { boolean periodic = isPeriodic(); // 是否是周期任務 if (!canRunInCurrentRunState(periodic)) // 能否運行任務 cancel(false); else if (!periodic) // 非周期任務:調用FutureTask的run方法運行 ScheduledFutureTask.super.run(); else if (ScheduledFutureTask.super.runAndReset()) { // 周期任務:調用FutureTask的runAndReset方法運行 setNextRunTime(); reExecutePeriodic(outerTask); } }
FutureTask的runAndReset方法與run方法的區別就是當任務正常執行完成后,不會設置任務的最終狀態(即保持NEW狀態),以便任務重復執行:
protected boolean runAndReset() { // 僅NEW狀態的任務可以執行 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return false; boolean ran = false; int s = state; try { Callable五、總結c = callable; if (c != null && s == NEW) { try { c.call(); // don"t set result ran = true; } catch (Throwable ex) { setException(ex); } } } finally { runner = null; s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } return ran && s == NEW; }
本章我們從源頭開始,講解了Future模式的來龍去脈,并以J.U.C中的Future模式為例,分析了Future模式的組件以及核心實現類——FutureTask,最后回顧了ScheduledFutureTask中定義的內部異步任務類——ScheduledFutureTask。
理解Future模式的關鍵就是理解它的兩個核心組件,可以類比生活中的憑證來理解這一概念,不必拘泥于Java多線程設計模式中Future模式的類關系圖。
真實任務類
憑證
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/71825.html
摘要:整個包,按照功能可以大致劃分如下鎖框架原子類框架同步器框架集合框架執行器框架本系列將按上述順序分析,分析所基于的源碼為。后,根據一系列常見的多線程設計模式,設計了并發包,其中包下提供了一系列基礎的鎖工具,用以對等進行補充增強。 showImg(https://segmentfault.com/img/remote/1460000016012623); 本文首發于一世流云專欄:https...
摘要:注意線程與本地操作系統的線程是一一映射的。固定線程數的線程池提供了兩種創建具有固定線程數的的方法,固定線程池在初始化時確定其中的線程總數,運行過程中會始終維持線程數量不變。 showImg(https://segmentfault.com/img/bVbhK58?w=1920&h=1080); 本文首發于一世流云專欄:https://segmentfault.com/blog... ...
摘要:同時,它會通過的方法將自己注冊到線程池中。線程池中的每個工作線程都有一個自己的任務隊列,工作線程優先處理自身隊列中的任務或順序,由線程池構造時的參數決定,自身隊列為空時,以的順序隨機竊取其它隊列中的任務。 showImg(https://segmentfault.com/img/bVbizJb?w=1802&h=762); 本文首發于一世流云的專欄:https://segmentfau...
摘要:并不會為每個任務都創建工作線程,而是根據實際情況構造線程池時的參數確定是喚醒已有空閑工作線程,還是新建工作線程。 showImg(https://segmentfault.com/img/bVbiYSP?w=1071&h=707); 本文首發于一世流云的專欄:https://segmentfault.com/blog... 一、引言 前一章——Fork/Join框架(1) 原理,我們...
摘要:無限期等待另一個線程執行特定操作。線程安全基本版請說明以及的區別值都不能為空數組結構上,通過數組和鏈表實現。優先考慮響應中斷,而不是響應鎖的普通獲取或重入獲取。只是在最后獲取鎖成功后再把當前線程置為狀態然后再中斷線程。 前段時間在慕課網直播上聽小馬哥面試勸退(面試虐我千百遍,Java 并發真討厭),發現講得東西比自己拿到offer還要高興,于是自己在線下做了一點小筆記,供各位參考。 課...
閱讀 3021·2021-11-23 09:51
閱讀 3624·2021-10-13 09:39
閱讀 2511·2021-09-22 15:06
閱讀 894·2019-08-30 15:55
閱讀 3164·2019-08-30 15:44
閱讀 1793·2019-08-30 14:05
閱讀 3448·2019-08-29 15:24
閱讀 2373·2019-08-29 12:44