摘要:定時清理任務是清理過期的注冊的服務。方法我在源碼解析四注冊中心中已經(jīng)講到。后記該部分相關的源碼解析地址該文章講解了利用來實現(xiàn)注冊中心,其中關鍵的是需要弄明白以及單播廣播多播的概念,其他的邏輯并不復雜。
注冊中心——multicast
目標:解釋以為multicast實現(xiàn)的注冊中心原理,理解單播、廣播、多播區(qū)別,解讀duubo-registry-multicast的源碼
這是dubbo實現(xiàn)注冊中心的第二種方式,也是dubbo的demo模塊中用的注冊中心實現(xiàn)方式。multicast其實是用到了MulticastSocket來實現(xiàn)的。
我這邊稍微補充一點關于多點廣播,也就是MulticastSocket的介紹。MulticastSocket類是繼承了DatagramSocket類,DatagramSocket只允許把數(shù)據(jù)報發(fā)送給一個指定的目標地址,而MulticastSocket可以將數(shù)據(jù)報以廣播的形式發(fā)送給多個客戶端。它的思想是MulticastSocket會把一個數(shù)據(jù)報發(fā)送給一個特定的多點廣播地址,這個多點廣播地址是一組特殊的網(wǎng)絡地址,當客戶端需要發(fā)送或者接收廣播信息時,只要加入該組就好。IP協(xié)議為多點廣播提供了一批特殊的IP地址,地址范圍是224.0.0.0至239.255.255.255。MulticastSocket類既可以將數(shù)據(jù)報發(fā)送到多點廣播地址,也可以接收其他主機的廣播信息。
以上是對multicast背景的簡略介紹,接下來讓我們具體的來看dubbo怎么把MulticastSocket運用到注冊中心的實現(xiàn)中。
我們先來看看包下面有哪些類:
可以看到跟默認的注冊中心的包結構非常類似。接下來我們就來解讀一下這兩個類。
(一)MulticastRegistry該類繼承了FailbackRegistry類,該類就是針對注冊中心核心的功能注冊、訂閱、取消注冊、取消訂閱,查詢注冊列表進行展開,利用廣播的方式去實現(xiàn)。
1.屬性// logging output // 日志記錄輸出 private static final Logger logger = LoggerFactory.getLogger(MulticastRegistry.class); // 默認的多點廣播端口 private static final int DEFAULT_MULTICAST_PORT = 1234; // 多點廣播的地址 private final InetAddress mutilcastAddress; // 多點廣播 private final MulticastSocket mutilcastSocket; // 多點廣播端口 private final int mutilcastPort; //收到的URL private final ConcurrentMap> received = new ConcurrentHashMap >(); // 任務調(diào)度器 private final ScheduledExecutorService cleanExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("DubboMulticastRegistryCleanTimer", true)); // 定時清理執(zhí)行器,一定時間清理過期的url private final ScheduledFuture> cleanFuture; // 清理的間隔時間 private final int cleanPeriod; // 管理員權限 private volatile boolean admin = false;
看上面的屬性,需要關注以下幾個點:
mutilcastSocket,該類是muticast注冊中心實現(xiàn)的關鍵,這里補充一下單播、廣播、以及多播的區(qū)別,因為下面會涉及到。單播是每次只有兩個實體相互通信,發(fā)送端和接收端都是唯一確定的;廣播目的地址為網(wǎng)絡中的全體目標,而多播的目的地址是一組目標,加入該組的成員均是數(shù)據(jù)包的目的地。
關注任務調(diào)度器和清理計時器,該類封裝了定時清理過期的服務的策略。
2.構造方法public MulticastRegistry(URL url) { super(url); if (url.isAnyHost()) { throw new IllegalStateException("registry address == null"); } if (!isMulticastAddress(url.getHost())) { throw new IllegalArgumentException("Invalid multicast address " + url.getHost() + ", scope: 224.0.0.0 - 239.255.255.255"); } try { mutilcastAddress = InetAddress.getByName(url.getHost()); // 如果url攜帶的配置中沒有端口號,則使用默認端口號 mutilcastPort = url.getPort() <= 0 ? DEFAULT_MULTICAST_PORT : url.getPort(); mutilcastSocket = new MulticastSocket(mutilcastPort); // 禁用多播數(shù)據(jù)報的本地環(huán)回 mutilcastSocket.setLoopbackMode(false); // 加入同一組廣播 mutilcastSocket.joinGroup(mutilcastAddress); Thread thread = new Thread(new Runnable() { @Override public void run() { byte[] buf = new byte[2048]; // 實例化數(shù)據(jù)報 DatagramPacket recv = new DatagramPacket(buf, buf.length); while (!mutilcastSocket.isClosed()) { try { // 接收數(shù)據(jù)包 mutilcastSocket.receive(recv); String msg = new String(recv.getData()).trim(); int i = msg.indexOf(" "); if (i > 0) { msg = msg.substring(0, i).trim(); } // 接收消息請求,根據(jù)消息并相應操作,比如注冊,訂閱等 MulticastRegistry.this.receive(msg, (InetSocketAddress) recv.getSocketAddress()); Arrays.fill(buf, (byte) 0); } catch (Throwable e) { if (!mutilcastSocket.isClosed()) { logger.error(e.getMessage(), e); } } } } }, "DubboMulticastRegistryReceiver"); // 設置為守護進程 thread.setDaemon(true); // 開啟線程 thread.start(); } catch (IOException e) { throw new IllegalStateException(e.getMessage(), e); } // 優(yōu)先從url中獲取清理延遲配置,若沒有,則默認為60s this.cleanPeriod = url.getParameter(Constants.SESSION_TIMEOUT_KEY, Constants.DEFAULT_SESSION_TIMEOUT); // 如果配置了需要清理 if (url.getParameter("clean", true)) { // 開啟計時器 this.cleanFuture = cleanExecutor.scheduleWithFixedDelay(new Runnable() { @Override public void run() { try { // 清理過期的服務 clean(); // Remove the expired } catch (Throwable t) { // Defensive fault tolerance logger.error("Unexpected exception occur at clean expired provider, cause: " + t.getMessage(), t); } } }, cleanPeriod, cleanPeriod, TimeUnit.MILLISECONDS); } else { this.cleanFuture = null; } }
這個構造器最關鍵的就是一個線程和一個定時清理任務。
線程中做的工作是根據(jù)接收到的消息來判定是什么請求,作出對應的操作,只要mutilcastSocket沒有斷開,就一直接收消息,內(nèi)部的實現(xiàn)體現(xiàn)在receive方法中,下文會展開講述。
定時清理任務是清理過期的注冊的服務。通過兩次socket的嘗試來判定是否過期。clean方法下文會展開講述
3.isMulticastAddressprivate static boolean isMulticastAddress(String ip) { int i = ip.indexOf("."); if (i > 0) { String prefix = ip.substring(0, i); if (StringUtils.isInteger(prefix)) { int p = Integer.parseInt(prefix); return p >= 224 && p <= 239; } } return false; }
該方法很簡單,為也沒寫注釋,就是判斷是否為多點廣播地址,地址范圍是224.0.0.0至239.255.255.255。
4.cleanprivate void clean() { // 當url中攜帶的服務接口配置為是*時候,才可以執(zhí)行清理 if (admin) { for (Setproviders : new HashSet >(received.values())) { for (URL url : new HashSet (providers)) { // 判斷是否過期 if (isExpired(url)) { if (logger.isWarnEnabled()) { logger.warn("Clean expired provider " + url); } //取消注冊 doUnregister(url); } } } } }
該方法也比較簡單,關機的是如何判斷過期以及做的取消注冊的操作。下面會展開講解這幾個方法。
5.isExpiredprivate boolean isExpired(URL url) { // 如果為非動態(tài)管理模式或者協(xié)議是consumer、route或者override,則沒有過期 if (!url.getParameter(Constants.DYNAMIC_KEY, true) || url.getPort() <= 0 || Constants.CONSUMER_PROTOCOL.equals(url.getProtocol()) || Constants.ROUTE_PROTOCOL.equals(url.getProtocol()) || Constants.OVERRIDE_PROTOCOL.equals(url.getProtocol())) { return false; } Socket socket = null; try { // 利用url攜帶的主機地址和端口號實例化socket socket = new Socket(url.getHost(), url.getPort()); } catch (Throwable e) { // 如果實例化失敗,等待100ms重試第二次,如果還失敗,則判定已過期 try { // 等待100ms Thread.sleep(100); } catch (Throwable e2) { } Socket socket2 = null; try { socket2 = new Socket(url.getHost(), url.getPort()); } catch (Throwable e2) { return true; } finally { if (socket2 != null) { try { socket2.close(); } catch (Throwable e2) { } } } } finally { if (socket != null) { try { socket.close(); } catch (Throwable e) { } } } return false; }
這個方法就是判斷服務是否過期,有兩次嘗試socket的操作,如果嘗試失敗,則判斷為過期。
6.receiveprivate void receive(String msg, InetSocketAddress remoteAddress) { if (logger.isInfoEnabled()) { logger.info("Receive multicast message: " + msg + " from " + remoteAddress); } // 如果這個消息是以register、unregister、subscribe開頭的,則進行相應的操作 if (msg.startsWith(Constants.REGISTER)) { URL url = URL.valueOf(msg.substring(Constants.REGISTER.length()).trim()); // 注冊服務 registered(url); } else if (msg.startsWith(Constants.UNREGISTER)) { URL url = URL.valueOf(msg.substring(Constants.UNREGISTER.length()).trim()); // 取消注冊服務 unregistered(url); } else if (msg.startsWith(Constants.SUBSCRIBE)) { URL url = URL.valueOf(msg.substring(Constants.SUBSCRIBE.length()).trim()); // 獲得以及注冊的url集合 Seturls = getRegistered(); if (urls != null && !urls.isEmpty()) { for (URL u : urls) { // 判斷是否合法 if (UrlUtils.isMatch(url, u)) { String host = remoteAddress != null && remoteAddress.getAddress() != null ? remoteAddress.getAddress().getHostAddress() : url.getIp(); // 建議服務提供者和服務消費者在不同機器上運行,如果在同一機器上,需設置unicast=false // 同一臺機器中的多個進程不能單播單播,或者只有一個進程接收信息,發(fā)給消費者的單播消息可能被提供者搶占,兩個消費者在同一臺機器也一樣, // 只有multicast注冊中心有此問題 if (url.getParameter("unicast", true) // Whether the consumer"s machine has only one process && !NetUtils.getLocalHost().equals(host)) { // Multiple processes in the same machine cannot be unicast with unicast or there will be only one process receiving information unicast(Constants.REGISTER + " " + u.toFullString(), host); } else { broadcast(Constants.REGISTER + " " + u.toFullString()); } } } } }/* else if (msg.startsWith(UNSUBSCRIBE)) { }*/ }
可以很清楚的看到,根據(jù)接收到的消息開頭的數(shù)據(jù)來判斷需要做什么類型的操作,重點在于訂閱,可以選擇單播訂閱還是廣播訂閱,這個取決于url攜帶的配置是什么。
7.broadcastprivate void broadcast(String msg) { if (logger.isInfoEnabled()) { logger.info("Send broadcast message: " + msg + " to " + mutilcastAddress + ":" + mutilcastPort); } try { byte[] data = (msg + " ").getBytes(); // 實例化數(shù)據(jù)報,重點是目的地址是mutilcastAddress DatagramPacket hi = new DatagramPacket(data, data.length, mutilcastAddress, mutilcastPort); // 發(fā)送數(shù)據(jù)報 mutilcastSocket.send(hi); } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } }
這是廣播的實現(xiàn)方法,重點是數(shù)據(jù)報的目的地址是mutilcastAddress。代表著一組地址
8.unicastprivate void unicast(String msg, String host) { if (logger.isInfoEnabled()) { logger.info("Send unicast message: " + msg + " to " + host + ":" + mutilcastPort); } try { byte[] data = (msg + " ").getBytes(); // 實例化數(shù)據(jù)報,重點是目的地址是只是單個地址 DatagramPacket hi = new DatagramPacket(data, data.length, InetAddress.getByName(host), mutilcastPort); // 發(fā)送數(shù)據(jù)報 mutilcastSocket.send(hi); } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } }
這是單播的實現(xiàn),跟廣播的區(qū)別就只是目的地址不一樣,單播的目的地址就只是一個地址,而廣播的是一組地址。
9.doRegister && doUnregister && doSubscribe && doUnsubscribe@Override protected void doRegister(URL url) { broadcast(Constants.REGISTER + " " + url.toFullString()); } @Override protected void doUnregister(URL url) { broadcast(Constants.UNREGISTER + " " + url.toFullString()); } @Override protected void doSubscribe(URL url, NotifyListener listener) { // 當url中攜帶的服務接口配置為是*時候,才可以執(zhí)行清理,類似管理員權限 if (Constants.ANY_VALUE.equals(url.getServiceInterface())) { admin = true; } broadcast(Constants.SUBSCRIBE + " " + url.toFullString()); // 對監(jiān)聽器進行同步鎖 synchronized (listener) { try { listener.wait(url.getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT)); } catch (InterruptedException e) { } } } @Override protected void doUnsubscribe(URL url, NotifyListener listener) { if (!Constants.ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(Constants.REGISTER_KEY, true)) { unregister(url); } broadcast(Constants.UNSUBSCRIBE + " " + url.toFullString()); }
這幾個方法就是實現(xiàn)了父類FailbackRegistry的抽象方法。都是調(diào)用了broadcast方法。
10.destroy@Override public void destroy() { super.destroy(); try { // 取消清理任務 if (cleanFuture != null) { cleanFuture.cancel(true); } } catch (Throwable t) { logger.warn(t.getMessage(), t); } try { // 把該地址從組內(nèi)移除 mutilcastSocket.leaveGroup(mutilcastAddress); // 關閉mutilcastSocket mutilcastSocket.close(); } catch (Throwable t) { logger.warn(t.getMessage(), t); } // 關閉線程池 ExecutorUtil.gracefulShutdown(cleanExecutor, cleanPeriod); }
該方法的邏輯跟dubbo注冊中心的destroy方法類似,就多了把該地址從組內(nèi)移除的操作。gracefulShutdown方法我在《dubbo源碼解析(四)注冊中心——dubbo》中已經(jīng)講到。
11.register@Override public void register(URL url) { super.register(url); registered(url); }
protected void registered(URL url) { // 遍歷訂閱的監(jiān)聽器集合 for (Map.Entry> entry : getSubscribed().entrySet()) { URL key = entry.getKey(); // 判斷是否合法 if (UrlUtils.isMatch(key, url)) { // 通過消費者url獲得接收到的服務url集合 Set urls = received.get(key); if (urls == null) { received.putIfAbsent(key, new ConcurrentHashSet ()); urls = received.get(key); } // 加入服務url urls.add(url); List list = toList(urls); for (NotifyListener listener : entry.getValue()) { // 把服務url的變化通知監(jiān)聽器 notify(key, listener, list); synchronized (listener) { listener.notify(); } } } } }
可以看到該類重寫了父類的register方法,不過邏輯沒有過多的變化,就是把需要注冊的url放入緩存中,如果通知監(jiān)聽器url的變化。
12.unregister@Override public void unregister(URL url) { super.unregister(url); unregistered(url); }
protected void unregistered(URL url) { // 遍歷訂閱的監(jiān)聽器集合 for (Map.Entry> entry : getSubscribed().entrySet()) { URL key = entry.getKey(); if (UrlUtils.isMatch(key, url)) { Set urls = received.get(key); // 緩存中移除 if (urls != null) { urls.remove(url); } if (urls == null || urls.isEmpty()){ if (urls == null){ urls = new ConcurrentHashSet (); } // 設置攜帶empty協(xié)議的url URL empty = url.setProtocol(Constants.EMPTY_PROTOCOL); urls.add(empty); } List list = toList(urls); // 通知監(jiān)聽器 服務url變化 for (NotifyListener listener : entry.getValue()) { notify(key, listener, list); } } } }
這個邏輯也比較清晰,把需要取消注冊的服務url從緩存中移除,然后如果沒有接收的服務url了,就加入一個攜帶empty協(xié)議的url,然后通知監(jiān)聽器服務變化。
13.lookup@Override public Listlookup(URL url) { List urls = new ArrayList (); // 通過消費者url獲得訂閱的服務的監(jiān)聽器 Map > notifiedUrls = getNotified().get(url); // 獲得注冊的服務url集合 if (notifiedUrls != null && notifiedUrls.size() > 0) { for (List values : notifiedUrls.values()) { urls.addAll(values); } } // 如果為空,則從內(nèi)存緩存properties獲得相關value,并且返回為注冊的服務 if (urls.isEmpty()) { List cacheUrls = getCacheUrls(url); if (cacheUrls != null && !cacheUrls.isEmpty()) { urls.addAll(cacheUrls); } } // 如果還是為空則從緩存registered中獲得已注冊 服務URL 集合 if (urls.isEmpty()) { for (URL u : getRegistered()) { if (UrlUtils.isMatch(url, u)) { urls.add(u); } } } // 如果url攜帶的配置服務接口為*,也就是所有服務,則從緩存subscribed獲得已注冊 服務URL 集合 if (Constants.ANY_VALUE.equals(url.getServiceInterface())) { for (URL u : getSubscribed().keySet()) { if (UrlUtils.isMatch(url, u)) { urls.add(u); } } } return urls; }
該方法是返回注冊的服務url列表,可以看到有很多種獲得的方法這些緩存都保存在AbstractRegistry類中,相關的介紹可以查看《dubbo源碼解析(三)注冊中心——開篇》。
14.subscribe && unsubscribe@Override public void subscribe(URL url, NotifyListener listener) { super.subscribe(url, listener); subscribed(url, listener); } @Override public void unsubscribe(URL url, NotifyListener listener) { super.unsubscribe(url, listener); received.remove(url); }
protected void subscribed(URL url, NotifyListener listener) { // 查詢注冊列表 Listurls = lookup(url); // 通知url notify(url, listener, urls); }
這兩個重寫了父類的方法,分別是訂閱和取消訂閱。邏輯很簡單。
(二)MulticastRegistryFactory該類繼承了AbstractRegistryFactory類,實現(xiàn)了AbstractRegistryFactory抽象出來的createRegistry方法,看一下原代碼:
public class MulticastRegistryFactory extends AbstractRegistryFactory { @Override public Registry createRegistry(URL url) { return new MulticastRegistry(url); } }
可以看到就是實例化了MulticastRegistry而已,所有這里就不解釋了。
后記該部分相關的源碼解析地址:https://github.com/CrazyHZM/i...
該文章講解了dubbo利用multicast來實現(xiàn)注冊中心,其中關鍵的是需要弄明白MulticastSocket以及單播、廣播、多播的概念,其他的邏輯并不復雜。如果我在哪一部分寫的不夠到位或者寫錯了,歡迎給我提意見,我的私人微信號碼:HUA799695226。
文章版權歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/72104.html
摘要:英文全名為,也叫遠程過程調(diào)用,其實就是一個計算機通信協(xié)議,它是一種通過網(wǎng)絡從遠程計算機程序上請求服務而不需要了解底層網(wǎng)絡技術的協(xié)議。 Hello,Dubbo 你好,dubbo,初次見面,我想和你交個朋友。 Dubbo你到底是什么? 先給出一套官方的說法:Apache Dubbo是一款高性能、輕量級基于Java的RPC開源框架。 那么什么是RPC? 文檔地址:http://dubbo.a...
摘要:是用來監(jiān)聽處理注冊數(shù)據(jù)變更的事件。這里的是節(jié)點的接口,里面協(xié)定了關于節(jié)點的一些操作方法,我們可以來看看源代碼獲得節(jié)點地址判斷節(jié)點是否可用銷毀節(jié)點三這個接口是注冊中心的工廠接口,用來返回注冊中心的對象。 注冊中心——開篇 目標:解釋注冊中心在dubbo框架中作用,dubbo-registry-api源碼解讀 注冊中心是什么? 服務治理框架中可以大致分為服務通信和服務管理兩個部分,服務管理...
摘要:一該類繼承了類,該類里面封裝了一個重連機制,而注冊中心核心的功能注冊訂閱取消注冊取消訂閱,查詢注冊列表都是調(diào)用了我上一篇文章源碼解析三注冊中心開篇中講到的實現(xiàn)方法,畢竟這種實現(xiàn)注冊中心的方式是默認的方式,不過推薦使用,這個后續(xù)講解。 注冊中心——dubbo 目標:解釋以為dubbo實現(xiàn)的注冊中心原理,解讀duubo-registry-default源碼 dubbo內(nèi)置的注冊中心實現(xiàn)方式...
摘要:大揭秘目標了解的新特性,以及版本升級的引導。四元數(shù)據(jù)改造我們知道以前的版本只有注冊中心,注冊中心的有數(shù)十個的鍵值對,包含了一個服務所有的元數(shù)據(jù)。 DUBBO——2.7大揭秘 目標:了解2.7的新特性,以及版本升級的引導。 前言 我們知道Dubbo在2011年開源,停止更新了一段時間。在2017 年 9 月 7 日,Dubbo 悄悄的在 GitHub 發(fā)布了 2.5.4 版本。隨后,版本...
摘要:服務提供者在啟動時,向注冊中心注冊自己提供的服務。注冊中心返回服務提供者地址列表給消費者,如果有變更,注冊中心將基于長連接推送變更數(shù)據(jù)給消費者。 先來了解一下這些年架構的變化,下面的故事是我編的。。。。 傳統(tǒng)架構:很多年前,剛學完JavaWeb開發(fā)的我憑借一人之力就開發(fā)了一個網(wǎng)站,網(wǎng)站 所有的功能和應用都集中在一起,方便了我的開發(fā)同時也節(jié)省了成本。但是后來我的網(wǎng)站訪問流量突然加大,我通...
閱讀 378·2023-04-25 16:38
閱讀 1495·2021-09-26 09:46
閱讀 3340·2021-09-08 09:35
閱讀 2788·2019-08-30 12:54
閱讀 3260·2019-08-29 17:06
閱讀 1027·2019-08-29 14:06
閱讀 3354·2019-08-29 13:00
閱讀 3473·2019-08-28 17:53