国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

dubbo源碼解析(四十六)消費端發(fā)送請求過程

fish / 3300人閱讀

摘要:可以參考源碼解析二十四遠(yuǎn)程調(diào)用協(xié)議的八。十六的該類也是用了適配器模式,該類主要的作用就是增加了心跳功能,可以參考源碼解析十遠(yuǎn)程通信層的四。二十的可以參考源碼解析十七遠(yuǎn)程通信的一。

2.7大揭秘——消費端發(fā)送請求過程
目標(biāo):從源碼的角度分析一個服務(wù)方法調(diào)用經(jīng)歷怎么樣的磨難以后到達(dá)服務(wù)端。
前言

前一篇文章講到的是引用服務(wù)的過程,引用服務(wù)無非就是創(chuàng)建出一個代理。供消費者調(diào)用服務(wù)的相關(guān)方法。本節(jié)將從調(diào)用方法開始講解內(nèi)部的整個調(diào)用鏈。我們就拿dubbo內(nèi)部的例子講。

ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/dubbo-consumer.xml");
context.start();
DemoService demoService = context.getBean("demoService", DemoService.class);
String hello = demoService.sayHello("world");
System.out.println("result: " + hello);

這是dubbo-demo-xml-consumer內(nèi)的實例代碼。接下來我們就開始來看調(diào)用demoService.sayHello方法的時候,dubbo執(zhí)行了哪些操作。

執(zhí)行過程 (一)InvokerInvocationHandler的invoke
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    // 獲得方法名稱
    String methodName = method.getName();
    // 獲得方法參數(shù)類型
    Class[] parameterTypes = method.getParameterTypes();
    // 如果該方法所在的類是Object類型,則直接調(diào)用invoke。
    if (method.getDeclaringClass() == Object.class) {
        return method.invoke(invoker, args);
    }
    // 如果這個方法是toString,則直接調(diào)用invoker.toString()
    if ("toString".equals(methodName) && parameterTypes.length == 0) {
        return invoker.toString();
    }
    // 如果這個方法是hashCode直接調(diào)用invoker.hashCode()
    if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
        return invoker.hashCode();
    }
    // 如果這個方法是equals,直接調(diào)用invoker.equals(args[0])
    if ("equals".equals(methodName) && parameterTypes.length == 1) {
        return invoker.equals(args[0]);
    }

    // 調(diào)用invoke
    return invoker.invoke(new RpcInvocation(method, args)).recreate();
}

可以看到上面的源碼,首先對Object的方法進(jìn)行了處理,如果調(diào)用的方法不是這些方法,則先會 創(chuàng)建RpcInvocation,然后再調(diào)用invoke。

RpcInvocation的構(gòu)造方法
public RpcInvocation(Method method, Object[] arguments) {
    this(method.getName(), method.getParameterTypes(), arguments, null, null);
}
public RpcInvocation(String methodName, Class[] parameterTypes, Object[] arguments, Map attachments, Invoker invoker) {
    // 設(shè)置方法名
    this.methodName = methodName;
    // 設(shè)置參數(shù)類型
    this.parameterTypes = parameterTypes == null ? new Class[0] : parameterTypes;
    // 設(shè)置參數(shù)
    this.arguments = arguments == null ? new Object[0] : arguments;
    // 設(shè)置附加值
    this.attachments = attachments == null ? new HashMap() : attachments;
    // 設(shè)置invoker實體
    this.invoker = invoker;
}

創(chuàng)建完RpcInvocation后,就是調(diào)用invoke。先進(jìn)入的是ListenerInvokerWrapper的invoke。

(二)MockClusterInvoker的invoke

可以參考《dubbo源碼解析(四十一)集群——Mock》的(二)MockClusterInvoker,降級后的返回策略的實現(xiàn),根據(jù)配置的不同來決定不用降級還是強(qiáng)制服務(wù)降級還是失敗后再服務(wù)降級。

(三)AbstractClusterInvoker的invoke

可以參考《dubbo源碼解析(三十五)集群——cluster》的(一)AbstractClusterInvoker,該類是一個抽象類,其中封裝了一些公用的方法,AbstractClusterInvoker的invoke也只是做了一些公用操作。主要的邏輯在doInvoke中。

