国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

Java ThreadPoolExecutor 線程池源碼分析

greatwhole / 1626人閱讀

摘要:線程池常見實現線程池一般包含三個主要部分調度器決定由哪個線程來執行任務執行任務所能夠的最大耗時等線程隊列存放并管理著一系列線程這些線程都處于阻塞狀態或休眠狀態任務隊列存放著用戶提交的需要被執行的任務一般任務的執行的即先提交的任務先被執行調度

線程池常見實現

線程池一般包含三個主要部分:

調度器: 決定由哪個線程來執行任務, 執行任務所能夠的最大耗時等

線程隊列: 存放并管理著一系列線程, 這些線程都處于阻塞狀態或休眠狀態

任務隊列: 存放著用戶提交的需要被執行的任務. 一般任務的執行 FIFO 的, 即先提交的任務先被執行

調度器并非是必須的, 例如 Java 中實現的 ThreadPoolExecutor 就沒有調度器, 而是所有的線程都不斷從任務隊列中取出任務, 然后執行.
線程池模型可以用下圖簡單地表示:

線程池基本概念 線程池大小

ThreadPoolExecutor 有兩個參數用于控制線程池中線程的個數: corePoolSizemaximumPoolSize, 根據這兩個參數, ThreadPoolExecutor 會自適應地調整線程個數, 以適應不同的任務數.
當我們通過 execute(Runnable) 提交一個任務時:

如果此時線程池中線程個數小于 corePoolSize, 則此任務不會插入到任務隊列中, 而是直接創建一個新的線程來執行此任務, 即使當前線程池中有空閑的線程.

如果線程數大于 corePoolSize 但是小于 maximumPoolSize:

如果任務隊列還未滿, 則會將此任務插入到任務隊列末尾;

如果此時任務隊列已滿, 則會創建新的線程來執行此任務.

如果線程數等于 maximumPoolSize:

如果任務隊列還未滿, 則會將此任務插入到任務隊列末尾;

如果此時任務隊列已滿, 則會又 RejectedExecutionHandler 處理, 默認情況下是拋出 RejectedExecutionException 異常.

線程的 Keep-Alive 時間

在創建一個線程池時, 我們可以指定線程池中的線程的最大空閑(Idle)時間, 線程池會根據我們設置的這個值來動態的減少不必要的線程, 釋放系統資源.
當我們的線程池中的線程數大于 corePoolSize 時, 如果此時有線程處于空閑(Idle)狀態超過指定的時間(keepAliveTime), 那么線程池會將此線程銷毀.

工作隊列

工作隊列(WorkQueue) 是 一個 BlockingQueue, 它時用于存放那些已經提交的, 但是還沒有空余線程來執行的任務. 例如我們在前面 線程池大小 一節中討論的情況, 如果當前的線程數大于 corePoolSize 并且工作隊列的還有剩余空間, 那么新提交的任務就會先放到工作隊列中.

根據 Java Docs, 有三種常見的工作隊列的使用場景:

直接切換(Direct handoffs): 一個不錯并且是默認的工作隊列的選擇時

無界隊列(Unbounded queues)

有界隊列(Bounded queues)

任務提交失敗處理

因為線程池中維護有一個工作隊列, 我們自然地會想到, 當線程池中的工作隊列滿了, 不能再添加新的任務了, 此時線程池會怎么處理呢?
一般來說, 當我們提交一個任務到線程池中, 如果此時線程池不能再添加任務了, 那么通常會返回一個錯誤, 或者是調用我們預先設置的一個錯誤處理 handler, 例如在 Java ThreadPoolExecutor 中, 我們可以通過如下方式實例化一個帶有任務提交失敗 handler 的線程池:

ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 100, TimeUnit.SECONDS,
        new LinkedBlockingDeque<>(1),
        new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.out.println("Task " + r.toString() + " failed!");
    }
});
活躍線程數與線程池狀態

ThreadPoolExecutor 中有一個名為 ctl 的字段, 它是一個 AtomicInteger 類型, ThreadPoolExecutor 復用了此字段來表示兩個信息:

當前活躍的線程數

