国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

Dubbo 源碼分析 - 服務(wù)調(diào)用過(guò)程

Travis / 2092人閱讀

摘要:服務(wù)調(diào)用過(guò)程比較復(fù)雜,包含眾多步驟。源碼分析在進(jìn)行源碼分析之前,我們先來(lái)通過(guò)一張圖了解服務(wù)調(diào)用過(guò)程。服務(wù)調(diào)用方式支持同步和異步兩種調(diào)用方式,其中異步調(diào)用還可細(xì)分為有返回值的異步調(diào)用和無(wú)返回值的異步調(diào)用。

注: 本系列文章已捐贈(zèng)給 Dubbo 社區(qū),你也可以在 Dubbo 官方文檔中閱讀本系列文章。

1. 簡(jiǎn)介

在前面的文章中,我們分析了 Dubbo SPI、服務(wù)導(dǎo)出與引入、以及集群容錯(cuò)方面的代碼。經(jīng)過(guò)前文的鋪墊,本篇文章我們終于可以分析服務(wù)調(diào)用過(guò)程了。Dubbo 服務(wù)調(diào)用過(guò)程比較復(fù)雜,包含眾多步驟。比如發(fā)送請(qǐng)求、編解碼、服務(wù)降級(jí)、過(guò)濾器鏈處理、序列化、線程派發(fā)以及響應(yīng)請(qǐng)求等步驟。限于篇幅原因,本篇文章無(wú)法對(duì)所有的步驟一一進(jìn)行分析。本篇文章將會(huì)重點(diǎn)分析請(qǐng)求的發(fā)送與接收、編解碼、線程派發(fā)以及響應(yīng)的發(fā)送與接收等過(guò)程,至于服務(wù)降級(jí)、過(guò)濾器鏈和序列化大家自行進(jìn)行分析,也可以將其當(dāng)成一個(gè)黑盒,暫時(shí)忽略也沒(méi)關(guān)系。介紹完本篇文章要分析的內(nèi)容,接下來(lái)我們進(jìn)入正題吧。

2. 源碼分析

在進(jìn)行源碼分析之前,我們先來(lái)通過(guò)一張圖了解 Dubbo 服務(wù)調(diào)用過(guò)程。

首先服務(wù)消費(fèi)者通過(guò)代理對(duì)象 Proxy 發(fā)起遠(yuǎn)程調(diào)用,接著通過(guò)網(wǎng)絡(luò)客戶端 Client 將編碼后的請(qǐng)求發(fā)送給服務(wù)提供方的網(wǎng)絡(luò)層上,也就是 Server。Server 在收到請(qǐng)求后,首先要做的事情是對(duì)數(shù)據(jù)包進(jìn)行解碼。然后將解碼后的請(qǐng)求發(fā)送至分發(fā)器 Dispatcher,再由分發(fā)器將請(qǐng)求派發(fā)到指定的線程池上,最后由線程池調(diào)用具體的服務(wù)。這就是一個(gè)遠(yuǎn)程調(diào)用請(qǐng)求的發(fā)送與接收過(guò)程。至于響應(yīng)的發(fā)送與接收過(guò)程,這張圖中沒(méi)有表現(xiàn)出來(lái)。對(duì)于這兩個(gè)過(guò)程,我們也會(huì)進(jìn)行詳細(xì)分析。

2.1 服務(wù)調(diào)用方式

Dubbo 支持同步和異步兩種調(diào)用方式,其中異步調(diào)用還可細(xì)分為“有返回值”的異步調(diào)用和“無(wú)返回值”的異步調(diào)用。所謂“無(wú)返回值”異步調(diào)用是指服務(wù)消費(fèi)方只管調(diào)用,但不關(guān)心調(diào)用結(jié)果,此時(shí) Dubbo 會(huì)直接返回一個(gè)空的 RpcResult。若要使用異步特性,需要服務(wù)消費(fèi)方手動(dòng)進(jìn)行配置。默認(rèn)情況下,Dubbo 使用同步調(diào)用方式。

本節(jié)以及其他章節(jié)將會(huì)使用 Dubbo 官方提供的 Demo 分析整個(gè)調(diào)用過(guò)程,下面我們從 DemoService 接口的代理類開(kāi)始進(jìn)行分析。Dubbo 默認(rèn)使用 Javassist 框架為服務(wù)接口生成動(dòng)態(tài)代理類,因此我們需要先將代理類進(jìn)行反編譯才能看到源碼。這里使用阿里開(kāi)源 Java 應(yīng)用診斷工具 Arthas 反編譯代理類,結(jié)果如下:

/**
 * Arthas 反編譯步驟:
 * 1. 啟動(dòng) Arthas
 *    java -jar arthas-boot.jar
 *
 * 2. 輸入編號(hào)選擇進(jìn)程
 *    Arthas 啟動(dòng)后,會(huì)打印 Java 應(yīng)用進(jìn)程列表,比如:
 *    [1]: 11232 org.jetbrains.jps.cmdline.Launcher
 *    [2]: 22370 org.jetbrains.jps.cmdline.Launcher
 *    [3]: 22371 com.alibaba.dubbo.demo.consumer.Consumer
 *    [4]: 22362 com.alibaba.dubbo.demo.provider.Provider
 *    [5]: 2074 org.apache.zookeeper.server.quorum.QuorumPeerMain
 * 這里輸入編號(hào) 3,讓 Arthas 關(guān)聯(lián)到啟動(dòng)類為 com.....Consumer 的 Java 進(jìn)程上
 *
 * 3. 由于 Demo 項(xiàng)目中只有一個(gè)服務(wù)接口,因此此接口的代理類類名為 proxy0,此時(shí)使用 sc 命令搜索這個(gè)類名。
 *    $ sc *.proxy0
 *    com.alibaba.dubbo.common.bytecode.proxy0
 *
 * 4. 使用 jad 命令反編譯 com.alibaba.dubbo.common.bytecode.proxy0
 *    $ jad com.alibaba.dubbo.common.bytecode.proxy0
 *
 * 更多使用方法請(qǐng)參考 Arthas 官方文檔:
 *   https://alibaba.github.io/arthas/quick-start.html
 */
public class proxy0 implements ClassGenerator.DC, EchoService, DemoService {
    // 方法數(shù)組
    public static Method[] methods;
    private InvocationHandler handler;

    public proxy0(InvocationHandler invocationHandler) {
        this.handler = invocationHandler;
    }

    public proxy0() {
    }

    public String sayHello(String string) {
        // 將參數(shù)存儲(chǔ)到 Object 數(shù)組中
        Object[] arrobject = new Object[]{string};
        // 調(diào)用 InvocationHandler 實(shí)現(xiàn)類的 invoke 方法得到調(diào)用結(jié)果
        Object object = this.handler.invoke(this, methods[0], arrobject);
        // 返回調(diào)用結(jié)果
        return (String)object;
    }

    /** 回聲測(cè)試方法 */
    public Object $echo(Object object) {
        Object[] arrobject = new Object[]{object};
        Object object2 = this.handler.invoke(this, methods[1], arrobject);
        return object2;
    }
}

如上,代理類的邏輯比較簡(jiǎn)單。首先將運(yùn)行時(shí)參數(shù)存儲(chǔ)到數(shù)組中,然后調(diào)用 InvocationHandler 接口實(shí)現(xiàn)類的 invoke 方法,得到調(diào)用結(jié)果,最后將結(jié)果轉(zhuǎn)型并返回給調(diào)用方。關(guān)于代理類的邏輯就說(shuō)這么多,繼續(xù)向下分析。

public class InvokerInvocationHandler implements InvocationHandler {

    private final Invoker invoker;

    public InvokerInvocationHandler(Invoker handler) {
        this.invoker = handler;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
        Class[] parameterTypes = method.getParameterTypes();
        
        // 攔截定義在 Object 類中的方法(未被子類重寫(xiě)),比如 wait/notify
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        }
        
        // 如果 toString、hashCode 和 equals 等方法被子類重寫(xiě)了,這里也直接調(diào)用
        if ("toString".equals(methodName) && parameterTypes.length == 0) {
            return invoker.toString();
        }
        if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
            return invoker.hashCode();
        }
        if ("equals".equals(methodName) && parameterTypes.length == 1) {
            return invoker.equals(args[0]);
        }
        
        // 將 method 和 args 封裝到 RpcInvocation 中,并執(zhí)行后續(xù)的調(diào)用
        return invoker.invoke(new RpcInvocation(method, args)).recreate();
    }
}

InvokerInvocationHandler 中的 invoker 成員變量類型為 MockClusterInvoker,MockClusterInvoker 內(nèi)部封裝了服務(wù)降級(jí)邏輯。下面簡(jiǎn)單看一下:

public class MockClusterInvoker implements Invoker {
    
