国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

Java多線程進階(四三)—— J.U.C之executors框架:Fork/Join框架(2)實現

FingerLiu / 1171人閱讀

摘要:并不會為每個任務都創建工作線程,而是根據實際情況構造線程池時的參數確定是喚醒已有空閑工作線程,還是新建工作線程。

本文首發于一世流云的專欄:https://segmentfault.com/blog...
一、引言

前一章——Fork/Join框架(1) 原理,我們從整體上對Fork/Join框架作了介紹。

回顧一下,Fork/Join框架的核心實現類是ForkJoinPool線程池,其它核心組件包括:ForkJoinTask(任務)、ForkJoinWorkerThread(工作線程)、WorkQueue(任務隊列)。

這一章,我們將深入F/J框架的實現細節,看看ForkJoinPool線程池究竟有何特殊之處,F/J框架的整個任務調度流程又是怎樣的。

二、任務調度流程

在開始之前,先來看下下面這張圖:

上圖包含了F/J框架的整個任務調度流程,這里先簡要介紹下,以便讀者在有個印象,后續的源碼分析將完全按照這張圖進行。

F/J框架調度任務的流程一共可以分為四大部分。

任務提交

任務提交是整個調度流程的第一步,F/J框架所調度的任務來源有兩種:

①外部提交任務

所謂外部提交任務,是指通過ForkJoinPoolexecute/submit/invoke方法提交的任務,或者非工作線程(ForkJoinWorkerThread)直接調用ForkJoinTaskfork/invoke方法提交的任務:

外部提交的任務的特點就是調用線程是非工作線程。這個過程涉及以下方法:

ForkJoinPool.submit

ForkJoinPool.invoke

ForkJoinPool.execute

ForkJoinTask.fork

ForkJoinTask.invoke

ForkJoinPool.externalPush

ForkJoinPool.externalSubmit

②工作線程fork任務

