国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

dubbo源碼解析(二十二)遠(yuǎn)程調(diào)用——Protocol

孫淑建 / 3454人閱讀

摘要:七該類也實(shí)現(xiàn)了,也是裝飾了接口,但是它是在服務(wù)引用和暴露過程中加上了監(jiān)聽器的功能。如果是注冊中心,則暴露該創(chuàng)建一個(gè)暴露者監(jiān)聽器包裝類對象該方法是在服務(wù)暴露上做了監(jiān)聽器功能的增強(qiáng),也就是加上了監(jiān)聽器。

遠(yuǎn)程調(diào)用——Protocol
目標(biāo):介紹遠(yuǎn)程調(diào)用中協(xié)議的設(shè)計(jì)和實(shí)現(xiàn),介紹dubbo-rpc-api中的各種protocol包的源碼,是重點(diǎn)內(nèi)容。
前言

在遠(yuǎn)程調(diào)用中協(xié)議是非常重要的一層,看下面這張圖:

該層是在信息交換層之上,分為了并且夾雜在服務(wù)暴露和服務(wù)引用中間,為了有一個(gè)約定的方式進(jìn)行調(diào)用。

dubbo支持不同協(xié)議的擴(kuò)展,比如http、thrift等等,具體的可以參照官方文檔。本文講解的源碼大部分是對于公共方法的實(shí)現(xiàn),而具體的服務(wù)暴露和服務(wù)引用會在各個(gè)協(xié)議實(shí)現(xiàn)中講到。

下面是該包下面的類圖:

源碼分析 (一)AbstractProtocol

該類是協(xié)議的抽象類,實(shí)現(xiàn)了Protocol接口,其中實(shí)現(xiàn)了一些公共的方法,抽象方法在它的子類AbstractProxyProtocol中定義。

1.屬性
/**
 * 服務(wù)暴露者集合
 */
protected final Map> exporterMap = new ConcurrentHashMap>();

/**
 * 服務(wù)引用者集合
 */
//TODO SOFEREFENCE
protected final Set> invokers = new ConcurrentHashSet>();
2.serviceKey
protected static String serviceKey(URL url) {
    // 獲得綁定的端口號
    int port = url.getParameter(Constants.BIND_PORT_KEY, url.getPort());
    return serviceKey(port, url.getPath(), url.getParameter(Constants.VERSION_KEY),
            url.getParameter(Constants.GROUP_KEY));
}

protected static String serviceKey(int port, String serviceName, String serviceVersion, String serviceGroup) {
    return ProtocolUtils.serviceKey(port, serviceName, serviceVersion, serviceGroup);
}

該方法是為了得到服務(wù)key group+"/"+serviceName+":"+serviceVersion+":"+port

3.destroy
@Override
public void destroy() {
    // 遍歷服務(wù)引用實(shí)體
    for (Invoker invoker : invokers) {
        if (invoker != null) {
            // 從集合中移除
            invokers.remove(invoker);
            try {
                if (logger.isInfoEnabled()) {
                    logger.info("Destroy reference: " + invoker.getUrl());
                }
                // 銷毀
                invoker.destroy();
            } catch (Throwable t) {
                logger.warn(t.getMessage(), t);
            }
        }
    }
    // 遍歷服務(wù)暴露者
    for (String key : new ArrayList(exporterMap.keySet())) {
        // 從集合中移除
        Exporter exporter = exporterMap.remove(key);
        if (exporter != null) {
            try {
                if (logger.isInfoEnabled()) {
                    logger.info("Unexport service: " + exporter.getInvoker().getUrl());
                }
                // 取消暴露
                exporter.unexport();
            } catch (Throwable t) {
                logger.warn(t.getMessage(), t);
            }
        }
    }
}

該方法是對invoker和exporter的銷毀。

(二)AbstractProxyProtocol

該類繼承了AbstractProtocol類,其中利用了代理工廠對AbstractProtocol中的兩個(gè)集合進(jìn)行了填充,并且對異常做了處理。

1.屬性
/**
 * rpc的異常類集合
 */
private final List> rpcExceptions = new CopyOnWriteArrayList>();

/**
 * 代理工廠
 */