(四)FailoverClusterInvoker的doInvoke

可以參考《dubbo源碼解析(三十五)集群——cluster》的(十二)FailoverClusterInvoker,該類實現(xiàn)了失敗重試的容錯策略。

(五)InvokerWrapper的invoke

可以參考《dubbo源碼解析(二十二)遠(yuǎn)程調(diào)用——Protocol》的(五)InvokerWrapper。該類用了裝飾模式,不過并沒有實現(xiàn)實際的功能增強(qiáng)。

(六)ProtocolFilterWrapper的內(nèi)部類CallbackRegistrationInvoker的invoke
public Result invoke(Invocation invocation) throws RpcException {
    // 調(diào)用攔截器鏈的invoke
    Result asyncResult = filterInvoker.invoke(invocation);

    // 把異步返回的結(jié)果加入到上下文中
    asyncResult.thenApplyWithContext(r -> {
        // 循環(huán)各個過濾器
        for (int i = filters.size() - 1; i >= 0; i--) {
            Filter filter = filters.get(i);
            // onResponse callback
            // 如果該過濾器是ListenableFilter類型的
            if (filter instanceof ListenableFilter) {
                // 強(qiáng)制類型轉(zhuǎn)化
                Filter.Listener listener = ((ListenableFilter) filter).listener();
                if (listener != null) {
                    // 如果內(nèi)部類listener不為空,則調(diào)用回調(diào)方法onResponse
                    listener.onResponse(r, filterInvoker, invocation);
                }
            } else {
                // 否則,直接調(diào)用filter的onResponse,做兼容。
                filter.onResponse(r, filterInvoker, invocation);
            }
        }
        // 返回異步結(jié)果
        return r;
    });

    // 返回異步結(jié)果
    return asyncResult;
}

這里看到先是調(diào)用攔截器鏈的invoke方法。下面的邏輯是把異步返回的結(jié)果放到上下文中,具體的ListenableFilter以及內(nèi)部類的設(shè)計,還有thenApplyWithContext等方法我會在異步的實現(xiàn)中講到。

(七)ProtocolFilterWrapper的buildInvokerChain方法中的invoker實例的invoke方法。
public Result invoke(Invocation invocation) throws RpcException {
    Result asyncResult;
    try {
        // 依次調(diào)用各個過濾器,獲得最終的返回結(jié)果
        asyncResult = filter.invoke(next, invocation);
    } catch (Exception e) {
        // onError callback
        // 捕獲異常,如果該過濾器是ListenableFilter類型的
        if (filter instanceof ListenableFilter) {
            // 獲得內(nèi)部類Listener
            Filter.Listener listener = ((ListenableFilter) filter).listener();
            if (listener != null) {
                //調(diào)用onError,回調(diào)錯誤信息
                listener.onError(e, invoker, invocation);
            }
        }
        // 拋出異常
        throw e;
    }
    // 返回結(jié)果
    return asyncResult;
}

該方法中是對異常的捕獲,調(diào)用內(nèi)部類Listener的onError來回調(diào)錯誤信息。接下來看它經(jīng)過了哪些攔截器。

(八)ConsumerContextFilter的invoke
public Result invoke(Invoker invoker, Invocation invocation) throws RpcException {
    // 獲得上下文,設(shè)置invoker,會話域,本地地址和原創(chuàng)地址
    RpcContext.getContext()
            .setInvoker(invoker)
            .setInvocation(invocation)
            .setLocalAddress(NetUtils.getLocalHost(), 0)
            .setRemoteAddress(invoker.getUrl().getHost(),
                    invoker.getUrl().getPort());
    // 如果會話域是RpcInvocation,則設(shè)置invoker
    if (invocation instanceof RpcInvocation) {
        ((RpcInvocation) invocation).setInvoker(invoker);
    }
    try {
        // 移除服務(wù)端的上下文
        RpcContext.removeServerContext();
        // 調(diào)用下一個過濾器
        return invoker.invoke(invocation);
    } finally {
        // 清空上下文
        RpcContext.removeContext();
    }
}
static class ConsumerContextListener implements Listener {
    @Override
    public void onResponse(Result appResponse, Invoker invoker, Invocation invocation) {
        // 把結(jié)果中的附加值放入到上下文中
        RpcContext.getServerContext().setAttachments(appResponse.getAttachments());
    }

    @Override
    public void onError(Throwable t, Invoker invoker, Invocation invocation) {
        // 不做任何處理
    }
}

可以參考《dubbo源碼解析(二十)遠(yuǎn)程調(diào)用——Filter》,不過上面的源碼是最新的,而鏈接內(nèi)的源碼是2.6.x的,雖然做了一些變化,比如內(nèi)部類的的設(shè)計,后續(xù)的過濾器也有同樣的實現(xiàn),但是ConsumerContextFilter作用沒有變化,它依舊是在當(dāng)前的RpcContext中記錄本地調(diào)用的一次狀態(tài)信息。該過濾器執(zhí)行完成后,會回到ProtocolFilterWrapper的invoke中的

Result result = filter.invoke(next, invocation);

然后繼續(xù)調(diào)用下一個過濾器FutureFilter。

(九)FutureFilter的invoke
public Result invoke(final Invoker invoker, final Invocation invocation) throws RpcException {
    // 該方法是真正的調(diào)用方法的執(zhí)行
    fireInvokeCallback(invoker, invocation);
    // need to configure if there"s return value before the invocation in order to help invoker to judge if it"s
    // necessary to return future.
    return invoker.invoke(invocation);
}
class FutureListener implements Listener {
    @Override
    public void onResponse(Result result, Invoker invoker, Invocation invocation) {
        if (result.hasException()) {
            // 處理異常結(jié)果
            fireThrowCallback(invoker, invocation, result.getException());
        } else {
            // 處理正常結(jié)果
            fireReturnCallback(invoker, invocation, result.getValue());
        }
    }

    @Override
    public void onError(Throwable t, Invoker invoker, Invocation invocation) {

    }
}

可以參考《dubbo源碼解析(二十四)遠(yuǎn)程調(diào)用——dubbo協(xié)議》中的(十四)FutureFilter,其中會有部分結(jié)構(gòu)不一樣,跟ConsumerContextFilter一樣,因為后續(xù)版本對Filter接口進(jìn)行了新的設(shè)計,增加了onResponse方法,把返回的執(zhí)行邏輯放到onResponse中去了。其他邏輯沒有很大變化。等該過濾器執(zhí)行完成后,還是回到ProtocolFilterWrapper的invoke中的,繼續(xù)調(diào)用下一個過濾器MonitorFilter。

(十)MonitorFilter的invoke
public Result invoke(Invoker invoker, Invocation invocation) throws RpcException {
    // 如果開啟監(jiān)控
    if (invoker.getUrl().hasParameter(MONITOR_KEY)) {
        // 設(shè)置監(jiān)控開始時間
        invocation.setAttachment(MONITOR_FILTER_START_TIME, String.valueOf(System.currentTimeMillis()));
        // 獲得當(dāng)前的調(diào)用數(shù),并且增加
        getConcurrent(invoker, invocation).incrementAndGet(); // count up
    }
    return invoker.invoke(invocation); // proceed invocation chain
}
class MonitorListener implements Listener {

    @Override
    public void onResponse(Result result, Invoker invoker, Invocation invocation) {
        // 如果開啟監(jiān)控
        if (invoker.getUrl().hasParameter(MONITOR_KEY)) {
            // 執(zhí)行監(jiān)控,搜集數(shù)據(jù)
            collect(invoker, invocation, result, RpcContext.getContext().getRemoteHost(), Long.valueOf(invocation.getAttachment(MONITOR_FILTER_START_TIME)), false);
            // 減少當(dāng)前調(diào)用數(shù)
            getConcurrent(invoker, invocation).decrementAndGet(); // count down
        }
    }

    @Override
    public void onError(Throwable t, Invoker invoker, Invocation invocation) {
        // 如果開啟監(jiān)控
        if (invoker.getUrl().hasParameter(MONITOR_KEY)) {
            // 執(zhí)行監(jiān)控,搜集數(shù)據(jù)
            collect(invoker, invocation, null, RpcContext.getContext().getRemoteHost(), Long.valueOf(invocation.getAttachment(MONITOR_FILTER_START_TIME)), true);
            // 減少當(dāng)前調(diào)用數(shù)
            getConcurrent(invoker, invocation).decrementAndGet(); // count down
        }
    }

可以看到該過濾器實際用來做監(jiān)控,監(jiān)控服務(wù)的調(diào)用數(shù)量等。其中監(jiān)控的邏輯不是本文重點,所以不細(xì)講。接下來調(diào)用的是ListenerInvokerWrapper的invoke。

(十一)ListenerInvokerWrapper的invoke
public Result invoke(Invocation invocation) throws RpcException {
    return invoker.invoke(invocation);
}

可以參考《dubbo源碼解析(二十一)遠(yuǎn)程調(diào)用——Listener》,這里用到了裝飾者模式,直接調(diào)用了invoker。該類里面做了服務(wù)啟動的監(jiān)聽器。我們直接關(guān)注下一個invoke。

(十二)AsyncToSyncInvoker的invoke
public Result invoke(Invocation invocation) throws RpcException {
    Result asyncResult = invoker.invoke(invocation);

    try {
        // 如果是同步的調(diào)用
        if (InvokeMode.SYNC == ((RpcInvocation)invocation).getInvokeMode()) {
            // 從異步結(jié)果中g(shù)et結(jié)果
            asyncResult.get();
        }
    } catch (InterruptedException e) {
        throw new RpcException("Interrupted unexpectedly while waiting for remoting result to return!  method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    } catch (ExecutionException e) {
        Throwable t = e.getCause();
        if (t instanceof TimeoutException) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } else if (t instanceof RemotingException) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    } catch (Throwable e) {
        throw new RpcException(e.getMessage(), e);
    }
    // 返回異步結(jié)果
    return asyncResult;
}

AsyncToSyncInvoker類從名字上就很好理解,它的作用是把異步結(jié)果轉(zhuǎn)化為同步結(jié)果。新的改動中每個調(diào)用只要不是oneway方式調(diào)用都會先以異步調(diào)用開始,然后根據(jù)配置的情況如果是同步調(diào)用,則會在這個類中進(jìn)行異步結(jié)果轉(zhuǎn)同步的處理。當(dāng)然,這里先是執(zhí)行了invoke,然后就進(jìn)入下一個AbstractInvoker的invoke了。

(十三)AbstractInvoker的invoke
public Result invoke(Invocation inv) throws RpcException {
    // if invoker is destroyed due to address refresh from registry, let"s allow the current invoke to proceed
    // 如果服務(wù)引用銷毀,則打印告警日志,但是通過
    if (destroyed.get()) {
        logger.warn("Invoker for service " + this + " on consumer " + NetUtils.getLocalHost() + " is destroyed, "
                + ", dubbo version is " + Version.getVersion() + ", this invoker should not be used any longer");
    }
    RpcInvocation invocation = (RpcInvocation) inv;
    // 會話域中加入該調(diào)用鏈
    invocation.setInvoker(this);
    // 把附加值放入會話域
    if (CollectionUtils.isNotEmptyMap(attachment)) {
        invocation.addAttachmentsIfAbsent(attachment);
    }
    // 把上下文的附加值放入會話域
    Map contextAttachments = RpcContext.getContext().getAttachments();
    if (CollectionUtils.isNotEmptyMap(contextAttachments)) {
        /**
         * invocation.addAttachmentsIfAbsent(context){@link RpcInvocation#addAttachmentsIfAbsent(Map)}should not be used here,
         * because the {@link RpcContext#setAttachment(String, String)} is passed in the Filter when the call is triggered
         * by the built-in retry mechanism of the Dubbo. The attachment to update RpcContext will no longer work, which is
         * a mistake in most cases (for example, through Filter to RpcContext output traceId and spanId and other information).
         */
        invocation.addAttachments(contextAttachments);
    }

    // 從配置中得到是什么模式的調(diào)用,一共有FUTURE、ASYNC和SYNC
    invocation.setInvokeMode(RpcUtils.getInvokeMode(url, invocation));
    // 加入編號
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);

    try {
        // 執(zhí)行調(diào)用鏈
        return doInvoke(invocation);
    } catch (InvocationTargetException e) { // biz exception
        // 獲得異常
        Throwable te = e.getTargetException();
        if (te == null) {
            // 創(chuàng)建默認(rèn)的異常異步結(jié)果
            return AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
        } else {
            if (te instanceof RpcException) {
                // 設(shè)置異常碼
                ((RpcException) te).setCode(RpcException.BIZ_EXCEPTION);
            }
            // 創(chuàng)建默認(rèn)的異常異步結(jié)果
            return AsyncRpcResult.newDefaultAsyncResult(null, te, invocation);
        }
    } catch (RpcException e) {
        if (e.isBiz()) {
            return AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
        } else {
            throw e;
        }
    } catch (Throwable e) {
        return AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
    }
}

可以參考《dubbo源碼解析(二十二)遠(yuǎn)程調(diào)用——Protocol》的(三)AbstractInvoker。該方法做了一些公共的操作,比如服務(wù)引用銷毀的檢測,加入附加值,加入調(diào)用鏈實體域到會話域中等。然后執(zhí)行了doInvoke抽象方法。各協(xié)議自己去實現(xiàn)。然后就是執(zhí)行到doInvoke方法了。使用的協(xié)議不一樣,doInvoke的邏輯也有所不同,我這里舉的例子是使用dubbo協(xié)議,所以我就介紹DubboInvoker的doInvoke,其他自行查看具體的實現(xiàn)。此次的異步改造加入了InvokeMode,我會在后續(xù)中介紹這個。

