国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

dubbo之Zookeeper注冊中心

Null / 2687人閱讀

摘要:目前支持多種注冊中心。本編文章是分析使用作為注冊中心,如何整合進行服務注冊和訂閱服務。

目前dubbo支持多種注冊中心:Zookeeper、Redis、Simple、Multicast、Etcd3。

本編文章是分析使用Zookeeper作為注冊中心,dubbo如何整合Zookeeper進行服務注冊和訂閱服務。

首先dubbo將服務注冊到Zookeeper后,目錄結構如下所示:(注冊接口名:com.bob.dubbo.service.CityDubboService)

在consumer和provider服務啟動的時候,去把自身URL格式化成字符串,然后注冊到zookeeper相應節點下,作為臨時節點,斷開連接后,節點刪除;consumer啟動時,不僅會訂閱服務,同時也會將自己的URL注冊到zookeeper中;

ZookeeperRegistry

ZookeeperRegistry:dubbo與zookeeper交互主要的類,已下結合源碼進行分析,先來看

doSubcribe()

這個方法主要是用于訂閱服務,添加監聽器,動態監聽提供者列表變化:

</>復制代碼

  1. @Override
  2. public void doSubscribe(final URL url, final NotifyListener listener) {
  3. try {
  4. // 處理所有service層發起的訂閱,例如監控中心的訂閱
  5. if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
  6. String root = toRootPath();
  7. ConcurrentMap listeners = zkListeners.get(url);
  8. if (listeners == null) {
  9. zkListeners.putIfAbsent(url, new ConcurrentHashMap<>());
  10. listeners = zkListeners.get(url);
  11. }
  12. ChildListener zkListener = listeners.get(listener);
  13. if (zkListener == null) {
  14. listeners.putIfAbsent(listener, (parentPath, currentChilds) -> {
  15. for (String child : currentChilds) {
  16. child = URL.decode(child);
  17. if (!anyServices.contains(child)) {
  18. anyServices.add(child);
  19. subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child,
  20. Constants.CHECK_KEY, String.valueOf(false)), listener);
  21. }
  22. }
  23. });
  24. zkListener = listeners.get(listener);
  25. }
  26. zkClient.create(root, false);
  27. List services = zkClient.addChildListener(root, zkListener);
  28. if (services != null && !services.isEmpty()) {
  29. for (String service : services) {
  30. service = URL.decode(service);
  31. anyServices.add(service);
  32. subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service,
  33. Constants.CHECK_KEY, String.valueOf(false)), listener);
  34. }
  35. }
  36. // 處理指定service層發起的訂閱,例如服務消費者的訂閱
  37. } else {
  38. List urls = new ArrayList<>();
  39. // 循環分類數組 , router, configurator, provider
  40. for (String path : toCategoriesPath(url)) {
  41. // 獲得 url 對應的監聽器集合
  42. ConcurrentMap listeners = zkListeners.get(url);
  43. if (listeners == null) {// 不存在,進行創建
  44. zkListeners.putIfAbsent(url, new ConcurrentHashMap<>());
  45. listeners = zkListeners.get(url);
  46. }
  47. // 獲得 ChildListener 對象
  48. ChildListener zkListener = listeners.get(listener);
  49. if (zkListener == null) {// 不存在子目錄的監聽器,進行創建 ChildListener 對象
  50. // 訂閱父級目錄, 當有子節點發生變化時,觸發此回調函數,回調listener中的notify()方法
  51. listeners.putIfAbsent(listener, (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds)));
  52. zkListener = listeners.get(listener);
  53. }
  54. 創建Type節點,此節點為持久節點
  55. zkClient.create(path, false);
  56. // 向 Zookeeper ,PATH 節點,發起訂閱,返回此節點下的所有子元素 path : /根節點/接口全名/providers, 比如 : /dubbo/com.bob.service.CityService/providers
  57. List children = zkClient.addChildListener(path, zkListener);
  58. if (children != null) {
  59. urls.addAll(toUrlsWithEmpty(url, path, children));
  60. }
  61. }
  62. // 首次全量數據獲取完成時,調用 `#notify(...)` 方法,回調 NotifyListener, 在這一步從連接Provider,實例化Invoker
  63. notify(url, listener, urls);
  64. }
  65. } catch (Throwable e) {
  66. throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
  67. }
  68. }

