国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

dubbo源碼解析(四)注冊中心——dubbo

andot / 2757人閱讀

摘要:一該類繼承了類,該類里面封裝了一個重連機制,而注冊中心核心的功能注冊訂閱取消注冊取消訂閱,查詢注冊列表都是調(diào)用了我上一篇文章源碼解析三注冊中心開篇中講到的實現(xiàn)方法,畢竟這種實現(xiàn)注冊中心的方式是默認的方式,不過推薦使用,這個后續(xù)講解。

注冊中心——dubbo
目標:解釋以為dubbo實現(xiàn)的注冊中心原理,解讀duubo-registry-default源碼

dubbo內(nèi)置的注冊中心實現(xiàn)方式有四種,這是第一種,也是dubbo默認的注冊中心實現(xiàn)方式。我們可以從上篇文章中看到RegistryFactory接口的@SPI默認值是dubbo。

我們先來看看包下面有哪些類:

可以看到該包下就兩個類,下面就來解讀這兩個類。

(一)DubboRegistry

該類繼承了FailbackRegistry類,該類里面封裝了一個重連機制,而注冊中心核心的功能注冊、訂閱、取消注冊、取消訂閱,查詢注冊列表都是調(diào)用了我上一篇文章《dubbo源碼解析(三)注冊中心——開篇》中講到的實現(xiàn)方法,畢竟這種實現(xiàn)注冊中心的方式是dubbo默認的方式,不過dubbo推薦使用zookeeper,這個后續(xù)講解。

1.屬性
// 日志記錄
private final static Logger logger = LoggerFactory.getLogger(DubboRegistry.class);

// Reconnecting detection cycle: 3 seconds (unit:millisecond)
// 重新連接周期:3秒
private static final int RECONNECT_PERIOD_DEFAULT = 3 * 1000;

// Scheduled executor service
// 任務(wù)調(diào)度器
private final ScheduledExecutorService reconnectTimer = Executors.newScheduledThreadPool(1, new NamedThreadFactory("DubboRegistryReconnectTimer", true));

// Reconnection timer, regular check connection is available. If unavailable, unlimited reconnection.
// 重新連接執(zhí)行器,定期檢查連接可用,如果不可用,則無限制重連
private final ScheduledFuture reconnectFuture;

// The lock for client acquisition process, lock the creation process of the client instance to prevent repeated clients
// 客戶端的鎖,保證客戶端的原子性,可見行,線程安全。
private final ReentrantLock clientLock = new ReentrantLock();

// 注冊中心Invoker
private final Invoker registryInvoker;

// 注冊中心服務(wù)對象
private final RegistryService registryService;

// 任務(wù)調(diào)度器reconnectTimer將等待的時間
private final int reconnectPeriod;

看上面的源碼,可以看到這里的重連是建立了一個計時器,并且會定期檢查連接是否可用,如果不可用,就無限重連。只要懂一點線程相關(guān)的知識,這里的屬性還是比較好理解的。

2.構(gòu)造函數(shù)DubboRegistry

先來看看源碼:

public DubboRegistry(Invoker registryInvoker, RegistryService registryService) {
    // 調(diào)用父類FailbackRegistry的構(gòu)造函數(shù)
    super(registryInvoker.getUrl());
    this.registryInvoker = registryInvoker;
    this.registryService = registryService;
    // Start reconnection timer
    // 優(yōu)先取url中key為reconnect.perio的配置,如果沒有,則使用默認的3s
    this.reconnectPeriod = registryInvoker.getUrl().getParameter(Constants.REGISTRY_RECONNECT_PERIOD_KEY, RECONNECT_PERIOD_DEFAULT);
    // 每reconnectPeriod秒去連接,首次連接也延遲reconnectPeriod秒
    reconnectFuture = reconnectTimer.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run() {
            // Check and connect to the registry
            try {
                connect();
            } catch (Throwable t) { // Defensive fault tolerance
                logger.error("Unexpected error occur at reconnect, cause: " + t.getMessage(), t);
            }
        }
    }, reconnectPeriod, reconnectPeriod, TimeUnit.MILLISECONDS);
}

這個構(gòu)造方法中有兩個關(guān)鍵點:

關(guān)于等待時間優(yōu)先從url配置中取得,如果沒有這個值,再設(shè)置為默認值3s。

創(chuàng)建了一個重連計時器,一定的間隔時間去檢查是否斷開,如果斷開就進行連接。

3.connect

該方法是連接注冊中心的實現(xiàn),來看看源碼:

