Java zookeeper服務的使用詳解
Java語言客戶端使用zookeeper
下載zookeeper連接工具,方便我們查看zookeeper存的數據。下載地址:
https://pan.baidu.com/s/1UG5_VcYUZUYUkg04QROLYg?pwd=3ych 提取碼: 3ych
下載後解壓就可以使用瞭:
使用頁面:
Java語言連接z00keeper
首先引入maven 依賴jar包
<dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> <version>0.9</version> </dependency>
編寫Java代碼
public class Test001 { private static final String ADDRES = "127.0.0.1:2181"; private static final int TIMAOUT = 5000; //計數器 private static CountDownLatch countDownLatch = new CountDownLatch(1); public static void main(String[] args) throws IOException, InterruptedException, KeeperException { //zk核心節點+事件通知 //節點路徑和界定啊value /** * 參數一:連接地址 * 參數二:zk超時時間 * 參數三:事件通知 */ //1、創建zk鏈接 ZooKeeper zooKeeper = new ZooKeeper(ADDRES, TIMAOUT, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { Event.KeeperState state = watchedEvent.getState(); if(state == Event.KeeperState.SyncConnected){ System.out.println("zk鏈接成功"); countDownLatch.countDown(); //計數器減 1 } } }); //計數器結果必須是為0 才能繼續執行 System.out.println("zk正在等待連接"); countDownLatch.await(); System.out.println("開始創建我們的連接"); //2、創建我們的節點 /** * 參數一:路徑名稱 * 參數二:節點value * 參數三:節點權限acl * 蠶食四:節點類型 臨時和永久 */ String s = zooKeeper.create("/kaico/one", "hello,boy".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println(s); zooKeeper.close(); } }
可以利用連接工具查看操作結果。
zooKeeper類有很多api操作節點,可以創建、刪除。
zookeeper Javaapi文檔: 點擊查看
四種節點類型
第一種:臨時節點:會話關閉之後,就自動消失 CreateMode.PERSISTENT_SEQUENTIAL
第二種:臨時有序節點 CreateMode.EPHEMERAL
第三種:持久節點:會話關閉之後,持久化到硬盤 CreateMode.PERSISTENT
第四種:持久有序節點 CreateMode.PERSISTENT_SEQUENTIAL
ACL權限
ACL權限模型,實際上就是對樹每個節點實現控制.
身份的認證有4種方式:
world:默認方式,相當於全世界都能訪問.
auth:代表已經認證通過的用戶(cli中可以通過addauth digest user:pwd來添加當前上下文中的授權用戶).
digest:即用戶名:密碼這種方式認證,這也是業務系統中最常用的.
ip:使用lp地址認證。
代碼案例:使用賬號密碼實現權限控制
1、添加有權限控制的節點數據
public class Test002 { private static final String ADDRES = "127.0.0.1:2181"; private static final int TIMAOUT = 5000; //計數器 private static CountDownLatch countDownLatch = new CountDownLatch(1); public static void main(String[] args) throws IOException, InterruptedException, KeeperException, NoSuchAlgorithmException { /** * 參數一:連接地址 * 參數二:zk超時時間 * 參數三:事件通知 */ //1、創建zk鏈接 ZooKeeper zooKeeper = new ZooKeeper(ADDRES, TIMAOUT, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { Event.KeeperState state = watchedEvent.getState(); if(state == Event.KeeperState.SyncConnected){ System.out.println("zk鏈接成功"); countDownLatch.countDown(); //計數器減 1 } } }); //計數器結果必須是為0 才能繼續執行 System.out.println("zk正在等待連接"); countDownLatch.await(); System.out.println("開始創建我們的連接"); //創建賬號 admin 可以實現讀寫操作 Id admin = new Id("digest", DigestAuthenticationProvider.generateDigest("admin:admin123")); ACL acl1 = new ACL(ZooDefs.Perms.ALL, admin); //創建賬號 guest 隻允許做讀操作 Id guest = new Id("digest", DigestAuthenticationProvider.generateDigest("guest:guest123")); ACL acl2 = new ACL(ZooDefs.Perms.READ, guest); ArrayList<ACL> acls = new ArrayList<>(); acls.add(acl1); acls.add(acl2); //2、創建我們的節點 /** * 參數一:路徑名稱 * 參數二:節點value * 參數三:節點權限acl * 蠶食四:節點類型 臨時和永久 */ String s = zooKeeper.create("/kaico/acl", "hello,boy".getBytes(), acls, CreateMode.PERSISTENT); System.out.println(s); zooKeeper.close(); } }
2、獲取設置瞭權限的節點數據
public class Test003 { private static final String ADDRES = "127.0.0.1:2181"; private static final int TIMAOUT = 5000; //計數器 private static CountDownLatch countDownLatch = new CountDownLatch(1); public static void main(String[] args) throws IOException, InterruptedException, KeeperException, NoSuchAlgorithmException { /** * 參數一:連接地址 * 參數二:zk超時時間 * 參數三:事件通知 */ //1、創建zk鏈接 ZooKeeper zooKeeper = new ZooKeeper(ADDRES, TIMAOUT, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { Event.KeeperState state = watchedEvent.getState(); if(state == Event.KeeperState.SyncConnected){ System.out.println("zk鏈接成功"); countDownLatch.countDown(); //計數器減 1 } } }); //計數器結果必須是為0 才能繼續執行 System.out.println("zk正在等待連接"); countDownLatch.await(); System.out.println("開始創建我們的連接"); //設置一下zookeeper 的賬號才有權限獲取內容 zooKeeper.addAuthInfo("digest", "guest:guest123".getBytes()); //獲取節點的內容 byte[] data = zooKeeper.getData("/kaico/acl", null, new Stat()); System.out.println(new String(data)); zooKeeper.close(); } }
實現事件監聽通知
Zookeeper實現基本的總結:類似於文件存儲系統,可以幫助我們解決分佈式領域中遇到問題
Zookeeper分佈式協調工具
特征:
- 定義的節點包含key (路徑)和 value ,路徑不允許有重復保證唯一性.
- Zookeeper分為四種類型持久、持久序號、臨時、臨時序號.
- 持久與臨時節點區別:連接如果一旦關閉,當前的節點自動刪除;
- 事件通知監聽節點發生的變化刪除、修改、子節點
Java代碼案例:
public class Test004 { private static final String ADDRES = "127.0.0.1:2181"; private static final int TIMAOUT = 5000; //計數器 private static CountDownLatch countDownLatch = new CountDownLatch(1); public static void main(String[] args) throws IOException, InterruptedException, KeeperException, NoSuchAlgorithmException { //1、創建zk 連接 ZkClient zkClient = new ZkClient(ADDRES, TIMAOUT); String parentPath = "/kaico/jing"; //2、監聽節點發生的變化,監聽子節點是否發生變化,如果發生變化都可以獲取到回調通知。 // zkClient.subscribeChildChanges(parentPath, new IZkChildListener() { // @Override // public void handleChildChange(String s, List<String> list) throws Exception { // System.out.println("s:" + s + ",節點發生瞭變化"); // list.forEach((t)->{ // System.out.println("子節點:" + t); // }); // } // }); //監聽節點的內容是否發生變化或刪除 zkClient.subscribeDataChanges(parentPath, new IZkDataListener() { @Override public void handleDataChange(String s, Object o) throws Exception { System.out.println("修改的節點為:" + s + ",修改之後的值:" + o); } @Override public void handleDataDeleted(String s) throws Exception { System.out.println("節點:" + s + "被刪除"); } }); //修改值內容 zkClient.writeData(parentPath, "666666666666666"); while (true){ } // zkClient.close(); } }
微服務使用zookeeper作為註冊中心
調用接口邏輯圖
使用zookeeper實現邏輯:
根據服務提供方的名稱創建對應的節點,服務提供方的接口所有的ip+端口作為子節點的value的值,這樣服務調用方根據服務提供方的名稱在zookeeper上找到對應的ip+端口從而可以調用對應的接口,再監聽該節點,如果提供接口的機器發生宕機於zookeeper斷開連接,子節點也相應的減少瞭,服務調用方也會收到通知。
分佈式鎖
分佈式鎖的概念:解決再多個jvm中最終隻能有一個jvm 執行。
zookeeper實現分佈式鎖的思路:
節點保證唯一、事件通知、臨時節點(生命周期和Session會關聯)﹒
創建分佈式鎖原理:
1.多個jvm同時在Zookeeper 上創建相同的臨時節點(lockPath).
2. 因為臨時節點路徑保證唯一的性,隻要誰能夠創建成功誰就能夠獲取鎖,就可以開始執
行業務邏輯;,
3.如果節點已經給其他請求創建的話或者是創建節點失敗,當前的請求實現等待;
釋放鎖的原理
因為我們采用臨時節點,當前節點創建成功,表示獲取鎖成功;正常執行完業務邏輯調用Session關閉連接方法,當前的節點會刪除;—-釋放鎖
其他正在等待請求,采用事件監聽如果當前節點被刪除的話,又重新進入到獲取鎖流程;
臨時節點+事件通知。
代碼實現分佈式鎖
實現分佈式鎖的方式有多種:數據庫、redis、zookeeper,這裡使用zookeeper實現。
實現原理:
因為Zookeeper節點路徑保持唯一,不允許重復 且有臨時節點特性連接關閉後當前節點會自動消失,從而實現分佈式鎖。
- 多請求同時創建相同的節點(lockPath),隻要誰能夠創建成功 誰就能夠獲取到鎖;
- 如果創建節點的時候,突然該節點已經被其他請求創建的話則直接等待;
- 隻要能夠創建節點成功,則開始進入到正常業務邏輯操作,其他沒有獲取鎖進行等待;
- 正常業務邏輯流程執行完後,調用zk關閉連接方式釋放鎖,從而是其他的請求開始進入到獲取鎖的資源。
使用zookeeper 實現分們式鎖的代碼案例
利用模板設計模式實現分佈式鎖
1、定義鎖接口 Lock
public interface Lock { /** * 獲取鎖 */ public void getLock(); /** * 釋放鎖 */ public void unLock(); }
2、定義抽象類實現鎖接口 Lock ,設計其他的方法完成對鎖的操作
abstract class AbstractTemplzateLock implements Lock { @Override public void getLock() { // 模版方法 定義共同抽象的骨架 if (tryLock()) { System.out.println(">>>" + Thread.currentThread().getName() + ",獲取鎖成功"); } else { // 開始實現等待 waitLock();// 事件監聽 // 重新獲取 getLock(); } } /** * 獲取鎖 * @return */ protected abstract boolean tryLock(); /** * 等待鎖 * @return */ protected abstract void waitLock(); /** * 釋放鎖 * @return */ protected abstract void unImplLock(); @Override public void unLock() { unImplLock(); } }
3、利用zookeeper實現鎖,繼承抽象類 AbstractTemplzateLock
public class ZkTemplzateImplLock extends AbstractTemplzateLock { //參數1 連接地址 private static final String ADDRES = "192.168.212.147:2181"; // 參數2 zk超時時間 private static final int TIMEOUT = 5000; // 創建我們的zk連接 private ZkClient zkClient = new ZkClient(ADDRES, TIMEOUT); /** * 共同的創建臨時節點 */ private String lockPath = "/lockPath"; private CountDownLatch countDownLatch = null; @Override protected boolean tryLock() { // 獲取鎖的思想:多個jvm同時創建臨時節點,隻要誰能夠創建成功 誰能夠獲取到鎖 try { zkClient.createEphemeral(lockPath); return true; } catch (Exception e) { // // 如果創建已經存在的話 // e.printStackTrace(); return false; } } @Override protected void waitLock() { // 1.使用事件監聽 監聽lockPath節點是否已經被刪除,如果被刪除的情況下 有可以重新的進入到獲取鎖的權限 IZkDataListener iZkDataListener = new IZkDataListener() { @Override public void handleDataChange(String s, Object o) throws Exception { } @Override public void handleDataDeleted(String s) throws Exception { if (countDownLatch != null) { countDownLatch.countDown();// 計數器變為0 } } }; zkClient.subscribeDataChanges(lockPath, iZkDataListener); // 2.使用countDownLatch等待 if (countDownLatch == null) { countDownLatch = new CountDownLatch(1); } try { countDownLatch.await();// 如果當前計數器不是為0 就一直等待 } catch (Exception e) { } // 3. 如果當前節點被刪除的情況下,有需要重新進入到獲取鎖 zkClient.unsubscribeDataChanges(lockPath, iZkDataListener); } @Override protected void unImplLock() { if (zkClient != null) { zkClient.close(); System.out.println(Thread.currentThread().getName() + ",釋放瞭鎖>>>"); } } }
4、編寫使用鎖的方法
//main方法使用多線程測試分佈式鎖 public static void main(String[] args) { // OrderService orderService = new OrderService(); for (int i = 0; i < 100; i++) { new Thread(new OrderService()).start(); } // 單個jvm中多線程同時生成訂單號碼如果發生重復 如何解決 synchronized或者是lock鎖 // 如果在多個jvm中同時生成訂單號碼如果發生重復如何解決 // 註意synchronized或者是lock鎖 隻能夠在本地的jvm中有效 // 分佈式鎖的概念 } //多線程run 方法 public class OrderService implements Runnable { private OrderNumGenerator orderNumGenerator = new OrderNumGenerator(); private Lock lock = new ZkTemplzateImplLock(); @Override public void run() { getNumber(); } private void getNumber() { try { lock.getLock(); Thread.sleep(50); String number = orderNumGenerator.getNumber(); System.out.println(Thread.currentThread().getName() + ",獲取的number:" + number); // 如果zk超時瞭,有做數據庫寫的操作統一直接回滾 } catch (Exception e) { } finally { lock.unLock(); } } // ZkTemplzateImplLock父親 模版類 AbstractTemplzateLock 父親 Lock } //自動生成訂單號的類 public class OrderNumGenerator { /** * 序號 */ private static int count; /** * 生成我們的時間戳 為訂單號碼 * @return */ public String getNumber() { SimpleDateFormat simpt = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss"); try { Thread.sleep(30); } catch (Exception e) { } return simpt.format(new Date()) + "-" + ++count; } }
如何防止死鎖?
創建zkClient時設置session 連接時間 sessionTimeout。 也就是設置Session連接超時時間,在規定的時間內獲取鎖後超時啦~自動回滾當前數據庫業務邏輯。
註意:等待鎖時,zkClient註冊的事件最後需要刪除。
到此這篇關於Java zookeeper服務的使用詳解的文章就介紹到這瞭,更多相關Java zookeeper服務內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!
推薦閱讀:
- 分析ZooKeeper分佈式鎖的實現
- 基於Zookeeper實現分佈式鎖詳解
- java並發包中CountDownLatch和線程池的使用詳解
- Java中實現線程間通信的實例教程
- java多線程CountDownLatch與線程池ThreadPoolExecutor/ExecutorService案例