線程池狀態

ctl 是一個 AtomicInteger 類型, 它的 低29位 用于存放當前的線程數, 因此一個線程池在理論上最大的線程數是 536870911; 高 3 位是用于表示當前線程池的狀態, 其中高三位的值和狀態對應如下:

111: RUNNING

000: SHUTDOWN

001: STOP

010: TIDYING

110: TERMINATED

線程池的基本使用 創建線程池

前面我們提到, 一個線程池中有 corePoolSize, maximumPoolSize, keepAliveTime, workQueue 之類的概念, 這些屬性我們必須在實例化線程池時通過構造器傳入. Java 線程池實現類 ThreadPoolExecutor 中提供了不少構造方法, 我們來看一下其中兩個常用的構造器:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue workQueue,
                          RejectedExecutionHandler handler) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), handler);
}

可以看到, 在實例化一個 ThreadPoolExecutor 線程池時, 我們需要指定一些線程池的基本屬性, 并且可選地, 我們還可以指定當任務提交失敗時的處理 handler.

例如我們可以通過如下方式實例化一個帶有任務提交失敗 handler 的線程池:

ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 100, TimeUnit.SECONDS,
        new LinkedBlockingDeque<>(1),
        new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.out.println("Task " + r.toString() + " failed!");
    }
});

當然, 除了上述使用構造器來直接創建線程池, Java 還提供了幾個簡便地創建線程池的方法:

Executors.newCachedThreadPool

Executors.newFixedThreadPool

Executors.newWorkStealingPool

Executors.newSingleThreadExecutor

Executors.newScheduledThreadPool

例如我們想創建一個有五個線程的線程池, 那么可以調用 Executors.newFixedThreadPool, 這個方法等效于:

new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue())
提交任務

提交任務到線程池中比較簡單, 如果是 ThreadPoolExecutor 類型的線程池, 我們直接調用它的 execute 方法即可, 例如:

ExecutorService executorService = ...
executorService.execute(new Runnable() {
    @Override
    public void run() {
        System.out.println("OK, thread name: " + Thread.currentThread().getName());
    }
});

如果我們獲取到一個 ScheduledThreadPoolExecutor 類型的線程池, 那么除了調用 execute 方法外, 我們還可以通過調用 schedule 方法提交一個定時任務, 例如:

ScheduledExecutorService executorService = xxx
executorService.schedule(new Runnable() {
    @Override
    public void run() {
        System.out.println("OK, thread name: " + Thread.currentThread().getName());
    }
}, 1, TimeUnit.SECONDS);

上面代代碼就會在1秒后執行我們的定時任務.

關閉線程池

Java 線程池提供了兩個方法用于關閉一個線程池, 一個是 shutdownNow(), 另一個是 shutdown(). 我們可以看一下這兩個方法的簽名:

void shutdown();
List shutdownNow();

這兩個方法除了名字不一樣外(廢話), 它們的返回值也不太一樣.
那么這兩個方法到底有什么區別呢? 它們的區別有:

當線程池調用該方法時,線程池的狀態則立刻變成 SHUTDOWN 狀態. 我們不能再往線程池中添加任何任務, 否則將會拋出RejectedExecutionException異常; 但是, 此時線程池不會立刻退出, 直到添加到線程池中的任務都已經處理完成后才會退出.

當執行該方法, 線程池的狀態立刻變成STOP狀態, 并試圖停止所有正在執行的線程, 不再處理還在池隊列中等待的任務, 并以返回值的形式返回那些未執行的任務.
此方法會通過調用 Thread.interrupt() 方法來試圖停止正在運行的 Worker 線程, 但是這種方法的作用有限, 如果線程中沒有 sleep 、wait、Condition、定時鎖 等操作時, interrupt() 方法是無法中斷當前的線程的. 所以, ShutdownNow() 并不代表線程池就一定立即就能退出, 可能必須要等待所有正在執行的任務都執行完成了才能退出.

廢話了一大堆, 我們來看一下具體的例子吧:

ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
executorService.schedule(new Runnable() {
    @Override
    public void run() {
        System.out.println("OK, thread name: " + Thread.currentThread().getName());
    }
}, 1, TimeUnit.SECONDS);

