国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

ForkJoin框架之CompletableFuture

lindroid / 3946人閱讀

摘要:內(nèi)部類,用于對(duì)和異常進(jìn)行包裝,從而保證對(duì)進(jìn)行只有一次成功。是取消異常,轉(zhuǎn)換后拋出。判斷是否使用的線程池,在中持有該線程池的引用。

前言

近期作者對(duì)響應(yīng)式編程越發(fā)感興趣,在內(nèi)部分享"JAVA9-12"新特性過(guò)程中,有兩處特性讓作者深感興趣:
1.JAVA9中的JEP266對(duì)并發(fā)編程工具的更新,包含發(fā)布訂閱框架Flow和CompletableFuture加強(qiáng),其中發(fā)布訂閱框架以java.base模塊下的java.util.concurrent.Flow及其中的幾個(gè)內(nèi)部類/接口為組成部分,它們的名稱和作用如下,摘自JAVA12的Flow api文檔。


2.JAVA9中孵化,JAVA11中標(biāo)準(zhǔn)化的HttpClient,在之前分享的JAVA9-12新特性一文中曾引用摘自網(wǎng)絡(luò)的HttpClient代碼片段:
片段1:

HttpClient client = HttpClient.newHttpClient();
HttpRequest request = HttpRequest.newBuilder()
    .uri(URI.create(uri))
    .build();

return client.sendAsync(request, HttpResponse.BodyHandlers.ofString())
    .thenApply(HttpResponse::body);
}

片段2:

HttpClient client = HttpClient.newHttpClient();
List urls = List.of("http://www.baidu.com","http://www.alibaba.com/","http://www.tencent.com");
List requests = urls.stream()
    .map(url -> HttpRequest.newBuilder(URI.create(url)))
    .map(reqBuilder -> reqBuilder.build())
    .collect(Collectors.toList());

List>> futures = requests.stream()
    .map(request -> client.sendAsync(request, HttpResponse.BodyHandlers.ofString()))
    .collect(Collectors.toList());
futures.stream()
    .forEach(e -> e.whenComplete((resp,err) -> {
        if(err != null){
            err.printStackTrace();
        }else{
            System.out.println(resp.body());
            System.out.println(resp.statusCode());
        }
    }));
CompletableFuture.allOf(futures
    .toArray(CompletableFuture[]::new))
    .join();
}

在片段1中,thenApply方法是CompletableFuture的成員,client.sendAsync返回的是一個(gè)CompletableFuture。這兩段代碼很好閱讀,甚至說(shuō)猜出其中的意義。片段2可以說(shuō)對(duì)于作者目前的書寫習(xí)慣是一個(gè)全面的顛覆,顯然我們可以預(yù)先定義響應(yīng)行為,而行為的執(zhí)行時(shí)間則由前一個(gè)階段的實(shí)際完成時(shí)間決定。片段2中的whenComplete方法很好理解,最后一行用allOf生成一個(gè)類似樹的依賴結(jié)構(gòu),在當(dāng)前方法中等待所有CompletableFuture執(zhí)行完成。

簡(jiǎn)單看這兩段代碼,響應(yīng)式編程的魅力可見一斑,甚至可以說(shuō)是美妙不可言。
那么,作為JAVA9中額外照顧增強(qiáng),HttpClient賴以實(shí)現(xiàn)的CompletableFuture,它是何方神圣呢?

CompletionStage接口

CompletionStage是什么?不妨賣個(gè)關(guān)子先。
作者目前使用的JDK版本為8,盡管它不包含9之后的增強(qiáng),萬(wàn)幸CompletionStage是從JDK8引入,因此足以用以了解這一偉大神器了。近期作者在公司使用的一些開源框架中,發(fā)現(xiàn)至處間接對(duì)它的使用:
1.持久化框架Redission。它內(nèi)部使用一個(gè)RedissonExecutorService(實(shí)現(xiàn)ScheduledExecutorService)和PromiseDelegator(實(shí)現(xiàn)CompletionStage,而CompletableFuture同樣也實(shí)現(xiàn)了CompletionStage)來(lái)異步地執(zhí)行task。
2.apollo配置中心。它提供了配置變更的異步通知機(jī)制,而這依賴于spring web-mvc提供的DeferredResult,而在異步處理return value時(shí),DeferredResult的setResult同樣也是相應(yīng)的CompletionStage執(zhí)行。

//例:阿波羅NotificationControllerV2拉取通知接口
@GetMapping
public DeferredResult>>     pollNotification(
  @RequestParam(value = "appId") String appId,
  @RequestParam(value = "cluster") String cluster,
  @RequestParam(value = "notifications") String notificationsAsString,
  @RequestParam(value = "dataCenter", required = false) String dataCenter,
  @RequestParam(value = "ip", required = false) String clientIp) {
List notifications = null;
//省略無(wú)關(guān)代碼
//DeferredResultWrapper是apollo作者包裝的spring DeferredResult
DeferredResultWrapper deferredResultWrapper = new DeferredResultWrapper();
//省略無(wú)關(guān)代碼
if (!CollectionUtils.isEmpty(newNotifications)) {
  deferredResultWrapper.setResult(newNotifications);
} else {
  deferredResultWrapper
      .onTimeout(() -> logWatchedKeys(watchedKeys, "Apollo.LongPoll.TimeOutKeys"));
 
  deferredResultWrapper.onCompletion(() -> {
    //unregister all keys
    for (String key : watchedKeys) {
      deferredResults.remove(key, deferredResultWrapper);
    }
    logWatchedKeys(watchedKeys, "Apollo.LongPoll.CompletedKeys");
  });
 
  //省略
return deferredResultWrapper.getResult();
}

在spring的CompletionStageReturnValueHandler的handleReturnValue()方法中,如下異步地處理響應(yīng)結(jié)果:

@Override
public void handleReturnValue(Object returnValue, MethodParameter returnType,
        ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception {
 
    if (returnValue == null) {
        mavContainer.setRequestHandled(true);
        return;
    }
 
    final DeferredResult deferredResult = new DeferredResult();
    WebAsyncUtils.getAsyncManager(webRequest).startDeferredResultProcessing(deferredResult, mavContainer);
 
    @SuppressWarnings("unchecked")
    CompletionStage future = (CompletionStage) returnValue;
    future.thenAccept(new Consumer() {
        @Override
        public void accept(Object result) {
            //在一個(gè)CompletionStage完成后執(zhí)行此方法,為defferedResult設(shè)值
            deferredResult.setResult(result);
        }
    });
    future.exceptionally(new Function() {
        @Override
        public Object apply(Throwable ex) {
            //在一個(gè)CompletionStage執(zhí)行出錯(cuò)后執(zhí)行此方法,為deferredResult設(shè)值
            deferredResult.setErrorResult(ex);
            return null;
        }
    });
}

以上代碼的future.thenAccept與future.exceptionally只是規(guī)定了兩種情況下程序接下來(lái)的運(yùn)行行為,相應(yīng)的代碼不是立即執(zhí)行,而是等到相應(yīng)的行為發(fā)生了才去執(zhí)行。很明顯,同步式編程寫流程,響應(yīng)式編程似乎就是在寫行為。
顯然,只要熟悉了CompletionStage的api,以上的代碼就絕對(duì)簡(jiǎn)單了,好了,開胃菜已上完,接下來(lái)介紹CompletionStage。
CompletionStage其實(shí)很好理解,按照官方定義,它表示一個(gè)可能異步運(yùn)行的“階段”,在該階段內(nèi)要執(zhí)行相應(yīng)的行為,而這些運(yùn)算會(huì)在另一個(gè)CompletionStage完成后開始,它自身完成后又可觸發(fā)另一個(gè)依賴的CompletionStage。

在CompletionStage中這些方法均可用來(lái)定義一個(gè)行為,行為的執(zhí)行方式可參考方法名和入?yún)ⅲ@與java8中的stream api持同樣的風(fēng)格。行為參數(shù)可以是Consumer,F(xiàn)unction,Runnable。包含accept的方法,參數(shù)會(huì)有一個(gè)Consumer,它會(huì)消費(fèi)上一或多個(gè)CompletionStage的結(jié)果;包含run的方法,參數(shù)會(huì)有一個(gè)Runnable,它的運(yùn)行不需要前面CompletionStage的執(zhí)行結(jié)果;包含apply的方法,參數(shù)會(huì)包含F(xiàn)unction,該function一般以前一或幾階段的返回值為入?yún)ⅲ宰陨淼膱?zhí)行結(jié)果作為當(dāng)前CompletionStage的結(jié)果。
CompletionStage和實(shí)現(xiàn)類ComletableFuture的方法名中也會(huì)包含either/all/any等簡(jiǎn)單的單詞,和上述的含義相組合,不難理解。
以以下三個(gè)接口為例說(shuō)明:

1.public CompletionStage runAfterBoth(CompletionStage other,Runnable action);

接口會(huì)返回一個(gè)CompletionStage,該stage僅在當(dāng)前stage和參數(shù)中的other正常完成后才會(huì)執(zhí)行參數(shù)中的action。

2.public  CompletionStage applyToEitherAsync(CompletionStage other,Function fn);

接口會(huì)返回一個(gè)CompletionStage,該stage會(huì)在當(dāng)前stage或參數(shù)中的other正常執(zhí)行完畢后異步執(zhí)行參數(shù)中的函數(shù)fn,而fn的參數(shù)就是前面執(zhí)行完畢的stage的結(jié)果,fn的返回值將是被返回的stage的結(jié)果。

3.public  CompletionStage thenAcceptBothAsync(CompletionStage other,BiConsumer action, Executor executor);

