国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

RocketMQ架構(gòu)原理解析(二):消息存儲(chǔ)

番茄西紅柿 / 3349人閱讀

摘要:此處補(bǔ)充說(shuō)明下,不論是還是都不提供指定區(qū)間的刷盤(pán)策略,只提供一個(gè)方法,所以無(wú)法精確控制落盤(pán)數(shù)據(jù)的大小。

一、概述

由前文可知,RocketMQ有幾個(gè)非常重要的概念:

  • broker 服務(wù)端,負(fù)責(zé)存儲(chǔ)、收發(fā)消息
  • producer 客戶端1,負(fù)責(zé)產(chǎn)生消息
  • consumer 客服端2,負(fù)責(zé)消費(fèi)消息

既然是消息隊(duì)列,那消息的存儲(chǔ)的重要程度不言而喻,本節(jié)我們聚焦broker服務(wù)端,看下消息在broker端是如何存儲(chǔ)的,它的落盤(pán)策略是怎樣的,又是如何保證高效

另:后文的RocketMQ都是基于版本4.9.3

二、寫(xiě)入流程

RocketMQ的普通單消息寫(xiě)入流程如下
消息寫(xiě)入流程

簡(jiǎn)單可以分為三大塊:

  • 寫(xiě)入前準(zhǔn)備
  • 加鎖后消息寫(xiě)入
  • 消息落盤(pán)及集群同步

2.1 準(zhǔn)備

其實(shí)消息的寫(xiě)入準(zhǔn)備工作也比較好理解,主要是消息狀態(tài)的檢查以及各類存儲(chǔ)狀態(tài)的檢查,可以參看上圖中的流程

根據(jù)上圖,在準(zhǔn)備階段前,RocketMQ會(huì)判斷操作系統(tǒng)的Page Cache是否繁忙,他是怎么做到的呢?其實(shí)Java本身沒(méi)有提供接口或函數(shù)來(lái)查看Page Cache的狀態(tài),但如果磁盤(pán)帶寬已經(jīng)打滿,在Page Cache要將數(shù)據(jù)刷disk時(shí),很有可能便陷入了阻塞,導(dǎo)致Page Cache資源緊張。而當(dāng)我們的程序又有新的消息要寫(xiě)入Page Cache時(shí),反向阻塞寫(xiě)入請(qǐng)求,我們說(shuō)這時(shí)Page Cache就產(chǎn)生了回壓,也就是Page Cache相當(dāng)繁忙,請(qǐng)求已經(jīng)不能及時(shí)處理了。RocketMQ判斷Page Cache是否繁忙的條件也很簡(jiǎn)單,就是監(jiān)控某個(gè)請(qǐng)求加鎖后,寫(xiě)入是否超過(guò)1秒,如果超時(shí)的話,新的請(qǐng)求會(huì)快速失敗

2.2 消息協(xié)議

RocketMQ有一套相對(duì)復(fù)雜的消息協(xié)議編碼,大部分協(xié)議中的內(nèi)容都是在加鎖前拼接生成
rmq消息存儲(chǔ)格式

大部分消息協(xié)議項(xiàng)都是定長(zhǎng)字段,變長(zhǎng)字段如下:

  • 1、born inet 產(chǎn)生消息的producer的IP信息 ipv4占用4byte,ipv6占16byte
  • 2、broker inet 接收消息的broker的IP信息 ipv4占用4byte,ipv6占16byte
  • 3、msg content 消息內(nèi)容 變長(zhǎng)字段(1-21億)byte
  • 4、topic content 消息內(nèi)容 變長(zhǎng)字段(1-127)byte
  • 5、properties content 屬性內(nèi)容 變長(zhǎng)字段(0-32767)byte

2.3 加鎖

此處rmq提供了2種加鎖方式

  • 1、基于AQS的ReentrantLock (默認(rèn)方式)
  • 2、基于CAS的自旋鎖,加鎖不成功的話,會(huì)無(wú)限重試

