摘要:是一個面向字節流的協議,它是性質是流式的,所以它并沒有分段。可基于分隔符解決。編解碼的主要目的就是為了可以編碼成字節流用于在網絡中傳輸持久化存儲。
前言
記得前段時間我們生產上的一個網關出現了故障。
這個網關邏輯非常簡單,就是接收客戶端的請求然后解析報文最后發送短信。
但這個請求并不是常見的 HTTP ,而是利用 Netty 自定義的協議。
有個前提是:網關是需要讀取一段完整的報文才能進行后面的邏輯。
問題是有天突然發現網關解析報文出錯,查看了客戶端的發送日志也沒發現問題,最后通過日志發現收到了許多不完整的報文,有些還多了。
于是想會不會是 TCP 拆、粘包帶來的問題,最后利用 Netty 自帶的拆包工具解決了該問題。
這便有了此文。
TCP 協議問題雖然解決了,但還是得想想原因,為啥會這樣?打破砂鍋問到底才是一個靠譜的程序員。
這就得從 TCP 這個協議說起了。
TCP 是一個面向字節流的協議,它是性質是流式的,所以它并沒有分段。就像水流一樣,你沒法知道什么時候開始,什么時候結束。
所以他會根據當前的套接字緩沖區的情況進行拆包或是粘包。
下圖展示了一個 TCP 協議傳輸的過程:
發送端的字節流都會先傳入緩沖區,再通過網絡傳入到接收端的緩沖區中,最終由接收端獲取。
當我們發送兩個完整包到接收端的時候:
正常情況會接收到兩個完整的報文。
但也有以下的情況:
接收到的是一個報文,它是由發送的兩個報文組成的,這樣對于應用程序來說就很難處理了(這樣稱為粘包)。
還有可能出現上面這樣的雖然收到了兩個包,但是里面的內容卻是互相包含,對于應用來說依然無法解析(拆包)。
對于這樣的問題只能通過上層的應用來解決,常見的方式有:
在報文末尾增加換行符表明一條完整的消息,這樣在接收端可以根據這個換行符來判斷消息是否完整。
將消息分為消息頭、消息體。可以在消息頭中聲明消息的長度,根據這個長度來獲取報文(比如 808 協議)。
規定好報文長度,不足的空位補齊,取的時候按照長度截取即可。
以上的這些方式我們在 Netty 的 pipline 中里加入對應的解碼器都可以手動實現。
但其實 Netty 已經幫我們做好了,完全可以開箱即用。
比如:
LineBasedFrameDecoder 可以基于換行符解決。
DelimiterBasedFrameDecoder 可基于分隔符解決。
FixedLengthFrameDecoder 可指定長度解決。
字符串拆、粘包下面來模擬一下最簡單的字符串傳輸。
還是在之前的
https://github.com/crossoverJie/netty-action
進行演示。
在 Netty 客戶端中加了一個入口可以循環發送 100 條字符串報文到接收端:
/** * 向服務端發消息 字符串 * @param stringReqVO * @return */ @ApiOperation("客戶端發送消息,字符串") @RequestMapping(value = "sendStringMsg", method = RequestMethod.POST) @ResponseBody public BaseResponsesendStringMsg(@RequestBody StringReqVO stringReqVO){ BaseResponse res = new BaseResponse(); for (int i = 0; i < 100; i++) { heartbeatClient.sendStringMsg(stringReqVO.getMsg()) ; } // 利用 actuator 來自增 counterService.increment(Constants.COUNTER_CLIENT_PUSH_COUNT); SendMsgResVO sendMsgResVO = new SendMsgResVO() ; sendMsgResVO.setMsg("OK") ; res.setCode(StatusEnum.SUCCESS.getCode()) ; res.setMessage(StatusEnum.SUCCESS.getMessage()) ; return res ; } /** * 發送消息字符串 * * @param msg */ public void sendStringMsg(String msg) { ByteBuf message = Unpooled.buffer(msg.getBytes().length) ; message.writeBytes(msg.getBytes()) ; ChannelFuture future = channel.writeAndFlush(message); future.addListener((ChannelFutureListener) channelFuture -> LOGGER.info("客戶端手動發消息成功={}", msg)); }
服務端直接打印即可:
@Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { LOGGER.info("收到msg={}", msg); }
順便提一下,這里加的有一個字符串的解碼器:.addLast(new StringDecoder()) 其實就是把消息解析為字符串。
@Override protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List
在 Swagger 中調用了客戶端的接口用于給服務端發送了 100 次消息:
正常情況下接收端應該打印 100 次 hello 才對,但是查看日志會發現:
收到的內容有完整的、多的、少的、拼接的;這也就對應了上面提到的拆包、粘包。
該怎么解決呢?這便可采用之前提到的 LineBasedFrameDecoder 利用換行符解決。
利用 LineBasedFrameDecoder 解決問題LineBasedFrameDecoder 解碼器使用非常簡單,只需要在 pipline 鏈條上添加即可。
//字符串解析,換行防拆包 .addLast(new LineBasedFrameDecoder(1024)) .addLast(new StringDecoder())
構造函數中傳入了 1024 是指報的長度最大不超過這個值,具體可以看下文的源碼分析。
然后我們再進行一次測試看看結果:
注意,由于 LineBasedFrameDecoder 解碼器是通過換行符來判斷的,所以在發送時,一條完整的消息需要加上 。
最終的結果:
仔細觀察日志,發現確實沒有一條被拆、粘包。
LineBasedFrameDecoder 的原理目的達到了,來看看它的實現原理:
第一步主要就是 findEndOfLine 方法去找到當前報文中是否存在分隔符,存在就會返回分隔符所在的位置。
判斷是否需要丟棄,默認為 false ,第一次走這個邏輯(下文會判斷是否需要改為 true)。
如果報文中存在換行符,就會將數據截取到那個位置。
如果不存在換行符(有可能是拆包、粘包),就看當前報文的長度是否大于預設的長度。大于則需要緩存這個報文長度,并將 discarding 設為 true。
如果是需要丟棄時,判斷是否找到了換行符,存在則需要丟棄掉之前記錄的長度然后截取數據。
如果沒有找到換行符,則將之前緩存的報文長度進行累加,用于下次拋棄。
從這個邏輯中可以看出就是尋找報文中是否包含換行符,并進行相應的截取。
由于是通過緩沖區讀取的,所以即使這次沒有換行符的數據,只要下一次的報文存在換行符,上一輪的數據也不會丟。
高效的編碼方式 Google Protocol上面提到的其實就是在解碼中進行操作,我們也可以自定義自己的拆、粘包工具。
編解碼的主要目的就是為了可以編碼成字節流用于在網絡中傳輸、持久化存儲。
Java 中也可以實現 Serializable 接口來實現序列化,但由于它性能等原因在一些 RPC 調用中用的很少。
而 Google Protocol 則是一個高效的序列化框架,下面來演示在 Netty 中如何使用。
安裝首先第一步自然是安裝:
在官網下載對應的包。
本地配置環境變量:
當執行 protoc --version 出現以下結果表明安裝成功:
定義自己的協議格式接著是需要按照官方要求的語法定義自己的協議格式。
比如我這里需要定義一個輸入輸出的報文格式:
BaseRequestProto.proto:
syntax = "proto2"; package protocol; option java_package = "com.crossoverjie.netty.action.protocol"; option java_outer_classname = "BaseRequestProto"; message RequestProtocol { required int32 requestId = 2; required string reqMsg = 1; }
BaseResponseProto.proto:
syntax = "proto2"; package protocol; option java_package = "com.crossoverjie.netty.action.protocol"; option java_outer_classname = "BaseResponseProto"; message ResponseProtocol { required int32 responseId = 2; required string resMsg = 1; }
再通過
protoc --java_out=/dev BaseRequestProto.proto BaseResponseProto.proto
protoc 命令將剛才定義的協議格式轉換為 Java 代碼,并生成在 /dev 目錄。
只需要將生成的代碼拷貝到我們的項目中,同時引入依賴:
com.google.protobuf protobuf-java 3.4.0
利用 Protocol 的編解碼也非常簡單:
public class ProtocolUtil { public static void main(String[] args) throws InvalidProtocolBufferException { BaseRequestProto.RequestProtocol protocol = BaseRequestProto.RequestProtocol.newBuilder() .setRequestId(123) .setReqMsg("你好啊") .build(); byte[] encode = encode(protocol); BaseRequestProto.RequestProtocol parseFrom = decode(encode); System.out.println(protocol.toString()); System.out.println(protocol.toString().equals(parseFrom.toString())); } /** * 編碼 * @param protocol * @return */ public static byte[] encode(BaseRequestProto.RequestProtocol protocol){ return protocol.toByteArray() ; } /** * 解碼 * @param bytes * @return * @throws InvalidProtocolBufferException */ public static BaseRequestProto.RequestProtocol decode(byte[] bytes) throws InvalidProtocolBufferException { return BaseRequestProto.RequestProtocol.parseFrom(bytes); } }
利用 BaseRequestProto 來做一個演示,先編碼再解碼最后比較最終的結果是否相同。答案肯定是一致的。
利用 protoc 命令生成的 Java 文件里已經幫我們把編解碼全部都封裝好了,只需要簡單調用就行了。
可以看出 Protocol 創建對象使用的是構建者模式,對使用者來說清晰易讀,更多關于構建器的內容可以參考這里。
更多關于 Google Protocol 內容請查看官方開發文檔。
結合 NettyNetty 已經自帶了對 Google protobuf 的編解碼器,也是只需要在 pipline 中添加即可。
server 端:
// google Protobuf 編解碼 .addLast(new ProtobufDecoder(BaseRequestProto.RequestProtocol.getDefaultInstance())) .addLast(new ProtobufEncoder())
客戶端:
// google Protobuf 編解碼 .addLast(new ProtobufDecoder(BaseResponseProto.ResponseProtocol.getDefaultInstance())) .addLast(new ProtobufEncoder())
稍微注意的是,在構建 ProtobufDecoder 時需要顯式指定解碼器需要解碼成什么類型。
我這里服務端接收的是 BaseRequestProto,客戶端收到的是服務端響應的 BaseResponseProto 所以就設置了對應的實例。
同樣的提供了一個接口向服務端發送消息,當服務端收到了一個特殊指令時也會向客戶端返回內容:
@Override protected void channelRead0(ChannelHandlerContext ctx, BaseRequestProto.RequestProtocol msg) throws Exception { LOGGER.info("收到msg={}", msg.getReqMsg()); if (999 == msg.getRequestId()){ BaseResponseProto.ResponseProtocol responseProtocol = BaseResponseProto.ResponseProtocol.newBuilder() .setResponseId(1000) .setResMsg("服務端響應") .build(); ctx.writeAndFlush(responseProtocol) ; } }
在 swagger 中調用相關接口:
在日志可以看到服務端收到了消息,同時客戶端也收到了返回:
雖說 Netty 封裝了 Google Protobuf 相關的編解碼工具,其實查看它的編碼工具就會發現也是利用上文提到的 api 實現的。
Protocol 拆、粘包Google Protocol 的使用確實非常簡單,但還是有值的注意的地方,比如它依然會有拆、粘包問題。
不妨模擬一下:
連續發送 100 次消息看服務端收到的怎么樣:
會發現服務端在解碼的時候報錯,其實就是被拆、粘包了。
這點 Netty 自然也考慮到了,所以已經提供了相關的工具。
//拆包解碼 .addLast(new ProtobufVarint32FrameDecoder()) .addLast(new ProtobufVarint32LengthFieldPrepender())
只需要在服務端和客戶端加上這兩個編解碼工具即可,再來發送一百次試試。
查看日志發現沒有出現一次異常,100 條信息全部都接收到了。
這個編解碼工具可以簡單理解為是在消息體中加了一個 32 位長度的整形字段,用于表明當前消息長度。
總結網絡這塊同樣是計算機的基礎,由于近期在做相關的工作所以接觸的比較多,也算是給大學補課了。
后面會接著更新 Netty 相關的內容,最后會產出一個高性能的 HTTP 以及 RPC 框架,敬請期待。
上文相關的代碼:
https://github.com/crossoverJie/netty-action
號外最近在總結一些 Java 相關的知識點,感興趣的朋友可以一起維護。
地址: https://github.com/crossoverJie/Java-Interview
歡迎關注公眾號一起交流:
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/76612.html
摘要:如果什么事都沒得做,它也不會死循環,它會將線程休眠起來,直到下一個事件來了再繼續干活,這樣的一個線程稱之為線程。而請求處理邏輯既可以使用單獨的線程池進行處理,也可以跟放在讀寫線程一塊處理。 Netty到底是什么 從HTTP說起 有了Netty,你可以實現自己的HTTP服務器,FTP服務器,UDP服務器,RPC服務器,WebSocket服務器,Redis的Proxy服務器,MySQL的P...
摘要:的方法,的默認實現會判斷是否是類型注意自動拆箱,自動裝箱問題。適應自旋鎖鎖競爭是下的,會經過用戶態到內核態的切換,是比較花時間的。在中引入了自適應的自旋鎖,說明自旋的時間不固定,要不要自旋變得越來越聰明。 前言 只有光頭才能變強 之前在刷博客的時候,發現一些寫得比較好的博客都會默默收藏起來。最近在查閱補漏,有的知識點比較重要的,但是在之前的博客中還沒有寫到,于是趁著閑整理一下。 文本的...
摘要:趁實習前的這段業余時間,我實現了一個輕量級的分布式框架,名字叫做,代碼量不大,但是麻雀雖小卻五臟俱全。目前支持和兩種序列化框架。使用實現服務注冊與發現功能。代碼實現是我學習驗證過程中誕生的一個輕量級分布式框架,代碼放在了。 遠程過程調用(Remote Procedure Call,RPC)是一個計算機通信協議。該協議允許運行于一臺計算機的程序調用另一臺計算機的子程序,而程序員無需額外地...
閱讀 1117·2021-11-16 11:45
閱讀 3134·2021-10-13 09:40
閱讀 725·2019-08-26 13:45
閱讀 1225·2019-08-26 13:32
閱讀 2181·2019-08-26 13:23
閱讀 923·2019-08-26 12:16
閱讀 2834·2019-08-26 11:37
閱讀 1764·2019-08-26 10:32