register()

ZookeeperRegistry父類FailbackRegistry中的方法,用于將服務注冊到zookeeper,具體代碼如下:

</>復制代碼

  1. @Override
  2. public void register(URL url) {
  3. // 調用父類AbstractRegistry中的register()方法,將url存儲到注冊集合中
  4. super.register(url);
  5. // 如果之前這個url注冊失敗,則會從注冊失敗集合中刪除
  6. removeFailedRegistered(url);
  7. removeFailedUnregistered(url);
  8. try {
  9. // 像注冊中心發送注冊請求
  10. doRegister(url);
  11. } catch (Exception e) {
  12. Throwable t = e;
  13. // If the startup detection is opened, the Exception is thrown directly.
  14. boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
  15. && url.getParameter(Constants.CHECK_KEY, true)
  16. && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
  17. boolean skipFailback = t instanceof SkipFailbackWrapperException;
  18. if (check || skipFailback) {
  19. if (skipFailback) {
  20. t = t.getCause();
  21. }
  22. throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
  23. } else {
  24. logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
  25. }
  26. // 將url存入注冊失敗集合中,進行重試try()
  27. addFailedRegistered(url);
  28. }
  29. }

doRegister()

ZookeeperRegistry類中的方法

</>復制代碼

  1. @Override
  2. public void doRegister(URL url) {
  3. try {
  4. // 通過zookeeper客戶端向注冊中心發送服務注冊請求,在zookeeper下創建服務對應的節點
  5. zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
  6. } catch (Throwable e) {
  7. throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
  8. }
  9. }

在介紹注冊registry()方法的時候,解析到了FailbackRegistry類,接下來咱們來分析一下這個類的作用:

FailbackRegistry

這個類是ZookeeperRegistry的父類,通過分析該類的結構,主要是用于服務的注冊、訂閱、重試,而服務具體的注冊、訂閱又在ZookeeperRegistry子類進行了實現,現在我們來分析重試這個功能,服務暴露和訂閱的配置文件中一般會設置重試這個屬性,如下所示:

</>復制代碼

上面是一個服務暴露的示例,設置了retries屬性,表示重試的次數。接下來咱們就以注冊重試進行分析(服務訂閱是同樣的原理):在注冊registry()方法中(代碼上面已提供),在異常catch{}代碼塊中有一個addFailedRegistered(url)方法,這個就是將注冊失敗的url添加到集合中,并創建一個重試的任務FailedRegisteredTask(url, this),代碼如下:

</>復制代碼

  1. private void addFailedRegistered(URL url) {
  2. // 先從集合中獲取,如果存在,直接返回
  3. FailedRegisteredTask oldOne = failedRegistered.get(url);
  4. if (oldOne != null) {
  5. return;
  6. }
  7. // 本地集合不存在,則創建重試定時任務,默認每隔5s執行
  8. FailedRegisteredTask newTask = new FailedRegisteredTask(url, this);
  9. oldOne = failedRegistered.putIfAbsent(url, newTask);
  10. if (oldOne == null) {
  11. // 將定時任務放置在HashedWheelTimer這個處理定時任務的容器,(HashedWheelTimer執行原理,可以自行查找資料,這里就不介紹)
  12. retryTimer.newTimeout(newTask, retryPeriod, TimeUnit.MILLISECONDS);
  13. }
  14. }

咱們下來看FailedRegisteredTask這個定時任務,有哪些東西,FailedRegisteredTask是AbstractRetryTask的子類,在執行new FailedRegisteredTask(url, this)代碼時,其實調用的是父類構造函數,其中retryTimes表示重試的次數,在沒有配置的情況下,默認重試三次:

</>復制代碼

  1. AbstractRetryTask(URL url, FailbackRegistry registry, String taskName) {
  2. if (url == null || StringUtils.isBlank(taskName)) {
  3. throw new IllegalArgumentException();
  4. }
  5. this.url = url;
  6. this.registry = registry;
  7. this.taskName = taskName;
  8. cancel = false;
  9. this.retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
  10. // 重試次數,默認情況下重試三次
  11. this.retryTimes = url.getParameter(Constants.REGISTRY_RETRY_TIMES_KEY, Constants.DEFAULT_REGISTRY_RETRY_TIMES);
  12. }

在AbstractRetryTask類中有一個run()方法,在run()方法會根據XML配置文件中的retries屬性值進行比較來進行重試,如果沒有達到重試次數,則會調用doRetry(url, registry, timeout),而這個方法又在子類具體實現,這里我以注冊FailedRegisteredTask舉例:

</>復制代碼

  1. @Override
  2. public void run(Timeout timeout) throws Exception {
  3. if (timeout.isCancelled() || timeout.timer().isStop() || isCancel()) {
  4. // other thread cancel this timeout or stop the timer.
  5. return;
  6. }
  7. // 重試次數與設置的retries進行比較,超過則不在進行重試
  8. if (times > retryTimes) {
  9. // reach the most times of retry.
  10. logger.warn("Final failed to execute task " + taskName + ", url: " + url + ", retry " + retryTimes + " times.");
  11. return;
  12. }
  13. if (logger.isInfoEnabled()) {
  14. logger.info(taskName + " : " + url);
  15. }
  16. try {
  17. // 調用子類實現,進行重試
  18. doRetry(url, registry, timeout);
  19. } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
  20. logger.warn("Failed to execute task " + taskName + ", url: " + url + ", waiting for again, cause:" + t.getMessage(), t);
  21. // reput this task when catch exception.
  22. reput(timeout, retryPeriod);
  23. }
  24. }

在子類FailedRegisteredTask中doRetry()方法具體實現:

</>復制代碼

  1. public final class FailedRegisteredTask extends AbstractRetryTask {
  2. private static final String NAME = "retry register";
  3. public FailedRegisteredTask(URL url, FailbackRegistry registry) {
  4. super(url, registry, NAME);
  5. }
  6. @Override
  7. protected void doRetry(URL url, FailbackRegistry registry, Timeout timeout) {
  8. // 調用ZookeeperRegistry類中的doRegister()方法進行注冊
  9. registry.doRegister(url);
  10. registry.removeFailedRegisteredTask(url);
  11. }
  12. }

分析到這里,有個疑問:重試任務已經封裝了,任務什么時候去執行,怎么執行的?其實在上面咱們就分析到過,就是使用了HashedWheelTimer,這個類是在ZookeeperRegistry類初始化的時候就會去初始化:

</>復制代碼

  1. public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
  2. // 這個地方進行初始化的:初始化父類FailbackRegistry
  3. super(url);
  4. if (url.isAnyHost()) {
  5. throw new IllegalStateException("registry address == null");
  6. }
  7. String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
  8. if (!group.startsWith(Constants.PATH_SEPARATOR)) {
  9. group = Constants.PATH_SEPARATOR + group;
  10. }
  11. this.root = group;
  12. zkClient = zookeeperTransporter.connect(url);
  13. zkClient.addStateListener(state -> {
  14. if (state == StateListener.RECONNECTED) {
  15. try {
  16. recover();
  17. } catch (Exception e) {
  18. logger.error(e.getMessage(), e);
  19. }
  20. }
  21. });
  22. }

</>復制代碼

  1. public FailbackRegistry(URL url) {
  2. super(url);
  3. this.retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
  4. // 創建HashedWheelTimer對象
  5. retryTimer = new HashedWheelTimer(new NamedThreadFactory("DubboRegistryRetryTimer", true), retryPeriod, TimeUnit.MILLISECONDS, 128);
  6. }

