4.ZooKeeper客户端Curator实现Watch事件监听「第三章 ZooKeeper Java客户端」「架构之路ZooKeeper理论和实战」
ZooKeeper 允许用户在指定节点上注册一些Watcher,并且在一些特定事件触发的时候,ZooKeeper 服务端会将事件通知到感兴趣的客户端上去,该机制是 ZooKeeper 实现分布式协调服务的重要特性。
ZooKeeper 中引入了Watcher机制来实现了发布/订阅功能能,能够让多个订阅者同时监听某一个对象,当一个对象自身状态变化时,会通知所有订阅者。
一、Curator的Watch
1.1 Curator的Watch方式
Curator API中引入了 Cache 来实现对 ZooKeeper 服务端事件的监听。
Curator提供了三种Watcher:
(1)NodeCache:监听指定的节点。
(2)PathChildrenCache:监听指定节点的子节点。
(3)TreeCache:监听指定节点和子节点及其子孙节点。
1.2 NodeCache:监听指定的节点
监听指定节点:
- //获取监听对象
- NodeCache nodeCache = new NodeCache(curatorFramework, "/test");
-
- //添加监听
- nodeCache.getListenable().addListener(new NodeCacheListener() {
- //监听回调函数,单个监听到节点发生增删改操作时会执行此方法
- @Override
- public void nodeChanged() throws Exception {
- String path = nodeCache.getPath();
- System.out.println(path+"节点");
-
- //获取当前节点更新后的数据
- byte[] data = nodeCache.getCurrentData().getData();
- System.out.println("更新后的数据为:" + new String(data));
- }
- });
-
- //开启监听,如果为true则开启监听器
- nodeCache.start(true);
-
- System.out.println("监听器已开启!");
- //让线程休眠30s(为了方便测试)
- Thread.sleep(1000 * 30);
说明:只能监听到当前的节点,比如:/curator/test,只能监听到/curator/test;并不能监听到子节点,比如/curator/test/test1,不能监听到。
测试一下,zkCli.sh进行操作一下/curator/test节点:
1.3 PathChildrenCache:监听指定节点的子节点
监听指定节点的子节点:
- /*
- 获取监听对象
- 参数1:客户端连接对象
- 参数2:节点,监听的是指定节点的子节点
- 参数3:是否开启缓存
- */
- PathChildrenCache pathChildrenCache = new PathChildrenCache(curatorFramework,"/test", true);
- //添加监听
- pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
- //监听回调函数
- @Override
- public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
- //获取子节点的改变类型
- PathChildrenCacheEvent.Type type = event.getType();
- //判断监听子节点的改变类型是否为数据修改(UPDATE)
- if(type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
- System.out.println(event);
- //从监听事件对象中获取修改后的数据
- byte[] data = event.getData().getData();
- System.out.println(new String(data));
- }
- }
- });
-
- pathChildrenCache.start();
- System.out.println("监听器已开启!");
-
- //休眠线程,方便测试
- Thread.sleep(1000 * 120);
监听指定节点的子节点说明:
* 1.该监听器能够监听连接和节点创建信息
* 2.该监听器只能监听指定节点的子节点,而指定的节点以及子孙节点不可监听
* 比如:监听的节点是/curator/test,那么对于节点/curator/test的改变是监听不到的, 对于子节点,比如/curator/test/test1是可以监听到的,对于子孙节点/curator/test/test1/test11是监听不到的。
1.4 TreeCache:监听指定节点和子节点及其子孙节点
监听指定节点和子节点及其子孙节点:
- //指定监听对象
- TreeCache treeCache = new TreeCache(curatorFramework, "/test");
- //添加监听器
- treeCache.getListenable().addListener(new TreeCacheListener() {
- @Override
- public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
- System.out.println("子节点被改变:"+event.getData().getPath());
- }
- });
-
- treeCache.start();
- System.out.println("监听器已开启!");
- //休眠线程,方便测试
- Thread.sleep(1000 * 120);
说明:不能监听指定节点的父节点等,当前节点,子节点,子孙节点,子子孙孙节点都能监控到。
1.5 CuratorCache
我们发现NodeCache、PathChildrenCache、CuratorCache已经过时了,被CuratorCache直接替代了,那么这个要怎么使用呐?
- CuratorCache curatorCache = CuratorCache.builder(curatorFramework, "/test").build();
-
- //构建监听器
- CuratorCacheListener listener = CuratorCacheListener.builder()
- .forNodeCache(new NodeCacheListener() {
- @Override
- public void nodeChanged() throws Exception {
- System.out.println("节点改变了...");
- }
- })
- .build();
-
- //添加监听
- curatorCache.listenable().addListener(listener);
- //开启监听
- curatorCache.start();
- //让线程休眠30s(为了方便测试)
- Thread.sleep(1000 * 30);
path children cache:
和NodeCache一样,调用CuratorCacheListenerBuilder.forPathChildrenCache();
tree cache:
和NodeCache一样,调用CuratorCacheListenerBuilder. forTreeCache ();
购买完整视频,请前往:http://www.mark-to-win.com/TeacherV2.html?id=287