protected final void connect() {
    try {
        // Check whether or not it is connected
        // 檢查注冊中心是否已連接
        if (isAvailable()) {
            return;
        }
        if (logger.isInfoEnabled()) {
            logger.info("Reconnect to registry " + getUrl());
        }
        // 獲得客戶端鎖
        clientLock.lock();
        try {
            // Double check whether or not it is connected
            // 二次查詢注冊中心是否已經(jīng)連接
            if (isAvailable()) {
                return;
            }
            // 恢復(fù)注冊和訂閱
            recover();
        } finally {
            // 釋放鎖
            clientLock.unlock();
        }
    } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
        if (getUrl().getParameter(Constants.CHECK_KEY, true)) {
            if (t instanceof RuntimeException) {
                throw (RuntimeException) t;
            }
            throw new RuntimeException(t.getMessage(), t);
        }
        logger.error("Failed to connect to registry " + getUrl().getAddress() + " from provider/consumer " + NetUtils.getLocalHost() + " use dubbo " + Version.getVersion() + ", cause: " + t.getMessage(), t);
    }
}

我們可以看到這里的重連機制其實就是調(diào)用了父類FailbackRegistry的recover方法,關(guān)于recover方法我在《dubbo源碼解析(三)注冊中心——開篇》中已經(jīng)講解過了。還有要關(guān)注的就是需要保證客戶端線程安全。需要獲得鎖和釋放鎖。

4.isAvailable

該方法就是用來檢查注冊中心是否連接,源碼如下:

public boolean isAvailable() {
    if (registryInvoker == null)
        return false;
    return registryInvoker.isAvailable();
}
5.destroy

該方法是銷毀方法,主要是銷毀重連計時器、注冊中心的Invoker和任務(wù)調(diào)度器,源碼如下:

@Override
public void destroy() {
    super.destroy();
    try {
        // Cancel the reconnection timer
        // 取消重新連接計時器
        if (!reconnectFuture.isCancelled()) {
            reconnectFuture.cancel(true);
        }
    } catch (Throwable t) {
        logger.warn("Failed to cancel reconnect timer", t);
    }
    // 銷毀注冊中心的Invoker
    registryInvoker.destroy();
    // 關(guān)閉任務(wù)調(diào)度器
    ExecutorUtil.gracefulShutdown(reconnectTimer, reconnectPeriod);
}

這里用到了ExecutorUtil中的gracefulShutdown,因為ExecutorUtil是common模塊中的類,我在第一篇中講到我會穿插在各個文章中介紹這個模塊,所以我咋這里介紹一下這個gracefulShutdown方法,我們可以看一下這個源碼:

public static void gracefulShutdown(Executor executor, int timeout) {
    if (!(executor instanceof ExecutorService) || isTerminated(executor)) {
        return;
    }
    final ExecutorService es = (ExecutorService) executor;
    try {
        // Disable new tasks from being submitted
        // 停止接收新的任務(wù)并且等待已經(jīng)提交的任務(wù)(包含提交正在執(zhí)行和提交未執(zhí)行)執(zhí)行完成
        // 當(dāng)所有提交任務(wù)執(zhí)行完畢,線程池即被關(guān)閉
        es.shutdown();
    } catch (SecurityException ex2) {
        return;
    } catch (NullPointerException ex2) {
        return;
    }
    try {
        // Wait a while for existing tasks to terminate
        // 當(dāng)?shù)却^設(shè)定時間時,會監(jiān)測ExecutorService是否已經(jīng)關(guān)閉,如果沒關(guān)閉,再關(guān)閉一次
        if (!es.awaitTermination(timeout, TimeUnit.MILLISECONDS)) {
            // 試圖停止所有正在執(zhí)行的線程,不再處理還在池隊列中等待的任務(wù)
            // ShutdownNow()并不代表線程池就一定立即就能退出,它可能必須要等待所有正在執(zhí)行的任務(wù)都執(zhí)行完成了才能退出。
            es.shutdownNow();
        }
    } catch (InterruptedException ex) {
        es.shutdownNow();
        Thread.currentThread().interrupt();
    }
    if (!isTerminated(es)) {
        newThreadToCloseExecutor(es);
    }
}

可以看到這個銷毀任務(wù)調(diào)度器,也就是退出線程池,調(diào)用了shutdown、shutdownNow方法,這里也替大家惡補了一下線程池關(guān)閉的方法區(qū)別。

6.doRegister && doUnregister && doSubscribe && doUnsubscribe && lookup