所謂工作線程fork任務,是指由ForkJoinPool所維護的工作線程(ForkJoinWorkerThread)從自身任務隊列中獲取任務(或從其它任務隊列竊?。?,然后執行任務。

工作線程fork任務的特點就是調用線程是工作線程。這個過程涉及以下方法:

ForkJoinTask.doExec

WorkQueue.push

創建工作線程

任務提交完成后,ForkJoinPool會根據情況創建或喚醒工作線程,以便執行任務。

ForkJoinPool并不會為每個任務都創建工作線程,而是根據實際情況(構造線程池時的參數)確定是喚醒已有空閑工作線程,還是新建工作線程。這個過程還是涉及任務隊列的綁定、工作線程的注銷等過程:

ForkJoinPool.signalWork

ForkJoinPool.tryAddWorker

ForkJoinPool.createWorker

ForkJoinWorkerThread.registerWorker

ForkJoinPool.deregisterWorker

任務執行

任務入隊后,由工作線程開始執行,這個過程涉及任務竊取、工作線程等待等過程:

ForkJoinWorkerThread.run

ForkJoinPool.runWorker

ForkJoinPool.scan

ForkJoinPool.runTask

ForkJoinTask.doExec

ForkJoinPool.execLocalTasks

ForkJoinPool.awaitWork

任務結果獲取

任務結果一般通過ForkJoinTaskjoin方法獲得,其主要流程如下圖:

任務結果獲取的核心涉及兩點:

互助竊取:ForkJoinPool.helpStealer

算力補償:ForkJoinPool.tryCompensate

三、源碼分析

通過第二部分,大致了解了F/J框架調度任務的流程,我們來看下源碼實現。

任務提交

①外部提交任務

我們通過ForkJoinPoolsubmit(ForkJoinTask task)方法來看下這個過程(其它提交任務的方法內部調用幾乎一樣,不再贅述):

public  ForkJoinTask submit(ForkJoinTask task) {
    if (task == null)
        throw new NullPointerException();
    externalPush(task);
    return task;
}

ForkJoinPool.submit內部調用了externalPush方法:

final void externalPush(ForkJoinTask task) {
    WorkQueue[] ws;
    WorkQueue q;
    int m;
    int r = ThreadLocalRandom.getProbe();
    int rs = runState;

    // m & r & SQMASK必為偶數,所以通過externalPush方法提交的任務都添加到了偶數索引的任務隊列中(沒有綁定的工作線程)
    if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
        (q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&
        U.compareAndSwapInt(q, QLOCK, 0, 1)) {
        ForkJoinTask[] a;
        int am, n, s;
        if ((a = q.array) != null &&
            (am = a.length - 1) > (n = (s = q.top) - q.base)) {
            int j = ((am & s) << ASHIFT) + ABASE;
            U.putOrderedObject(a, j, task);
            U.putOrderedInt(q, QTOP, s + 1);
            U.putIntVolatile(q, QLOCK, 0);
            if (n <= 1)                 // 隊列里只有一個任務
                signalWork(ws, q);      // 創建或激活一個工作線程
            return;
        }
        U.compareAndSwapInt(q, QLOCK, 1, 0);
    }

    // 未命中任務隊列時(WorkQueue == null 或 WorkQueue[i] == null),會進入該方法
    externalSubmit(task);
}

當我們首次創建了ForkJoinPool時,任務隊列數組并沒有初始化,只有當首次提交任務時,才會初始化。

externalPush方法包含兩部分:

根據線程隨機變量、任務隊列數組信息,計算命中槽(即本次提交的任務應該添加到任務隊列數組中的哪個隊列),如果命中且隊列中任務數<1,則創建或激活一個工作線程;

否則,調用externalSubmit初始化隊列,并入隊。

/**
 * 完整版本的externalPush.
 * 處理線程池提交任務時未命中隊列的情況和異常情況.
 */
private void externalSubmit(ForkJoinTask task) {
    int r;                                    // 線程相關的隨機數
    if ((r = ThreadLocalRandom.getProbe()) == 0) {
        ThreadLocalRandom.localInit();
        r = ThreadLocalRandom.getProbe();
    }

    for (; ; ) {
        WorkQueue[] ws;
        WorkQueue q;
        int rs, m, k;
        boolean move = false;

        // CASE1: 線程池已關閉
        if ((rs = runState) < 0) {
            tryTerminate(false, false);     // help terminate
            throw new RejectedExecutionException();
        }
        // CASE2: 初始化線程池
        else if ((rs & STARTED) == 0 ||     // initialize
            ((ws = workQueues) == null || (m = ws.length - 1) < 0)) {
            int ns = 0;
            rs = lockRunState();
            try {
                if ((rs & STARTED) == 0) {
                    U.compareAndSwapObject(this, STEALCOUNTER, null,
                        new AtomicLong());

                    // 初始化工作隊列數組, 數組大小必須為2的冪次
                    int p = config & SMASK;
                    int n = (p > 1) ? p - 1 : 1;
                    n |= n >>> 1;
                    n |= n >>> 2;
                    n |= n >>> 4;
                    n |= n >>> 8;
                    n |= n >>> 16;
                    n = (n + 1) << 1;
                    workQueues = new WorkQueue[n];
                    ns = STARTED;   // 線程池狀態轉化為STARTED
                }
            } finally {
                unlockRunState(rs, (rs & ~RSLOCK) | ns);
            }
        }
        // CASE3: 入隊任務
        else if ((q = ws[k = r & m & SQMASK]) != null) {
            if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {
                ForkJoinTask[] a = q.array;
                int s = q.top;
                boolean submitted = false; // initial submission or resizing
                try {                      // locked version of push
                    if ((a != null && a.length > s + 1 - q.base) ||
                        (a = q.growArray()) != null) {
                        int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
                        U.putOrderedObject(a, j, task);
                        U.putOrderedInt(q, QTOP, s + 1);
                        submitted = true;
                    }
                } finally {
                    U.compareAndSwapInt(q, QLOCK, 1, 0);
                }
                if (submitted) {
                    signalWork(ws, q);
                    return;
                }
            }
            move = true;                   // move on failure
        }
        // CASE4: 創建一個任務隊列
        else if (((rs = runState) & RSLOCK) == 0) {
            q = new WorkQueue(this, null);
            q.hint = r;
            q.config = k | SHARED_QUEUE;        // k為任務隊列在隊列數組中的索引: k == r & m & SQMASK, 在CASE3的IF判斷中賦值
            q.scanState = INACTIVE;             // 任務隊列狀態為INACTIVE
            rs = lockRunState();
            if (rs > 0 && (ws = workQueues) != null &&
                k < ws.length && ws[k] == null)
                ws[k] = q;                 // else terminated
            unlockRunState(rs, rs & ~RSLOCK);
        } else
            move = true;                   // move if busy
        if (move)
            r = ThreadLocalRandom.advanceProbe(r);
    }
}

externalSubmit方法的邏輯很清晰,一共分為4種情況:

CASE1:線程池已經關閉,則執行終止操作,并拒絕該任務的提交;

CASE2:線程池未初始化,則進行初始化,主要就是初始化任務隊列數組;

CASE3:命中了任務隊列,則將任務入隊,并嘗試創建/喚醒一個工作線程(Worker);

CASE4:未命中任務隊列,則在偶數索引處創建一個任務隊列

②工作線程fork任務

工作線程fork的任務其實就是子任務,ForkJoinTask.fork方法完成。

看下ForkJoinTask.fork方法,當調用線程為工作線程時,直接添加到其自身隊列中:

public final ForkJoinTask fork() {
    Thread t;
    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)   // 如果調用線程為【工作線程】
        ((ForkJoinWorkerThread) t).workQueue.push(this);           // 直接添加到線程的自身隊列中
    else
        ForkJoinPool.common.externalPush(this);                    // 外部(其它線程)提交的任務
    return this;
}