接口會(huì)返回一個(gè)CompletionStage,它會(huì)在當(dāng)前stage和參數(shù)中的other正常執(zhí)行完畢后執(zhí)行,以這兩個(gè)stage的結(jié)果作為參數(shù),在參數(shù)executor線程池中執(zhí)行action函數(shù),因?yàn)樗且粋€(gè)消費(fèi)者,因此沒有返回值。
接口的其他方法邏輯類似,不再綴述。

CompletableFuture源碼

上一節(jié)簡(jiǎn)述了CompletionStage接口的函數(shù)定義,作為官方提供的實(shí)現(xiàn)類,CompletableFuture實(shí)現(xiàn)了有關(guān)的所有接口,它的作者依舊是我等膜拜的道格大神,下面來(lái)具體分析CompletableFuture的實(shí)現(xiàn)。

類簽名:

public class CompletableFuture implements Future, CompletionStage
從簽名信息來(lái)看,CompletableFuture實(shí)現(xiàn)了Future和CompletionStage接口,這意味著它即滿足CompletableStage的階段執(zhí)行,也提供了Future中獲取該執(zhí)行結(jié)果的方法。

首先來(lái)看成員變量和核心函數(shù):

volatile Object result;       // 當(dāng)前對(duì)象的結(jié)果或者一個(gè)異常包裝對(duì)象AltResult,關(guān)于AltResult下面再看
volatile Completion stack;    // 任務(wù)棧,Completion后面再述。
 
final boolean internalComplete(Object r) {
//使用cas原子操作,將原本為null的result置為r,所有調(diào)用者都保證r不是null,因此只有第一次才能返回true。
    return UNSAFE.compareAndSwapObject(this, RESULT, null, r);
}
 
final boolean casStack(Completion cmp, Completion val) {
//嘗試用cas原子操作將當(dāng)前stack的值從cmp換為val。
    return UNSAFE.compareAndSwapObject(this, STACK, cmp, val);
}
//其中STACK,RESULT就是上面stack和result的句柄,這點(diǎn)和其他juc中的工具慣例相同
private static final sun.misc.Unsafe UNSAFE;
private static final long RESULT;
private static final long STACK;
private static final long NEXT;
static {
    try {
        final sun.misc.Unsafe u;
        UNSAFE = u = sun.misc.Unsafe.getUnsafe();
        Class k = CompletableFuture.class;
        RESULT = u.objectFieldOffset(k.getDeclaredField("result"));
        STACK = u.objectFieldOffset(k.getDeclaredField("stack"));
        NEXT = u.objectFieldOffset
            (Completion.class.getDeclaredField("next"));
    } catch (Exception x) {
        throw new Error(x);
    }
}

stack的類型為Completion,為了方便理解,在介紹Completion類之前,先看幾個(gè)聲明在CompletableFuture的常量

static final int SYNC   =  0;//同步
static final int ASYNC  =  1;//異步
static final int NESTED = -1;//嵌套

再來(lái)看Completion類的結(jié)構(gòu)

//繼承ForkJoinTask,實(shí)現(xiàn)Runnable,以及簽名接口AsynchronousCompletionTask
abstract static class Completion extends ForkJoinTask  implements Runnable, AsynchronousCompletionTask {
    volatile Completion next;      // 指向下一個(gè)Completion
 
    //當(dāng)被觸發(fā)時(shí),執(zhí)行completion動(dòng)作,如果存在需要傳遞的行為,
    //返回一個(gè)代表該行為的CompletableFuture  
    //參數(shù)只能是上面提到的SYNC,ASYNC,NESTED,后面留意它的正負(fù)。  
    abstract CompletableFuture tryFire(int mode);
 
    //如果當(dāng)前completion依舊是可觸發(fā)的,則返回true,這會(huì)在清理任務(wù)棧時(shí)使用. 
    abstract boolean isLive();
    //繼承自Runnable,直接調(diào)用tryFile,參數(shù)為1
    public final void run() { tryFire(ASYNC); }
    //繼承自ForkJoinTask,直接調(diào)用tryFile,參數(shù)為1,返回true
    public final boolean exec() { tryFire(ASYNC); return true; }
    //繼承自ForkJoinTask,直接返回null
    public final Void getRawResult() { return null; }
    //繼承自ForkJoinTask,空方法。
    public final void setRawResult(Void v) {}
}

上面列舉了內(nèi)部類Completion的全部代碼,它繼承并實(shí)現(xiàn)了ForkJoinTask和Runnable中的抽象方法,同時(shí)聲明了tryFire這個(gè)抽象方法供子類實(shí)現(xiàn)。因?yàn)槔^承了ForkJoinTask,這意味著Completion也是一個(gè)任務(wù),且它可能在ForkJoinPool中執(zhí)行。關(guān)于Completion和它的子類后面詳述。先來(lái)繼續(xù)看核心函數(shù)和成員實(shí)現(xiàn)。

/** 嘗試將一個(gè)任務(wù)壓棧,成功返回true */
final boolean tryPushStack(Completion c) {
    Completion h = stack;
    lazySetNext(c, h);//把當(dāng)前的棧設(shè)置為c的next
    //嘗試把當(dāng)前棧(h)更新為新值(c)
    return UNSAFE.compareAndSwapObject(this, STACK, h, c);
}
//lazySetNext定義
static void lazySetNext(Completion c, Completion next) {
    UNSAFE.putOrderedObject(c, NEXT, next);
}

方法tryPushStack的流程很簡(jiǎn)單,先調(diào)用lazySetNext將當(dāng)前棧設(shè)置為參數(shù)的next,這樣達(dá)到了棧的后入為頂層的目的,然后試圖將頂部元素設(shè)置為新壓入棧的c。

/** 不加鎖將任務(wù)壓棧,使用cas加自旋的方式,這也是道格大神的經(jīng)典. */
final void pushStack(Completion c) {
    do {} while (!tryPushStack(c));
}

接下來(lái)是一些對(duì)輸出結(jié)果編碼的代碼。

//內(nèi)部類,用于對(duì)null和異常進(jìn)行包裝,從而保證對(duì)result進(jìn)行cas只有一次成功。
static final class AltResult { // See above
    final Throwable ex;        // null only for NIL
    AltResult(Throwable x) { this.ex = x; }
}
 
/** 空值用一個(gè)ex為null的AltResult表示 */
static final AltResult NIL = new AltResult(null);
 
/** 使用上面的NIL完成任務(wù),若任務(wù)已經(jīng)被完成過(guò),返回false */
final boolean completeNull() {
    return UNSAFE.compareAndSwapObject(this, RESULT, null,
                                       NIL);
}
 
/** 對(duì)空值進(jìn)行編碼,使用NIL */
final Object encodeValue(T t) {
    return (t == null) ? NIL : t;
}
 
/** 使用t完成當(dāng)前任務(wù),t是null時(shí)使用NIL作為結(jié)果,否則使用t */
final boolean completeValue(T t) {
    return UNSAFE.compareAndSwapObject(this, RESULT, null,
                                       (t == null) ? NIL : t);
}
 
//對(duì)異常進(jìn)行編碼,返回一個(gè)AltResult,其值ex取決于參數(shù)x,
//若x為CompletionException則直接用x賦值ex,
//否則用CoimpletionException包一層。 
static AltResult encodeThrowable(Throwable x) {
    return new AltResult((x instanceof CompletionException) ? x :
                         new CompletionException(x));
}
 
/** 使用參數(shù)提供的異常的編碼結(jié)果完成任務(wù),若result已非空,返回false */
final boolean completeThrowable(Throwable x) {
    return UNSAFE.compareAndSwapObject(this, RESULT, null,
                                       encodeThrowable(x));
}
 
// 如果x非CompletionException,將它包裹成CompletionException返回。
//如果不是,則判斷,若r是AltResult且其ex就是參數(shù)x的值,則將r返回。  
// 否則將x包裹成AltResult返回。
static Object encodeThrowable(Throwable x, Object r) {
    if (!(x instanceof CompletionException))
        x = new CompletionException(x);
    else if (r instanceof AltResult && x == ((AltResult)r).ex)
        return r;
    return new AltResult(x);
}
 
// 給定一個(gè)Throwble x,一個(gè)Object r,使用上面的方法編碼的結(jié)果來(lái)嘗試完成。  
final boolean completeThrowable(Throwable x, Object r) {
    return UNSAFE.compareAndSwapObject(this, RESULT, null,
                                       encodeThrowable(x, r));
}
 
//如果x不是null,使用上面的encodeThrowable對(duì)x編碼的結(jié)果返回,否則若t是空,  
// 返回NIL,否則返回t。
Object encodeOutcome(T t, Throwable x) {
    return (x == null) ? (t == null) ? NIL : t : encodeThrowable(x);
}
 
 
static Object encodeRelay(Object r) {
    Throwable x;
    //對(duì)非空參數(shù)r進(jìn)行判斷。
    //若r是AltResult且具備非空的ex,且ex并不是CompletionException類型,
    //將ex包裝成CompletionException,并包裹成AltResult返回。
    //其他情況直接返回r。
    return (((r instanceof AltResult) &&
             (x = ((AltResult)r).ex) != null &&
             !(x instanceof CompletionException)) ?
            new AltResult(new CompletionException(x)) : r);
}
 
 
final boolean completeRelay(Object r) {
//這段代碼的邏輯和上一個(gè)方法聯(lián)合去看,當(dāng)前未完成的情況下,嘗試使用參數(shù)r完成。
//如果r是異常,嘗試將它包裝成CompletionException并外包一層AltResult。
//用這個(gè)AltResult完成。
    return UNSAFE.compareAndSwapObject(this, RESULT, null,
                                       encodeRelay(r));
}

