摘要:主要的實現實際上運行還是一個,它對做了一個封裝,讓開發人員可以從其中獲取返回值是有狀態的共種狀態,四種狀態變換的可能和的區別通過方法調用有返回值可以拋異常結果的實現原理判斷狀態非狀態則直接進入返回結果處于狀態,則進入等待流程獲
主要的實現FutureTask
# FutureTask實際上運行還是一個runnable,它對callable做了一個封裝,讓開發人員可以從其中獲取返回值; FutrueTask是有狀態的 共7種狀態,四種狀態變換的可能 NEW -> COMPLETING -> EXCEPTIONAL NEW -> CANCELLED NEW -> COMPLETING -> NORMAL NEW -> INTERRUPTING -> INTERRUPTEDCallable和runnable的區別
0. 通過call方法調用; 1. 有返回值 2. 可以拋異常get結果的實現原理
1. 判斷狀態; 2. 非NEW,COMPLETING狀態則直接 進入report返回結果; 3. 處于NEW,COMPLETING狀態,則進入等待awaitDone();3.x awaitDone 流程
3.1. 獲取等待的超時時間deadline; 3.2. 進入自旋 3.3. 判斷線程是否被中斷:如果被中斷則移出等待waiters隊列;并拋出異常; 3.4. 判斷FutrueTask狀態:如果">COMPLETING",代表執行完成,進入report; 3.5. 判斷FutrueTask狀態:如果"=COMPLETING",讓出CPU執行Thread.yield(); 3.6. 為當前線程創建一個node節點; 3.7. 將當前線程WaitNode加入等待隊列waiters中; 3.8. 判斷是否超時; 3.9. 通過LockSupport.park掛起線程,等待運行許可; 4. report返回執行結果:如果一切正常就返回執行結果,否則返回Exception;run具體執行原理如下:
1. 判斷狀態是否正常,避免重復執行; 2. 調用callable的call()方法; 3. 修改執行狀態;保存執行結果;并通知正在等待get的線程; ## 3.x通知機制finishCompletion 3.1. 獲取所有waiters的集合; 3.2. 通過cas 拿到執行權; 3.3. 循環遍歷所有等待的線程,通過LockSupport.unpark 喚醒其執行;Callable和Future的實現原理(JDK8源碼分析) 1. cancel 取消執行
public boolean cancel(boolean mayInterruptIfRunning) { // 判斷狀態:只有剛創建的情況下才能取消 // mayInterruptIfRunning:是否中斷當前正在運行這個FutureTask的線程; if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; try { // in case call to interrupt throws exception // 如果要中斷當前線程,則對runner發布interrupt信號; if (mayInterruptIfRunning) { try { Thread t = runner; if (t != null) t.interrupt(); } finally { // final state // 修改狀態為:已經通知線程進行中斷 UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { // 通知其他在等待結果的線程 finishCompletion(); } return true; }2. run
public void run() { // 判斷狀態及設置futuretask歸屬的線程 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable3. getc = callable; if (c != null && state == NEW) { V result; boolean ran; try { // 執行Callable result = c.call(); // 標記為執行成功 ran = true; } catch (Throwable ex) { result = null; // 標記為執行不成功 ran = false; // 設置為異常狀態,并通知其他在等待結果的線程 setException(ex); } // 如果執行成功,修改狀態為正常,并通知其他在等待結果的線程 if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; // 如果狀態為準備發起中斷信號或者已經發出中斷信號,則讓出CPU(Thread.yield()) if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
public V get() throws InterruptedException, ExecutionException { int s = state; // 如果還沒執行完,則等待 if (s <= COMPLETING) s = awaitDone(false, 0L); // 通過report取結果 return report(s); }3.1 report 取執行結果
private V report(int s) throws ExecutionException { Object x = outcome; // 如果一切正常,則返回x(x是callable執行的結果outcome) if (s == NORMAL) return (V)x; // 如果被取消,則拋出已取消異常 if (s >= CANCELLED) throw new CancellationException(); // 否則拋出執行異常 throw new ExecutionException((Throwable)x); }3.2 awaitDone 等待FutureTask執行結束
private int awaitDone(boolean timed, long nanos) throws InterruptedException { // 記錄等待超時的時間 final long deadline = timed ? System.nanoTime() + nanos : 0L; // 多個在等待結果的線程,通過一個鏈表進行保存,waitNode就是每個線程在鏈表中的節點; WaitNode q = null; boolean queued = false; // 死循環...也可以說是自旋鎖同步 for (;;) { // 判斷當前這個調用get的線程是否被中斷 if (Thread.interrupted()) { // 將當前線程移出隊列 removeWaiter(q); throw new InterruptedException(); } int s = state; // 如果狀態非初創或執行完畢了,則跳出循環,通過report()取執行結果 if (s > COMPLETING) { if (q != null) q.thread = null; return s; } // 如果狀態等于已執行,讓出CPU執行,等待狀態變為正常結束 else if (s == COMPLETING) // cannot time out yet Thread.yield(); // 如果當前線程還沒有創建對象的waitNode節點,則創建一個 else if (q == null) q = new WaitNode(); // 如果當前線程對應的waitNode還沒有加入到等待鏈表中,則加入進去; else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); // 如果有設置等待超時時間,則通過parkNanos掛起當前線程,等待繼續執行的信號 else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } // 通過park掛起當前線程,等待task執行結束后給它發一個繼續執行的信號(unpark) else LockSupport.park(this); } }4. finishCompletion 通知所有在等待結果的線程
private void finishCompletion() { // assert state > COMPLETING; // 遍歷所有正在等待執行結果的線程 for (WaitNode q; (q = waiters) != null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; // unpark,發布一個讓它繼續執行的“許可” LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } done(); callable = null; // to reduce footprint }
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/69216.html
摘要:表示一個異步任務的結果,就是向線程池提交一個任務后,它會返回對應的對象。它們分別提供兩個重要的功能阻塞當前線程等待一段時間直到完成或者異常終止取消任務。此時,線程從中返回,然后檢查當前的狀態已經被改變,隨后退出循環。 0 引言 前段時間需要把一個C++的項目port到Java中,因此時隔三年后重新熟悉了下Java。由于需要一個通用的線程池,自然而然就想到了Executors。 用了...
摘要:零前期準備文章異常啰嗦且繞彎。版本版本簡介是中默認的實現類,常與結合進行多線程并發操作。所以方法的主體其實就是去喚醒被阻塞的線程。本文僅為個人的學習筆記,可能存在錯誤或者表述不清的地方,有緣補充 零 前期準備 0 FBI WARNING 文章異常啰嗦且繞彎。 1 版本 JDK 版本 : OpenJDK 11.0.1 IDE : idea 2018.3 2 ThreadLocal 簡介 ...
摘要:本文的源碼基于。人如其名,包含了和兩部分。而將一個任務的狀態設置成終止態只有三種方法我們將在下文的源碼解析中分析這三個方法。將棧中所有掛起的線程都喚醒后,下面就是執行方法這個方法是一個空方 前言 系列文章目錄 有了上一篇對預備知識的了解之后,分析源碼就容易多了,本篇我們就直接來看看FutureTask的源碼。 本文的源碼基于JDK1.8。 Future和Task 在深入分析源碼之前,我...
摘要:從而可以啟動和取消異步計算任務查詢異步計算任務是否完成和獲取異步計算任務的返回結果。原理分析在分析中我們沒有看它的父類,其中有一個方法,返回一個,說明該方法可以獲取異步任務的返回結果。 FutureTask介紹 FutureTask是一種可取消的異步計算任務。它實現了Future接口,代表了異步任務的返回結果。從而FutureTask可以啟動和取消異步計算任務、查詢異步計算任務是否完成...
閱讀 2919·2021-11-17 09:33
閱讀 1639·2021-10-12 10:13
閱讀 2462·2021-09-22 15:48
閱讀 2338·2019-08-29 17:19
閱讀 2596·2019-08-26 11:50
閱讀 1572·2019-08-26 10:37
閱讀 1738·2019-08-23 16:54
閱讀 2925·2019-08-23 14:14