Watcher 机制
Chivas-Regal
ZooKeeper 客户端可以在一个节点上注册一些监听事件,当这些监听事件成立触发时,会将此消息通知给所有注册了 watcher 的客户端,此机制保证了 Zookeeper 可以实现分布式协调服务。
Watcher 有三种:
NodeCache
:只监控某一个特定的节点PathChildrenCache
:监控一个分支节点的子节点TreeCache
:监控一个子树
下面说一下使用方式,分为几个步骤:
- 创建 CuratorCache 缓存器
- 创建 CuratorListener 监听器
- 将监听器注册到缓存器中
- 启动缓存器
这里给出代码实现
// ..client的创建和关闭略过
/* 1. 创建 CuratorCache 缓存器 */
CuratorCache cache = CuratorCache.build(client, "/snopzyz/data");
/* 2. 创建 CuratorListener 监听器 */
CuratorCacheListener listener = CuratorCacheListener.builder()
.forNodeCache(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
System.out.println("我变化啦!");
}
})
.build();
/* 3. 将监听器注册到缓存器中 */
cache.listenable().addListener(listener);
/* 4. 启动缓存器 */
cache.start();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
这样当在对 /snopzyz/data 节点进行写操作时,就会触发回调执行 nodeChanged
当然要使用 PathChildrenCache 模式的话要将创建监听器替换为
CuratorCacheListener listener = CuratorCacheListener.builder()
.forPathChildrenCache("/snopzyz", client, new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
System.out.println("俺孩儿变了!");
}
})
.build();
1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
同理 TreeCache 模式:
CuratorCacheListener listener = CuratorCacheListener.builder()
.forTreeCache(client, new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
System.out.println("俺后代有的变了!");
}
})
.build();
1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
CuratorCache.build
还可以指定第三个参数来指定缓存的模式
- 当没有时:默认整棵树都会被缓存
CuratorCache.Options.SINGLE_NODE_CACHE
:只缓存一个节点CuratorCache.Options.COMPRESSED_DATA
:节点的值进行压缩CuratorCache.Options.DO_NOT_CLEAR_ON_CLOSE
:在连接关闭时不删除缓存
CuratorCacheListener 还提供了如下方法直接供我们指定触发时机
// ..client的创建和关闭略过
/* 1. 创建 CuratorCache 缓存器 */
CuratorCache cache = CuratorCache.build(client, "/snopzyz/data");
/* 2. 创建 CuratorListener 监听器 */
CuratorCacheListener listener = CuratorCacheListener.builder()
// 在创建时
.forCreates(childData -> System.out.print("[forCreates]"))
// 在删除时
.forDeletes(childData -> System.out.print("[forDeletes]"))
// 在修改时
.forChanges((oldNode, node) -> System.out.print("[forChanges]"))
// 在创建与修改时
.forCreatesAndChanges((oldNode, node) -> System.out.print("[forCreatesAndChanges]"))
// 在创建、删除与修改时
.forAll((type, oldData, data) -> System.out.println("[forAll]"))
.build();
/* 3. 将监听器注册到缓存器中 */
cache.listenable().addListener(listener);
/* 4. 启动缓存器 */
cache.start();
/* 测试使用 */
client.create().creatingParentsIfNeeded().forPath("/snopzyz/data");
client.setData().forPath("/snopzyz/data", "hello,cache".getBytes());
client.delete().forPath("/snopzyz/data");
/**
* 执行结果:
*
* [forCreates][forCreatesAndChanges][forAll]
* [forChanges][forCreatesAndChanges][forAll]
* [forDeletes][forAll]
*/
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# 实现机制
注册 watcher 时客户端在自己的 ZKWatchManager 中注册后会将注册事件通过 outgoingQueue 队列发送给服务端, 并开启一个线程等待服务端的回调。
然后服务端也将其存储到自己的 WatchManager 中。
当做了数据变更时,服务端扫描自己的 WatchManager 中是否存在 watcher,若存在则发送给客户端通知消息,客户端反序列化头信息处理为 event 对象,会有一个线程循环取出该 event 对象的 watcher 执行 process。
简单来说就是双方都注册 watcher,数据变更时服务端通知客户端执行 process