無(wú)論采用哪種策略,都是獨(dú)占鎖,即同一時(shí)刻只允許一個(gè)線程加鎖成功。具體采用哪種方式,可通過(guò)配置修改。

兩種加鎖適用不同的場(chǎng)景,方式1在高并發(fā)場(chǎng)景下,能保持平穩(wěn)的系統(tǒng)性能,但在低并發(fā)下表現(xiàn)一般;而方式二正好相反,在高并發(fā)場(chǎng)景下,因?yàn)椴捎米孕瑫?huì)浪費(fèi)大量的cpu,但在低并發(fā)時(shí),卻可以獲得很高的性能。

所以官方文檔中,為了提高性能,建議用戶在同步刷盤(pán)的時(shí)候采用獨(dú)占鎖,異步刷盤(pán)的時(shí)候采用自旋鎖。這個(gè)是根據(jù)加鎖時(shí)間長(zhǎng)短決定的

2.4 鎖內(nèi)操作

上文提到,寫(xiě)入消息的鎖是獨(dú)占鎖,也就意味著同一時(shí)刻,只能有一個(gè)線程進(jìn)入,我們看一下鎖內(nèi)都做了哪些操作

  • 1、拿到或創(chuàng)建文件操作對(duì)象MappedFile
    • 此處涉及點(diǎn)較多,我們?cè)谖募?xiě)入大節(jié)詳細(xì)展開(kāi)
  • 2、二次整理要落盤(pán)的消息格式
    • 之前已經(jīng)整理過(guò)消息協(xié)議了,為什么此處還要進(jìn)行二次整理?因?yàn)橹耙恍┫f(xié)議在沒(méi)有加鎖的時(shí)候,還無(wú)法確定。主要是以下三項(xiàng)內(nèi)容:
      • a、queueOffset 隊(duì)列偏移量,此值需要最終返回,且需要保證嚴(yán)格遞增,所以需要在鎖內(nèi)進(jìn)行
      • b、physicalOffset 物理偏移量,也就是全局文件的位置,注:此位置是全局文件的偏移量,不是當(dāng)前文件的偏移量,所以其值可能會(huì)大于1G
      • c、storeTimestamp 存儲(chǔ)時(shí)間戳,此處在鎖內(nèi)進(jìn)行,主要是為了保證消息投遞的時(shí)間嚴(yán)格保序
  • 3、記錄寫(xiě)入信息
    • 記錄當(dāng)前文件寫(xiě)入情況:比如已寫(xiě)入字節(jié)數(shù)、存儲(chǔ)時(shí)間等

三、文件開(kāi)辟及寫(xiě)入

3.1 文件開(kāi)辟

文件的開(kāi)辟是異步進(jìn)行,有獨(dú)立的線程專門(mén)負(fù)責(zé)開(kāi)辟文件。我們可以先看下文件開(kāi)辟的簡(jiǎn)單模型
異步創(chuàng)建MappFile

也就是putMsg的線程會(huì)將開(kāi)辟文件的請(qǐng)求委托給allocate file線程,然后進(jìn)入阻塞,待allocate file線程將文件開(kāi)辟完畢后,再喚醒putMsg線程

那此處我們便產(chǎn)生了2點(diǎn)疑問(wèn):

  • 1、putMsg把開(kāi)辟文件的請(qǐng)求交給了allocate file線程,直到allocate file線程開(kāi)辟完畢后才會(huì)喚醒putMsg線程,其實(shí)并沒(méi)有起到異步開(kāi)辟節(jié)省時(shí)間的目的,直接在putMsg線程中開(kāi)辟文件不好嗎?
  • 2、創(chuàng)建文件本身感覺(jué)并不耗時(shí),不管是拿到文件的FileChannnel還是MappedByteBuffer,都是一件很快的操作,費(fèi)盡周章的異步開(kāi)辟真的有必要嗎?

這兩個(gè)疑問(wèn)將逐步說(shuō)明

3.1.1 開(kāi)啟堆外緩沖池