    private final Invoker invoker;
    
    public Result invoke(Invocation invocation) throws RpcException {
        Result result = null;

        // 獲取 mock 配置值
        String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim();
        if (value.length() == 0 || value.equalsIgnoreCase("false")) {
            // 無(wú) mock 邏輯,直接調(diào)用其他 Invoker 對(duì)象的 invoke 方法,
            // 比如 FailoverClusterInvoker
            result = this.invoker.invoke(invocation);
        } else if (value.startsWith("force")) {
            // force:xxx 直接執(zhí)行 mock 邏輯,不發(fā)起遠(yuǎn)程調(diào)用
            result = doMockInvoke(invocation, null);
        } else {
            // fail:xxx 表示消費(fèi)方對(duì)調(diào)用服務(wù)失敗后,再執(zhí)行 mock 邏輯,不拋出異常
            try {
                // 調(diào)用其他 Invoker 對(duì)象的 invoke 方法
                result = this.invoker.invoke(invocation);
            } catch (RpcException e) {
                if (e.isBiz()) {
                    throw e;
                } else {
                    // 調(diào)用失敗,執(zhí)行 mock 邏輯
                    result = doMockInvoke(invocation, e);
                }
            }
        }
        return result;
    }
    
    // 省略其他方法
}

服務(wù)降級(jí)不是本文重點(diǎn),因此這里就不分析 doMockInvoke 方法了。考慮到前文已經(jīng)詳細(xì)分析過(guò) FailoverClusterInvoker,因此本節(jié)略過(guò) FailoverClusterInvoker,直接分析 DubboInvoker。

public abstract class AbstractInvoker implements Invoker {
    
    public Result invoke(Invocation inv) throws RpcException {
        if (destroyed.get()) {
            throw new RpcException("Rpc invoker for service ...");
        }
        RpcInvocation invocation = (RpcInvocation) inv;
        // 設(shè)置 Invoker
        invocation.setInvoker(this);
        if (attachment != null && attachment.size() > 0) {
            // 設(shè)置 attachment
            invocation.addAttachmentsIfAbsent(attachment);
        }
        Map contextAttachments = RpcContext.getContext().getAttachments();
        if (contextAttachments != null && contextAttachments.size() != 0) {
            // 添加 contextAttachments 到 RpcInvocation#attachment 變量中
            invocation.addAttachments(contextAttachments);
        }
        if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)) {
            // 設(shè)置異步信息到 RpcInvocation#attachment 中
            invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());
        }
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);

        try {
            // 抽象方法,由子類實(shí)現(xiàn)
            return doInvoke(invocation);
        } catch (InvocationTargetException e) {
            // ...
        } catch (RpcException e) {
            // ...
        } catch (Throwable e) {
            return new RpcResult(e);
        }
    }

    protected abstract Result doInvoke(Invocation invocation) throws Throwable;
    
    // 省略其他方法
}

上面的代碼來(lái)自 AbstractInvoker 類,其中大部分代碼用于添加信息到 RpcInvocation#attachment 變量中,添加完畢后,調(diào)用 doInvoke 執(zhí)行后續(xù)的調(diào)用。doInvoke 是一個(gè)抽象方法,需要由子類實(shí)現(xiàn),下面到 DubboInvoker 中看一下。

public class DubboInvoker extends AbstractInvoker {
    
    private final ExchangeClient[] clients;
    
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        // 設(shè)置 path 和 version 到 attachment 中
        inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
        inv.setAttachment(Constants.VERSION_KEY, version);

        ExchangeClient currentClient;
        if (clients.length == 1) {
            // 從 clients 數(shù)組中獲取 ExchangeClient
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
            // 獲取異步配置
            boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
            // isOneway 為 true,表示“單向”通信
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);

            // 異步無(wú)返回值
            if (isOneway) {
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                // 發(fā)送請(qǐng)求
                currentClient.send(inv, isSent);
                // 設(shè)置上下文中的 future 為 null
                RpcContext.getContext().setFuture(null);
                // 返回一個(gè)空的 RpcResult
                return new RpcResult();
            } 

            // 異步有返回值
            else if (isAsync) {
                // 發(fā)送請(qǐng)求,獲得 ResponseFuture 實(shí)例
                ResponseFuture future = currentClient.request(inv, timeout);
                // 設(shè)置 future 到上下文中
                RpcContext.getContext().setFuture(new FutureAdapter(future));
                // 暫時(shí)返回一個(gè)空結(jié)果
                return new RpcResult();
            } 

            // 同步調(diào)用
            else {
                RpcContext.getContext().setFuture(null);
                // 發(fā)送請(qǐng)求,得到一個(gè) ResponseFuture 實(shí)例,并調(diào)用該實(shí)例的 get 方法進(jìn)行等待
                return (Result) currentClient.request(inv, timeout).get();
            }
        } catch (TimeoutException e) {
            throw new RpcException(..., "Invoke remote method timeout....");
        } catch (RemotingException e) {
            throw new RpcException(..., "Failed to invoke remote method: ...");
        }
    }
    
    // 省略其他方法
}

上面的代碼包含了 Dubbo 對(duì)同步和異步調(diào)用的處理邏輯,搞懂了上面的代碼,會(huì)對(duì) Dubbo 的同步和異步調(diào)用方式有更深入的了解。Dubbo 實(shí)現(xiàn)同步和異步調(diào)用比較關(guān)鍵的一點(diǎn)就在于由誰(shuí)調(diào)用 ResponseFuture 的 get 方法。同步調(diào)用模式下,由框架自身調(diào)用 ResponseFuture 的 get 方法。異步調(diào)用模式下,則由用戶調(diào)用該方法。ResponseFuture 是一個(gè)接口,下面我們來(lái)看一下它的默認(rèn)實(shí)現(xiàn)類 DefaultFuture 的源碼。

public class DefaultFuture implements ResponseFuture {
    
    private static final Map CHANNELS = 
        new ConcurrentHashMap();

    private static final Map FUTURES = 
        new ConcurrentHashMap();
    
    private final long id;
    private final Channel channel;
    private final Request request;
    private final int timeout;
    private final Lock lock = new ReentrantLock();
    private final Condition done = lock.newCondition();
    private volatile Response response;
    
    public DefaultFuture(Channel channel, Request request, int timeout) {
        this.channel = channel;
        this.request = request;
        
        // 獲取請(qǐng)求 id,這個(gè) id 很重要,后面還會(huì)見(jiàn)到
        this.id = request.getId();
        this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
        // 存儲(chǔ)  映射關(guān)系到 FUTURES 中
        FUTURES.put(id, this);
        CHANNELS.put(id, channel);
    }
    
    @Override
    public Object get() throws RemotingException {
        return get(timeout);
    }

    @Override
    public Object get(int timeout) throws RemotingException {
        if (timeout <= 0) {
            timeout = Constants.DEFAULT_TIMEOUT;
        }
        if (!isDone()) {
            long start = System.currentTimeMillis();
            lock.lock();
            try {
                // 循環(huán)檢測(cè)服務(wù)提供方是否成功返回了調(diào)用結(jié)果
                while (!isDone()) {
                    // 如果調(diào)用結(jié)果尚未返回,這里等待一段時(shí)間
                    done.await(timeout, TimeUnit.MILLISECONDS);
                    // 如果調(diào)用結(jié)果成功返回,或等待超時(shí),此時(shí)跳出 while 循環(huán),執(zhí)行后續(xù)的邏輯
                    if (isDone() || System.currentTimeMillis() - start > timeout) {
                        break;
                    }
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }
            
            // 如果調(diào)用結(jié)果仍未返回,則拋出超時(shí)異常
            if (!isDone()) {
                throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
            }
        }
        
        // 返回調(diào)用結(jié)果
        return returnFromResponse();
    }
    
    @Override
    public boolean isDone() {
        // 通過(guò)檢測(cè) response 字段為空與否,判斷是否收到了調(diào)用結(jié)果
        return response != null;
    }
    
    private Object returnFromResponse() throws RemotingException {
        Response res = response;
        if (res == null) {
            throw new IllegalStateException("response cannot be null");
        }
        
        // 如果調(diào)用結(jié)果的狀態(tài)為 Response.OK,則表示調(diào)用過(guò)程正常,服務(wù)提供方成功返回了調(diào)用結(jié)果
        if (res.getStatus() == Response.OK) {
            return res.getResult();
        }
        
        // 拋出異常
        if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
            throw new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage());
        }
        throw new RemotingException(channel, res.getErrorMessage());
    }
    
    // 省略其他方法
}