private ProxyFactory proxyFactory;
2.export
@Override
@SuppressWarnings("unchecked")
public  Exporter export(final Invoker invoker) throws RpcException {
    // 獲得uri
    final String uri = serviceKey(invoker.getUrl());
    // 獲得服務(wù)暴露者
    Exporter exporter = (Exporter) exporterMap.get(uri);
    if (exporter != null) {
        return exporter;
    }
    // 新建一個(gè)線程
    final Runnable runnable = doExport(proxyFactory.getProxy(invoker, true), invoker.getInterface(), invoker.getUrl());
    exporter = new AbstractExporter(invoker) {
        /**
         * 取消暴露
         */
        @Override
        public void unexport() {
            super.unexport();
            // 移除該key對應(yīng)的服務(wù)暴露者
            exporterMap.remove(uri);
            if (runnable != null) {
                try {
                    // 啟動線程
                    runnable.run();
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
            }
        }
    };
    // 加入集合
    exporterMap.put(uri, exporter);
    return exporter;
}

其中分為兩個(gè)步驟,創(chuàng)建一個(gè)exporter,放入到集合匯中。在創(chuàng)建exporter時(shí)對unexport方法進(jìn)行了重寫。

3.refer
@Override
public  Invoker refer(final Class type, final URL url) throws RpcException {
    // 通過代理獲得實(shí)體域
    final Invoker target = proxyFactory.getInvoker(doRefer(type, url), type, url);
    Invoker invoker = new AbstractInvoker(type, url) {
        @Override
        protected Result doInvoke(Invocation invocation) throws Throwable {
            try {
                // 獲得調(diào)用結(jié)果
                Result result = target.invoke(invocation);
                Throwable e = result.getException();
                // 如果拋出異常,則拋出相應(yīng)異常
                if (e != null) {
                    for (Class rpcException : rpcExceptions) {
                        if (rpcException.isAssignableFrom(e.getClass())) {
                            throw getRpcException(type, url, invocation, e);
                        }
                    }
                }
                return result;
            } catch (RpcException e) {
                // 拋出未知異常
                if (e.getCode() == RpcException.UNKNOWN_EXCEPTION) {
                    e.setCode(getErrorCode(e.getCause()));
                }
                throw e;
            } catch (Throwable e) {
                throw getRpcException(type, url, invocation, e);
            }
        }
    };
    // 加入集合
    invokers.add(invoker);
    return invoker;
}

該方法是服務(wù)引用,先從代理工廠中獲得Invoker對象target,然后創(chuàng)建了真實(shí)的invoker在重寫方法中調(diào)用代理的方法,最后加入到集合。

protected abstract  Runnable doExport(T impl, Class type, URL url) throws RpcException;

protected abstract  T doRefer(Class type, URL url) throws RpcException;

可以看到其中抽象了服務(wù)引用和暴露的方法,讓各類協(xié)議各自實(shí)現(xiàn)。

(三)AbstractInvoker

該類是invoker的抽象方法,因?yàn)閰f(xié)議被夾在服務(wù)引用和服務(wù)暴露中間,無論什么協(xié)議都有一些通用的Invoker和exporter的方法實(shí)現(xiàn),而該類就是實(shí)現(xiàn)了Invoker的公共方法,而把doInvoke抽象出來,讓子類只關(guān)注這個(gè)方法。

1.屬性
/**
 * 服務(wù)類型
 */
private final Class type;

/**
 * url對象
 */
private final URL url;

/**
 * 附加值
 */
private final Map attachment;

/**
 * 是否可用
 */
private volatile boolean available = true;

/**
 *  是否銷毀
 */
private AtomicBoolean destroyed = new AtomicBoolean(false);
2.convertAttachment
private static Map convertAttachment(URL url, String[] keys) {
    if (keys == null || keys.length == 0) {
        return null;
    }
    Map attachment = new HashMap();
    // 遍歷key,把值放入附加值集合中
    for (String key : keys) {
        String value = url.getParameter(key);
        if (value != null && value.length() > 0) {
            attachment.put(key, value);
        }
    }
    return attachment;
}

該方法是轉(zhuǎn)化為附加值,把url中的值轉(zhuǎn)化為服務(wù)調(diào)用invoker的附加值。