至此我們要引入一個(gè)非常重要的配置變量transientStorePoolEnable,該配置項(xiàng)只在異步刷盤(pán)(FlushDiskType == AsyncFlush)的場(chǎng)景下,才會(huì)生效

如果配置項(xiàng)中,將transientStorePoolEnable置為false,便稱為“開(kāi)啟堆外緩沖池”。那么這個(gè)變量到底起到什么作用呢?

transientStorePoolEnable類型創(chuàng)建MappFile

系統(tǒng)啟動(dòng)時(shí),會(huì)默認(rèn)開(kāi)辟5個(gè)(參數(shù)transientStorePoolSize控制)堆外內(nèi)存DirectByteBuffer,循環(huán)利用。寫(xiě)消息時(shí),消息都暫存至此,通過(guò)線程CommitRealTimeService將數(shù)據(jù)定時(shí)刷到page cache,當(dāng)數(shù)據(jù)flush到disk后,再將DirectByteBuffer歸還給緩沖池

而開(kāi)辟過(guò)程是在broker啟動(dòng)時(shí)進(jìn)行的;如上圖所示,空間一旦開(kāi)辟完畢后,文件都是預(yù)先創(chuàng)建好的,使用時(shí)直接返回文件引用即可,相當(dāng)高效。但首次啟動(dòng)需要大量開(kāi)辟堆外內(nèi)存空間,會(huì)拉長(zhǎng)broker的啟動(dòng)時(shí)長(zhǎng)。我們看一下這塊開(kāi)辟的源碼

/** * Its a heavy init method. */public void init() {    for (int i = 0; i < poolSize; i++) {        ByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize);        ......        availableBuffers.offer(byteBuffer);    }}

注釋中也標(biāo)識(shí)了這是個(gè)重量級(jí)的方法,主要耗時(shí)點(diǎn)在ByteBuffer.allocateDirect(fileSize),其實(shí)開(kāi)辟內(nèi)存并不耗時(shí),耗時(shí)集中在為內(nèi)存區(qū)域賦0操作,以下是JDK中DirectByteBuffer源碼:

DirectByteBuffer(int cap) {                   // package-private    super(-1, 0, cap, cap);    ......    long base = 0;    try {        base = unsafe.allocateMemory(size);    } catch (OutOfMemoryError x) {        Bits.unreserveMemory(size, cap);        throw x;    }    unsafe.setMemory(base, size, (byte) 0);    ......}

我們發(fā)現(xiàn)在開(kāi)辟完內(nèi)存后,開(kāi)始執(zhí)行了賦0操作unsafe.setMemory(base, size, 0)。其實(shí)可以利用反射巧妙地繞過(guò)這個(gè)耗時(shí)點(diǎn)

private static Field addr;private static Field capacity;static {    try {        addr = Buffer.class.getDeclaredField("address");        addr.setAccessible(true);        capacity = Buffer.class.getDeclaredField("capacity");        capacity.setAccessible(true);    } catch (NoSuchFieldException e) {        e.printStackTrace();    }}public static ByteBuffer newFastByteBuffer(int cap) {    long address = unsafe.allocateMemory(cap);    ByteBuffer bb = ByteBuffer.allocateDirect(0).order(ByteOrder.nativeOrder());    try {        addr.setLong(bb, address);        capacity.setInt(bb, cap);    } catch (IllegalAccessException e) {        return null;    }    bb.clear();    return bb;}

3.1.2 關(guān)閉堆外緩沖池

關(guān)閉堆外內(nèi)存池的話,就會(huì)啟動(dòng)MappedByteBuffer

常規(guī)類型創(chuàng)建MappFile

  • a、首次啟動(dòng)
    • 第一次啟動(dòng)的時(shí)候,allocate線程會(huì)先后創(chuàng)建2個(gè)文件,第一個(gè)文件創(chuàng)建完畢后,便會(huì)返回putMsg線程并喚醒它,然后allocate線程進(jìn)而繼續(xù)異步創(chuàng)建下一個(gè)文件
  • b、后續(xù)啟動(dòng)
    • 后續(xù)請(qǐng)求allocate線程都會(huì)將已經(jīng)創(chuàng)建好的文件直接返回給putMsg線程,然后繼續(xù)異步創(chuàng)建下一個(gè)文件,這樣便真正實(shí)現(xiàn)了異步創(chuàng)建文件的效果

3.1.3 文件預(yù)熱

我們?cè)倩仡櫼幌卤菊聞傞_(kāi)始提出的2個(gè)疑問(wèn):

  • 1、putMsg把開(kāi)辟文件的請(qǐng)求交給了allocate file線程,直到allocate file線程開(kāi)辟完畢后才會(huì)喚醒putMsg線程,其實(shí)并沒(méi)有起到異步開(kāi)辟節(jié)省時(shí)間的目的,直接在putMsg線程中開(kāi)辟文件不好嗎?
  • 2、創(chuàng)建文件本身感覺(jué)并不耗時(shí),不管是拿到文件的FileChannnel還是MappedByteBuffer,都是一件很快的操作,費(fèi)盡周章的異步開(kāi)辟真的有必要嗎?

