摘要:遠程調(diào)用開篇目標介紹之后解讀遠程調(diào)用模塊的內(nèi)容如何編排介紹中的包結(jié)構(gòu)設(shè)計以及最外層的的源碼解析。十該類就是遠程調(diào)用的上下文,貫穿著整個調(diào)用,例如調(diào)用,然后調(diào)用。十五該類是系統(tǒng)上下文,僅供內(nèi)部使用。
遠程調(diào)用——開篇
目標:介紹之后解讀遠程調(diào)用模塊的內(nèi)容如何編排、介紹dubbo-rpc-api中的包結(jié)構(gòu)設(shè)計以及最外層的的源碼解析。前言
最近我面臨著一個選擇,因為dubbo 2.7.0-release出現(xiàn)在了倉庫里,最近一直在進行2.7.0版本的code review,那我之前說這一系列的文章都是講述2.6.x版本的源代碼,我現(xiàn)在要不要選擇直接開始講解2.7.0的版本的源碼呢?我最后還是決定繼續(xù)講解2.6.x,因為我覺得還是有很多公司在用著2.6.x的版本,并且對于升級2.7.0的計劃應(yīng)該還沒那么快,并且在了解2.6.x版本的原理后,再去了解2.7.0新增的特性會更加容易,也能夠品位到設(shè)計者的意圖。當然在結(jié)束2.6.x的重要模塊講解后,我也會對2.7.0的新特性以及實現(xiàn)原理做一個全面的分析,2.7.0作為dubbo社區(qū)的畢業(yè)版,更加強大,敬請期待。
前面講了很多的內(nèi)容,現(xiàn)在開始將遠程調(diào)用RPC,好像又回到我第一篇文章 《dubbo源碼解析(一)Hello,Dubbo》,在這篇文章開頭我講到了什么叫做RPC,再通俗一點講,就是我把一個項目的兩部分代碼分開來,分別放到兩臺機器上,當我部署在A服務(wù)器上的應(yīng)用想要調(diào)用部署在B服務(wù)器上的應(yīng)用等方法,由于不存在同一個內(nèi)存空間,不能直接調(diào)用。而其實整個dubbo都在做遠程調(diào)用的事情,它涉及到很多內(nèi)容,比如配置、代理、集群、監(jiān)控等等,那么這次講的內(nèi)容是只關(guān)心一對一的調(diào)用,dubbo-rpc遠程調(diào)用模塊抽象各種協(xié)議,以及動態(tài)代理,Proxy層和Protocol層rpc的核心,我將會在本系列中講到。下面我們來看兩張官方文檔的圖:
暴露服務(wù)的時序圖:
你會發(fā)現(xiàn)其中有我們以前講到的Transporter、Server、Registry,而這次的系列將會講到的就是紅色框框內(nèi)的部分。
引用服務(wù)時序圖
在引用服務(wù)時序圖中,對應(yīng)的也是紅色框框的部分。
當閱讀完該系列后,希望能對這個調(diào)用鏈有所感悟。接下來看看dubbo-rpc的包結(jié)構(gòu):
可以看到有很多包,很規(guī)整,其中dubbo-rpc-api是對協(xié)議、暴露、引用、代理等的抽象和實現(xiàn),是rpc整個設(shè)計的核心內(nèi)容。其他的包則是dubbo支持的9種協(xié)議,在官方文檔也能查看介紹,并且包括一種本地調(diào)用injvm。那么我們再來看看dubbo-rpc-api中包結(jié)構(gòu):
filter包:在進行服務(wù)引用時會進行一系列的過濾。其中包括了很多過濾器。
listener包:看上面兩張服務(wù)引用和服務(wù)暴露的時序圖,發(fā)現(xiàn)有兩個listener,其中的邏輯實現(xiàn)就在這個包內(nèi)
protocol包:這個包實現(xiàn)了協(xié)議的一些公共邏輯
proxy包:實現(xiàn)了代理的邏輯。
service包:其中包含了一個需要調(diào)用的方法等封裝抽象。
support包:包括了工具類
最外層的實現(xiàn)。
下面的篇幅設(shè)計,本文會講解最外層的源碼和service下的源碼,support包下的源碼我會穿插在其他用到的地方一并講解,filter、listener、protocol、proxy以及各類協(xié)議的實現(xiàn)各自用一篇來講。
源碼分析 (一)Invokerpublic interface Invokerextends Node { /** * get service interface. * 獲得服務(wù)接口 * @return service interface. */ Class getInterface(); /** * invoke. * 調(diào)用下一個會話域 * @param invocation * @return result * @throws RpcException */ Result invoke(Invocation invocation) throws RpcException; }
該接口是實體域,它是dubbo的核心模型,其他模型都向它靠攏,或者轉(zhuǎn)化成它,它代表了一個可執(zhí)行體,可以向它發(fā)起invoke調(diào)用,這個有可能是一個本地的實現(xiàn),也可能是一個遠程的實現(xiàn),也可能是一個集群的實現(xiàn)。它代表了一次調(diào)用
(二)Invocationpublic interface Invocation { /** * get method name. * 獲得方法名稱 * @return method name. * @serial */ String getMethodName(); /** * get parameter types. * 獲得參數(shù)類型 * @return parameter types. * @serial */ Class>[] getParameterTypes(); /** * get arguments. * 獲得參數(shù) * @return arguments. * @serial */ Object[] getArguments(); /** * get attachments. * 獲得附加值集合 * @return attachments. * @serial */ MapgetAttachments(); /** * get attachment by key. * 獲得附加值 * @return attachment value. * @serial */ String getAttachment(String key); /** * get attachment by key with default value. * 獲得附加值 * @return attachment value. * @serial */ String getAttachment(String key, String defaultValue); /** * get the invoker in current context. * 獲得當前上下文的invoker * @return invoker. * @transient */ Invoker> getInvoker(); }
Invocation 是會話域,它持有調(diào)用過程中的變量,比如方法名,參數(shù)等。
(三)Exporterpublic interface Exporter{ /** * get invoker. * 獲得對應(yīng)的實體域invoker * @return invoker */ Invoker getInvoker(); /** * unexport. * 取消暴露 * *
* getInvoker().destroy(); *
*/ void unexport(); }
該接口是暴露服務(wù)的接口,定義了兩個方法分別是獲得invoker和取消暴露服務(wù)。
(四)ExporterListener@SPI public interface ExporterListener { /** * The exporter exported. * 暴露服務(wù) * @param exporter * @throws RpcException * @see com.alibaba.dubbo.rpc.Protocol#export(Invoker) */ void exported(Exporter> exporter) throws RpcException; /** * The exporter unexported. * 取消暴露 * @param exporter * @throws RpcException * @see com.alibaba.dubbo.rpc.Exporter#unexport() */ void unexported(Exporter> exporter); }
該接口是服務(wù)暴露的監(jiān)聽器接口,定義了兩個方法是暴露和取消暴露,參數(shù)都是Exporter類型的。
(五)Protocol@SPI("dubbo") public interface Protocol { /** * Get default port when user doesn"t config the port. * 獲得默認的端口 * @return default port */ int getDefaultPort(); /** * Export service for remote invocation:
* 1. Protocol should record request source address after receive a request: * RpcContext.getContext().setRemoteAddress();
* 2. export() must be idempotent, that is, there"s no difference between invoking once and invoking twice when * export the same URL
* 3. Invoker instance is passed in by the framework, protocol needs not to care
* 暴露服務(wù)方法, * @paramService type 服務(wù)類型 * @param invoker Service invoker 服務(wù)的實體域 * @return exporter reference for exported service, useful for unexport the service later * @throws RpcException thrown when error occurs during export the service, for example: port is occupied */ @Adaptive Exporter export(Invoker invoker) throws RpcException; /** * Refer a remote service:
* 1. When user calls `invoke()` method of `Invoker` object which"s returned from `refer()` call, the protocol * needs to correspondingly execute `invoke()` method of `Invoker` object
* 2. It"s protocol"s responsibility to implement `Invoker` which"s returned from `refer()`. Generally speaking, * protocol sends remote request in the `Invoker` implementation.
* 3. When there"s check=false set in URL, the implementation must not throw exception but try to recover when * connection fails. * 引用服務(wù)方法 * @paramService type 服務(wù)類型 * @param type Service class 服務(wù)類名 * @param url URL address for the remote service * @return invoker service"s local proxy * @throws RpcException when there"s any error while connecting to the service provider */ @Adaptive Invoker refer(Class type, URL url) throws RpcException; /** * Destroy protocol:
* 1. Cancel all services this protocol exports and refers
* 2. Release all occupied resources, for example: connection, port, etc.
* 3. Protocol can continue to export and refer new service even after it"s destroyed. */ void destroy(); }
該接口是服務(wù)域接口,也是協(xié)議接口,它是一個可擴展的接口,默認實現(xiàn)的是dubbo協(xié)議。定義了四個方法,關(guān)鍵的是服務(wù)暴露和引用兩個方法。
(六)Filter@SPI public interface Filter { /** * do invoke filter. **
* // before filter * Result result = invoker.invoke(invocation); * // after filter * return result; *
* * @param invoker service * @param invocation invocation. * @return invoke result. * @throws RpcException * @see com.alibaba.dubbo.rpc.Invoker#invoke(Invocation) */ Result invoke(Invoker> invoker, Invocation invocation) throws RpcException; }
該接口是invoker調(diào)用時過濾器接口,其中就只有一個invoke方法。在該方法中對調(diào)用進行過濾
(七)InvokerListener@SPI public interface InvokerListener { /** * The invoker referred * 在服務(wù)引用的時候進行監(jiān)聽 * @param invoker * @throws RpcException * @see com.alibaba.dubbo.rpc.Protocol#refer(Class, com.alibaba.dubbo.common.URL) */ void referred(Invoker> invoker) throws RpcException; /** * The invoker destroyed. * 銷毀實體域 * @param invoker * @see com.alibaba.dubbo.rpc.Invoker#destroy() */ void destroyed(Invoker> invoker); }
該接口是實體域的監(jiān)聽器,定義了兩個方法,分別是服務(wù)引用和銷毀的時候執(zhí)行的方法。
(八)Result該接口是實體域執(zhí)行invoke的結(jié)果接口,里面定義了獲得結(jié)果異常以及附加值等方法。比較好理解我就不貼代碼了。
(九)ProxyFactory@SPI("javassist") public interface ProxyFactory { /** * create proxy. * 創(chuàng)建一個代理 * @param invoker * @return proxy */ @Adaptive({Constants.PROXY_KEY})T getProxy(Invoker invoker) throws RpcException; /** * create proxy. * 創(chuàng)建一個代理 * @param invoker * @return proxy */ @Adaptive({Constants.PROXY_KEY}) T getProxy(Invoker invoker, boolean generic) throws RpcException; /** * create invoker. * 創(chuàng)建一個實體域 * @param * @param proxy * @param type * @param url * @return invoker */ @Adaptive({Constants.PROXY_KEY}) Invoker getInvoker(T proxy, Class type, URL url) throws RpcException; }
該接口是代理工廠接口,它也是個可擴展接口,默認實現(xiàn)javassist,dubbo提供兩種動態(tài)代理方法分別是javassist/jdk,該接口定義了三個方法,前兩個方法是通過invoker創(chuàng)建代理,最后一個是通過代理來獲得invoker。
(十)RpcContext該類就是遠程調(diào)用的上下文,貫穿著整個調(diào)用,例如A調(diào)用B,然后B調(diào)用C。在服務(wù)B上,RpcContext在B之前將調(diào)用信息從A保存到B。開始調(diào)用C,并在B調(diào)用C后將調(diào)用信息從B保存到C。RpcContext保存了調(diào)用信息。
public class RpcContext { /** * use internal thread local to improve performance * 本地上下文 */ private static final InternalThreadLocalLOCAL = new InternalThreadLocal () { @Override protected RpcContext initialValue() { return new RpcContext(); } }; /** * 服務(wù)上下文 */ private static final InternalThreadLocal SERVER_LOCAL = new InternalThreadLocal () { @Override protected RpcContext initialValue() { return new RpcContext(); } }; /** * 附加值集合 */ private final Map attachments = new HashMap (); /** * 上下文值 */ private final Map values = new HashMap (); /** * 線程結(jié)果 */ private Future> future; /** * url集合 */ private List urls; /** * 當前的url */ private URL url; /** * 方法名稱 */ private String methodName; /** * 參數(shù)類型集合 */ private Class>[] parameterTypes; /** * 參數(shù)集合 */ private Object[] arguments; /** * 本地地址 */ private InetSocketAddress localAddress; /** * 遠程地址 */ private InetSocketAddress remoteAddress; /** * 實體域集合 */ @Deprecated private List > invokers; /** * 實體域 */ @Deprecated private Invoker> invoker; /** * 會話域 */ @Deprecated private Invocation invocation; // now we don"t use the "values" map to hold these objects // we want these objects to be as generic as possible /** * 請求 */ private Object request; /** * 響應(yīng) */ private Object response;
該類中最重要的是它的一些屬性,因為該上下文就是用來保存信息的。方法我就不介紹了,因為比較簡單。
(十一)RpcException/** * 不知道異常 */ public static final int UNKNOWN_EXCEPTION = 0; /** * 網(wǎng)絡(luò)異常 */ public static final int NETWORK_EXCEPTION = 1; /** * 超時異常 */ public static final int TIMEOUT_EXCEPTION = 2; /** * 基礎(chǔ)異常 */ public static final int BIZ_EXCEPTION = 3; /** * 禁止訪問異常 */ public static final int FORBIDDEN_EXCEPTION = 4; /** * 序列化異常 */ public static final int SERIALIZATION_EXCEPTION = 5;
該類是rpc調(diào)用拋出的異常類,其中封裝了五種通用的錯誤碼。
(十二)RpcInvocation/** * 方法名稱 */ private String methodName; /** * 參數(shù)類型集合 */ private Class>[] parameterTypes; /** * 參數(shù)集合 */ private Object[] arguments; /** * 附加值 */ private Mapattachments; /** * 實體域 */ private transient Invoker> invoker;
該類實現(xiàn)了Invocation接口,是rpc的會話域,其中的方法比較簡單,主要是封裝了上述的屬性。
(十三)RpcResult/** * 結(jié)果 */ private Object result; /** * 異常 */ private Throwable exception; /** * 附加值 */ private Mapattachments = new HashMap ();
該類實現(xiàn)了Result接口,是rpc的結(jié)果實現(xiàn)類,其中關(guān)鍵是封裝了以上三個屬性。
(十四)RpcStatus該類是rpc的一些狀態(tài)監(jiān)控,其中封裝了許多的計數(shù)器,用來記錄rpc調(diào)用的狀態(tài)。
1.屬性/** * uri對應(yīng)的狀態(tài)集合,key為uri,value為RpcStatus對象 */ private static final ConcurrentMapSERVICE_STATISTICS = new ConcurrentHashMap (); /** * method對應(yīng)的狀態(tài)集合,key是uri,第二個key是方法名methodName */ private static final ConcurrentMap > METHOD_STATISTICS = new ConcurrentHashMap >(); /** * 已經(jīng)沒用了 */ private final ConcurrentMap values = new ConcurrentHashMap (); /** * 活躍狀態(tài) */ private final AtomicInteger active = new AtomicInteger(); /** * 總的數(shù)量 */ private final AtomicLong total = new AtomicLong(); /** * 失敗的個數(shù) */ private final AtomicInteger failed = new AtomicInteger(); /** * 總調(diào)用時長 */ private final AtomicLong totalElapsed = new AtomicLong(); /** * 總調(diào)用失敗時長 */ private final AtomicLong failedElapsed = new AtomicLong(); /** * 最大調(diào)用時長 */ private final AtomicLong maxElapsed = new AtomicLong(); /** * 最大調(diào)用失敗時長 */ private final AtomicLong failedMaxElapsed = new AtomicLong(); /** * 最大調(diào)用成功時長 */ private final AtomicLong succeededMaxElapsed = new AtomicLong(); /** * Semaphore used to control concurrency limit set by `executes` * 信號量用來控制`execution`設(shè)置的并發(fā)限制 */ private volatile Semaphore executesLimit; /** * 用來控制`execution`設(shè)置的許可證 */ private volatile int executesPermits;
以上是該類的屬性,可以看到保存了很多的計數(shù)器,分別用來記錄了失敗調(diào)用成功調(diào)用等累計數(shù)。
2.beginCount/** * 開始計數(shù) * @param url */ public static void beginCount(URL url, String methodName) { // 對該url對應(yīng)對活躍計數(shù)器加一 beginCount(getStatus(url)); // 對該方法對活躍計數(shù)器加一 beginCount(getStatus(url, methodName)); } /** * 以原子方式加1 * @param status */ private static void beginCount(RpcStatus status) { status.active.incrementAndGet(); }
該方法是增加計數(shù)。
3.endCountpublic static void endCount(URL url, String methodName, long elapsed, boolean succeeded) { // url對應(yīng)的狀態(tài)中計數(shù)器減一 endCount(getStatus(url), elapsed, succeeded); // 方法對應(yīng)的狀態(tài)中計數(shù)器減一 endCount(getStatus(url, methodName), elapsed, succeeded); } private static void endCount(RpcStatus status, long elapsed, boolean succeeded) { // 活躍計數(shù)器減一 status.active.decrementAndGet(); // 總計數(shù)器加1 status.total.incrementAndGet(); // 總調(diào)用時長加上調(diào)用時長 status.totalElapsed.addAndGet(elapsed); // 如果最大調(diào)用時長小于elapsed,則設(shè)置最大調(diào)用時長 if (status.maxElapsed.get() < elapsed) { status.maxElapsed.set(elapsed); } // 如果rpc調(diào)用成功 if (succeeded) { // 如果成最大調(diào)用成功時長小于elapsed,則設(shè)置最大調(diào)用成功時長 if (status.succeededMaxElapsed.get() < elapsed) { status.succeededMaxElapsed.set(elapsed); } } else { // 失敗計數(shù)器加一 status.failed.incrementAndGet(); // 失敗的過期數(shù)加上elapsed status.failedElapsed.addAndGet(elapsed); // 總調(diào)用失敗時長小于elapsed,則設(shè)置總調(diào)用失敗時長 if (status.failedMaxElapsed.get() < elapsed) { status.failedMaxElapsed.set(elapsed); } } }
該方法是計數(shù)器減少。
(十五)StaticContext該類是系統(tǒng)上下文,僅供內(nèi)部使用。
/** * 系統(tǒng)名稱 */ private static final String SYSTEMNAME = "system"; /** * 系統(tǒng)上下文集合,僅供內(nèi)部使用 */ private static final ConcurrentMapcontext_map = new ConcurrentHashMap (); /** * 系統(tǒng)上下文名稱 */ private String name;
上面是該類的屬性,它還記錄了所有的系統(tǒng)上下文集合。
(十六)EchoServicepublic interface EchoService { /** * echo test. * 回聲測試 * @param message message. * @return message. */ Object $echo(Object message); }
該接口是回聲服務(wù)接口,定義了一個一個回聲測試的方法,回聲測試用于檢測服務(wù)是否可用,回聲測試按照正常請求流程執(zhí)行,能夠測試整個調(diào)用是否通暢,可用于監(jiān)控,所有服務(wù)自動實現(xiàn)該接口,只需將任意服務(wù)強制轉(zhuǎn)化為EchoService,就可以用了。
(十七)GenericException該方法是通用的異常類。
/** * 異常類名 */ private String exceptionClass; /** * 異常信息 */ private String exceptionMessage;
比較簡單,就封裝了兩個屬性。
(十八)GenericServicepublic interface GenericService { /** * Generic invocation * 通用的會話域 * @param method Method name, e.g. findPerson. If there are overridden methods, parameter info is * required, e.g. findPerson(java.lang.String) * @param parameterTypes Parameter types * @param args Arguments * @return invocation return value * @throws Throwable potential exception thrown from the invocation */ Object $invoke(String method, String[] parameterTypes, Object[] args) throws GenericException; }
該接口是通用的服務(wù)接口,同樣定義了一個類似invoke的方法
后記該部分相關(guān)的源碼解析地址:https://github.com/CrazyHZM/i...
該文章講解了遠程調(diào)用的開篇,介紹之后解讀遠程調(diào)用模塊的內(nèi)容如何編排、介紹dubbo-rpc-api中的包結(jié)構(gòu)設(shè)計以及最外層的的源碼解析,其中的邏輯不負責,要關(guān)注的是其中的一些概念和dubbo如何去做暴露服務(wù)和引用服務(wù),其中很多的接口定義需要弄清楚。接下來我將開始對rpc模塊的過濾器進行講解。
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/72826.html
摘要:源碼分析一該類繼承了類,是協(xié)議實現(xiàn)的核心。屬性默認端口號不支持協(xié)議的服務(wù)暴露,拋出異常可以看到不支持服務(wù)暴露。后記該部分相關(guān)的源碼解析地址該文章講解了遠程調(diào)用中關(guān)于協(xié)議實現(xiàn)的部分,邏輯比較簡單。 遠程調(diào)用——redis協(xié)議 目標:介紹redis協(xié)議的設(shè)計和實現(xiàn),介紹dubbo-rpc-redis的源碼。 前言 dubbo支持的redis協(xié)議是基于Redis的,Redis 是一個高效的 ...
摘要:而編碼器是講應(yīng)用程序的數(shù)據(jù)轉(zhuǎn)化為網(wǎng)絡(luò)格式,解碼器則是講網(wǎng)絡(luò)格式轉(zhuǎn)化為應(yīng)用程序,同時具備這兩種功能的單一組件就叫編解碼器。在中是老的編解碼器接口,而是新的編解碼器接口,并且已經(jīng)用把適配成了。 遠程通訊——開篇 目標:介紹之后解讀遠程通訊模塊的內(nèi)容如何編排、介紹dubbo-remoting-api中的包結(jié)構(gòu)設(shè)計以及最外層的的源碼解析。 前言 服務(wù)治理框架中可以大致分為服務(wù)通信和服務(wù)管理兩個...
摘要:大揭秘異步化改造目標從源碼的角度分析的新特性中對于異步化的改造原理??丛创a解析四十六消費端發(fā)送請求過程講到的十四的,在以前的邏輯會直接在方法中根據(jù)配置區(qū)分同步異步單向調(diào)用。改為關(guān)于可以參考源碼解析十遠程通信層的六。 2.7大揭秘——異步化改造 目標:從源碼的角度分析2.7的新特性中對于異步化的改造原理。 前言 dubbo中提供了很多類型的協(xié)議,關(guān)于協(xié)議的系列可以查看下面的文章: du...
摘要:服務(wù)引用過程目標從源碼的角度分析服務(wù)引用過程。并保留服務(wù)提供者的部分配置,比如版本,,時間戳等最后將合并后的配置設(shè)置為查詢字符串中。的可以參考源碼解析二十三遠程調(diào)用的一的源碼分析。 dubbo服務(wù)引用過程 目標:從源碼的角度分析服務(wù)引用過程。 前言 前面服務(wù)暴露過程的文章講解到,服務(wù)引用有兩種方式,一種就是直連,也就是直接指定服務(wù)的地址來進行引用,這種方式更多的時候被用來做服務(wù)測試,不...
摘要:服務(wù)暴露過程目標從源碼的角度分析服務(wù)暴露過程。導(dǎo)出服務(wù),包含暴露服務(wù)到本地,和暴露服務(wù)到遠程兩個過程。其中服務(wù)暴露的第八步已經(jīng)沒有了。將泛化調(diào)用版本號或者等信息加入獲得服務(wù)暴露地址和端口號,利用內(nèi)數(shù)據(jù)組裝成。 dubbo服務(wù)暴露過程 目標:從源碼的角度分析服務(wù)暴露過程。 前言 本來這一篇一個寫異步化改造的內(nèi)容,但是最近我一直在想,某一部分的優(yōu)化改造該怎么去撰寫才能更加的讓讀者理解。我覺...
閱讀 3257·2021-10-27 14:20
閱讀 2531·2021-10-08 10:05
閱讀 1634·2021-09-09 09:33
閱讀 2906·2019-08-30 13:16
閱讀 1442·2019-08-29 18:34
閱讀 1176·2019-08-29 10:58
閱讀 1232·2019-08-28 18:22
閱讀 1229·2019-08-26 13:33