// 調用此方法關閉線程時, 我們提交的定時任務不會被執行
// executorService.shutdownNow();

executorService.shutdown();

可以看到, 如果我們調用的是 executorService.shutdownNow(), 那么原先提交的未執行的定時任務并不會再被執行, 但是如果我們調用的是 executorService.shutdown(), 那么此調用會阻塞住, 直到所有提交的任務都執行完畢才會返回.

代碼分析 線程池的屬性字段

在開始深入了解 ThreadPoolExecutor 代碼之前, 我們先來簡單地看一下 ThreadPoolExecutor 類中到底有哪些重要的字段.

public class ThreadPoolExecutor extends AbstractExecutorService {
    // 這個是一個復用字段, 它復用地表示了當前線程池的狀態, 當前線程數信息.
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    
    // 用于存放提交到線程池中, 但是還未執行的那些任務.
    private final BlockingQueue workQueue;

    // 線程池內部鎖, 對線程池內部操作加鎖, 防止競態條件
    private final ReentrantLock mainLock = new ReentrantLock();

    // 一個 Set 結構, 包含了當前線程池中的所有工作線程.
    // 對 workers 字段的操作前, 需要獲取到這個鎖.
    private final HashSet workers = new HashSet();

    // 條件變量, 用于支持 awaitTermination 操作
    private final Condition termination = mainLock.newCondition();

    // 記錄線程池中曾經到達過的最大的線程數.
    // 這個字段在獲取 mainLock 鎖的前提下才能操作.
    private int largestPoolSize;

    // 記錄已經完成的任務數. 僅僅當工作線程結束時才更新此字段.
    // 這個字段在獲取 mainLock 鎖的前提下才能操作.
    private long completedTaskCount;

    // 線程工廠. 當需要一個新的線程時, 這里生成.
    private volatile ThreadFactory threadFactory;

    // 任務提交失敗后的處理 handler
    private volatile RejectedExecutionHandler handler;

    // 空閑線程的等待任務時間, 以納秒為單位.
    // 當當前線程池中的線程數大于 corePoolSize 時, 
    // 或者 allowCoreThreadTimeOut 為真時, 線程才有 idle 等待超時時間, 
    // 如果超時則此線程會停止.; 
    // 反之線程會一直等待新任務到來.
    private volatile long keepAliveTime;

    // 默認為 false.
    // 當為 false 時, keepAliveTime 不起作用, 線程池中的 core 線程會一直存活, 
    // 即使這些線程是 idle 狀態.
    // 當為 true 時, core 線程使用 keepAliveTime 作為 idle 超時
    // 時間來等待新的任務.
    private volatile boolean allowCoreThreadTimeOut;

    // 核心線程數.
    private volatile int corePoolSize;

    // 最大線程數.
    private volatile int maximumPoolSize;
}

ThreadPoolExecutor 中, 使用到 ctl 這個字段來維護線程池中當前線程數和線程池的狀態. ctl 是一個 AtomicInteger 類型, 它的 低29位 用于存放當前的線程數, 因此一個線程池在理論上最大的線程數是 536870911; 高 3 位是用于表示當前線程池的狀態, 其中高三位的值和狀態對應如下:

111: RUNNING

000: SHUTDOWN

001: STOP

010: TIDYING

110: TERMINATED

提交任務到線程池
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
   
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

上面的代碼有三個步驟, 首先第一步是檢查當前線程池的線程數是否小于 corePoolSize, 如果小于, 那么由我們前面提到的規則, 線程池會創建一個新的線程來執行此任務, 因此在第一個 if 語句中, 會調用 addWorker(command, true) 來創建一個新 Worker 線程, 并執行此任務. addWorker 的第二個參數是一個 boolean 類型的, 它的作用是用于標識是否需要使用 corePoolSize 字段, 如果它為真, 則添加新任務時, 需要考慮到 corePoolSize 字段的影響. 這里至于 addWorker 內部的實現細節我們暫且不管, 先把整個提交任務的大體脈絡理清了再說.

