摘要:什么是協程大多數的開發人員可能對進程,線程這兩個名字比較熟悉。但是為了追求最大力度的發揮硬件的性能和提升軟件的速度,出現了協程或者叫纖程,或者綠色線程。原理利用字節碼增強,將普通的代碼轉換為支持協程的代碼。
什么是協程
大多數的開發人員可能對進程,線程這兩個名字比較熟悉。但是為了追求最大力度的發揮硬件的性能和提升軟件的速度,出現了協程或者叫纖程(Fiber),或者綠色線程(GreenThread)。那我們來聊下什么是協程,以及在java中是怎么體現和運用協程的。
在說協程之前,我們先來回想下,現在大多數的程序中,都是使用了多線程技術來解決一些需要長時間阻塞的場景。JAVA中每個線程棧默認1024K,沒有辦法開成千上萬個線程,而且就算通過JVM參數調小,CPU也無法分配時間片給每個線程,大多數的線程還是在等待中,所以我們一般會使用
Runtime.getRuntime().availableProcessors()來配置線程數的大小(或者會根據實際情況調整,就不展開討論了),但是就算是我們開了新的線程,該線程也可能是在等待系統IO的返回或者網絡IO的返回,而且線程的切換有著大量的開銷。
為了解決上面說的問題,大家可能會想到回調?,F在很多框架都是基于回調來解決那些耗時的操作。但層數嵌套多了反而會引起反人類的回調地獄,并且回調后就丟失原函數的上下文。其中的代表呢就比如說nodeJs。
終于可以來聊聊協程。它的基本原理是:在某個點掛起當前的任務,并且保存棧信息,去執行另一個任務;等完成或達到某個條件時,在還原原來的棧信息并繼續執行。上面提到的幾個點大家會想到JVM的結構,棧, 程序計數器等等,但是JVM原生是不支持這樣的操作的(至少java是不支持的,kotlin是可以的)。因此如果要在純java代碼里需要使用協程的話需要引入第三方包,如kilim,Quasar。而kilim已經很久未更新了,那么我們來看看Quasar。
Quasar原理
1、利用字節碼增強,將普通的java代碼轉換為支持協程的代碼。
2、在調用pausable方法的時候,如果pause了就保存當前方法棧的State,停止執行當前協程,將控制權交給調度器
3、調度器負責調度就緒的協程
4、協程resume的時候,自動恢復State,根據協程的pc計數跳轉到上次執行的位置,繼續執行。
這些第三方的框架大部分實現是一致的。通過對字節碼直接操作,在編譯期把你寫的代碼變為支持協程的版本,并在運行時把你所有需要用到協程的部分由他來控制和調度,同時也支持在運行期這樣做。
Quasar中使用了拋異常的方式來中斷線程,但是 實際上如果我們捕獲了這個異常就會產生問題,所以一般是以這種方式來注冊:
@Suspendable
public int f() {
try {
// do some stuff
return g() * 2;
} catch(SuspendExecution s) {
//這里不應該捕獲到異常.
throw new AssertionError(s);
}
}
在調度方面,Quasar中默認使用了JDK7以上才有的ForkJoinPool,它的優勢就在于空閑的線程會去從其他線程任務隊列尾部”偷取”任務來自己處理,因此也叫work-stealing功能。這個功能可以大大的利用CPU資源,不讓線程白白空閑著。
Quasar模塊
Fiber
Fiber可以認為是一個微線程,使用方式基本上和Thread相同,啟動start:
new Fiber
@Override
protected V run() throws SuspendExecution, InterruptedException {
// your code
}
}.start();
new Fiber
public void run() throws SuspendExecution, InterruptedException {
// your code
}
}).start();
其實它更類似于一個CallBack,是可以攜帶返回值的,并且可以拋異常SuspendExecution,InterruptedException。你也可以向其中傳遞SuspendableRunnable 或 SuspendableCallable 給Fiber的構造函數。你甚至可以像線程一樣調用join(),或者get()來阻塞線程等待他完成。
當Fiber比較大的時候,Fiber可以在調用parkAndSerialize 方法時被序列化,在調用unparkSerialized時被反序列化。
從以上我們可以看出Fiber與Thread非常類似,極大的減少了遷移的成本。
FiberScheduler
FiberScheduler是Quasar框架中核心的任務調度器,負責管理任務的工作者線程WorkerThread,之前提到的他是一個FiberForkJoinScheduler。
ForkJoinPool的默認初始化個數為Runtime.getRuntime().availableProcessors()。
instrumentation
當一個類被加載時,Quasar的instrumentation模塊 (使用 Java agent時) 搜索suspendable 方法。每一個suspendable 方法 f通過下面的方式 instrument:
它搜索對其它suspendable方法的調用。對suspendable方法g的調用,一些代碼會在這個調用g的前后被插入,它們會保存和恢復fiber棧本地變量的狀態,記錄這個暫停點。在這個“suspendable function chain”的最后,我們會發現對Fiber.park的調用。park暫停這個fiber,扔出 SuspendExecution異常。
當g block的時候,SuspendExecution異常會被Fiber捕獲。 當Fiber被喚醒(使用unpark), 方法f會被調用, 執行記錄顯示它被block在g的調用上,所以程序會立即跳到f調用g的那一行,然后調用它。最終我們會到達暫停點,然后繼續執行。當g返回時, f中插入的代碼會恢復f的本地變量。
過程聽起來很復雜,但是它只會帶來3% ~ 5%的性能的損失。
下面看一個簡單的例子, 方法m2聲明拋出SuspendExecution異常,方法m1調用m2和m3,所以也聲明拋出這個異常,最后這個異常會被Fiber所捕獲:
public class Helloworld {
static void m1() throws SuspendExecution, InterruptedException {
String m = "m1"; System.out.println("m1 begin"); m = m2(); m = m3(); System.out.println("m1 end"); System.out.println(m);
}
static String m2() throws SuspendExecution, InterruptedException {
return "m2";
}
static String m3() throws SuspendExecution, InterruptedException {
return "m3";
}
static public void main(String[] args) throws ExecutionException, InterruptedException {
new Fiber("Caller", new SuspendableRunnable() { @Override public void run() throws SuspendExecution, InterruptedException { m1(); } }).start();
}
}
// 反編譯后的代碼
@Instrumented(suspendableCallSites={16, 17}, methodStart=13, methodEnd=21, methodOptimized=false)
static void m1()
throws SuspendExecution, InterruptedException
{
// Byte code:
// 0: aconst_null
// 1: astore_3
// 2: invokestatic 88 co/paralleluniverse/fibers/Stack:getStack ()Lco/paralleluniverse/fibers/Stack;
// 5: dup
// 6: astore_1
// 7: ifnull +42 -> 49
// 10: aload_1
// 11: iconst_1
// 12: istore_2
// 13: invokevirtual 92 co/paralleluniverse/fibers/Stack:nextMethodEntry ()I
// 16: tableswitch default:+24->40, 1:+64->80, 2:+95->111
// 40: aload_1
// 41: invokevirtual 96 co/paralleluniverse/fibers/Stack:isFirstInStackOrPushed ()Z
// 44: ifne +5 -> 49
// 47: aconst_null
// 48: astore_1
// 49: iconst_0
// 50: istore_2
// 51: ldc 2
// 53: astore_0
// 54: getstatic 3 java/lang/System:out Ljava/io/PrintStream;
// 57: ldc 4
// 59: invokevirtual 5 java/io/PrintStream:println (Ljava/lang/String;)V
// 62: aload_1
// 63: ifnull +26 -> 89
// 66: aload_1
// 67: iconst_1
// 68: iconst_1
// 69: invokevirtual 100 co/paralleluniverse/fibers/Stack:pushMethod (II)V
// 72: aload_0
// 73: aload_1
// 74: iconst_0
// 75: invokestatic 104 co/paralleluniverse/fibers/Stack:push (Ljava/lang/Object;Lco/paralleluniverse/fibers/Stack;I)V
// 78: iconst_0
// 79: istore_2
// 80: aload_1
// 81: iconst_0
// 82: invokevirtual 108 co/paralleluniverse/fibers/Stack:getObject (I)Ljava/lang/Object;
// 85: checkcast 110 java/lang/String
// 88: astore_0
// 89: invokestatic 6 com/colobu/fiber/Helloworld:m2 ()Ljava/lang/String;
// 92: astore_0
// 93: aload_1
// 94: ifnull +26 -> 120
// 97: aload_1
// 98: iconst_2
// 99: iconst_1
// 100: invokevirtual 100 co/paralleluniverse/fibers/Stack:pushMethod (II)V
// 103: aload_0
// 104: aload_1
// 105: iconst_0
// 106: invokestatic 104 co/paralleluniverse/fibers/Stack:push (Ljava/lang/Object;Lco/paralleluniverse/fibers/Stack;I)V
// 109: iconst_0
// 110: istore_2
// 111: aload_1
// 112: iconst_0
// 113: invokevirtual 108 co/paralleluniverse/fibers/Stack:getObject (I)Ljava/lang/Object;
// 116: checkcast 110 java/lang/String
// 119: astore_0
// 120: invokestatic 7 com/colobu/fiber/Helloworld:m3 ()Ljava/lang/String;
// 123: astore_0
// 124: getstatic 3 java/lang/System:out Ljava/io/PrintStream;
// 127: ldc 8
// 129: invokevirtual 5 java/io/PrintStream:println (Ljava/lang/String;)V
// 132: getstatic 3 java/lang/System:out Ljava/io/PrintStream;
// 135: aload_0
// 136: invokevirtual 5 java/io/PrintStream:println (Ljava/lang/String;)V
// 139: aload_1
// 140: ifnull +7 -> 147
// 143: aload_1
// 144: invokevirtual 113 co/paralleluniverse/fibers/Stack:popMethod ()V
// 147: return
// 148: aload_1
// 149: ifnull +7 -> 156
// 152: aload_1
// 153: invokevirtual 113 co/paralleluniverse/fibers/Stack:popMethod ()V
// 156: athrow
// Line number table:
// Java source line #13 -> byte code offset #51
// Java source line #15 -> byte code offset #54
// Java source line #16 -> byte code offset #62
// Java source line #17 -> byte code offset #93
// Java source line #18 -> byte code offset #124
// Java source line #19 -> byte code offset #132
// Java source line #21 -> byte code offset #139
// Local variable table:
// start length slot name signature
// 53 83 0 m String
// 6 147 1 localStack co.paralleluniverse.fibers.Stack
// 12 99 2 i int
// 1 1 3 localObject Object
// 156 1 4 localSuspendExecution SuspendExecution
// Exception table:
// from to target type
// 49 148 148 finally
// 49 148 156 co/paralleluniverse/fibers/SuspendExecution
// 49 148 156 co/paralleluniverse/fibers/RuntimeSuspendExecution
}
我并沒有更深入的去了解Quasar的實現細節以及調度算法,有興趣的讀者可以翻翻它的代碼。
實戰
public class Helloworld {
@Suspendable
static void m1() throws InterruptedException, SuspendExecution {
String m = "m1"; //System.out.println("m1 begin"); m = m2(); //System.out.println("m1 end"); //System.out.println(m);
}
static String m2() throws SuspendExecution, InterruptedException {
String m = m3(); Strand.sleep(1000); return m;
}
//or define in META-INF/suspendables
@Suspendable
static String m3() {
List l = Stream.of(1,2,3).filter(i -> i%2 == 0).collect(Collectors.toList()); return l.toString();
}
static public void main(String[] args) throws ExecutionException, InterruptedException {
int count = 10000; testThreadpool(count); testFiber(count);
}
static void testThreadpool(int count) throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(count); ExecutorService es = Executors.newFixedThreadPool(200); LongAdder latency = new LongAdder(); long t = System.currentTimeMillis(); for (int i =0; i< count; i++) { es.submit(() -> { long start = System.currentTimeMillis(); try { m1(); } catch (InterruptedException e) { e.printStackTrace(); } catch (SuspendExecution suspendExecution) { suspendExecution.printStackTrace(); } start = System.currentTimeMillis() - start; latency.add(start); latch.countDown(); }); } latch.await(); t = System.currentTimeMillis() - t; long l = latency.longValue() / count; System.out.println("thread pool took: " + t + ", latency: " + l + " ms"); es.shutdownNow();
}
static void testFiber(int count) throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(count); LongAdder latency = new LongAdder(); long t = System.currentTimeMillis(); for (int i =0; i< count; i++) { new Fiber("Caller", new SuspendableRunnable() { @Override public void run() throws SuspendExecution, InterruptedException { long start = System.currentTimeMillis(); m1(); start = System.currentTimeMillis() - start; latency.add(start); latch.countDown(); } }).start(); } latch.await(); t = System.currentTimeMillis() - t; long l = latency.longValue() / count; System.out.println("fiber took: " + t + ", latency: " + l + " ms");
}
}
OUTPUT:
1
2
thread pool took: 50341, latency: 1005 ms
fiber took: 1158, latency: 1000 ms
可以看到很明顯的時間差距,存在多線程阻塞的情況下,協程的性能非常的好,但是。如果把sleep這段去掉,Fiber的性能反而更差:
這說明Fiber并不意味著它可以在所有的場景中都可以替換Thread。當fiber的代碼經常會被等待其它fiber阻塞的時候,就應該使用fiber。
對于那些需要CPU長時間計算的代碼,很少遇到阻塞的時候,就應該首選thread
擴展
其實協程這個概念在其他的語言中有原生的支持,如:
kotlin 1.30之后已經穩定
: https://www.kotlincn.net/docs...
golang : https://gobyexample.com/gorou...
python : http://www.gevent.org/content...~
在這些語言中協程就看起來至少沒這么奇怪或者難以理解了,而且開發起開也相比java簡單很多。
總結
協程的概念也不算是很新了,但是在像Java這樣的語言或者特定的領域并不是很火,也并沒有完全普及。不是很明白是它的學習成本高,還是說應用場景是在太小了。但是當我聽到這個概念的時候確實是挺好奇,也挺好奇的。也希望之后會有更多的框架和特性來簡化我們苦逼程序員的開發~~
參考文獻
http://docs.paralleluniverse....
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/74561.html
摘要:本周提交的一份增強建議草案要求將虛擬線程作為標準版的一部分進行預覽。虛擬線程目的是更好地支持編寫和維護高吞吐量并發應用程序。該提案指出,使用虛擬線程不需要學習新的編程模型。我們知道 Go 語言最大亮點之一就是原生支持并發,這得益于 Go 語言的協程機制。一個 go 語句就可以發起一個協程 (goroutin)。 協程本質上是一種用戶態線程,它不需要操作系統來進行調度,而是由用戶程序自行管理...
摘要:相比與其他操作系統包括其他類系統有很多的優點,其中有一項就是,其上下文切換和模式切換的時間消耗非常少。因為多線程競爭鎖時會引起上下文切換。減少線程的使用。很多編程語言中都有協程。所以如何避免死鎖的產生,在我們使用并發編程時至關重要。 系列文章傳送門: Java多線程學習(一)Java多線程入門 Java多線程學習(二)synchronized關鍵字(1) java多線程學習(二)syn...
摘要:因為多線程競爭鎖時會引起上下文切換。減少線程的使用。舉個例子如果說服務器的帶寬只有,某個資源的下載速度是,系統啟動個線程下載該資源并不會導致下載速度編程,所以在并發編程時,需要考慮這些資源的限制。 最近私下做一項目,一bug幾日未解決,總惶恐。一日頓悟,bug不可怕,怕的是項目不存在bug,與其懼怕,何不與其剛正面。 系列文章傳送門: Java多線程學習(一)Java多線程入門 Jav...
摘要:很長一段時間,我都很天真的認為,特別是以為代表的庫,才是協程的樂土。里是沒法實現協程,更別說實現這樣可以的協程的。咱真的是太井底之蛙了。不完全列表如下還有一個據作者說是最的這些協程庫的實現方式都是類似的,都是通過字節碼生成達到的目的。 很長一段時間,我都很天真的認為python,特別是以gevent為代表的庫,才是協程的樂土。Java里是沒法實現協程,更別說實現stackless py...
摘要:線程線程,是程序執行流的最小單元。由于線程之間的相互制約,致使線程在運行中呈現出間斷性。線程的狀態機線程也有就緒阻塞和運行三種基本狀態。在單個程序中同時運行多個線程完成不同的工作,稱為多線程。可以視為不同線程競爭一把鎖。 進程線程協程 進程 進程是一個實體。每一個進程都有它自己的地址空間, 文本區域(text region) 數據區域(data region) 堆棧(stack re...
閱讀 1891·2021-11-17 09:33
閱讀 6484·2021-10-12 10:20
閱讀 2306·2021-09-22 15:50
閱讀 1793·2021-09-22 15:10
閱讀 626·2021-09-10 10:51
閱讀 630·2021-09-10 10:50
閱讀 3048·2021-08-11 11:19
閱讀 1786·2019-08-30 15:55