零 版本
JDK 版本 : OpenJDK 11.0.1
IDE : idea 2018.3
Zookeeper Server 版本 : 3.5.4-beta
Zookeeper Client 版本 : 3.5.4-beta
Curator 版本 : 4.2.0
一 Zookeeper ClientZookeeper Client 是 Zookeeper 的經(jīng)典原生客戶端。使用之前需要在 Maven 中導(dǎo)入依賴:
org.apache.zookeeper zookeeper 3.5.4-beta
代碼:
import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import java.io.IOException; import java.util.concurrent.TimeUnit; public class ClientTest { public static void main(String[] args) { /** * 創(chuàng)建一個(gè) Zookeeper 的實(shí)例 * 此處為一個(gè)集群,Zookeeper 的 ip 之間用逗號(hào)隔開 * * 參數(shù)解釋: * param 1 - Zookeeper 的實(shí)例 ip ,此處是一個(gè)集群,所以配置了多個(gè) ip,用逗號(hào)隔開 * param 2 - session 過期時(shí)間,單位秒 (1000) * param 3 - 監(jiān)視者,用于獲取監(jiān)控事件 (MyWatch) */ ZooKeeper zooKeeper = null; try { Watcher createZkWatch = new MyWatch(); zooKeeper = new ZooKeeper("localhost:2101,localhost:2102,localhost:2103", 1000,createZkWatch); } catch (IOException e) { e.printStackTrace(); } /** * 值得注意的是,Zookeeper 對(duì)象去連接中間件實(shí)例是異步的 * 所以此處需要做一個(gè)死循環(huán)等待它連接完畢 * 更加優(yōu)雅的做法是使用 CownDownLatch 去做,但是 while 比較簡(jiǎn)單 */ while(zooKeeper.getState() == ZooKeeper.States.CONNECTING){ //返回 zookeeper 的狀態(tài) System.out.println(zooKeeper.getState()); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } } //如果連接不出錯(cuò)的話此處狀態(tài)應(yīng)該為 CONNECTED if(zooKeeper.getState() != ZooKeeper.States.CONNECTED) return; /** * 創(chuàng)建 ZooKeeper 節(jié)點(diǎn) * 參數(shù)解釋: * param 1 - znode 名稱 (/zoo) * param 2 - 節(jié)點(diǎn)數(shù)據(jù) (my first data) * param 3 - 設(shè)置權(quán)限 (OPEN_ACL_UNSAFE) * param 4 - znode 類型 (PERSISTENT) * * * znode 類型有四種: * PERSISTENT - 持久化目錄節(jié)點(diǎn),客戶端與zookeeper斷開連接后,該節(jié)點(diǎn)依舊存在 * PERSISTENT_SEQUENTIAL - 持久化,并帶有序列號(hào) * EPHEMERAL - 臨時(shí)目錄節(jié)點(diǎn),客戶端與zookeeper斷開連接后,該節(jié)點(diǎn)被刪除 * EPHEMERAL_SEQUENTIAL - 臨時(shí),并帶有序列號(hào) */ try { String s = zooKeeper.create("/zoo", "my first data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println("創(chuàng)建節(jié)點(diǎn):" + s); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } /** * 創(chuàng)建一個(gè)二級(jí)節(jié)點(diǎn),參數(shù)同上 * 需要注意的是,必須要有一級(jí)節(jié)點(diǎn)才能有二級(jí)節(jié)點(diǎn),不然會(huì)報(bào)錯(cuò) */ try { String s = zooKeeper.create("/zoo/zoo_1", "my first data_1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println("創(chuàng)建二級(jí)節(jié)點(diǎn):" + s); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } /** * 查詢 ZooKeeper 節(jié)點(diǎn)的數(shù)據(jù) * 參數(shù)解釋: * param 1 - znode 名稱 (/zoo) * param 2 - 監(jiān)視者,用于獲取監(jiān)控事件 (MyWatch) * param 3 - Zookeeper 實(shí)例信息和數(shù)據(jù)信息 (stat) * * 注意如果后續(xù)需要修改該節(jié)點(diǎn)的值,可以在此處記錄節(jié)點(diǎn)版本 version (非必要操作) */ Integer zooVersion = null; try { MyWatch getDataWatch = new MyWatch(); Stat stat = new Stat(); byte[] data = zooKeeper.getData("/zoo",getDataWatch,stat); System.out.println("查詢節(jié)點(diǎn)數(shù)據(jù):" + new String(data)); //從 stat 中可以獲取很多 Zookeeper 實(shí)例的信息 System.out.println("查詢節(jié)點(diǎn)數(shù)據(jù) czxid:" + stat.getCzxid()); //zxid zooVersion = stat.getVersion(); //此處獲取 /zoo 節(jié)點(diǎn)的版本號(hào) } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } /** * 修改 ZooKeeper 節(jié)點(diǎn)的數(shù)據(jù) * 參數(shù)解釋: * param 1 - znode 名稱 (/zoo) * param 2 - 節(jié)點(diǎn)新數(shù)據(jù) (my first data change) * param 3 - 該節(jié)點(diǎn)的版本 * * 在成功修改了節(jié)點(diǎn)的數(shù)據(jù)之后,版本號(hào)會(huì)自動(dòng)加一 * 如果此時(shí)不知道節(jié)點(diǎn)的版本,也可以輸入 -1,會(huì)默認(rèn)取最新的節(jié)點(diǎn)版本去修改 */ try { Stat stat = zooKeeper.setData("/zoo", "my first data change".getBytes(), zooVersion); // zooVersion = -1 System.out.println("修改節(jié)點(diǎn)數(shù)據(jù) czxid:" + stat.getCzxid()); System.out.println("修改節(jié)點(diǎn)數(shù)據(jù) version:" + stat.getVersion()); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } /** * 查看 ZooKeeper 節(jié)點(diǎn)是否存在 * 參數(shù)解釋: * param 1 - znode 名稱 (/zoo) * param 2 - 監(jiān)視者,用于獲取監(jiān)控事件 (MyWatch) * * 如果不存在,返回的 stat 為 null */ try { Stat stat = zooKeeper.exists("/zoo_not_exist", new MyWatch()); System.out.println("查看節(jié)點(diǎn)是否存在 stat:" + stat); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } /** * 刪除 ZooKeeper 節(jié)點(diǎn) * 參數(shù)解釋: * param 1 - znode 名稱 (/zoo) * param 2 - 該節(jié)點(diǎn)的版本 * * 版本號(hào)如果不清楚的話可以填入 -1,和上述同理 * 值得注意的是,如果一個(gè)節(jié)點(diǎn)下屬存在子節(jié)點(diǎn),那么它不能被刪除 */ try { zooKeeper.delete("/zoo", -1); } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } } private static class MyWatch implements Watcher{ public void process(WatchedEvent watchedEvent) { System.out.println(watchedEvent); } } }二 Curator
Curator 是 Netfix 開發(fā)的 Zookeeper Client,使用起來更方便,功能更加強(qiáng)大,目前應(yīng)用更加廣泛。使用之前需要在 Maven 中導(dǎo)入依賴:
org.apache.curator curator-recipes 4.2.0 org.apache.curator curator-framework 4.2.0
代碼:
import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.data.Stat; import java.util.List; public class CuratorTest { public static void main(String[] args) { /** * 創(chuàng)建客戶端 * * RetryPolicy 接口是重試策略 */ /** * 指定客戶端的重連策略 * * RetryOneTime(int ms) * 休眠一定毫秒數(shù)之后重新連接一次 * * RetryForever(int ms) * 和第一種策略的差別是會(huì)不斷嘗試重連 * * RetryNTimes(int times,int ms) * 和第一種策略的差別是,第一個(gè)參數(shù)指定重連次數(shù),第二個(gè)參數(shù)指定休眠間隔 * * RetryUntilElapsed(int max_sum_ms,int ms) * 第一個(gè)參數(shù)指定最大休眠時(shí)間,第二個(gè)參數(shù)指定休眠間隔,如果休眠時(shí)間超出了就不會(huì)繼續(xù)重連 * * ExponentialBackoffRetry(int ms,int,int max_ms) * 第一個(gè)參數(shù)代表最初的重連休眠時(shí)間,第二個(gè)參數(shù)代表最大重連次數(shù),第三個(gè)參數(shù)代表最大重連休眠時(shí)間 * 該策略下重連的休眠時(shí)間會(huì)隨著重連次數(shù)的增加而增加,從最初休眠時(shí)間一直增加到最大休眠時(shí)間 * 最大重連次數(shù)必須小于等于 29,超過的情況下會(huì)被自動(dòng)修改成 29 * * [其它策略不一一列舉] */ RetryPolicy retryPolicy = new ExponentialBackoffRetry(100,3,1000); /** * 采用 buider 模式創(chuàng)建客戶端 */ CuratorFramework client = CuratorFrameworkFactory.builder() //Zookeeper 的地址 .connectString("localhost:2101,localhost:2102,localhost:2103") //session 的過期時(shí)間(毫秒) .sessionTimeoutMs(5000) //連接的超時(shí)時(shí)間(毫秒) .connectionTimeoutMs(5000) //拒絕策略 .retryPolicy(retryPolicy) //設(shè)置該客戶端能夠操作的目錄權(quán)限,不設(shè)置的話默認(rèn)可以操作全部 //比如此處設(shè)置為 zoo,即為該客戶端對(duì)象操作的節(jié)點(diǎn)前面默認(rèn)會(huì)添加 /zoo .namespace("zoo") //完成創(chuàng)建 .build(); //啟動(dòng)客戶端 client.start(); /** * 創(chuàng)建節(jié)點(diǎn) */ try { String createReturn = client.create() //節(jié)點(diǎn)類型 //PERSISTENT - 持久化目錄節(jié)點(diǎn),客戶端與zookeeper斷開連接后,該節(jié)點(diǎn)依舊存在 //PERSISTENT_SEQUENTIAL - 持久化,并帶有序列號(hào) //EPHEMERAL - 臨時(shí)目錄節(jié)點(diǎn),客戶端與zookeeper斷開連接后,該節(jié)點(diǎn)被刪除 //EPHEMERAL_SEQUENTIAL - 臨時(shí),并帶有序列號(hào) .withMode(CreateMode.PERSISTENT) //由于 namespace 設(shè)置為 zoo,所以此處相當(dāng)于創(chuàng)建 /zoo/zoo_1 節(jié)點(diǎn) .forPath("/zoo_1", "my first data zoo_1".getBytes()); System.out.println("創(chuàng)建節(jié)點(diǎn):" + createReturn); } catch (Exception e) { e.printStackTrace(); } /** * 查詢節(jié)點(diǎn) */ try { Stat stat = client.checkExists() //查詢 /zoo/zoo_1 節(jié)點(diǎn) .forPath("/zoo_1"); //如果不存在,stat 為 null System.out.println("查詢節(jié)點(diǎn):" + stat); } catch (Exception e) { e.printStackTrace(); } /** * 刪除節(jié)點(diǎn) */ try { client.delete() //如果該節(jié)點(diǎn)下有子節(jié)點(diǎn),會(huì)拋出異常且刪除失敗 .forPath("/zoo_1"); } catch (Exception e) { e.printStackTrace(); } /** * 查詢節(jié)點(diǎn)的值 */ try { Stat stat = new Stat(); byte[] value = client.getData() //獲取節(jié)點(diǎn)的 stat .storingStatIn(stat) //查詢 /zoo/zoo_1 節(jié)點(diǎn) .forPath("/zoo_1"); System.out.println("查詢節(jié)點(diǎn)的值:" + new String(value)); } catch (Exception e) { e.printStackTrace(); } /** * 更新節(jié)點(diǎn)的值 */ try { Stat stat = client.setData() //設(shè)置版本值,此選項(xiàng)非必填 .withVersion(10086) .forPath("/zoo_1", "zoo_1 new data".getBytes()); } catch (Exception e) { e.printStackTrace(); } /** * 獲取節(jié)點(diǎn)的子節(jié)點(diǎn) */ try { //獲取所有子節(jié)點(diǎn)的節(jié)點(diǎn)名稱 List三 使用 Curator 實(shí)現(xiàn)分布式鎖nodes = client.getChildren() .forPath("/zoo_1"); } catch (Exception e) { e.printStackTrace(); } } }
Zookeeper 中的分布式鎖實(shí)現(xiàn)原理很簡(jiǎn)單,就是多個(gè)線程一起去創(chuàng)建同一個(gè)節(jié)點(diǎn),誰創(chuàng)建成功鎖就歸誰;使用完之后刪除該節(jié)點(diǎn),其它節(jié)點(diǎn)再進(jìn)行一次爭(zhēng)搶。Curator 中有一個(gè)寫好的重入鎖 InterProcessMutex,簡(jiǎn)單封裝即可使用:
import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.retry.ExponentialBackoffRetry; import java.util.Objects; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; /** * Zookeeper 分布式鎖實(shí)現(xiàn) */ public class ZkLock implements Lock{ private InterProcessMutex lock; /** * 讓使用者方便運(yùn)用的構(gòu)造方法 */ public ZkLock(String zkAddrs){ this(zkAddrs, "/lock_node", "lock_base", 2000, new ExponentialBackoffRetry(1000, 10)); } /** * 核心構(gòu)造方法,根據(jù)傳入的參數(shù)去構(gòu)造 lock 對(duì)象 * @param zkAddrs Zookeeper 的服務(wù)地址 * @param lockNode 各個(gè)線程要去爭(zhēng)搶創(chuàng)建的 Znode,也就是客戶端有使用權(quán)限的 namespace * @param baseNode lockNode 的上級(jí) Znode * @param sessionOutTimeMs 過期時(shí)間 * @param policy 重連策略 */ public ZkLock(String zkAddrs,String lockNode,String baseNode,int sessionOutTimeMs,RetryPolicy policy){ //有效性驗(yàn)證 if(Objects.isNull(zkAddrs) || zkAddrs.trim().equals("") || Objects.isNull(lockNode) || lockNode.trim().equals("") || Objects.isNull(policy)) throw new RuntimeException(); //通過工廠創(chuàng)建連接 CuratorFrameworkFactory.Builder cfBuilder = CuratorFrameworkFactory.builder() .connectString(zkAddrs) .sessionTimeoutMs(sessionOutTimeMs) .retryPolicy(policy); if(baseNode != null && !baseNode.trim().equals("")) cfBuilder.namespace(baseNode); CuratorFramework cf = cfBuilder.build(); //開啟連接 cf.start(); //InterProcessMutex 是 Crator 里自帶的一個(gè)已經(jīng)實(shí)現(xiàn)好的重入鎖 //只要對(duì)其進(jìn)行簡(jiǎn)單封裝即可使用 lock = new InterProcessMutex(cf,lockNode); } /** * 上鎖方法,死循環(huán)調(diào)用 tryLock() 去上鎖 */ @Override public void lock() { while (!tryLock()) Thread.yield(); } /** * 嘗試獲取鎖,如果沒能獲取到會(huì)超時(shí)后報(bào)錯(cuò) */ @Override public boolean tryLock() { try { lock.acquire(); } catch (Exception e) { return Boolean.FALSE; } return Boolean.TRUE; } /** * 嘗試獲取鎖,如果指定時(shí)間內(nèi)獲取不到就返回 false */ @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { try { return lock.acquire(time,unit); } catch (Exception e) { return Boolean.FALSE; } } /** * 釋放鎖,如果報(bào)錯(cuò)就會(huì)遞歸去釋放 */ @Override public void unlock() { try { lock.release(); } catch (Exception e) { unlock(); } } //忽略 @Override public Condition newCondition() { throw new RuntimeException(); } //忽略 @Override public void lockInterruptibly() throws InterruptedException { lock(); } //測(cè)試 public static void main(String[] args) throws Exception { //創(chuàng)建一個(gè)要被操作的對(duì)象 AtomicInteger count = new AtomicInteger(30); //創(chuàng)建一個(gè)線程池 Executor executor = Executors.newFixedThreadPool(10); //創(chuàng)建所對(duì)象 Lock lock = new ZkLock("localhost:2101,localhost:2102,localhost:2103"); //for 循環(huán),把任務(wù)丟進(jìn)線程池里 for(int i = 0; i < 30; i++){ executor.execute(()->{ try { //加鎖 lock.lock(); //此處開啟業(yè)務(wù)邏輯 //demo 中簡(jiǎn)單模擬,將 count 對(duì)象減一 int a = count.decrementAndGet(); System.out.println(a); } catch (Exception e) { e.printStackTrace(); } finally { try { //釋放鎖 lock.unlock(); } catch (Exception e) { e.printStackTrace(); } } }); } } }
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://m.specialneedsforspecialkids.com/yun/74642.html
摘要:項(xiàng)目地址前言大數(shù)據(jù)技術(shù)棧思維導(dǎo)圖大數(shù)據(jù)常用軟件安裝指南一分布式文件存儲(chǔ)系統(tǒng)分布式計(jì)算框架集群資源管理器單機(jī)偽集群環(huán)境搭建集群環(huán)境搭建常用命令的使用基于搭建高可用集群二簡(jiǎn)介及核心概念環(huán)境下的安裝部署和命令行的基本使用常用操作分區(qū)表和分桶表視圖 項(xiàng)目GitHub地址:https://github.com/heibaiying... 前 言 大數(shù)據(jù)技術(shù)棧思維導(dǎo)圖 大數(shù)據(jù)常用軟件安裝指...
摘要:有可能是宕機(jī)或負(fù)荷嚴(yán)重的情況導(dǎo)致的。為分布式系統(tǒng)提供了協(xié)調(diào)功能和控制沖突。 背景 隨著計(jì)算機(jī)的硬件和操作系統(tǒng)兩者相輔相成地發(fā)展,從早期的ENIAC計(jì)算機(jī)到現(xiàn)在的x86的計(jì)算機(jī),從以前的單一控制終端(Single Operator, Single Console, SOSC)的操作系統(tǒng)到現(xiàn)在百花爭(zhēng)鳴的操作系統(tǒng)(如MacOS、Windows、Linux等),現(xiàn)代的操作系統(tǒng)發(fā)展還有一個(gè)最重要...
摘要:具有不可分割性即原語的執(zhí)行必須是連續(xù)的,在執(zhí)行過程中不允許被中斷。提供服務(wù)主要就是通過數(shù)據(jù)結(jié)構(gòu)原語集機(jī)制達(dá)到的。子節(jié)點(diǎn)的版本號(hào)數(shù)據(jù)節(jié)點(diǎn)版本號(hào)版本號(hào)創(chuàng)建該節(jié)點(diǎn)的會(huì)話的。后位則為遞增序列。 前言 最近加入了部門的技術(shù)興趣小組,被分配了Zookeeper的研究任務(wù)。在研究過程當(dāng)中,發(fā)現(xiàn)Zookeeper由于其開源的特性和其卓越的性能特點(diǎn),在業(yè)界使用廣泛,有很多的應(yīng)用場(chǎng)景,而這些不同的應(yīng)用場(chǎng)景...
摘要:由于分布式系統(tǒng)和應(yīng)用可以提供更強(qiáng)的計(jì)算能力,還能更好地容災(zāi)和擴(kuò)展,所以逐漸受到青睞。基礎(chǔ)由若干條指令組成,用于完成特定功能的過程稱為原語。 信息飛速膨脹,很多應(yīng)用無法依賴單個(gè)服務(wù)器處理龐大的數(shù)據(jù)量。由于分布式系統(tǒng)和應(yīng)用可以提供更強(qiáng)的計(jì)算能力,還能更好地容災(zāi)和擴(kuò)展,所以逐漸受到青睞。 在開發(fā)分布式應(yīng)用時(shí),通常需要花費(fèi)大量時(shí)間和精力來處理異構(gòu)系統(tǒng)中的協(xié)作通信問題。 什么是 ZooKeepe...
閱讀 959·2019-08-30 14:24
閱讀 998·2019-08-30 14:13
閱讀 1805·2019-08-29 17:21
閱讀 2690·2019-08-29 13:44
閱讀 1665·2019-08-29 11:04
閱讀 449·2019-08-26 10:44
閱讀 2571·2019-08-23 14:04
閱讀 914·2019-08-23 12:08