WorkQueue.push方法,任務存入自身隊列的棧頂(top):

final void push(ForkJoinTask task) {
    ForkJoinTask[] a;
    ForkJoinPool p;
    int b = base, s = top, n;
    if ((a = array) != null) {    // ignore if queue removed
        int m = a.length - 1;     // fenced write for task visibility
        U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
        U.putOrderedInt(this, QTOP, s + 1);       // 任務存入棧頂(top+1)
        if ((n = s - b) <= 1) {
            if ((p = pool) != null)
                p.signalWork(p.workQueues, this);   // 喚醒或創建一個工作線程
        } else if (n >= m)
            growArray();            // 擴容
    }
}
如果當前 WorkQueue 為新建的等待隊列(top - base <= 1),則調用signalWork方法為當前 WorkQueue 新建或喚醒一個工作線程;
如果 WorkQueue 中的任務數組容量過小,則調用growArray方法對其進行兩倍擴容,
創建工作線程

從流程圖可以看出,任務提交后,會調用signalWork方法創建或喚醒一個工作線程,該方法的核心其實就兩個分支:

工作線程數不足:創建一個工作線程;

工作線程數足夠:喚醒一個空閑(阻塞)的工作線程。

/**
 * 嘗試創建或喚醒一個工作線程.
 *
 * @param ws 任務隊列數組
 * @param q  當前操作的任務隊列WorkQueue
 */
final void signalWork(WorkQueue[] ws, WorkQueue q) {
    long c;
    int sp, i;
    WorkQueue v;
    Thread p;
    while ((c = ctl) < 0L) {                       // too few active
        // CASE1: 工作線程數不足
        if ((sp = (int) c) == 0) {
            if ((c & ADD_WORKER) != 0L)
                tryAddWorker(c);                    // 增加工作線程
            break;
        }

        // CASE2: 存在空閑工作線程,則喚醒
        if (ws == null)                            // unstarted/terminated
            break;
        if (ws.length <= (i = sp & SMASK))         // terminated
            break;
        if ((v = ws[i]) == null)                   // terminating
            break;
        int vs = (sp + SS_SEQ) & ~INACTIVE;        // next scanState
        int d = sp - v.scanState;                  // screen CAS
        long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred);
        if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) {
            v.scanState = vs;                      // activate v
            if ((p = v.parker) != null)
                U.unpark(p);
            break;
        }
        if (q != null && q.base == q.top)          // no more work
            break;
    }
}

先來看創建工作線程的方法tryAddWorker,其實就是設置下字段值(活躍/總工作線程池數),然后調用createWorker真正創建一個工作線程:

private void tryAddWorker(long c) {
    boolean add = false;
    do {

        // 設置活躍工作線程數、總工作線程池數
        long nc = ((AC_MASK & (c + AC_UNIT)) |
            (TC_MASK & (c + TC_UNIT)));
        if (ctl == c) {
            int rs, stop;                 // check if terminating
            if ((stop = (rs = lockRunState()) & STOP) == 0)
                add = U.compareAndSwapLong(this, CTL, c, nc);
            unlockRunState(rs, rs & ~RSLOCK);
            if (stop != 0)
                break;

            // 創建工作線程
            if (add) {
                createWorker();
                break;
            }
        }
    } while (((c = ctl) & ADD_WORKER) != 0L && (int) c == 0);
}
?
private boolean createWorker() {
    ForkJoinWorkerThreadFactory fac = factory;
    Throwable ex = null;
    ForkJoinWorkerThread wt = null;
    try {
        
        // 使用線程池工廠創建線程
        if (fac != null && (wt = fac.newThread(this)) != null) {
            wt.start();     // 啟動線程
            return true;
        }
    } catch (Throwable rex) {
        ex = rex;
    }
    
    // 創建出現異常,則注銷該工作線程
    deregisterWorker(wt, ex);
    return false;
}

如果創建過程中出現異常,則調用deregisterWorker注銷線程:

final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
    WorkQueue w = null;
    // 1.移除workQueue
    if (wt != null && (w = wt.workQueue) != null) {     // 獲取ForkJoinWorkerThread的等待隊列
        WorkQueue[] ws;                           
        int idx = w.config & SMASK;                     // 計算workQueue索引
        int rs = lockRunState();                        // 獲取runState鎖和當前池運行狀態
        if ((ws = workQueues) != null && ws.length > idx && ws[idx] == w)
            ws[idx] = null;                             // 移除workQueue
        unlockRunState(rs, rs & ~RSLOCK);   // 解除runState鎖
    }
    // 2.減少CTL數
    long c;                                       // decrement counts
    do {
    } while (!U.compareAndSwapLong
        (this, CTL, c = ctl, ((AC_MASK & (c - AC_UNIT)) |
            (TC_MASK & (c - TC_UNIT)) |
            (SP_MASK & c))));
    // 3.處理被移除workQueue內部相關參數
    if (w != null) {
        w.qlock = -1;                             // ensure set
        w.transferStealCount(this);
        w.cancelAll();                            // cancel remaining tasks
    }
    // 4.如果線程未終止,替換被移除的workQueue并喚醒內部線程
    for (; ; ) {                                    // possibly replace
        WorkQueue[] ws;
        int m, sp;
        // 嘗試終止線程池
        if (tryTerminate(false, false) || w == null || w.array == null ||
            (runState & STOP) != 0 || (ws = workQueues) == null ||
            (m = ws.length - 1) < 0)              // already terminating
            break;
        // 喚醒被替換的線程,依賴于下一步
        if ((sp = (int) (c = ctl)) != 0) {         // wake up replacement
            if (tryRelease(c, ws[sp & m], AC_UNIT))
                break;
        }
        // 創建工作線程替換
        else if (ex != null && (c & ADD_WORKER) != 0L) {
            tryAddWorker(c);                      // create replacement
            break;
        } else                                      // don"t need replacement
            break;
    }
    // 5.處理異常
    if (ex == null)                               // help clean on way out
        ForkJoinTask.helpExpungeStaleExceptions();
    else                                          // rethrow
        ForkJoinTask.rethrow(ex);
}
deregisterWorker方法用于工作線程運行完畢之后終止線程或處理工作線程異常,主要就是清除已關閉的工作線程或回滾創建線程之前的操作,并把傳入的異常拋給 ForkJoinTask 來處理。

工作線程在構造的過程中,會保存線程池信息和與自己綁定的任務隊列信息。它通過ForkJoinPool.registerWorker方法將自己注冊到線程池中:

protected ForkJoinWorkerThread(ForkJoinPool pool) {
    // Use a placeholder until a useful name can be set in registerWorker
    super("aForkJoinWorkerThread");
    this.pool = pool;
    this.workQueue = pool.registerWorker(this);

}
final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
    UncaughtExceptionHandler handler;
    wt.setDaemon(true);                           // configure thread
    if ((handler = ueh) != null)
        wt.setUncaughtExceptionHandler(handler);

    // 創建一個工作隊列, 并于該工作線程綁定
    WorkQueue w = new WorkQueue(this, wt);
    int i = 0;                                    // 記錄隊列在任務隊列數組中的索引, 始終為奇數
    int mode = config & MODE_MASK;
    int rs = lockRunState();
    try {
        WorkQueue[] ws;
        int n;
        if ((ws = workQueues) != null && (n = ws.length) > 0) {
            int s = indexSeed += SEED_INCREMENT;  // unlikely to collide
            int m = n - 1;
            i = ((s << 1) | 1) & m;               // 經計算后, i為奇數
            if (ws[i] != null) {                  // 槽沖突, 即WorkQueue[i]位置已經有了任務隊列

                // 重新計算索引i
                int probes = 0;                   // step by approx half n
                int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2;
                while (ws[i = (i + step) & m] != null) {
                    if (++probes >= n) {
                        workQueues = ws = Arrays.copyOf(ws, n <<= 1);
                        m = n - 1;
                        probes = 0;
                    }
                }
            }

            // 設置隊列狀態為SCANNING
            w.hint = s;                           // use as random seed
            w.config = i | mode;
            w.scanState = i;                      // publication fence
            ws[i] = w;
        }
    } finally {
        unlockRunState(rs, rs & ~RSLOCK);
    }
    wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1)));
    return w;
}

前文講過,工作線程(Worker)自身的任務隊列,其數組下標始終是奇數,registerWorker方法的主要作用就是在任務隊列數組WorkQueue[]找到一個空的奇數位,然后在該位置創建WorkQueue

至此,線程池的任務提交工作和工作線程創建工作就全部完成了,接下來開始工作線程的執行。

任務執行

ForkJoinWorkerThread啟動后,會執行它的run方法,該方法內部調用了ForkJoinPool.runWorker方法來執行任務:

public void run() {
    if (workQueue.array == null) {  // only run once
        Throwable exception = null;
        try {
            onStart();              // 鉤子方法
            pool.runWorker(workQueue);
        } catch (Throwable ex) {
            exception = ex;
        } finally {
            try {
                onTermination(exception);
            } catch (Throwable ex) {
                if (exception == null)
                    exception = ex;
            } finally {
                pool.deregisterWorker(this, exception);
            }
        }
    }
}

runWorker方法的核心流程如下:

scan:嘗試獲取一個任務;

runTask:執行取得的任務;

awaitWork:沒有任務則阻塞。

final void runWorker(WorkQueue w) {
    w.growArray();                   // 初始化任務隊列
    int seed = w.hint;               // initially holds randomization hint
    int r = (seed == 0) ? 1 : seed;  // avoid 0 for xorShift
    for (ForkJoinTask t; ; ) {

        // CASE1: 嘗試獲取一個任務
        if ((t = scan(w, r)) != null)
            w.runTask(t);                       // 獲取成功, 執行任務
        // CASE2: 獲取失敗, 阻塞等待任務入隊
        else if (!awaitWork(w, r))              // 等待失敗, 跳出該方法后, 工作線程會被注銷
            break;
        r ^= r << 13;
        r ^= r >>> 17;
        r ^= r << 5; // xorshift
    }
}
注意:如果awaitWork返回false,等不到任務,則跳出runWorker的循環,回到run中執行finally,最后調用deregisterWorker注銷工作線程。

任務竊取——scan

private ForkJoinTask scan(WorkQueue w, int r) {
    WorkQueue[] ws;
    int m;
    if ((ws = workQueues) != null && (m = ws.length - 1) > 0 && w != null) {
        int ss = w.scanState;                     // initially non-negative
        for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0; ; ) {
            WorkQueue q;
            ForkJoinTask[] a;
            ForkJoinTask t;
            int b, n;
            long c;

            // 根據隨機數r定位一個任務隊列
            if ((q = ws[k]) != null) {      // 獲取workQueue
                if ((n = (b = q.base) - q.top) < 0 &&
                    (a = q.array) != null) {      // non-empty
                    long i = (((a.length - 1) & b) << ASHIFT) + ABASE;
                    if ((t = ((ForkJoinTask)
                        U.getObjectVolatile(a, i))) != null &&  // 取base位置任務
                        q.base == b) {

                        // 成功獲取到任務
                        if (ss >= 0) {
                            if (U.compareAndSwapObject(a, i, t, null)) {
                                q.base = b + 1;         // 更新base位
                                if (n < -1)
                                    signalWork(ws, q);  // 創建或喚醒工作線程來運行任務
                                return t;
                            }
                        } else if (oldSum == 0 &&   // try to activate
                            w.scanState < 0)
                            tryRelease(c = ctl, ws[m & (int) c], AC_UNIT);  // 喚醒棧頂工作線程
                    }

                    // base位置任務為空或base位置偏移,隨機移位重新掃描
                    if (ss < 0)                   // refresh
                        ss = w.scanState;
                    r ^= r << 1;
                    r ^= r >>> 3;
                    r ^= r << 10;
                    origin = k = r & m;           // move and rescan
                    oldSum = checkSum = 0;
                    continue;
                }
                checkSum += b;
            }
            if ((k = (k + 1) & m) == origin) {    // continue until stable
                // 運行到這里說明已經掃描了全部的 workQueues,但并未掃描到任務
                if ((ss >= 0 || (ss == (ss = w.scanState))) &&
                    oldSum == (oldSum = checkSum)) {
                    if (ss < 0 || w.qlock < 0)    // already inactive
                        break;

                    // 對當前WorkQueue進行滅活操作
                    int ns = ss | INACTIVE;       // try to inactivate
                    long nc = ((SP_MASK & ns) |
                        (UC_MASK & ((c = ctl) - AC_UNIT)));
                    w.stackPred = (int) c;         // hold prev stack top
                    U.putInt(w, QSCANSTATE, ns);
                    if (U.compareAndSwapLong(this, CTL, c, nc))
                        ss = ns;
                    else
                        w.scanState = ss;         // back out
                }
                checkSum = 0;
            }
        }
    }
    return null;
}

掃描并嘗試偷取一個任務。隨機選擇一個WorkQueue,獲取base位的 ForkJoinTask,成功取到后更新base位并返回任務;如果取到的 WorkQueue 中任務數大于1,則調用signalWork創建或喚醒其他工作線程。

阻塞等待——awaitWork

如果scan方法未掃描到任務,會調用awaitWork等待獲取任務:

private boolean awaitWork(WorkQueue w, int r) {
    if (w == null || w.qlock < 0)                  // w is terminating
        return false;
    for (int pred = w.stackPred, spins = SPINS, ss; ; ) {
        if ((ss = w.scanState) >= 0)               // 正在掃描,跳出循環
            break;
        else if (spins > 0) {
            r ^= r << 6;
            r ^= r >>> 21;
            r ^= r << 7;
            if (r >= 0 && --spins == 0) {           // randomize spins
                WorkQueue v;
                WorkQueue[] ws;
                int s, j;
                AtomicLong sc;
                if (pred != 0 && (ws = workQueues) != null &&
                    (j = pred & SMASK) < ws.length &&
                    (v = ws[j]) != null &&          // see if pred parking
                    (v.parker == null || v.scanState >= 0))
                    spins = SPINS;                  // continue spinning
            }
        } else if (w.qlock < 0)                     // 當前workQueue已經終止,返回false recheck after spins
            return false;
        else if (!Thread.interrupted()) {           // 判斷線程是否被中斷,并清除中斷狀態
            long c, prevctl, parkTime, deadline;
            int ac = (int) ((c = ctl) >> AC_SHIFT) + (config & SMASK);      // 活躍線程數
            if ((ac <= 0 && tryTerminate(false, false)) ||      // 無active線程,嘗試終止
                (runState & STOP) != 0)             // pool terminating
                return false;
            if (ac <= 0 && ss == (int) c) {         // is last waiter
                // 計算活躍線程數(高32位)并更新為下一個棧頂的scanState(低32位)
                prevctl = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & pred);
                int t = (short) (c >>> TC_SHIFT);   // shrink excess spares
                if (t > 2 && U.compareAndSwapLong(this, CTL, c, prevctl))//總線程過量
                    return false;                   // else use timed wait
                // 計算空閑超時時間
                parkTime = IDLE_TIMEOUT * ((t >= 0) ? 1 : 1 - t);
                deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP;
            } else
                prevctl = parkTime = deadline = 0L;
            Thread wt = Thread.currentThread();
            U.putObject(wt, PARKBLOCKER, this);     // emulate LockSupport
            w.parker = wt;                              // 設置parker,準備阻塞
            if (w.scanState < 0 && ctl == c)            // recheck before park
                U.park(false, parkTime);             // 阻塞指定的時間

            U.putOrderedObject(w, QPARKER, null);
            U.putObject(wt, PARKBLOCKER, null);
            if (w.scanState >= 0)                       // 正在掃描,說明等到任務,跳出循環
                break;
            if (parkTime != 0L && ctl == c &&
                deadline - System.nanoTime() <= 0L &&
                U.compareAndSwapLong(this, CTL, c, prevctl))    // 未等到任務,更新ctl,返回false
                return false;                                      // shrink pool
        }
    }
    return true;
}

任務執行——runTask

竊取到任務后,調用WorkQueue.runTask方法執行任務:

final void runTask(ForkJoinTask task) {
    if (task != null) {
        scanState &= ~SCANNING;             // mark as busy
        (currentSteal = task).doExec();     // 更新currentSteal并執行任務
        U.putOrderedObject(this, QCURRENTSTEAL, null); // release for GC
        execLocalTasks();                   // 依次執行本地任務
        ForkJoinWorkerThread thread = owner;
        if (++nsteals < 0)                  // collect on overflow
            transferStealCount(pool);       // 增加偷取任務數
        scanState |= SCANNING;
        if (thread != null)
            thread.afterTopLevelExec();     // 執行鉤子函數
    }
}

1.首先調用FutureTask.deExec()執行任務,其內部會調用FutureTask.exec()方法,該方法為抽象方法,由子類實現。

子類實現該方法時,一般會進行fork,導致生成子任務,并最終添加到調用線程自身地任務隊列中:

final int doExec() {
    int s;
    boolean completed;
    if ((s = status) >= 0) {
        try {
            completed = exec();     // exec為抽象方法, 由子類實現
        } catch (Throwable rex) {
            return setExceptionalCompletion(rex);
        }
        if (completed)
            s = setCompletion(NORMAL);
    }
    return s;
}

2.除了執行竊取到的任務,工作線程還會執行自己隊列中的任務,即WorkQueue.execLocalTasks方法:

final void execLocalTasks() {
    int b = base, m, s;
    ForkJoinTask[] a = array;
    if (b - (s = top - 1) <= 0 && a != null &&
        (m = a.length - 1) >= 0) {
        if ((config & FIFO_QUEUE) == 0) {   // LIFO, 從top -> base 遍歷執行任務
            for (ForkJoinTask t; ; ) {
                if ((t = (ForkJoinTask) U.getAndSetObject
                    (a, ((m & s) << ASHIFT) + ABASE, null)) == null)
                    break;
                U.putOrderedInt(this, QTOP, s);
                t.doExec();
                if (base - (s = top - 1) > 0)
                    break;
            }
        } else  // FIFO,  從base -> top 遍歷執行任務
            pollAndExecAll();
    }

}
構建線程池時的asyncMode參數,決定了工作線程執行自身隊列中的任務的方式。如果 asyncMode == true,則以FIFO的方式執行任務;否則,以LIFO的方式執行任務。
任務結果獲取

ForkJoinTask.join()可以用來獲取任務的執行結果。join方法的執行邏輯如下:

public final V join() {
    int s;
    if ((s = doJoin() & DONE_MASK) != NORMAL)
        reportException(s);
    return getRawResult();
}

可以看到,內部先調用doJoin方法:

private int doJoin() {
    int s;
    Thread t;
    ForkJoinWorkerThread wt;
    ForkJoinPool.WorkQueue w;
    return (s = status) < 0 ? s :
        ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
            (w = (wt = (ForkJoinWorkerThread) t).workQueue).tryUnpush(this) && (s = doExec()) < 0 ? s :
                wt.pool.awaitJoin(w, this, 0L) :
            externalAwaitDone();
}

?
doJoin方法會判斷調用線程是否是工作線程:

1.如果是非工作線程調用的join,則最終調用externalAwaitDone()阻塞等待任務的完成。

2.如果是工作線程調用的join,則存在以下情況:

如果需要join的任務已經完成,直接返回運行結果;

如果需要join的任務剛剛好是當前線程所擁有的隊列的top位置,則立即執行它。

如果該任務不在top位置,則調用awaitJoin方法等待

關鍵看下ForkJoinPool.awaitJoin等待過程中發生了什么:

final int awaitJoin(WorkQueue w, ForkJoinTask task, long deadline) {
    int s = 0;
    if (task != null && w != null) {
        ForkJoinTask prevJoin = w.currentJoin;   // 獲取給定Worker的join任務
        U.putOrderedObject(w, QCURRENTJOIN, task);  // 把currentJoin替換為給定任務
        
        // 判斷是否為CountedCompleter類型的任務
        CountedCompleter cc = (task instanceof CountedCompleter) ?
            (CountedCompleter) task : null;
        for (; ; ) {
            if ((s = task.status) < 0)              // 已經完成|取消|異常 跳出循環
                break;

            if (cc != null)                         // CountedCompleter任務由helpComplete來完成join
                helpComplete(w, cc, 0);
            else if (w.base == w.top || w.tryRemoveAndExec(task))  //嘗試執行
                helpStealer(w, task);               // 隊列為空或執行失敗,任務可能被偷,幫助偷取者執行該任務

            if ((s = task.status) < 0)              // 已經完成|取消|異常,跳出循環
                break;
            
            // 計算任務等待時間
            long ms, ns;
            if (deadline == 0L)
                ms = 0L;
            else if ((ns = deadline - System.nanoTime()) <= 0L)
                break;
            else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)
                ms = 1L;

            if (tryCompensate(w)) {                         // 執行補償操作
                task.internalWait(ms);                      // 補償執行成功,任務等待指定時間
                U.getAndAddLong(this, CTL, AC_UNIT);     // 更新活躍線程數
            }
        }
        U.putOrderedObject(w, QCURRENTJOIN, prevJoin);      // 循環結束,替換為原來的join任務
    }
    return s;
}

