Java zookeeper服務的使用詳解

Java語言客戶端使用zookeeper

下載zookeeper連接工具,方便我們查看zookeeper存的數據。下載地址:

https://pan.baidu.com/s/1UG5_VcYUZUYUkg04QROLYg?pwd=3ych 提取碼: 3ych

下載後解壓就可以使用瞭:

使用頁面:

Java語言連接z00keeper

首先引入maven 依賴jar包

<dependency>
	<groupId>com.101tec</groupId>
	<artifactId>zkclient</artifactId>
	<version>0.9</version>
</dependency>

編寫Java代碼

public class Test001 {
    private static final String ADDRES = "127.0.0.1:2181";
    private static final int TIMAOUT = 5000;
    //計數器
    private static CountDownLatch countDownLatch = new CountDownLatch(1);
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        //zk核心節點+事件通知
        //節點路徑和界定啊value
        /**
         * 參數一:連接地址
         * 參數二:zk超時時間
         * 參數三:事件通知
         */
        //1、創建zk鏈接
        ZooKeeper zooKeeper = new ZooKeeper(ADDRES, TIMAOUT, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                Event.KeeperState state = watchedEvent.getState();
                if(state == Event.KeeperState.SyncConnected){
                    System.out.println("zk鏈接成功");
                    countDownLatch.countDown(); //計數器減 1
                }
            }
        });
        //計數器結果必須是為0 才能繼續執行
        System.out.println("zk正在等待連接");
        countDownLatch.await();
        System.out.println("開始創建我們的連接");
        //2、創建我們的節點
        /**
         * 參數一:路徑名稱
         * 參數二:節點value
         * 參數三:節點權限acl
         * 蠶食四:節點類型 臨時和永久
         */
        String s = zooKeeper.create("/kaico/one", "hello,boy".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println(s);
        zooKeeper.close();
    }
}

可以利用連接工具查看操作結果。

zooKeeper類有很多api操作節點,可以創建、刪除。

zookeeper Javaapi文檔: 點擊查看

四種節點類型

第一種:臨時節點:會話關閉之後,就自動消失 CreateMode.PERSISTENT_SEQUENTIAL

第二種:臨時有序節點 CreateMode.EPHEMERAL

第三種:持久節點:會話關閉之後,持久化到硬盤 CreateMode.PERSISTENT

第四種:持久有序節點 CreateMode.PERSISTENT_SEQUENTIAL

ACL權限

ACL權限模型,實際上就是對樹每個節點實現控制.

身份的認證有4種方式:

world:默認方式,相當於全世界都能訪問.

auth:代表已經認證通過的用戶(cli中可以通過addauth digest user:pwd來添加當前上下文中的授權用戶).

digest:即用戶名:密碼這種方式認證,這也是業務系統中最常用的.

ip:使用lp地址認證。

代碼案例:使用賬號密碼實現權限控制

1、添加有權限控制的節點數據

public class Test002 {
    private static final String ADDRES = "127.0.0.1:2181";
    private static final int TIMAOUT = 5000;
    //計數器
    private static CountDownLatch countDownLatch = new CountDownLatch(1);
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException, NoSuchAlgorithmException {
        /**
         * 參數一:連接地址
         * 參數二:zk超時時間
         * 參數三:事件通知
         */
        //1、創建zk鏈接
        ZooKeeper zooKeeper = new ZooKeeper(ADDRES, TIMAOUT, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                Event.KeeperState state = watchedEvent.getState();
                if(state == Event.KeeperState.SyncConnected){
                    System.out.println("zk鏈接成功");
                    countDownLatch.countDown(); //計數器減 1
                }
            }
        });
        //計數器結果必須是為0 才能繼續執行
        System.out.println("zk正在等待連接");
        countDownLatch.await();
        System.out.println("開始創建我們的連接");
        //創建賬號 admin 可以實現讀寫操作
        Id admin = new Id("digest", DigestAuthenticationProvider.generateDigest("admin:admin123"));
        ACL acl1 = new ACL(ZooDefs.Perms.ALL, admin);
        //創建賬號 guest 隻允許做讀操作
        Id guest = new Id("digest", DigestAuthenticationProvider.generateDigest("guest:guest123"));
        ACL acl2 = new ACL(ZooDefs.Perms.READ, guest);
        ArrayList<ACL> acls = new ArrayList<>();
        acls.add(acl1);
        acls.add(acl2);
        //2、創建我們的節點
        /**
         * 參數一:路徑名稱
         * 參數二:節點value
         * 參數三:節點權限acl
         * 蠶食四:節點類型 臨時和永久
         */
        String s = zooKeeper.create("/kaico/acl", "hello,boy".getBytes(), acls, CreateMode.PERSISTENT);
        System.out.println(s);
        zooKeeper.close();
    }
}