第一個(gè)問(wèn)題已經(jīng)迎刃而解,即allocate線程通過(guò)異步創(chuàng)建下一個(gè)文件的方式,實(shí)現(xiàn)真正異步

本節(jié)討論的便是第二個(gè)問(wèn)題,其實(shí)如果只是單純創(chuàng)建文件的話,的確是非??斓模恢劣谠偈褂卯惒讲僮?。但RocketMQ對(duì)于新建文件有個(gè)文件預(yù)熱(通過(guò)配置warmMapedFileEnable啟停)功能,當(dāng)然目的是為了磁盤(pán)提速,我么先看下源碼

org.apache.rocketmq.store.MappedFile#warmMappedFile

for (int i = 0, j = 0; i < this.fileSize; i += MappedFile.OS_PAGE_SIZE, j++) {    byteBuffer.put(i, (byte) 0);    // force flush when flush disk type is sync    if (type == FlushDiskType.SYNC_FLUSH) {        if ((i / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE) >= pages) {            flush = i;            mappedByteBuffer.force();        }    }}

簡(jiǎn)單來(lái)說(shuō),就是將MappedByteBuffer每隔4K就寫(xiě)入一個(gè)0 byte,然后將整個(gè)文件撐滿;如果刷盤(pán)策略是同步刷盤(pán)的話,還需要調(diào)用mappedByteBuffer.force(),當(dāng)然這個(gè)操作是相當(dāng)相當(dāng)耗時(shí)的,所以也就需要我們進(jìn)行異步處理。這樣也就解釋了第二個(gè)問(wèn)題

但文件預(yù)熱真的有效嗎?我們不妨做個(gè)簡(jiǎn)單的基準(zhǔn)測(cè)試

public class FileWriteCompare {    private static String filePath = "/Users/likangning/test/index3.data";    private static int fileSize = 1024 * 1024 * 1024;    private static boolean warmFile = true;    private static int batchSize = 4096;    @Test    public void test() throws Exception {        File file = new File(filePath);        if (file.exists()) {            file.delete();        }        file.createNewFile();        FileChannel fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.WRITE, StandardOpenOption.READ);        MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, fileSize);        ByteBuffer byteBuffer = ByteBuffer.allocateDirect(batchSize);        long beginTime = System.currentTimeMillis();        mappedByteBuffer.position(0);        while (mappedByteBuffer.remaining() >= batchSize) {            byteBuffer.position(batchSize);            byteBuffer.flip();            mappedByteBuffer.put(byteBuffer);        }        System.out.println("time cost is : " + (System.currentTimeMillis() - beginTime));    }}

簡(jiǎn)單來(lái)說(shuō)就是通過(guò)MappedByteBuffer寫(xiě)入1G文件,在我本地電腦上,平均耗時(shí)在 550ms 左右

然后在MappedByteBuffer寫(xiě)文件前加入預(yù)熱操作

