Java基础、中级、高级、架构面试资料

java 使用 Redis 实现分布式锁

JAVA herman 3047浏览 0评论
公告:“业余草”微信公众号提供免费CSDN下载服务(只下Java资源),关注业余草微信公众号,添加作者微信:xttblog2,发送下载链接帮助你免费下载!
本博客日IP超过2000,PV 3000 左右,急需赞助商。
极客时间所有课程通过我的二维码购买后返现24元微信红包,请加博主新的微信号:xttblog2,之前的微信号好友位已满,备注:返现
受密码保护的文章请关注“业余草”公众号,回复关键字“0”获得密码
所有面试题(java、前端、数据库、springboot等)一网打尽,请关注文末小程序
视频教程免费领
腾讯云】1核2G5M轻量应用服务器50元首年,高性价比,助您轻松上云

今天本来我想写其他的,但是有网友在微信公众号后台留言,说我之前说的分布式锁还没写完呢?于是便有了本文,基于 Redis 实现分布式锁的教程。

分布式锁

既然有人想看,那我就写!为你服务!

前面我写到了使用数据库的悲观锁 for update 实现分布式锁,其实使用乐观锁也可以实现。但是乐观锁比悲观锁更麻烦!基于 Redis 实现的悲观锁,在实际应用中可能用的更多。

java 使用 Redis 实现分布式锁

分布式锁其实最终解决的是不同 JVM 进程竞争同一资源问题。使用 Redis 实现分布式锁,主要用到的是 Redis 的 set 函数。

我们先来看代码:

package com.xttblog.lock;
import java.util.Collections;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
public class XttblogLock {
    private static final String LOCK_SUCCESS = "OK";
    private static final String SET_IF_NOT_EXIST = "NX";
    private static final String SET_WITH_EXPIRE_TIME = "PX";
    private static final Long RELEASE_SUCCESS = 1L;
    private static void validParam(JedisPool jedisPool, String lockKey, String requestId, int expireTime) {
        if (null == jedisPool) {
            throw new IllegalArgumentException("jedisPool obj is null");
        }
        if (null == lockKey || "".equals(lockKey)) {
            throw new IllegalArgumentException("lock key  is blank");
        }
        if (null == requestId || "".equals(requestId)) {
            throw new IllegalArgumentException("requestId is blank");
        }
        if (expireTime < 0) {
            throw new IllegalArgumentException("expireTime is not allowed less zero");
        }
    }
    /**
     * @param jedis
     * @param lockKey
     * @param requestId
     * @param expireTime
     * @return
     */
    public static boolean tryLock(JedisPool jedisPool, String lockKey, String requestId, int expireTime) {
        validParam(jedisPool, lockKey, requestId, expireTime);
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);
            if (LOCK_SUCCESS.equals(result)) {
                return true;
            }
        } catch (Exception e) {
            throw e;
        } finally {
            if (null != jedis) {
                jedis.close();
            }
        }
        return false;
    }
    /**
     * 
     * @param jedis
     * @param lockKey
     * @param requestId
     * @param expireTime
     */
    public static void lock(JedisPool jedisPool, String lockKey, String requestId, int expireTime) {
        validParam(jedisPool, lockKey, requestId, expireTime);
        while (true) {
            if (tryLock(jedisPool, lockKey, requestId, expireTime)) {
                return;
            }
        }
    }
    /**
     * 
     * @param jedis
     * @param lockKey
     * @param requestId
     * @return
     */
    public static boolean unLock(JedisPool jedisPool, String lockKey, String requestId) {
        validParam(jedisPool, lockKey, requestId, 1);
        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            Object result = jedis.eval(script, Collections.singletonList(lockKey),Collections.singletonList(requestId));
            if (RELEASE_SUCCESS.equals(result)) {
                return true;
            }
        } catch (Exception e) {
            throw e;
        } finally {
            if (null != jedis) {
                jedis.close();
            }
        }
        return false;
    }
}

看不懂上面的代码?看我来给你解读一下!

首先 Jedis 的 public String set(final String key, final String value, final String nxxx, final String expx,final int time)方法。

前两个值是 key 和 value 我就不多说。nxxx为模式,这里我们设置为NX,意思是说如果key不存在则插入该key对应的value并返回OK,否者什么都不做返回null。expx这里我们设置为PX,意思是设置key的过期时间为time 毫秒。

先通过tryLock方法尝试获取锁,内部是具体调用Redis的set方法,多个线程同时调用tryLock时候会同时调用set方法,但是set方法本身是保证原子性的,对应同一个key来说,多个线程调用set方法时候只有一个线程返回OK,其它线程因为key已经存在会返回null,所以返回OK的线程就相当与获取到了锁,其它返回null的线程则相当于获取锁失败。

注意,我们需要保证value(requestId)值是唯一的。因为在后面释放锁时会用到。

再通过lock 方法让使用tryLock获取锁失败的线程本地自旋转重试获取锁,这类似JUC里面的CAS。

现在来看解锁。Redis有一个叫做eval的函数,支持Lua脚本执行,并且能够保证脚本执行的原子性,也就是在执行脚本期间,其它执行redis命令的线程都会被阻塞。

if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end

对上面的 Lua 脚本解释如下:

其中keys[1]为unLock方法传递的key,argv[1]为unLock方法传递的requestId;脚本redis.call(‘get’, KEYS[1])的作用是获取key对应的value值,这里会返回通过Lock方法传递的requetId, 然后看当前传递的RequestId是否等于key对应的值,等于则说明当前要释放锁的线程就是获取锁的线程,则继续执行redis.call(‘del’, KEYS[1])脚本,删除key对应的值。

基于 Redis 的分布式锁,其实是利用了redis单实例的特性,并结合redis的set方法和eval函数实现了一个简单的分布式锁,但是这个实现还是明显有问题的。虽然使用set方法设置了超时时间,以避免线程获取到锁后redis挂了后锁没有被释放的情况,但是超时时间设置为多少合适那?如果设置太小,可能会存在线程获取锁后执行业务逻辑时间大于锁超时时间,那么就会存在逻辑还没执行完,锁已经因为超时自动释放了,而其他线程可能获取到锁,那么之前获取锁的线程的业务逻辑的执行就没有保证原子性。

另外还有一个问题是Lock方法里面是自旋调用tryLock进行重试,这就会导致像JUC中的AtomicLong一样,在高并发下多个线程竞争同一个资源时候造成大量线程占用cpu进行重试操作。这时候其实可以随机生成一个等待时间,等时间到后在进行重试,以减少潜在的同时对一个资源进行竞争的并发量。

没有完美的方案,只有更好的方案!你赞不赞同?请留言!

业余草公众号

最后,欢迎关注我的个人微信公众号:业余草(yyucao)!可加作者微信号:xttblog2。备注:“1”,添加博主微信拉你进微信群。备注错误不会同意好友申请。再次感谢您的关注!后续有精彩内容会第一时间发给您!原创文章投稿请发送至532009913@qq.com邮箱。商务合作也可添加作者微信进行联系!

本文原文出处:业余草: » java 使用 Redis 实现分布式锁