然后在addFailedRegistered()方法中有retryTimer.newTimeout(newTask, retryPeriod, TimeUnit.MILLISECONDS);這樣的一條代碼,這個就是執行任務的開始點:

</>復制代碼

  1. @Override
  2. public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
  3. if (task == null) {
  4. throw new NullPointerException("task");
  5. }
  6. if (unit == null) {
  7. throw new NullPointerException("unit");
  8. }
  9. long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
  10. if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
  11. pendingTimeouts.decrementAndGet();
  12. throw new RejectedExecutionException("Number of pending timeouts ("
  13. + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
  14. + "timeouts (" + maxPendingTimeouts + ")");
  15. }
  16. // 開啟輪詢任務
  17. start();
  18. // Add the timeout to the timeout queue which will be processed on the next tick.
  19. // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
  20. long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
  21. // Guard against overflow.
  22. if (delay > 0 && deadline < 0) {
  23. deadline = Long.MAX_VALUE;
  24. }
  25. HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
  26. timeouts.add(timeout);
  27. return timeout;
  28. }

調用start()方法時,開啟一個線程work去輪詢存儲到HashedWheelTimer容器的任務,然后調用任務中的run()方法,

</>復制代碼

  1. public void start() {
  2. switch (WORKER_STATE_UPDATER.get(this)) {
  3. case WORKER_STATE_INIT:
  4. if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
  5. // 開啟work線程,執行work線程中的run()方法
  6. workerThread.start();
  7. }
  8. break;
  9. case WORKER_STATE_STARTED:
  10. break;
  11. case WORKER_STATE_SHUTDOWN:
  12. throw new IllegalStateException("cannot be started once stopped");
  13. default:
  14. throw new Error("Invalid WorkerState");
  15. }
  16. // Wait until the startTime is initialized by the worker.
  17. while (startTime == 0) {
  18. try {
  19. startTimeInitialized.await();
  20. } catch (InterruptedException ignore) {
  21. // Ignore - it will be ready very soon.
  22. }
  23. }
  24. }

</>復制代碼

  1. @Override
  2. public void run() {
  3. // Initialize the startTime.
  4. startTime = System.nanoTime();
  5. if (startTime == 0) {
  6. // We use 0 as an indicator for the uninitialized value here, so make sure it"s not 0 when initialized.
  7. startTime = 1;
  8. }
  9. // Notify the other threads waiting for the initialization at start().
  10. startTimeInitialized.countDown();
  11. do {
  12. final long deadline = waitForNextTick();
  13. if (deadline > 0) {
  14. int idx = (int) (tick & mask);
  15. processCancelledTasks();
  16. HashedWheelBucket bucket =
  17. wheel[idx];
  18. transferTimeoutsToBuckets();
  19. // 執行重試任務
  20. bucket.expireTimeouts(deadline);
  21. tick++;
  22. }
  23. } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
  24. // Fill the unprocessedTimeouts so we can return them from stop() method.
  25. for (HashedWheelBucket bucket : wheel) {
  26. bucket.clearTimeouts(unprocessedTimeouts);
  27. }
  28. for (; ; ) {
  29. HashedWheelTimeout timeout = timeouts.poll();
  30. if (timeout == null) {
  31. break;
  32. }
  33. if (!timeout.isCancelled()) {
  34. unprocessedTimeouts.add(timeout);
  35. }
  36. }
  37. processCancelledTasks();
  38. }

