摘要:就是上面提到的變化后執行的程序。這段代碼是的構造函數,除了里面的一些賦值函數,重點說下這個函數它有個參數,第一個參數是你要的名字,類似這樣的。
昨晚微信開放了JS接口。忙活了一晚上。
然后周六就沒啥斗志了,突然想起第二篇說好要介紹demo的沒介紹,就趕緊來寫了。
先來個 傳送門
這里簡單介紹了下官方demo。用來演示Zookeeper官方推薦的程序框架(Executor、DataMonitor)。
這個Demo實現了如下功能:
監視一個結點,獲取該結點的數據,并啟動一個自定義程序
如果這個結點發生變化了(數據更改,新增或者刪除),則停止這個自定義程序并重新執行。
如果這個結點被刪除了,則停止運行自定義程序
對著Demo看下代碼。首先,官方有這么個章節
Program DesignConventionally, ZooKeeper applications are broken into two units, one which maintains the connection, and the other which monitors data. In this application, the class called the Executor maintains the ZooKeeper connection, and the class called the DataMonitor monitors the data in the ZooKeeper tree. Also, Executor contains the main thread and contains the execution logic. It is responsible for what little user interaction there is, as well as interaction with the exectuable program you pass in as an argument and which the sample (per the requirements) shuts down and restarts, according to the state of the znode.
通俗翻譯下,官方認為,一個zookeeper應用簡單地分為兩個單元即可,一個用來管理連接,另一個用來監視數據。在我們的Demo中,Executor被用來管理和Zookeeper的連接,DataMonitor用來監視Zookeeper中的數據樹。另外,Executor同時也包含了主線程的執行(也就是Main函數),然后也包含一點點的執行邏輯。Demo對用戶交互的部分非常少,輸入的幾個參數(好吧,我自己去實現的時候,偷懶去寫死了)就可以開始跑了,Demo還會對znode的變化或者新的znode的連入進行一些響應。
看完這段,對原文最下面的完整樣例算了解一些,那這里既然就兩個類,Executor和DataMonitor嘛,對吧。就分別看下。
javapublic Executor(String hostPort, String znode, String filename, String exec[]) throws KeeperException, IOException { this.filename = filename; this.exec = exec; zk = new ZooKeeper(hostPort, 3000, this); dm = new DataMonitor(zk, znode, null, this); }
這個構造函數么實際上做了這么幾個事:
這里的filename是用來記錄znode變化后的data的,隨便填寫,當然你也可以刪了。
exec就是上面提到的 znode變化后執行的程序。(我偷懶,指定了一個ls)
zk就是用來初始化ZooKeeper連接的啦,第一個參數是主機和端口號,第二個是Session超時時間,第三個是DefaultWatcher。DefaultWacheter就是你在不指定wacher對象但是想watch的時候默認的watcher,Wathcer是一個接口。
dm就是初始化DataMoniter用來監視znode啦。
Watcher
javapublic void process(WatchedEvent event);
這個是Watcher接口里的唯一函數,用處就是在你關心的事件被觸發后,會調用這個接口,具體參數內容是什么就看文檔吧。
DataMonitor
javapublic DataMonitor(ZooKeeper zk, String znode, Watcher chainedWatcher, DataMonitorListener listener) { this.zk = zk; this.znode = znode; this.chainedWatcher = chainedWatcher; this.listener = listener; // Get things started by checking if the node exists. We are going // to be completely event driven zk.exists(znode, true, this, null); }
這段代碼是DataMonitor的構造函數,除了里面的一些賦值函數,重點說下zk.exists這個函數
它有4個參數,第一個參數是你要query的znode名字,類似/zk_test這樣的。第二個參數是問你要不要watch,就是這個結點變化后要不要得到通知,第三個是exists結果的回調,是StatCallback接口,exists目的是想查找這個znode是否存在,結果從第三個接口返回,
其實這個接口的設計讓我挺頭疼的,單一職責原則不見了,這個接口干了兩件事情,第一是告訴你這個znode存在不存在,第二件事是問你要不要監控這個znode的變化。OK這里你不提供Watcher的話,就用上面new Zookeeper里面提供的默認Watcher(這里就是我們的Executor對象啦)。
好了讓我們看下工作流吧。
zk.exists第一次調用的時候,返回的結果傳給了DataMonitor(實現了StatCallback接口)的processResult函數
java public void processResult(int rc, String path, Object ctx, Stat stat) { boolean exists; switch (rc) { case Code.Ok: exists = true; break; case Code.NoNode: exists = false; break; case Code.SessionExpired: case Code.NoAuth: dead = true; listener.closing(rc); return; default: // Retry errors zk.exists(znode, true, this, null); return; } byte b[] = null; if (exists) { try { b = zk.getData(znode, false, null); } catch (KeeperException e) { // We don"t need to worry about recovering now. The watch // callbacks will kick off any exception handling e.printStackTrace(); } catch (InterruptedException e) { return; } } if ((b == null && b != prevData) || (b != null && !Arrays.equals(prevData, b))) { listener.exists(b); prevData = b; } }
rc是這次exists查詢的返回碼,具體含義看變量名也大概知道。 最后一個if里面的listener是DataMonitor里面的DataMonitorListener接口,我們的Executor實現了它(其實就是把結果從DataMonitor再傳出去)。這里實現的效果就是如果結點存在的話,就把結點的數據傳出去,不然就傳個null,當然如果數據沒有變化(用prevData記錄了上一次的數據)那么就什么都不做。
于是第一次你的exec就執行了。
這時候,我用zkCli去刪除了這個監視的結點(我設置的是/zk_test)或者重新設置了data,那么我的Watcher都會收到通知(這里我就很奇怪,為什么不是我關心的事情,比如exists關心的應該是create和delete)
源碼里是這么說的
java/** * Return the stat of the node of the given path. Return null if no such a * node exists. ** If the watch is non-null and the call is successful (no exception is thrown), * a watch will be left on the node with the given path. The watch will be * triggered by a successful operation that creates/delete the node or sets * the data on the node. * * @param path the node path * @param watcher explicit watcher * @return the stat of the node of the given path; return null if no such a * node exists. * @throws KeeperException If the server signals an error * @throws InterruptedException If the server transaction is interrupted. * @throws IllegalArgumentException if an invalid path is specified */ public Stat exists(final String path, Watcher watcher) throws KeeperException, InterruptedException;
OK,我剛剛設置的Watcher是Executor,實際上Executor是把這個事情傳給了DataMonitor,看process函數
javapublic void process(WatchedEvent event) { dm.process(event); }
DataMonitor中的process
javapublic void process(WatchedEvent event) { String path = event.getPath(); if (event.getType() == Event.EventType.None) { // We are are being told that the state of the // connection has changed switch (event.getState()) { case SyncConnected: logger.debug("SyncConnected"); // In this particular example we don"t need to do anything // here - watches are automatically re-registered with // server and any watches triggered while the client was // disconnected will be delivered (in order of course) break; case Expired: logger.debug("expired"); // It"s all over dead = true; listener.closing(Code.SESSIONEXPIRED.intValue()); break; } } else { if (path != null && path.equals(znode)) { // Something has changed on the node, let"s find out zk.exists(znode, true, this, null); } } if (chainedWatcher != null) { chainedWatcher.process(event); } }
關鍵的是第一個if/else, else中處理的事件是我們最關心的和znode相關的,我們發現znode發生了變化(create,delete,dataChanged,childChanged)就再次調用exists去進行我們結點關心的操作。
好了,說了那么多的代碼其實都是在說框架里的東西。這個demo里最讓人不關心的是業務邏輯部分么= =
請看Executor里面的exists方法
java public void exists(byte[] data) { if (data == null) { if (child != null) { System.out.println("Killing process"); child.destroy(); try { child.waitFor(); } catch (InterruptedException e) { } } child = null; } else { if (child != null) { System.out.println("Stopping child"); child.destroy(); try { child.waitFor(); } catch (InterruptedException e) { e.printStackTrace(); } } try { FileOutputStream fos = new FileOutputStream(filename); fos.write(data); fos.close(); } catch (IOException e) { e.printStackTrace(); } try { System.out.println("Starting child"); child = Runtime.getRuntime().exec(exec); new StreamWriter(child.getInputStream(), System.out); new StreamWriter(child.getErrorStream(), System.err); } catch (IOException e) { e.printStackTrace(); } } }
就是根據我們的要的data來執行我們要的代碼之類的。具體大家可以調試一下。
兩篇文章應該夠入個門了= =,接下去我會做一些跟自己工作上業務結合的東西。
真正寫代碼的時候,樣例、文檔、源碼三者聯合起來看,你才能看的出作者的設計目的,但是用多了才能知道作者的設計初衷。
啊,如果有人可以看完這篇blog,在下感謝您的大力支持。
有刺盡管挑= = 大家和諧討論。應該還有三的。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/64220.html
摘要:好吧,就是給指定的結點里面稱之為提供了統一的名稱。分布式鎖服務這個特性是最吸引我的特性了,如何實現分布式鎖呢,就是使用提供的有序且臨時的特性實現。當然詳細的可以參照分布式鎖避免羊群效應這篇文章,同時寫了如何避免羊群效應。 最近想學東西,于是就又拿起前段時間因為沒時間而落下的zookeeper啃了起來,第一次啃完教程發現什么都不明白,第二次啃完發現,這東西,就這么簡單的東西啊? 先來摘...
摘要:后面聽到的時候,是因為可以作為分布式鎖的一種實現。二為什么能干這么多從上面我們可以知道,可以用來做統一配置管理統一命名服務分布式鎖集群管理。 前言 只有光頭才能變強。文本已收錄至我的GitHub倉庫,歡迎Star:https://github.com/ZhongFuCheng3y/3y 上次寫了一篇 什么是消息隊列?以后,本來想入門一下Kafka的(裝一下環境、看看Kafka一些概念...
摘要:項目地址前言大數據技術棧思維導圖大數據常用軟件安裝指南一分布式文件存儲系統分布式計算框架集群資源管理器單機偽集群環境搭建集群環境搭建常用命令的使用基于搭建高可用集群二簡介及核心概念環境下的安裝部署和命令行的基本使用常用操作分區表和分桶表視圖 項目GitHub地址:https://github.com/heibaiying... 前 言 大數據技術棧思維導圖 大數據常用軟件安裝指...
閱讀 1891·2021-11-17 09:33
閱讀 6484·2021-10-12 10:20
閱讀 2306·2021-09-22 15:50
閱讀 1793·2021-09-22 15:10
閱讀 626·2021-09-10 10:51
閱讀 630·2021-09-10 10:50
閱讀 3048·2021-08-11 11:19
閱讀 1786·2019-08-30 15:55