摘要:本文介紹和點評上的等并發(fā)編程模型。異步更適合并發(fā)編程。同步使線程阻塞,導致等待。基本模型這是最簡單的模型,創(chuàng)建線程來執(zhí)行一個任務,完畢后銷毀線程。響應式編程是一種面向數(shù)據(jù)流和變化傳播的編程模式。起源于電信領域的的編程模型。
本文介紹和點評JVM上的Thread, Thread Pool, Future, Rx, async-await, Fiber, Actor等并發(fā)編程模型。本人經(jīng)驗有限,難免粗陋,還請高手多多指教。
我們知道程序分為同步風格和異步風格。
可以寫成同步風格用多個線程來并發(fā)執(zhí)行。
也可以寫成異步風格以支持更為靈活的調(diào)度。
異步更適合并發(fā)編程。
為什么要異步異步的目的:充分利用計算資源。
同步使線程阻塞,導致等待。
異步是非阻塞的,無需等待。
如果發(fā)生了不必要的等待,就會浪費資源,使程序變慢。
比如這樣的程序:
val res1 = get("http://server1") val res2 = get("http://server2") compute(res1, res2)
按照同步編程風格,一定要先拿到res1,才能開始拿res2。
按照異步編程風格,res1和res2互不依賴,發(fā)起對res1的獲取后,不必等待結(jié)果,而是馬上發(fā)起對res2的獲取,到了compute的時候,才需要阻塞等待兩個數(shù)據(jù)。
這是一種“順序解耦”。有時候我們并不要求某些操作按順序執(zhí)行!那么為什么要強制其順序呢?異步風格讓我們能放棄強制,解放資源,減少不必要的等待。
如果異步操作能并行,程序性能就提升了,如果不能并行,程序性能就沒有提升。在當今的硬件條件下,一般都能并行,所以異步成為了趨勢。
怎么個并行法?這要從計算機架構說起了。讓我們把任何有處理能力的硬件看做一個處理單元——CPU顯然是主要的處理單元,I/O設備也是處理單元,比如說網(wǎng)卡、內(nèi)存控制器、硬盤控制器。CPU可以向一或多個I/O設備發(fā)出請求,當設備在準備數(shù)據(jù)時,CPU可以做其他事情(設備就緒后會用中斷通知CPU),這時就有n個硬件在并行了!況且CPU本就是多核的,能做并行計算。除此之外,在分布式系統(tǒng)中,能同時調(diào)動多臺計算機配合完成任務,也是并行。
因此,讓CPU等待、每次只請求一個I/O設備、不利用多核、不利用其他空閑的計算機,都是比較浪費的。
下面我們來分析常見的并發(fā)編程模型。
基本模型 Thread這是最簡單的模型,創(chuàng)建線程來執(zhí)行一個任務,完畢后銷毀線程。當任務數(shù)量大時,會創(chuàng)建大量的線程。
大家都知道大量的線程會降低性能,但是你真的清楚性能開銷在哪里嗎?我試列舉一下:
創(chuàng)建線程
創(chuàng)建一個線程是比較耗時間的。需要請求操作系統(tǒng)、分配棧空間、初始化等工作。
上下文切換
大家都知道的,操作系統(tǒng)基本概念,不再贅述。值得注意的是,WAITING狀態(tài)的線程(多見于I/O等待)幾乎不會被調(diào)度,因此并不導致過多的上下文切換。
CPU cache miss
大量線程頻繁切換,勢必要訪問不同的數(shù)據(jù),打亂了空間局部性,導致CPU cache miss增加,需要經(jīng)常訪問更慢的內(nèi)存,會明顯影響CPU密集型程序的性能,這點大家恐怕沒想到吧。
內(nèi)存占用
線程會增加內(nèi)存占用,線程的棧空間通常占1MB,1000個就是1GB。而且在棧上引用了很多對象,暫時不能回收,你說有多少個GB?
資源占用
一些有限的資源,如鎖、數(shù)據(jù)庫連接、文件句柄等,當線程被掛起或阻塞,就暫時無人可用了,浪費!還有死鎖風險!
那么分配多少線程好呢?
對于I/O密集型程序:一個經(jīng)驗數(shù)值是兩倍于數(shù)據(jù)庫連接數(shù),例如你有30個數(shù)據(jù)庫連接,就開60個線程;我還有個經(jīng)驗數(shù)值是500以下,超過500就慢一些,如果調(diào)用棧特別深,這個數(shù)值還要下調(diào)。
對于CPU密集型程序:我的經(jīng)驗數(shù)值是略多于CPU核心數(shù) (理論上是等于,但你難免有幾個阻塞操作)。除了核心數(shù),還要考慮CPU cache的大小,最好實際測試一下。舉個例子,某司內(nèi)部的自動重構程序在Intel i7 3630QM CPU上測試,3~4個線程效果最好。
傳統(tǒng)的網(wǎng)絡程序是每個會話占用一個連接、一個線程。I/O多路復用(I/O multiplexing:多個會話共用一個連接)是應C10K問題而生的,C10K就是1萬個連接。1萬個連接是很耗系統(tǒng)資源的,何況還有1萬個線程。從上文的分析可知,C1K的時候就可以開始運用I/O多路復用了。
Thread Pool預留一些可反復使用的線程在一個池里,反復地接受任務。線程數(shù)量可能是固定的,也可能是一定范圍內(nèi)變動的,依所選擇的線程池的實現(xiàn)而定。
這個模型是極其常用的,例如Tomcat就是用線程池來處理請求的。
注意——盡量不要阻塞任務線程;若實在無法避免,多開一些線程——每阻塞一個線程,線程池就少一個可用的線程。
Java典型的線程池有Executors.newFixedThreadPool Executors.newFixedThreadPool Executors.newFixedThreadPool Executors.newScheduledThreadPool等等,也可以直接new ThreadPoolExecutor(可指定線程數(shù)的上限和下限)。
Scala沒有增加新的線程池種類,但有個blocking方法能告訴線程池某個調(diào)用會阻塞,需要臨時增加1個線程。
FutureFuture是一個未來將會有值的對象,相當于一個占位符(提貨憑證!)。
將任務投入線程池執(zhí)行時,可為任務綁定一個Future,憑此Future即可在未來取得任務執(zhí)行結(jié)果。未來是什么時候呢?要通過檢查Future內(nèi)部的狀態(tài)來獲知——任務完成時會修改這個狀態(tài),將執(zhí)行結(jié)果存進去。
最初的代碼示例可改寫為:
// 兩個future是并行的 val f1 = Future { get("http://server1") } val f2 = Future { get("http://server2") } compute(f1.get(), f2.get())高級模型 Rx
Rx (Reactive Extensions)是響應式編程的一種具體形式。響應式編程是一種面向數(shù)據(jù)流和變化傳播的編程模式。
我們知道Java 8提供了Stream類型,代表一個有限或無限的數(shù)據(jù)流,可應用map, filter, collect等操作。Rx類似于Stream,也是有限或無限的數(shù)據(jù)流,只不過數(shù)據(jù)操作可以委托給線程池異步執(zhí)行。(Rx也像是生產(chǎn)者/消費者模型的延伸,增加了分發(fā)和轉(zhuǎn)換的能力。對數(shù)據(jù)流進行連接組合,這邊生產(chǎn),那邊分發(fā)和轉(zhuǎn)換,源源不斷交給消費者。)
以RxJava為例:
Flowable.just("file.txt") .map(name -> Files.readLines(name)) .subscribe(lines -> System.out.println(lines.size()), Throwable::printStackTrace);
以Reactor為例:
Flux.fromIterable(getSomeLongList()) .mergeWith(Flux.interval(100)) .doOnNext(serviceA::someObserver) .map(d -> d * 2) .take(3) .onErrorResumeWith(errorHandler::fallback) .doAfterTerminate(serviceM::incrementTerminate) .subscribe(System.out::println);
由代碼可見,對數(shù)據(jù)流的操作很像是對集合的函數(shù)式操作,subscribe就是異步的forEach,doOnNext就是有返回值的異步的forEach。
主流實現(xiàn)有RxJava、Reactor、Akka Streams,API各有不同。但是它們都在靠攏Reactive Streams規(guī)范,想必會變得越來越相似。
async-awaitasync-await是一種特殊語法,能自動把同步風格代碼轉(zhuǎn)換成異步風格代碼。正確運用,就能使代碼在阻塞時自動讓出控制權。
C#內(nèi)置的async-await是最完整的實現(xiàn)。Scala通過Async庫提供這個語法,代碼大概是這樣:
val future = async { println("Begin blocking") await { async {Thread.sleep(1000)} } println("End blocking") }
代碼會被自動轉(zhuǎn)換成多種future的組合形式。無需特意處理,能并行的部分都會自動并行。
FiberFiber是協(xié)程的仿制品。一般多線程是搶占式調(diào)度,你一個任務跑得好好的突然把你暫停;協(xié)程是協(xié)作式的,你一個任務阻塞或完成時要主動讓出控制權,讓調(diào)度器換入另一個任務。
async-await自動把代碼轉(zhuǎn)換成可自動讓出控制權的形式,已經(jīng)有協(xié)程的雛形了。Fiber更加智能,連async-await語法都不用了,只要把代碼寫在Fiber里面,就像寫在Thread里面一樣,自動異步化了。
async-await只能暫存當前作用域(轉(zhuǎn)換成閉包),F(xiàn)iber則能暫存整個執(zhí)行棧(每個作用域只是一個棧幀)。當然了,運用嵌套的async-await也能暫存整個執(zhí)行棧,我更贊同如此,因為能更好地控制內(nèi)存占用。
JVM上主流的實現(xiàn)是Quasar,通過java-agent改寫字節(jié)碼來實現(xiàn),在需要讓出控制權時拋出異常打斷控制流(不必擔心異常方面的性能開銷),保存執(zhí)行棧,然后換入另一個任務。
Java示例:
new Fiber() { @Override protected V run() throws SuspendExecution, InterruptedException { // your code } }.start();
Kotlin示例:
fiber @Suspendable { // your code }
代碼中調(diào)用的任何會阻塞的方法都要標記@Suspendable,讓Quasar知道調(diào)這個方法時要暫停當前Fiber并執(zhí)行另一個Fiber,同時用另外的線程池執(zhí)行會阻塞的方法。
Actor起源于電信領域的Erlang的編程模型。actor是任務處理單元:每個actor只處理一個任務,每個任務同時只有一個actor處理(如果有大任務,就要分解成小任務),actor之間用消息來通信。
在Erlang中,每個actor是一個輕量級進程,有獨立的內(nèi)存空間(所以通信只能靠消息),因此有獨立的垃圾回收,不會stop the world。
actor可以發(fā)了消息就不管了(tell),這是典型的異步;也可以發(fā)了消息等回應(ask),返回值是一個Future,實際上是創(chuàng)建了一個新的actor在悄悄等待回應,仍然是異步。
actor可以透明地分布在不同機器上,消息可以發(fā)給本機的actor,也可以發(fā)給遠程的actor。
JVM上唯一成熟的實現(xiàn)是Akka,JVM不能給每個actor獨立的內(nèi)存,垃圾回收仍可能stop the world。
actor顯然是一個對象,擁有狀態(tài)和行為。
actor也可被視為一個閉包,擁有函數(shù)和上下文(整個對象的狀態(tài)都是上下文)。
actor每次能接收并處理一個消息,處理中可以發(fā)送消息給自己或另一個actor,然后掛起或結(jié)束。
為什么要發(fā)送消息給自己呢?因為正在處理消息時是不能掛起的,只能在“一個消息之后,下一個消息之前”的間隙中掛起。
假設你收到一個A消息,執(zhí)行前半段業(yè)務邏輯,要做一次I/O再執(zhí)行后半段業(yè)務邏輯。做I/O時應當結(jié)束當前處理,當IO完成時給自己發(fā)一個B消息,下次再讓你在處理B消息時完成剩余業(yè)務邏輯。前后邏輯要分開寫,共享變量要聲明為actor的對象字段。
偽代碼如下:
class MyActor extends BasicActor { var halfDoneResult: XXX = None def receive(): Receive = { case A => { halfDoneResult = 前半段邏輯() doIO(halfDoneResult).onComplete { self ! B() } } case B => 后半段邏輯(halfDoneResult) } }
當actor的狀態(tài)要徹底改變時,可以用become操作徹底改變actor的行為。從面向?qū)ο缶幊痰脑O計模式來看,這是state pattern,從函數(shù)式編程來看,這是把一個函數(shù)變換成另一個函數(shù)。
由此可見,actor模型就是把函數(shù)表示成了更容易控制的對象,以便于滿足一些并發(fā)或分布式方面的架構約束。
這段邏輯假如改寫成async-await或fiber,偽代碼如下所示,簡單多了:
def logicInAsync() = async { val halfDoneResult = 前半段邏輯() await { doIO(halfDoneResult) } 后半段邏輯(halfDoneResult) } def logicInFiber() = fiber { val halfDoneResult = 前半段邏輯() doIO(halfDoneResult) 后半段邏輯(halfDoneResult) }Actor與分布式架構
可以看出,相比于async-await或Fiber,actor就是一種狀態(tài)機,是較為底層、不易用的編程模型。但是actor附帶了成熟的分布式能力。
我感覺actor很像異步版的EJB。EJB中有stateless session bean和stateful session bean,actor也可按stateless和stateful來分類。
PayPal的支付系統(tǒng)就是基于Akka的,還為此編寫并開源了一個Squbs框架。業(yè)務邏輯仍是用actor實現(xiàn),Squbs只增加了集成和運維方面的支持(這個也重要)。然而我對此技術路線(業(yè)務邏輯基于actor)持審慎態(tài)度,接下來就分類說明我的意見:
無狀態(tài)的分布式架構我認為,此架構只需要三種通信模型:消息隊列、同步RPC、異步RPC。
消息隊列:異步的,只管發(fā)送消息,不等待返回結(jié)果(即使需要知道結(jié)果,讓consumer向sender回發(fā)一個消息即可,會異步觸發(fā)sender這邊的回調(diào))。消息可能觸發(fā)遠端的一個任務,也可能觸發(fā)更多消息的發(fā)出,也可能什么都不觸發(fā)。
同步RPC:同步的,向遠程結(jié)點發(fā)送消息,保持當前的執(zhí)行棧,同步等待回復。執(zhí)行棧一直占著線程。簡單易懂而廣泛流行的模型。
異步RPC:異步的,向遠程結(jié)點發(fā)送消息,保持當前的執(zhí)行棧,異步等待回復。執(zhí)行棧可暫時被換出線程,收到回復時再切回。
消息隊列、同步RPC都不需要Akka出場,自有各種MQ、RPC框架來解決。至于異步RPC,GRPC是一個跨語言的RPC框架,也可建造一個基于WebSocket協(xié)議的RPC框架。如果無需跨語言,也可讓Akka出場,但不是直接基于Akka編程——而是在Akka之上構建一個RPC層。如果功力較高,可直接基于Netty構建RPC層。
actor進行“請求-響應”往返通信時,在收到響應之前,請求端的actor要掛起、暫存在內(nèi)存中。協(xié)程進行這種通信時,則是請求端的執(zhí)行棧要掛起、暫存在內(nèi)存中。
有狀態(tài)的分布式架構這是actor的龍興之地, 也是最合適的用武之地。
以即時聊天(IM)為例,用actor怎么實現(xiàn)呢?
如果每個actor對應一個人,1萬人只需要1萬個actor,1萬個連接。用戶A對用戶B說話時,actor A收到消息,轉(zhuǎn)發(fā)給actor B,由actor B發(fā)送給用戶B,反之亦然。
如果每個actor對應一個會話,最多需要1億(1萬×1萬)個actor,連接數(shù)不到1億(同一臺服務器與某個用戶的連接可供相關會話共用),但也過多了。
因此選擇第一種實現(xiàn):每個actor對應一個人,actor要記得它對應哪個人、消息往來情況如何,這就是“狀態(tài)”!如果10萬用戶在線,就要10萬連接(這與IO多路復用無關,對吧?),單機顯然hold不住,需要多機。如果用actor A和actor B不在同一臺機器,就要遠程通信了。對基于Akka的程序來說,本地通信或遠程通信是透明的,贊!
其實不用actor也能實現(xiàn),一切狀態(tài)和關系都能用數(shù)據(jù)結(jié)構來表達,只不過actor可能更方便一些。
總而言之,Akka模仿Erlang,精心設計了業(yè)務無關的actor的概念,然而越是精心設計的業(yè)務無關的概念越有可能不符合多變的業(yè)務需求:)。如果問我用不用actor,我只能說,看情況吧。也希望有哪位英雄能介紹一兩個非actor不可的場景。
再與RPC對比現(xiàn)在,假設有一個微服務架構,在眾多服務中有A、B、C三個服務,調(diào)用順序是A->B->C。RPC只能以A->B->C的方向請求,再以C->B->A的方向響應;actor則能讓C直接發(fā)送響應給A。但如果C要直接回復A,就要與A建立連接,使網(wǎng)絡拓撲和依賴管理都變復雜了——如非必要,勿增復雜。
為了避免,利用MQ來發(fā)送響應?MQ就像一個聊天服務,讓分布各處的服務能彼此聊天。IM、actor、MQ,一切都聯(lián)系起來了,有沒有感受到妙不可言的意境?
但是壓力集中到了MQ的broker,網(wǎng)絡也多了一跳(publisher->broker->consumer),對性能有所影響。
結(jié)語本文介紹、點評了JVM上多種常見的并發(fā)模型,并試圖建立模型之間的聯(lián)系,最后以分布式架構為例加以分析。
那么應用程序要怎么寫呢?看文檔吧,各種庫或框架都希望有人來用,滿足它們吧!
文章版權歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/11758.html
摘要:本文介紹和點評上的等并發(fā)編程模型。異步更適合并發(fā)編程。同步使線程阻塞,導致等待。基本模型這是最簡單的模型,創(chuàng)建線程來執(zhí)行一個任務,完畢后銷毀線程。響應式編程是一種面向數(shù)據(jù)流和變化傳播的編程模式。起源于電信領域的的編程模型。 本文介紹和點評JVM上的Thread, Thread Pool, Future, Rx, async-await, Fiber, Actor等并發(fā)編程模型。本人經(jīng)驗...
摘要:并發(fā)編程的挑戰(zhàn)并發(fā)編程的目的是為了讓程序運行的更快,但是,并不是啟動更多的線程就能讓程序最大限度的并發(fā)執(zhí)行。的實現(xiàn)原理與應用在多線程并發(fā)編程中一直是元老級角色,很多人都會稱呼它為重量級鎖。 并發(fā)編程的挑戰(zhàn) 并發(fā)編程的目的是為了讓程序運行的更快,但是,并不是啟動更多的線程就能讓程序最大限度的并發(fā)執(zhí)行。如果希望通過多線程執(zhí)行任務讓程序運行的更快,會面臨非常多的挑戰(zhàn):(1)上下文切換(2)死...
摘要:我的是忙碌的一年,從年初備戰(zhàn)實習春招,年三十都在死磕源碼,三月份經(jīng)歷了阿里五次面試,四月順利收到實習。因為我心理很清楚,我的目標是阿里。所以在收到阿里之后的那晚,我重新規(guī)劃了接下來的學習計劃,將我的短期目標更新成拿下阿里轉(zhuǎn)正。 我的2017是忙碌的一年,從年初備戰(zhàn)實習春招,年三十都在死磕JDK源碼,三月份經(jīng)歷了阿里五次面試,四月順利收到實習offer。然后五月懷著忐忑的心情開始了螞蟻金...
摘要:死亡狀態(tài)線程退出有可能是正常執(zhí)行完成也有可能遇見異常退出。類有新建與死亡狀態(tài)返回其余狀態(tài)返回判斷線程是否存活。線程因某些原因進入阻塞狀態(tài)。執(zhí)行同步代碼塊的過程中執(zhí)行了當前線程放棄開始睡眠進入就緒狀態(tài)但是不會釋放鎖。 【java內(nèi)存模型簡介 JVM中存在一個主存區(qū)(Main Memory或Java Heap Memory),Java中所有變量都是存在主存中的,對于所有線程進行共享,而每個...
閱讀 1767·2021-11-18 13:20
閱讀 1159·2021-10-11 10:59
閱讀 2994·2021-08-24 10:01
閱讀 3505·2019-08-29 14:21
閱讀 3356·2019-08-29 14:15
閱讀 3521·2019-08-26 12:23
閱讀 3347·2019-08-26 11:46
閱讀 3353·2019-08-26 11:35