Featured image of post 使用 ZooKeeper 实现分布式锁

使用 ZooKeeper 实现分布式锁

使用 ZooKeeper 实现

原理

  • 持久节点(红色)

  • 瞬时节点(黄色):不可再有子节点,会话结束后瞬时节点会自动消失

node

  • 利用 Zookeeper 的瞬时有序节点的特性,多线程并发创建瞬时节点时,得到有序的序列

  • 序号最小的线程获得锁,其他的线程则监听自己序号的前一个序号;当前一个线程执行完成,删除自己序号的节点;下一个序号的线程得到通知,将继续执行

  • 创建节点时,已经确定了线程的执行顺序

thread

实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
public class ZkLock implements AutoCloseable, Watcher {
    private ZooKeeper zooKeeper;
    private String znode;  // 当前线程创建的路径

    public ZkLock(String connectString, int sessionTimeout) throws IOException {
        this.zooKeeper = new ZooKeeper(connectString, sessionTimeout, this);
    }

    /**
     * try-catch-with-resource
     */
    public void clode() throws Exception {
        zooKeeper.delete(znode, -1);
        zooKeeper.close();
        log.info("释放锁");
    }

    /**
     * 观察器
     */
    @Override
    public void process(WatchedEvent watchedEvent) {
        // 当前一个节点被删除时,可唤醒获取锁的等待,表示当前线程获取了锁
        if (watchedEvent.getType() == Event.EventType.NodeDeleted) {
            synchronized (this) {
                notify();
            }
        }
    }

    /**
     * 获取锁
     */
    public boolean getLock(String businessCode) {
        try {
            String path = "/" + businessCode;
            Stat stat = zooKeeper.exists(path, false);
            if (null == stat) {  // 当前业务若没有持久节点
                // 先创建一个持久节点,在持久节点里创建瞬时节点
                zooKeeper.create(
                    path,
                    businessCode.getBytes(StandardCharsets.UTF_8),
                    ZooDefs.Ids.OPEN_ACL_UNSAFE,  // 公开权限,不使用用户名密码就能访问这个节点
                    CreateMode.PERSISTENT
                );
            }

            // 创建瞬时有序节点
            String subPath = "/" + businessCode + "/" + businessCode + "_";
            znode = zooKeeper.create(
                subPath,
                businessCode.getBytes(StandardCharsets.UTF_8),
                ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.EPHEMERAL_SEQUENTIAL
            );

            // 获取该业务下的所有瞬时节点,进行比较排序
            List<String> childrenNodes = zooKeeper.getChildren(path, false);
            Collections.sort(childrenNodes);

            // 如果列表中的第一个是当前创建的节点,直接获取锁
            String firstNode = childrenNodes.get(0);
            if (znode.endsWith(firstNode)) {
                return true;
            }

            // 监听前一个节点,等待获取锁
            String lastNode = firstNode;
            for (String node : childrenNode) {
                // 找到当前节点,监听前一个节点
                if (znode.endWith(node)) {
                    zooKeeper.exists("/" + businessCode + "/" + lastNode, this);
                    break;
                } else {
                    lastNode = node;
                }
            }

            synchronized (this) {
                // 让当前线程阻塞,并且会释放锁
                // 上面的同步代码块才会获得锁并执行 notify()
                wait();
            }

            return true;
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return false;
    }
}

测试

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Slf4j
@RestController
public class ZkLockController {

    @GetMapping("/zkLock")
    public String zkLock() {
        log.info("进入方法");
        try (ZkLock zklock = new ZkLock("127.0.0.1:2181", 10000)) {
            if (zkLock.getLock("order")) {
                log.info("获取了锁");
                try {
                    TimeUtil.SECONDS.sleep(10);  // 模拟业务
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            log.info("完成业务");
        } catch (Exception e) {
            e.printStackTrace();
        }
        return "success";
    }
}

Curator

curator

依赖配置

1
2
3
4
5
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.2.0</version>
</dependency>

实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Slf4j
public class DemoTest {
    @Test
    public void test() {
        String zookeeperConnectionString = "127.0.0.1:2181";
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
        client.start();

        String lockPath = "/order";
        InterProcessMutex lock = new InterProcessMutex(client, lockPath);
        try {
            // 获取互斥锁,阻塞直到可用,或给定时间到期
            // 同一个线程调用 acquire() 可重入
            if (lock.acquire(30, TimeUnit.SECONDS)) {
                try {
                    log.info("获取锁");
                } finally {
                    // 必须通过 release() 释放锁
                    lock.release();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}