2、獲取設置瞭權限的節點數據

public class Test003 {
    private static final String ADDRES = "127.0.0.1:2181";
    private static final int TIMAOUT = 5000;
    //計數器
    private static CountDownLatch countDownLatch = new CountDownLatch(1);
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException, NoSuchAlgorithmException {
        /**
         * 參數一:連接地址
         * 參數二:zk超時時間
         * 參數三:事件通知
         */
        //1、創建zk鏈接
        ZooKeeper zooKeeper = new ZooKeeper(ADDRES, TIMAOUT, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                Event.KeeperState state = watchedEvent.getState();
                if(state == Event.KeeperState.SyncConnected){
                    System.out.println("zk鏈接成功");
                    countDownLatch.countDown(); //計數器減 1
                }
            }
        });
        //計數器結果必須是為0 才能繼續執行
        System.out.println("zk正在等待連接");
        countDownLatch.await();
        System.out.println("開始創建我們的連接");
        //設置一下zookeeper 的賬號才有權限獲取內容
        zooKeeper.addAuthInfo("digest", "guest:guest123".getBytes());
        //獲取節點的內容
        byte[] data = zooKeeper.getData("/kaico/acl", null, new Stat());
        System.out.println(new String(data));
        zooKeeper.close();
    }
}

實現事件監聽通知

Zookeeper實現基本的總結:類似於文件存儲系統,可以幫助我們解決分佈式領域中遇到問題

Zookeeper分佈式協調工具

特征:

  • 定義的節點包含key (路徑)和 value ,路徑不允許有重復保證唯一性.
  • Zookeeper分為四種類型持久、持久序號、臨時、臨時序號.
  • 持久與臨時節點區別:連接如果一旦關閉,當前的節點自動刪除;
  • 事件通知監聽節點發生的變化刪除、修改、子節點

Java代碼案例:

public class Test004 {
    private static final String ADDRES = "127.0.0.1:2181";
    private static final int TIMAOUT = 5000;
    //計數器
    private static CountDownLatch countDownLatch = new CountDownLatch(1);
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException, NoSuchAlgorithmException {
        //1、創建zk 連接
        ZkClient zkClient = new ZkClient(ADDRES, TIMAOUT);
        String parentPath = "/kaico/jing";
        //2、監聽節點發生的變化,監聽子節點是否發生變化,如果發生變化都可以獲取到回調通知。
//        zkClient.subscribeChildChanges(parentPath, new IZkChildListener() {
//            @Override
//            public void handleChildChange(String s, List<String> list) throws Exception {
//                System.out.println("s:" + s + ",節點發生瞭變化");
//                list.forEach((t)->{
//                    System.out.println("子節點:" + t);
//                });
//            }
//        });
        //監聽節點的內容是否發生變化或刪除
        zkClient.subscribeDataChanges(parentPath, new IZkDataListener() {
            @Override
            public void handleDataChange(String s, Object o) throws Exception {
                System.out.println("修改的節點為:" + s + ",修改之後的值:" + o);
            }
            @Override
            public void handleDataDeleted(String s) throws Exception {
                System.out.println("節點:" + s + "被刪除");
            }
        });
        //修改值內容
        zkClient.writeData(parentPath, "666666666666666");
        while (true){
        }
//        zkClient.close();
    }
}

