1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
| public class ZkLock implements AutoCloseable, Watcher {
private ZooKeeper zooKeeper;
private String znode; // 当前线程创建的路径
public ZkLock(String connectString, int sessionTimeout) throws IOException {
this.zooKeeper = new ZooKeeper(connectString, sessionTimeout, this);
}
/**
* try-catch-with-resource
*/
public void clode() throws Exception {
zooKeeper.delete(znode, -1);
zooKeeper.close();
log.info("释放锁");
}
/**
* 观察器
*/
@Override
public void process(WatchedEvent watchedEvent) {
// 当前一个节点被删除时,可唤醒获取锁的等待,表示当前线程获取了锁
if (watchedEvent.getType() == Event.EventType.NodeDeleted) {
synchronized (this) {
notify();
}
}
}
/**
* 获取锁
*/
public boolean getLock(String businessCode) {
try {
String path = "/" + businessCode;
Stat stat = zooKeeper.exists(path, false);
if (null == stat) { // 当前业务若没有持久节点
// 先创建一个持久节点,在持久节点里创建瞬时节点
zooKeeper.create(
path,
businessCode.getBytes(StandardCharsets.UTF_8),
ZooDefs.Ids.OPEN_ACL_UNSAFE, // 公开权限,不使用用户名密码就能访问这个节点
CreateMode.PERSISTENT
);
}
// 创建瞬时有序节点
String subPath = "/" + businessCode + "/" + businessCode + "_";
znode = zooKeeper.create(
subPath,
businessCode.getBytes(StandardCharsets.UTF_8),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL
);
// 获取该业务下的所有瞬时节点,进行比较排序
List<String> childrenNodes = zooKeeper.getChildren(path, false);
Collections.sort(childrenNodes);
// 如果列表中的第一个是当前创建的节点,直接获取锁
String firstNode = childrenNodes.get(0);
if (znode.endsWith(firstNode)) {
return true;
}
// 监听前一个节点,等待获取锁
String lastNode = firstNode;
for (String node : childrenNode) {
// 找到当前节点,监听前一个节点
if (znode.endWith(node)) {
zooKeeper.exists("/" + businessCode + "/" + lastNode, this);
break;
} else {
lastNode = node;
}
}
synchronized (this) {
// 让当前线程阻塞,并且会释放锁
// 上面的同步代码块才会获得锁并执行 notify()
wait();
}
return true;
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return false;
}
}
|