4.ZooKeeper客户端Curator实现Watch事件监听「第三章 ZooKeeper Java客户端」「架构之路ZooKeeper理论和实战」

ZooKeeper 允许用户在指定节点上注册一些Watcher,并且在一些特定事件触发的时候,ZooKeeper 服务端会将事件通知到感兴趣的客户端上去,该机制是 ZooKeeper 实现分布式协调服务的重要特性。

       ZooKeeper 中引入了Watcher机制来实现了发布/订阅功能能,能够让多个订阅者同时监听某一个对象,当一个对象自身状态变化时,会通知所有订阅者。

一、Curator的Watch

1.1 Curator的Watch方式

Curator API中引入了 Cache 来实现对 ZooKeeper 服务端事件的监听。

       Curator提供了三种Watcher:

(1)NodeCache:监听指定的节点。

(2)PathChildrenCache:监听指定节点的子节点。

(3)TreeCache:监听指定节点和子节点及其子孙节点。

1.2 NodeCache:监听指定的节点

       监听指定节点:

  1. //获取监听对象
  2. NodeCache nodeCache = new NodeCache(curatorFramework, "/test");
  3. //添加监听
  4. nodeCache.getListenable().addListener(new NodeCacheListener() {
  5. //监听回调函数,单个监听到节点发生增删改操作时会执行此方法
  6. @Override
  7. public void nodeChanged() throws Exception {
  8. String path = nodeCache.getPath();
  9. System.out.println(path+"节点");
  10. //获取当前节点更新后的数据
  11. byte[] data = nodeCache.getCurrentData().getData();
  12. System.out.println("更新后的数据为:" + new String(data));
  13. }
  14. });
  15. //开启监听,如果为true则开启监听器
  16. nodeCache.start(true);
  17. System.out.println("监听器已开启!");
  18. //让线程休眠30s(为了方便测试)
  19. Thread.sleep(1000 * 30);

说明:只能监听到当前的节点,比如:/curator/test,只能监听到/curator/test;并不能监听到子节点,比如/curator/test/test1,不能监听到。

测试一下,zkCli.sh进行操作一下/curator/test节点:













1.3 PathChildrenCache
:监听指定节点的子节点

监听指定节点的子节点:

  1. /*
  2. 获取监听对象
  3. 参数1:客户端连接对象
  4. 参数2:节点,监听的是指定节点的子节点
  5. 参数3:是否开启缓存
  6. */
  7. PathChildrenCache pathChildrenCache = new PathChildrenCache(curatorFramework,"/test", true);
  8. //添加监听
  9. pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
  10. //监听回调函数
  11. @Override
  12. public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
  13. //获取子节点的改变类型
  14. PathChildrenCacheEvent.Type type = event.getType();
  15. //判断监听子节点的改变类型是否为数据修改(UPDATE)
  16. if(type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
  17. System.out.println(event);
  18. //从监听事件对象中获取修改后的数据
  19. byte[] data = event.getData().getData();
  20. System.out.println(new String(data));
  21. }
  22. }
  23. });
  24. pathChildrenCache.start();
  25. System.out.println("监听器已开启!");
  26. //休眠线程,方便测试
  27. Thread.sleep(1000 * 120);

监听指定节点的子节点说明:






      * 1.该监听器能够监听连接和节点创建信息

    * 2.该监听器只能监听指定节点的子节点,而指定的节点以及子孙节点不可监听

     * 比如:监听的节点是/curator/test,那么对于节点/curator/test的改变是监听不到的, 对于子节点,比如/curator/test/test1是可以监听到的,对于子孙节点/curator/test/test1/test11是监听不到的。

1.4 TreeCache:监听指定节点和子节点及其子孙节点

监听指定节点和子节点及其子孙节点:

  1. //指定监听对象
  2. TreeCache treeCache = new TreeCache(curatorFramework, "/test");
  3. //添加监听器
  4. treeCache.getListenable().addListener(new TreeCacheListener() {
  5. @Override
  6. public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
  7. System.out.println("子节点被改变:"+event.getData().getPath());
  8. }
  9. });
  10. treeCache.start();
  11. System.out.println("监听器已开启!");
  12. //休眠线程,方便测试
  13. Thread.sleep(1000 * 120);

说明:不能监听指定节点的父节点等,当前节点,子节点,子孙节点,子子孙孙节点都能监控到。

1.5 CuratorCache

       我们发现NodeCache、PathChildrenCache、CuratorCache已经过时了,被CuratorCache直接替代了,那么这个要怎么使用呐?

  1. CuratorCache curatorCache = CuratorCache.builder(curatorFramework, "/test").build();
  2. //构建监听器
  3. CuratorCacheListener listener = CuratorCacheListener.builder()
  4. .forNodeCache(new NodeCacheListener() {
  5. @Override
  6. public void nodeChanged() throws Exception {
  7. System.out.println("节点改变了...");
  8. }
  9. })
  10. .build();
  11. //添加监听
  12. curatorCache.listenable().addListener(listener);
  13. //开启监听
  14. curatorCache.start();
  15. //让线程休眠30s(为了方便测试)
  16. Thread.sleep(1000 * 30);

       path children cache

和NodeCache一样,调用CuratorCacheListenerBuilder.forPathChildrenCache();

       tree cache

和NodeCache一样,调用CuratorCacheListenerBuilder. forTreeCache ();

购买完整视频,请前往:http://www.mark-to-win.com/TeacherV2.html?id=287