摘要:啟動(dòng)一個(gè)線程,獲取阻塞隊(duì)列的元素,當(dāng)通道發(fā)生事件時(shí),隊(duì)列會(huì)被放入事件對象啟動(dòng)一個(gè)定時(shí)器,每個(gè)執(zhí)行一次,掃描,超時(shí)沒有獲取結(jié)果的會(huì)被移除掉客戶端跟服務(wù)器端差不多。而這個(gè)對象會(huì)在傳輸之前進(jìn)行編碼,消息接收到進(jìn)行解碼。
rocketMQ通信模塊
Rocketmq的通信層是基于通信框架netty 4.0.21.Final之上做了簡單的協(xié)議封裝,基本的類圖如下:
通訊模塊是怎么進(jìn)行的消息傳輸?shù)?/b>先來看看服務(wù)器端啟動(dòng)做了什么:
netty服務(wù)器啟動(dòng),監(jiān)聽在8888;netty設(shè)置了一個(gè)心跳檢測器IdleStateHandler,讀寫超時(shí)時(shí)間為120s,在120s后都沒有讀寫操作將會(huì)觸發(fā)相應(yīng)事件。
啟動(dòng)一個(gè)線程,獲取阻塞隊(duì)列eventQueue的元素,當(dāng)netty channel通道發(fā)生CONNECT, CLOSE,IDLE,EXCEPTION事件時(shí),隊(duì)列會(huì)被放入事件對象
啟動(dòng)一個(gè)定時(shí)器Timer,每個(gè)1s執(zhí)行一次,掃描ResponseFuture,超時(shí)沒有獲取結(jié)果的會(huì)被移除掉
客戶端跟服務(wù)器端差不多。
rocketmq提供了三種通信方式:
一、invokeSyncImpl 同步調(diào)用(主要實(shí)現(xiàn)參見NettyRemotingAbstract.invokeSyncImpl)
同步調(diào)用是指客戶端發(fā)起遠(yuǎn)程調(diào)用后,當(dāng)前線程會(huì)被阻塞,直到服務(wù)器端返回結(jié)果或發(fā)生超時(shí)異常,我們在發(fā)送消息時(shí)需要同步知道消息發(fā)送成功還是失敗,一般使用這種方式。
我們知道,netty是異步基于事件驅(qū)動(dòng)的,當(dāng)我們使用netty向遠(yuǎn)程服務(wù)器發(fā)送消息是通過channel.writeAndFlush方法,此方法是異步的,那我們?nèi)绾瓮降墨@取服務(wù)器的返回結(jié)果呢?這里的做法是在向服務(wù)器發(fā)送消息時(shí)設(shè)置一個(gè)唯一的序列號,本地會(huì)通過上下文保存一個(gè)ResponseFuture對象在Map中,key就是這個(gè)唯一的序列號,value就是這個(gè)ResponseFuture對象,ResponseFuture對象會(huì)設(shè)置一個(gè)CountDownLatch,每當(dāng)發(fā)送完消息后,就會(huì)調(diào)用CountDownLatch的await方法掛起當(dāng)前線程;當(dāng)服務(wù)器返回結(jié)果時(shí)也會(huì)攜帶之前客戶端傳遞過去的唯一序列號,這樣就可以找到ResponseFuture對象,再調(diào)用CountDownLatch的countDown方法,此時(shí)客戶端之前掛起的線程就會(huì)蘇醒過來,完成一次同步調(diào)用。
二、invokeAsyncImpl異步調(diào)用(主要實(shí)現(xiàn)參見NettyRemotingAbstract.invokeAsyncImpl)
客戶端發(fā)起遠(yuǎn)程調(diào)用前會(huì)先設(shè)置一個(gè)InvokeCallback類,當(dāng)然也是設(shè)置在ResponseFuture對象中,調(diào)用結(jié)束后不會(huì)等待結(jié)果,當(dāng)服務(wù)器返回時(shí)也是跟同步調(diào)用一樣會(huì)在新的線程里面先找到ResponseFuture,然后執(zhí)行回調(diào)接口也就是InvokeCallback的operationComplete方法。如果服務(wù)器返回結(jié)果超時(shí),也會(huì)進(jìn)行回調(diào),客戶端可以根據(jù)相關(guān)的狀態(tài)來執(zhí)行相關(guān)邏輯。
異步調(diào)用不會(huì)阻塞線程,調(diào)用后會(huì)立即返回,調(diào)用結(jié)果會(huì)在異步線程里面執(zhí)行回調(diào)來獲取,使用Async需要控制好節(jié)奏,不能發(fā)送的太快以防止壓垮服務(wù)器端。所以在invokeAsyncImpl方法里面設(shè)置了一個(gè)信號量,默認(rèn)是64個(gè),只有獲取到許可的請求才能真正發(fā)起遠(yuǎn)程調(diào)用。
三、invokeOnewayImpl 單向調(diào)用(主要實(shí)現(xiàn)參見NettyRemotingAbstract.invokeOnewayImpl)
客戶端發(fā)送請求后不會(huì)等待服務(wù)端返回的結(jié)果,并且會(huì)忽略服務(wù)端的處理結(jié)果;當(dāng)前線程調(diào)用完畢,調(diào)用方并不關(guān)心服務(wù)器端的處理結(jié)果,也不會(huì)被阻塞,跟異步調(diào)用一樣需要控制好節(jié)奏以防壓垮服務(wù)器端。在invokeOnewayImpl方法里面也設(shè)置了一個(gè)信號量,默認(rèn)是256個(gè),只有獲取到許可的請求才能真正發(fā)起遠(yuǎn)程調(diào)用。
三種通信方式的對比
調(diào)用方式 | 特點(diǎn) | 使用場景 |
---|---|---|
Sync | 同步阻塞 | 需要同步獲取結(jié)果的場景 |
Async | 異步不阻塞 | 當(dāng)前不需要結(jié)果,但是當(dāng)服務(wù)器處理完后,需要做一些其他事情 |
Oneway | 異步不阻塞 | 不要需要結(jié)果,不保證消息一定發(fā)送成功 |
RemotingCommand是rocketMQ消息傳輸?shù)拿浇?,所有的消息都?huì)包裝成RemotingCommand來進(jìn)行傳輸。而這個(gè)對象會(huì)在netty傳輸之前進(jìn)行編碼,消息接收到進(jìn)行解碼。
RemotingCommand是由頭部(header)和消息體(body)組成,消息發(fā)送的時(shí)候,頭部和消息體會(huì)分開進(jìn)行編碼。那么RemotingCommand是如何組成的呢?
RemotingCommand的核心字段:
public class RemotingCommand{ private int code; private LanguageCode language = LanguageCode.JAVA; private int version = 0; private int opaque = requestId.getAndIncrement(); private int flag = 0; private String remark; private HashMap頭部(header)extFields; private transient CommandCustomHeader customHeader; private SerializeType serializeTypeCurrentRPC = serializeTypeConfigInThisServer; private transient byte[] body; }
請求頭接收方和發(fā)起方的含義略有不同,下面的表格詳細(xì)的說明:
字段名 | 類型 | Request | Resposne |
---|---|---|---|
code | int | 請求操作代碼,接收方根據(jù)不同的代碼做不同的操作 | 應(yīng)答結(jié)果代碼,0表示成功,非0表示各種錯(cuò)誤代碼 |
language | 枚舉 | 請求方實(shí)現(xiàn)的語言,默認(rèn)Java | 接收方實(shí)現(xiàn)的語言 |
version | int | 請求方版本 | 接收方版本 |
opaque | int | 請求方在同一連接上不同的請求標(biāo)識代碼,多線程連接服用使用 | 接收方不做修改,直接返回 |
flag | int | 通信層的標(biāo)志位 | 通信層的標(biāo)志位 |
remark | String | 傳輸自定義文本信息 | 錯(cuò)誤詳細(xì)描述 |
extFields | Map | 自定義字段 | 自定義字段 |
頭信息里面還包括了CommandCustomHeader的自定義的一些頭信息,會(huì)被通過反射的方式放在extFields字段里面
消息體消息體是直接變?yōu)閎yte數(shù)組,由客戶端自己序列化,這兩部分后一起放入netty傳輸?shù)腂yteBuffer中,一起傳輸?shù)浇邮斩?/p> 報(bào)文格式與序列化
length | header length | headerData | bodyData |
---|---|---|---|
4個(gè)字節(jié) | 4個(gè)字節(jié)(高一位字節(jié)表示序列化類型,低三位字節(jié)表示長度) |
length:表示整個(gè)數(shù)據(jù)包的長度 占4個(gè)字節(jié)
header length:表示header的長度(高一位字節(jié)表示序列化類型,低三位字節(jié)表示長度)
headerData的序列化有兩種方式:
json:使用fastjson進(jìn)行序列化
自定義:使用bytebuffer自定義序列化
Netty服務(wù)器端在啟動(dòng)時(shí)設(shè)置了TCP參數(shù)的含義SO_BACKLOG:1024
指定全連接隊(duì)列數(shù),linux系統(tǒng)在文件/proc/sys/net/core/somaxconn指定,默認(rèn)128;
還有一個(gè)半連接隊(duì)列數(shù),linux在文件/proc/sys/net/ipv4/tcp_max_syn_backlog指定
SO_REUSEADDR:true
重用處于time_wait狀態(tài)下的連接
SO_KEEPALIVE:false
?;顧C(jī)制
TCP_NODELAY:true
關(guān)閉Nagle算法,Nagle算法可以降低網(wǎng)絡(luò)里小包的數(shù)量,從而提升網(wǎng)絡(luò)性能,關(guān)閉可以提高實(shí)時(shí)性
SO_SNDBUF:65535
發(fā)送緩存區(qū)大小
SO_RCVBUF:65535
接受緩存區(qū)大小
SO_RCVLOWAT:接收緩存水位線
SO_SNDLOWAT:發(fā)送緩存水位線
它們一般被I/O復(fù)用系統(tǒng)調(diào)用用來判斷socket是否可讀或可寫。當(dāng)TCP接收緩沖區(qū)中可讀數(shù)據(jù)的總數(shù)大于其低水位標(biāo)記時(shí),I/O復(fù)用系統(tǒng)調(diào)用將通知應(yīng)用程序可以從對應(yīng)的socket上讀取數(shù)據(jù);當(dāng)TCP發(fā)送緩沖區(qū)中的空閑空間(可以寫入數(shù)據(jù)的空間)大于其低水位標(biāo)記時(shí),I/O復(fù)用系統(tǒng)調(diào)用將通知應(yīng)用程序可以往對應(yīng)的socket上寫入數(shù)據(jù)
在netty中好像沒有看到有設(shè)置這兩個(gè)參數(shù)
CONNECT_TIMEOUT_MILLIS:3000
連接超時(shí)時(shí)間
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/77012.html
摘要:具體可以參考消息隊(duì)列之具體可以參考實(shí)戰(zhàn)之快速入門十分鐘入門阿里中間件團(tuán)隊(duì)博客是一個(gè)分布式的可分區(qū)的可復(fù)制的基于發(fā)布訂閱的消息系統(tǒng)主要用于大數(shù)據(jù)領(lǐng)域當(dāng)然在分布式系統(tǒng)中也有應(yīng)用。目前市面上流行的消息隊(duì)列就是阿里借鑒的原理用開發(fā)而得。 我自己總結(jié)的Java學(xué)習(xí)的系統(tǒng)知識點(diǎn)以及面試問題,目前已經(jīng)開源,會(huì)一直完善下去,歡迎建議和指導(dǎo)歡迎Star: https://github.com/Snail...
摘要:它是阿里巴巴于年開源的第三代分布式消息中間件。是一個(gè)分布式消息中間件,具有低延遲高性能和可靠性萬億級別的容量和靈活的可擴(kuò)展性,它是阿里巴巴于年開源的第三代分布式消息中間件。上篇文章消息隊(duì)列那么多,為什么建議深入了解下RabbitMQ?我們講到了消息隊(duì)列的發(fā)展史:并且詳細(xì)介紹了RabbitMQ,其功能也是挺強(qiáng)大的,那么,為啥又要搞一個(gè)RocketMQ出來呢?是重復(fù)造輪子嗎?本文我們就帶大家來詳...
摘要:分布式高并發(fā)微服務(wù)問阿里京東螞蟻等大廠面試真題解析道跳槽漲薪必備精選面試題最新版大廠面試真題集點(diǎn)擊這里免費(fèi)領(lǐng)取點(diǎn)擊這里免費(fèi)領(lǐng)取 估計(jì)很多Java程序員平時(shí)主要的工作就是一些Web系統(tǒng)的業(yè)務(wù)開發(fā),對于服務(wù)端IO程序以及網(wǎng)絡(luò)通信編程做得并不多,但是對于高級或者資深程序員來說,IO通信以及服務(wù)端編...
閱讀 1827·2021-10-20 13:49
閱讀 1367·2019-08-30 15:52
閱讀 2873·2019-08-29 16:37
閱讀 1042·2019-08-29 10:55
閱讀 3077·2019-08-26 12:14
閱讀 1655·2019-08-23 17:06
閱讀 3240·2019-08-23 16:59
閱讀 2550·2019-08-23 15:42