如上,當(dāng)服務(wù)消費(fèi)者還未接收到調(diào)用結(jié)果時(shí),用戶線程調(diào)用 get 方法會(huì)被阻塞住。同步調(diào)用模式下,框架獲得 DefaultFuture 對(duì)象后,會(huì)立即調(diào)用 get 方法進(jìn)行等待。而異步模式下則是將該對(duì)象封裝到 FutureAdapter 實(shí)例中,并將 FutureAdapter 實(shí)例設(shè)置到 RpcContext 中,供用戶使用。FutureAdapter 是一個(gè)適配器,用于將 Dubbo 中的 ResponseFuture 與 JDK 中的 Future 進(jìn)行適配。這樣當(dāng)用戶線程調(diào)用 Future 的 get 方法時(shí),經(jīng)過(guò) FutureAdapter 適配,最終會(huì)調(diào)用 ResponseFuture 實(shí)現(xiàn)類對(duì)象的 get 方法,也就是 DefaultFuture 的 get 方法。

到這里關(guān)于 Dubbo 幾種調(diào)用方式的代碼邏輯就分析完了,下面來(lái)分析請(qǐng)求數(shù)據(jù)的發(fā)送與接收,以及響應(yīng)數(shù)據(jù)的發(fā)送與接收過(guò)程。

2.2 服務(wù)消費(fèi)方發(fā)送請(qǐng)求 2.2.1 發(fā)送請(qǐng)求

本節(jié)我們來(lái)看一下同步調(diào)用模式下,服務(wù)消費(fèi)方是如何發(fā)送調(diào)用請(qǐng)求的。在深入分析源碼前,我們先來(lái)看一張圖。

這張圖展示了服務(wù)消費(fèi)方發(fā)送請(qǐng)求過(guò)程的部分調(diào)用棧,略為復(fù)雜。從上圖可以看出,經(jīng)過(guò)多次調(diào)用后,才將請(qǐng)求數(shù)據(jù)送至 Netty NioClientSocketChannel。這樣做的原因是通過(guò) Exchange 層為框架引入 Request 和 Response 語(yǔ)義,這一點(diǎn)會(huì)在接下來(lái)的源碼分析過(guò)程中會(huì)看到。其他的就不多說(shuō)了,下面開(kāi)始進(jìn)行分析。首先分析 ReferenceCountExchangeClient 的源碼。

final class ReferenceCountExchangeClient implements ExchangeClient {

    private final URL url;
    private final AtomicInteger referenceCount = new AtomicInteger(0);

    public ReferenceCountExchangeClient(ExchangeClient client, ConcurrentMap ghostClientMap) {
        this.client = client;
        // 引用計(jì)數(shù)自增
        referenceCount.incrementAndGet();
        this.url = client.getUrl();
        
        // ...
    }

    @Override
    public ResponseFuture request(Object request) throws RemotingException {
        // 直接調(diào)用被裝飾對(duì)象的同簽名方法
        return client.request(request);
    }

    @Override
    public ResponseFuture request(Object request, int timeout) throws RemotingException {
        // 直接調(diào)用被裝飾對(duì)象的同簽名方法
        return client.request(request, timeout);
    }

    /** 引用計(jì)數(shù)自增,該方法由外部調(diào)用 */
    public void incrementAndGetCount() {
        // referenceCount 自增
        referenceCount.incrementAndGet();
    }
    
        @Override
    public void close(int timeout) {
        // referenceCount 自減
        if (referenceCount.decrementAndGet() <= 0) {
            if (timeout == 0) {
                client.close();
            } else {
                client.close(timeout);
            }
            client = replaceWithLazyClient();
        }
    }
    
    // 省略部分方法
}

ReferenceCountExchangeClient 內(nèi)部定義了一個(gè)引用計(jì)數(shù)變量 referenceCount,每當(dāng)該對(duì)象被引用一次 referenceCount 都會(huì)進(jìn)行自增。每當(dāng) close 方法被調(diào)用時(shí),referenceCount 進(jìn)行自減。ReferenceCountExchangeClient 內(nèi)部?jī)H實(shí)現(xiàn)了一個(gè)引用計(jì)數(shù)的功能,其他方法并無(wú)復(fù)雜邏輯,均是直接調(diào)用被裝飾對(duì)象的相關(guān)方法。所以這里就不多說(shuō)了,繼續(xù)向下分析,這次是 HeaderExchangeClient。

public class HeaderExchangeClient implements ExchangeClient {

    private static final ScheduledThreadPoolExecutor scheduled = new ScheduledThreadPoolExecutor(2, new NamedThreadFactory("dubbo-remoting-client-heartbeat", true));
    private final Client client;
    private final ExchangeChannel channel;
    private ScheduledFuture heartbeatTimer;
    private int heartbeat;
    private int heartbeatTimeout;

    public HeaderExchangeClient(Client client, boolean needHeartbeat) {
        if (client == null) {
            throw new IllegalArgumentException("client == null");
        }
        this.client = client;
        
        // 創(chuàng)建 HeaderExchangeChannel 對(duì)象
        this.channel = new HeaderExchangeChannel(client);
        
        // 以下代碼均與心跳檢測(cè)邏輯有關(guān)
        String dubbo = client.getUrl().getParameter(Constants.DUBBO_VERSION_KEY);
        this.heartbeat = client.getUrl().getParameter(Constants.HEARTBEAT_KEY, dubbo != null && dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0);
        this.heartbeatTimeout = client.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
        if (heartbeatTimeout < heartbeat * 2) {
            throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
        }
        if (needHeartbeat) {
            // 開(kāi)啟心跳檢測(cè)定時(shí)器
            startHeartbeatTimer();
        }
    }

    @Override
    public ResponseFuture request(Object request) throws RemotingException {
        // 直接 HeaderExchangeChannel 對(duì)象的同簽名方法
        return channel.request(request);
    }

    @Override
    public ResponseFuture request(Object request, int timeout) throws RemotingException {
        // 直接 HeaderExchangeChannel 對(duì)象的同簽名方法
        return channel.request(request, timeout);
    }

    @Override
    public void close() {
        doClose();
        channel.close();
    }
    
    private void doClose() {
        // 停止心跳檢測(cè)定時(shí)器
        stopHeartbeatTimer();
    }

    private void startHeartbeatTimer() {
        stopHeartbeatTimer();
        if (heartbeat > 0) {
            heartbeatTimer = scheduled.scheduleWithFixedDelay(
                    new HeartBeatTask(new HeartBeatTask.ChannelProvider() {
                        @Override
                        public Collection getChannels() {
                            return Collections.singletonList(HeaderExchangeClient.this);
                        }
                    }, heartbeat, heartbeatTimeout),
                    heartbeat, heartbeat, TimeUnit.MILLISECONDS);
        }
    }

    private void stopHeartbeatTimer() {
        if (heartbeatTimer != null && !heartbeatTimer.isCancelled()) {
            try {
                heartbeatTimer.cancel(true);
                scheduled.purge();
            } catch (Throwable e) {
                if (logger.isWarnEnabled()) {
                    logger.warn(e.getMessage(), e);
                }
            }
        }
        heartbeatTimer = null;
    }
    
    // 省略部分方法
}

HeaderExchangeClient 中很多方法只有一行代碼,即調(diào)用 HeaderExchangeChannel 對(duì)象的同簽名方法。那 HeaderExchangeClient 有什么用處呢?答案是封裝了一些關(guān)于心跳檢測(cè)的邏輯。心跳檢測(cè)并非本文所關(guān)注的點(diǎn),因此就不多說(shuō)了,繼續(xù)向下看。

final class HeaderExchangeChannel implements ExchangeChannel {
    
    private final Channel channel;
    
    HeaderExchangeChannel(Channel channel) {
        if (channel == null) {
            throw new IllegalArgumentException("channel == null");
        }
        
        // 這里的 channel 指向的是 NettyClient
        this.channel = channel;
    }
    
    @Override
    public ResponseFuture request(Object request) throws RemotingException {
        return request(request, channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT));
    }

    @Override
    public ResponseFuture request(Object request, int timeout) throws RemotingException {
        if (closed) {
            throw new RemotingException(..., "Failed to send request ...");
        }
        // 創(chuàng)建 Request 對(duì)象
        Request req = new Request();
        req.setVersion(Version.getProtocolVersion());
        // 設(shè)置雙向通信標(biāo)志為 true
        req.setTwoWay(true);
        // 這里的 request 變量類型為 RpcInvocation
        req.setData(request);
                                        
        // 創(chuàng)建 DefaultFuture 對(duì)象
        DefaultFuture future = new DefaultFuture(channel, req, timeout);
        try {
            // 調(diào)用 NettyClient 的 send 方法發(fā)送請(qǐng)求
            channel.send(req);
        } catch (RemotingException e) {
            future.cancel();
            throw e;
        }
        // 返回 DefaultFuture 對(duì)象
        return future;
    }
}