關(guān)于這個五個方法都是調(diào)用了RegistryService的方法,讀者可自主查看《dubbo源碼解析(三)注冊中心——開篇》來理解內(nèi)部實現(xiàn)。

(二)DubboRegistryFactory

該類繼承了AbstractRegistryFactory類,實現(xiàn)了AbstractRegistryFactory抽象出來的createRegistry方法,是dubbo這種實現(xiàn)的注冊中心的工廠類,里面做了一些初始化的處理,以及創(chuàng)建注冊中心DubboRegistry的對象實例。因為該類的屬性比較好理解,所以下面就不在展開講解了。

1.getRegistryURL

獲取注冊中心url,類似于初始化注冊中心url的方法。

private static URL getRegistryURL(URL url) {
    return url.setPath(RegistryService.class.getName())
            // 移除暴露服務(wù)和引用服務(wù)的參數(shù)
            .removeParameter(Constants.EXPORT_KEY).removeParameter(Constants.REFER_KEY)
            // 添加注冊中心服務(wù)接口class值
            .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
            // 啟用sticky 粘性連接,讓客戶端總是連接同一提供者
            .addParameter(Constants.CLUSTER_STICKY_KEY, "true")
            // 決定在創(chuàng)建客戶端時建立連接
            .addParameter(Constants.LAZY_CONNECT_KEY, "true")
            // 不重連
            .addParameter(Constants.RECONNECT_KEY, "false")
            // 方法調(diào)用超時時間為10s
            .addParameterIfAbsent(Constants.TIMEOUT_KEY, "10000")
            // 每個客戶端上一個接口的回調(diào)服務(wù)實例的限制為10000個
            .addParameterIfAbsent(Constants.CALLBACK_INSTANCES_LIMIT_KEY, "10000")
            // 注冊中心連接超時時間10s
            .addParameterIfAbsent(Constants.CONNECT_TIMEOUT_KEY, "10000")
            // 添加方法級配置
            .addParameter(Constants.METHODS_KEY, StringUtils.join(new HashSet(Arrays.asList(Wrapper.getWrapper(RegistryService.class).getDeclaredMethodNames())), ","))
            //.addParameter(Constants.STUB_KEY, RegistryServiceStub.class.getName())
            //.addParameter(Constants.STUB_EVENT_KEY, Boolean.TRUE.toString()) //for event dispatch
            //.addParameter(Constants.ON_DISCONNECT_KEY, "disconnect")
            .addParameter("subscribe.1.callback", "true")
            .addParameter("unsubscribe.1.callback", "false");
}

看上面的源碼可以很直白的看書就是對url中的配置做一些初始化設(shè)置。幾乎每個key對應(yīng)的意義我都在上面展示了,會比較好理解。

2.createRegistry

該方法就是實現(xiàn)了AbstractRegistryFactory抽象出來的createRegistry方法,該子類就只關(guān)注createRegistry方法,其他公共的邏輯都在AbstractRegistryFactory已經(jīng)實現(xiàn)。看一下源碼:

@Override
public Registry createRegistry(URL url) {
    // 類似于初始化注冊中心
    url = getRegistryURL(url);
    List urls = new ArrayList();
    // 移除備用的值
    urls.add(url.removeParameter(Constants.BACKUP_KEY));
    String backup = url.getParameter(Constants.BACKUP_KEY);
    if (backup != null && backup.length() > 0) {
        // 分割備用地址
        String[] addresses = Constants.COMMA_SPLIT_PATTERN.split(backup);
        for (String address : addresses) {
            urls.add(url.setAddress(address));
        }
    }
    // 創(chuàng)建RegistryDirectory,里面有多個Registry的Invoker
    RegistryDirectory directory = new RegistryDirectory(RegistryService.class, url.addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName()).addParameterAndEncoded(Constants.REFER_KEY, url.toParameterString()));
    // 將directory中的多個Invoker偽裝成一個Invoker
    Invoker registryInvoker = cluster.join(directory);
    // 代理
    RegistryService registryService = proxyFactory.getProxy(registryInvoker);
    // 創(chuàng)建注冊中心對象
    DubboRegistry registry = new DubboRegistry(registryInvoker, registryService);
    directory.setRegistry(registry);
    directory.setProtocol(protocol);
    // 通知監(jiān)聽器
    directory.notify(urls);
    // 訂閱
    directory.subscribe(new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, RegistryService.class.getName(), url.getParameters()));
    return registry;
}

