摘要:時,標準類庫添加了,作為對型線程池的實現。類圖用來專門定義型任務完成將大任務分割為小任務以及合并結果的工作。
JDK 1.7 時,標準類庫添加了 ForkJoinPool,作為對 Fork/Join 型線程池的實現。Fork 在英文中有 分叉 的意思,而 Join 有 合并 的意思。ForkJoinPool 的功能也是如此:Fork 將大任務分叉為多個小任務,然后讓小任務執行,Join 是獲得小任務的結果,然后進行合并,將合并的結果作為大任務的結果 —— 并且這會是一個遞歸的過程 —— 因為任務如果足夠大,可以將任務多級分叉直到任務足夠小。
由此可見,ForkJoinPool 可以滿足 并行 地實現 分治算法(Divide-and-Conquer) 的需要。
ForkJoinPool 的類圖如下:
可以看到 ForkJoinPool 實現了 ExecutorService 接口,所以首先 ForkJoinPool 也是一個 ExecutorService (線程池)。因而 Runnable 和 Callable 類型的任務,ForkJoinPool 也可以通過 submit、invokeAll 和 invokeAny 等方法來執行。但是標準類庫還為 ForkJoinPool 定義了一種新的任務,它就是 ForkJoinTask
ForkJoinTask 類圖:
ForkJoinTask
ForkJoinPool 可以使用三種方法用來執行 ForkJoinTask:
invoke 方法:
invoke 方法用來執行一個帶返回值的任務(通常繼承自RecursiveTask),并且該方法是阻塞的,直到任務執行完畢,該方法才會停止阻塞并返回任務的執行結果。
submit 方法:
除了從 ExecutorService 繼承的 submit 方法外,ForkJoinPool 還定義了用來執行 ForkJoinTask 的 submit 方法 —— 一般該 submit 方法用來執行帶返回值的ForkJoinTask(通常繼承自RecursiveTask)。該方法是非阻塞的,調用之后將任務提交給 ForkJoinPool 去執行便立即返回,返回的便是已經提交到 ForkJoinPool 去執行的 task —— 由類圖可知 ForkJoinTask 實現了 Future 接口,所以可以直接通過 task 來和已經提交的任務進行交互。
execute 方法:
除了從 Executor 獲得的 execute 方法外,ForkJoinPool 也定義了用來執行ForkJoinTask 的 execute 方法 —— 一般該 execute 方法用來執行不帶返回值的ForkJoinTask(通常繼承自RecursiveAction) ,該方法同樣是非阻塞的。
現在讓我們來實踐下 ForkJoinPool 的功能:計算 π 的值。計算 π 的值有一個通過多項式方法,即:π = 4 * (1 - 1/3 + 1/5 - 1/7 + 1/9 - ……),而且多項式的項數越多,計算出的 π 的值越精確。
首先我們定義用來估算 π 的 PiEstimateTask:
class PiEstimateTask extends RecursiveTask{ private final long begin; private final long end; private final long threshold; // 分割任務的臨界值 public PiEstimateTask(long begin, long end, long threshold) { this.begin = begin; this.end = end; this.threshold = threshold; } @Override protected Double compute() { // 實現 compute 方法 if (end - begin <= threshold) { // 臨界值之下,不再分割,直接計算 int sign; // 符號,多項式中偶數位取 1,奇數位取 -1(位置從 0 開始) double result = 0.0; for (long i = begin; i < end; i++) { sign = (i & 1) == 0 ? 1 : -1; result += sign / (i * 2.0 + 1); } return result * 4; } // 分割任務 long middle = (begin + end) / 2; PiEstimateTask leftTask = new PiEstimateTask(begin, middle, threshold); PiEstimateTask rightTask = new PiEstimateTask(middle, end, threshold); leftTask.fork(); // 異步執行 leftTask rightTask.fork(); // 異步執行 rightTask double leftResult = leftTask.join(); // 阻塞,直到 leftTask 執行完畢返回結果 double rightResult = rightTask.join(); // 阻塞,直到 rightTask 執行完畢返回結果 return leftResult + rightResult; // 合并結果 } }
然后我們使用 ForkJoinPool 的 invoke 執行 PiEstimateTask:
public class ForkJoinPoolTest { public static void main(String[] args) throws Exception { ForkJoinPool forkJoinPool = new ForkJoinPool(4); // 計算 10 億項,分割任務的臨界值為 1 千萬 PiEstimateTask task = new PiEstimateTask(0, 1_000_000_000, 10_000_000); double pi = forkJoinPool.invoke(task); // 阻塞,直到任務執行完畢返回結果 System.out.println("π 的值:" + pi); forkJoinPool.shutdown(); // 向線程池發送關閉的指令 } }
運行結果:
我們也可以使用 submit 方法異步的執行任務(此處 submit 方法返回的 future 指向的對象即提交任務時的 task):
public static void main(String[] args) throws Exception { ForkJoinPool forkJoinPool = new ForkJoinPool(4); PiEstimateTask task = new PiEstimateTask(0, 1_000_000_000, 10_000_000); Futurefuture = forkJoinPool.submit(task); // 不阻塞 double pi = future.get(); System.out.println("π 的值:" + pi); System.out.println("future 指向的對象是 task 嗎:" + (future == task)); forkJoinPool.shutdown(); // 向線程池發送關閉的指令 }
運行結果:
值得注意的是,選取一個合適的分割任務的臨界值,對 ForkJoinPool 執行任務的效率有著至關重要的影響。臨界值選取過大,任務分割的不夠細,則不能充分利用 CPU;臨界值選取過小,則任務分割過多,可能產生過多的子任務,導致過多的線程間的切換和加重 GC 的負擔從而影響了效率。所以,需要根據實際的應用場景選擇一個合適的分割任務的臨界值。
ForkJoinPool 相比于 ThreadPoolExecutor,還有一個非常重要的特點(優點)在于,ForkJoinPool具有 Work-Stealing (工作竊取)的能力。所謂 Work-Stealing,在 ForkJoinPool 中的實現為:線程池中每個線程都有一個互不影響的任務隊列(雙端隊列),線程每次都從自己的任務隊列的隊頭中取出一個任務來運行;如果某個線程對應的隊列已空并且處于空閑狀態,而其他線程的隊列中還有任務需要處理但是該線程處于工作狀態,那么空閑的線程可以從其他線程的隊列的隊尾取一個任務來幫忙運行 —— 感覺就像是空閑的線程去偷人家的任務來運行一樣,所以叫 “工作竊取”。
Work-Stealing 的適用場景是不同的任務的耗時相差比較大,即某些任務需要運行較長時間,而某些任務會很快的運行完成,這種情況下用 Work-Stealing 很合適;但是如果任務的耗時很平均,則此時 Work-Stealing 并不適合,因為竊取任務時不同線程需要搶占鎖,這可能會造成額外的時間消耗,而且每個線程維護雙端隊列也會造成更大的內存消耗。所以 ForkJoinPool 并不是 ThreadPoolExecutor 的替代品,而是作為對 ThreadPoolExecutor 的補充。
總結:
ForkJoinPool 和 ThreadPoolExecutor 都是 ExecutorService(線程池),但ForkJoinPool 的獨特點在于:
ThreadPoolExecutor 只能執行 Runnable 和 Callable 任務,而 ForkJoinPool 不僅可以執行 Runnable 和 Callable 任務,還可以執行 Fork/Join 型任務 —— ForkJoinTask —— 從而滿足并行地實現分治算法的需要;
ThreadPoolExecutor 中任務的執行順序是按照其在共享隊列中的順序來執行的,所以后面的任務需要等待前面任務執行完畢后才能執行,而 ForkJoinPool 每個線程有自己的任務隊列,并在此基礎上實現了 Work-Stealing 的功能,使得在某些情況下 ForkJoinPool 能更大程度的提高并發效率。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/66494.html
摘要:第二步執行任務并合并結果。使用兩個類來完成以上兩件事情我們要使用框架,必須首先創建一個任務。用于有返回結果的任務。如果任務順利執行完成了,則設置任務狀態為,如果出現異常,則紀錄異常,并將任務狀態設置為。 1. 什么是Fork/Join框架 Fork/Join框架是Java7提供了的一個用于并行執行任務的框架, 是一個把大任務分割成若干個小任務,最終匯總每個小任務結果后得到大任務結果的...
摘要:對于任務的分割,要求各個子任務之間相互獨立,能夠并行獨立地執行任務,互相之間不影響。是叉子分叉的意思,即將大任務分解成并行的小任務,是連接結合的意思,即將所有并行的小任務的執行結果匯總起來。使用方法會阻塞并等待子任務執行完并得到其結果。 Fork/Join是什么? Fork/Join框架是Java7提供的并行執行任務框架,思想是將大任務分解成小任務,然后小任務又可以繼續分解,然后每個小...
摘要:這減輕了手動重復執行相同基準測試的痛苦,并簡化了獲取結果的流程。處理項目的代碼并從標有注釋的方法處生成基準測試程序。用和運行該基準測試得到以下結果。同時,和的基線測試結果也有略微的不同。 Java 8 已經發布一段時間了,許多開發者已經開始使用 Java 8。本文也將討論最新發布在 JDK 中的并發功能更新。事實上,JDK 中已經有多處java.util.concurrent 改動,但...
摘要:分區函數返回一個布爾值,這意味著得到的分組的鍵類型是,于是它最多可以分為兩組是一組,是一組。當遍歷到流中第個元素時,這個函數執行時會有兩個參數保存歸約結果的累加器已收集了流中的前個項目,還有第個元素本身。 一、收集器簡介 把列表中的交易按貨幣分組: Map transactionsByCurrencies = transactions.stream().collect(groupi...
摘要:概述簡介并行流就是把一個內容分成多個數據塊,并用不同的線程分別處理每個數據塊的流中將并行進行了優化,我們可以很容易的對數據進行并行操作,可以聲明性地通過與在并行流與順序流之間進行切換。 1. 概述 1.1 簡介 并行流就是把一個內容分成多個數據塊,并用不同的線程分別處理每個數據塊的流 Java 8 中將并行進行了優化,我們可以很容易的對數據進行并行操作,Stream API 可以聲明性...
閱讀 1068·2023-04-26 02:02
閱讀 2410·2021-09-26 10:11
閱讀 3562·2019-08-30 13:10
閱讀 3751·2019-08-29 17:12
閱讀 727·2019-08-29 14:20
閱讀 2195·2019-08-28 18:19
閱讀 2241·2019-08-26 13:52
閱讀 964·2019-08-26 13:43