</>復制代碼

  1. void expireTimeouts(long deadline) {
  2. HashedWheelTimeout timeout = head;
  3. // process all timeouts
  4. while (timeout != null) {
  5. // 輪詢獲取重試任務
  6. HashedWheelTimeout next = timeout.next;
  7. if (timeout.remainingRounds <= 0) {
  8. next = remove(timeout);
  9. if (timeout.deadline <= deadline) {
  10. // 執行重試任務
  11. timeout.expire();
  12. } else {
  13. // The timeout was placed into a wrong slot. This should never happen.
  14. throw new IllegalStateException(String.format(
  15. "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
  16. }
  17. } else if (timeout.isCancelled()) {
  18. next = remove(timeout);
  19. } else {
  20. timeout.remainingRounds--;
  21. }
  22. timeout = next;
  23. }
  24. }

</>復制代碼

  1. public void expire() {
  2. if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
  3. return;
  4. }
  5. try {
  6. // 調用任務中的run()方法,(如:AbstractRetryTask任務中的run()方法,在去調用子類FailedRegisteredTask中的doRetry()方法進行重試注冊)
  7. task.run(this);
  8. } catch (Throwable t) {
  9. if (logger.isWarnEnabled()) {
  10. logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + ".", t);
  11. }
  12. }
  13. }

在上面對于HashedWheelTimer的具體實現原理,并沒有進行詳細的進行分析,如果想了解的和學習的話,可以自行查找資料。

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。

轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/73033.html

相關文章

  • dubbo zookeeper spring整合helloworld

    摘要:是目前非常流行的分布式服務技術,很多公司都在用。空閑之余,搭了個,分享給大家。本文下載下載是服務的注冊中心,下載后進入到安裝目錄。雙擊即可啟動注冊中心服務。 dubbo是目前非常流行的分布式服務技術,很多公司都在用。空閑之余,搭了個helloworld,分享給大家。本文demo下載1.下載 zookeeperzookeeper是服務的注冊中心,下載后進入到安裝目錄G:bakCenter...

    sugarmo 評論0 收藏0
  • Dubbo 一篇文章就夠了:從入門到實戰

    摘要:啟動容器,加載,運行服務提供者。服務提供者在啟動時,在注冊中心發布注冊自己提供的服務。注冊中心返回服務提供者地址列表給消費者,如果有變更,注冊中心將基于長連接推送變更數據給消費者。 一 為什么需要 dubbo 很多時候,其實我們使用這個技術的時候,可能都是因為項目需要,所以,我們就用了,但是,至于為什么我們需要用到這個技術,可能自身并不是很了解的,但是,其實了解技術的來由及背景知識,對...

    tomener 評論0 收藏0
  • 超詳細,新手都能看懂 !使用SpringBoot+Dubbo 搭建一個簡單的分布式服務

    Github 地址:https://github.com/Snailclimb/springboot-integration-examples ,歡迎各位 Star。 目錄: 使用 SpringBoot+Dubbo 搭建一個簡單分布式服務 實戰之前,先來看幾個重要的概念 什么是分布式? 什么是 Duboo? Dubbo 架構 什么是 RPC? 為什么要用 Dubbo? 開始實戰 1 ...

    chengtao1633 評論0 收藏0
  • Spring Cloud與Dubbo的完美融合手「Spring Cloud Alibaba」

    摘要:構建服務接口創建一個簡單的項目,并在下面定義一個抽象接口,比如構建服務接口提供方第一步創建一個項目,在中引入第一步中構建的包以及對和的依賴,比如第一步中構建的包這里需要注意兩點必須包含包,不然啟動會報錯。 很早以前,在剛開始搞Spring Cloud基礎教程的時候,寫過這樣一篇文章:《微服務架構的基礎框架選擇:Spring Cloud還是Dubbo?》,可能不少讀者也都看過。之后也就一...

    wpw 評論0 收藏0
  • 微服務架構基礎注冊中心

    摘要:在微服務架構中,注冊中心是核心的基礎服務之一。在微服務架構流行之前,注冊中心就已經開始出現在分布式架構的系統中。服務提供者注冊到注冊中心,服務消費者到注冊中心訂閱,同時,注冊中心中的變更也會通知服務消費者。 在微服務架構中,注冊中心是核心的基礎服務之一。在微服務架構流行之前,注冊中心就已經開始出現在分布式架構的系統中。Dubbo是一個在國內比較流行的分布式框架,被大量的中小型互聯網公司...

    JayChen 評論0 收藏0

發表評論

0條評論

Null

|高級講師

TA的文章

閱讀更多
最新活動
閱讀需要支付1元查看
<