private void warmFile(MappedByteBuffer mappedByteBuffer) {    if (!warmFile) {        return;    }    int pageSize = 4096;    long begin = System.currentTimeMillis();    for (int i = 0, j = 0; i < fileSize; i += pageSize, j++) {        mappedByteBuffer.put(i, (byte) 0);    }    System.out.println("warm file time cost " + (System.currentTimeMillis() - begin));}

耗時(shí)情況如下:

warm file time cost 492time cost is : 125

預(yù)熱后,寫(xiě)文件的耗時(shí)縮短了很多,但預(yù)熱本身的耗時(shí)也幾乎等同于文件寫(xiě)入的耗時(shí)了

以上是沒(méi)有強(qiáng)制刷盤(pán)的測(cè)試效果,如果強(qiáng)制刷盤(pán)(#force)的話,個(gè)人經(jīng)驗(yàn)是文件預(yù)熱一定會(huì)帶來(lái)性能的提升。從前兩天結(jié)束的第二屆中間件性能挑戰(zhàn)賽來(lái)看,文件預(yù)熱至少帶來(lái)10%以上的提升。但是同非強(qiáng)制刷盤(pán)一樣,文件預(yù)熱操作實(shí)在是太重了

整體來(lái)看,文件預(yù)熱后的寫(xiě)入操作,確實(shí)能帶來(lái)性能上的提升,但是如果在系統(tǒng)壓力較大、磁盤(pán)吞吐緊張的場(chǎng)景下,勢(shì)必導(dǎo)致broker抖動(dòng),甚至請(qǐng)求超時(shí),反而得不償失。明白了此層概念后,再通過(guò)大量benchmark來(lái)決定是否開(kāi)啟此配置,做到有的放矢

3.2 文件寫(xiě)入

經(jīng)過(guò)以上整理分析后,文件寫(xiě)入將變得非常輕;不論是DirectByteBuffer還是MappedByteBuffer都可以抽象為ByteBuffer,進(jìn)而直接調(diào)用ByteBuffer.write()

四、刷盤(pán)策略

4.1 異步刷盤(pán)

異步刷盤(pán)策略

4.1.1 異步+關(guān)閉寫(xiě)緩沖

對(duì)應(yīng)如下配置

FlushDiskType == AsyncFlush && transientStorePoolEnable == false

異步刷盤(pán),且關(guān)閉緩沖池,對(duì)應(yīng)的異步刷盤(pán)線程是FlushRealTimeService

上文可知,次策略是通過(guò)MappedByteBuffer寫(xiě)入的數(shù)據(jù),所以此時(shí)數(shù)據(jù)已經(jīng)在 page cache 中了

我們總結(jié)一下刷盤(pán)的策略:

  • 1、固定頻率刷盤(pán)

不響應(yīng)中斷,固定500ms(可配置)刷盤(pán),但刷盤(pán)的時(shí)候,如果發(fā)現(xiàn)未落盤(pán)數(shù)據(jù)不足16K(可配置),那么將進(jìn)入下一個(gè)循環(huán),如果滿16K的話,會(huì)將所有未落盤(pán)的數(shù)據(jù)落盤(pán)。此處補(bǔ)充說(shuō)明下,不論是FileChannel還是MappedByteBuffer都不提供指定區(qū)間的刷盤(pán)策略,只提供一個(gè)force()方法,所以無(wú)法精確控制落盤(pán)數(shù)據(jù)的大小。

如果數(shù)據(jù)寫(xiě)入量很少,一直沒(méi)有填充滿16K,就不會(huì)落盤(pán)了嗎?不是的,此處兜底的方案是,線程發(fā)現(xiàn)距離上次無(wú)條件全量刷盤(pán)已經(jīng)超過(guò)10000ms(可配置),那么此時(shí)就會(huì)無(wú)條件觸發(fā)全量刷盤(pán)

  • 2、非固定頻率刷盤(pán)

與「固定頻率刷盤(pán)」比較相似,唯一不同點(diǎn)是,當(dāng)前刷盤(pán)策略是響應(yīng)中斷的,即每次有新的消息到來(lái)的時(shí)候,都會(huì)發(fā)送喚醒信號(hào),如果刷盤(pán)線程正好處在500ms等待期間的話,將被喚醒。但此處的喚醒并非嚴(yán)謹(jǐn)?shù)膯拘?,有可能發(fā)送了喚醒信號(hào),但刷盤(pán)線程并未成功響應(yīng),兜底方案便是500ms的重試。下面簡(jiǎn)單黏貼一下等待、喚醒的代碼,不再贅述

