1.ZooKeeper Java客户端的基本使用「第三章 ZooKeeper Java客户端」「架构之路ZooKeeper理论和实战」

这节我们来看看通过java代码怎么和ZK进行交互,这里主要使用apache提供的org.apache.zookeeper来进行和zk server进行连接的。

一、client : zookeeper

1.1 说明

       现在我们要使用代码的方式进行连接到zk server,那么我们一个zookeeper java客户端的依赖包,那这里我们使用的就是apache提供的zookeeper.jar。

       在zookeeper提供了和zk server交互的核心类就是ZooKeeper,接下来我们看下的步骤:

(1)创建项目;

(2)在pom.xml文件中添加zookeeper的依赖包;

(3)编写一个测试类,实现创建一个节点和获取一个节点;

1.2 开发环境

(1)操作系统:Mac OS;

(2)ZK Server : 3.6.2;

(3)开发工具:idea;

(4)JDK:1.8

1.3 hello小栗子

1.3.1 创建一个项目

       使用idea创建一个maven project,取名为:zookeeper-java。

1.3.2 添加依赖

       在pom.xml文件中添加依赖,zookeeper(zk client包)和junit(单元测试包):

  1. <!-- zookeeper java客户端,选择的版本号最好是和zk的服务端是相同的版本,避免引发奇奇怪怪的问题 -->
  2. <dependency>
  3. <groupId>org.apache.zookeeper</groupId>
  4. <artifactId>zookeeper</artifactId>
  5. <version>3.6.2</version>
  6. </dependency>
  7. <dependency>
  8. <groupId>junit</groupId>
  9. <artifactId>junit</artifactId>
  10. <version>4.13.2</version>
  11. <scope>test</scope>
  12. </dependency>

说明:zookeeper的版本最好是和服务端的版本是一样的。

1.3.3 编写例子

(1)和zk server建立连接:

  1. String connectString = "127.0.0.1:2181";
  2. int timeout = 4000;
  3. /*
  4. * 连接过程是异步的
  5. * 底层是使用守护线程,当没有业务线程执行的话,那么守护线程也就结束了。
  6. * 这里也就是main线程结束守护线程也就结束了。
  7. */
  8. ZooKeeper zooKeeper = new ZooKeeper(connectString, timeout, new Watcher() {
  9. public void process(WatchedEvent event) {
  10. if(event.getType() == Event.EventType.None && event.getState() == Event.KeeperState.SyncConnected){
  11. System.out.println("连接已经建立");
  12. }
  13. }
  14. });

说明:

① 连接过程是异步的;

② 底层是使用守护线程,当没有业务线程执行的话,那么守护线程也就结束了。这里也就是main线程结束守护线程也就结束了。

③ 如何避免main线程不结束呢?使用线程的sleep,休眠一下。

       集群的情况下,多个地址使用逗号分隔:

String connectString = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183,127.0.0.1:2184";