到這里大家終于看到了 Request 語(yǔ)義了,上面的方法首先定義了一個(gè) Request 對(duì)象,然后再將該對(duì)象傳給 NettyClient 的 send 方法,進(jìn)行后續(xù)的調(diào)用。需要說(shuō)明的是,NettyClient 中并未實(shí)現(xiàn) send 方法,該方法繼承自父類 AbstractPeer,下面直接分析 AbstractPeer 的代碼。

public abstract class AbstractPeer implements Endpoint, ChannelHandler {
    
    @Override
    public void send(Object message) throws RemotingException {
        // 該方法由 AbstractClient 類實(shí)現(xiàn)
        send(message, url.getParameter(Constants.SENT_KEY, false));
    }
    
    // 省略其他方法
}

public abstract class AbstractClient extends AbstractEndpoint implements Client {
    
    @Override
    public void send(Object message, boolean sent) throws RemotingException {
        if (send_reconnect && !isConnected()) {
            connect();
        }
        
        // 獲取 Channel,getChannel 是一個(gè)抽象方法,具體由子類實(shí)現(xiàn)
        Channel channel = getChannel();
        if (channel == null || !channel.isConnected()) {
            throw new RemotingException(this, "message can not send ...");
        }
        
        // 繼續(xù)向下調(diào)用
        channel.send(message, sent);
    }
    
    protected abstract Channel getChannel();
    
    // 省略其他方法
}

默認(rèn)情況下,Dubbo 使用 Netty 作為底層的通信框架,因此下面我們到 NettyClient 類中看一下 getChannel 方法的實(shí)現(xiàn)邏輯。

public class NettyClient extends AbstractClient {
    
    // 這里的 Channel 全限定名稱為 org.jboss.netty.channel.Channel
    private volatile Channel channel;

    @Override
    protected com.alibaba.dubbo.remoting.Channel getChannel() {
        Channel c = channel;
        if (c == null || !c.isConnected())
            return null;
        // 獲取一個(gè) NettyChannel 類型對(duì)象
        return NettyChannel.getOrAddChannel(c, getUrl(), this);
    }
}

final class NettyChannel extends AbstractChannel {

    private static final ConcurrentMap channelMap = 
        new ConcurrentHashMap();

    private final org.jboss.netty.channel.Channel channel;
    
    /** 私有構(gòu)造方法 */
    private NettyChannel(org.jboss.netty.channel.Channel channel, URL url, ChannelHandler handler) {
        super(url, handler);
        if (channel == null) {
            throw new IllegalArgumentException("netty channel == null;");
        }
        this.channel = channel;
    }

    static NettyChannel getOrAddChannel(org.jboss.netty.channel.Channel ch, URL url, ChannelHandler handler) {
        if (ch == null) {
            return null;
        }
        
        // 嘗試從集合中獲取 NettyChannel 實(shí)例
        NettyChannel ret = channelMap.get(ch);
        if (ret == null) {
            // 如果 ret = null,則創(chuàng)建一個(gè)新的 NettyChannel 實(shí)例
            NettyChannel nc = new NettyChannel(ch, url, handler);
            if (ch.isConnected()) {
                // 將  鍵值對(duì)存入 channelMap 集合中
                ret = channelMap.putIfAbsent(ch, nc);
            }
            if (ret == null) {
                ret = nc;
            }
        }
        return ret;
    }
}

獲取到 NettyChannel 實(shí)例后,即可進(jìn)行后續(xù)的調(diào)用。下面看一下 NettyChannel 的 send 方法。

public void send(Object message, boolean sent) throws RemotingException {
    super.send(message, sent);

    boolean success = true;
    int timeout = 0;
    try {
        // 發(fā)送消息(包含請(qǐng)求和響應(yīng)消息)
        ChannelFuture future = channel.write(message);
        
        // sent 的值源于  中 sent 的配置值,有兩種配置值:
        //   1. true: 等待消息發(fā)出,消息發(fā)送失敗將拋出異常
        //   2. false: 不等待消息發(fā)出,將消息放入 IO 隊(duì)列,即刻返回
        // 默認(rèn)情況下 sent = false;
        if (sent) {
            timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
            // 等待消息發(fā)出,若在規(guī)定時(shí)間沒(méi)能發(fā)出,success 會(huì)被置為 false
            success = future.await(timeout);
        }
        Throwable cause = future.getCause();
        if (cause != null) {
            throw cause;
        }
    } catch (Throwable e) {
        throw new RemotingException(this, "Failed to send message ...");
    }

    // 若 success 為 false,這里拋出異常
    if (!success) {
        throw new RemotingException(this, "Failed to send message ...");
    }
}

經(jīng)歷多次調(diào)用,到這里請(qǐng)求數(shù)據(jù)的發(fā)送過(guò)程就結(jié)束了,過(guò)程漫長(zhǎng)。為了便于大家閱讀代碼,這里以 DemoService 為例,將 sayHello 方法的整個(gè)調(diào)用路徑貼出來(lái)。

proxy0#sayHello(String)
  —> InvokerInvocationHandler#invoke(Object, Method, Object[])
    —> MockClusterInvoker#invoke(Invocation)
      —> AbstractClusterInvoker#invoke(Invocation)
        —> FailoverClusterInvoker#doInvoke(Invocation, List>, LoadBalance)
          —> Filter#invoke(Invoker, Invocation)  // 包含多個(gè) Filter 調(diào)用
            —> ListenerInvokerWrapper#invoke(Invocation) 
              —> AbstractInvoker#invoke(Invocation) 
                —> DubboInvoker#doInvoke(Invocation)
                  —> ReferenceCountExchangeClient#request(Object, int)
                    —> HeaderExchangeClient#request(Object, int)
                      —> HeaderExchangeChannel#request(Object, int)
                        —> AbstractPeer#send(Object)
                          —> AbstractClient#send(Object, boolean)
                            —> NettyChannel#send(Object, boolean)
                              —> NioClientSocketChannel#write(Object)

在 Netty 中,出站數(shù)據(jù)在發(fā)出之前還需要進(jìn)行編碼操作,接下來(lái)我們來(lái)分析一下請(qǐng)求數(shù)據(jù)的編碼邏輯。

2.2.2 請(qǐng)求編碼

在分析請(qǐng)求編碼邏輯之前,我們先來(lái)看一下 Dubbo 數(shù)據(jù)包結(jié)構(gòu)。

Dubbo 數(shù)據(jù)包分為消息頭和消息體,消息頭用于存儲(chǔ)一些元信息,比如魔數(shù)(Magic),數(shù)據(jù)包類型(Request/Response),消息體長(zhǎng)度(Data Length)等。消息體中用于存儲(chǔ)具體的調(diào)用消息,比如方法名稱,參數(shù)列表等。下面簡(jiǎn)單列舉一下消息頭的內(nèi)容。

偏移量(Bit) 字段 取值
0 ~ 7 魔數(shù)高位 0xda00
8 ~ 15 魔數(shù)低位 0xbb
16 數(shù)據(jù)包類型 0 - Response, 1 - Request
17 調(diào)用方式 僅在第16位被設(shè)為1的情況下有效,0 - 單向調(diào)用,1 - 雙向調(diào)用
18 事件標(biāo)識(shí) 0 - 當(dāng)前數(shù)據(jù)包是請(qǐng)求或響應(yīng)包,1 - 當(dāng)前數(shù)據(jù)包是心跳包
19 ~ 23 序列化器編號(hào) 2 - Hessian2Serialization
3 - JavaSerialization
4 - CompactedJavaSerialization
6 - FastJsonSerialization
7 - NativeJavaSerialization
8 - KryoSerialization
9 - FstSerialization
24 ~ 31 狀態(tài) 20 - OK
30 - CLIENT_TIMEOUT
31 - SERVER_TIMEOUT
40 - BAD_REQUEST
50 - BAD_RESPONSE
......
32 ~ 95 請(qǐng)求編號(hào) 共8字節(jié),運(yùn)行時(shí)生成
96 ~ 127 消息體長(zhǎng)度 運(yùn)行時(shí)計(jì)算

了解了 Dubbo 數(shù)據(jù)包格式,接下來(lái)我們就可以探索編碼過(guò)程了。這次我們開(kāi)門(mén)見(jiàn)山,直接分析編碼邏輯所在類。如下:

public class ExchangeCodec extends TelnetCodec {

    // 消息頭長(zhǎng)度
    protected static final int HEADER_LENGTH = 16;
    // 魔數(shù)內(nèi)容
    protected static final short MAGIC = (short) 0xdabb;
    protected static final byte MAGIC_HIGH = Bytes.short2bytes(MAGIC)[0];
    protected static final byte MAGIC_LOW = Bytes.short2bytes(MAGIC)[1];
    protected static final byte FLAG_REQUEST = (byte) 0x80;
    protected static final byte FLAG_TWOWAY = (byte) 0x40;
    protected static final byte FLAG_EVENT = (byte) 0x20;
    protected static final int SERIALIZATION_MASK = 0x1f;
    private static final Logger logger = LoggerFactory.getLogger(ExchangeCodec.class);

