国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

Cobar源碼解析(二)

pkwenda / 3411人閱讀

摘要:如果數據庫檢測到是連續的,則表明沒有串包,如果不連續,則表示串包,數據庫會直接丟棄該連接。源碼分析上一節我們分析到,當一個前端連接過來,并不是直接和綁定,而是先插入到線程的注冊隊列中這樣能釋放的壓力處理更多前端連接。

報文格式

這一節我們來講Cobar Handshake的過程。

MySQL服務端和客戶端交互的所有的包格式都是統一的,報文格式如下圖:

MySQL報文的消息頭共有4個字節,前3字節表示的是實際數據的長度(不包含消息頭),并且字節是按照小端模式排放的。

第四個字節MySQL為了防止串包用的,其原理是每收到一個報文,都在sequence id上加1。如果數據庫檢測到sequence id是連續的,則表明沒有串包,如果不連續,則表示串包,數據庫會直接丟棄該連接。

小端模式就是低位字節排放在內存的低地址端,高位字節排放在內存的高地址端。

大端模式則相反。

下面是Handshake包的結構,括號內表示該字段的字節數:

seed部分是加密種子,分為前后兩個部分,通過隨機數生成。

源碼分析

上一節我們分析到,當一個前端連接過來,并不是直接和selector綁定,而是先插入到R線程的注冊隊列中,這樣能釋放NIOAcceptor的壓力,處理更多前端連接。所以,連接和selector的綁定過程是在R線程中進行的,由register方法實現,代碼如下:

private void register(Selector selector) {
            NIOConnection c = null;
            while ((c = registerQueue.poll()) != null) {
                try {
                    c.register(selector);
                } catch (Throwable e) {
                    c.error(ErrorCode.ERR_REGISTER, e);
                }
            }
        }

實際的綁定操作是由NIOConnectionregister方法實現的,NIOConnection接口的抽象類是AbstractConnection,我們來看它實現的register方法:

@Override
    public void register(Selector selector) throws IOException {
        try {
            // 該連接只監聽socket可讀事件
            processKey = channel.register(selector, SelectionKey.OP_READ, this);
            isRegistered = true;
        } finally {
            if (isClosed.get()) {
                clearSelectionKey();
            }
        }
    }

我們發現,前端連接注冊選擇器時,只監聽了可讀事件。這是考慮到,JavaNIO屬于水平觸發LT(只要滿足條件,就觸發一個事件),使用水平觸發時,如果應用程序不需要寫就不要關注socket可寫的事件,否則就會無限次地立即返回write ready notification,長期關注socket可寫事件會出現CPU打滿的情況,所以在使用JDK的NIO編程時,如果沒有數據往外寫,就取消寫事件,有數據往外寫時再注冊寫事件。

FrontendConnection繼承了AbstractConnection,它又重新實現了register方法,代碼如下:

@Override
    public void register(Selector selector) throws IOException {
        // 調用父類的register方法
        super.register(selector);
        if (!isClosed.get()) {
            // 生成認證數據
            byte[] rand1 = RandomUtil.randomBytes(8);
            byte[] rand2 = RandomUtil.randomBytes(12);

            // 保存認證數據
            byte[] seed = new byte[rand1.length + rand2.length];
            System.arraycopy(rand1, 0, seed, 0, rand1.length);
            System.arraycopy(rand2, 0, seed, rand1.length, rand2.length);
            this.seed = seed;

            // 發送握手數據包
            HandshakePacket hs = new HandshakePacket();
            hs.packetId = 0;
            hs.protocolVersion = Versions.PROTOCOL_VERSION;
            hs.serverVersion = Versions.SERVER_VERSION;
            hs.threadId = id;
            hs.seed = rand1;
            hs.serverCapabilities = getServerCapabilities();
            hs.serverCharsetIndex = (byte) (charsetIndex & 0xff);
            hs.serverStatus = 2;
            hs.restOfScrambleBuff = rand2;
            // 異步寫入Handshake包
            hs.write(this);
        }
    }

該方法生成了HandShake包,和上面結構圖相一致,關鍵是最后異步寫入HandShake包的write方法,代碼如下:

