摘要:共享內(nèi)存相信對并發(fā)有所了解的同學(xué)都應(yīng)該知道在推出后,對內(nèi)存管理有了更高標(biāo)準(zhǔn)的規(guī)范了,這使我們開發(fā)并發(fā)程序也有更好的標(biāo)準(zhǔn)了,不會有一些模糊的定義導(dǎo)致的無法確定的錯誤。
通過前幾篇的學(xué)習(xí),相信大家對Akka應(yīng)該有所了解了,都說解決并發(fā)哪家強,JVM上面找Akka,那么Akka到底在解決并發(fā)問題上幫我們做了什么呢?
共享內(nèi)存眾所周知,在處理并發(fā)問題上面,最核心的一部分就是如何處理共享內(nèi)存,很多時候我們都需要花費很多時間和精力在共享內(nèi)存上,那么在學(xué)習(xí)Akka對共享內(nèi)存是如何管理之前,我們先來看看Java中是怎么處理這個問題的。
Java共享內(nèi)存相信對Java并發(fā)有所了解的同學(xué)都應(yīng)該知道在Java5推出JSR 133后,Java對內(nèi)存管理有了更高標(biāo)準(zhǔn)的規(guī)范了,這使我們開發(fā)并發(fā)程序也有更好的標(biāo)準(zhǔn)了,不會有一些模糊的定義導(dǎo)致的無法確定的錯誤。
首先來看看一下Java內(nèi)存模型的簡單構(gòu)圖:
主內(nèi)存部分?jǐn)?shù)據(jù)的拷貝,線程對自己工作內(nèi)存的操作速度遠遠快于對主內(nèi)存的操作,但這也往往會引起共享變量不一致的問題,比如以下一個場景:
int a = 0; public void setA() { a = a + 1; }
上面是一個很簡單的例子,a是一個全局變量,然后我們有一個方法去修改這個值,每次增加一,假如我們用100個線程去運行這段代碼,那a最終的結(jié)果會是多少呢?
100?顯然不一定,它可能是80,90,或者其他數(shù),這就造成共享變量不一致的問題,那么為什么會導(dǎo)致這個問題呢,就是我們上面所說的,線程去修改a的時候可能就只是修改了自己工作內(nèi)存中a的副本,但并沒有將a的值及時的刷新到主內(nèi)存中,這便會導(dǎo)致其他線程可能讀到未被修改a的值,最終出現(xiàn)變量不一致問題。
那么Java中是怎么處理這種問題,如何保證共享變量的一致性的呢?
同步機制大體上Java中有3類同步機制,但它們所解決的問題并不相同,我們先來看一看這三種機制:
final關(guān)鍵詞
volatile關(guān)鍵詞
synchronized關(guān)鍵詞(這里代表了所有類似監(jiān)視鎖的機制)
寫過Java程序的同學(xué)對這個關(guān)鍵詞應(yīng)該再熟悉不過了,其基本含義就是不可變,不可變變量,比如:
final int a = 10; final String b = "hello";
不可變的含義在于當(dāng)你對這些變量或者對象賦初值后,不能再重新去賦值,但對于對象來說,我們不能修改的是它的引用,但是對象內(nèi)的內(nèi)容還是可以修改的。下面是一個簡單的例子:
final User u = new User(1,"a"); u.id = 2; //可以修改 u = new User(2,"b"); //不可修改
所以在利用final關(guān)鍵詞用來保證共享變量的一致性時一定要了解清楚自己的需求,選擇合適的方法,另外final變量必須在定義或者構(gòu)建對象的時候進行初始化,不然會報錯。
2.volatile關(guān)鍵詞很多同學(xué)在遇到共享變量不一致的問題后,都會說我在聲明變量前加一個volatile就好了,但事實真是這樣嘛?答案顯然不是。那我們來看看volatile到底為我們做了什么。
前面我們說過每個線程都有自己的工作內(nèi)存,很多時候線程去修改一個變量的值只是修改了自己工作內(nèi)存中副本的值,這便會導(dǎo)致主內(nèi)存的值并不是最新的,其他線程讀取到的變量便會出現(xiàn)問題。volatile幫我們解決了這個問題,它有兩個特點:
線程每次都會去主內(nèi)存中讀取變量
線程每次修改變量后的值都會及時更新到主內(nèi)存中去
舉個例子:
volatile int a = 0; public void setA() { a = a + 1; }
現(xiàn)在線程在執(zhí)行這段代碼時,都會強制去主內(nèi)存中讀取變量的值,修改后也會馬上更新到主內(nèi)存中去,但是這真的能解決共享變量不一致的問題嘛,其實不然,比如我們有這么一個場景:兩個線程同時讀取了主內(nèi)存中變量最新的值,這是我們兩個線程都去執(zhí)行修改操作,最后結(jié)果會是什么呢?這里就留給大家自己去思考了,其實也很簡單的。
那么volatile在什么場景下能保證線程安全,按照官方來說,有以下兩個條件:
對變量的寫操作不依賴于當(dāng)前值
該變量沒有包含在具有其他變量的不變式中
多的方面這里我就不展開了,推薦兩篇我覺得寫的還不錯的文章:volatile的使用及其原理volatile的適用場景
3.synchronized關(guān)鍵詞很多同學(xué)在學(xué)習(xí)Java并發(fā)過程中最先接觸的就是synchronized關(guān)鍵詞了,它確實能解決我們上述的并發(fā)問題,那它到時如何幫我們保證共享變量的一致性的呢?
簡而言之的說,線程在訪問請求用synchronized關(guān)鍵詞修飾的方法,代碼塊都會要求獲得一個監(jiān)視器鎖,當(dāng)線程獲得了監(jiān)視器鎖后,它才有權(quán)限去執(zhí)行相應(yīng)的方法或代碼塊,并在執(zhí)行結(jié)束后釋放監(jiān)視器鎖,這便能保證共享內(nèi)存的一致性了,因為本文主要是講Akka的共享內(nèi)存,過多的篇幅就不展開了,這里推薦一篇解析synchronized原理很不錯的文章,有興趣的同學(xué)可以去看看:Synchronized及其實現(xiàn)原理
Akka共享內(nèi)存Akka中的共享內(nèi)存是基于Actor模型的,Actor模型提倡的是:通過通訊來實現(xiàn)共享內(nèi)存,而不是用共享內(nèi)存來實現(xiàn)通訊,這點是跟Java解決共享內(nèi)存最大的區(qū)別,舉個例子:
在Java中我們要去操作共享內(nèi)存中數(shù)據(jù)時,每個線程都需要不斷的獲取共享內(nèi)存的監(jiān)視器鎖,然后將操作后的數(shù)據(jù)暴露給其他線程訪問使用,用共享內(nèi)存來實現(xiàn)各個線程之間的通訊,而在Akka中我們可以將共享可變的變量作為一個Actor內(nèi)部的狀態(tài),利用Actor模型本身串行處理消息的機制來保證變量的一致性。
當(dāng)然要使用Akka中的機制也必須滿足一下兩條原則:
消息的發(fā)送必須先于消息的接收
同一個Actor對一條消息的處理先于下一條消息處理
第二個原則很好理解,就是上面我們說的Actor內(nèi)部是串行處理消息,那我們來看看第一個原則,為什么要保證消息的發(fā)送先于消息的接收,是為了防止我們在創(chuàng)建消息的時候發(fā)生了不確定的錯誤,接收者將可能接收到不正確的消息,導(dǎo)致發(fā)生奇怪的異常,主要表現(xiàn)為消息對象未初始化完整時,若沒有這條規(guī)則保證,Actor收到的消息便會不完整。
通過前面的學(xué)習(xí)我們知道Actor是一種比線程更輕量級,抽象程度更高的一種結(jié)構(gòu),它幫我們規(guī)避了我們自己去操作線程,那么Akka底層到底是怎么幫我們?nèi)ケWC共享內(nèi)存的一致性的呢?
一個Actor它可能會有很多線程同時向它發(fā)送消息,之前我們也說到Actor本身是串行處理的消息的,那它是如何保障這種機制的呢?
MailboxMailbox在Actor模型是一個很重要的概念,我們都知道向一個Actor發(fā)送的消息首先都會被存儲到它所對應(yīng)的Mailbox中,那么我們先來看看MailBox的定義結(jié)構(gòu)(本文所引用的代碼都在akka.dispatch.Mailbox.scala中,有興趣的同學(xué)也可以去研究一下):
private[akka] abstract class Mailbox(val messageQueue: MessageQueue) extends ForkJoinTask[Unit] with SystemMessageQueue with Runnable {}
很清晰Mailbox內(nèi)部維護了一個messageQueue這樣的消息隊列,并繼承了Scala自身定義的ForkJoinTask任務(wù)執(zhí)行類和我們很熟悉的Runnable接口,由此可以看出,Mailbox底層還是利用Java中的線程進行處理的。那么我們先來看看它的run方法:
override final def run(): Unit = { try { if (!isClosed) { //Volatile read, needed here processAllSystemMessages() //First, deal with any system messages processMailbox() //Then deal with messages } } finally { setAsIdle() //Volatile write, needed here dispatcher.registerForExecution(this, false, false) } }
為了配合理解,我們這里先來看一下定義:
@inline final def currentStatus: Mailbox.Status = Unsafe.instance.getIntVolatile(this, AbstractMailbox.mailboxStatusOffset) @inline final def isClosed: Boolean = currentStatus == Closed
這里我們可以看出Mailbox本身會維護一個狀態(tài)Mailbox.Status,是一個Int變量,而且是可變的,并且用到volatile來保證了它的可見性:
@volatile protected var _statusDoNotCallMeDirectly: Status = _ //0 by default
現(xiàn)在我們在回去看上面的代碼,run方法的執(zhí)行過程,首先它會去讀取MailBox此時的狀態(tài),因為是一個Volatile read,所以能保證讀取到的是最新的值,然后它會先處理任何的系統(tǒng)消息,這部分不需要我們太過關(guān)心,之后便是執(zhí)行我們發(fā)送的消息,這里我們需要詳細(xì)看一下processMailbox()的實現(xiàn):
@tailrec private final def processMailbox( left: Int = java.lang.Math.max(dispatcher.throughput, 1), deadlineNs: Long = if (dispatcher.isThroughputDeadlineTimeDefined == true) System.nanoTime + dispatcher.throughputDeadlineTime.toNanos else 0L): Unit = if (shouldProcessMessage) { val next = dequeue() //去出下一條消息 if (next ne null) { if (Mailbox.debug) println(actor.self + " processing message " + next) actor invoke next if (Thread.interrupted()) throw new InterruptedException("Interrupted while processing actor messages") processAllSystemMessages() if ((left > 1) && ((dispatcher.isThroughputDeadlineTimeDefined == false) || (System.nanoTime - deadlineNs) < 0)) processMailbox(left - 1, deadlineNs) //遞歸處理下一條消息 } }
從上述代碼中我們可以清晰的看到,當(dāng)滿足消息處理的情況下就會進行消息處理,從消息隊列列取出下一條消息就是上面的dequeue(),然后將消息發(fā)給具體的Actor進行處理,接下去又是處理系統(tǒng)消息,然后判斷是否還有滿足情況需要下一條消息,若有則再次進行處理,可以看成一個遞歸操作,@tailrec也說明了這一點,它表示的是讓編譯器進行尾遞歸優(yōu)化。
現(xiàn)在我們來看一下一條消息從發(fā)送到最終處理在Akka中到底是怎么執(zhí)行的,下面的內(nèi)容是我通過閱讀Akka源碼加自身理解得出的,這里先畫了一張流程圖:
消息的大致流程我都在圖中給出了,還有一些細(xì)節(jié),必須序列化消息,獲取狀態(tài)等就沒有具體說明有興趣的同學(xué)可以自己去閱讀以下Akka的源碼,個人覺得Akka的源碼閱讀性還是很好的,比如:
基本沒有方法超過20行
不會有過多的注釋,但關(guān)鍵部分會給出,更能加深自己的理解
當(dāng)然也有一些困擾,我們在不了解各個類,接口之間的關(guān)系時,閱讀體驗就會變得很糟糕,當(dāng)然我用IDEA很快就解決了這個問題。
我們這里來看看關(guān)鍵的部分:Actor是如何保證串行處理消息的?
上圖中有一根判定,是否已有線程在執(zhí)行任務(wù)?我們來看看這個判定的具體邏輯:
@tailrec final def setAsScheduled(): Boolean = { //是否有線程正在調(diào)度執(zhí)行該MailBox的任務(wù) val s = currentStatus /* * Only try to add Scheduled bit if pure Open/Suspended, not Closed or with * Scheduled bit already set. */ if ((s & shouldScheduleMask) != Open) false else updateStatus(s, s | Scheduled) || setAsScheduled() }
從注釋和代碼的邏輯上我們可以看出當(dāng)已有線程在執(zhí)行返回false,若沒有則去更改狀態(tài)為以調(diào)度,直到被其他線程搶占或者更改成功,其中updateStatus()是線程安全的,我們可以看一下它的實現(xiàn),是一個CAS操作:
@inline protected final def updateStatus(oldStatus: Status, newStatus: Status): Boolean = Unsafe.instance.compareAndSwapInt(this, AbstractMailbox.mailboxStatusOffset, oldStatus, newStatus)
到這里我們應(yīng)該可以大致清楚Actor內(nèi)部是如何保證共享內(nèi)存的一致性了,Actor接收消息是多線程的,但處理消息是單線程的,利用MailBox中的Status來保障這一機制。
總結(jié)通過上面的內(nèi)容我們可以總結(jié)出以下幾點:
Akka并不是說用了什么特殊魔法來保證并發(fā)的,底層使用的還是Java和JVM的同步機制
Akka并沒有使用任何的鎖機制,這就避免了死鎖的可能性
Akka并發(fā)執(zhí)行的處理并沒有使用線程切換,不僅提高了線程的使用效率,也大大減少了線程切換消耗
Akka為我們提供了更高層次的并發(fā)抽象模型,讓我們不必關(guān)心底層的實現(xiàn),只需著重實現(xiàn)業(yè)務(wù)邏輯就行,遵循它的規(guī)范,讓框架幫我們處理一切難點吧
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/70070.html
摘要:是一個構(gòu)建在上,基于模型的的并發(fā)框架,為構(gòu)建伸縮性強,有彈性的響應(yīng)式并發(fā)應(yīng)用提高更好的平臺。上述例子中的信件就相當(dāng)于中的消息,與之間只能通過消息通信。當(dāng)然模型比這要復(fù)雜的多,這里主要是簡潔的闡述一下模型的概念。模型的出現(xiàn)解決了這個問題。 Akka是一個構(gòu)建在JVM上,基于Actor模型的的并發(fā)框架,為構(gòu)建伸縮性強,有彈性的響應(yīng)式并發(fā)應(yīng)用提高更好的平臺。本文主要是個人對Akka的學(xué)習(xí)和應(yīng)...
摘要:原文鏈接解決了什么問題使用模型來克服傳統(tǒng)面向?qū)ο缶幊棠P偷木窒扌?,并?yīng)對高并發(fā)分布式系統(tǒng)所帶來的挑戰(zhàn)。在某些情況,這個問題可能會變得更糟糕,工作線程發(fā)生了錯誤但是其自身卻無法恢復(fù)。 這段時間由于忙畢業(yè)前前后后的事情,拖更了很久,表示非常抱歉,回歸后的第一篇文章主要是看到了Akka最新文檔中寫的What problems does the actor model solve?,閱讀完后覺...
摘要:關(guān)于三者的一些概括總結(jié)離線分析框架,適合離線的復(fù)雜的大數(shù)據(jù)處理內(nèi)存計算框架,適合在線離線快速的大數(shù)據(jù)處理流式計算框架,適合在線的實時的大數(shù)據(jù)處理我是一個以架構(gòu)師為年之內(nèi)目標(biāo)的小小白。 整理自《架構(gòu)解密從分布式到微服務(wù)》第七章——聊聊分布式計算.做了相應(yīng)補充和修改。 [TOC] 前言 不管是網(wǎng)絡(luò)、內(nèi)存、還是存儲的分布式,它們最終目的都是為了實現(xiàn)計算的分布式:數(shù)據(jù)在各個計算機節(jié)點上流動,同...
摘要:源碼鏈接進階持久化插件有同學(xué)可能會問,我對不是很熟悉亦或者覺得單機存儲并不是安全,有沒有支持分布式數(shù)據(jù)存儲的插件呢,比如某爸的云數(shù)據(jù)庫答案當(dāng)然是有咯,良心的我當(dāng)然是幫你們都找好咯。 這次把這部分內(nèi)容提到現(xiàn)在寫,是因為這段時間開發(fā)的項目剛好在這一塊遇到了一些難點,所以準(zhǔn)備把經(jīng)驗分享給大家,我們在使用Akka時,會經(jīng)常遇到一些存儲Actor內(nèi)部狀態(tài)的場景,在系統(tǒng)正常運行的情況下,我們不需要...
閱讀 3393·2023-04-26 01:46
閱讀 2918·2023-04-25 20:55
閱讀 5491·2021-09-22 14:57
閱讀 2982·2021-08-27 16:23
閱讀 1720·2019-08-30 14:02
閱讀 2071·2019-08-26 13:44
閱讀 653·2019-08-26 12:08
閱讀 2965·2019-08-26 11:47