org.apache.rocketmq.common.ServiceThread

// 喚醒public void wakeup() {    if (hasNotified.compareAndSet(false, true)) {        waitPoint.countDown(); // notify    }}// 睡眠并響應(yīng)喚醒protected void waitForRunning(long interval) {    if (hasNotified.compareAndSet(true, false)) {        this.onWaitEnd();        return;    }    //entry to wait    waitPoint.reset();    try {        waitPoint.await(interval, TimeUnit.MILLISECONDS);    } catch (InterruptedException e) {        log.error("Interrupted", e);    } finally {        hasNotified.set(false);        this.onWaitEnd();    }}

綜上,數(shù)據(jù)在page cache中最長(zhǎng)的等待時(shí)間為(10000+500)ms

4.1.2 異步+開(kāi)啟寫(xiě)緩沖

對(duì)應(yīng)如下配置

FlushDiskType == AsyncFlush && transientStorePoolEnable == true

異步刷盤(pán),且開(kāi)啟緩沖池,對(duì)應(yīng)的異步刷盤(pán)線程是CommitRealTimeService

首先需要明確一點(diǎn)的是,當(dāng)前配置下,在寫(xiě)入階段,數(shù)據(jù)是直接寫(xiě)入DirectByteBuffer的,這樣做的好處及弊端也非常鮮明。

  • 好處:數(shù)據(jù)不用寫(xiě)page cache,放入DirectByteBuffer后便很快返回,減少了用戶態(tài)與內(nèi)核態(tài)的切換開(kāi)銷(xiāo),性能非常高
  • 弊端:數(shù)據(jù)可靠性降為最低級(jí)別,即進(jìn)程掛掉的話,就會(huì)丟數(shù)據(jù)。因?yàn)閿?shù)據(jù)即沒(méi)有寫(xiě)入page cache,也沒(méi)有落盤(pán)至disk,僅僅是在進(jìn)程內(nèi)部維護(hù)了一塊臨時(shí)緩存,進(jìn)程重啟或crash掉的話,數(shù)據(jù)一定會(huì)丟失

值得一提的是,此種刷盤(pán)模式,寫(xiě)入動(dòng)作使用的是FileChannel,且僅僅調(diào)用FileChannel.write()方法將數(shù)據(jù)寫(xiě)入page cache,并沒(méi)有直接強(qiáng)制刷盤(pán),而是將強(qiáng)制落盤(pán)的任務(wù)轉(zhuǎn)交給FlushRealTimeService線程來(lái)操作,而FlushRealTimeService線程最終也會(huì)調(diào)用FileChannel進(jìn)行強(qiáng)制刷盤(pán)

在RocketMQ內(nèi)部,無(wú)論采用什么刷盤(pán)策略,都是單一操作對(duì)象在寫(xiě)入/讀取文件;即如果使用MappedByteBuffer寫(xiě)文件,那一定會(huì)通過(guò)MappedByteBuffer刷盤(pán),如果使用FileChannel寫(xiě)文件,那一定會(huì)通過(guò)FileChannel 刷盤(pán),不存在混合操作的情況

疑問(wèn):為什么RocketMQ不依賴操作系統(tǒng)的異步刷盤(pán),而費(fèi)勁周章的設(shè)計(jì)如此刷盤(pán)策略呢?

