Java基础、中级、高级、架构面试资料

Java + Zookeeper 实现不可重入的分布式锁

JAVA herman 2598浏览
公告:“业余草”微信公众号提供免费CSDN下载服务(只下Java资源),关注业余草微信公众号,添加作者微信:xttblog2,发送下载链接帮助你免费下载!
本博客日IP超过2000,PV 3000 左右,急需赞助商。
极客时间所有课程通过我的二维码购买后返现24元微信红包,请加博主新的微信号:xttblog2,之前的微信号好友位已满,备注:返现
受密码保护的文章请关注“业余草”公众号,回复关键字“0”获得密码
所有面试题(java、前端、数据库、springboot等)一网打尽,请关注文末小程序
视频教程免费领
腾讯云】1核2G5M轻量应用服务器50元首年,高性价比,助您轻松上云

看过我博客的网友都知道,我前面写锁写了非常多的文章。包括悲观锁、乐观锁、自旋锁、适应性自旋锁、无锁、偏向锁、轻量级锁、重量级锁、公平锁、非公平锁、可重入锁、非可重入锁、独享锁、共享锁等。本文我们借助 Zookeeper 实现一个不可重入的分布式锁!

前面文章推荐:

关于锁的文章还有很多,我就先贴出上面的 5 个。

分布式锁对 Java 的 JUC 包来说就显得无能为力了。所以我们要借助 Zookeeper 的最小版本,Redis 的 set 函数,数据库锁来实现分布式锁。

我们知道在 Zookeeper 中是使用文件目录的格式存放节点内容,其中节点类型分为:

  • 持久节点(PERSISTENT ):节点创建后,一直存在,直到主动删除了该节点。
  • 临时节点(EPHEMERAL):生命周期和客户端会话绑定,一旦客户端会话失效,这个节点就会自动删除。
  • 序列节点(SEQUENTIAL ):多个线程创建同一个顺序节点时候,每个线程会得到一个带有编号的节点,节点编号是递增不重复的,如下图:

zk 实现分布式锁

如上图,三个线程分别创建路径为 /root/node 的节点,可知在 zk 服务器端会在 root 下存在三个 node 节点,并且线程编号是唯一递增。

具体在节点创建过程中,可以混合使用,比如临时顺序节点(EPHEMERAL_SEQUENTIAL),本文我们就使用临时顺序节点来实现分布式锁。

先说一下实现原理:

创建临时顺序节点,比如 /root/node,假设返回结果为 nodeId。

获取 /root 下所有孩子节点,用自己创建的 nodeId 的序号与所有子节点比较,看看自己是不是编号最小的。如果是最小的则就相当于获取到了锁,如果自己不是最小的,则从所有子节点里面获取比自己次小的一个节点,然后设置监听该节点的事件,然后挂起当前线程。

当最小编号的线程获取锁,处理完业务后删除自己对应的 nodeId,删除后会激活比自己大一号的节点的线程从阻塞变为运行态,被激活的线程应该就是当前 node 序列号最小的了,然后就会获取到锁。

明白原理后,代码写起来就非常的简单了。

public class ZookeeperDistributedLock {
    public final static Joiner j = Joiner.on("|").useForNull("");
    //zk客户端
    private ZooKeeper zk;
    //zk是一个目录结构,root为最外层目录
    private String root = "/locks";
    //锁的名称
    private String lockName;
    //当前线程创建的序列node
    private ThreadLocal<String> nodeId = new ThreadLocal<>();
    //用来同步等待zkclient链接到了服务端
    private CountDownLatch connectedSignal = new CountDownLatch(1);
    private final static int sessionTimeout = 3000;
    private final static byte[] data= new byte[0];
    public ZookeeperDistributedLock(String config, String lockName) {
        this.lockName = lockName;
        try {
            zk = new ZooKeeper(config, sessionTimeout, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    // 建立连接
                    if (event.getState() == KeeperState.SyncConnected) {
                        connectedSignal.countDown();
                    }
                }

            });
            connectedSignal.await();
            Stat stat = zk.exists(root, false);
            if (null == stat) {
                // 创建根节点
                zk.create(root, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
    class LockWatcher implements Watcher {
        private CountDownLatch latch = null;

        public LockWatcher(CountDownLatch latch) {
            this.latch = latch;
        }
        @Override
        public void process(WatchedEvent event) {

            if (event.getType() == Event.EventType.NodeDeleted)
                latch.countDown();
        }
    }
    public void lock() {
        try {
            // 创建临时子节点
            String myNode = zk.create(root + "/" + lockName , data, ZooDefs.Ids.OPEN_ACL_UNSAFE,
                    CreateMode.EPHEMERAL_SEQUENTIAL);
            System.out.println(j.join(Thread.currentThread().getName() + myNode, "created"));
            // 取出所有子节点
            List<String> subNodes = zk.getChildren(root, false);
            TreeSet<String> sortedNodes = new TreeSet<>();
            for(String node :subNodes) {
                sortedNodes.add(root +"/" +node);
            }
            String smallNode = sortedNodes.first();
            String preNode = sortedNodes.lower(myNode);
            if (myNode.equals( smallNode)) {
                // 如果是最小的节点,则表示取得锁
                System.out.println(j.join(Thread.currentThread().getName(), myNode, "get lock"));
                this.nodeId.set(myNode);
                return;
            }
            CountDownLatch latch = new CountDownLatch(1);
            Stat stat = zk.exists(preNode, new LockWatcher(latch));// 同时注册监听。
            // 判断比自己小一个数的节点是否存在,如果不存在则无需等待锁,同时注册监听
            if (stat != null) {
                System.out.println(j.join(Thread.currentThread().getName(), myNode,
                        " waiting for " + root + "/" + preNode + " released lock"));
                latch.await();// 等待,这里应该一直等待其他线程释放锁
                nodeId.set(myNode);
                latch = null;
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }

    }
    public void unlock() {
        try {
            System.out.println(j.join(Thread.currentThread().getName(), nodeId.get(), "unlock "));
            if (null != nodeId) {
                zk.delete(nodeId.get(), -1);
            }
            nodeId.remove();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }
}

ZookeeperDistributedLock 的构造函数创建 zkclient,并且注册了监听事件,然后调用 connectedSignal.await() 挂起当前线程。当 zkclient 链接到服务器后,会给监听器发送 SyncConnected 事件,监听器判断当前链接已经建立了,则调用 connectedSignal.countDown(); 激活当前线程,然后创建 root 节点。

获取锁的方法 lock,内部首先创建 /root/lockName 的顺序临时节点,然后获取 /root 下所有的孩子节点,并对子节点进行排序,然后判断自己是不是最小的编号,如果是直接返回 true 标示获取锁成功。否者看比自己小一个号的节点是否存在,存在则注册该节点的事件,然后挂起当前线程,等待比自己小一个数的节点释放锁后发送节点删除事件,事件里面激活当前线程。

释放锁的方法 unlock 比较简单,就是简单的删除获取锁时候创建的节点。

Zookeeper 非常的强大,当你真正的了解后,你会发现它能做很多事情。

光分布锁方面,它都能帮助我们实现好几种,如下面我列举的这些:

  • 可重入锁Shared Reentrant Lock
  • 不可重入锁Shared Lock
  • 可重入读写锁Shared Reentrant Read Write Lock
  • 信号量Shared Semaphore
  • 多锁对象 Multi Shared Lock

其实本文所解释的这个分布式锁,还可以在进一步的优化优化。具体怎么优化,大家可以思考一下!

业余草公众号

最后,欢迎关注我的个人微信公众号:业余草(yyucao)!可加作者微信号:xttblog2。备注:“1”,添加博主微信拉你进微信群。备注错误不会同意好友申请。再次感谢您的关注!后续有精彩内容会第一时间发给您!原创文章投稿请发送至532009913@qq.com邮箱。商务合作也可添加作者微信进行联系!

本文原文出处:业余草: » Java + Zookeeper 实现不可重入的分布式锁