如果前面的判斷不滿足, 那么會將此任務插入到工作隊列中, 即 workQueue.offer(command). 當然, 為了健壯性考慮, 當插入到 workQueue 后, 我們還需要再次檢查一下此時線程池是否還是 RUNNING 狀態, 如果不是的話就會將原來插入隊列中的那個任務刪除, 然后調用 reject 方法拒絕此任務的提交; 接著考慮到在我們插入任務到 workQueue 中的同時, 如果此時線程池中的線程都執行完畢并終止了, 在這樣的情況下剛剛插入到 workQueue 中的任務就永遠不會得到執行了. 為了避免這樣的情況, 因此我們由再次檢查一下線程池中的線程數, 如果為零, 則調用 addWorker(null, false) 來添加一個線程.
如果前面所分析的情況都不滿足, 那么就會進入到第三個 if 判斷, 在這里會調用 addWorker(command, false) 來將此任務提交到線程池中. 注意到這個方法的第二個參數是 false, 表示我們在此次調用 addWorker 時, 不考慮 corePoolSize 的影響, 即忽略 corePoolSize 字段.

關于 addWorker 方法

前面我們大體分析了一下 execute 提交任務的流程, 不過省略了一個關鍵步驟, 即 addWorker 方法. 現在我們就來揭開它的神秘面紗吧.
首先看一下 addWorker 方法的簽名:

private boolean addWorker(Runnable firstTask, boolean core)

這個方法接收兩個參數, 第一個是一個 Runnable 類型的, 一般來說是我們調用 execute 方法所傳輸的參數, 不過也有可能是 null 值, 這樣的情況我們在前面一小節中也見到過.
那么第二個參數是做什么的呢? 第二個參數是一個 boolean 類型的變量, 它的作用是標識是否使用 corePoolSize 屬性. 我們知道, ThreadPoolExecutor 中, 有一個 corePoolSize 屬性, 用于動態調整線程池中的核心線程數. 那么當 core 這個參數是 true 時, 則表示在添加新任務時, 需要考慮到 corePoolSzie 的影響(例如如果此時線程數已經大于 corePoolSize 了, 那么就不能再添加新線程了); 當 core 為 false 時, 就不考慮 corePoolSize 的影響(其實代碼中是以 maximumPoolSize 作為 corePoolSize 來做判斷條件的), 一有新任務, 就對應地生成一個新的線程.
說了這么多, 還不如來看一下 addWorker 的源碼吧:

private boolean addWorker(Runnable firstTask, boolean core) {
    // 這里一大段的 for 語句, 其實就是判斷和處理 core 參數的.
    // 當經過判斷, 如果當前的線程大于 corePoolSize 或 maximumPoolSize 時(根據 core 的值來判斷), 
    // 則表示不能新建新的 Worker 線程, 此時返回 false.
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            // 當 core 為真, 那么就判斷當前線程是否大于 corePoolSize
            // 當 core 為假, 那么就判斷當前線程數是否大于 maximumPoolSize
            // 這里的 for 循環是一個自旋CAS(CompareAndSwap)操作, 用于確保多線程環境下的正確性
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : ma))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

首先在 addWorker 的一開始, 有一個 for 循環, 用于判斷當前是否可以添加新的 Worker 線程. 它的邏輯如下:

如果傳入的 core 為真, 那么判斷當前的線程數是否大于 corePoolSize, 如果大于, 則不能新建 Worker 線程, 返回 false.

如果傳入的 core 為假, 那么判斷當前的線程數是否大于 maximumPoolSize, 如果大于, 則不能新建 Worker 線程, 返回 false.

如果條件符合, 那么在 for 循環內, 又有一個自旋CAS 更新邏輯, 用于遞增當前的線程數, 即 compareAndIncrementWorkerCount(c), 這個方法會原子地更新 ctl 的值, 將當前線程數的值遞增一.