個(gè)人理解,作為一個(gè)成熟開(kāi)源的組件,數(shù)據(jù)的安全性至關(guān)重要,還是要盡可能保證數(shù)據(jù)穩(wěn)步有序落盤(pán);OS的異步刷盤(pán)固然好使,但RocketMQ對(duì)其把控較弱,當(dāng)操作系統(tǒng)crash或者斷電的時(shí)候,造成的數(shù)據(jù)丟失影響不可控

4.2 同步刷盤(pán)

需要說(shuō)明的是,如果FlushDiskType配置的是同步刷盤(pán)的話,那么此處數(shù)據(jù)一定已經(jīng)被MappedByteBuffer寫(xiě)入了pageCache,接下來(lái)要做的便是真正的落盤(pán)操作。與異步落盤(pán)相似,同步落盤(pán)要根據(jù)配置項(xiàng)Message.isWaitStoreMsgOK()(等待消息落盤(pán))來(lái)分別說(shuō)明

同步刷盤(pán)的落盤(pán)線程統(tǒng)一都是GroupCommitService

同步刷盤(pán)策略

4.2.1 不等待落盤(pán)ack

當(dāng)前模式如圖所示,整體流程比較簡(jiǎn)單,寫(xiě)入線程僅僅負(fù)責(zé)喚醒落盤(pán)線程,然后便執(zhí)行后續(xù)邏輯,線程不阻塞;落盤(pán)線程每次休息10ms(可被寫(xiě)入線程喚醒)后,如果發(fā)現(xiàn)有數(shù)據(jù)未落盤(pán),便將page cache中的數(shù)據(jù)強(qiáng)制force到磁盤(pán)

我們發(fā)現(xiàn),其實(shí)相比較異步刷盤(pán)來(lái)說(shuō),同步刷盤(pán)輪訓(xùn)的時(shí)間只有10ms,遠(yuǎn)小于異步刷盤(pán)的500ms,也是比較好理解的。但當(dāng)前模式寫(xiě)入線程不會(huì)阻塞,也就是不會(huì)等待消息真正存儲(chǔ)到disk后再返回,如果此時(shí)反生操作系統(tǒng)crash或者斷電,那未落盤(pán)的數(shù)據(jù)便會(huì)丟失

個(gè)人感覺(jué),將FlushDiskType已經(jīng)設(shè)置為Sync,表明數(shù)據(jù)會(huì)強(qiáng)制落盤(pán),卻又引入Message.isWaitStoreMsgOK(),來(lái)左右落盤(pán)策略,多多少少會(huì)給使用者造成使用及理解上的困惑

4.2.2 等待落盤(pán)ack

相比較上文,本小節(jié)便是數(shù)據(jù)需要真正存儲(chǔ)到disk后才進(jìn)行返回。寫(xiě)入線程在喚醒落盤(pán)線程后便進(jìn)入阻塞,直至落盤(pán)線程將數(shù)據(jù)刷到disk后再將其喚醒

不過(guò)這里需要處理一個(gè)邊界問(wèn)題,即舊CommitLog的tail,及新CommitLog的head。例如現(xiàn)在有2個(gè)寫(xiě)入線程將數(shù)據(jù)寫(xiě)入了page cache,而這2個(gè)請(qǐng)求一個(gè)落在前CommitLog的尾部,另外一個(gè)落在新CommitLog的頭部,這個(gè)時(shí)候,落盤(pán)線程需要檢測(cè)到這兩個(gè)消息的分布,然后依次將兩個(gè)CommitLog數(shù)據(jù)落盤(pán)

五、線程模型

2_線程模型

RocketMQ中所有的異步處理線程都繼承自抽象類org.apache.rocketmq.common.ServiceThread,此類定義了簡(jiǎn)單的喚醒、通知模型,但并不嚴(yán)格保證喚醒,而是通過(guò)輪訓(xùn)作為兜底方案。實(shí)測(cè)發(fā)現(xiàn)喚醒動(dòng)作在數(shù)據(jù)量較大時(shí),存在性能損耗,改為簡(jiǎn)單的輪詢落盤(pán)模式,性能提高明顯

六、結(jié)束語(yǔ)

