摘要:本文的源碼基于。人如其名,包含了和兩部分。而將一個任務的狀態(tài)設置成終止態(tài)只有三種方法我們將在下文的源碼解析中分析這三個方法。將棧中所有掛起的線程都喚醒后,下面就是執(zhí)行方法這個方法是一個空方
前言
系列文章目錄
有了上一篇對預備知識的了解之后,分析源碼就容易多了,本篇我們就直接來看看FutureTask的源碼。
本文的源碼基于JDK1.8。
Future和Task在深入分析源碼之前,我們再來拎一下FutureTask到底是干嘛的。人如其名,F(xiàn)utureTask包含了Future和Task兩部分。
我們上一篇說過,F(xiàn)utureTask實現(xiàn)了RunnableFuture接口,即Runnable接口和Future接口。
其中Runnable接口對應了FutureTask名字中的Task,代表FutureTask本質(zhì)上也是表征了一個任務。而Future接口就對應了FutureTask名字中的Future,表示了我們對于這個任務可以執(zhí)行某些操作,例如,判斷任務是否執(zhí)行完畢,獲取任務的執(zhí)行結(jié)果,取消任務的執(zhí)行等。
所以簡單來說,F(xiàn)utureTask本質(zhì)上就是一個“Task”,我們可以把它當做簡單的Runnable對象來使用。但是它又同時實現(xiàn)了Future接口,因此我們可以對它所代表的“Task”進行額外的控制操作。
Java并發(fā)工具類的三板斧關于Java并發(fā)工具類的三板斧,我們在分析AQS源碼的時候已經(jīng)說過了,即:
狀態(tài),隊列,CAS
以這三個方面為切入點來看源碼,有助于我們快速的看清FutureTask的概貌:
狀態(tài)首先是找狀態(tài)。
在FutureTask中,狀態(tài)是由state屬性來表示的,不出所料,它是volatile類型的,確保了不同線程對它修改的可見性:
private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6;
state屬性是貫穿整個FutureTask的最核心的屬性,該屬性的值代表了任務在運行過程中的狀態(tài),隨著任務的執(zhí)行,狀態(tài)將不斷地進行轉(zhuǎn)變,從上面的定義中可以看出,總共有7種狀態(tài):包括了1個初始態(tài),2個中間態(tài)和4個終止態(tài)。
雖說狀態(tài)有這么多,但是狀態(tài)的轉(zhuǎn)換路徑卻只有四種:
任務的初始狀態(tài)都是NEW, 這一點是構(gòu)造函數(shù)保證的,我們后面分析構(gòu)造函數(shù)的時候再講;
任務的終止狀態(tài)有4種:
NORMAL:任務正常執(zhí)行完畢
EXCEPTIONAL:任務執(zhí)行過程中發(fā)生異常
CANCELLED:任務被取消
INTERRUPTED:任務被中斷
任務的中間狀態(tài)有2種:
COMPLETING 正在設置任務結(jié)果
INTERRUPTING 正在中斷運行任務的線程
值得一提的是,任務的中間狀態(tài)是一個瞬態(tài),它非常的短暫。而且任務的中間態(tài)并不代表任務正在執(zhí)行,而是任務已經(jīng)執(zhí)行完了,正在設置最終的返回結(jié)果,所以可以這么說:
只要state不處于 NEW 狀態(tài),就說明任務已經(jīng)執(zhí)行完畢
注意,這里的執(zhí)行完畢是指傳入的Callable對象的call方法執(zhí)行完畢,或者拋出了異常。所以這里的COMPLETING的名字顯得有點迷惑性,它并不意味著任務正在執(zhí)行中,而意味著call方法已經(jīng)執(zhí)行完畢,正在設置任務執(zhí)行的結(jié)果。
而將一個任務的狀態(tài)設置成終止態(tài)只有三種方法:
set
setException
cancel
我們將在下文的源碼解析中分析這三個方法。
隊列接著我們來看隊列,在FutureTask中,隊列的實現(xiàn)是一個單向鏈表,它表示所有等待任務執(zhí)行完畢的線程的集合。我們知道,F(xiàn)utureTask實現(xiàn)了Future接口,可以獲取“Task”的執(zhí)行結(jié)果,那么如果獲取結(jié)果時,任務還沒有執(zhí)行完畢怎么辦呢?那么獲取結(jié)果的線程就會在一個等待隊列中掛起,直到任務執(zhí)行完畢被喚醒。這一點有點類似于我們之前學習的AQS中的sync queue,在下文的分析中,大家可以自己對照它們的異同點。
我們前面說過,在并發(fā)編程中使用隊列通常是將當前線程包裝成某種類型的數(shù)據(jù)結(jié)構(gòu)扔到等待隊列中,我們先來看看隊列中的每一個節(jié)點是怎么個結(jié)構(gòu):
static final class WaitNode { volatile Thread thread; volatile WaitNode next; WaitNode() { thread = Thread.currentThread(); } }
可見,相比于AQS的sync queue所使用的雙向鏈表中的Node,這個WaitNode要簡單多了,它只包含了一個記錄線程的thread屬性和指向下一個節(jié)點的next屬性。
值得一提的是,F(xiàn)utureTask中的這個單向鏈表是當做棧來使用的,確切來說是當做Treiber棧來使用的,不了解Treiber棧是個啥的可以簡單的把它當做是一個線程安全的棧,它使用CAS來完成入棧出棧操作(想進一步了解的話可以看這篇文章)。為啥要使用一個線程安全的棧呢,因為同一時刻可能有多個線程都在獲取任務的執(zhí)行結(jié)果,如果任務還在執(zhí)行過程中,則這些線程就要被包裝成WaitNode扔到Treiber棧的棧頂,即完成入棧操作,這樣就有可能出現(xiàn)多個線程同時入棧的情況,因此需要使用CAS操作保證入棧的線程安全,對于出棧的情況也是同理。
由于FutureTask中的隊列本質(zhì)上是一個Treiber棧,那么使用這個隊列就只需要一個指向棧頂節(jié)點的指針就行了,在FutureTask中,就是waiters屬性:
/** Treiber stack of waiting threads */ private volatile WaitNode waiters;
事實上,它就是整個單向鏈表的頭節(jié)點。
綜上,F(xiàn)utureTask中所使用的隊列的結(jié)構(gòu)如下:
CAS操作大多數(shù)是用來改變狀態(tài)的,在FutureTask中也不例外。我們一般在靜態(tài)代碼塊中初始化需要CAS操作的屬性的偏移量:
// Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long stateOffset; private static final long runnerOffset; private static final long waitersOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class> k = FutureTask.class; stateOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("state")); runnerOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("runner")); waitersOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("waiters")); } catch (Exception e) { throw new Error(e); } }
從這個靜態(tài)代碼塊中我們也可以看出,CAS操作主要針對3個屬性,包括state、runner和waiters,說明這3個屬性基本是會被多個線程同時訪問的。其中state屬性代表了任務的狀態(tài),waiters屬性代表了指向棧頂節(jié)點的指針,這兩個我們上面已經(jīng)分析過了。runner屬性代表了執(zhí)行FutureTask中的“Task”的線程。為什么需要一個屬性來記錄執(zhí)行任務的線程呢?這是為了中斷或者取消任務做準備的,只有知道了執(zhí)行任務的線程是誰,我們才能去中斷它。
定義完屬性的偏移量之后,接下來就是CAS操作本身了。在FutureTask,CAS操作最終調(diào)用的還是Unsafe類的compareAndSwapXXX方法,關于這一點,我們上一篇預備知識中已經(jīng)講過了,這里不再贅述。
核心屬性前面我們以java并發(fā)編程工具類的“三板斧”為切入點分析了FutureTask的狀態(tài),隊列和CAS操作,對這個工具類有了初步的認識。接下來,我們就要開始進入源碼分析了。首先我們先來看看FutureTask的幾個核心屬性:
/** * The run state of this task, initially NEW. The run state * transitions to a terminal state only in methods set, * setException, and cancel. During completion, state may take on * transient values of COMPLETING (while outcome is being set) or * INTERRUPTING (only while interrupting the runner to satisfy a * cancel(true)). Transitions from these intermediate to final * states use cheaper ordered/lazy writes because values are unique * and cannot be further modified. * * Possible state transitions: * NEW -> COMPLETING -> NORMAL * NEW -> COMPLETING -> EXCEPTIONAL * NEW -> CANCELLED * NEW -> INTERRUPTING -> INTERRUPTED */ private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6; /** The underlying callable; nulled out after running */ private Callablecallable; /** The result to return or exception to throw from get() */ private Object outcome; // non-volatile, protected by state reads/writes /** The thread running the callable; CASed during run() */ private volatile Thread runner; /** Treiber stack of waiting threads */ private volatile WaitNode waiters;
可以看出,F(xiàn)utureTask的核心屬性只有5個:
state
callable
outcome
runner
waiters
關于 state waiters runner三個屬性我們上面已經(jīng)解釋過了。剩下的callable屬性代表了要執(zhí)行的任務本身,即FutureTask中的“Task”部分,為Callable類型,這里之所以用Callable而不用Runnable是因為FutureTask實現(xiàn)了Future接口,需要獲取任務的執(zhí)行結(jié)果。outcome屬性代表了任務的執(zhí)行結(jié)果或者拋出的異常,為Object類型,也就是說outcome可以是任意類型的對象,所以當我們將正常的執(zhí)行結(jié)果返回給調(diào)用者時,需要進行強制類型轉(zhuǎn)換,返回由Callable定義的V類型。這5個屬性綜合起來就完成了整個FutureTask的工作,使用關系如下:
任務本尊:callable
任務的執(zhí)行者:runner
任務的結(jié)果:outcome
獲取任務的結(jié)果:state + outcome + waiters
中斷或者取消任務:state + runner + waiters
構(gòu)造函數(shù)介紹完核心屬性之后,我們來看看FutureTask的構(gòu)造函數(shù):
public FutureTask(Callablecallable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable }
public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable }
FutureTask共有2個構(gòu)造函數(shù),這2個構(gòu)造函數(shù)一個是直接傳入Callable對象, 一個是傳入一個Runnable對象和一個指定的result, 然后通過Executors工具類將它適配成callable對象, 所以這兩個構(gòu)造函數(shù)的本質(zhì)是一樣的:
用傳入的參數(shù)初始化callable成員變量
將FutureTask的狀態(tài)設為NEW
(關于將Runnable對象適配成Callable對象的方法Executors.callable(runnable, result)我們在上一篇預備知識中已經(jīng)講過了,不記得的同學可以倒回去再看一下)
接口實現(xiàn)上一篇我們提過,F(xiàn)utureTask實現(xiàn)了RunnableFuture接口:
public class FutureTaskimplements RunnableFuture { ... }
因此,它必須實現(xiàn)Runnable和Future接口的所有方法。
Runnable接口實現(xiàn)要實現(xiàn)Runnable接口, 就得覆寫run方法, 我們看看FutureTask的run方法干了點啥:
public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callablec = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
首先我們看到,在run方法的一開始,就檢查當前狀態(tài)是不是New, 并且使用CAS操作將runner屬性設置位當前線程,即記錄執(zhí)行任務的線程。compareAndSwapObject的用法在上一篇預備知識中已經(jīng)介紹過了,這里不再贅述。可見,runner屬性是在運行時被初始化的。
接下來,我們就調(diào)用Callable對象的call方法來執(zhí)行任務,如果任務執(zhí)行成功,就使用set(result)設置結(jié)果,否則,用setException(ex)設置拋出的異常。
我們先來看看set(result)方法:
protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } }
這個方法一開始通過CAS操作將state屬性由原來的NEW狀態(tài)修改為COMPLETING狀態(tài),我們在一開始介紹state狀態(tài)的時候說過,COMPLETING是一個非常短暫的中間態(tài),表示正在設置執(zhí)行的結(jié)果。
狀態(tài)設置成功后,我們就把任務執(zhí)行結(jié)果賦值給outcome, 然后直接把state狀態(tài)設置成NORMAL,注意,這里是直接設置,沒有先比較再設置的操作,由于state屬性被設置成volatile, 結(jié)合我們上一篇預備知識的介紹,這里putOrderedInt應當和putIntVolatile是等價的,保證了state狀態(tài)對其他線程的可見性。
在這之后,我們調(diào)用了 finishCompletion()來完成執(zhí)行結(jié)果的設置。
接下來我們再來看看發(fā)生了異常的版本setException(ex):
protected void setException(Throwable t) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); } }
可見,除了將outcome屬性賦值為異常對象,以及將state的終止狀態(tài)修改為EXCEPTIONAL,其余都和set方法類似。在方法的最后,都調(diào)用了 finishCompletion()來完成執(zhí)行結(jié)果的設置。那么我們就來看看 finishCompletion()干了點啥:
private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } done(); callable = null; // to reduce footprint }
這個方法事實上完成了一個“善后”工作。我們先來看看if條件語句中的CAS操作:
UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)
該方法是將waiters屬性的值由原值設置為null, 我們知道,waiters屬性指向了Treiber棧的棧頂節(jié)點,可以說是代表了整個Treiber棧,將該值設為null的目的就是清空整個棧。如果設置不成功,則if語句塊不會被執(zhí)行,又進行下一輪for循環(huán),而下一輪for循環(huán)的判斷條件又是waiters!=null ,由此我們知道,雖然最外層的for循環(huán)乍一看好像是什么遍歷節(jié)點的操作,其實只是為了確保waiters屬性被成功設置成null,本質(zhì)上相當于一個自旋操作。
將waiters屬性設置成null以后,接下了 for (;;)死循環(huán)才是真正的遍歷節(jié)點,可以看出,循環(huán)內(nèi)部就是一個普通的遍歷鏈表的操作,我們前面講屬性的時候說過,Treiber棧里面存放的WaitNode代表了當前等待任務執(zhí)行結(jié)束的線程,這個循環(huán)的作用也正是遍歷鏈表中所有等待的線程,并喚醒他們。
將Treiber棧中所有掛起的線程都喚醒后,下面就是執(zhí)行done方法:
/** * Protected method invoked when this task transitions to state * {@code isDone} (whether normally or via cancellation). The * default implementation does nothing. Subclasses may override * this method to invoke completion callbacks or perform * bookkeeping. Note that you can query status inside the * implementation of this method to determine whether this task * has been cancelled. */ protected void done() { }
這個方法是一個空方法,從注釋上看,它是提供給子類覆寫的,以實現(xiàn)一些任務執(zhí)行結(jié)束前的額外操作。
done方法之后就是callable屬性的清理了(callable = null)。
至此,整個run方法分析完了。
真的嗎???
并沒有!別忘了run方法最后還有一個finally塊呢:
finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); }
在finally塊中,我們將runner屬性置為null,并且檢查有沒有遺漏的中斷,如果發(fā)現(xiàn)s >= INTERRUPTING, 說明執(zhí)行任務的線程有可能被中斷了,因為s >= INTERRUPTING 只有兩種可能,state狀態(tài)為INTERRUPTING和INTERRUPTED。
有的同學可能就要問了,咱前面已經(jīng)執(zhí)行過的set方法或者setException方法不是已經(jīng)將state狀態(tài)設置成NORMAL或者EXCEPTIONAL了嗎?怎么會出現(xiàn)INTERRUPTING或者INTERRUPTED狀態(tài)呢?別忘了,咱們在多線程的環(huán)境中,在當前線程執(zhí)行run方法的同時,有可能其他線程取消了任務的執(zhí)行,此時其他線程就可能對state狀態(tài)進行改寫,這也就是我們在設置終止狀態(tài)的時候用putOrderedInt方法,而沒有用CAS操作的原因——我們無法確信在設置state前是處于COMPLETING中間態(tài)還是INTERRUPTING中間態(tài)。
關于任務取消的操作,我們后面講Future接口的實現(xiàn)的時候再講,回到現(xiàn)在的問題,我們來看看handlePossibleCancellationInterrupt方法干了點啥:
/** * Ensures that any interrupt from a possible cancel(true) is only * delivered to a task while in run or runAndReset. */ private void handlePossibleCancellationInterrupt(int s) { // It is possible for our interrupter to stall before getting a // chance to interrupt us. Let"s spin-wait patiently. if (s == INTERRUPTING) while (state == INTERRUPTING) Thread.yield(); // wait out pending interrupt }
可見該方法是一個自旋操作,如果當前的state狀態(tài)是INTERRUPTING,我們在原地自旋,直到state狀態(tài)轉(zhuǎn)換成終止態(tài)。
至此,run方法的分析就真的結(jié)束了。我們來總結(jié)一下:
run方法重點做了以下幾件事:
將runner屬性設置成當前正在執(zhí)行run方法的線程
調(diào)用callable成員變量的call方法來執(zhí)行任務
設置執(zhí)行結(jié)果outcome, 如果執(zhí)行成功, 則outcome保存的就是執(zhí)行結(jié)果;如果執(zhí)行過程中發(fā)生了異常, 則outcome中保存的就是異常,設置結(jié)果之前,先將state狀態(tài)設為中間態(tài)
對outcome的賦值完成后,設置state狀態(tài)為終止態(tài)(NORMAL或者EXCEPTIONAL)
喚醒Treiber棧中所有等待的線程
善后清理(waiters, callable,runner設為null)
檢查是否有遺漏的中斷,如果有,等待中斷狀態(tài)完成。
這里再插一句,我們前面說“state只要不是NEW狀態(tài),就說明任務已經(jīng)執(zhí)行完成了”就體現(xiàn)在這里,因為run方法中,我們是在c.call()執(zhí)行完畢或者拋出了異常之后才開始設置中間態(tài)和終止態(tài)的。
Future接口的實現(xiàn)Future接口一共定義了5個方法,我們一個個來看:
cancel(boolean mayInterruptIfRunning)既然上面在分析run方法的最后,我們提到了任務可能被別的線程取消,那我們就趁熱打鐵,看看怎么取消一個任務的執(zhí)行:
public boolean cancel(boolean mayInterruptIfRunning) { if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; try { // in case call to interrupt throws exception if (mayInterruptIfRunning) { try { Thread t = runner; if (t != null) t.interrupt(); } finally { // final state UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { finishCompletion(); } return true; }
還記得我們上一篇在介紹Future接口的時候?qū)ancel方法的說明嗎?
關于cancel方法,這里要補充說幾點:
首先有以下三種情況之一的,cancel操作一定是失敗的:任務已經(jīng)執(zhí)行完成了
任務已經(jīng)被取消過了
任務因為某種原因不能被取消
其它情況下,cancel操作將返回true。值得注意的是,cancel操作返回true并不代表任務真的就是被取消了,這取決于發(fā)動cancel狀態(tài)時,任務所處的狀態(tài):
如果發(fā)起cancel時任務還沒有開始運行,則隨后任務就不會被執(zhí)行;
如果發(fā)起cancel時任務已經(jīng)在運行了,則這時就需要看mayInterruptIfRunning參數(shù)了:
如果mayInterruptIfRunning 為true, 則當前在執(zhí)行的任務會被中斷
如果mayInterruptIfRunning 為false, 則可以允許正在執(zhí)行的任務繼續(xù)運行,直到它執(zhí)行完
我們來看看FutureTask是怎么實現(xiàn)cancel方法的這幾個規(guī)范的:
首先,對于“任務已經(jīng)執(zhí)行完成了或者任務已經(jīng)被取消過了,則cancel操作一定是失敗的(返回false)”這兩條,是通過簡單的判斷state值是否為NEW實現(xiàn)的,因為我們前面說過了,只要state不為NEW,說明任務已經(jīng)執(zhí)行完畢了。從代碼中可以看出,只要state不為NEW,則直接返回false。
如果state還是NEW狀態(tài),我們再往下看:
UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)
這一段是根據(jù)mayInterruptIfRunning的值將state的狀態(tài)由NEW設置成INTERRUPTING或者CANCELLED,當這一操作也成功之后,就可以執(zhí)行后面的try語句了,但無論怎么,該方法最后都返回了true。
我們再接著看try塊干了點啥:
try { // in case call to interrupt throws exception if (mayInterruptIfRunning) { try { Thread t = runner; if (t != null) t.interrupt(); } finally { // final state UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { finishCompletion(); }
我們知道,runner屬性中存放的是當前正在執(zhí)行任務的線程,因此,這個try塊的目的就是中斷當前正在執(zhí)行任務的線程,最后將state的狀態(tài)設為INTERRUPTED,當然,中斷操作完成后,還需要通過finishCompletion()來喚醒所有在Treiber棧中等待的線程。
我們現(xiàn)在總結(jié)一下,cancel方法實際上完成以下兩種狀態(tài)轉(zhuǎn)換之一:
NEW -> CANCELLED (對應于mayInterruptIfRunning=false)
NEW -> INTERRUPTING -> INTERRUPTED (對應于mayInterruptIfRunning=true)
對于第一條路徑,雖說cancel方法最終返回了true,但它只是簡單的把state狀態(tài)設為CANCELLED,并不會中斷線程的執(zhí)行。但是這樣帶來的后果是,任務即使執(zhí)行完畢了,也無法設置任務的執(zhí)行結(jié)果,因為前面分析run方法的時候我們知道,設置任務結(jié)果有一個中間態(tài),而這個中間態(tài)的設置,是以當前state狀態(tài)為NEW為前提的。
對于第二條路徑,則會中斷執(zhí)行任務的線程,我們在倒回上面的run方法看看:
public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callablec = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
雖然第二條路徑中斷了當前正在執(zhí)行的線程,但是,響不響應這個中斷是由執(zhí)行任務的線程自己決定的,更具體的說,這取決于c.call()方法內(nèi)部是否對中斷進行了響應,是否將中斷異常拋出。
那call方法中是怎么處理中斷的呢?從上面的代碼中可以看出,catch語句處理了所有的Throwable的異常,這自然也包括了中斷異常。
然而,值得一提的是,即使這里進入了catch (Throwable ex){}代碼塊,setException(ex)的操作一定是失敗的,因為在我們?nèi)∠蝿請?zhí)行的線程中,我們已經(jīng)先把state狀態(tài)設為INTERRUPTING了,而setException(ex)的操作要求設置前線程的狀態(tài)為NEW。所以這里響應cancel方法所造成的中斷最大的意義不是為了對中斷進行處理,而是簡單的停止任務線程的執(zhí)行,節(jié)省CPU資源。
那讀者可能會問了,既然這個setException(ex)的操作一定是失敗的,那放在這里有什么用呢?事實上,這個setException(ex)是用來處理任務自己在正常執(zhí)行過程中產(chǎn)生的異常的,在我們沒有主動去cancel任務時,任務的state狀態(tài)在執(zhí)行過程中就會始終是NEW,如果任務此時自己發(fā)生了異常,則這個異常就會被setException(ex)方法成功的記錄到outcome中。
反正無論如何,run方法最終都會進入finally塊,而這時候它會發(fā)現(xiàn)s >= INTERRUPTING,如果檢測發(fā)現(xiàn)s = INTERRUPTING,說明cancel方法還沒有執(zhí)行到中斷當前線程的地方,那就等待它將state狀態(tài)設置成INTERRUPTED。到這里,對cancel方法的分析就和上面對run方法的分析對接上了。
cancel方法到這里就分析完了,如果你一條條的去對照Future接口對于cancel方法的規(guī)范,它每一條都是實現(xiàn)了的,而它實現(xiàn)的核心機理,就是對state的當前狀態(tài)的判斷和設置。由此可見,state屬性是貫穿整個FutureTask的最核心的屬性。
isCancelled()說完了cancel,我們再來看看 isCancelled()方法,相較而言,它就簡單多了:
public boolean isCancelled() { return state >= CANCELLED; }
那么state >= CANCELLED 包含了那些狀態(tài)呢,它包括了: CANCELLED INTERRUPTING INTERRUPTED
我們再來回憶下上一篇講的Future接口對于isCancelled()方法的規(guī)范:
該方法用于判斷任務是否被取消了。如果一個任務在正常執(zhí)行完成之前被Cancel掉了, 則返回true
再對比state的狀態(tài)圖:
可見選取這三個狀態(tài)作為判斷依據(jù)是很合理的, 因為只有調(diào)用了cancel方法,才會使state狀態(tài)進入這三種狀態(tài)。
與 isCancelled方法類似,isDone方法也是簡單地通過state狀態(tài)來判斷。
public boolean isDone() { return state != NEW; }
關于這一點,其實我們之前已經(jīng)說過了,只要state狀態(tài)不是NEW,則任務已經(jīng)執(zhí)行完畢了,因為state狀態(tài)不存在類似“任務正在執(zhí)行中”這種狀態(tài),即使是短暫的中間態(tài),也是發(fā)生在任務已經(jīng)執(zhí)行完畢,正在設置任務結(jié)果的時候。
get()最后我們來看看獲取執(zhí)行結(jié)果的get方法,先來看看無參的版本:
public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); }
該方法其實很簡單,當任務還沒有執(zhí)行完畢或者正在設置執(zhí)行結(jié)果時,我們就使用awaitDone方法等待任務進入終止態(tài),注意,awaitDone的返回值是任務的狀態(tài),而不是任務的結(jié)果。任務進入終止態(tài)之后,我們就根據(jù)任務的執(zhí)行結(jié)果來返回計算結(jié)果或者拋出異常。
我們先來看看等待任務完成的awaitDone方法,該方法是獲取任務結(jié)果最核心的方法,它完成了獲取結(jié)果,掛起線程,響應中斷等諸多操作:
private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; if (s > COMPLETING) { if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // cannot time out yet Thread.yield(); else if (q == null) q = new WaitNode(); else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else LockSupport.park(this); } }
在具體分析它的源碼之前,有一點我們先特別說明一下,F(xiàn)utureTask中會涉及到兩類線程,一類是執(zhí)行任務的線程,它只有一個,F(xiàn)utureTask的run方法就由該線程來執(zhí)行;一類是獲取任務執(zhí)行結(jié)果的線程,它可以有多個,這些線程可以并發(fā)執(zhí)行,每一個線程都是獨立的,都可以調(diào)用get方法來獲取任務的執(zhí)行結(jié)果。如果任務還沒有執(zhí)行完,則這些線程就需要進入Treiber棧中掛起,直到任務執(zhí)行結(jié)束,或者等待的線程自身被中斷。
理清了這一點后,我們再來詳細看看awaitDone方法。可以看出,該方法的大框架是一個自旋操作,我們一段一段來看:
for (;;) { if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } // ... }
首先一開始,我們先檢測當前線程是否被中斷了,這是因為get方法是阻塞式的,如果等待的任務還沒有執(zhí)行完,則調(diào)用get方法的線程會被扔到Treiber棧中掛起等待,直到任務執(zhí)行完畢。但是,如果任務遲遲沒有執(zhí)行完畢,則我們也有可能直接中斷在Treiber棧中的線程,以停止等待。
當檢測到線程被中斷后,我們調(diào)用了removeWaiter:
private void removeWaiter(WaitNode node) { if (node != null) { ... } }
removeWaiter的作用是將參數(shù)中的node從等待隊列(即Treiber棧)中移除。如果此時線程還沒有進入Treiber棧,則 q=null,那么removeWaiter(q)啥也不干。在這之后,我們就直接拋出了InterruptedException異常。
接著往下看:
for (;;) { /*if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); }*/ int s = state; if (s > COMPLETING) { if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // cannot time out yet Thread.yield(); else if (q == null) q = new WaitNode(); else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else LockSupport.park(this); }
如果任務已經(jīng)進入終止態(tài)(s > COMPLETING),我們就直接返回任務的狀態(tài);
否則,如果任務正在設置執(zhí)行結(jié)果(s == COMPLETING),我們就讓出當前線程的CPU資源繼續(xù)等待
否則,就說明任務還沒有執(zhí)行,或者任務正在執(zhí)行過程中,那么這時,如果q現(xiàn)在還為null, 說明當前線程還沒有進入等待隊列,于是我們新建了一個WaitNode, WaitNode的構(gòu)造函數(shù)我們之前已經(jīng)看過了,就是生成了一個記錄了當前線程的節(jié)點;
如果q不為null,說明代表當前線程的WaitNode已經(jīng)被創(chuàng)建出來了,則接下來如果queued=false,表示當前線程還沒有入隊,所以我們執(zhí)行了:
queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);
這行代碼的作用是通過CAS操作將新建的q節(jié)點添加到waiters鏈表的頭節(jié)點之前,其實就是Treiber棧的入棧操作,寫的還是很簡潔的,一行代碼就搞定了,如果大家還是覺得暈乎,下面是它等價的偽代碼:
q.next = waiters; //當前節(jié)點的next指向目前的棧頂元素 //如果棧頂節(jié)點在這個過程中沒有變,即沒有發(fā)生并發(fā)入棧的情況 if(waiters的值還是上面q.next所使用的waiters值){ waiters = q; //修改棧頂?shù)闹羔槪赶騽倓側(cè)霔5墓?jié)點 }
這個CAS操作就是為了保證同一時刻如果有多個線程在同時入棧,則只有一個能夠操作成功,也即Treiber棧的規(guī)范。
如果以上的條件都不滿足,則再接下來因為現(xiàn)在是不帶超時機制的get,timed為false,則else if代碼塊跳過,然后來到最后一個else, 把當前線程掛起,此時線程就處于阻塞等待的狀態(tài)。
至此,在任務沒有執(zhí)行完畢的情況下,獲取任務執(zhí)行結(jié)果的線程就會在Treiber棧中被LockSupport.park(this)掛起了。
那么這個掛起的線程什么時候會被喚醒呢?有兩種情況:
任務執(zhí)行完畢了,在finishCompletion方法中會喚醒所有在Treiber棧中等待的線程
等待的線程自身因為被中斷等原因而被喚醒。
我們接下來就繼續(xù)看看線程被喚醒后的情況,此時,線程將回到for(;;)循環(huán)的開頭,繼續(xù)下一輪循環(huán):
for (;;) { if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; if (s > COMPLETING) { if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // cannot time out yet Thread.yield(); else if (q == null) q = new WaitNode(); else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else LockSupport.park(this); // 掛起的線程從這里被喚醒 }
首先自然還是檢測中斷,所不同的是,此時q已經(jīng)不為null了,因此在有中斷發(fā)生的情況下,在拋出中斷之前,多了一步removeWaiter(q)操作,該操作是將當前線程從等待的Treiber棧中移除,相比入棧操作,這個出棧操作要復雜一點,這取決于節(jié)點是否位于棧頂。下面我們來仔細分析這個出棧操作:
private void removeWaiter(WaitNode node) { if (node != null) { node.thread = null; retry: for (;;) { // restart on removeWaiter race for (WaitNode pred = null, q = waiters, s; q != null; q = s) { s = q.next; if (q.thread != null) pred = q; else if (pred != null) { pred.next = s; if (pred.thread == null) // check for race continue retry; } else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s)) continue retry; } break; } } }
首先,我們把要出棧的WaitNode的thread屬性設置為null, 這相當于一個標記,是我們后面在waiters鏈表中定位該節(jié)點的依據(jù)。
(1) 要移除的節(jié)點就在棧頂
我們先來看看該節(jié)點就位于棧頂?shù)那闆r,這說明在該節(jié)點入棧后,并沒有別的線程再入棧了。由于一開始我們就將該節(jié)點的thread屬性設為了null,因此,前面的q.thread != null 和 pred != null都不滿足,我們直接進入到最后一個else if 分支:
else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s)) continue retry;
這一段是棧頂節(jié)點出棧的操作,和入棧類似,采用了CAS比較,將棧頂元素設置成原棧頂節(jié)點的下一個節(jié)點。
值得注意的是,當CAS操作不成功時,程序會回到retry處重來,但即使CAS操作成功了,程序依舊會遍歷完整個鏈表,找尋node.thread == null 的節(jié)點,并將它們一并從鏈表中剔除。
(2) 要移除的節(jié)點不在棧頂
當要移除的節(jié)點不在棧頂時,我們會一直遍歷整個鏈表,直到找到q.thread == null的節(jié)點,找到之后,我們將進入
else if (pred != null) { pred.next = s; if (pred.thread == null) // check for race continue retry; }
這是因為節(jié)點不在棧頂,則其必然是有前驅(qū)節(jié)點pred的,這時,我們只是簡單的讓前驅(qū)節(jié)點指向當前節(jié)點的下一個節(jié)點,從而將目標節(jié)點從鏈表中剔除。
注意,后面多加的那個if判斷是很有必要的,因為removeWaiter方法并沒有加鎖,所以可能有多個線程在同時執(zhí)行,WaitNode的兩個成員變量thread和next都被設置成volatile,這保證了它們的可見性,如果我們在這時發(fā)現(xiàn)了pred.thread == null,那就意味著它已經(jīng)被另一個線程標記了,將在另一個線程中被拿出waiters鏈表,而我們當前目標節(jié)點的原后繼節(jié)點現(xiàn)在是接在這個pred節(jié)點上的,因此,如果pred已經(jīng)被其他線程標記為要拿出去的節(jié)點,我們現(xiàn)在這個線程再繼續(xù)往后遍歷就沒有什么意義了,所以這時就調(diào)到retry處,從頭再遍歷。
如果pred節(jié)點沒有被其他線程標記,那我們就接著往下遍歷,直到整個鏈表遍歷完。
至此,將節(jié)點從waiters鏈表中移除的removeWaiter操作我們就分析完了,我們總結(jié)一下該方法:
在該方法中,會傳入一個需要移除的節(jié)點,我們會將這個節(jié)點的thread屬性設置成null,以標記該節(jié)點。然后無論如何,我們會遍歷整個鏈表,清除那些被標記的節(jié)點(只是簡單的將節(jié)點從鏈表中剔除)。如果要清除的節(jié)點就位于棧頂,則還需要注意重新設置waiters的值,指向新的棧頂節(jié)點。所以可以看出,雖說removeWaiter方法傳入了需要剔除的節(jié)點,但是事實上它可能剔除的不止是傳入的節(jié)點,而是所有已經(jīng)被標記了的節(jié)點,這樣不僅清除操作容易了些(不需要專門去定位傳入的node在哪里),而且提升了效率(可以同時清除所有已經(jīng)被標記的節(jié)點)。
我們再回到awaitDone方法里:
private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { if (Thread.interrupted()) { removeWaiter(q); // 剛剛分析到這里了,我們接著往下看 throw new InterruptedException(); } int s = state; if (s > COMPLETING) { if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // cannot time out yet Thread.yield(); else if (q == null) q = new WaitNode(); else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else LockSupport.park(this); } }
如果線程不是因為中斷被喚醒,則會繼續(xù)往下執(zhí)行,此時會再次獲取當前的state狀態(tài)。所不同的是,此時q已經(jīng)不為null, queued已經(jīng)為true了,所以已經(jīng)不需要將當前節(jié)點再入waiters棧了。
至此我們知道,除非被中斷,否則get方法會在原地自旋等待(用的是Thread.yield,對應于s == COMPLETING)或者直接掛起(對應任務還沒有執(zhí)行完的情況),直到任務執(zhí)行完成。而我們前面分析run方法和cancel方法的時候知道,在run方法結(jié)束后,或者cancel方法取消完成后,都會調(diào)用finishCompletion()來喚醒掛起的線程,使它們得以進入下一輪循環(huán),獲取任務執(zhí)行結(jié)果。
最后,等awaitDone函數(shù)返回后,get方法返回了report(s),以根據(jù)任務的狀態(tài),匯報執(zhí)行結(jié)果:
@SuppressWarnings("unchecked") private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V)x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); }
可見,report方法非常簡單,它根據(jù)當前state狀態(tài),返回正常執(zhí)行的結(jié)果,或者拋出指定的異常。
至此,get方法就分析結(jié)束了。
值得注意的是,awaitDone方法和get方法都沒有加鎖,這在多個線程同時執(zhí)行g(shù)et方法的時候會不會產(chǎn)生線程安全問題呢?通過查看方法內(nèi)部的參數(shù)我們知道,整個方法內(nèi)部用的大多數(shù)是局部變量,因此不會產(chǎn)生線程安全問題,對于全局的共享變量waiters的修改時,也使用了CAS操作,保證了線程安全,而state變量本身是volatile的,保證了讀取時的可見性,因此整個方法調(diào)用雖然沒有加鎖,它仍然是線程安全的。
get(long timeout, TimeUnit unit)最后我們來看看帶超時版本的get方法:
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (unit == null) throw new NullPointerException(); int s = state; if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) throw new TimeoutException(); return report(s); }
它和上面不帶超時時間的get方法很類似,只是在awaitDone方法中多了超時檢測:
else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); }
即,如果指定的超時時間到了,則直接返回,如果返回時,任務還沒有進入終止狀態(tài),則直接拋出TimeoutException異常,否則就像get()方法一樣,正常的返回執(zhí)行結(jié)果。
總結(jié)FutureTask實現(xiàn)了Runnable和Future接口,它表示了一個帶有任務狀態(tài)和任務結(jié)果的任務,它的各種操作都是圍繞著任務的狀態(tài)展開的,值得注意的是,在所有的7個任務狀態(tài)中,只要不是NEW狀態(tài),就表示任務已經(jīng)執(zhí)行完畢或者不再執(zhí)行了,并沒有表示“任務正在執(zhí)行中”的狀態(tài)。
除了代表了任務的Callable對象、代表任務執(zhí)行結(jié)果的outcome屬性,F(xiàn)utureTask還包含了一個代表所有等待任務結(jié)束的線程的Treiber棧,這一點其實和各種鎖的等待隊列特別像,即如果拿不到鎖,則當前線程就會被扔進等待隊列中;這里則是如果任務還沒有執(zhí)行結(jié)束,則所有等待任務執(zhí)行完畢的線程就會被扔進Treiber棧中,直到任務執(zhí)行完畢了,才會被喚醒。
FutureTask雖然為我們提供了獲取任務執(zhí)行結(jié)果的途徑,遺憾的是,在獲取任務結(jié)果時,如果任務還沒有執(zhí)行完成,則當前線程會自旋或者掛起等待,這和我們實現(xiàn)異步的初衷是相違背的,我們后面將繼續(xù)介紹另一個同步工具類CompletableFuture, 它解決了這個問題。
(完)
系列文章目錄
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/77337.html
摘要:為了避免一篇文章的篇幅過長,于是一些比較大的主題就都分成幾篇來講了,這篇文章是筆者所有文章的目錄,將會持續(xù)更新,以給大家一個查看系列文章的入口。 前言 大家好,筆者是今年才開始寫博客的,寫作的初衷主要是想記錄和分享自己的學習經(jīng)歷。因為寫作的時候發(fā)現(xiàn),為了弄懂一個知識,不得不先去了解另外一些知識,這樣以來,為了說明一個問題,就要把一系列知識都了解一遍,寫出來的文章就特別長。 為了避免一篇...
摘要:在分析它的源碼之前我們需要先了解一些預備知識。因為接口沒有返回值所以為了與兼容我們額外傳入了一個參數(shù)使得返回的對象的方法直接執(zhí)行的方法然后返回傳入的參數(shù)。 前言 系列文章目錄 FutureTask 是一個同步工具類,它實現(xiàn)了Future語義,表示了一種抽象的可生成結(jié)果的計算。在包括線程池在內(nèi)的許多工具類中都會用到,弄懂它的實現(xiàn)將有利于我們更加深入地理解Java異步操作實現(xiàn)。 在分析...
摘要:零前期準備文章異常啰嗦且繞彎。版本版本簡介是中默認的實現(xiàn)類,常與結(jié)合進行多線程并發(fā)操作。所以方法的主體其實就是去喚醒被阻塞的線程。本文僅為個人的學習筆記,可能存在錯誤或者表述不清的地方,有緣補充 零 前期準備 0 FBI WARNING 文章異常啰嗦且繞彎。 1 版本 JDK 版本 : OpenJDK 11.0.1 IDE : idea 2018.3 2 ThreadLocal 簡介 ...
閱讀 3496·2021-10-18 13:30
閱讀 2951·2021-10-09 09:44
閱讀 1969·2019-08-30 11:26
閱讀 2299·2019-08-29 13:17
閱讀 765·2019-08-29 12:17
閱讀 2253·2019-08-26 18:42
閱讀 478·2019-08-26 13:24
閱讀 2960·2019-08-26 11:39