    public Short getMagicCode() {
        return MAGIC;
    }

    @Override
    public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
        if (msg instanceof Request) {
            // 對(duì) Request 對(duì)象進(jìn)行編碼
            encodeRequest(channel, buffer, (Request) msg);
        } else if (msg instanceof Response) {
            // 對(duì) Response 對(duì)象進(jìn)行編碼,后面分析
            encodeResponse(channel, buffer, (Response) msg);
        } else {
            super.encode(channel, buffer, msg);
        }
    }

    protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
        Serialization serialization = getSerialization(channel);

        // 創(chuàng)建消息頭字節(jié)數(shù)組,長(zhǎng)度為 16
        byte[] header = new byte[HEADER_LENGTH];

        // 設(shè)置魔數(shù)
        Bytes.short2bytes(MAGIC, header);

        // 設(shè)置數(shù)據(jù)包類型(Request/Response)和序列化器編號(hào)
        header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());

        // 設(shè)置通信方式(單向/雙向)
        if (req.isTwoWay()) {
            header[2] |= FLAG_TWOWAY;
        }
        
        // 設(shè)置事件標(biāo)識(shí)
        if (req.isEvent()) {
            header[2] |= FLAG_EVENT;
        }

        // 設(shè)置請(qǐng)求編號(hào),8個(gè)字節(jié),從第4個(gè)字節(jié)開(kāi)始設(shè)置
        Bytes.long2bytes(req.getId(), header, 4);

        // 獲取 buffer 當(dāng)前的寫(xiě)位置
        int savedWriteIndex = buffer.writerIndex();
        // 更新 writerIndex,為消息頭預(yù)留 16 個(gè)字節(jié)的空間
        buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
        ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
        // 創(chuàng)建序列化器,比如 Hessian2ObjectOutput
        ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
        if (req.isEvent()) {
            // 對(duì)事件數(shù)據(jù)進(jìn)行序列化操作
            encodeEventData(channel, out, req.getData());
        } else {
            // 對(duì)請(qǐng)求數(shù)據(jù)進(jìn)行序列化操作
            encodeRequestData(channel, out, req.getData(), req.getVersion());
        }
        out.flushBuffer();
        if (out instanceof Cleanable) {
            ((Cleanable) out).cleanup();
        }
        bos.flush();
        bos.close();
        
        // 獲取寫(xiě)入的字節(jié)數(shù),也就是消息體長(zhǎng)度
        int len = bos.writtenBytes();
        checkPayload(channel, len);

        // 將消息體長(zhǎng)度寫(xiě)入到消息頭中
        Bytes.int2bytes(len, header, 12);

        // 將 buffer 指針移動(dòng)到 savedWriteIndex,為寫(xiě)消息頭做準(zhǔn)備
        buffer.writerIndex(savedWriteIndex);
        // 從 savedWriteIndex 下標(biāo)處寫(xiě)入消息頭
        buffer.writeBytes(header);
        // 設(shè)置新的 writerIndex,writerIndex = 原寫(xiě)下標(biāo) + 消息頭長(zhǎng)度 + 消息體長(zhǎng)度
        buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
    }
    
    // 省略其他方法
}

以上就是請(qǐng)求對(duì)象的編碼過(guò)程,該過(guò)程首先會(huì)通過(guò)位運(yùn)算將消息頭寫(xiě)入到 header 數(shù)組中。然后對(duì) Request 對(duì)象的 data 字段執(zhí)行序列化操作,序列化后的數(shù)據(jù)最終會(huì)存儲(chǔ)到 ChannelBuffer 中。序列化操作執(zhí)行完后,可得到數(shù)據(jù)序列化后的長(zhǎng)度 len,緊接著將 len 寫(xiě)入到 header 指定位置處。最后再將消息頭字節(jié)數(shù)組 header 寫(xiě)入到 ChannelBuffer 中,整個(gè)編碼過(guò)程就結(jié)束了。本節(jié)的最后,我們?cè)賮?lái)看一下 Request 對(duì)象的 data 字段序列化過(guò)程,也就是 encodeRequestData 方法的邏輯,如下:

public class DubboCodec extends ExchangeCodec implements Codec2 {
    
    protected void encodeRequestData(Channel channel, ObjectOutput out, Object data, String version) throws IOException {
        RpcInvocation inv = (RpcInvocation) data;

        // 依次序列化 dubbo version、path、version
        out.writeUTF(version);
        out.writeUTF(inv.getAttachment(Constants.PATH_KEY));
        out.writeUTF(inv.getAttachment(Constants.VERSION_KEY));

        // 序列化調(diào)用方法名
        out.writeUTF(inv.getMethodName());
        // 將參數(shù)類型轉(zhuǎn)換為字符串,并進(jìn)行序列化
        out.writeUTF(ReflectUtils.getDesc(inv.getParameterTypes()));
        Object[] args = inv.getArguments();
        if (args != null)
            for (int i = 0; i < args.length; i++) {
                // 對(duì)運(yùn)行時(shí)參數(shù)進(jìn)行序列化
                out.writeObject(encodeInvocationArgument(channel, inv, i));
            }
        
        // 序列化 attachments
        out.writeObject(inv.getAttachments());
    }
}

至此,關(guān)于服務(wù)消費(fèi)方發(fā)送請(qǐng)求的過(guò)程就分析完了,接下來(lái)我們來(lái)看一下服務(wù)提供方是如何接收請(qǐng)求的。

2.3 服務(wù)提供方接收請(qǐng)求

前面說(shuō)過(guò),默認(rèn)情況下 Dubbo 使用 Netty 作為底層的通信框架。Netty 檢測(cè)到有數(shù)據(jù)入站后,首先會(huì)通過(guò)解碼器對(duì)數(shù)據(jù)進(jìn)行解碼,并將解碼后的數(shù)據(jù)傳遞給下一個(gè)入站處理器的指定方法。所以在進(jìn)行后續(xù)的分析之前,我們先來(lái)看一下數(shù)據(jù)解碼過(guò)程。

2.3.1 請(qǐng)求解碼

這里直接分析請(qǐng)求數(shù)據(jù)的解碼邏輯,忽略中間過(guò)程,如下:

public class ExchangeCodec extends TelnetCodec {
    
    @Override
    public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
        int readable = buffer.readableBytes();
        // 創(chuàng)建消息頭字節(jié)數(shù)組
        byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];
        // 讀取消息頭數(shù)據(jù)
        buffer.readBytes(header);
        // 調(diào)用重載方法進(jìn)行后續(xù)解碼工作
        return decode(channel, buffer, readable, header);
    }

    @Override
    protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
        // 檢查魔數(shù)是否相等
        if (readable > 0 && header[0] != MAGIC_HIGH
                || readable > 1 && header[1] != MAGIC_LOW) {
            int length = header.length;
            if (header.length < readable) {
                header = Bytes.copyOf(header, readable);
                buffer.readBytes(header, length, readable - length);
            }
            for (int i = 1; i < header.length - 1; i++) {
                if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
                    buffer.readerIndex(buffer.readerIndex() - header.length + i);
                    header = Bytes.copyOf(header, i);
                    break;
                }
            }
            // 通過(guò) telnet 命令行發(fā)送的數(shù)據(jù)包不包含消息頭,所以這里
            // 調(diào)用 TelnetCodec 的 decode 方法對(duì)數(shù)據(jù)包進(jìn)行解碼
            return super.decode(channel, buffer, readable, header);
        }
        
        // 檢測(cè)可讀數(shù)據(jù)量是否少于消息頭長(zhǎng)度,若小于則立即返回 DecodeResult.NEED_MORE_INPUT
        if (readable < HEADER_LENGTH) {
            return DecodeResult.NEED_MORE_INPUT;
        }

        // 從消息頭中獲取消息體長(zhǎng)度
        int len = Bytes.bytes2int(header, 12);
        // 檢測(cè)消息體長(zhǎng)度是否超出限制,超出則拋出異常
        checkPayload(channel, len);

        int tt = len + HEADER_LENGTH;
        // 檢測(cè)可讀的字節(jié)數(shù)是否小于實(shí)際的字節(jié)數(shù)
        if (readable < tt) {
            return DecodeResult.NEED_MORE_INPUT;
        }
        
        ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);

        try {
            // 繼續(xù)進(jìn)行解碼工作
            return decodeBody(channel, is, header);
        } finally {
            if (is.available() > 0) {
                try {
                    StreamUtils.skipUnusedStream(is);
                } catch (IOException e) {
                    logger.warn(e.getMessage(), e);
                }
            }
        }
    }
}

上面方法通過(guò)檢測(cè)消息頭中的魔數(shù)是否與規(guī)定的魔數(shù)相等,提前攔截掉非常規(guī)數(shù)據(jù)包,比如通過(guò) telnet 命令行發(fā)出的數(shù)據(jù)包。接著再對(duì)消息體長(zhǎng)度,以及可讀字節(jié)數(shù)進(jìn)行檢測(cè)。最后調(diào)用 decodeBody 方法進(jìn)行后續(xù)的解碼工作,ExchangeCodec 中實(shí)現(xiàn)了 decodeBody 方法,但因其子類 DubboCodec 覆寫(xiě)了該方法,所以在運(yùn)行時(shí) DubboCodec 中的 decodeBody 方法會(huì)被調(diào)用。下面我們來(lái)看一下該方法的代碼。

public class DubboCodec extends ExchangeCodec implements Codec2 {

    @Override
    protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
        // 獲取消息頭中的第三個(gè)字節(jié),并通過(guò)邏輯與運(yùn)算得到序列化器編號(hào)
        byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
        Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
        // 獲取調(diào)用編號(hào)
        long id = Bytes.bytes2long(header, 4);
        // 通過(guò)邏輯與運(yùn)算得到調(diào)用類型,0 - Response,1 - Request
        if ((flag & FLAG_REQUEST) == 0) {
            // 對(duì)響應(yīng)結(jié)果進(jìn)行解碼,得到 Response 對(duì)象。這個(gè)非本節(jié)內(nèi)容,后面再分析
            // ...
        } else {
            // 創(chuàng)建 Request 對(duì)象
            Request req = new Request(id);
            req.setVersion(Version.getProtocolVersion());
            // 通過(guò)邏輯與運(yùn)算得到通信方式,并設(shè)置到 Request 對(duì)象中
            req.setTwoWay((flag & FLAG_TWOWAY) != 0);
            
            // 通過(guò)位運(yùn)算檢測(cè)數(shù)據(jù)包是否為事件類型
            if ((flag & FLAG_EVENT) != 0) {
                // 設(shè)置心跳事件到 Request 對(duì)象中
                req.setEvent(Request.HEARTBEAT_EVENT);
            }
            try {
                Object data;
                if (req.isHeartbeat()) {
                    // 對(duì)心跳包進(jìn)行解碼,該方法已被標(biāo)注為廢棄
                    data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
                } else if (req.isEvent()) {
                    // 對(duì)事件數(shù)據(jù)進(jìn)行解碼
                    data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
                } else {
                    DecodeableRpcInvocation inv;
                    // 根據(jù) url 參數(shù)判斷是否在 IO 線程上對(duì)消息體進(jìn)行解碼
                    if (channel.getUrl().getParameter(
                            Constants.DECODE_IN_IO_THREAD_KEY,
                            Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
                        inv = new DecodeableRpcInvocation(channel, req, is, proto);
                        // 在當(dāng)前線程,也就是 IO 線程上進(jìn)行后續(xù)的解碼工作。此工作完成后,可將
                        // 調(diào)用方法名、attachment、以及調(diào)用參數(shù)解析出來(lái)
                        inv.decode();
                    } else {
                        // 僅創(chuàng)建 DecodeableRpcInvocation 對(duì)象,但不在當(dāng)前線程上執(zhí)行解碼邏輯
                        inv = new DecodeableRpcInvocation(channel, req,
                                new UnsafeByteArrayInputStream(readMessageData(is)), proto);
                    }
                    data = inv;
                }
                
                // 設(shè)置 data 到 Request 對(duì)象中
                req.setData(data);
            } catch (Throwable t) {
                // 若解碼過(guò)程中出現(xiàn)異常,則將 broken 字段設(shè)為 true,
                // 并將異常對(duì)象設(shè)置到 Reqeust 對(duì)象中
                req.setBroken(true);
                req.setData(t);
            }
            return req;
        }
    }
}

如上,decodeBody 對(duì)部分字段進(jìn)行了解碼,并將解碼得到的字段封裝到 Request 中。隨后會(huì)調(diào)用 DecodeableRpcInvocation 的 decode 方法進(jìn)行后續(xù)的解碼工作。此工作完成后,可將調(diào)用方法名、attachment、以及運(yùn)行時(shí)調(diào)用參數(shù)解析出來(lái)。下面我們來(lái)看一下 DecodeableRpcInvocation 的 decode 方法邏輯。

public class DecodeableRpcInvocation extends RpcInvocation implements Codec, Decodeable {
    
    @Override
    public Object decode(Channel channel, InputStream input) throws IOException {
        ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType)
                .deserialize(channel.getUrl(), input);

        // 通過(guò)反序列化得到 dubbo version,并保存到 attachments 變量中
        String dubboVersion = in.readUTF();
        request.setVersion(dubboVersion);
        setAttachment(Constants.DUBBO_VERSION_KEY, dubboVersion);

        // 通過(guò)反序列化得到 path,version,并保存到 attachments 變量中
        setAttachment(Constants.PATH_KEY, in.readUTF());
        setAttachment(Constants.VERSION_KEY, in.readUTF());

        // 通過(guò)反序列化得到調(diào)用方法名
        setMethodName(in.readUTF());
        try {
            Object[] args;
            Class[] pts;
            // 通過(guò)反序列化得到參數(shù)類型字符串,比如 Ljava/lang/String;
            String desc = in.readUTF();
            if (desc.length() == 0) {
                pts = DubboCodec.EMPTY_CLASS_ARRAY;
                args = DubboCodec.EMPTY_OBJECT_ARRAY;
            } else {
                // 將 desc 解析為參數(shù)類型數(shù)組
                pts = ReflectUtils.desc2classArray(desc);
                args = new Object[pts.length];
                for (int i = 0; i < args.length; i++) {
                    try {
                        // 解析運(yùn)行時(shí)參數(shù)
                        args[i] = in.readObject(pts[i]);
                    } catch (Exception e) {
                        if (log.isWarnEnabled()) {
                            log.warn("Decode argument failed: " + e.getMessage(), e);
                        }
                    }
                }
            }
            
            // 設(shè)置參數(shù)類型數(shù)組
            setParameterTypes(pts);

            // 通過(guò)反序列化得到原 attachments 的內(nèi)容
            Map map = (Map) in.readObject(Map.class);
            if (map != null && map.size() > 0) {
                Map attachment = getAttachments();
                if (attachment == null) {
                    attachment = new HashMap();
                }
                // 將 map 與當(dāng)前對(duì)象中的 attachment 集合進(jìn)行融合
                attachment.putAll(map);
                setAttachments(attachment);
            }
            
            // 對(duì) callback 類型的參數(shù)進(jìn)行處理
            for (int i = 0; i < args.length; i++) {
                args[i] = decodeInvocationArgument(channel, this, pts, i, args[i]);
            }

            // 設(shè)置參數(shù)列表
            setArguments(args);

        } catch (ClassNotFoundException e) {
            throw new IOException(StringUtils.toString("Read invocation data failed.", e));
        } finally {
            if (in instanceof Cleanable) {
                ((Cleanable) in).cleanup();
            }
        }
        return this;
    }
}

上面的方法通過(guò)反序列化將諸如 path、version、調(diào)用方法名、參數(shù)列表等信息依次解析出來(lái),并設(shè)置到相應(yīng)的字段中,最終得到一個(gè)具有完整調(diào)用信息的 DecodeableRpcInvocation 對(duì)象。

到這里,請(qǐng)求數(shù)據(jù)解碼的過(guò)程就分析完了。此時(shí)我們得到了一個(gè) Request 對(duì)象,這個(gè)對(duì)象會(huì)被傳送到下一個(gè)入站處理器中,我們繼續(xù)往下看。

2.3.2 調(diào)用服務(wù)

解碼器將數(shù)據(jù)包解析成 Request 對(duì)象后,NettyHandler 的 messageReceived 方法緊接著會(huì)收到這個(gè)對(duì)象,并將這個(gè)對(duì)象繼續(xù)向下傳遞。這期間該對(duì)象會(huì)被依次傳遞給 NettyServer、MultiMessageHandler、HeartbeatHandler 以及 AllChannelHandler。最后由 AllChannelHandler 將該對(duì)象封裝到 Runnable 實(shí)現(xiàn)類對(duì)象中,并將 Runnable 放入線程池中執(zhí)行后續(xù)的調(diào)用邏輯。整個(gè)調(diào)用棧如下:

NettyHandler#messageReceived(ChannelHandlerContext, MessageEvent)
  —> AbstractPeer#received(Channel, Object)
    —> MultiMessageHandler#received(Channel, Object)
      —> HeartbeatHandler#received(Channel, Object)
        —> AllChannelHandler#received(Channel, Object)
          —> ExecutorService#execute(Runnable)    // 由線程池執(zhí)行后續(xù)的調(diào)用邏輯