CompletableFuture本質(zhì)也是一個(gè)Future,因此也會(huì)支持異步的阻塞的result獲取。因?yàn)樵谕瓿蛇@個(gè)future時(shí),為了便于處理和維護(hù),使用了編碼的結(jié)果,固在讀取結(jié)果時(shí),也要對(duì)結(jié)果進(jìn)行解碼。

/**  * 供future.get()使用。  */
private static  T reportGet(Object r)
    throws InterruptedException, ExecutionException {
    if (r == null)
    //參數(shù)r代表一個(gè)CompletableFuture的result,因?yàn)樗鼤?huì)對(duì)異常和null進(jìn)行編碼。
    //故null可以視為get的中間被擾動(dòng)的結(jié)果。
        throw new InterruptedException();
    if (r instanceof AltResult) {
        Throwable x, cause;
        //這一段很簡(jiǎn)單,是AltResult,ex是空返回空。
        if ((x = ((AltResult)r).ex) == null)
            return null;
             
        if (x instanceof CancellationException)
        //ex是取消異常,轉(zhuǎn)換后拋出。
            throw (CancellationException)x;
        if ((x instanceof CompletionException) &&
            (cause = x.getCause()) != null)
            //異常是包裝異常CompletionException,取出被包裝的異常拋出。
            x = cause;
        throw new ExecutionException(x);
    }
    //result不是null也不能體現(xiàn)異常,強(qiáng)轉(zhuǎn)返回。
    @SuppressWarnings("unchecked") T t = (T) r;
    return t;
}
 
//reportJoin方法相對(duì)簡(jiǎn)單,因?yàn)閖oin操作會(huì)一直等待,r能保證非空。  
//對(duì)于非AltResult類型的r直接強(qiáng)轉(zhuǎn)返回,AltResult類型的處理與  
//reportGet類似,但是不解CompletionException,直接拋出。  
//此方法拋出的異常均不受檢。 
private static  T reportJoin(Object r) {
    if (r instanceof AltResult) {
        Throwable x;
        if ((x = ((AltResult)r).ex) == null)
            return null;
        if (x instanceof CancellationException)
            throw (CancellationException)x;
        if (x instanceof CompletionException)
            throw (CompletionException)x;
        throw new CompletionException(x);
    }
    @SuppressWarnings("unchecked") T t = (T) r;
    return t;
}

相應(yīng)的get和join方法實(shí)現(xiàn)。

public T get() throws InterruptedException, ExecutionException {
    Object r;
    return reportGet((r = result) == null ? waitingGet(true) : r);
}
public T join() {
    Object r;
    return reportJoin((r = result) == null ? waitingGet(false) : r);
}

可以看出,get和join方法分別先調(diào)用reportGet,reportJoin,若得到的空結(jié)果,會(huì)繼續(xù)調(diào)用waitingGet方法,只是參數(shù)分別為true和false,waitingGet方法的實(shí)現(xiàn)需要先了解剩余的核心函數(shù)以及Completion子類,稍后再看。

一些與異步操作的準(zhǔn)備:

/**  * 標(biāo)識(shí)是異步方法產(chǎn)生的任務(wù)的接口,對(duì)于異步行為的監(jiān)控,debug,追蹤會(huì)很有用。  
*     在jdk8的CompletableFuture實(shí)現(xiàn)中,它有三個(gè)直接實(shí)現(xiàn)類,AsyncRun,  
*     AsyncSupply以及前面提到過(guò)的Completion。  
*/
public static interface AsynchronousCompletionTask {
}
//判斷是否使用ForkJoinPool的common線程池,在ForkJoinTask中持有該線程池的引用。
//判斷規(guī)則是可用cpu核數(shù)大于1.
private static final boolean useCommonPool =
    (ForkJoinPool.getCommonPoolParallelism() > 1);
 
//異步線程池,根據(jù)上述判斷,決定使用commonPool還是ThreadPerTaskExecutor,  
// 后者是一個(gè)對(duì)每一個(gè)任務(wù)都新建一個(gè)線程的low逼線程池。 
private static final Executor asyncPool = useCommonPool ?
    ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
 
/** low逼線程池源碼,沒什么可說(shuō)的 */
static final class ThreadPerTaskExecutor implements Executor {
    public void execute(Runnable r) { new Thread(r).start(); }
}
 
 
static Executor screenExecutor(Executor e) {
    if (!useCommonPool && e == ForkJoinPool.commonPool())
    //判斷參數(shù)執(zhí)行器(線程池的父接口,一般會(huì)傳入線程池)是否需要屏蔽,
    //如果參數(shù)就是ForkJoinPool.commonPool()并且經(jīng)前面的系統(tǒng)判斷
    //useCommonPool為false,則強(qiáng)制使用asyncPool。
        return asyncPool;
    if (e == null) throw new NullPointerException();
//非空且通過(guò)驗(yàn)證,返回參數(shù)e
return e;

}

為異步做的這些準(zhǔn)備很好理解,屏蔽不合理的線程池使用,在用戶提供的線程池,commonPool和ThreadPerTaskExecutor之中擇一,在后續(xù)的操作中需要使用它們。
還有兩個(gè)重要的核心函數(shù),是道格大神的神作。

final void postComplete() {
     
    CompletableFuture f = this; Completion h;//初始化f為this
    while ((h = f.stack) != null ||//1,f的棧非空
           (f != this && (h = (f = this).stack) != null)) {//2 f的棧為空且不是this,重置
        CompletableFuture d; Completion t;
        if (f.casStack(h, t = h.next)) {//3 h出棧
            if (t != null) {//4 出棧的h不是最后一個(gè)元素,最后一個(gè)元素直接執(zhí)行7即可,減少一次循環(huán)cas競(jìng)態(tài)
                if (f != this) {//f不是this
                    pushStack(h);//5 將f剛出棧的h(頂)入this的棧(頂)
                    continue;
                }
                h.next = null;    //6 detach 幫助gc
            }
            //tryFire參數(shù)為NESTED,即-1,這是它唯一一次使用。
            f = (d = h.tryFire(NESTED)) == null ? this : d;//7 f棧的最后一個(gè)元素或者就是this棧中的元素
        }
    }
}

這寥寥數(shù)行代碼的含金量不可小覷。它應(yīng)在將要完成時(shí)調(diào)用,很明顯,它會(huì)將當(dāng)前CompletableFuture的棧以及傳遞依賴的其他CompletableFuture的棧清空。為了便于解釋,在相應(yīng)的代碼上打出了編號(hào),下面詳細(xì)分析。

調(diào)用該方法,首先進(jìn)入1,此時(shí)f是當(dāng)前CompletableFuture,h是它的stack,滿足不為空的判斷,進(jìn)入3.

到達(dá)3時(shí),將棧頂Completion h出棧,一般除非并發(fā)多個(gè)線程對(duì)同一個(gè)CompletableFuture調(diào)用postComplete,否則一定會(huì)成功并到達(dá)4。若出現(xiàn)多個(gè)線程調(diào)用,cas失敗,則重新循環(huán)。

到達(dá)4后,若發(fā)現(xiàn)f的棧已空,則直接進(jìn)入7,否則判斷f是否為當(dāng)前CompletableFuture,若是,則進(jìn)行6,取消h和t的關(guān)聯(lián),若不是則進(jìn)入5,將h(f中剛剛移除的棧頂)壓入當(dāng)前Completable的棧并重新循環(huán)。

顯然,只要處理當(dāng)前CompletableFuture的棧,就一定會(huì)執(zhí)行7,只要處理的是另一個(gè)CompletableFuture的棧,就會(huì)將其出棧,然后壓入當(dāng)前CompletableFuture的棧。

在7處,會(huì)嘗試執(zhí)行棧頂?shù)腃ompletion的tryFile方法,它會(huì)返回一個(gè)可能為null的CompletableFuture,若非空,則賦給f,否則將this賦給f。

所以這段方法的真實(shí)執(zhí)行流程:當(dāng)前CompletableFuture的棧中元素逐個(gè)出棧并tryFile,發(fā)現(xiàn)新的CompletableFuture,將它的元素反向壓入本CompletableFuture的棧,壓入結(jié)束后,繼續(xù)對(duì)棧中元素逐個(gè)出棧并tryFire,發(fā)現(xiàn)非空CompletableFuture則繼續(xù)上述過(guò)程。直到本CompletableFuture的棧中不再有元素(此時(shí)tryFire返回的CompletableFuture棧也是空的)為止。

膜拜道格大神的同時(shí),順便點(diǎn)一下,這似乎是一種避免遞歸的方式。只不過(guò)tryFire返回的CompletableFuture中的棧元素將會(huì)反向執(zhí)行。