3.invoke
@Override
public Result invoke(Invocation inv) throws RpcException {
    // if invoker is destroyed due to address refresh from registry, let"s allow the current invoke to proceed
    // 如果服務(wù)引用銷毀,則打印告警日志,但是通過
    if (destroyed.get()) {
        logger.warn("Invoker for service " + this + " on consumer " + NetUtils.getLocalHost() + " is destroyed, "
                + ", dubbo version is " + Version.getVersion() + ", this invoker should not be used any longer");
    }

    RpcInvocation invocation = (RpcInvocation) inv;
    // 會話域中加入該調(diào)用鏈
    invocation.setInvoker(this);
    // 把附加值放入會話域
    if (attachment != null && attachment.size() > 0) {
        invocation.addAttachmentsIfAbsent(attachment);
    }
    // 把上下文的附加值放入會話域
    Map contextAttachments = RpcContext.getContext().getAttachments();
    if (contextAttachments != null && contextAttachments.size() != 0) {
        /**
         * invocation.addAttachmentsIfAbsent(context){@link RpcInvocation#addAttachmentsIfAbsent(Map)}should not be used here,
         * because the {@link RpcContext#setAttachment(String, String)} is passed in the Filter when the call is triggered
         * by the built-in retry mechanism of the Dubbo. The attachment to update RpcContext will no longer work, which is
         * a mistake in most cases (for example, through Filter to RpcContext output traceId and spanId and other information).
         */
        invocation.addAttachments(contextAttachments);
    }
    // 如果開啟的是異步調(diào)用,則把該設(shè)置也放入附加值
    if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)) {
        invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());
    }
    // 加入編號
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);


    try {
        // 執(zhí)行調(diào)用鏈
        return doInvoke(invocation);
    } catch (InvocationTargetException e) { // biz exception
        Throwable te = e.getTargetException();
        if (te == null) {
            return new RpcResult(e);
        } else {
            if (te instanceof RpcException) {
                ((RpcException) te).setCode(RpcException.BIZ_EXCEPTION);
            }
            return new RpcResult(te);
        }
    } catch (RpcException e) {
        if (e.isBiz()) {
            return new RpcResult(e);
        } else {
            throw e;
        }
    } catch (Throwable e) {
        return new RpcResult(e);
    }
}

該方法做了一些公共的操作,比如服務(wù)引用銷毀的檢測,加入附加值,加入調(diào)用鏈實(shí)體域到會話域中等。然后執(zhí)行了doInvoke抽象方法。各協(xié)議自己去實(shí)現(xiàn)。

(四)AbstractExporter

該類和AbstractInvoker類似,也是在服務(wù)暴露中實(shí)現(xiàn)了一些公共方法。

1.屬性
/**
 * 實(shí)體域
 */
private final Invoker invoker;

/**
 * 是否取消暴露服務(wù)
 */
private volatile boolean unexported = false;
2.unexport
@Override
public void unexport() {
    // 如果已經(jīng)消取消暴露,則之間返回
    if (unexported) {
        return;
    }
    // 設(shè)置為true
    unexported = true;
    // 銷毀該實(shí)體域
    getInvoker().destroy();
}
(五)InvokerWrapper

該類是Invoker的包裝類,其中用到類裝飾模式,不過并沒有實(shí)現(xiàn)實(shí)際的功能增強(qiáng)。

public class InvokerWrapper implements Invoker {

    /**
     * invoker對象
     */
    private final Invoker invoker;

    private final URL url;

    public InvokerWrapper(Invoker invoker, URL url) {
        this.invoker = invoker;
        this.url = url;
    }

    @Override
    public Class getInterface() {
        return invoker.getInterface();
    }

    @Override
    public URL getUrl() {
        return url;
    }

    @Override
    public boolean isAvailable() {
        return invoker.isAvailable();
    }

    @Override
    public Result invoke(Invocation invocation) throws RpcException {
        return invoker.invoke(invocation);
    }

    @Override
    public void destroy() {
        invoker.destroy();
    }

}
(六)ProtocolFilterWrapper

該類實(shí)現(xiàn)了Protocol接口,其中也用到了裝飾模式,是對Protocol的裝飾,是在服務(wù)引用和暴露的方法上加上了過濾器功能。