微服務使用zookeeper作為註冊中心

調用接口邏輯圖

使用zookeeper實現邏輯:

根據服務提供方的名稱創建對應的節點,服務提供方的接口所有的ip+端口作為子節點的value的值,這樣服務調用方根據服務提供方的名稱在zookeeper上找到對應的ip+端口從而可以調用對應的接口,再監聽該節點,如果提供接口的機器發生宕機於zookeeper斷開連接,子節點也相應的減少瞭,服務調用方也會收到通知。

分佈式鎖

分佈式鎖的概念:解決再多個jvm中最終隻能有一個jvm 執行。

zookeeper實現分佈式鎖的思路:

節點保證唯一、事件通知、臨時節點(生命周期和Session會關聯)﹒

創建分佈式鎖原理:

1.多個jvm同時在Zookeeper 上創建相同的臨時節點(lockPath).

2. 因為臨時節點路徑保證唯一的性,隻要誰能夠創建成功誰就能夠獲取鎖,就可以開始執

行業務邏輯;,

3.如果節點已經給其他請求創建的話或者是創建節點失敗,當前的請求實現等待;

釋放鎖的原理

因為我們采用臨時節點,當前節點創建成功,表示獲取鎖成功;正常執行完業務邏輯調用Session關閉連接方法,當前的節點會刪除;—-釋放鎖

其他正在等待請求,采用事件監聽如果當前節點被刪除的話,又重新進入到獲取鎖流程;

臨時節點+事件通知。

代碼實現分佈式鎖

實現分佈式鎖的方式有多種:數據庫、redis、zookeeper,這裡使用zookeeper實現。

實現原理:

因為Zookeeper節點路徑保持唯一,不允許重復 且有臨時節點特性連接關閉後當前節點會自動消失,從而實現分佈式鎖。

  • 多請求同時創建相同的節點(lockPath),隻要誰能夠創建成功 誰就能夠獲取到鎖;
  • 如果創建節點的時候,突然該節點已經被其他請求創建的話則直接等待;
  • 隻要能夠創建節點成功,則開始進入到正常業務邏輯操作,其他沒有獲取鎖進行等待;
  • 正常業務邏輯流程執行完後,調用zk關閉連接方式釋放鎖,從而是其他的請求開始進入到獲取鎖的資源。

使用zookeeper 實現分們式鎖的代碼案例

利用模板設計模式實現分佈式鎖

1、定義鎖接口 Lock

public interface Lock {
    /**
     * 獲取鎖
     */
    public void getLock();
    /**
     * 釋放鎖
     */
    public void unLock();
}

2、定義抽象類實現鎖接口 Lock ,設計其他的方法完成對鎖的操作

abstract class AbstractTemplzateLock implements Lock {
    @Override
    public void getLock() {
        // 模版方法 定義共同抽象的骨架
        if (tryLock()) {
            System.out.println(">>>" + Thread.currentThread().getName() + ",獲取鎖成功");
        } else {
            // 開始實現等待
            waitLock();// 事件監聽
            // 重新獲取
            getLock();
        }
    }
    /**
     * 獲取鎖
     * @return
     */
    protected abstract boolean tryLock();
    /**
     * 等待鎖
     * @return
     */
    protected abstract void waitLock();
    /**
     * 釋放鎖
     * @return
     */
    protected abstract void unImplLock();
    @Override
    public void unLock() {
        unImplLock();
    }
}

3、利用zookeeper實現鎖,繼承抽象類 AbstractTemplzateLock