/* 遍歷棧并去除死亡任務(wù)/

final void cleanStack() {
    for (Completion p = null, q = stack; q != null;) {//初始條件,q指向null時(shí)終止。
        Completion s = q.next;//循環(huán)內(nèi)第一行,q永遠(yuǎn)指向棧頂,s永遠(yuǎn)指向棧頂?shù)诙€(gè)元素或者null
        if (q.isLive()) {//a只要q存活,就將p指向q,并將q指向s
            p = q;
            q = s;
        }
        else if (p == null) {//b q不存活,p是null,兩種可能,從未見到存活的節(jié)點(diǎn),或執(zhí)行過(guò)最后的重啟
            casStack(q, s);/將q出棧
            q = stack;//變量q重新指向新的棧頂。
        }
        else {
            p.next = s;//q已死亡,且當(dāng)前已經(jīng)找到過(guò)存活的元素。p指向q的下一個(gè)元素s,從而將q出棧
            if (p.isLive())//c判斷p是否存活,而p只能是null或者最近一個(gè)存活的Completion
                q = s;//6.q前進(jìn)
            else {//4
                p = null;  //d 重新將p置null并將q指向當(dāng)前的棧,重啟循環(huán)。
                q = stack;
            }
        }
    }
}

為了讓這段代碼的說(shuō)明更加清晰,不妨舉個(gè)簡(jiǎn)單的例子說(shuō)明。

假定當(dāng)前CompletableFuture的棧中有1-9個(gè)元素,其中14568在調(diào)用cleanStack方法時(shí)已死亡,在執(zhí)行過(guò)程中,也出現(xiàn)方法執(zhí)行過(guò)程中出現(xiàn)死亡的狀態(tài)。
進(jìn)入循環(huán),p為null,q指向1,滿足循環(huán)條件,開始第一輪循環(huán)。
第一輪循環(huán)進(jìn)入后,s指向2,p為null,q指向1,是個(gè)死亡對(duì)象,因此在第一個(gè)判斷條件a處未能通過(guò),b判斷條件為真,q被移除,循環(huán)結(jié)束,此時(shí)p為null,q指向2,棧變?yōu)?-9.
第二輪循環(huán)進(jìn)入,s指向3,p為null,q指向2,是個(gè)存活對(duì)象,進(jìn)入a,循環(huán)結(jié)束,p指向2,q指向3。棧依舊為2-9.
第三輪循環(huán)進(jìn)入,s指向4,p為2,q指向3,是存活對(duì)象,進(jìn)入a,循環(huán)結(jié)束,p指向3,q指向4,棧保持2-9不變。
第四輪循環(huán)進(jìn)入,s指向5,p為3,q指向4,是個(gè)死亡對(duì)象,p非空且存活,進(jìn)入c,則p保持為3,3的next指向5,q指向5.循環(huán)結(jié)束,棧變?yōu)?356789.
第五輪循環(huán)進(jìn)入,s指向6,p指向3,q指向5,是個(gè)死亡對(duì)象,p非空且存活,進(jìn)入c,p保持為3,3的next指向6,q指向6,循環(huán)結(jié)束,棧變?yōu)?36789.
第六輪循環(huán)進(jìn)入,s指向7,p指向3,q指向6,是個(gè)死亡對(duì)象,假定此時(shí)3死亡,則3的next指向7,進(jìn)入d分支,p為null,q為2,棧為23789.
第七輪循環(huán)進(jìn)入,s指向3,p為null,q指向2,是個(gè)存活對(duì)象,p指向2,q指向3,棧依舊為23789.
第八輪循環(huán)進(jìn)入,s指向4,p指向2,q指向3,是個(gè)死亡對(duì)象,p非空且存活,進(jìn)入c,則p保持為2,q指向7,3的next指向7,棧變2789.
第九輪進(jìn)入,s指向8,p指向2,q指向7,是個(gè)存活對(duì)象,進(jìn)入a分支,p變?yōu)?,q變?yōu)?,棧保持2789.假定此步之后2死亡,但此時(shí)p已經(jīng)指向7.
第十輪進(jìn)入,s指向9,p指向7,q指向8,是個(gè)死亡對(duì)象,p當(dāng)前指向7且存活,所以盡管2不存活,仍舊進(jìn)入分支c,p保持為7,q指向9,7的next指向9.棧為279.
第十一輪,s為null,p指向7,q指向9,是個(gè)存活對(duì)象,則進(jìn)入a分支,p變?yōu)?,q變?yōu)閚ull,棧保持279.
因q為null,循環(huán)終止。棧經(jīng)過(guò)清理只剩下279三個(gè)元素,其中2因?yàn)榍珊隙劳銮椅幢磺謇怼?/p>

下面回到Completion,Completion是一個(gè)抽象類,前面已經(jīng)簡(jiǎn)單展示它的源碼,它的子類如下:


可以看到有三個(gè)直接子類,CoCompletion,Signaller和UniCompletion。UniCompletion又有若干子類,它們分別作為一些CompletionStage中聲明方法的實(shí)現(xiàn)工具,很明顯,道格大神在此處大量使用了策略模式。
先來(lái)簡(jiǎn)單看一下CoCompletion的實(shí)現(xiàn):

static final class CoCompletion extends Completion {
    //CoCompletion完全委托給base執(zhí)行。
    BiCompletion base;
    CoCompletion(BiCompletion base) { this.base = base; }
    final CompletableFuture tryFire(int mode) {
        BiCompletion c; CompletableFuture d;
        if ((c = base) == null || (d = c.tryFire(mode)) == null)
            //base未指定,或base的tryFire返回null,則返回null。
            return null;
        base = null; // 解除關(guān)聯(lián),再isLive判斷為死亡。
        //返回的d就是base的tryFire返回的非空CompletableFuture
        return d;
    }
    final boolean isLive() {
        BiCompletion c;
        //存活標(biāo)準(zhǔn),base非空且base的dep非空。
        return (c = base) != null && c.dep != null;
    }
}

CoCompletion雖然是Completion的直接子類,但它依賴了BiCompletion,且BiCompletion是UniCompletion的直接子類,先來(lái)看UniCompletion.

abstract static class UniCompletion extends Completion {
    Executor executor;//用來(lái)執(zhí)行任務(wù)的執(zhí)行器                
    CompletableFuture dep; //要完成的依賴CompletableFuture
    CompletableFuture src; //作為行為源的CompletableFuture
 
    UniCompletion(Executor executor, CompletableFuture dep,
                  CompletableFuture src) {
        this.executor = executor; this.dep = dep; this.src = src;
    }
 
     
    final boolean claim() {
        Executor e = executor;
        if (compareAndSetForkJoinTaskTag((short)0, (short)1)) {//1
            //compareAndSetForkJoinTaskTag是ForkJoinTask的方法,利用cas,保證任何一種情況下,該行為只能執(zhí)行一次。
            if (e == null)
                //不精確的說(shuō)法是同步調(diào)用返回true,異步調(diào)用返回false,然后在線程池中執(zhí)行。
                //有e代表要在執(zhí)行器中執(zhí)行,盡管大多數(shù)情況下e都是線程池實(shí)例,會(huì)異步運(yùn)行任務(wù)。但對(duì)于Executor來(lái)說(shuō),完全可以實(shí)現(xiàn)在同一個(gè)線程執(zhí)行。
                return true;//2.
            //對(duì)于3這行代碼,道格大神注釋就寫了個(gè)disable,為此我翻了大量代碼,發(fā)現(xiàn)根本過(guò)不了上面cas這一關(guān),所以個(gè)人有兩個(gè)理解:
            //1.對(duì)于當(dāng)前Completion而言,它的線程池只能用來(lái)做一次事,在claim之后立即置空,盡管此時(shí)還沒有執(zhí)行action,也不允許當(dāng)前Completion使用它做別的事了。
            //2.減少了一個(gè)指向該線程池的引用,線程池也有被gc的時(shí)候吧。就算不gc,關(guān)閉虛擬機(jī)或者dump的時(shí)候也能少做點(diǎn)事。
            executor = null; // 3.
            e.execute(this);//使用該線程池異步執(zhí)行,回憶上面Completion的聲明,它實(shí)現(xiàn)了runnable,在run方法中tryFire(ASYNC),參數(shù)ASYNC是正數(shù)。
        }
        return false;
    }
 
    final boolean isLive() { return dep != null; }
}

盡管UniCompletion本身代碼不多,但是有關(guān)代碼卻很繞,后面會(huì)從CompletableFuture調(diào)用開始說(shuō)明一個(gè)完整的工作流,作者本來(lái)有幾次都已經(jīng)十分艱難的“確定發(fā)現(xiàn)問題”,寫出了“問題”,但最終還是在描述過(guò)程中啟動(dòng)大腦自我否定,不得不佩服道格大神強(qiáng)大的邏輯和大腦。

很明顯,UniCompletion是一個(gè)可以擁有執(zhí)行器的Completion,它是兩個(gè)操作的結(jié)合,dep為要最終執(zhí)行的依賴操作,src為來(lái)源CompletableFuture,tryFire沒有默認(rèn)實(shí)現(xiàn),它的子類分別根據(jù)不同情況實(shí)現(xiàn)了該方法,實(shí)現(xiàn)的方式依舊是優(yōu)雅的策略模式。

claim方法要在執(zhí)行action前調(diào)用,若claim方法返回false,則不能調(diào)用action,原則上要保證action只執(zhí)行一次。

claim的意思是聲稱,開個(gè)玩笑,在美劇行尸走肉第四季,有一伙武裝分子解決為了解決內(nèi)部分配問題的提出了一個(gè)辦法,對(duì)任何事物只看誰(shuí)先喊一句”claimed“,代表”我要了“。調(diào)用claim方法和稍后運(yùn)行action的動(dòng)作發(fā)生在一個(gè)線程,因此需要該線程嘗試去claim這個(gè)action,claim成功則執(zhí)行,claim不成功則不執(zhí)行。

但在提供Executor的前提下,claim除了聲明以外,還會(huì)直接在方法內(nèi)使用該executor執(zhí)行tryFire,間接地執(zhí)行action,并返回false,避免調(diào)用者也執(zhí)行action,因?yàn)橛衏as的效果,多次claim只有第一次可能返回true。

接下來(lái)看BiCompletion,它也是一個(gè)抽象類,不同在于它有兩個(gè)源,也就是它的成員dep要等到另外兩個(gè)成員CompletableFuture(src,snd)完成,具體的依賴關(guān)系要看子類實(shí)現(xiàn)。

abstract static class BiCompletion extends UniCompletion {
    CompletableFuture snd; // 第二個(gè)源action
    BiCompletion(Executor executor, CompletableFuture dep,
                 CompletableFuture src, CompletableFuture snd) {
        super(executor, dep, src); this.snd = snd;
    }
}

BiCompletion有多個(gè)實(shí)現(xiàn)類,看名稱可以看到Apply,Accept,Run等字眼,前面已經(jīng)討論過(guò)相應(yīng)的語(yǔ)義。

以O(shè)rApply為例

static final class OrApply extends BiCompletion {
    Function fn;
    OrApply(Executor executor, CompletableFuture dep,
            CompletableFuture src,
            CompletableFuture snd,
            Function fn) {
        //構(gòu)造函數(shù),多傳一個(gè)函數(shù),該函數(shù)就是dep對(duì)應(yīng)的action。
        super(executor, dep, src, snd); this.fn = fn;
    }
    //tryFire父類沒有實(shí)現(xiàn)
    final CompletableFuture tryFire(int mode) {
        CompletableFuture d;
        CompletableFuture a;
        CompletableFuture b;
        if ((d = dep) == null ||//沒有dep,則沒有相應(yīng)的依賴行為,已經(jīng)執(zhí)行過(guò)的dep會(huì)是null。
            //執(zhí)行orApply返回false,則返回null。最后一個(gè)參數(shù)僅當(dāng)mode是ASYNC(只有它大于1)時(shí)會(huì)是this
            !d.orApply(a = src, b = snd, fn, mode > 0 ? null : this))
            //到此可能是運(yùn)行過(guò),或者不滿執(zhí)行fn的條件,返回null。
            return null;
        //前面dep不是null,執(zhí)行orApply也成功了,則解除引用關(guān)聯(lián),下次運(yùn)行會(huì)直接返回null,也不影響gc。
        //回憶前面看過(guò)的核心函數(shù)postComplete,會(huì)對(duì)CompletableFuture中棧上的所有Completion進(jìn)行tryFire,
        //返回非null則進(jìn)行類似遞歸的操作,很明顯,在調(diào)用postComplete
        //方法時(shí),dep為null會(huì)返回一個(gè)null,避免了再次tryFire。
        dep = null; src = null; snd = null; fn = null;
        //正常運(yùn)行結(jié)束,調(diào)用dep的postFire并返回。
        return d.postFire(a, b, mode);
    }
}

orApply方法定義在CompletionFuture。前面沒有敘述。它不是作者稱為的”核心函數(shù)“(即各種Completion都能使用到)。

final  boolean orApply(CompletableFuture a,
                                      CompletableFuture b,
                                      Function f,
                                      OrApply c) {
    Object r; Throwable x;
    if (a == null || b == null ||
        //為r賦值用于后續(xù)的計(jì)算,因?yàn)槭莖r,r優(yōu)先取第一個(gè),第一個(gè)源action未完成的情況下再取第二個(gè)。
        ((r = a.result) == null && (r = b.result) == null) || f == null)
        //首先檢測(cè)兩個(gè)源action,若a和b均未完成,則說(shuō)明依賴dep不可被執(zhí)行,返回false。
        return false;
    //僅當(dāng)當(dāng)前(dep)未完成(result為null)時(shí),可進(jìn)行完成工作。
    tryComplete: if (result == null) {
        try {
            //前面說(shuō)過(guò),c不為null說(shuō)明是異步執(zhí)行,需要先去嘗試claim這個(gè)action。
            if (c != null && !c.claim())
                //異步且claim不成功,返回false。
                return false;
            if (r instanceof AltResult) {
                if ((x = ((AltResult)r).ex) != null) {
                    //如果r表示異常,調(diào)用completeThrowable核心函數(shù)并結(jié)束代碼塊,直接返回true。
                    completeThrowable(x, r);
                    break tryComplete;
                }
                //第一個(gè)非空的action(a或b)結(jié)果代表異常,但ex是null,則將r置為null并返回true。
                r = null;
            }
            //r不代表異常結(jié)果,直接強(qiáng)轉(zhuǎn),用該結(jié)果作為action的參數(shù),執(zhí)行action,用結(jié)果作為當(dāng)前的result。出現(xiàn)異常則進(jìn)入catch塊。
            @SuppressWarnings("unchecked") R rr = (R) r;
            completeValue(f.apply(rr));
        } catch (Throwable ex) {
            //上述代碼出現(xiàn)異常,調(diào)用completeThrowable完成dep(this)
            completeThrowable(ex);
        }
    }
    return true;
}

正常運(yùn)行結(jié)束還會(huì)調(diào)用dep的postFire,它也位于CompletableFuture中,但它只供 BiCompletion在tryFire成功之后才可使用,該方法源碼如下:

final CompletableFuture postFire(CompletableFuture a,
                                    CompletableFuture b, int mode) {
    //對(duì)于ab兩個(gè)源,先處理b,后處理a
    if (b != null && b.stack != null) {
        //b存在且b的棧還有元素
        if (mode < 0 || b.result == null)
            //當(dāng)為NESTED(只有它的值是-1)時(shí),或者b沒有結(jié)果時(shí),對(duì)b進(jìn)行清棧。調(diào)用postFire意味著d執(zhí)行tryFire成功,
            //即d獲得了結(jié)果,而這前提是ab之一已執(zhí)行成功(orApply的含義),所以ab可能是其一完成。
            b.cleanStack();
        else
            //非NESTED,則對(duì)b進(jìn)行postComplete,該方法內(nèi)部又會(huì)對(duì)b的棧上的每一個(gè)Completion執(zhí)行tryFire,而且用NESTED模式。
            b.postComplete();
    }
    //接下來(lái)對(duì)a直接進(jìn)行postFire,并沿用mode。
    return postFire(a, mode);
}

對(duì)a進(jìn)行postComplete的方法如下:

final CompletableFuture postFire(CompletableFuture a, int mode) {
    if (a != null && a.stack != null) {
        //棧非空
        if (mode < 0 || a.result == null)
        //類似上面的邏輯,是NESTED模式時(shí)或者a未完成時(shí),對(duì)a進(jìn)行清棧,否則對(duì)a執(zhí)行postComplete.
            a.cleanStack();
        else
            a.postComplete();
    }
    //處理a之后,處理當(dāng)前(即dep)
    if (result != null && stack != null) {
        //有結(jié)果且棧非空
        if (mode < 0)
            //NESTED模式,直接返回this。
            return this;
        else
            //非NESTED模式,執(zhí)行postComplete,其中會(huì)對(duì)d的棧中所有Completion進(jìn)行tryFire(NESTED),
            //并在每一個(gè)tryFire返回的CompletableFuture逆棧執(zhí)行同一樣操作,參見上面的源碼。
            postComplete();
    }
    return null;
}

以上是全部與OrApply的實(shí)現(xiàn)有關(guān)的源碼,下面來(lái)看一看OrApply的應(yīng)用,再簡(jiǎn)單梳理一下流程。

在CompletableFuture中有三個(gè)有關(guān)的方法:

可以看到三個(gè)方法的簽名和調(diào)用信息,這三個(gè)方法均是實(shí)現(xiàn)自CompletionStage。關(guān)于方法的字意和大致邏輯的推測(cè)方法前面已分析。

public  CompletableFuture applyToEither(
    CompletionStage other, Function fn) {
    //直接調(diào)用orApplyStage,不指定線程池。
    return orApplyStage(null, other, fn);
}
 
public  CompletableFuture applyToEitherAsync(
    CompletionStage other, Function fn) {
    //調(diào)用orApplyStage方法,外部不提供線程池,使用asyncPool,關(guān)于asyncPool前面已分析。
    return orApplyStage(asyncPool, other, fn);
}
 
public  CompletableFuture applyToEitherAsync(
    CompletionStage other, Function fn,
    Executor executor) {
    //調(diào)用orApplyStage方法,但對(duì)外面?zhèn)魅氲木€程池進(jìn)行屏蔽,條件符合則使用,不符合則更換,屏蔽原則前面已分析。
    return orApplyStage(screenExecutor(executor), other, fn);
}

可見三個(gè)方法均使用了orApplyStage方法,只是在參數(shù)上有所不同。再來(lái)看orApplyStage方法。

private  CompletableFuture orApplyStage(
    Executor e, CompletionStage o,
    Function f) {
    CompletableFuture b;
    if (f == null || (b = o.toCompletableFuture()) == null)
        //要執(zhí)行的函數(shù)未提供,或者參數(shù)o轉(zhuǎn)換的CompletableFuture也是null,則拋出空指針。
        throw new NullPointerException();
    //新建了一個(gè)dep,后面將它返回,故直接調(diào)用實(shí)現(xiàn)自CompletionStage的方法不用考慮返回空的問題,可以鏈?zhǔn)秸{(diào)用。
    CompletableFuture d = new CompletableFuture();
    //如果指定了線程池,直接進(jìn)入if。未指定線程池,首先嘗試調(diào)用orApply方法,并以this和b作參數(shù)。
    //前面分析過(guò),若條件滿足,即this和b有一個(gè)是完成態(tài),則會(huì)立即執(zhí)行f,結(jié)果或異常作為d的結(jié)果。
    //d.orApply的最后一個(gè)參數(shù)是null(c),說(shuō)明是同步操作,不會(huì)進(jìn)行c.claim操作。
    if (e != null || !d.orApply(this, b, f, null)) {
        //指定了線程池,或者嘗試d.orApply條件不滿足,轉(zhuǎn)為異步。
        //構(gòu)建OrApply對(duì)象壓入Completion棧。
        OrApply c = new OrApply(e, d, this, b, f);
        orpush(b, c);
        //壓棧后再次嘗試同步調(diào)用一次tryFire,前面分析過(guò),tryFire成功會(huì)最終調(diào)用相應(yīng)的cleanStack,postComplete等操作,
        //將死亡的Completion(各子類有不同的判定,CoCompletion判定base是null,有些判斷dep是null,而完成一般會(huì)把dep置null)
        //從棧上移除。
        c.tryFire(SYNC);
    }
    return d;
}
 
 
public CompletableFuture toCompletableFuture() {
    //直接返回this
    return this;
}
final void orpush(CompletableFuture b, BiCompletion c) {
    if (c != null) {
        //循環(huán)條件,b不存在或未完成且同時(shí)當(dāng)前CompletableFuture未完成。有任何一個(gè)完成則終止,若無(wú)完成,則執(zhí)行下面的代碼將任務(wù)入this和b的棧。
        while ((b == null || b.result == null) && result == null) {
            //將c壓入當(dāng)前CompletableFuture棧并退出循環(huán)。
            if (tryPushStack(c)) {
                if (b != null && b != this && b.result == null) {
                    //存在b,b不是當(dāng)前,b未完成時(shí)。嘗試將c封裝成CoCompletion并壓入b的棧,前面說(shuō)過(guò)
                    //這個(gè)壓入b棧的q完全依賴于c,并使用c的運(yùn)行結(jié)果。
                    Completion q = new CoCompletion(c);
                    //內(nèi)循環(huán),參數(shù)外循環(huán)說(shuō)明。
                    while (result == null && b.result == null &&
                           !b.tryPushStack(q))
                        lazySetNext(q, null); // clear on failure
                }
                break;
            }
            //到此說(shuō)明c壓入當(dāng)前棧失敗,則將c的next恢復(fù)為null。
            lazySetNext(c, null); // clear on failure
        }
    }
}

簡(jiǎn)單梳理OrApply這一條線的流程,其他線邏輯類似。

當(dāng)使用Completable的applyToEitherAsync/applyToEither時(shí),將進(jìn)入這一條線的代碼執(zhí)行,CompletableFuture在初步驗(yàn)參后,會(huì)封裝一個(gè)d用于表示結(jié)果的CompletableFuture,稍后將會(huì)用它作為返回值。隨后根據(jù)入?yún)⒉煌M(jìn)入不停的邏輯。

同步的情況,即未提供Executor,首先就嘗試調(diào)用它的d.uniApply方法,若此時(shí)當(dāng)前CompletableFuture或參數(shù)中的另一個(gè)stage已完成,則用完成的結(jié)果直接執(zhí)行用戶指定的action并對(duì)d的結(jié)果進(jìn)行賦值,并進(jìn)一步完成d的后續(xù)清棧和postComplete(1);若此時(shí)當(dāng)前的Completable或另一個(gè)stage未完成,則不滿足執(zhí)行action的條件,將當(dāng)前Completable作為第一個(gè)source,另一個(gè)stage作為第二個(gè)source,封裝成一個(gè)OrApply并壓當(dāng)前CompletableFuture和另一個(gè)stage的棧(2),隨后立即以同步方式調(diào)用它的tryFire(1)。

異步的情況,直接封裝OrApply對(duì)象,將由線程池間接調(diào)用tryFire(3),進(jìn)一步調(diào)用orApply方法,因?yàn)槭钱惒剑词節(jié)M足了前面的條件(ab之一正常或異常完成),依舊需要進(jìn)行claim,claim失敗則不會(huì)執(zhí)行action。claim成功,執(zhí)行action出現(xiàn)異常,則用異常來(lái)完成這個(gè)action。

以上三種情況最終都會(huì)執(zhí)行action,標(biāo)注了(1)和(3)是很明確的兩種情況。

任何一個(gè)CompletableFuture完成后,都會(huì)根據(jù)mode進(jìn)行后續(xù)處理,其實(shí)盡管每個(gè)Completion都具備一個(gè)next指針,但每一個(gè)Completion的完成均不依賴于棧中的其他Completion,僅在cleanStack,壓棧,postComplete使用了該棧的結(jié)構(gòu)。現(xiàn)在來(lái)回答前面分析時(shí)發(fā)現(xiàn)的兩個(gè)問題。

1.當(dāng)前CompletableFuture在完成后,執(zhí)行postComplete,會(huì)將它自身的棧中completion出棧并執(zhí)行action,若要產(chǎn)生新的CompletableFuture,則將它的棧反向壓入自身的棧,然后重復(fù)執(zhí)行出棧-執(zhí)行的操作。反向壓棧有問題嗎?答案是沒有。因?yàn)闂V械拿恳粋€(gè)Completion在執(zhí)行上互不影響,它們的順序只影響到cleanStack和postComplete的處理順序。CompletableFuture和它的棧元素產(chǎn)生的CompletableFuture彼此間有順序要求,但對(duì)同一個(gè)CompletableFuture的棧內(nèi)的Completion元素彼此間沒有順序要求,決定他們順序的是對(duì)源CompletionFuture調(diào)用orApply,thenApply等等方法的順序,后續(xù)運(yùn)行也完全獨(dú)立。只不過(guò)在源CompletableFuture進(jìn)行postComplete時(shí),執(zhí)行的順序?qū)?huì)與原本的”先來(lái)后到“相反。

2.cleanStack到一半,p指向的Completion依舊存活,位于p以上的Completion已執(zhí)行完畢,那么不會(huì)重新開始循環(huán),p之前的死亡Completion會(huì)留在棧中。這也是為什么前面使用OrApply來(lái)解釋這個(gè)問題的原因,因?yàn)楹芸赡芫筒淮嬖谶@個(gè)問題。根據(jù)前面的源碼,僅有postComplete觸發(fā)的tryFire會(huì)使用NESTED(-1)模式,只有NESTED模式下,或者源CompletableFuture的result為null(未完成)的情況下執(zhí)行postFire才會(huì)進(jìn)入到cleanStack,否則會(huì)進(jìn)入postComplete,后者會(huì)將所有元素出棧并執(zhí)行存活元素,顯然不存在要考慮存活的問題。而只有or且為BiCompletion的情況下,才可能出現(xiàn)兩個(gè)源之一實(shí)際并未完成,這樣在非NESTED模式下調(diào)用cleanStack方法。

可見2的問題是存在的。但它對(duì)于整體的運(yùn)行結(jié)果是無(wú)影響的,后續(xù)該source執(zhí)行完畢,調(diào)用自身的postComplete時(shí),將已死亡的Completion出棧并tryFire,會(huì)發(fā)現(xiàn)諸如”dep=null"等情況,直接返回null,則postComplete方法中的f會(huì)保持指向this并繼續(xù)迭代下一個(gè)棧元素。

目前關(guān)于2中提到的cleanStack的調(diào)用只出現(xiàn)在UniCompletion成功后調(diào)用postFire時(shí)依賴模式和result運(yùn)行。其實(shí)還有一種情況,就是前面提了一次的,屬于future接口的get方法,以及類似的join方法。

前面提到,get和join方法都會(huì)在獲取不到結(jié)果是按條件輪循watingGet方法,下面來(lái)看waitingGet方法。

private Object waitingGet(boolean interruptible) {
    Signaller q = null;//信號(hào)器
    boolean queued = false;//是否入隊(duì)
    int spins = -1;//自旋次數(shù)
    Object r;//結(jié)果引用
    //循環(huán)條件是只等待result,內(nèi)部有根據(jù)擾動(dòng)決定的break
    while ((r = result) == null) {
        //自旋次數(shù)只有第一次進(jìn)來(lái)是負(fù)值,后續(xù)只能是0或其他正數(shù)。
        if (spins < 0)
        //自旋次數(shù),多處理器下初始化為16,否則為0,即不自旋。設(shè)置值后此次循環(huán)結(jié)束。
            spins = (Runtime.getRuntime().availableProcessors() > 1) ?
                1 << 8 : 0;
        //第二次循環(huán)時(shí)才會(huì)判斷自旋次數(shù)。只要spins大于0就繼續(xù)循環(huán),直到達(dá)到0為止再執(zhí)行下面的else代碼。
        else if (spins > 0) {
            //僅當(dāng)下一個(gè)種子數(shù)不小于0時(shí),減小一次自旋次數(shù)。nextSecondarySeed是Thread類中使用@Contended注解標(biāo)識(shí)的變量,
            //這與傳說(shuō)中的偽共享有關(guān)。
            if (ThreadLocalRandom.nextSecondarySeed() >= 0)
                --spins;
        }
        //停止自旋后的第一輪循環(huán),result依舊是null,則對(duì)q進(jìn)行初始化,關(guān)于Signaller后續(xù)再講。
        else if (q == null)
            q = new Signaller(interruptible, 0L, 0L);
        //初始化q后的下一輪循環(huán)(停止自旋后的第二輪),queued是false,將上一輪循環(huán)初始化的q壓入棧。
        else if (!queued)
            queued = tryPushStack(q);
        //停止自旋后的若干次循環(huán)(上一步可能壓棧失敗,則下一輪自旋會(huì)再次壓棧,直到成功)后,判斷是否可擾動(dòng)。
        else if (interruptible && q.interruptControl < 0) {
            //擾動(dòng)信號(hào)匹配,將q的有關(guān)字段全部置空,順帶清一下棧,返回null。
            q.thread = null;
            //這個(gè)清棧的過(guò)程,細(xì)看上面的解釋還有有關(guān)的源碼,可能會(huì)發(fā)出一個(gè)疑問,cleanStack只能清除isLive判斷false的Completion,
            //但目前的實(shí)現(xiàn),基本上都只能在dep為null,base為null等僅當(dāng)dep執(zhí)行完成的情況發(fā)生,而dep完成的情況是當(dāng)前CompletableFuture的
            //result不是null,而方法運(yùn)行到此,很明顯result必然是null,那么還有必要清棧嗎?
            //答案是必要的,首先將來(lái)也許能出現(xiàn)存活或死亡狀態(tài)與source的result無(wú)關(guān)的Completion,那么此處清一下棧也是幫助后面的工作。
            //其次,剛才壓入棧的q在thread指向null時(shí)即已死亡,它也必須要進(jìn)行清除。
            cleanStack();
            return null;
        }
        else if (q.thread != null && result == null) {
            //q關(guān)聯(lián)的線程存在,即q存活,且依舊沒有執(zhí)行完畢,使用ForkJoinPool的阻塞管理機(jī)制,q的策略進(jìn)行阻塞。
            try {
                ForkJoinPool.managedBlock(q);
            } catch (InterruptedException ie) {
                //阻塞是可以擾動(dòng)的,此時(shí)會(huì)將q的擾動(dòng)控制信號(hào)設(shè)置為-1,則下一次循環(huán)時(shí)將可能進(jìn)入上一個(gè)else if。
                q.interruptControl = -1;
            }
        }
    }
    //前面的循環(huán)沒有break,能執(zhí)行到此,只有result獲得非null值的情況。
    if (q != null) {
        //若q不是null,說(shuō)明沒有在自旋階段獲取到result,需要對(duì)它進(jìn)行禁用。
        q.thread = null;
        if (q.interruptControl < 0) {
            if (interruptible)
                //可擾動(dòng)且有擾動(dòng)信號(hào),則說(shuō)明擾動(dòng)后未能進(jìn)入上面帶有cleanStack的那個(gè)else if,
                //可能是恰好在這次循環(huán)開始時(shí)獲取到了非空result,從而退出循環(huán),也可能是參數(shù)interruptible為假,
                //在外部擾動(dòng)了當(dāng)前線程后,依舊等到了result。
                //只要發(fā)生了擾動(dòng),就將結(jié)果置null,外面調(diào)用者如果是join,可以報(bào)出擾動(dòng)。
                r = null; // report interruption
            else
                //如果不可擾動(dòng),則中斷當(dāng)前線程(創(chuàng)建q的線程)。
                Thread.currentThread().interrupt();
        }
    }
    //當(dāng)前future已經(jīng)有結(jié)果,進(jìn)行postComplete邏輯并返回r。
    postComplete();
    return r;
}

根據(jù)該方法的注釋,waitingGet方法只會(huì)有兩個(gè)結(jié)果,null(可擾動(dòng)并且擾動(dòng)了)和原始的result。而get方法可擾動(dòng),也即可返回null,join方法不可擾動(dòng),只能等待結(jié)束或拋出異常。

waitingGet方法中出現(xiàn)了第三個(gè)也是最后一個(gè)Completion的直接子類Signaller,前面沒有對(duì)它進(jìn)行介紹,不過(guò)它也只使用在此處,因此可以一并介紹。

static final class Signaller extends Completion
    implements ForkJoinPool.ManagedBlocker {
    long nanos;                    // 計(jì)時(shí)的情況下,要等待的時(shí)間。
    final long deadline;           // 計(jì)時(shí)的情況下指定不為0的值
    volatile int interruptControl; // 大于0代表可擾動(dòng),小于0代表已擾動(dòng)。
    volatile Thread thread;//持有的線程
 
    Signaller(boolean interruptible, long nanos, long deadline) {
        this.thread = Thread.currentThread();
        this.interruptControl = interruptible ? 1 : 0;//不可擾動(dòng),賦0
        this.nanos = nanos;
        this.deadline = deadline;
    }
    final CompletableFuture tryFire(int ignore) {//ignore無(wú)用
        Thread w; //Signaller自持有創(chuàng)建者線程,tryFire只是單純喚醒創(chuàng)建它的線程。
        if ((w = thread) != null) {
            thread = null;//釋放引用
            LockSupport.unpark(w);//解除停頓。
        }
        //返回null,當(dāng)action已執(zhí)行并進(jìn)行postComlete調(diào)用時(shí),f依舊指向當(dāng)前CompletableFuture引用并解除停頓。
        return null;
    }
    public boolean isReleasable() {
        //線程是空,允許釋放。這可能是某一次調(diào)用本方法或tryFire方法造成。
        if (thread == null)
            return true;
        if (Thread.interrupted()) {
            //如果調(diào)用isReleasable方法的線程被擾動(dòng)了,則置擾動(dòng)信號(hào)為-1
            int i = interruptControl;
            interruptControl = -1;
            if (i > 0)
            //原擾動(dòng)信號(hào)是”可擾動(dòng)“,則是本次調(diào)用置為”已擾動(dòng)“,返回true。
                return true;
        }
        //未定時(shí)(deadline是0)的情況只能在上面釋放,定時(shí)的情況,本次計(jì)算nanos(deadline-System.nanoTime())
        //或上次計(jì)算的nanos不大于0時(shí),說(shuō)明可以釋放。
        if (deadline != 0L &&
            (nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) {
            //只要可釋放,將創(chuàng)建者線程的引用釋放。下次調(diào)用直接返回true,線程運(yùn)行結(jié)束銷毀后可被gc回收。
            thread = null;
            return true;
        }
        //仍持有創(chuàng)建者線程,調(diào)用此方法的線程未擾動(dòng)或當(dāng)前擾動(dòng)不是第一次,未定時(shí)或不滿足定時(shí)設(shè)置的一律返回false。
        return false;
    }
    public boolean block() {
        //block方法
        if (isReleasable())
            //判斷可釋放,直接return true。
            return true;
        //判斷deadline是0,說(shuō)明不計(jì)時(shí),默認(rèn)park。
        else if (deadline == 0L)
            LockSupport.park(this);
        else if (nanos > 0L)
            //計(jì)時(shí)情況,park指定nanos。
            LockSupport.parkNanos(this, nanos);
        //睡醒后再次返回isReleasable的結(jié)果。
        return isReleasable();
    }
    //創(chuàng)建者線程引用被釋放即代表死亡。
    final boolean isLive() { return thread != null; }
}

Signaller是一個(gè)Completion的直接子類,同時(shí)實(shí)現(xiàn)了ForkJoinPool的內(nèi)部接口ManagedBlocker,這使得它可以在當(dāng)ForkJoinPool出現(xiàn)大量線程阻塞堆積時(shí)避免饑餓。
Signaller的作用是持有和釋放一個(gè)線程,并提供相應(yīng)的阻塞策略。
前面提到的waitingGet方法創(chuàng)建了一個(gè)Signaller(interruptible, 0L, 0L),類似的,可以看到timedGet方法使用Signaller(true, nanos, d == 0L ? 1L : d)來(lái)進(jìn)行阻塞的管理,管理的方法依賴ForkJoinPool內(nèi)部的

ForkJoinPool.managedBlock(q)來(lái)實(shí)現(xiàn),而這用到了被Signaller實(shí)現(xiàn)的ForkJoinPool.ManagedBlocker,managedBlock方法源碼如下。

//ForkJoinPool的managedBlock方法。
public static void managedBlock(ManagedBlocker blocker)
    throws InterruptedException {
    ForkJoinPool p;
    ForkJoinWorkerThread wt;
    Thread t = Thread.currentThread();//調(diào)用此方法的線程,即前面的Signaller的創(chuàng)建者線程。
    if ((t instanceof ForkJoinWorkerThread) &&
        (p = (wt = (ForkJoinWorkerThread)t).pool) != null) {
        //調(diào)用managedBlock方法的線程是ForkJoinWorkerThread,則它可運(yùn)行在ForkJoinPool中。此處要求內(nèi)部持有pool的引用。
        WorkQueue w = wt.workQueue;
        //循環(huán),只要判斷blocker(即Signaller)不可釋放。
        while (!blocker.isReleasable()) {
            //嘗試用ForkJoinPool對(duì)當(dāng)前線程的工作隊(duì)列進(jìn)行補(bǔ)償。
            //tryCompensate方法會(huì)嘗試減少活躍數(shù)并可能創(chuàng)建或釋放一個(gè)準(zhǔn)備阻塞的worker線程,
            //它會(huì)在發(fā)生競(jìng)態(tài),臟數(shù)據(jù),松弛或池終止時(shí)返回false。
            //關(guān)于ForkJoinPool的詳情多帶帶準(zhǔn)備文章。
            if (p.tryCompensate(w)) {
                 
                try {
                    //補(bǔ)償成功,不停地對(duì)線程池嘗試先isReleasable再block,任何一個(gè)方法返回true則終止循環(huán)。
                    do {} while (!blocker.isReleasable() &&
                                 !blocker.block());
                } finally {
                    //出現(xiàn)任何異常,或循環(huán)終止時(shí),控制信號(hào)加上一個(gè)活躍數(shù)單元,因?yàn)榍懊嫱ㄟ^(guò)補(bǔ)償才會(huì)進(jìn)入循環(huán),已減少了一個(gè)單元。
                    U.getAndAddLong(p, CTL, AC_UNIT);
                }
                break;
            }
        }
    }
    else {
        //當(dāng)前線程不是ForkJoinWorkerThread或不持有ForkJoinPool的引用。連續(xù)先嘗試isReleasable再嘗試block,直到有一者返回true為止。
        do {} while (!blocker.isReleasable() &&
                     !blocker.block());
    }
}

關(guān)于ForkJoinPool本文不做額外介紹,只列舉這一個(gè)方法,到此為止,對(duì)于CompletableFuture的主要接口(繼承自CompletionStage)和實(shí)現(xiàn)已經(jīng)描述完畢(其實(shí)只過(guò)了一個(gè)特殊案例的接口,但是前面提到過(guò),其他接口的邏輯和實(shí)現(xiàn)方式類似,無(wú)非就是run,active,apply的更換,或either,both,then,when等,有上面的基礎(chǔ),再憑借規(guī)則推測(cè)語(yǔ)義,源碼并不難理解。

CompletableFuture還有一些獨(dú)立聲明的公有方法,源碼也有些非常值得借鑒的地方,如allOf,anyOf兩個(gè)方法。

//anyOf方法,返回一個(gè)CompletableFuture對(duì)象,任何一個(gè)cfs列表中的成員進(jìn)入完成態(tài)(正常完成或異常),則它也一并完成,結(jié)果一致。
public static CompletableFuture anyOf(CompletableFuture... cfs) {
    //直接調(diào)用orTree
    return orTree(cfs, 0, cfs.length - 1);
}
//allOf方法,當(dāng)所有cfs列表中的成員進(jìn)入完成態(tài)后完成(使用空結(jié)果),或有任何一個(gè)列表成員異常完成時(shí)完成(使用同一個(gè)異常)。
public static CompletableFuture allOf(CompletableFuture... cfs) {
    //直接調(diào)用andTree
    return andTree(cfs, 0, cfs.length - 1);
}
static CompletableFuture andTree(CompletableFuture[] cfs,
                                       int lo, int hi) {
    //聲明一個(gè)后續(xù)返回的dep
    CompletableFuture d = new CompletableFuture();
    if (lo > hi) //驗(yàn)參
        d.result = NIL;
    else {
        CompletableFuture a, b;
        //折半驗(yàn)證參數(shù)并歸并。每相鄰的兩個(gè)成員會(huì)在一個(gè)遞歸中生成另一個(gè)"d",
        //總量奇數(shù)的最后一個(gè)多帶帶表示這個(gè)d。
        int mid = (lo + hi) >>> 1;
        if ((a = (lo == mid ? cfs[lo] :
                  andTree(cfs, lo, mid))) == null ||
            (b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] :
                  andTree(cfs, mid+1, hi)))  == null)
            throw new NullPointerException();
        //調(diào)用d.biRelay的中繼方法嘗試完成。
        if (!d.biRelay(a, b)) {
            //不滿足完成條件,生成一個(gè)中繼并壓棧,再次嘗試同步完成。若不滿足條件,ab任何一個(gè)完成后都會(huì)再間接調(diào)用它的tryFire。
            BiRelay c = new BiRelay<>(d, a, b);
            a.bipush(b, c);//除非ab均完成,否則bipush要進(jìn)ab兩者的棧。
            c.tryFire(SYNC);
        }
    }
    return d;
}
//biRelay方法,有前面的基礎(chǔ),很簡(jiǎn)單,只要ab之一任何一個(gè)未完成則返回false,都完成且dep未完成則進(jìn)入相應(yīng)的正常異常完成策略,
//不論dep是否已完成,只要ab均已完成,則返回true
boolean biRelay(CompletableFuture a, CompletableFuture b) {
    Object r, s; Throwable x;
    if (a == null || (r = a.result) == null ||
        b == null || (s = b.result) == null)
        return false;
    //biRelay是嘗試根據(jù)兩個(gè)CompletableFuture完成dep,因?yàn)槿齻€(gè)complete*方法均已做到原子性,也沒有action要執(zhí)行,因此它不需要claim。
    if (result == null) {
        if (r instanceof AltResult && (x = ((AltResult)r).ex) != null)
            completeThrowable(x, r);
        else if (s instanceof AltResult && (x = ((AltResult)s).ex) != null)
            completeThrowable(x, s);
        else
            //正常情況,用null完成。
            completeNull();
    }
    return true;
}
 
 
//壓入棧的BiRelay
static final class BiRelay extends BiCompletion { // for And
    BiRelay(CompletableFuture dep,
            CompletableFuture src,
            CompletableFuture snd) {
        super(null, dep, src, snd);
    }
    final CompletableFuture tryFire(int mode) {
        CompletableFuture d;
        CompletableFuture a;
        CompletableFuture b;
        if ((d = dep) == null || !d.biRelay(a = src, b = snd))
            //已經(jīng)完成過(guò),或者未完成,本次也不能完成,返回一個(gè)null
            return null;
        //BiRelay通過(guò)BiCompletion間接繼承了UniCompletion,因此dep取null代表死亡。
        //這樣也能規(guī)避錯(cuò)誤的tryFire,如當(dāng)它已被完成,持有的dep引用置null,當(dāng)d進(jìn)行postFire的postComplete時(shí)會(huì)保持f=this并持續(xù)出棧
        //dep未完成時(shí)清棧也能有效移除已完成的任務(wù)。
        src = null; snd = null; dep = null;
        return d.postFire(a, b, mode);
    }
}
//orTree類似上面的andTree,有一個(gè)完成或異常,就用它的結(jié)果或異常作為返回的CompletableFuture的結(jié)果或異常。
static CompletableFuture orTree(CompletableFuture[] cfs,
                                        int lo, int hi) {
    CompletableFuture d = new CompletableFuture();
    if (lo <= hi) {
        CompletableFuture a, b;
        int mid = (lo + hi) >>> 1;
        //同上
        if ((a = (lo == mid ? cfs[lo] :
                  orTree(cfs, lo, mid))) == null ||
            (b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] :
                  orTree(cfs, mid+1, hi)))  == null)
            throw new NullPointerException();
        //同上,下面簡(jiǎn)述orRelay和OrRelay
        if (!d.orRelay(a, b)) {
            OrRelay c = new OrRelay<>(d, a, b);
            //除非ab任何一個(gè)已完成,否則orpush要進(jìn)棧,且只進(jìn)一個(gè)棧。
            a.orpush(b, c);
            c.tryFire(SYNC);
        }
    }
    return d;
}
//很明顯,orRelay就是兩個(gè)CompletableFuture的或關(guān)系中繼者。
final boolean o           
               
                                           
                       
                 
            
                     
             
               

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://m.specialneedsforspecialkids.com/yun/77760.html

相關(guān)文章

  • ForkJoin框架ForkJoinTask

    摘要:前言在前面的文章和響應(yīng)式編程中提到了和后者毫無(wú)疑問是一個(gè)線程池前者則是一個(gè)類似經(jīng)典定義的概念官方有一個(gè)非常無(wú)語(yǔ)的解釋就是運(yùn)行在的一個(gè)任務(wù)抽象就是運(yùn)行的線程池框架包含和若干的子類它的核心在于分治和工作竅取最大程度利用線程池中的工作線程避免忙的 前言 在前面的文章CompletableFuture和響應(yīng)式編程中提到了ForkJoinTask和ForkJoinPool,后者毫無(wú)疑問是一個(gè)線程...

    crossoverJie 評(píng)論0 收藏0
  • Java SDK 并發(fā)包全面總結(jié)

    摘要:一和并發(fā)包中的和主要解決的是線程的互斥和同步問題,這兩者的配合使用,相當(dāng)于的使用。寫鎖與讀鎖之間互斥,一個(gè)線程在寫時(shí),不允許讀操作。的注意事項(xiàng)不支持重入,即不可反復(fù)獲取同一把鎖。沒有返回值,也就是說(shuō)無(wú)法獲取執(zhí)行結(jié)果。 一、Lock 和 Condition Java 并發(fā)包中的 Lock 和 Condition 主要解決的是線程的互斥和同步問題,這兩者的配合使用,相當(dāng)于 synchron...

    luckyyulin 評(píng)論0 收藏0
  • ForkJoin框架ForkJoinPool

    摘要:前言在前面的三篇文章中先后介紹了框架的任務(wù)組件體系體系源碼并簡(jiǎn)單介紹了目前的并行流應(yīng)用場(chǎng)景框架本質(zhì)上是對(duì)的擴(kuò)展它依舊支持經(jīng)典的使用方式即任務(wù)池的配合向池中提交任務(wù)并異步地等待結(jié)果毫無(wú)疑問前面的文章已經(jīng)解釋了框架的新穎性初步了解了工作竊取 前言 在前面的三篇文章中先后介紹了ForkJoin框架的任務(wù)組件(ForkJoinTask體系,CountedCompleter體系)源碼,并簡(jiǎn)單介紹...

    mayaohua 評(píng)論0 收藏0
  • ForkJoin框架CountedCompleter,工作線程及并行流

    摘要:前言在前面的文章框架之中梳理了框架的簡(jiǎn)要運(yùn)行格架和異常處理流程顯然要理解框架的調(diào)度包含工作竊取等思想需要去中了解而對(duì)于的拓展和使用則需要了解它的一些子類前文中偶爾會(huì)提到的一個(gè)子類直譯為計(jì)數(shù)的完成器前文也說(shuō)過(guò)的并行流其實(shí)就是基于了框架實(shí)現(xiàn)因此 前言 在前面的文章ForkJoin框架之ForkJoinTask中梳理了ForkJoin框架的簡(jiǎn)要運(yùn)行格架和異常處理流程,顯然要理解ForkJoi...

    msup 評(píng)論0 收藏0
  • Java8的CompletableFuture進(jìn)階

    摘要:方法接收的是的實(shí)例,但是它沒有返回值方法是函數(shù)式接口,無(wú)參數(shù),會(huì)返回一個(gè)結(jié)果這兩個(gè)方法是的升級(jí),表示讓任務(wù)在指定的線程池中執(zhí)行,不指定的話,通常任務(wù)是在線程池中執(zhí)行的。該的接口是在線程使用舊的接口,它不允許返回值。 簡(jiǎn)介 作為Java 8 Concurrency API改進(jìn)而引入,本文是CompletableFuture類的功能和用例的介紹。同時(shí)在Java 9 也有對(duì)Completab...

    SunZhaopeng 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<