1.buildInvokerChain
private static  Invoker buildInvokerChain(final Invoker invoker, String key, String group) {
    Invoker last = invoker;
    // 獲得過濾器的所有擴(kuò)展實(shí)現(xiàn)類實(shí)例集合
    List filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
    if (!filters.isEmpty()) {
        // 從最后一個(gè)過濾器開始循環(huán),創(chuàng)建一個(gè)帶有過濾器鏈的invoker對象
        for (int i = filters.size() - 1; i >= 0; i--) {
            final Filter filter = filters.get(i);
            // 記錄last的invoker
            final Invoker next = last;
            // 新建last
            last = new Invoker() {

                @Override
                public Class getInterface() {
                    return invoker.getInterface();
                }

                @Override
                public URL getUrl() {
                    return invoker.getUrl();
                }

                @Override
                public boolean isAvailable() {
                    return invoker.isAvailable();
                }

                /**
                 * 關(guān)鍵在這里,調(diào)用下一個(gè)filter代表的invoker,把每一個(gè)過濾器串起來
                 * @param invocation
                 * @return
                 * @throws RpcException
                 */
                @Override
                public Result invoke(Invocation invocation) throws RpcException {
                    return filter.invoke(next, invocation);
                }

                @Override
                public void destroy() {
                    invoker.destroy();
                }

                @Override
                public String toString() {
                    return invoker.toString();
                }
            };
        }
    }
    return last;
}

該方法就是創(chuàng)建帶 Filter 鏈的 Invoker 對象。倒序的把每一個(gè)過濾器串連起來,形成一個(gè)invoker。

2.export
@Override
public  Exporter export(Invoker invoker) throws RpcException {
    // 如果是注冊中心,則直接暴露服務(wù)
    if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
        return protocol.export(invoker);
    }
    // 服務(wù)提供側(cè)暴露服務(wù)
    return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
}

該方法是在服務(wù)暴露上做了過濾器鏈的增強(qiáng),也就是加上了過濾器。

3.refer
@Override
public  Invoker refer(Class type, URL url) throws RpcException {
    // 如果是注冊中心,則直接引用
    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
        return protocol.refer(type, url);
    }
    // 消費(fèi)者側(cè)引用服務(wù)
    return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER);
}

該方法是在服務(wù)引用上做了過濾器鏈的增強(qiáng),也就是加上了過濾器。

(七)ProtocolListenerWrapper

該類也實(shí)現(xiàn)了Protocol,也是裝飾了Protocol接口,但是它是在服務(wù)引用和暴露過程中加上了監(jiān)聽器的功能。

1.export
@Override
public  Exporter export(Invoker invoker) throws RpcException {
    // 如果是注冊中心,則暴露該invoker
    if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
        return protocol.export(invoker);
    }
    // 創(chuàng)建一個(gè)暴露者監(jiān)聽器包裝類對象
    return new ListenerExporterWrapper(protocol.export(invoker),
            Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class)
                    .getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY)));
}

該方法是在服務(wù)暴露上做了監(jiān)聽器功能的增強(qiáng),也就是加上了監(jiān)聽器。

2.refer
@Override
public  Invoker refer(Class type, URL url) throws RpcException {
    // 如果是注冊中心。則直接引用服務(wù)
    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
        return protocol.refer(type, url);
    }
    // 創(chuàng)建引用服務(wù)監(jiān)聽器包裝類對象
    return new ListenerInvokerWrapper(protocol.refer(type, url),
            Collections.unmodifiableList(
                    ExtensionLoader.getExtensionLoader(InvokerListener.class)
                            .getActivateExtension(url, Constants.INVOKER_LISTENER_KEY)));
}

該方法是在服務(wù)引用上做了監(jiān)聽器功能的增強(qiáng),也就是加上了監(jiān)聽器。

后記
該部分相關(guān)的源碼解析地址:https://github.com/CrazyHZM/i...

該文章講解了遠(yuǎn)程調(diào)用中關(guān)于協(xié)議的部分,其實(shí)就是講了一些公共的方法,并且把關(guān)鍵方法抽象出來讓子類實(shí)現(xiàn),具體的方法實(shí)現(xiàn)都在各個(gè)協(xié)議中自己實(shí)現(xiàn)。接下來我將開始對rpc模塊的代理進(jìn)行講解。

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/72957.html