本章我們聚焦分析了一條消息在broker端落地的全過(guò)程,但整個(gè)流程還是比較復(fù)雜的,不過(guò)有些部分沒(méi)有提及(比如說(shuō)消息在master落地后是如何同步至salve端的),主要是考慮這些部分跟存儲(chǔ)關(guān)聯(lián)度不是很強(qiáng),放在一起思路容易發(fā)散,這些部分會(huì)放在后文專門(mén)開(kāi)標(biāo)題闡述

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://m.specialneedsforspecialkids.com/yun/123639.html

相關(guān)文章

  • 高并發(fā)異步解耦利器:RocketMQ究竟強(qiáng)在哪里?

    摘要:它是阿里巴巴于年開(kāi)源的第三代分布式消息中間件。是一個(gè)分布式消息中間件,具有低延遲高性能和可靠性萬(wàn)億級(jí)別的容量和靈活的可擴(kuò)展性,它是阿里巴巴于年開(kāi)源的第三代分布式消息中間件。上篇文章消息隊(duì)列那么多,為什么建議深入了解下RabbitMQ?我們講到了消息隊(duì)列的發(fā)展史:并且詳細(xì)介紹了RabbitMQ,其功能也是挺強(qiáng)大的,那么,為啥又要搞一個(gè)RocketMQ出來(lái)呢?是重復(fù)造輪子嗎?本文我們就帶大家來(lái)詳...

    tainzhi 評(píng)論0 收藏0
  • 消息中間件——RabbitMQ()各大主流消息中間件綜合對(duì)比介紹!

    摘要:主流消息中間件介紹是由出品,是一個(gè)完全支持和規(guī)范的實(shí)現(xiàn)。主流消息中間件介紹是阿里開(kāi)源的消息中間件,目前也已經(jīng)孵化為頂級(jí)項(xiàng)目。 showImg(https://img-blog.csdnimg.cn/20190509221741422.gif);showImg(https://img-blog.csdnimg.cn/20190718204938932.png?x-oss-process=...

    hiyang 評(píng)論0 收藏0
  • RocketMQ我們學(xué)到了什么之NameServer

    摘要:故事中的下屬們,就是消息生產(chǎn)者角色,屋子右面墻根那塊地就是消息持久化,呂秀才就是消息調(diào)度中心,而你就是消息消費(fèi)者角色。下屬們匯報(bào)的消息,應(yīng)該疊放在哪里,這個(gè)消息又應(yīng)該在哪里才能找到,全靠呂秀才的驚人記憶力,才可以讓消息準(zhǔn)確的被投放以及消費(fèi)。 微信公眾號(hào):IT一刻鐘大型現(xiàn)實(shí)非嚴(yán)肅主義現(xiàn)場(chǎng)一刻鐘與你分享優(yōu)質(zhì)技術(shù)架構(gòu)與見(jiàn)聞,做一個(gè)有劇情的程序員關(guān)注可了解更多精彩內(nèi)容。問(wèn)題或建議,請(qǐng)公眾號(hào)留言...

    wangbjun 評(píng)論0 收藏0
  • RocketMQ我們學(xué)到了什么之NameServer

    摘要:故事中的下屬們,就是消息生產(chǎn)者角色,屋子右面墻根那塊地就是消息持久化,呂秀才就是消息調(diào)度中心,而你就是消息消費(fèi)者角色。下屬們匯報(bào)的消息,應(yīng)該疊放在哪里,這個(gè)消息又應(yīng)該在哪里才能找到,全靠呂秀才的驚人記憶力,才可以讓消息準(zhǔn)確的被投放以及消費(fèi)。 微信公眾號(hào):IT一刻鐘大型現(xiàn)實(shí)非嚴(yán)肅主義現(xiàn)場(chǎng)一刻鐘與你分享優(yōu)質(zhì)技術(shù)架構(gòu)與見(jiàn)聞,做一個(gè)有劇情的程序員關(guān)注可了解更多精彩內(nèi)容。問(wèn)題或建議,請(qǐng)公眾號(hào)留言...

    Arno 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<