(十四)DubboInvoker的doInvoke
protected Result doInvoke(final Invocation invocation) throws Throwable {
    // rpc會話域
    RpcInvocation inv = (RpcInvocation) invocation;
    // 獲得方法名
    final String methodName = RpcUtils.getMethodName(invocation);
    // 把path放入到附加值中
    inv.setAttachment(PATH_KEY, getUrl().getPath());
    // 把版本號放入到附加值
    inv.setAttachment(VERSION_KEY, version);

    // 當(dāng)前的客戶端
    ExchangeClient currentClient;
    // 如果數(shù)組內(nèi)就一個客戶端,則直接取出
    if (clients.length == 1) {
        currentClient = clients[0];
    } else {
        // 取模輪詢 從數(shù)組中取,當(dāng)取到最后一個時,從頭開始
        currentClient = clients[index.getAndIncrement() % clients.length];
    }
    try {
        // 是否是單向發(fā)送
        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
        // 獲得超時時間
        int timeout = getUrl().getMethodParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT);
        // 如果是單向發(fā)送
        if (isOneway) {
            // 是否等待消息發(fā)送,默認(rèn)不等待消息發(fā)出,將消息放入 IO 隊列,即刻返回。
            boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
            // 單向發(fā)送只負(fù)責(zé)發(fā)送消息,不等待服務(wù)端應(yīng)答,所以沒有返回值
            currentClient.send(inv, isSent);
            // 設(shè)置future為null,因為單向發(fā)送沒有返回值
            RpcContext.getContext().setFuture(null);
            // 創(chuàng)建一個默認(rèn)的AsyncRpcResult
            return AsyncRpcResult.newDefaultAsyncResult(invocation);
        } else {
            // 否則直接創(chuàng)建AsyncRpcResult
            AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
            // 異步調(diào)用,返回CompletableFuture類型的future
            CompletableFuture responseFuture = currentClient.request(inv, timeout);
            // 當(dāng)調(diào)用結(jié)果完成時
            responseFuture.whenComplete((obj, t) -> {
                // 如果有異常
                if (t != null) {
                    // 拋出一個異常
                    asyncRpcResult.completeExceptionally(t);
                } else {
                    // 完成調(diào)用
                    asyncRpcResult.complete((AppResponse) obj);
                }
            });
            // 異步返回結(jié)果用CompletableFuture包裝,把future放到上下文中,
            RpcContext.getContext().setFuture(new FutureAdapter(asyncRpcResult));
            // 返回結(jié)果
            return asyncRpcResult;
        }
    } catch (TimeoutException e) {
        throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    } catch (RemotingException e) {
        throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

可以參考《dubbo源碼解析(二十四)遠(yuǎn)程調(diào)用——dubbo協(xié)議》的(一)DubboInvoker,不過鏈接內(nèi)的文章的源碼是2.6.x版本的,而上述的源碼是最新版本的,其中就有對于異步的改動,比如加入了異步返回結(jié)果、除了單向調(diào)用,一律都先處理成AsyncRpcResult等。具體的AsyncRpcResult以及其中用到的CompletableFuture我會在下文介紹。

上述源碼中執(zhí)行currentClient.request或者currentClient.send,代表把請求放入channel中,交給channel來處理請求。最后來看一個currentClient.request,因為這其中涉及到了Future的構(gòu)建。

(十五)ReferenceCountExchangeClient的request
public CompletableFuture request(Object request, int timeout) throws RemotingException {
    return client.request(request, timeout);
}

ReferenceCountExchangeClient是一個記錄請求數(shù)的類,用了適配器模式,對ExchangeClient做了功能增強(qiáng)。

可以參考《dubbo源碼解析(二十四)遠(yuǎn)程調(diào)用——dubbo協(xié)議》的(八)ReferenceCountExchangeClient。

(十六)HeaderExchangeClient的request
public CompletableFuture request(Object request, int timeout) throws RemotingException {
    return channel.request(request, timeout);
}

該類也是用了適配器模式,該類主要的作用就是增加了心跳功能,可以參考《dubbo源碼解析(十)遠(yuǎn)程通信——Exchange層》的(四)HeaderExchangeClient。然后進(jìn)入HeaderExchangeChannel的request。

(十七)HeaderExchangeChannel的request

可以參考《dubbo源碼解析(十)遠(yuǎn)程通信——Exchange層》的(二)HeaderExchangeChannel,在這個request方法中就可以看到

 // 創(chuàng)建DefaultFuture對象,可以從future中主動獲得請求對應(yīng)的響應(yīng)信息
    DefaultFuture future = new DefaultFuture(channel, req, timeout);

生成了需要的future。異步請求結(jié)果就是從這個future中獲取。關(guān)于DefaultFuture也可以參考《dubbo源碼解析(十)遠(yuǎn)程通信——Exchange層》的(七)DefaultFuture。

后面channel.send方法就是跟遠(yuǎn)程通信有關(guān)了,例如使用netty作為通信實現(xiàn),則會使用netty實現(xiàn)的客戶端進(jìn)行通信。

(十八)AbstractPeer的send

可以參考《dubbo源碼解析(九)遠(yuǎn)程通信——Transport層》的(一)AbstractPeer,其中send方法比較簡單,根據(jù)sent配置項去做消息發(fā)送。接下來看AbstractClient的send

(十九)AbstractClient的send

可以參考《dubbo源碼解析(九)遠(yuǎn)程通信——Transport層》的(四)AbstractClient。

public void send(Object message, boolean sent) throws RemotingException {
    // 如果需要重連或者沒有鏈接,則連接
    if (needReconnect && !isConnected()) {
        connect();
    }
    // 獲得通道
    Channel channel = getChannel();
    //TODO Can the value returned by getChannel() be null? need improvement.
    if (channel == null || !channel.isConnected()) {
        throw new RemotingException(this, "message can not send, because channel is closed . url:" + getUrl());
    }
    // 通過通道發(fā)送消息
    channel.send(message, sent);
}

該方法中做了重連的邏輯,然后就是通過通道發(fā)送消息,dubbo有幾種通信的實現(xiàn),我這里就按照默認(rèn)的netty4實現(xiàn)來講解,所以下一步走到了NettyChannel的send。

(二十)NettyChannel的send

可以參考《dubbo源碼解析(十七)遠(yuǎn)程通信——Netty4》的(一)NettyChannel。這里其中先執(zhí)行了下面父類AbstractChannel的send,檢查了一下通道是否關(guān)閉,然后再走下面的邏輯。當(dāng)執(zhí)行writeAndFlush方法后,消息就被發(fā)送。

dubbo數(shù)據(jù)包可以查看《dubbo源碼解析(十)遠(yuǎn)程通信——Exchange層》的(二十五)ExchangeCodec,后續(xù)關(guān)于netty發(fā)送消息,以及netty出站數(shù)據(jù)在發(fā)出之前還需要進(jìn)行編碼操作我就先不做介紹,主要是跟netty知識點強(qiáng)相關(guān),只是dubbo做了一些自己的編碼,以及集成了各類序列化方式。

后記

該文章講解了dubbo調(diào)用服務(wù)的方法所經(jīng)歷的所有步驟,直到調(diào)用消息發(fā)送到服務(wù)端為止,是目前最新代碼的解析。下一篇文將講解服務(wù)端收到方法調(diào)用的請求后,如何處理以及如何把調(diào)用結(jié)果返回的過程。

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/74796.html

相關(guān)文章

  • dubbo源碼解析四十八)異步化改造

    摘要:大揭秘異步化改造目標(biāo)從源碼的角度分析的新特性中對于異步化的改造原理。看源碼解析四十六消費端發(fā)送請求過程講到的十四的,在以前的邏輯會直接在方法中根據(jù)配置區(qū)分同步異步單向調(diào)用。改為關(guān)于可以參考源碼解析十遠(yuǎn)程通信層的六。 2.7大揭秘——異步化改造 目標(biāo):從源碼的角度分析2.7的新特性中對于異步化的改造原理。 前言 dubbo中提供了很多類型的協(xié)議,關(guān)于協(xié)議的系列可以查看下面的文章: du...

    lijinke666 評論0 收藏0
  • dubbo源碼解析四十七)服務(wù)處理請求過程

    摘要:而存在的意義就是保證請求或響應(yīng)對象可在線程池中被解碼,解碼完成后,就會分發(fā)到的。 2.7大揭秘——服務(wù)端處理請求過程 目標(biāo):從源碼的角度分析服務(wù)端接收到請求后的一系列操作,最終把客戶端需要的值返回。 前言 上一篇講到了消費端發(fā)送請求的過程,該篇就要將服務(wù)端處理請求的過程。也就是當(dāng)服務(wù)端收到請求數(shù)據(jù)包后的一系列處理以及如何返回最終結(jié)果。我們也知道消費端在發(fā)送請求的時候已經(jīng)做了編碼,所以我...

    yzzz 評論0 收藏0
  • dubbo源碼解析四十三)2.7新特性

    摘要:大揭秘目標(biāo)了解的新特性,以及版本升級的引導(dǎo)。四元數(shù)據(jù)改造我們知道以前的版本只有注冊中心,注冊中心的有數(shù)十個的鍵值對,包含了一個服務(wù)所有的元數(shù)據(jù)。 DUBBO——2.7大揭秘 目標(biāo):了解2.7的新特性,以及版本升級的引導(dǎo)。 前言 我們知道Dubbo在2011年開源,停止更新了一段時間。在2017 年 9 月 7 日,Dubbo 悄悄的在 GitHub 發(fā)布了 2.5.4 版本。隨后,版本...

    qqlcbb 評論0 收藏0
  • dubbo源碼解析四十五)服務(wù)引用過程

    摘要:服務(wù)引用過程目標(biāo)從源碼的角度分析服務(wù)引用過程。并保留服務(wù)提供者的部分配置,比如版本,,時間戳等最后將合并后的配置設(shè)置為查詢字符串中。的可以參考源碼解析二十三遠(yuǎn)程調(diào)用的一的源碼分析。 dubbo服務(wù)引用過程 目標(biāo):從源碼的角度分析服務(wù)引用過程。 前言 前面服務(wù)暴露過程的文章講解到,服務(wù)引用有兩種方式,一種就是直連,也就是直接指定服務(wù)的地址來進(jìn)行引用,這種方式更多的時候被用來做服務(wù)測試,不...

    xiaowugui666 評論0 收藏0
  • dubbo源碼解析——概要篇

    摘要:服務(wù)提供者代碼上面這個類會被封裝成為一個實例,并新生成一個實例。這樣當(dāng)網(wǎng)絡(luò)通訊層收到一個請求后,會找到對應(yīng)的實例,并調(diào)用它所對應(yīng)的實例,從而真正調(diào)用了服務(wù)提供者的代碼。 這次源碼解析借鑒《肥朝》前輩的dubbo源碼解析,進(jìn)行源碼學(xué)習(xí)。總結(jié)起來就是先總體,后局部.也就是先把需要注意的概念先拋出來,把整體架構(gòu)圖先畫出來.讓讀者拿著地圖跟著我的腳步,并且每一步我都提醒,現(xiàn)在我們在哪,我們下一...

    Meathill 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<