相關(guān)文章

  • dubbo源碼解析(四十六)消費(fèi)端發(fā)送請求過程

    摘要:可以參考源碼解析二十四遠(yuǎn)程調(diào)用協(xié)議的八。十六的該類也是用了適配器模式,該類主要的作用就是增加了心跳功能,可以參考源碼解析十遠(yuǎn)程通信層的四。二十的可以參考源碼解析十七遠(yuǎn)程通信的一。 2.7大揭秘——消費(fèi)端發(fā)送請求過程 目標(biāo):從源碼的角度分析一個(gè)服務(wù)方法調(diào)用經(jīng)歷怎么樣的磨難以后到達(dá)服務(wù)端。 前言 前一篇文章講到的是引用服務(wù)的過程,引用服務(wù)無非就是創(chuàng)建出一個(gè)代理。供消費(fèi)者調(diào)用服務(wù)的相關(guān)方法。...

    fish 評論0 收藏0
  • dubbo源碼解析(四十七)服務(wù)端處理請求過程

    摘要:而存在的意義就是保證請求或響應(yīng)對象可在線程池中被解碼,解碼完成后,就會分發(fā)到的。 2.7大揭秘——服務(wù)端處理請求過程 目標(biāo):從源碼的角度分析服務(wù)端接收到請求后的一系列操作,最終把客戶端需要的值返回。 前言 上一篇講到了消費(fèi)端發(fā)送請求的過程,該篇就要將服務(wù)端處理請求的過程。也就是當(dāng)服務(wù)端收到請求數(shù)據(jù)包后的一系列處理以及如何返回最終結(jié)果。我們也知道消費(fèi)端在發(fā)送請求的時(shí)候已經(jīng)做了編碼,所以我...

    yzzz 評論0 收藏0
  • dubbo源碼解析(四十八)異步化改造

    摘要:大揭秘異步化改造目標(biāo)從源碼的角度分析的新特性中對于異步化的改造原理。看源碼解析四十六消費(fèi)端發(fā)送請求過程講到的十四的,在以前的邏輯會直接在方法中根據(jù)配置區(qū)分同步異步單向調(diào)用。改為關(guān)于可以參考源碼解析十遠(yuǎn)程通信層的六。 2.7大揭秘——異步化改造 目標(biāo):從源碼的角度分析2.7的新特性中對于異步化的改造原理。 前言 dubbo中提供了很多類型的協(xié)議,關(guān)于協(xié)議的系列可以查看下面的文章: du...

    lijinke666 評論0 收藏0
  • dubbo源碼解析二十七)遠(yuǎn)程調(diào)用——injvm本地調(diào)用

    摘要:遠(yuǎn)程調(diào)用本地調(diào)用目標(biāo)介紹本地調(diào)用的設(shè)計(jì)和實(shí)現(xiàn),介紹的源碼。前言是一個(gè)遠(yuǎn)程調(diào)用的框架,但是它沒有理由不支持本地調(diào)用,本文就要講解關(guān)于本地調(diào)用的實(shí)現(xiàn)。服務(wù)暴露者集合取消暴露調(diào)用父類的取消暴露方法從集合中移除二該類繼承了類,是本地調(diào)用的實(shí)現(xiàn)。 遠(yuǎn)程調(diào)用——injvm本地調(diào)用 目標(biāo):介紹injvm本地調(diào)用的設(shè)計(jì)和實(shí)現(xiàn),介紹dubbo-rpc-injvm的源碼。 前言 dubbo是一個(gè)遠(yuǎn)程調(diào)用的...

    sean 評論0 收藏0
  • dubbo源碼解析二十六)遠(yuǎn)程調(diào)用——http協(xié)議

    摘要:前言基于表單的遠(yuǎn)程調(diào)用協(xié)議,采用的實(shí)現(xiàn),關(guān)于協(xié)議就不用多說了吧。后記該部分相關(guān)的源碼解析地址該文章講解了遠(yuǎn)程調(diào)用中關(guān)于協(xié)議的部分,內(nèi)容比較簡單,可以參考著官方文檔了解一下。 遠(yuǎn)程調(diào)用——http協(xié)議 目標(biāo):介紹遠(yuǎn)程調(diào)用中跟http協(xié)議相關(guān)的設(shè)計(jì)和實(shí)現(xiàn),介紹dubbo-rpc-http的源碼。 前言 基于HTTP表單的遠(yuǎn)程調(diào)用協(xié)議,采用 Spring 的HttpInvoker實(shí)現(xiàn),關(guān)于h...

    xiyang 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<