摘要:概述在簡易框架需求與設(shè)計(jì)這篇文章中已經(jīng)給出了協(xié)議的具體細(xì)節(jié),協(xié)議類型為二進(jìn)制協(xié)議,如下協(xié)議的解碼我們稱為,編碼我們成為,下文我們將直接使用和術(shù)語。直接貼代碼,參考前文提到的協(xié)議格式閱讀以下代碼協(xié)議編碼器
概述
在《簡易RPC框架:需求與設(shè)計(jì)》這篇文章中已經(jīng)給出了協(xié)議的具體細(xì)節(jié),協(xié)議類型為二進(jìn)制協(xié)議,如下:
------------------------------------------------------------------------ | magic (2bytes) | version (1byte) | type (1byte) | reserved (7bits) | ------------------------------------------------------------------------ | status (1byte) | id (8bytes) | body length (4bytes) | ------------------------------------------------------------------------ | | | body ($body_length bytes) | | | ------------------------------------------------------------------------
協(xié)議的解碼我們稱為 decode,編碼我們成為 encode,下文我們將直接使用 decode 和 encode 術(shù)語。
decode 的本質(zhì)就是講接收到的一串二進(jìn)制報(bào)文,轉(zhuǎn)化為具體的消息對象,在 Java 中,就是將這串二進(jìn)制報(bào)文所包含的信息,用某種類型的對象存儲起來。
encode 則是將存儲了信息的對象,轉(zhuǎn)化為具有相同含義的一串二進(jìn)制報(bào)文,然后網(wǎng)絡(luò)收發(fā)模塊再將報(bào)文發(fā)出去。
無論是 rpc 客戶端還是服務(wù)端,都需要有一個(gè) decode 和 encode 的邏輯。
消息類型rpc 客戶端與服務(wù)端之間的通信,需要通過發(fā)送不同類型的消息來實(shí)現(xiàn),例如:client 向 server 端發(fā)送的消息,可能是請求消息,可能是心跳消息,可能是認(rèn)證消息,而 server 向 client 發(fā)送的消息,一般就是響應(yīng)消息。
利用 Java 中的枚舉類型,可以將消息類型進(jìn)行如下定義:
/** * 消息類型 * * @author beanlam * @version 1.0 */ public enum MessageType { REQUEST((byte) 0x01), HEARTBEAT((byte) 0x02), CHECKIN((byte) 0x03), RESPONSE( (byte) 0x04), UNKNOWN((byte) 0xFF); private byte code; MessageType(byte code) { this.code = code; } public static MessageType valueOf(byte code) { for (MessageType instance : values()) { if (instance.code == code) { return instance; } } return UNKNOWN; } public byte getCode() { return code; } }
在這個(gè)類中設(shè)計(jì)了 valueOf 方法,方便進(jìn)行具體的 byte 字節(jié)與具體的消息枚舉類型之間的映射和轉(zhuǎn)換。
調(diào)用狀態(tài)設(shè)計(jì)client 主動發(fā)起的一次 rpc 調(diào)用,要么成功,要么失敗,server 端有責(zé)任告知 client 此次調(diào)用的結(jié)果,client 也有責(zé)任去感知調(diào)用失敗的原因,因?yàn)椴灰欢ㄊ?server 端造成的失敗,可能是因?yàn)?client 端在對消息進(jìn)行預(yù)處理的時(shí)候,例如序列化,就已經(jīng)出錯(cuò)了,這種錯(cuò)誤也應(yīng)該作為一次調(diào)用的調(diào)用結(jié)果返回給 client 調(diào)用者。因此引入一個(gè)調(diào)用狀態(tài),與消息類型一樣,它也借助了 Java 語言里的枚舉類型來實(shí)現(xiàn),并實(shí)現(xiàn)了方便的 valueOf 方法:
/** * 調(diào)用狀態(tài) * * @author beanlam * @version 1.0 */ public enum InvocationStatus { OK((byte) 0x01), CLIENT_TIMEOUT((byte) 0x02), SERVER_TIMEOUT( (byte) 0x03), BAD_REQUEST((byte) 0x04), BAD_RESPONSE( (byte) 0x05), SERVICE_NOT_FOUND((byte) 0x06), SERVER_SERIALIZATION_ERROR( (byte) 0x07), CLIENT_SERIALIZATION_ERROR((byte) 0x08), CLIENT_CANCELED( (byte) 0x09), SERVER_BUSY((byte) 0x0A), CLIENT_BUSY( (byte) 0x0B), SERIALIZATION_ERROR((byte) 0x0C), INTERNAL_ERROR( (byte) 0x0D), SERVER_METHOD_INVOKE_ERROR((byte) 0x0E), UNKNOWN((byte) 0xFF); private byte code; InvocationStatus(byte code) { this.code = code; } public static InvocationStatus valueOf(byte code) { for (InvocationStatus instance : values()) { if (code == instance.code) { return instance; } } return UNKNOWN; } public byte getCode() { return code; } }消息實(shí)體設(shè)計(jì)
我們將 client 往 server 端發(fā)送的統(tǒng)一稱為 rpc 請求消息,一個(gè)請求對應(yīng)著一個(gè)響應(yīng),因此在 client 和 server 端間流動的信息大體上其實(shí)就只有兩種,即要么是請求,要么是響應(yīng)。我們將會定義兩個(gè)類,分別是 RpcRequest 和 RpcResponse 來代表請求消息和響應(yīng)消息。
另外由于無論是請求消息還是響應(yīng)消息,它們都有一些共同的屬性,例如說“調(diào)用上下文ID”,或者消息類型。因此會再定義一個(gè) RpcMessage 類,作為父類。
RpcMessage/** * rpc消息 * * @author beanlam * @version 1.0 */ public class RpcMessage { private MessageType type; private long contextId; private Object data; public long getContextId() { return this.contextId; } public void setContextId(long id) { this.contextId = id; } public Object getData() { return this.data; } public void setData(Object data) { this.data = data; } public void setType(byte code) { this.type = MessageType.valueOf(code); } public MessageType getType() { return this.type; } public void setType(MessageType type) { this.type = type; } @Override public String toString() { return "[messageType=" + type.name() + ", contextId=" + contextId + ", data=" + data + "]"; } }RpcRequest
import java.util.concurrent.atomic.AtomicLong; /** * rpc請求消息 * * @author beanlam * @version 1.0 */ public class RpcRequest extends RpcMessage { private static final AtomicLong ID_GENERATOR = new AtomicLong(0); public RpcRequest() { this(ID_GENERATOR.incrementAndGet()); } public RpcRequest(long contextId) { setContextId(contextId); setType(MessageType.REQUEST); } }RpcResponse
/** * * rpc響應(yīng)消息 * * @author beanlam * @version 1.0 */ public class RpcResponse extends RpcMessage { private InvocationStatus status = InvocationStatus.OK; public RpcResponse(long contextId) { setContextId(contextId); setType(MessageType.RESPONSE); } public InvocationStatus getStatus() { return this.status; } public void setStatus(InvocationStatus status) { this.status = status; } @Override public String toString() { return "RpcResponse[contextId=" + getContextId() + ", status=" + status.name() + "]"; } }netty 編解碼介紹
netty 是一個(gè) NIO 框架,應(yīng)該這么說,netty 是一個(gè)有良好設(shè)計(jì)思想的 NIO 框架。一個(gè) NIO 框架必備的要素就是 reactor 線程模型,目前有一些比較優(yōu)秀而且開源的小型 NIO 框架,例如分庫分表中間件 mycat 實(shí)現(xiàn)的一個(gè)簡易 NIO 框架,可以在這里看到。
netty 的主要特點(diǎn)有:微內(nèi)核設(shè)計(jì)、責(zé)任鏈模式的業(yè)務(wù)邏輯處理、內(nèi)存和資源泄露的檢測等。其中編解碼在 netty 中,都被設(shè)計(jì)成責(zé)任鏈上的一個(gè)一個(gè) Handler。
decode 對于 netty 來說,它提供了 ByteToMessageDecoder,它也提供了 MessageToByteEncoder。
借助 netty 來實(shí)現(xiàn)協(xié)議編解碼,實(shí)際上就是去在這兩個(gè)handler里面實(shí)現(xiàn)編解碼的邏輯。
decode在實(shí)現(xiàn) decode 邏輯時(shí)需要注意的一個(gè)問題是,由于二進(jìn)制報(bào)文是在網(wǎng)絡(luò)上發(fā)送的,因此一個(gè)完整的報(bào)文可能經(jīng)過多個(gè)分組來發(fā)送的,什么意思呢,就是當(dāng)有報(bào)文進(jìn)來后,要確認(rèn)報(bào)文是否完整,decode邏輯代碼不能假設(shè)收到的報(bào)文就是一個(gè)完整報(bào)文,一般稱這為“TCP半包問題”。同樣,報(bào)文是連著報(bào)文發(fā)送的,意味著decode代碼邏輯還要負(fù)責(zé)在一長串二進(jìn)制序列中,分割出一個(gè)一個(gè)獨(dú)立的報(bào)文,這稱之為“TCP粘包問題”。
netty 本身有提供一些方便的 decoder handler 來處理 TCP 半包和粘包的問題。不過一般情況下我們不會直接去用它,因?yàn)槲覀兊膮f(xié)議比較簡單,自己在代碼里處理一下就可以了。
完整的 decode 代碼邏輯如下所示:
import cn.com.agree.ats.rpc.message.*; import cn.com.agree.ats.util.logfacade.AbstractPuppetLoggerFactory; import cn.com.agree.ats.util.logfacade.IPuppetLogger; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import java.util.List; /** * 協(xié)議解碼器 * * @author beanlam * @version 1.0 */ public class ProtocolDecoder extends ByteToMessageDecoder { private static final IPuppetLogger logger = AbstractPuppetLoggerFactory .getInstance(ProtocolDecoder.class); private boolean magicChecked = false; @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List
可以看到,我們解決半包問題的時(shí)候,是判斷有沒有收到我們期望收到的報(bào)文,如果沒有,直接在 decode 方法里面 return,等有更多的報(bào)文被收到的時(shí)候,netty 會自動幫我們調(diào)起 decode 方法。而我們解決粘包問題的思路也很清晰,那就是一次只處理一個(gè)報(bào)文,不去動后面的報(bào)文內(nèi)容。
還需要注意的是,在 netty 中,對于 ByteBuf 的 get 是不會消費(fèi)掉報(bào)文的,而 read 是會消費(fèi)掉報(bào)文的。當(dāng)不確定報(bào)文是否收完整的時(shí)候,我們都是用 get開頭的方法去試探性地驗(yàn)證報(bào)文是否接收完全,當(dāng)確定報(bào)文接收完全后,我們才用 read 開頭的方法去消費(fèi)這段報(bào)文。
encode直接貼代碼,參考前文提到的協(xié)議格式閱讀以下代碼:
/** * * 協(xié)議編碼器 * * @author beanlam * @version 1.0 */ public class ProtocolEncoder extends MessageToByteEncoder{ @Override protected void encode(ChannelHandlerContext ctx, RpcMessage rpcMessage, ByteBuf out) throws Exception { byte status; byte[] data = (byte[]) rpcMessage.getData(); if (rpcMessage instanceof RpcRequest) { RpcRequest request = (RpcRequest) rpcMessage; status = InvocationStatus.OK.getCode(); } else { RpcResponse response = (RpcResponse) rpcMessage; status = response.getStatus().getCode(); } out.writeShort(ProtocolMetaData.MAGIC); out.writeByte(ProtocolMetaData.VERSION); out.writeByte(rpcMessage.getType().getCode()); out.writeByte(status); out.writeLong(rpcMessage.getContextId()); out.writeInt(data.length); out.writeBytes(data); } }
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/72065.html
摘要:由于我們還未談到具體的調(diào)用機(jī)制,因此暫且認(rèn)為就是把一個(gè)包含了調(diào)用信息的對象,從經(jīng)過序列化,變成一串二進(jìn)制流,發(fā)送到了端。 概述 在上一篇文章《簡易RPC框架:基于 netty 的協(xié)議編解碼》中談到對于協(xié)議的 decode 和 encode,在談 decode 之前,必須先要知道 encode 的過程是什么,它把什么東西轉(zhuǎn)化成了二進(jìn)制協(xié)議。由于我們還未談到具體的 RPC 調(diào)用機(jī)制,因此暫...
摘要:是一個(gè)分布式服務(wù)框架,以及治理方案。手寫注意要點(diǎn)手寫注意要點(diǎn)基于上文中對于協(xié)議的理解,如果我們自己去實(shí)現(xiàn),需要考慮哪些技術(shù)呢其實(shí)基于圖的整個(gè)流程應(yīng)該有一個(gè)大概的理解。基于手寫實(shí)現(xiàn)基于手寫實(shí)現(xiàn)理解了協(xié)議后,我們基于來實(shí)現(xiàn)一個(gè)通信框架。閱讀這篇文章之前,建議先閱讀和這篇文章關(guān)聯(lián)的內(nèi)容。[1]詳細(xì)剖析分布式微服務(wù)架構(gòu)下網(wǎng)絡(luò)通信的底層實(shí)現(xiàn)原理(圖解)[2][年薪60W的技巧]工作了5年,你真的理解N...
摘要:是一個(gè)面向字節(jié)流的協(xié)議,它是性質(zhì)是流式的,所以它并沒有分段。可基于分隔符解決。編解碼的主要目的就是為了可以編碼成字節(jié)流用于在網(wǎng)絡(luò)中傳輸持久化存儲。 showImg(https://segmentfault.com/img/remote/1460000015895049); 前言 記得前段時(shí)間我們生產(chǎn)上的一個(gè)網(wǎng)關(guān)出現(xiàn)了故障。 這個(gè)網(wǎng)關(guān)邏輯非常簡單,就是接收客戶端的請求然后解析報(bào)文最后發(fā)送...
摘要:等之所以支持跨語言,是因?yàn)樗麄冏约憾x了一套結(jié)構(gòu)化數(shù)據(jù)存儲格式,如的,用于編解碼對象,作為各個(gè)語言通信的中間協(xié)議。 前段時(shí)間覺得自己一直用別人的框架,站在巨人的肩膀上,也該自己造造輪子了 一時(shí)興起 就著手寫起了RPC框架 這里寫了系列博客拿給大家分享下 這篇是開篇的思路篇 項(xiàng)目最終的代碼放在了我的github上https://github.com/wephone/Me... 歡迎sta...
摘要:豐富的緩存數(shù)據(jù)結(jié)構(gòu)使用它自己的緩存來表示字節(jié)序列而不是的。針對有一個(gè)定義良好的事件模型。有一些協(xié)議是多層的建立在其他低級協(xié)議基礎(chǔ)上。此外,甚至不是完全線程安全的。協(xié)議由標(biāo)準(zhǔn)化為。協(xié)議緩存整合是一個(gè)高效二進(jìn)制協(xié)議的快速實(shí)現(xiàn)。 Chapter 2、結(jié)構(gòu)概覽 這一節(jié)我們將確認(rèn)Netty提供的核心功能是什么,以及它們怎么構(gòu)成一個(gè)完整的網(wǎng)絡(luò)應(yīng)用開發(fā)堆棧。 1、豐富的緩存數(shù)據(jù)結(jié)構(gòu) Netty使用它...
閱讀 1968·2021-11-19 09:40
閱讀 2155·2021-10-09 09:43
閱讀 3306·2021-09-06 15:00
閱讀 2823·2019-08-29 13:04
閱讀 2779·2019-08-26 11:53
閱讀 3543·2019-08-26 11:46
閱讀 2333·2019-08-26 11:38
閱讀 402·2019-08-26 11:27