摘要:本文介紹和點評上的等并發編程模型。異步更適合并發編程。同步使線程阻塞,導致等待?;灸P瓦@是最簡單的模型,創建線程來執行一個任務,完畢后銷毀線程。響應式編程是一種面向數據流和變化傳播的編程模式。起源于電信領域的的編程模型。
本文介紹和點評JVM上的Thread, Thread Pool, Future, Rx, async-await, Fiber, Actor等并發編程模型。本人經驗有限,難免粗陋,還請高手多多指教。
我們知道程序分為同步風格和異步風格。
可以寫成同步風格用多個線程來并發執行。
也可以寫成異步風格以支持更為靈活的調度。
異步更適合并發編程。
為什么要異步異步的目的:充分利用計算資源。
同步使線程阻塞,導致等待。
異步是非阻塞的,無需等待。
如果發生了不必要的等待,就會浪費資源,使程序變慢。
比如這樣的程序:
val res1 = get("http://server1") val res2 = get("http://server2") compute(res1, res2)
按照同步編程風格,一定要先拿到res1,才能開始拿res2。
按照異步編程風格,res1和res2互不依賴,發起對res1的獲取后,不必等待結果,而是馬上發起對res2的獲取,到了compute的時候,才需要阻塞等待兩個數據。
這是一種“順序解耦”。有時候我們并不要求某些操作按順序執行!那么為什么要強制其順序呢?異步風格讓我們能放棄強制,解放資源,減少不必要的等待。
如果異步操作能并行,程序性能就提升了,如果不能并行,程序性能就沒有提升。在當今的硬件條件下,一般都能并行,所以異步成為了趨勢。
怎么個并行法?這要從計算機架構說起了。讓我們把任何有處理能力的硬件看做一個處理單元——CPU顯然是主要的處理單元,I/O設備也是處理單元,比如說網卡、內存控制器、硬盤控制器。CPU可以向一或多個I/O設備發出請求,當設備在準備數據時,CPU可以做其他事情(設備就緒后會用中斷通知CPU),這時就有n個硬件在并行了!況且CPU本就是多核的,能做并行計算。除此之外,在分布式系統中,能同時調動多臺計算機配合完成任務,也是并行。
因此,讓CPU等待、每次只請求一個I/O設備、不利用多核、不利用其他空閑的計算機,都是比較浪費的。
下面我們來分析常見的并發編程模型。
基本模型 Thread這是最簡單的模型,創建線程來執行一個任務,完畢后銷毀線程。當任務數量大時,會創建大量的線程。
大家都知道大量的線程會降低性能,但是你真的清楚性能開銷在哪里嗎?我試列舉一下:
創建線程
創建一個線程是比較耗時間的。需要請求操作系統、分配??臻g、初始化等工作。
上下文切換
大家都知道的,操作系統基本概念,不再贅述。值得注意的是,WAITING狀態的線程(多見于I/O等待)幾乎不會被調度,因此并不導致過多的上下文切換。
CPU cache miss
大量線程頻繁切換,勢必要訪問不同的數據,打亂了空間局部性,導致CPU cache miss增加,需要經常訪問更慢的內存,會明顯影響CPU密集型程序的性能,這點大家恐怕沒想到吧。
內存占用
線程會增加內存占用,線程的??臻g通常占1MB,1000個就是1GB。而且在棧上引用了很多對象,暫時不能回收,你說有多少個GB?
資源占用
一些有限的資源,如鎖、數據庫連接、文件句柄等,當線程被掛起或阻塞,就暫時無人可用了,浪費!還有死鎖風險!
那么分配多少線程好呢?
對于I/O密集型程序:一個經驗數值是兩倍于數據庫連接數,例如你有30個數據庫連接,就開60個線程;我還有個經驗數值是500以下,超過500就慢一些,如果調用棧特別深,這個數值還要下調。
對于CPU密集型程序:我的經驗數值是略多于CPU核心數 (理論上是等于,但你難免有幾個阻塞操作)。除了核心數,還要考慮CPU cache的大小,最好實際測試一下。舉個例子,某司內部的自動重構程序在Intel i7 3630QM CPU上測試,3~4個線程效果最好。
傳統的網絡程序是每個會話占用一個連接、一個線程。I/O多路復用(I/O multiplexing:多個會話共用一個連接)是應C10K問題而生的,C10K就是1萬個連接。1萬個連接是很耗系統資源的,何況還有1萬個線程。從上文的分析可知,C1K的時候就可以開始運用I/O多路復用了。
Thread Pool預留一些可反復使用的線程在一個池里,反復地接受任務。線程數量可能是固定的,也可能是一定范圍內變動的,依所選擇的線程池的實現而定。
這個模型是極其常用的,例如Tomcat就是用線程池來處理請求的。
注意——盡量不要阻塞任務線程;若實在無法避免,多開一些線程——每阻塞一個線程,線程池就少一個可用的線程。
Java典型的線程池有Executors.newFixedThreadPool Executors.newFixedThreadPool Executors.newFixedThreadPool Executors.newScheduledThreadPool等等,也可以直接new ThreadPoolExecutor(可指定線程數的上限和下限)。
Scala沒有增加新的線程池種類,但有個blocking方法能告訴線程池某個調用會阻塞,需要臨時增加1個線程。
FutureFuture是一個未來將會有值的對象,相當于一個占位符(提貨憑證?。?/p>
將任務投入線程池執行時,可為任務綁定一個Future,憑此Future即可在未來取得任務執行結果。未來是什么時候呢?要通過檢查Future內部的狀態來獲知——任務完成時會修改這個狀態,將執行結果存進去。
最初的代碼示例可改寫為:
// 兩個future是并行的 val f1 = Future { get("http://server1") } val f2 = Future { get("http://server2") } compute(f1.get(), f2.get())高級模型 Rx
Rx (Reactive Extensions)是響應式編程的一種具體形式。響應式編程是一種面向數據流和變化傳播的編程模式。
我們知道Java 8提供了Stream類型,代表一個有限或無限的數據流,可應用map, filter, collect等操作。Rx類似于Stream,也是有限或無限的數據流,只不過數據操作可以委托給線程池異步執行。(Rx也像是生產者/消費者模型的延伸,增加了分發和轉換的能力。對數據流進行連接組合,這邊生產,那邊分發和轉換,源源不斷交給消費者。)
以RxJava為例:
Flowable.just("file.txt") .map(name -> Files.readLines(name)) .subscribe(lines -> System.out.println(lines.size()), Throwable::printStackTrace);
以Reactor為例:
Flux.fromIterable(getSomeLongList()) .mergeWith(Flux.interval(100)) .doOnNext(serviceA::someObserver) .map(d -> d * 2) .take(3) .onErrorResumeWith(errorHandler::fallback) .doAfterTerminate(serviceM::incrementTerminate) .subscribe(System.out::println);
由代碼可見,對數據流的操作很像是對集合的函數式操作,subscribe就是異步的forEach,doOnNext就是有返回值的異步的forEach。
主流實現有RxJava、Reactor、Akka Streams,API各有不同。但是它們都在靠攏Reactive Streams規范,想必會變得越來越相似。
async-awaitasync-await是一種特殊語法,能自動把同步風格代碼轉換成異步風格代碼。正確運用,就能使代碼在阻塞時自動讓出控制權。
C#內置的async-await是最完整的實現。Scala通過Async庫提供這個語法,代碼大概是這樣:
val future = async { println("Begin blocking") await { async {Thread.sleep(1000)} } println("End blocking") }
代碼會被自動轉換成多種future的組合形式。無需特意處理,能并行的部分都會自動并行。
FiberFiber是協程的仿制品。一般多線程是搶占式調度,你一個任務跑得好好的突然把你暫停;協程是協作式的,你一個任務阻塞或完成時要主動讓出控制權,讓調度器換入另一個任務。
async-await自動把代碼轉換成可自動讓出控制權的形式,已經有協程的雛形了。Fiber更加智能,連async-await語法都不用了,只要把代碼寫在Fiber里面,就像寫在Thread里面一樣,自動異步化了。
async-await只能暫存當前作用域(轉換成閉包),Fiber則能暫存整個執行棧(每個作用域只是一個棧幀)。當然了,運用嵌套的async-await也能暫存整個執行棧,我更贊同如此,因為能更好地控制內存占用。
JVM上主流的實現是Quasar,通過java-agent改寫字節碼來實現,在需要讓出控制權時拋出異常打斷控制流(不必擔心異常方面的性能開銷),保存執行棧,然后換入另一個任務。
Java示例:
new Fiber() { @Override protected V run() throws SuspendExecution, InterruptedException { // your code } }.start();
Kotlin示例:
fiber @Suspendable { // your code }
代碼中調用的任何會阻塞的方法都要標記@Suspendable,讓Quasar知道調這個方法時要暫停當前Fiber并執行另一個Fiber,同時用另外的線程池執行會阻塞的方法。
Actor起源于電信領域的Erlang的編程模型。actor是任務處理單元:每個actor只處理一個任務,每個任務同時只有一個actor處理(如果有大任務,就要分解成小任務),actor之間用消息來通信。
在Erlang中,每個actor是一個輕量級進程,有獨立的內存空間(所以通信只能靠消息),因此有獨立的垃圾回收,不會stop the world。
actor可以發了消息就不管了(tell),這是典型的異步;也可以發了消息等回應(ask),返回值是一個Future,實際上是創建了一個新的actor在悄悄等待回應,仍然是異步。
actor可以透明地分布在不同機器上,消息可以發給本機的actor,也可以發給遠程的actor。
JVM上唯一成熟的實現是Akka,JVM不能給每個actor獨立的內存,垃圾回收仍可能stop the world。
actor顯然是一個對象,擁有狀態和行為。
actor也可被視為一個閉包,擁有函數和上下文(整個對象的狀態都是上下文)。
actor每次能接收并處理一個消息,處理中可以發送消息給自己或另一個actor,然后掛起或結束。
為什么要發送消息給自己呢?因為正在處理消息時是不能掛起的,只能在“一個消息之后,下一個消息之前”的間隙中掛起。
假設你收到一個A消息,執行前半段業務邏輯,要做一次I/O再執行后半段業務邏輯。做I/O時應當結束當前處理,當IO完成時給自己發一個B消息,下次再讓你在處理B消息時完成剩余業務邏輯。前后邏輯要分開寫,共享變量要聲明為actor的對象字段。
偽代碼如下:
class MyActor extends BasicActor { var halfDoneResult: XXX = None def receive(): Receive = { case A => { halfDoneResult = 前半段邏輯() doIO(halfDoneResult).onComplete { self ! B() } } case B => 后半段邏輯(halfDoneResult) } }
當actor的狀態要徹底改變時,可以用become操作徹底改變actor的行為。從面向對象編程的設計模式來看,這是state pattern,從函數式編程來看,這是把一個函數變換成另一個函數。
由此可見,actor模型就是把函數表示成了更容易控制的對象,以便于滿足一些并發或分布式方面的架構約束。
這段邏輯假如改寫成async-await或fiber,偽代碼如下所示,簡單多了:
def logicInAsync() = async { val halfDoneResult = 前半段邏輯() await { doIO(halfDoneResult) } 后半段邏輯(halfDoneResult) } def logicInFiber() = fiber { val halfDoneResult = 前半段邏輯() doIO(halfDoneResult) 后半段邏輯(halfDoneResult) }Actor與分布式架構
可以看出,相比于async-await或Fiber,actor就是一種狀態機,是較為底層、不易用的編程模型。但是actor附帶了成熟的分布式能力。
我感覺actor很像異步版的EJB。EJB中有stateless session bean和stateful session bean,actor也可按stateless和stateful來分類。
PayPal的支付系統就是基于Akka的,還為此編寫并開源了一個Squbs框架。業務邏輯仍是用actor實現,Squbs只增加了集成和運維方面的支持(這個也重要)。然而我對此技術路線(業務邏輯基于actor)持審慎態度,接下來就分類說明我的意見:
無狀態的分布式架構我認為,此架構只需要三種通信模型:消息隊列、同步RPC、異步RPC。
消息隊列:異步的,只管發送消息,不等待返回結果(即使需要知道結果,讓consumer向sender回發一個消息即可,會異步觸發sender這邊的回調)。消息可能觸發遠端的一個任務,也可能觸發更多消息的發出,也可能什么都不觸發。
同步RPC:同步的,向遠程結點發送消息,保持當前的執行棧,同步等待回復。執行棧一直占著線程。簡單易懂而廣泛流行的模型。
異步RPC:異步的,向遠程結點發送消息,保持當前的執行棧,異步等待回復。執行??蓵簳r被換出線程,收到回復時再切回。
消息隊列、同步RPC都不需要Akka出場,自有各種MQ、RPC框架來解決。至于異步RPC,GRPC是一個跨語言的RPC框架,也可建造一個基于WebSocket協議的RPC框架。如果無需跨語言,也可讓Akka出場,但不是直接基于Akka編程——而是在Akka之上構建一個RPC層。如果功力較高,可直接基于Netty構建RPC層。
actor進行“請求-響應”往返通信時,在收到響應之前,請求端的actor要掛起、暫存在內存中。協程進行這種通信時,則是請求端的執行棧要掛起、暫存在內存中。
有狀態的分布式架構這是actor的龍興之地, 也是最合適的用武之地。
以即時聊天(IM)為例,用actor怎么實現呢?
如果每個actor對應一個人,1萬人只需要1萬個actor,1萬個連接。用戶A對用戶B說話時,actor A收到消息,轉發給actor B,由actor B發送給用戶B,反之亦然。
如果每個actor對應一個會話,最多需要1億(1萬×1萬)個actor,連接數不到1億(同一臺服務器與某個用戶的連接可供相關會話共用),但也過多了。
因此選擇第一種實現:每個actor對應一個人,actor要記得它對應哪個人、消息往來情況如何,這就是“狀態”!如果10萬用戶在線,就要10萬連接(這與IO多路復用無關,對吧?),單機顯然hold不住,需要多機。如果用actor A和actor B不在同一臺機器,就要遠程通信了。對基于Akka的程序來說,本地通信或遠程通信是透明的,贊!
其實不用actor也能實現,一切狀態和關系都能用數據結構來表達,只不過actor可能更方便一些。
總而言之,Akka模仿Erlang,精心設計了業務無關的actor的概念,然而越是精心設計的業務無關的概念越有可能不符合多變的業務需求:)。如果問我用不用actor,我只能說,看情況吧。也希望有哪位英雄能介紹一兩個非actor不可的場景。
再與RPC對比現在,假設有一個微服務架構,在眾多服務中有A、B、C三個服務,調用順序是A->B->C。RPC只能以A->B->C的方向請求,再以C->B->A的方向響應;actor則能讓C直接發送響應給A。但如果C要直接回復A,就要與A建立連接,使網絡拓撲和依賴管理都變復雜了——如非必要,勿增復雜。
為了避免,利用MQ來發送響應?MQ就像一個聊天服務,讓分布各處的服務能彼此聊天。IM、actor、MQ,一切都聯系起來了,有沒有感受到妙不可言的意境?
但是壓力集中到了MQ的broker,網絡也多了一跳(publisher->broker->consumer),對性能有所影響。
結語本文介紹、點評了JVM上多種常見的并發模型,并試圖建立模型之間的聯系,最后以分布式架構為例加以分析。
那么應用程序要怎么寫呢?看文檔吧,各種庫或框架都希望有人來用,滿足它們吧!
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/66259.html
摘要:本文介紹和點評上的等并發編程模型。異步更適合并發編程。同步使線程阻塞,導致等待。基本模型這是最簡單的模型,創建線程來執行一個任務,完畢后銷毀線程。響應式編程是一種面向數據流和變化傳播的編程模式。起源于電信領域的的編程模型。 本文介紹和點評JVM上的Thread, Thread Pool, Future, Rx, async-await, Fiber, Actor等并發編程模型。本人經驗...
摘要:并發編程的挑戰并發編程的目的是為了讓程序運行的更快,但是,并不是啟動更多的線程就能讓程序最大限度的并發執行。的實現原理與應用在多線程并發編程中一直是元老級角色,很多人都會稱呼它為重量級鎖。 并發編程的挑戰 并發編程的目的是為了讓程序運行的更快,但是,并不是啟動更多的線程就能讓程序最大限度的并發執行。如果希望通過多線程執行任務讓程序運行的更快,會面臨非常多的挑戰:(1)上下文切換(2)死...
摘要:我的是忙碌的一年,從年初備戰實習春招,年三十都在死磕源碼,三月份經歷了阿里五次面試,四月順利收到實習。因為我心理很清楚,我的目標是阿里。所以在收到阿里之后的那晚,我重新規劃了接下來的學習計劃,將我的短期目標更新成拿下阿里轉正。 我的2017是忙碌的一年,從年初備戰實習春招,年三十都在死磕JDK源碼,三月份經歷了阿里五次面試,四月順利收到實習offer。然后五月懷著忐忑的心情開始了螞蟻金...
摘要:死亡狀態線程退出有可能是正常執行完成也有可能遇見異常退出。類有新建與死亡狀態返回其余狀態返回判斷線程是否存活。線程因某些原因進入阻塞狀態。執行同步代碼塊的過程中執行了當前線程放棄開始睡眠進入就緒狀態但是不會釋放鎖。 【java內存模型簡介 JVM中存在一個主存區(Main Memory或Java Heap Memory),Java中所有變量都是存在主存中的,對于所有線程進行共享,而每個...
閱讀 3752·2021-10-13 09:39
閱讀 3803·2021-09-24 09:48
閱讀 1202·2021-09-01 10:30
閱讀 2533·2019-08-30 15:55
閱讀 1786·2019-08-29 16:39
閱讀 2304·2019-08-26 13:55
閱讀 3057·2019-08-26 12:23
閱讀 1642·2019-08-26 11:59