考慮到篇幅,以及很多中間調(diào)用的邏輯并非十分重要,所以這里就不對(duì)調(diào)用棧中的每個(gè)方法都進(jìn)行分析了。這里我們直接分析調(diào)用棧中的分析第一個(gè)和最后一個(gè)調(diào)用方法邏輯。如下:

@Sharable
public class NettyHandler extends SimpleChannelHandler {
    
    private final Map channels = new ConcurrentHashMap();

    private final URL url;

    private final ChannelHandler handler;
    
    public NettyHandler(URL url, ChannelHandler handler) {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        this.url = url;
        
        // 這里的 handler 類型為 NettyServer
        this.handler = handler;
    }
    
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        // 獲取 NettyChannel
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
        try {
            // 繼續(xù)向下調(diào)用
            handler.received(channel, e.getMessage());
        } finally {
            NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
        }
    }
}

如上,NettyHandler 中的 messageReceived 邏輯比較簡(jiǎn)單。首先根據(jù)一些信息獲取 NettyChannel 實(shí)例,然后將 NettyChannel 實(shí)例以及 Request 對(duì)象向下傳遞。下面再來(lái)看看 AllChannelHandler 的邏輯,在詳細(xì)分析代碼之前,我們先來(lái)了解一下 Dubbo 中的線程派發(fā)模型。

2.3.2.1 線程派發(fā)模型

Dubbo 將底層通信框架中接收請(qǐng)求的線程稱為 IO 線程。如果一些事件處理邏輯可以很快執(zhí)行完,比如只在內(nèi)存打一個(gè)標(biāo)記,此時(shí)直接在 IO 線程上執(zhí)行該段邏輯即可。但如果事件的處理邏輯比較耗時(shí),比如該段邏輯會(huì)發(fā)起數(shù)據(jù)庫(kù)查詢或者 HTTP 請(qǐng)求。此時(shí)我們就不應(yīng)該讓事件處理邏輯在 IO 線程上執(zhí)行,而是應(yīng)該派發(fā)到線程池中去執(zhí)行。原因也很簡(jiǎn)單,IO 線程主要用于接收請(qǐng)求,如果 IO 線程被占滿,將導(dǎo)致它不能接收新的請(qǐng)求。

以上就是線程派發(fā)的背景,下面我們?cè)賮?lái)通過(guò) Dubbo 調(diào)用圖,看一下線程派發(fā)器所處的位置。

如上圖,紅框中的 Dispatcher 就是線程派發(fā)器。需要說(shuō)明的是,Dispatcher 真實(shí)的職責(zé)創(chuàng)建具有線程派發(fā)能力的 ChannelHandler,比如 AllChannelHandler、MessageOnlyChannelHandler 和 ExecutionChannelHandler 等,其本身并不具備線程派發(fā)能力。Dubbo 支持 5 種不同的線程派發(fā)策略,下面通過(guò)一個(gè)表格列舉一下。

策略 用途
all 所有消息都派發(fā)到線程池,包括請(qǐng)求,響應(yīng),連接事件,斷開(kāi)事件等
direct 所有消息都不派發(fā)到線程池,全部在 IO 線程上直接執(zhí)行
message 只有請(qǐng)求響應(yīng)消息派發(fā)到線程池,其它消息均在 IO 線程上執(zhí)行
execution 只有請(qǐng)求消息派發(fā)到線程池,不含響應(yīng)。其它消息均在 IO 線程上執(zhí)行
connection 在 IO 線程上,將連接斷開(kāi)事件放入隊(duì)列,有序逐個(gè)執(zhí)行,其它消息派發(fā)到線程池

默認(rèn)配置下,Dubbo 使用 all 派發(fā)策略,即將所有的消息都派發(fā)到線程池中。下面我們來(lái)分析一下 AllChannelHandler 的代碼。

public class AllChannelHandler extends WrappedChannelHandler {

    public AllChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }

    /** 處理連接事件 */
    @Override
    public void connected(Channel channel) throws RemotingException {
        // 獲取線程池
        ExecutorService cexecutor = getExecutorService();
        try {
            // 將連接事件派發(fā)到線程池中處理
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException(..., " error when process connected event .", t);
        }
    }

    /** 處理斷開(kāi)事件 */
    @Override
    public void disconnected(Channel channel) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException(..., "error when process disconnected event .", t);
        }
    }

    /** 處理請(qǐng)求和響應(yīng)消息,這里的 message 變量類型可能是 Request,也可能是 Response */
    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            // 將請(qǐng)求和響應(yīng)消息派發(fā)到線程池中處理
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            if(message instanceof Request && t instanceof RejectedExecutionException){
                Request request = (Request)message;
                // 如果通信方式為雙向通信,此時(shí)將 Server side ... threadpool is exhausted 
                // 錯(cuò)誤信息封裝到 Response 中,并返回給服務(wù)消費(fèi)方。
                if(request.isTwoWay()){
                    String msg = "Server side(" + url.getIp() + "," + url.getPort() 
                        + ") threadpool is exhausted ,detail msg:" + t.getMessage();
                    Response response = new Response(request.getId(), request.getVersion());
                    response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
                    response.setErrorMessage(msg);
                    // 返回包含錯(cuò)誤信息的 Response 對(duì)象
                    channel.send(response);
                    return;
                }
            }
            throw new ExecutionException(..., " error when process received event .", t);
        }
    }

    /** 處理異常信息 */
    @Override
    public void caught(Channel channel, Throwable exception) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
        } catch (Throwable t) {
            throw new ExecutionException(..., "error when process caught event ...");
        }
    }
}

如上,請(qǐng)求對(duì)象會(huì)被封裝 ChannelEventRunnable 中,ChannelEventRunnable 將會(huì)是服務(wù)調(diào)用過(guò)程的新起點(diǎn)。所以接下來(lái)我們以 ChannelEventRunnable 為起點(diǎn)向下探索。

2.3.2.2 調(diào)用服務(wù)

本小節(jié),我們從 ChannelEventRunnable 開(kāi)始分析,該類的主要代碼如下:

public class ChannelEventRunnable implements Runnable {
    
    private final ChannelHandler handler;
    private final Channel channel;
    private final ChannelState state;
    private final Throwable exception;
    private final Object message;
    
    @Override
    public void run() {
        // 檢測(cè)通道狀態(tài),對(duì)于請(qǐng)求或響應(yīng)消息,此時(shí) state = RECEIVED
        if (state == ChannelState.RECEIVED) {
            try {
                // 將 channel 和 message 傳給 ChannelHandler 對(duì)象,進(jìn)行后續(xù)的調(diào)用
                handler.received(channel, message);
            } catch (Exception e) {
                logger.warn("... operation error, channel is ... message is ...");
            }
        } 
        
        // 其他消息類型通過(guò) switch 進(jìn)行處理
        else {
            switch (state) {
            case CONNECTED:
                try {
                    handler.connected(channel);
                } catch (Exception e) {
                    logger.warn("... operation error, channel is ...");
                }
                break;
            case DISCONNECTED:
                // ...
            case SENT:
                // ...
            case CAUGHT:
                // ...
            default:
                logger.warn("unknown state: " + state + ", message is " + message);
            }
        }

    }
}

如上,請(qǐng)求和響應(yīng)消息出現(xiàn)頻率明顯比其他類型消息高,所以這里對(duì)該類型的消息進(jìn)行了針對(duì)性判斷。ChannelEventRunnable 僅是一個(gè)中轉(zhuǎn)站,它的 run 方法中并不包含具體的調(diào)用邏輯,僅用于將參數(shù)傳給其他 ChannelHandler 對(duì)象進(jìn)行處理,該對(duì)象類型為 DecodeHandler。

public class DecodeHandler extends AbstractChannelHandlerDelegate {

    public DecodeHandler(ChannelHandler handler) {
        super(handler);
    }

    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        if (message instanceof Decodeable) {
            // 對(duì) Decodeable 接口實(shí)現(xiàn)類對(duì)象進(jìn)行解碼
            decode(message);
        }

        if (message instanceof Request) {
            // 對(duì) Request 的 data 字段進(jìn)行解碼
            decode(((Request) message).getData());
        }

        if (message instanceof Response) {
            // 對(duì) Request 的 result 字段進(jìn)行解碼
            decode(((Response) message).getResult());
        }

        // 執(zhí)行后續(xù)邏輯
        handler.received(channel, message);
    }

    private void decode(Object message) {
        // Decodeable 接口目前有兩個(gè)實(shí)現(xiàn)類,
        // 分別為 DecodeableRpcInvocation 和 DecodeableRpcResult
        if (message != null && message instanceof Decodeable) {
            try {
                // 執(zhí)行解碼邏輯
                ((Decodeable) message).decode();
            } catch (Throwable e) {
                if (log.isWarnEnabled()) {
                    log.warn("Call Decodeable.decode failed: " + e.getMessage(), e);
                }
            }
        }
    }
}