public void write(FrontendConnection c) {
        // 分配緩存
        ByteBuffer buffer = c.allocate();
        
        // 將HandShake包寫入緩存
        BufferUtil.writeUB3(buffer, calcPacketSize());
        buffer.put(packetId);
        buffer.put(protocolVersion);
        BufferUtil.writeWithNull(buffer, serverVersion);
        BufferUtil.writeUB4(buffer, threadId);
        BufferUtil.writeWithNull(buffer, seed);
        BufferUtil.writeUB2(buffer, serverCapabilities);
        buffer.put(serverCharsetIndex);
        BufferUtil.writeUB2(buffer, serverStatus);
        buffer.put(FILLER_13);
        // buffer.position(buffer.position() + 13);
        BufferUtil.writeWithNull(buffer, restOfScrambleBuff);
        
        // 將ByteBuffer中的數據異步寫入Socket
        c.write(buffer);
    }

我們再來看最后一行的write方法:

@Override
    public void write(ByteBuffer buffer) {
        // 檢查連接是否關閉,若關閉則將緩存回收
        if (isClosed.get()) {
            processor.getBufferPool().recycle(buffer);
            return;
        }
        if (isRegistered) {
            try {
                // 將緩存先插入對隊列中,其實就是一個循環數組,如數組已滿,則 wait;
                // 這個隊列是AbstractConnection的一個成員變量
                writeQueue.put(buffer);
            } catch (InterruptedException e) {
                error(ErrorCode.ERR_PUT_WRITE_QUEUE, e);
                return;
            }
            // 插入隊列后,調用NIOProcessor的postWrite方法,其實就是NIOReacor的postWrite方法
            processor.postWrite(this);
        } else {
            // 若連接未注冊,也回收緩存
            processor.getBufferPool().recycle(buffer);
            close();
        }
    }

我們看NIOReactor的postWrite方法:

final void postWrite(NIOConnection c) {
        reactorW.writeQueue.offer(c);
    }

其實是將連接插入到W線程的writeQueue阻塞隊列中,我們再來看W線程的run方法,

@Override
        public void run() {
            NIOConnection c = null;
            for (;;) {
                try {
                    if ((c = writeQueue.take()) != null) {
                        write(c);
                    }
                } catch (Throwable e) {
                    LOGGER.warn(name, e);
                }
            }
        }
        
private void write(NIOConnection c) {
            try {
                c.writeByQueue();
            } catch (Throwable e) {
                c.error(ErrorCode.ERR_WRITE_BY_QUEUE, e);
            }
        }

輪詢阻塞隊列,若隊列不為空,則取出連接,基于隊列寫方法writeByQueue將緩存中的數據寫入socket,下一節再分析writeByQueue方法。

總結

閱讀源碼后,發現Cobar從前端連接的accept并注冊selector到發送Handshake包都是異步,本質是將連接插入到R線程和W線程的阻塞隊列中,不立即進行注冊和寫操作,從而實現整個過程的異步化,提高了Cobar的吞吐量。

以上。

原文鏈接

https://segmentfault.com/a/11...

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。

轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/70582.html

相關文章

  • Cobar源碼解析(一)

    摘要:的使用方法就不多介紹了,本文的主要內容是剖析的源代碼。而又有一個私有的靜態變量,以及獲取這個私有靜態變量的靜態方法,顯然,這是一個單例設計模式,使程序運行的時候全局只有一個對象。 簡介 當業務的數據量和訪問量急劇增加的情況下,我們需要對數據進行水平拆分,從而降低單庫的壓力,并且數據的水平拆分需要對業務透明,屏蔽掉水平拆分的細節。并且,前端業務的高并發會導致后端的數據庫連接過多,從而DB...

    jiekechoo 評論0 收藏0
  • 【深度】| 值得收藏的阿里開源技術

    摘要:淘寶定制基于,是國內第一個優化定制且開源的服務器版虛擬機。數據庫開源數據庫是基于官方版本的一個分支,由阿里云數據庫團隊維護,目前也應用于阿里巴巴集團業務以及阿里云數據庫服務。淘寶服務器是由淘寶網發起的服務器項目。 Java JAVA 研發框架 SOFAStack SOFAStack(Scalable Open Financial Architecture Stack)是用于快速構建金融...

    econi 評論0 收藏0

發表評論

0條評論

最新活動
閱讀需要支付1元查看
<