addWorker 接下來有一個 try...finally 語句塊, 這里就是實際上的創建線程、啟動線程、添加線程到線程池中的工作了.
首先可以看到 w = new Worker(firstTask); 這里是實例化一個 Worker 對象, 這個類其實就是 ThreadPoolExecutor 中對工作線程的封裝. Worker 類繼承于 AbstractQueuedSynchronizer 并實現了 Runnable 接口, 我們來看一下它的構造器:

Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}

它會把我們提交的任務(firstTask) 設置為自己的內部屬性 firstTask, 然后呢, 使用 ThreadPoolExecutor 中的 threadFactory 來創建一個新的線程, 并保存在 thread 字段中, 而且注意到, 創建線程時, 我們傳遞給新線程城的 Runnable 其實是 Worker 對象本身(this), 因此當這個線程啟動時, 實際上運行的是 Worker.run() 中的代碼.

回過頭來再看一下 addWorker 方法. 當創建好 Worker 線程后, 就會將這個 worker 線程存放在 workers 這個 HashSet 類型的字段中. 而且注意到, 正如我們在前面所提到的, mainLock 是 ThreadPoolExecutor 的內部鎖, 我們對 ThreadPoolExecutor 中的字段進行操作時, 為了保證線程安全, 因此都需要在獲取到 mainLock 的前提下才能操作的.

最后別忘啦, 新建了一個線程后, 需要調用它的 start() 方法后, 這個線程才真正地運行, 因此我們可以看到, 在 addWorker 方法的最后, 調用了 t.start(); 來啟動這個新建的線程.

任務的分配與調度

我們已經分析了工作線程的創建和任務插入到 wokerQuque 的過程, 那么根據本文最開頭的線程池工作模型可知, 光有工作線程和工作隊列還不行啊, 還需要有一個調度器, 把任務和工作線程關聯起來才是一個真正的線程池.

在 ThreadPoolExecutor 中, 調度器的實現很簡單, 其實就是每個工作線程在執行完一個任務后, 會再次中 workQueue 中拿出下一個任務, 如果獲取到了任務, 那么就再次執行.
我們來看一下具體的代碼實現吧.
在前面一小節中, 我們講到 addWorker 中會新建一個 Worker 對象來代表一個 worker 線程, 接著會調用線程的 start() 來啟動這個線程, 我們也提到了當啟動這個線程后, 會運行到 Worker 中的 run 方法, 那么這里我們就來看一下 Worker.run有什么玄機吧:

public void run() {
    runWorker(this);
}

Worker.run 方法很簡單, 只是調用了 ThreadPoolExecutor.runWorker 方法而已.
runWorker 方法比較關鍵, 它是整個線程池任務分配的核心:

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

runWorker 方法是整個工作線程的核心循環, 在這個循環中, 工作線程會不斷的從 workerQuque 中獲取新的 task, 然后執行它.
我們注意到在 runWorker 一開始, 有一個 w.unlock();, 咦, 這是為什么呢? 其實這是 Worker 類玩的一個小把戲. 回想一下, Worker 類繼承于 AbstractQueuedSynchronizer 并實現了 Runnable 接口, 它的構造器如下:

Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}

setState(-1); 方法是 AbstractQueuedSynchronizer 提供的, 初始化 Worker 時, 會先設置 state 為 -1, 根據注釋, 這樣做的原因是為了抑制工作線程的 interrupt 信號, 直到此工作線程正是開始執行 task. 那么在 addWorker 中的 w.unlock(); 就是允許 Worker 的 interrupt 信號.
接著在 addWorker 中會進入一個 while 循環, 在這里此工作線程會不斷地從 workQueue 中取出一個任務, 然后調用 task.run() 來執行這個任務, 因此就執行到了用戶所提交的 Runnable 中的 run() 方法了.

工作線程的 idle 超時處理

工作線程的 idle 超出處理在底層依賴于 BlockingQueue 帶超時的 poll 方法, 即工作線程會不斷地從 workQueue 這個 BlockingQueue 中獲取任務, 如果 allowCoreThreadTimeOut 字段為 true, 或者當前的工作線程數大于 corePoolSize, 那么線程的 idle 超時機制就生效了, 此時工作線程會以帶超時的 poll 方式從 workQueue 中獲取任務. 當超時了還沒有獲取到任務, 那么我們就知道此線程一個到達 idle 超時時間, 因此終止此工作線程.
具體源碼如下:

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