DecodeHandler 主要是包含了一些解碼邏輯。2.2.1 節(jié)分析請(qǐng)求解碼時(shí)說(shuō)過(guò),請(qǐng)求解碼可在 IO 線程上執(zhí)行,也可在線程池中執(zhí)行,這個(gè)取決于運(yùn)行時(shí)配置。DecodeHandler 存在的意義就是保證請(qǐng)求或響應(yīng)對(duì)象可在線程池中被解碼。解碼完畢后,完全解碼后的 Request 對(duì)象會(huì)繼續(xù)向后傳遞,下一站是 HeaderExchangeHandler。

public class HeaderExchangeHandler implements ChannelHandlerDelegate {

    private final ExchangeHandler handler;

    public HeaderExchangeHandler(ExchangeHandler handler) {
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        this.handler = handler;
    }

    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
        ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
        try {
            // 處理請(qǐng)求對(duì)象
            if (message instanceof Request) {
                Request request = (Request) message;
                if (request.isEvent()) {
                    // 處理事件
                    handlerEvent(channel, request);
                } 
                // 處理普通的請(qǐng)求
                else {
                    // 雙向通信
                    if (request.isTwoWay()) {
                        // 向后調(diào)用服務(wù),并得到調(diào)用結(jié)果
                        Response response = handleRequest(exchangeChannel, request);
                        // 將調(diào)用結(jié)果返回給服務(wù)消費(fèi)端
                        channel.send(response);
                    } 
                    // 如果是單向通信,僅向后調(diào)用指定服務(wù)即可,無(wú)需返回調(diào)用結(jié)果
                    else {
                        handler.received(exchangeChannel, request.getData());
                    }
                }
            }      
            // 處理響應(yīng)對(duì)象,服務(wù)消費(fèi)方會(huì)執(zhí)行此處邏輯,后面分析
            else if (message instanceof Response) {
                handleResponse(channel, (Response) message);
            } else if (message instanceof String) {
                // telnet 相關(guān),忽略
            } else {
                handler.received(exchangeChannel, message);
            }
        } finally {
            HeaderExchangeChannel.removeChannelIfDisconnected(channel);
        }
    }

    Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
        Response res = new Response(req.getId(), req.getVersion());
        // 檢測(cè)請(qǐng)求是否合法,不合法則返回狀態(tài)碼為 BAD_REQUEST 的響應(yīng)
        if (req.isBroken()) {
            Object data = req.getData();

            String msg;
            if (data == null)
                msg = null;
            else if
                (data instanceof Throwable) msg = StringUtils.toString((Throwable) data);
            else
                msg = data.toString();
            res.setErrorMessage("Fail to decode request due to: " + msg);
            // 設(shè)置 BAD_REQUEST 狀態(tài)
            res.setStatus(Response.BAD_REQUEST);

            return res;
        }
        
        // 獲取 data 字段值,也就是 RpcInvocation 對(duì)象
        Object msg = req.getData();
        try {
            // 繼續(xù)向下調(diào)用
            Object result = handler.reply(channel, msg);
            // 設(shè)置 OK 狀態(tài)碼
            res.setStatus(Response.OK);
            // 設(shè)置調(diào)用結(jié)果
            res.setResult(result);
        } catch (Throwable e) {
            // 若調(diào)用過(guò)程出現(xiàn)異常,則設(shè)置 SERVICE_ERROR,表示服務(wù)端異常
            res.s           
               
                                           
                       
                 
            
                     
             
               

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://m.specialneedsforspecialkids.com/yun/72896.html

相關(guān)文章

  • Dubbo 源碼分析 - 集群容錯(cuò)之 Cluster

    摘要:集群用途是將多個(gè)服務(wù)提供者合并為一個(gè),并將這個(gè)暴露給服務(wù)消費(fèi)者。比如發(fā)請(qǐng)求,接受服務(wù)提供者返回的數(shù)據(jù)等。如果包含,表明對(duì)應(yīng)的服務(wù)提供者可能因網(wǎng)絡(luò)原因未能成功提供服務(wù)。如果不包含,此時(shí)還需要進(jìn)行可用性檢測(cè),比如檢測(cè)服務(wù)提供者網(wǎng)絡(luò)連通性等。 1.簡(jiǎn)介 為了避免單點(diǎn)故障,現(xiàn)在的應(yīng)用至少會(huì)部署在兩臺(tái)服務(wù)器上。對(duì)于一些負(fù)載比較高的服務(wù),會(huì)部署更多臺(tái)服務(wù)器。這樣,同一環(huán)境下的服務(wù)提供者數(shù)量會(huì)大于1...

    denson 評(píng)論0 收藏0
  • Dubbo 源碼分析 - 服務(wù)導(dǎo)出

    摘要:支持兩種服務(wù)導(dǎo)出方式,分別延遲導(dǎo)出和立即導(dǎo)出。本文打算分析服務(wù)延遲導(dǎo)出過(guò)程,因此不會(huì)分析方法。服務(wù)導(dǎo)出之前,要進(jìn)行對(duì)一系列的配置進(jìn)行檢查,以及生成。返回時(shí),表示需要延遲導(dǎo)出。賽程預(yù)告,下一站是服務(wù)導(dǎo)出的前置工作。 1.服務(wù)導(dǎo)出過(guò)程 本篇文章,我們來(lái)研究一下 Dubbo 導(dǎo)出服務(wù)的過(guò)程。Dubbo 服務(wù)導(dǎo)出過(guò)程始于 Spring 容器發(fā)布刷新事件,Dubbo 在接收到事件后,會(huì)立即執(zhí)行服...

    劉玉平 評(píng)論0 收藏0
  • dubbo源碼解析(四十五)服務(wù)引用過(guò)程

    摘要:服務(wù)引用過(guò)程目標(biāo)從源碼的角度分析服務(wù)引用過(guò)程。并保留服務(wù)提供者的部分配置,比如版本,,時(shí)間戳等最后將合并后的配置設(shè)置為查詢字符串中。的可以參考源碼解析二十三遠(yuǎn)程調(diào)用的一的源碼分析。 dubbo服務(wù)引用過(guò)程 目標(biāo):從源碼的角度分析服務(wù)引用過(guò)程。 前言 前面服務(wù)暴露過(guò)程的文章講解到,服務(wù)引用有兩種方式,一種就是直連,也就是直接指定服務(wù)的地址來(lái)進(jìn)行引用,這種方式更多的時(shí)候被用來(lái)做服務(wù)測(cè)試,不...

    xiaowugui666 評(píng)論0 收藏0
  • dubbo源碼解析——概要篇

    摘要:服務(wù)提供者代碼上面這個(gè)類會(huì)被封裝成為一個(gè)實(shí)例,并新生成一個(gè)實(shí)例。這樣當(dāng)網(wǎng)絡(luò)通訊層收到一個(gè)請(qǐng)求后,會(huì)找到對(duì)應(yīng)的實(shí)例,并調(diào)用它所對(duì)應(yīng)的實(shí)例,從而真正調(diào)用了服務(wù)提供者的代碼。 這次源碼解析借鑒《肥朝》前輩的dubbo源碼解析,進(jìn)行源碼學(xué)習(xí)。總結(jié)起來(lái)就是先總體,后局部.也就是先把需要注意的概念先拋出來(lái),把整體架構(gòu)圖先畫(huà)出來(lái).讓讀者拿著地圖跟著我的腳步,并且每一步我都提醒,現(xiàn)在我們?cè)谀?我們下一...

    Meathill 評(píng)論0 收藏0
  • dubbo源碼解析(四十四)服務(wù)暴露過(guò)程

    摘要:服務(wù)暴露過(guò)程目標(biāo)從源碼的角度分析服務(wù)暴露過(guò)程。導(dǎo)出服務(wù),包含暴露服務(wù)到本地,和暴露服務(wù)到遠(yuǎn)程兩個(gè)過(guò)程。其中服務(wù)暴露的第八步已經(jīng)沒(méi)有了。將泛化調(diào)用版本號(hào)或者等信息加入獲得服務(wù)暴露地址和端口號(hào),利用內(nèi)數(shù)據(jù)組裝成。 dubbo服務(wù)暴露過(guò)程 目標(biāo):從源碼的角度分析服務(wù)暴露過(guò)程。 前言 本來(lái)這一篇一個(gè)寫(xiě)異步化改造的內(nèi)容,但是最近我一直在想,某一部分的優(yōu)化改造該怎么去撰寫(xiě)才能更加的讓讀者理解。我覺(jué)...

    light 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<