在這個方法中做了實例化了DubboRegistry,并且做了通知和訂閱的操作。相關(guān)集群、代理以及包含了很多Invoker的Directory我會在后續(xù)文章中講到。

后記
該部分相關(guān)的源碼解析地址:https://github.com/CrazyHZM/i...

該文章講解了dubbo的注冊中心默認實現(xiàn),默認實現(xiàn)很多都是調(diào)用了之前api中講解到的方法,并沒有重寫過多的方法,其中已經(jīng)涉及到集群之類的內(nèi)容,請關(guān)注我后續(xù)的文章,講解這些內(nèi)容。如果我在哪一部分寫的不夠到位或者寫錯了,歡迎給我提意見,我的私人微信號碼:HUA799695226。

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/72041.html

相關(guān)文章

  • dubbo源碼解析(一)Hello,Dubbo

    摘要:英文全名為,也叫遠程過程調(diào)用,其實就是一個計算機通信協(xié)議,它是一種通過網(wǎng)絡(luò)從遠程計算機程序上請求服務(wù)而不需要了解底層網(wǎng)絡(luò)技術(shù)的協(xié)議。 Hello,Dubbo 你好,dubbo,初次見面,我想和你交個朋友。 Dubbo你到底是什么? 先給出一套官方的說法:Apache Dubbo是一款高性能、輕量級基于Java的RPC開源框架。 那么什么是RPC? 文檔地址:http://dubbo.a...

    evin2016 評論0 收藏0
  • dubbo源碼解析十三)2.7新特性

    摘要:大揭秘目標了解的新特性,以及版本升級的引導(dǎo)。四元數(shù)據(jù)改造我們知道以前的版本只有注冊中心,注冊中心的有數(shù)十個的鍵值對,包含了一個服務(wù)所有的元數(shù)據(jù)。 DUBBO——2.7大揭秘 目標:了解2.7的新特性,以及版本升級的引導(dǎo)。 前言 我們知道Dubbo在2011年開源,停止更新了一段時間。在2017 年 9 月 7 日,Dubbo 悄悄的在 GitHub 發(fā)布了 2.5.4 版本。隨后,版本...

    qqlcbb 評論0 收藏0
  • dubbo源碼解析(三)注冊中心——開篇

    摘要:是用來監(jiān)聽處理注冊數(shù)據(jù)變更的事件。這里的是節(jié)點的接口,里面協(xié)定了關(guān)于節(jié)點的一些操作方法,我們可以來看看源代碼獲得節(jié)點地址判斷節(jié)點是否可用銷毀節(jié)點三這個接口是注冊中心的工廠接口,用來返回注冊中心的對象。 注冊中心——開篇 目標:解釋注冊中心在dubbo框架中作用,dubbo-registry-api源碼解讀 注冊中心是什么? 服務(wù)治理框架中可以大致分為服務(wù)通信和服務(wù)管理兩個部分,服務(wù)管理...

    CastlePeaK 評論0 收藏0
  • dubbo源碼解析(七)注冊中心——zookeeper

    摘要:層根據(jù)不同的目錄可以有服務(wù)提供者服務(wù)消費者路由規(guī)則配置規(guī)則。通過這樣的方式,可以處理類似服務(wù)提供者為空的情況。 注冊中心——zookeeper 目標:解釋以為zookeeper實現(xiàn)的注冊中心原理,解讀duubo-registry-zookeeper的源碼 這篇文章是講解注冊中心的最后一篇文章。這篇文章講的是dubbo的注冊中心用zookeeper來實現(xiàn)。這種實現(xiàn)注冊中心的方法也是dub...

    wanglu1209 評論0 收藏0
  • dubbo源碼解析)服務(wù)暴露過程

    摘要:服務(wù)暴露過程目標從源碼的角度分析服務(wù)暴露過程。導(dǎo)出服務(wù),包含暴露服務(wù)到本地,和暴露服務(wù)到遠程兩個過程。其中服務(wù)暴露的第八步已經(jīng)沒有了。將泛化調(diào)用版本號或者等信息加入獲得服務(wù)暴露地址和端口號,利用內(nèi)數(shù)據(jù)組裝成。 dubbo服務(wù)暴露過程 目標:從源碼的角度分析服務(wù)暴露過程。 前言 本來這一篇一個寫異步化改造的內(nèi)容,但是最近我一直在想,某一部分的優(yōu)化改造該怎么去撰寫才能更加的讓讀者理解。我覺...

    light 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<