(2)创建节点

       有了zooKeeper对象之后,就可以使用这个对象的create方法创建节点了:

  1. String path = "/myconfig";
  2. byte[] bytes = new String("hello").getBytes();
  3. //节点不能多次创建,多次创建会报错: KeeperErrorCode = NodeExists for /myconfig
  4. String rs = zooKeeper.create(path,bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  5. //result:/myconfig
  6. System.out.println("result:"+rs);

说明:

① 创建节点调用的方法是:zooKeeper.create(),第一个参数是节点的path;第二个参数是byte[] data;第三个参数是:ACl权限信息;第四个参数是:节点的类型;

② 节点不能多次创建,否则会报错:KeeperErrorCode = NodeExists for /myconfig。

(3)获取节点的数据:

       获取节点的数据是getData,这里有一个Watcher可以监听到数据的变化:

  1. //通过create创建数据,通过get获取数据
  2. //这种方式只能监听一次
  3. byte[] dataBytes = zooKeeper.getData(path,new Watcher(){
  4. public void process(WatchedEvent event) {
  5. if(event.getType() == Event.EventType.NodeDataChanged && event.getPath() != null && event.getPath().equals(path)){
  6. System.out.println("数据改变了:"+event.getPath());
  7. }
  8. }
  9. },null);
  10. System.out.println("获取到的数据是:"+ new String(dataBytes));

整个类的代码如下:

  1. package com.kfit;
  2. import org.apache.zookeeper.*;
  3. import static org.apache.zookeeper.ZooDefs.Ids.OPEN_ACL_UNSAFE;
  4. /**
  5. * TODO
  6. *
  7. * @author 悟纤「公众号SpringBoot」
  8. * @date 2021-03-16
  9. * @slogan 大道至简 悟在天成
  10. */
  11. public class ZkJavaClientDemo {
  12. public static void main(String[] args) throws Exception {
  13. String connectString = "127.0.0.1:2181";
  14. int timeout = 4000;
  15. /*
  16. * 连接过程是异步的
  17. * 底层是使用守护线程,当没有业务线程执行的话,那么守护线程也就结束了。
  18. * 这里也就是main线程结束守护线程也就结束了。
  19. */
  20. ZooKeeper zooKeeper = new ZooKeeper(connectString, timeout, new Watcher() {
  21. public void process(WatchedEvent event) {
  22. if(event.getType() == Event.EventType.None && event.getState() == Event.KeeperState.SyncConnected){
  23. System.out.println("连接已经建立");
  24. }
  25. }
  26. });
  27. System.out.println("开始创建节点...");
  28. String path = "/myconfig";
  29. byte[] bytes = new String("hello").getBytes();
  30. //节点不能多次创建,多次创建会报错: KeeperErrorCode = NodeExists for /myconfig
  31. String rs = zooKeeper.create(path,bytes, OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  32. //result:/myconfig
  33. System.out.println("result:"+rs);
  34. //通过create创建数据,通过get获取数据
  35. //这种方式只能监听一次
  36. byte[] dataBytes = zooKeeper.getData(path,new Watcher(){
  37. public void process(WatchedEvent event) {
  38. if(event.getType() == Event.EventType.NodeDataChanged && event.getPath() != null && event.getPath().equals(path)){
  39. System.out.println("数据改变了:"+event.getPath());
  40. }
  41. }
  42. },null);
  43. System.out.println("获取到的数据是:"+ new String(dataBytes));
  44. /*
  45. * 通过sleep 避免主线程结束
  46. */
  47. Thread.sleep(Integer.MAX_VALUE);
  48. System.out.println("main end.");
  49. }
  50. }





       运行main方法:

       这时候我们通过zkCli.sh进行访问zk server,然后修改节点数据/myconfig:

set /myconfig hello1

 

1.3.4 永久监听

我们发现现在我们的编码只能监听一次,如何实现永久监听呢?很简单,只要在监听的代码里再次监听就可以了,很简单:

  1. Watcher watcher = new Watcher(){
  2. public void process(WatchedEvent event) {
  3. if(event.getType() == Event.EventType.NodeDataChanged && event.getPath() != null && event.getPath().equals(path)){
  4. System.out.println("数据改变了:"+event.getPath());
  5. try {
  6. byte[] dataBytes = zooKeeper.getData(path,this,null);
  7. System.out.println("获取到的数据是:"+ new String(dataBytes));
  8. } catch (KeeperException e) {
  9. e.printStackTrace();
  10. } catch (InterruptedException e) {
  11. e.printStackTrace();
  12. }
  13. }
  14. }
  15. };
  16. byte[] dataBytes = zooKeeper.getData(path,watcher,null);
  17. System.out.println("获取到的数据是:"+ new String(dataBytes));

       多次执行set 都是能够监听到的。

1.3.5 永久监听方式二

       上面的方式不管是之前的版本还是最新的版本都是通用的,如果是3.6.x版本的是支持永久递归监听的,那么怎么玩呐?也很简单:

  1. zooKeeper.addWatch(path, event -> {
  2. System.out.println("获取到的数据是:"+ event);
  3. },AddWatchMode.PERSISTENT);

event这里使用了Lambda表达式,非Lambda的写法是:

  1. zooKeeper.addWatch(path, new Watcher() {
  2. @Override
  3. public void process(WatchedEvent event) {
  4. System.out.println("获取到的数据是:"+ event);
  5. }
  6. }, AddWatchMode.PERSISTENT);

       这里的AddWatchMode可以是PERSISTENT和PERSISTENT_RECURSIVE。

二、进阶的使用

2.1 修改节点

       修改节点使用的是set的操作,我们看下简单的例子:

  1. //path = "/myconfig";
  2. byte[] bytes = new String("hello-update").getBytes();
  3. Stat stat = zooKeeper.setData(path,bytes,-1);

说明:

(1)执行set的时候,如果node不存在,会抛出异常:KeeperErrorCode = NoNode for /myconfig。在执行zooKeeper.setData()就会抛出异常,就不会有返回值Stat了

(2)这里的第三个参数是version:

       ① 如果不考虑并发修改的问题的话,那么version=-1;

    ② 如果填写的version和我们节点的version对不上的话,那么执行setData会报错:KeeperErrorCode = BadVersion for /myconfig。

2.2 并发修改节点

       对于节点的修改,会出现多个线程进行并发修改的问题,那么我们控制并发节点的修改问题呐,很简单,在前面的例子中的第三个参数version就是用来解决节点的并发修改问题的。具体的一个修改思路:

(1)、通过getData获取到节点的版本信息;

(2)、在执行setData的时候,传递当前获取到的版本号;

       具体的代码如下:

  1. Stat nodeStat = new Stat();
  2. zooKeeper.getData(path, false,nodeStat);
  3. byte[] dataBytes = new String("hello-update").getBytes();
  4. zooKeeper.setData(path,dataBytes,nodeStat.getVersion());

2.3 删除节点

       删除节点很简单,我们直接来看下代码:

zooKeeper.delete(path, -1);

版本号的说明:-1 代表匹配所有版本号,直接删除。任意大于-1的代表可以指定数据版本删除。

2.4 异步获取数据

       我们之前getData的方式是同步的,那么如何异步获取呢?对于zookeeper也提供了相应的方式:

  1. @Override
  2. public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
  3. System.out.println(Thread.currentThread().getName());//main-EventThread
  4. System.out.println("ctx="+ctx);//ctx=1000
  5. System.out.println("data="+new String(data));//data=hello
  6. }
  7. }, "1000");

三、小结

(1)使用java client的核心思路:引jar包,使用zookeeper类连接上ZK Server,然后就可以调用create/get/set/delete等操作节点的方法。

(2)对于节点的监听watcher操作,只监听一次,可以使用循环监听的方式进行永久监听;在3.6.x的版本可以使用addWatch方法永久递归监听。

(3)不能递归创建和删除节点。

购买完整视频,请前往:http://www.mark-to-win.com/TeacherV2.html?id=287