ForkJoinPool.awaitJoin方法中有三個重要方法:

tryRemoveAndExec

helpStealer

tryCompensate

這里說下這三個方法的主要作用,不貼代碼了:

tryRemoveAndExec:

當工作線程正在等待join的任務時,它會從top位開始自旋向下查找該任務:

如果找到則移除并執行它;

如果找不到,說明說明任務可能被偷,則調用helpStealer方法反過來幫助偷取者執行它自己的任務。

helpStealer:

先定位的偷取者的任務隊列;

從偷取者的base索引開始,每次偷取一個任務執行。

tryCompensate:

tryCompensate主要用來補償工作線程因為阻塞而導致的算力損失,當工作線程自身的隊列不為空,且還有其它空閑工作線程時,如果自己阻塞了,則在此之前會喚醒一個工作線程。

四、總結

本章和上一章——Fork/Join框架(1) 原理,從思想、使用、實現等方面較完整地分析了Fork/Join框架,Fork/Join框架的使用需要根據實際情況劃分子任務的大小。

理解F/J框架需要先從整體上了解框架調度任務的流程(參考本章開頭的調度圖),可以自己通過示例模擬一個任務的調度過程,然后根據實際運用過程中遇到的問題,再去調試及在相應的源碼中查看實現原理。

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。

轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/71970.html

相關文章

  • Java線程進階四三)—— J.U.Cexecutors框架Fork/Join框架(1) 原

    摘要:同時,它會通過的方法將自己注冊到線程池中。線程池中的每個工作線程都有一個自己的任務隊列,工作線程優先處理自身隊列中的任務或順序,由線程池構造時的參數決定,自身隊列為空時,以的順序隨機竊取其它隊列中的任務。 showImg(https://segmentfault.com/img/bVbizJb?w=1802&h=762); 本文首發于一世流云的專欄:https://segmentfau...

    cooxer 評論0 收藏0
  • Java線程進階(一)—— J.U.C并發包概述

    摘要:整個包,按照功能可以大致劃分如下鎖框架原子類框架同步器框架集合框架執行器框架本系列將按上述順序分析,分析所基于的源碼為。后,根據一系列常見的多線程設計模式,設計了并發包,其中包下提供了一系列基礎的鎖工具,用以對等進行補充增強。 showImg(https://segmentfault.com/img/remote/1460000016012623); 本文首發于一世流云專欄:https...

    anonymoussf 評論0 收藏0
  • Java線程進階(三九)—— J.U.Cexecutors框架executors框架概述

    摘要:注意線程與本地操作系統的線程是一一映射的。固定線程數的線程池提供了兩種創建具有固定線程數的的方法,固定線程池在初始化時確定其中的線程總數,運行過程中會始終維持線程數量不變。 showImg(https://segmentfault.com/img/bVbhK58?w=1920&h=1080); 本文首發于一世流云專欄:https://segmentfault.com/blog... ...

    wdzgege 評論0 收藏0
  • Java線程進階(四二)—— J.U.Cexecutors框架:Future模式

    摘要:本文首發于一世流云的專欄一模式簡介模式是多線程設計模式中的一種常見模式,它的主要作用就是異步地執行任務,并在需要的時候獲取結果。二中的模式在多線程基礎之模式中,我們曾經給出過模式的通用類關系圖。 showImg(https://segmentfault.com/img/bVbiwcx?w=1000&h=667); 本文首發于一世流云的專欄:https://segmentfault.co...

    marek 評論0 收藏0
  • 聊聊面試中關于并發問題的應對方案

    摘要:這里呢,我直接給出高并發場景通常都會考慮的一些解決思路和手段結尾如何有效的準備面試中并發類問題,我已經給出我的理解。 showImg(https://segmentfault.com/img/bV7Viy?w=550&h=405); 主題 又到面試季了,從群里,看到許多同學分享了自己的面試題目,我也抽空在網上搜索了一些許多公司使用的面試題,目前校招和社招的面試題基本都集中在幾個大方向上...

    xzavier 評論0 收藏0

發表評論

0條評論

FingerLiu

|高級講師

TA的文章

閱讀更多
最新活動
閱讀需要支付1元查看
<