摘要:前言在前面的文章和響應(yīng)式編程中提到了和后者毫無疑問是一個(gè)線程池前者則是一個(gè)類似經(jīng)典定義的概念官方有一個(gè)非常無語的解釋就是運(yùn)行在的一個(gè)任務(wù)抽象就是運(yùn)行的線程池框架包含和若干的子類它的核心在于分治和工作竅取最大程度利用線程池中的工作線程避免忙的
前言
在前面的文章"CompletableFuture和響應(yīng)式編程"中提到了ForkJoinTask和ForkJoinPool,后者毫無疑問是一個(gè)線程池,前者則是一個(gè)類似FutureTask經(jīng)典定義的概念.
官方有一個(gè)非常無語的解釋:ForkJoinTask就是運(yùn)行在ForkJoinPool的一個(gè)任務(wù)抽象,ForkJoinPool就是運(yùn)行ForkJoinTask的線程池.
ForkJoin框架包含F(xiàn)orkJoinTask,ForkJoinWorkerThread,ForkJoinPool和若干ForkJoinTask的子類,它的核心在于分治和工作竅取,最大程度利用線程池中的工作線程,避免忙的忙死,餓的餓死.
ForkJoinTask可以理解為類線程但比線程輕量的實(shí)體,在ForkJoinPool中運(yùn)行的少量ForkJoinWorkerThread可以持有大量的ForkJoinTask和它的子任務(wù).ForkJoinTask同時(shí)也是一個(gè)輕量的Future,使用時(shí)應(yīng)避免較長阻塞和io.
ForkJoinTask在JAVA8中應(yīng)用廣泛,但它是一個(gè)抽象類,它的子類派生了各種用途,如后續(xù)計(jì)劃多帶帶介紹的CountedCompleter,以及若干JAVA8中stream?api定義的與并行流有關(guān)的各種操作(ops).
源碼首先看ForkJoinTask的簽名.
public abstract class ForkJoinTaskimplements Future , Serializable
從簽名上看,ForkJoinTask實(shí)現(xiàn)了future,也可以序列化,但它不是一個(gè)Runnable或Callable.
ForkJoinTask雖然可以序列化,但它只對(duì)運(yùn)行前和后敏感,對(duì)于執(zhí)行過程中不敏感.
先來看task的運(yùn)行字段:
//volatie修飾的任務(wù)狀態(tài)值,由ForkJoinPool或工作線程修改. volatile int status; static final int DONE_MASK = 0xf0000000;//用于屏蔽完成狀態(tài)位. static final int NORMAL = 0xf0000000;//表示正常完成,是負(fù)值. static final int CANCELLED = 0xc0000000;//表示被取消,負(fù)值,且小于NORMAL static final int EXCEPTIONAL = 0x80000000;//異常完成,負(fù)值,且小于CANCELLED static final int SIGNAL = 0x00010000;//用于signal,必須不小于1<<16,默認(rèn)為1<<16. static final int SMASK = 0x0000ffff;//后十六位的task標(biāo)簽
很顯然,DONE_MASK能夠過濾掉所有非NORMAL,非CANCELLED,非EXCEPTIONAL的狀態(tài),字段的含義也很直白,后面的SIGNAL和SMASK還不明確,后面再看.
//標(biāo)記當(dāng)前task的completion狀態(tài),同時(shí)根據(jù)情況喚醒等待該task的線程. private int setCompletion(int completion) { for (int s;;) { //開啟一個(gè)循環(huán),如果當(dāng)前task的status已經(jīng)是各種完成(小于0),則直接返回status,這個(gè)status可能是某一次循環(huán)前被其他線程完成. if ((s = status) < 0) return s; //嘗試將原來的status設(shè)置為它與completion按位或的結(jié)果. if (U.compareAndSwapInt(this, STATUS, s, s | completion)) { if ((s >>> 16) != 0) //此處體現(xiàn)了SIGNAL的標(biāo)記作用,很明顯,只要task完成(包含取消或異常),或completion傳入的值不小于1<<16, //就可以起到喚醒其他線程的作用. synchronized (this) { notifyAll(); } //cas成功,返回參數(shù)中的completion. return completion; } } }
前面用注釋解釋了這個(gè)方法的邏輯,顯然該方法是阻塞的,如果傳入的參數(shù)不能將status設(shè)置為負(fù)值會(huì)如何?
顯然,可能會(huì)有至多一次的成功cas,并且若滿足喚醒的條件,會(huì)嘗試去喚醒線程,甚至可能因?yàn)闉榱藛拘哑渌€程而被阻塞在synchonized代碼塊外;也可能沒有一次成功的cas,直到其他線程成功將status置為完成.
//final修飾,運(yùn)行ForkJoinTask的核心方法. final int doExec() { int s; boolean completed; //僅未完成的任務(wù)會(huì)運(yùn)行,其他情況會(huì)忽略. if ((s = status) >= 0) { try { //調(diào)用exec completed = exec(); } catch (Throwable rex) { //發(fā)生異常,用setExceptionalCompletion設(shè)置結(jié)果 return setExceptionalCompletion(rex); } if (completed) //正常完成,調(diào)用前面說過的setCompletion,參數(shù)為normal,并將返回值作為結(jié)果s. s = setCompletion(NORMAL); } //返回s return s; } //記錄異常并且在符合條件時(shí)傳播異常行為 private int setExceptionalCompletion(Throwable ex) { //首先記錄異常信息到結(jié)果 int s = recordExceptionalCompletion(ex); if ((s & DONE_MASK) == EXCEPTIONAL) //status去除非完成態(tài)標(biāo)志位(只保留前4位),等于EXCEPTIONAL.內(nèi)部傳播異常 internalPropagateException(ex); return s; } //internalPropagateException方法是一個(gè)空方法,留給子類實(shí)現(xiàn),可用于completer之間的異常傳遞 void internalPropagateException(Throwable ex) {} //記錄異常完成 final int recordExceptionalCompletion(Throwable ex) { int s; if ((s = status) >= 0) { //只能是異常態(tài)的status可以記錄. //hash值禁止重寫,不使用子類的hashcode函數(shù). int h = System.identityHashCode(this); final ReentrantLock lock = exceptionTableLock; //異常鎖,加鎖 lock.lock(); try { //抹除臟異常,后面敘述 expungeStaleExceptions(); //異常表數(shù)組.ExceptionNode后面敘述. ExceptionNode[] t = exceptionTable;//exceptionTable是一個(gè)全局的靜態(tài)常量,后面敘述 //用hash值和數(shù)組長度進(jìn)行與運(yùn)算求一個(gè)初始的索引 int i = h & (t.length - 1); for (ExceptionNode e = t[i]; ; e = e.next) { //找到空的索引位,就創(chuàng)建一個(gè)新的ExceptionNode,保存this,異常對(duì)象并退出循環(huán) if (e == null) { t[i] = new ExceptionNode(this, ex, t[i]);//(1) break; } if (e.get() == this) //已設(shè)置在相同的索引位置的鏈表中,退出循環(huán).//2 break; //否則e指向t[i]的next,進(jìn)入下個(gè)循環(huán),直到發(fā)現(xiàn)判斷包裝this這個(gè)ForkJoinTask的ExceptionNode已經(jīng)出現(xiàn)在t[i]這個(gè)鏈表并break(2), //或者直到e是null,意味著t[i]出發(fā)開始的鏈表并無包裝this的ExceptionNode,則將構(gòu)建一個(gè)新的ExceptionNode并置換t[i], //將原t[i]置為它的next(1).整個(gè)遍歷判斷和置換過程處在鎖中進(jìn)行. } } finally { lock.unlock(); } //記錄成功,將當(dāng)前task設(shè)置為異常完成. s = setCompletion(EXCEPTIONAL); } return s; } //exceptionTable聲明 private static final ExceptionNode[] exceptionTable;//全局異常node表 private static final ReentrantLock exceptionTableLock;//上面用到的鎖,就是一個(gè)普通的可重入鎖. private static final ReferenceQueue
到此doExec(也是每個(gè)ForkJoinTask的執(zhí)行核心過程)就此結(jié)束.
很明顯,ForkJoinTask的doExec負(fù)責(zé)了核心的執(zhí)行,它留下了exec方法給子類實(shí)現(xiàn),而重點(diǎn)負(fù)責(zé)了后面出現(xiàn)異常情況的處理.處理的邏輯前面已論述,在產(chǎn)生異常時(shí)嘗試將異常存放在全局的execptionTable中,存放的結(jié)構(gòu)為數(shù)組+鏈表,按哈希值指定索引,每次存放新的異常時(shí),順便清理上一次已被gc回收的ExceptionNode.所有ForkJoinTask共享了一個(gè)exceptionTable,因此必然在有關(guān)的幾個(gè)環(huán)節(jié)要進(jìn)行及時(shí)的清理.除了剛剛論述的過程,還有如下的幾處:
前面論述了recordExceptionalCompletion,一共有四處使用了expungeStaleException,將已回收的ExceptionNode從引用隊(duì)列中清除.
clearExceptionalCompletion在對(duì)一個(gè)ForkJoinTask重新初始化時(shí)使用,我們?cè)谇懊嫣岬叫蛄谢瘯r(shí)說過,ForkJoinTask的序列化結(jié)果只保留了兩種情況:運(yùn)行前,運(yùn)行結(jié)束.重新初始化一個(gè)ForkJoinTask,就要去除任何中間狀態(tài),包含自身產(chǎn)出的已被回收的異常node,而expungeStaleExceptions顯然也順便幫助其他task清除.
getThrowableException是查詢task運(yùn)行結(jié)果時(shí)調(diào)用,如一些get/join方法,很明顯,記錄這個(gè)異常的作用就在于返回給get/join,在這一塊順便清理已被回收的node,尤其是將自己運(yùn)行時(shí)生成的node清除.
helpExpungeStaleExceptions是提供給ForkJoinPool在卸載worker時(shí)使用,順便幫助清理全局異常表.
使用它們的方法稍后再論述,先來繼續(xù)看ForkJoinTask的源碼.
//內(nèi)部等待任務(wù)完成,直到完成或超時(shí). final void internalWait(long timeout) { int s; //status小于0代表已完成,直接忽略wait. //未完成,則試著加上SIGNAL的標(biāo)記,令完成任務(wù)的線程喚醒這個(gè)等待. if ((s = status) >= 0 && U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { //加鎖,只有一個(gè)線程可以進(jìn)入. synchronized (this) { //再次判斷未完成.等待timeout,且忽略擾動(dòng)異常. if (status >= 0) try { wait(timeout); } catch (InterruptedException ie) { } else //已完成則響醒其他等待者. notifyAll(); } } }
internalWait方法邏輯很簡單,首先判斷是否未完成,滿足未完成,則將標(biāo)記位加上SIGNAL(可能已有別的線程做過),隨后加鎖double?check?status,還未完成則等待并釋放鎖,若發(fā)現(xiàn)已完成,或在后續(xù)被喚醒后發(fā)現(xiàn)已完成,則喚醒其他等待線程.通過notifyAll的方式避免了通知丟失.
同時(shí),它的使用方法目前只有一個(gè)ForkJoinPool::awaitJoin,在該方法中使用循環(huán)的方式進(jìn)行internalWait,滿足了每次按截止時(shí)間或周期進(jìn)行等待,同時(shí)也順便解決了虛假喚醒.
繼續(xù)看externalAwaitDone函數(shù).它體現(xiàn)了ForkJoin框架的一個(gè)核心:外部幫助.
//外部線程等待一個(gè)common池中的任務(wù)完成. private int externalAwaitDone() { int s = ((this instanceof CountedCompleter) ? //當(dāng)前task是一個(gè)CountedCompleter,嘗試使用common ForkJoinPool去外部幫助完成,并將完成狀態(tài)返回. ForkJoinPool.common.externalHelpComplete( (CountedCompleter>)this, 0) : //當(dāng)前task不是CountedCompleter,則調(diào)用common pool嘗試外部彈出該任務(wù)并進(jìn)行執(zhí)行, //status賦值doExec函數(shù)的結(jié)果,若彈出失敗(其他線程先行彈出)賦0. ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0); if (s >= 0 && (s = status) >= 0) { //檢查上一步的結(jié)果,即外部使用common池彈出并執(zhí)行的結(jié)果(不是CountedCompleter的情況),或外部嘗試幫助CountedCompleter完成的結(jié)果 //status大于0表示嘗試幫助完成失敗. //擾動(dòng)標(biāo)識(shí),初值false boolean interrupted = false; do { //循環(huán)嘗試,先給status標(biāo)記SIGNAL標(biāo)識(shí),便于后續(xù)喚醒操作. if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { synchronized (this) { if (status >= 0) { try { //CAS成功,進(jìn)同步塊發(fā)現(xiàn)double check未完成,則等待. wait(0L); } catch (InterruptedException ie) { //若在等待過程中發(fā)生了擾動(dòng),不停止等待,標(biāo)記擾動(dòng). interrupted = true; } } else //進(jìn)同步塊發(fā)現(xiàn)已完成,則喚醒所有等待線程. notifyAll(); } } } while ((s = status) >= 0);//循環(huán)條件,task未完成. if (interrupted) //循環(huán)結(jié)束,若循環(huán)中間曾有擾動(dòng),則中斷當(dāng)前線程. Thread.currentThread().interrupt(); } //返回status return s; }
externalAwaitDone的邏輯不復(fù)雜,在當(dāng)前task為ForkJoinPool.common的情況下可以在外部進(jìn)行等待和嘗試幫助完成.方法會(huì)首先根據(jù)ForkJoinTask的類型進(jìn)行嘗試幫助,并返回當(dāng)前的status,若發(fā)現(xiàn)未完成,則進(jìn)入下面的等待喚醒邏輯.該方法的調(diào)用者為非worker線程.
相似的方法:externalInterruptibleAwaitDone
private int externalInterruptibleAwaitDone() throws InterruptedException { int s; //不同于externalAwaitDone,入口處發(fā)現(xiàn)當(dāng)前線程已中斷,則立即拋出中斷異常. if (Thread.interrupted()) throw new InterruptedException(); if ((s = status) >= 0 && (s = ((this instanceof CountedCompleter) ? ForkJoinPool.common.externalHelpComplete( (CountedCompleter>)this, 0) : ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0)) >= 0) { while ((s = status) >= 0) { if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { synchronized (this) { if (status >= 0) //wait時(shí)也不catch中斷異常,發(fā)生即拋出. wait(0L); else notifyAll(); } } } } return s; }
externalInterruptibleAwaitDone的邏輯與externalAwaitDone相似,只是對(duì)中斷異常的態(tài)度為拋,后者為catch.
它們的使用點(diǎn),externalAwaitDone為doJoin或doInvoke方法調(diào)用,externalInterruptibleAwaitDone為get方法調(diào)用,很明顯,join操作不可擾動(dòng),get則可以擾動(dòng).
下面來看看doJoin和doInvoke
//join的核心方法 private int doJoin() { int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w; //已完成,返回status,未完成再嘗試后續(xù) return (s = status) < 0 ? s : //未完成,當(dāng)前線程是ForkJoinWorkerThread,從該線程中取出workQueue,并嘗試將 //當(dāng)前task出隊(duì)然后執(zhí)行,執(zhí)行的結(jié)果是完成則返回狀態(tài),否則使用當(dāng)線程池所在的ForkJoinPool的awaitJoin方法等待. ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? (w = (wt = (ForkJoinWorkerThread)t).workQueue). tryUnpush(this) && (s = doExec()) < 0 ? s : wt.pool.awaitJoin(w, this, 0L) : //當(dāng)前線程不是ForkJoinWorkerThread,調(diào)用前面說的externalAwaitDone方法. externalAwaitDone(); } //invoke的核心方法 private int doInvoke() { int s; Thread t; ForkJoinWorkerThread wt; //先嘗試本線程執(zhí)行,不成功才走后續(xù)流程 return (s = doExec()) < 0 ? s : ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? //與上一個(gè)方法基本相同,但在當(dāng)前線程是ForkJoinWorkerThread時(shí)不嘗試將該task移除棧并執(zhí)行,而是等 (wt = (ForkJoinWorkerThread)t).pool. awaitJoin(wt.workQueue, this, 0L) : externalAwaitDone(); }
到此終于可以看一些公有對(duì)外方法了.有了前面的基礎(chǔ),再看get,join,invoke等方法非常簡單.
//get方法還有g(shù)et(long time)的變種. public final V get() throws InterruptedException, ExecutionException { int s = (Thread.currentThread() instanceof ForkJoinWorkerThread) ? //當(dāng)前線程是ForkJoinWorkerThread則調(diào)用前面提過的doJoin方法. //否則調(diào)用前述externalInterruptibleAwaitDone doJoin() : externalInterruptibleAwaitDone(); Throwable ex; if ((s &= DONE_MASK) == CANCELLED) //異常處理,取消的任務(wù),拋出CancellationException. throw new CancellationException(); if (s == EXCEPTIONAL && (ex = getThrowableException()) != null) //異常處理,調(diào)用getThrowableException獲取異常,封進(jìn)ExecutionException. throw new ExecutionException(ex); //無異常處理,返回原始結(jié)果. return getRawResult(); } //getRawResult默認(rèn)為一個(gè)抽象實(shí)現(xiàn),在ForkJoinTask中,并未保存該結(jié)果的字段. public abstract V getRawResult(); //getThrowableException方法 private Throwable getThrowableException() { //不是異常標(biāo)識(shí),直接返回null,從方法名的字面意思看,要返回一個(gè)可拋出的異常. if ((status & DONE_MASK) != EXCEPTIONAL) return null; //系統(tǒng)哈希碼來定位ExceptionNode int h = System.identityHashCode(this); ExceptionNode e; final ReentrantLock lock = exceptionTableLock; //加異常表全局鎖 lock.lock(); try { //先清理已被回收的異常node,前面已述. expungeStaleExceptions(); ExceptionNode[] t = exceptionTable; e = t[h & (t.length - 1)]; //循環(huán)找出this匹配的異常node while (e != null && e.get() != this) e = e.next; } finally { lock.unlock(); } Throwable ex; //前面找不出異常node或異常node中存放的異常為null,則返回null if (e == null || (ex = e.ex) == null) return null; if (e.thrower != Thread.currentThread().getId()) { //不是當(dāng)前線程拋出的異常. Class extends Throwable> ec = ex.getClass(); try { Constructor> noArgCtor = null;//該異常的無參構(gòu)造器 Constructor>[] cs = ec.getConstructors();//該異常類公有構(gòu)造器 for (int i = 0; i < cs.length; ++i) { Constructor> c = cs[i]; Class>[] ps = c.getParameterTypes(); if (ps.length == 0) //構(gòu)建器參數(shù)列表長度0說明存在無參構(gòu)造器,存放. noArgCtor = c; else if (ps.length == 1 && ps[0] == Throwable.class) { //發(fā)現(xiàn)有參構(gòu)造器且參數(shù)長度1且第一個(gè)參數(shù)類型是Throwable,說明可以存放cause. //反射將前面取出的ex作為參數(shù),反射調(diào)用該構(gòu)造器創(chuàng)建一個(gè)要拋出的Throwable. Throwable wx = (Throwable)c.newInstance(ex); //反射失敗,異常會(huì)被catch,返回ex,否則返回wx. return (wx == null) ? ex : wx; } } if (noArgCtor != null) { //在嘗試了尋找有參無參構(gòu)造器,并發(fā)現(xiàn)只存在無參構(gòu)造器的情況,用無參構(gòu)造器初始化異常. Throwable wx = (Throwable)(noArgCtor.newInstance()); if (wx != null) { //將ex設(shè)置為它的cause并返回它的實(shí)例. wx.initCause(ex); return wx; } } } catch (Exception ignore) { //此方法不可拋出異常,一定要成功返回. } } //有參無參均未成功,返回找到的異常. return ex; } //join公有方法 public final V join() { int s; if ((s = doJoin() & DONE_MASK) != NORMAL) //調(diào)用doJoin方法阻塞等待的結(jié)果不是NORMAL,說明有異常或取消.報(bào)告異常. reportException(s); //等于NORMAL,正常執(zhí)行完畢,返回原始結(jié)果. return getRawResult(); } //報(bào)告異常,可在前一步判斷執(zhí)行status是否為異常態(tài),然后獲取并重拋異常. private void reportException(int s) { //參數(shù)s必須用DONE_MASK處理掉前4位以后的位. if (s == CANCELLED) //傳入的狀態(tài)碼等于取消,拋出取消異常. throw new CancellationException(); if (s == EXCEPTIONAL) //使用前面的getThrowableException方法獲取異常并重新拋出. rethrow(getThrowableException()); } //invoke公有方法. public final V invoke() { int s; //先嘗試執(zhí)行 if ((s = doInvoke() & DONE_MASK) != NORMAL) //doInvoke方法的結(jié)果status只保留完成態(tài)位表示非NORMAL,則報(bào)告異常. reportException(s); //正常完成,返回原始結(jié)果. return getRawResult(); }
終于,讀到此處的讀者將關(guān)鍵的方法線串了起來,前述的所有內(nèi)部方法,常量和變量與公有接口的關(guān)系已經(jīng)明了.
很顯然,ForkJoinTask是個(gè)抽象類,且它并未保存任務(wù)的完成結(jié)果,也不負(fù)責(zé)這個(gè)結(jié)果的處理,但聲明并約束了返回結(jié)果的抽象方法getRawResult供子類實(shí)現(xiàn).
因此,ForkJoinTask的自身關(guān)注任務(wù)的完成/異常/未完成,子類關(guān)注這個(gè)結(jié)果的處理.
每當(dāng)獲取到任務(wù)的執(zhí)行狀態(tài)時(shí),ForkJoinTask可根據(jù)status來判斷是否是異常/正常完成,并進(jìn)入相應(yīng)的處理邏輯,最終使用子類實(shí)現(xiàn)的方法完成一個(gè)閉環(huán).
如果理解為將ForkJoinTask和子類的有關(guān)代碼合并起來,在結(jié)果/完成狀態(tài)/異常信息這一塊,相當(dāng)于同時(shí)有三個(gè)part在合作.
第一個(gè)part:status字段,它同時(shí)表示了未完成/正常完成/取消/異常完成等狀態(tài),也同時(shí)告訴有關(guān)等待線程是否要喚醒其他線程(每個(gè)線程等待前會(huì)設(shè)置SIGNAL),同時(shí)留出了后面16位對(duì)付其他情況.
第二個(gè)part:result,在ForkJoinTask見不到它,也沒有相應(yīng)的字段,子類也未必需要提供這個(gè)result字段,前面提到的CountedCompleter就沒有提供這個(gè)result,它的getRawResult會(huì)固定返回null.但是CountedCompleter可以繼承子類并實(shí)現(xiàn)這個(gè)result的保存與返回(道格大神在注釋中舉出了若干典型代碼例子),在JAVA8中,stream?api中的并行流也會(huì)保存每一步的計(jì)算結(jié)果,并對(duì)結(jié)果進(jìn)行合并.
第三個(gè)part:異常.在ForkJoinTask中已經(jīng)完成了所有異常處理流程和執(zhí)行流程的定義,重點(diǎn)在于異常的存放,它是由ForkJoinTask的類變量進(jìn)行存放的,結(jié)構(gòu)為數(shù)組+鏈表,且元素利用了弱引用,借gc幫助清除掉已經(jīng)被回收的ExceptionNode,顯然在gc之前必須得到使用.而異常隨時(shí)可以發(fā)生并進(jìn)行record入列,但相應(yīng)的能消費(fèi)掉這個(gè)異常的只有相應(yīng)的外部的get,join,invoke等方法或者內(nèi)部擴(kuò)展了exec()等方式,得到其他線程執(zhí)行的task異常結(jié)果的情況.巧妙的是,只有外部調(diào)用者調(diào)用(get,invoke,join)時(shí),這個(gè)異常信息才足夠重要,需要rethrow出去并保存關(guān)鍵的堆棧信息;而內(nèi)部線程在訪問一些非自身執(zhí)行的任務(wù)時(shí),往往只需要status判斷是否異常即可,在exec()中fork新任務(wù)的,也往往必須立即join這些新的子任務(wù),這就保證了能夠及時(shí)得到子任務(wù)中的異常堆棧(即使拿不到堆棧也知道它失敗了).
經(jīng)過前面的論述,ForkJoinTask的執(zhí)行和異常處理已經(jīng)基本論結(jié),但是,一個(gè)ForkJoinTask在創(chuàng)建之后是如何運(yùn)行的?顯然,它不是一個(gè)Runnable,也不是Callable,不能直接submit或execute到普通的線程池.
臨時(shí)切換到ForkJoinPool的代碼,前面提到過,ForkJoinTask的官方定義就是可以運(yùn)行在ForkJoinPool中的task.
//ForkJoinPool代碼,submit一個(gè)ForkJoinTask到ForkJoinPool,并將該task自身返回. //拿到返回的task,我們就可以進(jìn)行前述的get方法了. publicForkJoinTask submit(ForkJoinTask task) { if (task == null) throw new NullPointerException(); externalPush(task); return task; } //execute,不返回.類似普通線程池提交一個(gè)runnable的行為. public void execute(ForkJoinTask> task) { if (task == null) throw new NullPointerException(); externalPush(task); }
顯然,若要使用一個(gè)自建的ForkJoinPool,可以使用execute或submit函數(shù)提交入池,然后用前述的get方法和變種方法進(jìn)行.這是一種運(yùn)行task的方式.
前面論述過的invoke方法會(huì)先去先去嘗試本地執(zhí)行,然后才去等待,故我們自己new一個(gè)ForkJoinTask,一樣可以通過invoke直接執(zhí)行,這是第二種運(yùn)行task的方式.
前面論述的join方法在某種情況下也是一種task的運(yùn)行方式,在當(dāng)前線程是ForkJoinWorkerThread時(shí),會(huì)去嘗試將task出隊(duì)并doExec,也就是會(huì)先用本線程執(zhí)行一次,不成功才干等,非ForkJoinWorkerThread則直接干等了.顯然我們可以自己構(gòu)建一個(gè)ForkJoinWorkerThread并去join,這時(shí)會(huì)將任務(wù)出隊(duì)并執(zhí)行(但存在一個(gè)問題:什么時(shí)候入隊(duì)).且出隊(duì)后若未執(zhí)行成功,則awaitJoin(參考ForkJoinPool::awaitJoin),此時(shí)因任務(wù)已出隊(duì),不會(huì)被竊取或幫助(在awaitJoin中會(huì)有helpStealer,但其實(shí)任務(wù)是當(dāng)前線程自己"偷走"了),似乎完全要靠自己了.但并不表示ForkJoinTask子類無法獲取這個(gè)已出隊(duì)的任務(wù),比如CountedCompleter使用時(shí),可以在compute中新生成的Completer時(shí),將源CountedCompleter(ForkJoinTask的子類)作為新生成的CountedCountedCompleter的completer(該子類中的一個(gè)字段),這樣,若有一個(gè)ForkJoinWorkerThread竊取了這個(gè)新生成的CountedCompleter,可以通過completer鏈表找到先前被出隊(duì)的CountedCompleter(ForkJoinTask).關(guān)于CountedCompleter多帶帶文章詳述.
除此之外呢?包含前面提到的使用join操作不是ForkJoinWorkerThread調(diào)用的情況,不使用ForkJoinPool的submit?execute入池,如何能讓一個(gè)ForkJoinTask在將來執(zhí)行?我們來看后面的方法.
//fork方法,將當(dāng)前任務(wù)入池. public final ForkJoinTaskfork() { Thread t; if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) //如果當(dāng)前線程是ForkJoinWorkerThread,將任務(wù)壓入該線程的任務(wù)隊(duì)列. ((ForkJoinWorkerThread)t).workQueue.push(this); else //否則調(diào)用common池的externalPush方法入隊(duì). ForkJoinPool.common.externalPush(this); return this; }
顯然,我們還可以通過對(duì)一個(gè)ForkJoinTask進(jìn)行fork方法入池,入哪個(gè)池完全取決于當(dāng)前線程的類型.這是第四種讓任務(wù)能被運(yùn)行的方式.
同樣,我們也看到了第五種方式,ForkJoinPool.common其實(shí)就是一個(gè)常量保存的ForkJoinPool,它能夠調(diào)用externalPush,我們自然也可以直接new一個(gè)ForkJoinPool,然后將當(dāng)前task進(jìn)行externalPush,字面意思外部壓入.這種辦法,非ForkJoinWorkerThread也能將任務(wù)提交到非common的ForkJoinPool.
從名字來看,ForkJoinTask似乎已經(jīng)說明了一切,按照官方的注釋也是如此.對(duì)一個(gè)task,先Fork壓隊(duì),再Join等待執(zhí)行結(jié)果,這是一個(gè)ForkJoinTask的執(zhí)行周期閉環(huán)(但不要簡單理解為生命周期,前面提到過,任務(wù)可以被重新初始化,而且重新初始化時(shí)還會(huì)清空ExceptionNode數(shù)組上的已回收成員).
到此為止,ForkJoinTask的核心函數(shù)和api已經(jīng)基本了然,其它同類型的方法以及周邊的方法均不難理解,如invokeAll的各種變種.下面來看一些"周邊"類型的函數(shù).有前述的基礎(chǔ),它們很好理解.
//取消一個(gè)任務(wù)的執(zhí)行,直接將status設(shè)置成CANCELLED,設(shè)置后判斷該status 是否為CANCELLED,是則true否則false. public boolean cancel(boolean mayInterruptIfRunning) { return (setCompletion(CANCELLED) & DONE_MASK) == CANCELLED; } //判斷是否完成,status小于0代表正常完成/異常完成/取消,很好理解. public final boolean isDone() { return status < 0; } //判斷當(dāng)前任務(wù)是否取消. public final boolean isCancelled() { //status前4位 return (status & DONE_MASK) == CANCELLED; } public final boolean isCompletedAbnormally() { //是否為異常完成,前面說過,CANCELLED和EXCEPTIONAL均小于NORMAL return status < NORMAL; } //是否正常完成. public final boolean isCompletedNormally() { //完成態(tài)位等于NORMAL return (status & DONE_MASK) == NORMAL; } //獲取異常. public final Throwable getException() { int s = status & DONE_MASK; //當(dāng)為正常完成或未完成時(shí),返回null. return ((s >= NORMAL) ? null : //是取消時(shí),新建一個(gè)取消異常. (s == CANCELLED) ? new CancellationException() : //不是取消,參考前面提到的getThrowableException. getThrowableException()); } //使用異常完成任務(wù). public void completeExceptionally(Throwable ex) { //參考前述的setExceptionalCompletion, //ex已經(jīng)是運(yùn)行時(shí)異常或者Error,直接使用ex完成,若是受檢異常,包裝成運(yùn)行時(shí)異常. setExceptionalCompletion((ex instanceof RuntimeException) || (ex instanceof Error) ? ex : new RuntimeException(ex)); } //使用value完成任務(wù). public void complete(V value) { try { //設(shè)置原始結(jié)果,它是一個(gè)空方法.前面說過ForkJoinTask沒有維護(hù)result之類的結(jié)果字段,子類可自行發(fā)揮. setRawResult(value); } catch (Throwable rex) { //前述步驟出現(xiàn)異常,就用異常方式完成. setExceptionalCompletion(rex); return; } //前面的結(jié)果執(zhí)行完,標(biāo)記當(dāng)前為完成. setCompletion(NORMAL); } //安靜完成任務(wù).直接用NORMAL setCompletion,沒什么好說的. public final void quietlyComplete() { setCompletion(NORMAL); } //安靜join,它不會(huì)返回result也不會(huì)拋出異常.處理集合任務(wù)時(shí),如果需要所有任務(wù)都被執(zhí)行而不是一個(gè)執(zhí)行出錯(cuò)(取消)其他也跟著出錯(cuò)的情況下, //很明顯適用,這不同于invokeAll,靜態(tài)方法invokeAll或invoke(ForkJoinTask,ForkJoinTask)會(huì)在任何一個(gè)任務(wù)出現(xiàn)異常后取消執(zhí)行并拋出. public final void quietlyJoin() { doJoin(); } //安靜執(zhí)行一次,不返回結(jié)果不拋出異常,沒什么好說的. public final void quietlyInvoke() { doInvoke(); } //重新初臺(tái)化當(dāng)前task public void reinitialize() { if ((status & DONE_MASK) == EXCEPTIONAL) //如果當(dāng)前任務(wù)是異常完成的,清除異常.該方法參考前面的論述. clearExceptionalCompletion(); else //否則重置status為0. status = 0; } //反fork. public boolean tryUnfork() { Thread t; //當(dāng)前線程是ForkJoinWorkerThread,從它的隊(duì)列嘗試移除. return (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? ((ForkJoinWorkerThread)t).workQueue.tryUnpush(this) : //當(dāng)前線程不是ForkJoinWorkerThread,用common池外部移除. ForkJoinPool.common.tryExternalUnpush(this)); }
上面是一些簡單的周邊方法,大多并不需要再論述了,unfork方法很明顯在某些場景下不會(huì)成功,顯然,當(dāng)一個(gè)任務(wù)剛剛?cè)腙?duì)并未進(jìn)行后續(xù)操作時(shí),很可能成功.按前面所述,當(dāng)對(duì)一個(gè)任務(wù)進(jìn)行join時(shí),可能會(huì)成功的彈出當(dāng)前任務(wù)并執(zhí)行,此時(shí)不可能再次彈出;當(dāng)一個(gè)任務(wù)被其他線程竊取或被它本身執(zhí)行的也不會(huì)彈出.
再來看一些老朋友,在前面的文章"CompletableFuture和響應(yīng)式編程"一文中,作者曾著重強(qiáng)調(diào)過它將每個(gè)要執(zhí)行的動(dòng)作進(jìn)行壓棧(未能立即執(zhí)行的情況),而棧中的元素Completion即是ForkJoinTask的子類,而標(biāo)記該Completion是否被claim的方法和周邊方法如下:
//獲取ForkJoinTask的標(biāo)記,返回結(jié)果為short型 public final short getForkJoinTaskTag() { //status的后16位 return (short)status; } //原子設(shè)置任務(wù)的標(biāo)記位. public final short setForkJoinTaskTag(short tag) { for (int s;;) { //不停循環(huán)地嘗試將status的后16位設(shè)置為tag. if (U.compareAndSwapInt(this, STATUS, s = status, //替換的結(jié)果,前16位為原status的前16位,后16位為tag. (s & ~SMASK) | (tag & SMASK))) //返回被換掉的status的后16位. return (short)s; } } //循環(huán)嘗試原子設(shè)置標(biāo)記位為tag,前提是原來的標(biāo)記位等于e,成功true失敗false public final boolean compareAndSetForkJoinTaskTag(short e, short tag) { for (int s;;) { if ((short)(s = status) != e) //如果某一次循環(huán)的原標(biāo)記位不是e,則返回false. return false; //同上個(gè)方法 if (U.compareAndSwapInt(this, STATUS, s, (s & ~SMASK) | (tag & SMASK))) return true; } }
還記得CompletableFuture在異步執(zhí)行Completion時(shí)要先claim嗎?claim方法中,會(huì)嘗試設(shè)置這個(gè)標(biāo)記位.這是截止jdk8中CompletableFuture使用到ForkJoinTask的功能.
目前來看,在CompletableFuture的內(nèi)部實(shí)現(xiàn)Completion還沒有使用到ForkJoinTask的其他屬性,比如放入一個(gè)ForkJoinPool執(zhí)行(沒有任何前面總結(jié)的調(diào)用,比如用ForkJoinPool的push,execute,submit等,也沒有fork到common池).但是很明顯,道格大神令它繼承自ForkJoinTask不可能純粹只為了使用區(qū)區(qū)一個(gè)標(biāo)記位,試想一下,在如此友好支持響應(yīng)式編程的CompletableFuture中傳入的每一個(gè)action都可以生成若干新的action,那么CompletableFuture負(fù)責(zé)將這些action封裝成Completion放入ForkJoinPool執(zhí)行,將最大化利用到ForkJoin框架的工作竊取和外部幫助的功效,強(qiáng)力結(jié)合分治思想,這將是多么優(yōu)雅的設(shè)計(jì).或者在jdk9-12中已經(jīng)出現(xiàn)了相應(yīng)的Completion實(shí)現(xiàn)(盡管作者寫過JAVA9-12,遺憾的是也沒有去翻它們的源碼).
另外,盡管Completion的眾多子類也沒有result之類的表示結(jié)果的字段,但它的一些子類通過封裝,實(shí)際上間接地將這個(gè)Completion所引用的dep的result作為了自己的"result",當(dāng)然,getRawResult依舊是null,但是理念卻是相通的.
以上是ForkJoinTask的部分核心源碼,除了上述的源碼外,還有一些同屬于ForkJoinTask的核心源碼部分,比如其他的public方法(參考join?fork?invoke?即可),一些利用ForkJoinPool的實(shí)現(xiàn),要深入了解ForkJoinPool才能了解的方法,一些不太難的靜態(tài)方法等,這些沒有必要論述了.
除了核心源碼外,ForkJoinTask也提供了對(duì)Runnable,Callable的適配器實(shí)現(xiàn),這塊很好理解,簡單看一看.
//對(duì)Runnable的實(shí)現(xiàn),如果在ForkJoinPool中提交一個(gè)runnable,會(huì)用它封裝成ForkJoinTask static final class AdaptedRunnableextends ForkJoinTask implements RunnableFuture { final Runnable runnable; T result; AdaptedRunnable(Runnable runnable, T result) { //不能沒有runnable if (runnable == null) throw new NullPointerException(); this.runnable = runnable; //對(duì)runnable做適配器時(shí),可以提交將結(jié)果傳入,并設(shè)置為當(dāng)前ForkJoinTask子類的result. //前面說過,ForkJoinTask不以result作為完成標(biāo)記,判斷一個(gè)任務(wù)是否完成或異常,使用status足以, //返回的結(jié)果才使用result. this.result = result; } public final T getRawResult() { return result; } public final void setRawResult(T v) { result = v; } //前面說過提交入池的ForkJoinTask最終會(huì)運(yùn)行doExec,而它會(huì)調(diào)用exec,此處會(huì)調(diào)用run. public final boolean exec() { runnable.run(); return true; } public final void run() { invoke(); } private static final long serialVersionUID = 5232453952276885070L;//序列化用 } //無結(jié)果的runnable適配器 static final class AdaptedRunnableAction extends ForkJoinTask implements RunnableFuture { final Runnable runnable; AdaptedRunnableAction(Runnable runnable) { if (runnable == null) throw new NullPointerException(); this.runnable = runnable; } //區(qū)別就是result固定為null,也不能set public final Void getRawResult() { return null; } public final void setRawResult(Void v) { } public final boolean exec() { runnable.run(); return true; } public final void run() { invoke(); } private static final long serialVersionUID = 5232453952276885070L; } //對(duì)runnable的適配器,但強(qiáng)制池中的工作線程在執(zhí)行任務(wù)發(fā)現(xiàn)異常時(shí)拋出 static final class RunnableExecuteAction extends ForkJoinTask { final Runnable runnable; RunnableExecuteAction(Runnable runnable) { if (runnable == null) throw new NullPointerException(); this.runnable = runnable; } //默認(rèn)null結(jié)果,set也是空實(shí)現(xiàn) public final Void getRawResult() { return null; } public final void setRawResult(Void v) { } public final boolean exec() { runnable.run(); return true; } void internalPropagateException(Throwable ex) { //前面說過doExec會(huì)被執(zhí)行,它會(huì)調(diào)exec并catch,在catch塊中設(shè)置當(dāng)前任務(wù)為異常完成態(tài), //然后調(diào)用internalPropagateException方法,而在ForkJoinTask中默認(rèn)為空實(shí)現(xiàn). //此處將異常重新拋出,將造成worker線程拋出異常. rethrow(ex); } private static final long serialVersionUID = 5232453952276885070L; } //對(duì)callable的適配器,當(dāng)將callable提交至ForkJoinPool時(shí)使用. static final class AdaptedCallable extends ForkJoinTask implements RunnableFuture { final Callable extends T> callable; T result; AdaptedCallable(Callable extends T> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; } //字段中有一個(gè)result,直接使用它返回. public final T getRawResult() { return result; } //result可外部直接設(shè)置. public final void setRawResult(T v) { result = v; } public final boolean exec() { try { //默認(rèn)的result用call函數(shù)設(shè)置. result = callable.call(); return true; } catch (Error err) { //catch住Error,拋出 throw err; } catch (RuntimeException rex) { //catch住運(yùn)行時(shí)異常,拋出 throw rex; } catch (Exception ex) { //catch住受檢異常,包裝成運(yùn)行時(shí)異常拋出. throw new RuntimeException(ex); } } //run方法一樣只是調(diào)用invoke,進(jìn)而調(diào)用doExec. public final void run() { invoke(); } private static final long serialVersionUID = 2838392045355241008L; } //runnable生成適配器的工具方法 public static ForkJoinTask> adapt(Runnable runnable) { return new AdaptedRunnableAction(runnable); } //指定結(jié)果設(shè)置runnable的適配器工具方法 public static ForkJoinTask adapt(Runnable runnable, T result) { return new AdaptedRunnable (runnable, result); } //對(duì)callable生成適配器的方法. public static ForkJoinTask adapt(Callable extends T> callable) { return new AdaptedCallable (callable); }
以上的代碼都不復(fù)雜,只要熟悉了ForkJoinTask的本身代碼結(jié)構(gòu),對(duì)于這一塊了解非常容易,這也間接說明了ForkJoinPool中是如何處理Runnable和Callable的(因?yàn)镕orkJoinPool本身也是一種線程池,可以接受提交Callable和Runnable).
將runnable提交到pool時(shí),可以指定result,也可以不指定,也可以用submit或execute方法區(qū)分異常處理行為,ForkJoinPool會(huì)自行選擇相應(yīng)的適配器.
將callable?提交到pool時(shí),pool會(huì)選擇對(duì)callable的適配器,它的結(jié)果將為task的結(jié)果,它的異常將為task的異常.
到此為止,ForkJoinTask的源碼分析完成.
后語本文詳細(xì)分析了ForkJoinTask的源碼,并解釋了前文CompletableFuture中Completion與它的關(guān)聯(lián),以及分析了Completion繼承自ForkJoinTask目前已帶來的功能利用(tag)和將來可能增加的功用(一個(gè)Completion產(chǎn)生若干多個(gè)Completion并在ForkJoinPool中運(yùn)行,還支持工作竊取).
同時(shí)本文也對(duì)ForkJoinPool和ForkJoinWorkerThread,以及CountedCompleter和Stream?api中的并行流進(jìn)行了略微的描述.
在文章的最后,或許有一些新手讀者會(huì)好奇,我們究竟什么時(shí)候會(huì)使用ForkJoinTask?
首先,如果你在項(xiàng)目中大肆使用了流式計(jì)算,并使用了并行流,那么你已經(jīng)在使用了.
前面提過,官方解釋ForkJoinTask可以視作比線程輕量許多的實(shí)體,也是輕量的Future.結(jié)合在源碼中時(shí)不時(shí)出來秀存在感的ForkJoinWorkerThread,顯然它就是據(jù)說比普通線程輕量一些的線程,在前面的源碼中可以看出,它維護(hù)了一組任務(wù)的隊(duì)列,每個(gè)線程負(fù)責(zé)完成隊(duì)列中的任務(wù),也可以偷其他線程的任務(wù),甚至池外的線程都可以時(shí)不時(shí)地來個(gè)join,順便幫助出隊(duì)執(zhí)行任務(wù).
顯然,對(duì)于重計(jì)算,輕io,輕阻塞的任務(wù),適合使用ForkJoinPool,也就使用了ForkJoinTask,你不會(huì)認(rèn)為它可以提交runnable和callable,就可以不用ForkJoinTask了吧?前面的適配器ForkJoinPool在這種情況下必用的,可以去翻相應(yīng)的源碼.
本章沒有去詳述CountedCompleter,但前面論述時(shí)說過,你可以在exec()中將一個(gè)計(jì)算復(fù)雜的任務(wù)拆解為小的子任務(wù),然后將子任務(wù)入池執(zhí)行,父任務(wù)合并子任務(wù)的結(jié)果.這種分治的算法此前基本是在單線程模式下運(yùn)行,使用ForkJoinTask,則可以將這種計(jì)算交給一個(gè)ForkJoinPool中的所有線程并行執(zhí)行.
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://m.specialneedsforspecialkids.com/yun/77869.html
摘要:前言在前面的三篇文章中先后介紹了框架的任務(wù)組件體系體系源碼并簡單介紹了目前的并行流應(yīng)用場景框架本質(zhì)上是對(duì)的擴(kuò)展它依舊支持經(jīng)典的使用方式即任務(wù)池的配合向池中提交任務(wù)并異步地等待結(jié)果毫無疑問前面的文章已經(jīng)解釋了框架的新穎性初步了解了工作竊取 前言 在前面的三篇文章中先后介紹了ForkJoin框架的任務(wù)組件(ForkJoinTask體系,CountedCompleter體系)源碼,并簡單介紹...
摘要:前言在前面的文章框架之中梳理了框架的簡要運(yùn)行格架和異常處理流程顯然要理解框架的調(diào)度包含工作竊取等思想需要去中了解而對(duì)于的拓展和使用則需要了解它的一些子類前文中偶爾會(huì)提到的一個(gè)子類直譯為計(jì)數(shù)的完成器前文也說過的并行流其實(shí)就是基于了框架實(shí)現(xiàn)因此 前言 在前面的文章ForkJoin框架之ForkJoinTask中梳理了ForkJoin框架的簡要運(yùn)行格架和異常處理流程,顯然要理解ForkJoi...
摘要:對(duì)于任務(wù)的分割,要求各個(gè)子任務(wù)之間相互獨(dú)立,能夠并行獨(dú)立地執(zhí)行任務(wù),互相之間不影響。是叉子分叉的意思,即將大任務(wù)分解成并行的小任務(wù),是連接結(jié)合的意思,即將所有并行的小任務(wù)的執(zhí)行結(jié)果匯總起來。使用方法會(huì)阻塞并等待子任務(wù)執(zhí)行完并得到其結(jié)果。 Fork/Join是什么? Fork/Join框架是Java7提供的并行執(zhí)行任務(wù)框架,思想是將大任務(wù)分解成小任務(wù),然后小任務(wù)又可以繼續(xù)分解,然后每個(gè)小...
摘要:第二步執(zhí)行任務(wù)并合并結(jié)果。使用兩個(gè)類來完成以上兩件事情我們要使用框架,必須首先創(chuàng)建一個(gè)任務(wù)。用于有返回結(jié)果的任務(wù)。如果任務(wù)順利執(zhí)行完成了,則設(shè)置任務(wù)狀態(tài)為,如果出現(xiàn)異常,則紀(jì)錄異常,并將任務(wù)狀態(tài)設(shè)置為。 1. 什么是Fork/Join框架 Fork/Join框架是Java7提供了的一個(gè)用于并行執(zhí)行任務(wù)的框架, 是一個(gè)把大任務(wù)分割成若干個(gè)小任務(wù),最終匯總每個(gè)小任務(wù)結(jié)果后得到大任務(wù)結(jié)果的...
摘要:內(nèi)部類,用于對(duì)和異常進(jìn)行包裝,從而保證對(duì)進(jìn)行只有一次成功。是取消異常,轉(zhuǎn)換后拋出。判斷是否使用的線程池,在中持有該線程池的引用。 前言 近期作者對(duì)響應(yīng)式編程越發(fā)感興趣,在內(nèi)部分享JAVA9-12新特性過程中,有兩處特性讓作者深感興趣:1.JAVA9中的JEP266對(duì)并發(fā)編程工具的更新,包含發(fā)布訂閱框架Flow和CompletableFuture加強(qiáng),其中發(fā)布訂閱框架以java.base...
閱讀 855·2021-11-25 09:43
閱讀 3688·2021-11-19 09:40
閱讀 890·2021-09-29 09:34
閱讀 1799·2021-09-26 10:21
閱讀 880·2021-09-22 15:24
閱讀 4201·2021-09-22 15:08
閱讀 3279·2021-09-07 09:58
閱讀 2686·2019-08-30 15:55