從源碼中就可以看到, 一開始會判斷當前的線程池狀態, 如果不是 SHUTDOWNSTOP 之類的狀態, 那么接著獲取當前的工作線程數, 然后判斷工作線程數量是否已經大于了 corePoolSize. 當 allowCoreThreadTimeOut 字段為 true, 或者當前的工作線程數大于 corePoolSize, 那么線程的 idle 超時機制就生效, 此時工作線程會以帶超時的 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) 方式從 workQueue 中獲取任務; 反之會以 workQueue.take() 方式阻塞等待任務, 直到獲取一個新的任務.
當從 workQueue 獲取新任務超時時, 那么就會調用 compareAndDecrementWorkerCount 將當前的工作線程數減一, 并返回 null. getTask 方法返回 null 后, 那么 runWorker 中的 while 循環自然也就結束了, 因此也導致了 runWorker 方法的返回, 最后自然整個工作線程的 run() 方法執行完畢, 工作線程自然就終止了.

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。

轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/66811.html

相關文章

  • 后端ing

    摘要:當活動線程核心線程非核心線程達到這個數值后,后續任務將會根據來進行拒絕策略處理。線程池工作原則當線程池中線程數量小于則創建線程,并處理請求。當線程池中的數量等于最大線程數時默默丟棄不能執行的新加任務,不報任何異常。 spring-cache使用記錄 spring-cache的使用記錄,坑點記錄以及采用的解決方案 深入分析 java 線程池的實現原理 在這篇文章中,作者有條不紊的將 ja...

    roadtogeek 評論0 收藏0
  • 使用 Executors,ThreadPoolExecutor,創建線程源碼分析理解

    摘要:源碼分析創建可緩沖的線程池。源碼分析使用創建線程池源碼分析的構造函數構造函數參數核心線程數大小,當線程數,會創建線程執行最大線程數,當線程數的時候,會把放入中保持存活時間,當線程數大于的空閑線程能保持的最大時間。 之前創建線程的時候都是用的 newCachedThreadPoo,newFixedThreadPool,newScheduledThreadPool,newSingleThr...

    Chiclaim 評論0 收藏0
  • 一看就懂的Java線程分析詳解

    摘要:任務性質不同的任務可以用不同規模的線程池分開處理。線程池在運行過程中已完成的任務數量。如等于線程池的最大大小,則表示線程池曾經滿了。線程池的線程數量。獲取活動的線程數。通過擴展線程池進行監控。框架包括線程池,,,,,,等。 Java線程池 [toc] 什么是線程池 線程池就是有N個子線程共同在運行的線程組合。 舉個容易理解的例子:有個線程組合(即線程池,咱可以比喻為一個公司),里面有3...

    Yangder 評論0 收藏0
  • Java調度線程ScheduledThreadPoolExecutor源碼分析

    摘要:當面試官問線程池時,你應該知道些什么一執行流程與不同,向中提交任務的時候,任務被包裝成對象加入延遲隊列并啟動一個線程。當我們創建出一個調度線程池以后,就可以開始提交任務了。 最近新接手的項目里大量使用了ScheduledThreadPoolExecutor類去執行一些定時任務,之前一直沒有機會研究這個類的源碼,這次趁著機會好好研讀一下。 原文地址:http://www.jianshu....

    cheukyin 評論0 收藏0
  • Java調度線程ScheduledThreadPoolExecutor源碼分析

    摘要:當面試官問線程池時,你應該知道些什么一執行流程與不同,向中提交任務的時候,任務被包裝成對象加入延遲隊列并啟動一個線程。當我們創建出一個調度線程池以后,就可以開始提交任務了。 最近新接手的項目里大量使用了ScheduledThreadPoolExecutor類去執行一些定時任務,之前一直沒有機會研究這個類的源碼,這次趁著機會好好研讀一下。 原文地址:http://www.jianshu....

    myshell 評論0 收藏0

發表評論

0條評論

最新活動
閱讀需要支付1元查看
<