public class ZkTemplzateImplLock extends AbstractTemplzateLock {
    //參數1 連接地址
    private static final String ADDRES = "192.168.212.147:2181";
    // 參數2 zk超時時間
    private static final int TIMEOUT = 5000;
    // 創建我們的zk連接
    private ZkClient zkClient = new ZkClient(ADDRES, TIMEOUT);
    /**
     * 共同的創建臨時節點
     */
    private String lockPath = "/lockPath";
    private CountDownLatch countDownLatch = null;
    @Override
    protected boolean tryLock() {
        // 獲取鎖的思想:多個jvm同時創建臨時節點,隻要誰能夠創建成功 誰能夠獲取到鎖
        try {
            zkClient.createEphemeral(lockPath);
            return true;
        } catch (Exception e) {
//            // 如果創建已經存在的話
//            e.printStackTrace();
            return false;
        }
    }
    @Override
    protected void waitLock() {
        // 1.使用事件監聽 監聽lockPath節點是否已經被刪除,如果被刪除的情況下 有可以重新的進入到獲取鎖的權限
        IZkDataListener iZkDataListener = new IZkDataListener() {
            @Override
            public void handleDataChange(String s, Object o) throws Exception {
            }
            @Override
            public void handleDataDeleted(String s) throws Exception {
                if (countDownLatch != null) {
                    countDownLatch.countDown();// 計數器變為0
                }
            }
        };
        zkClient.subscribeDataChanges(lockPath, iZkDataListener);
        // 2.使用countDownLatch等待
        if (countDownLatch == null) {
            countDownLatch = new CountDownLatch(1);
        }
        try {
            countDownLatch.await();// 如果當前計數器不是為0 就一直等待
        } catch (Exception e) {
        }
        // 3. 如果當前節點被刪除的情況下,有需要重新進入到獲取鎖
        zkClient.unsubscribeDataChanges(lockPath, iZkDataListener);
    }
    @Override
    protected void unImplLock() {
        if (zkClient != null) {
            zkClient.close();
            System.out.println(Thread.currentThread().getName() + ",釋放瞭鎖>>>");
        }
    }
}

4、編寫使用鎖的方法

//main方法使用多線程測試分佈式鎖
  public static void main(String[] args) {
//        OrderService orderService = new OrderService();
        for (int i = 0; i < 100; i++) {
            new Thread(new OrderService()).start();
        }
        // 單個jvm中多線程同時生成訂單號碼如果發生重復 如何解決 synchronized或者是lock鎖
        // 如果在多個jvm中同時生成訂單號碼如果發生重復如何解決
        // 註意synchronized或者是lock鎖 隻能夠在本地的jvm中有效
        // 分佈式鎖的概念
    }
//多線程run 方法
public class OrderService implements Runnable {
    private OrderNumGenerator orderNumGenerator = new OrderNumGenerator();
    private Lock lock = new ZkTemplzateImplLock();
    @Override
    public void run() {
        getNumber();
    }
    private void getNumber() {
        try {
            lock.getLock();
            Thread.sleep(50);
            String number = orderNumGenerator.getNumber();
            System.out.println(Thread.currentThread().getName() + ",獲取的number:" + number);
            // 如果zk超時瞭,有做數據庫寫的操作統一直接回滾
        } catch (Exception e) {

        } finally {
            lock.unLock();
        }
    }
    // ZkTemplzateImplLock父親 模版類 AbstractTemplzateLock 父親  Lock
}
//自動生成訂單號的類
public class OrderNumGenerator {
    /**
     * 序號
     */
    private static int count;
    /**
     * 生成我們的時間戳 為訂單號碼
     * @return
     */
    public String getNumber() {
        SimpleDateFormat simpt = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss");
        try {
            Thread.sleep(30);
        } catch (Exception e) {
        }
        return simpt.format(new Date()) + "-" + ++count;
    }
}

如何防止死鎖?

創建zkClient時設置session 連接時間 sessionTimeout。 也就是設置Session連接超時時間,在規定的時間內獲取鎖後超時啦~自動回滾當前數據庫業務邏輯。

註意:等待鎖時,zkClient註冊的事件最後需要刪除。

到此這篇關於Java zookeeper服務的使用詳解的文章就介紹到這瞭,更多相關Java zookeeper服務內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: