1、Redis 锁的使用

作为非常流行的 NoSQL 数据库,Redis 已成为后端开发非常熟悉的中间件,除了常被用来做缓存外,还经常被用做分布式锁。

它的语法如下:

setNx key value

Set key to hold string value if key does not exist. In that case,
it is equal to SET. When key already holds a value, no operation is performed.
SETNX is short for "SET if Not eXists".

setNx 命令是 set if not exists 的缩写,在指定的 key 不存在时,为 key 设置指定的值。

设置成功则命令返回 1,设置失败则返回 0。

2、Redis 分布式锁常见问题

是否用了 Redis 作为分布式锁就万事大吉了呢?当然不是,如果用不好可能还有一堆坑等着跳。

例如使用上述的命令加锁,如果加锁成功,在业务执行过程中宕机了,锁就无法被释放,其它线程永远得不到执行逻辑的机会。

key 加上过期时间就可以解决这个问题,但是在加锁和设置过期时间的间隔发生宕机,同样会出现锁无法释放的场景。

在 2.6.12 版本,Redis 可以通过如下命令实现原子性的加锁并且设置过期时间。

SET key value NX PX 10000

PX milliseconds -- Set the specified expire time, in milliseconds.
NX -- Only set the key if it does not already exist.

即便使用原子命令加锁并设置过期时间,还有其他场景问题需要解决。

问题场景:

设置锁过期时间为 N 秒,A 线程拿到锁,业务执行时间超过 N 秒时,锁过期被释放,B 线程拿到锁,A 线程执行完毕释放掉锁,C 线程又拿到锁

这里有两个问题,一是加锁与释放锁混乱,二是未执行完毕逻辑锁被过期释放

  • 解决加锁与释放混乱,可以采用将 value 设置为随机数的方式,A 线程设置的锁,只能由 A 线程释放,释放锁时先判断 value 是否一致

  • 如何解决未执行完毕业务就过期的问题,那就把过期时间设置足够大,过期时间为 N*1000 秒,但是如果执行过程中宕机或发布重启应用,锁长时间不被释放,也会影响正常业务进行

  • 使用第三方库 redisson 可以帮助我们解决这个问题

2、redisson 介绍

Redisson 是一个在 Redis 的基础上实现的 Java 驻内存数据网格(In-Memory Data Grid)。Redisson 提供了使用 Redis 的最简单和最便捷的方法。
Redisson 的宗旨是促进使用者对 Redis 的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。

我们引入 redisson, 代替 redisTemplete 或者 jedis 的分布式锁。

redisson 默认锁过期时间 30 秒,通过一个线程(watchDog) 来监控锁,当达到一定时间时,对锁自动续期。

3、redisson 原理

RedissonLock 是 redisson 提供的可重入锁(Reentrant Lock),实现了 java.util.concurrent.locks.Lock,是 Lock 的分布式版本。

官方给的使用 API 如下

RLock lock = redisson.getLock("lockName");
// 最常见的使用方法
lock.lock();
// 加锁以后10秒钟自动解锁
// 无需调用unlock方法手动解锁
lock.lock(10, TimeUnit.SECONDS);
// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
   try {
     ...
   } finally {
       lock.unlock();
   }
}

无论是 .lock 还是 tryLock ,底层都会执行一段 Lua 脚本

return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                "if (redis.call('exists', KEYS[1]) == 0) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "return redis.call('pttl', KEYS[1]);",
                Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));

redisson 使用是 hash 类型来存储锁信息

  • KEY[1] 指的是 lockName,也是 redis 的 key
  • ARGV[2] 指的是客户端的 ID , 这样的一种格式:8743c9c0-0795-4907-87fd-6c719a6b4586:1
  • ARGV[1] 指的是过期时间

如果没有显式地为锁设置过期时间,将默认为 30 秒,并开启一个异步线程 watchDog 来监控,每隔 30 / 3 = 10 秒则重新将过期时间设置为 30 秒。

    if (leaseTime != -1) {
            return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        }
        RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime,                                         commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
                TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
            if (e != null) {
                return;
            }
            // lock acquired
            if (ttlRemaining == null) {
                // 开启一个线程来续期
                scheduleExpirationRenewal(threadId);
            }
        });
        return ttlRemainingFuture;

4、实践

通过本地的单机 Redis 来试一下 redisson 的续期。

    public static void main(String[] args) throws InterruptedException {
        RedissonClient redisson = redissonClient();
        RLock lock = redisson.getLock("my-test-lock");
        boolean lockSuccess = lock.tryLock(1, -1, TimeUnit.SECONDS);
        if (!lockSuccess) {
            log.info("主线程加锁失败...");
            return;
        }
        log.info(LocalDateTime.now() + "=> add my-test-lock");

        //开启一个新的线程来尝试获取锁
        newAThread(redisson);

        //模拟业务逻辑,执行了 50 秒
        TimeUnit.SECONDS.sleep(50);

        lock.unlock();
        log.info(LocalDateTime.now() + "=> delete my-test-lock");
    }

    public static void newAThread(RedissonClient redisson) {
        new Thread(() -> {
            while (true) {
                try {
                    RLock anotherLock = redisson.getLock("my-test-lock");
                    boolean lockSuccess = anotherLock.tryLock(1, -1, TimeUnit.SECONDS);
                    if (!lockSuccess) {
                        log.info(LocalDateTime.now() + "-> try lock failed");
                    } else {
                        log.info(LocalDateTime.now() + "-> try lock success");
                        anotherLock.unlock();
                        log.info(LocalDateTime.now() + "-> delete lock success");
                        break;
                    }
                    TimeUnit.SECONDS.sleep(5);
                } catch (Exception e) {
                    log.error("ex", e);
                }
            }
        }).start();
    }

    private static RedissonClient redissonClient() {
        Config config = new Config();
        config.useSingleServer().setAddress("redis://127.0.0.1:6379");
        // 设置锁默认过期时间为 20 秒
        config.setLockWatchdogTimeout(20 * 1000);
        return Redisson.create(config);
    }

运行结果如下:

执行结果.png

通过 Redis 命令 hgetall keyttl key 可以分别查看 key 的内容和过期时间,可以看到过期sh。

命令查看.png

5、总结

redisson 提供的锁续期机制可以非常方便地解决我们上面使用 Redis 锁的问题场景。
当然这也只是解决部分场景而已,对使用过程中的细节和极端情况,还需要深入思考,一点一点寻找合适的解决方案。