Redisson — 分布式锁的实现

​ 基于 Redisson 3.16.8版本,对互斥型分布式锁的源码做了详细分析,主要包括:Lua 脚本的加解锁逻辑看门狗机制的背景与实现,以及如何通过 Redis 的 pub/sub 功能配合 JDK Semaphore 来实现线程阻塞与唤醒,从而避免忙等待

另外,也简单分析了一下共享型分布式锁的实现思路,比如 RCountDownLatch 和 RSemaphore

RLock

RLock 可以看作是 JDK java.util.concurrent.locks.Lock 接口在分布式场景下的实现,功能上和本地锁一样完整。除此之外,它还实现了 RLockAsync 接口,提供了一系列异步加锁的方法,使用起来更加灵活。它的核心实现类是 RedissonLock,对应的是 JDK 里的非公平 ReentrantLock,这也是我们在实际开发中最常用的分布式锁方案。我们重点看一下RedissonLock的核心实现细节与优化

RedissonLock

加锁

​ 在调用 locktryLock 的时候,Redisson会先跑一段 Lua 脚本来尝试加锁。如果脚本返回 null,那就代表锁拿到了,方法也会立马返回。 所以说,整个加锁过程的关键,其实就藏在下面这段 Lua 脚本里

加锁脚本

​ 这段加锁脚本的逻辑其实很简单,可以分成三种情况

  1. 锁不存在 → 第一次加锁

    exists 判断锁是否存在,如果不存在,说明是第一次加锁

    ​ 此时以锁名作为作为 Redis 的 hash key,并存放一对键值对。键表示具体某个节点的某个线程,即节点id:线程id格式;值初始化为数字1,可表示锁的重入次数。最后再设置整体hash结构的过期时间

  2. 锁已存在,且是当前线程持有 → 锁重入

    直接让重入次数 +1,并重置锁的过期时间

  3. 锁已存在,但被其他线程占用

    ​ 直接返回过期时间(这个时间可以用来判断锁是否获取成功;也能作为下一次重试的延时参考时间,但实际意义并不大)

if (redis.call('exists', KEYS[1]) == 0) then -- 初次加锁
   redis.call('hset', KEYS[1], ARGV[2], 1);
   redis.call('pexpire', KEYS[1], ARGV[1]);
   return nil;
end;
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then -- 锁重入
   redis.call('hincrby', KEYS[1], ARGV[2], 1);
   redis.call('pexpire', KEYS[1], ARGV[1]);
   return nil;
end;
return redis.call('pttl', KEYS[1]); -- 锁被其他线程占用,返回剩余过期时间
--[[ 
    keys:
        keys[1]:锁名
    argv:
        argv[1]:过期时间(毫秒,比如30000)
        argv[2]:lockName(节点id:线程id,节点id通常是UUID)
]]

锁续约

背景

​ 给锁设置过期时间,主要是为了避免一种极端情况:线程拿到了锁,但还没来得及释放,结果节点突然挂了,跟 Redis 断开了连接。要是这个锁一直不过期,那后面所有线程都会被堵死,谁都拿不到。

​ 但另一个问题是,如果业务执行时间比较长,锁的过期时间先到了,又没给它续约,就可能导致多个线程同时拿到锁,直接破坏了互斥效果。

​ 为了解决这两个矛盾,Redisson实现了看门狗机制:只要线程还在持有锁,就会定时自动续约,保证业务逻辑还没跑完,锁就不会提前过期

实现

​ 加锁成功后,如果没有指定 leaseTime(即锁的固定过期时间),才会启动续约逻辑。核心机制如下:

  • 获取到锁的线程会缓存一个 ExpirationEntry,里面记录了续约任务、正在续约的线程以及对应的重入次数互斥锁情况下只有 1 个线程
  • 系统会按照 internalLockLeaseTime / 3 的间隔定时执行
    • 运行 Lua 续约脚本:如果续约成功,就继续下一轮;如果失败(说明锁 key 已不存在),则直接取消当前锁的续约任务
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    RFuture<Long> ttlRemainingFuture;
    // 1. ============ 尝试执行 Lua 加锁脚本 ============
    if (leaseTime != -1) {
        ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    } else {
        ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    }
    // 1. ============ 处理加锁结果 ============
    CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
        // lock acquired
        if (ttlRemaining == null) { // key的ttl返回为空,代表加锁成功
            if (leaseTime != -1) {
                internalLockLeaseTime = unit.toMillis(leaseTime);
            } else {
                // 没有指定 leaseTime,才启用“看门狗”机制
                // 设置定时任务自动续期,确保锁在任务执行期间不会过期
                scheduleExpirationRenewal(threadId);
            }
        }
        return ttlRemaining;
    });
    return new CompletableFutureWrapper<>(f);
}

protected void scheduleExpirationRenewal(long threadId) {
    ExpirationEntry entry = new ExpirationEntry();
    ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
    if (oldEntry != null) { // 锁重入,增加计数
        oldEntry.addThreadId(threadId);
    } else { // 第一次加锁
        entry.addThreadId(threadId);
        try {
            renewExpiration();
        } finally {
            if (Thread.currentThread().isInterrupted()) {
                cancelExpirationRenewal(threadId);
            }
        }
    }
}

/**
 * 开始续约
 */
private void renewExpiration() {
    // 从全局的续约任务里拿当前锁对应的任务
    ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (ee == null) {
        return;
    }
    // 定时任务:每隔 internalLockLeaseTime / 3 时间就跑一次(默认10秒一次)
    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
            if (ent == null) {
                return;
            }
            Long threadId = ent.getFirstThreadId();
            if (threadId == null) {
                return;
            }
            // 执行续约的lua脚本
            RFuture<Boolean> future = renewExpirationAsync(threadId);
            future.whenComplete((res, e) -> {
                if (e != null) {
                    log.error("Can't update lock " + getRawName() + " expiration", e);
                    EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                    return;
                }

                if (res) { // true 表示续约成功 -> 继续下一轮定时续约
                    renewExpiration();
                } else {
                    // lua脚本返回 false,说明 Redis 里的锁 key 已经不存在了
                    // 传 null 表示直接取消当前锁的所有续约任务(锁都没了,就没必要再续约了)
                    cancelExpirationRenewal(null);
                }
            });
        }
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
    // 把这次定时任务保存下来,方便后续取消
    ee.setTimeout(task);
}

订阅等待

​ 当线程加锁失败、需要阻塞等待或超时阻塞时,Redisson 会结合 Redis 的 pub/sub 功能和 JDK 的 Semaphore 来实现:

  • 通过 pub/sub 订阅锁的释放通知,让等待线程在锁释放时被唤醒
  • 利用 Semaphore 来管理线程阻塞和超时等待,保证阻塞线程能被安全唤醒

这种设计让分布式锁的阻塞等待既可靠又健壮,避免了传统轮询或 busy-wait 的低效问题。

subscribe操作

org.redisson.pubsub.PublishSubscribe#subscribe 是一个通用的订阅方法(无论是 SemaphoreCountDownLatch 还是 Lock,它们内部都会涉及到订阅)。它的核心逻辑可以拆成四步

  • 获取 channel 对应的 AsyncSemaphore

    ​ 每种锁都有自己的 channelName,一个节点在持有锁期间只需要订阅一次这个 channel

    AsyncSemaphore 内部维护了一个 listener 队列,保证同一个 channel 的订阅任务能按顺序执行,不会乱套。

  • 构造channel的PubSubEntry

    ​ 具体实现由子类提供。例如 Lock 对应的 RedissonLockEntry,其构造方法里初始化了一个 permits 为 0 的 Semaphore,这一步非常关键

  • 构造 channel 的 RedisPubSubListener

    ​ 当 Redis 往这个 channel 里发布消息时,就会触发对应的 RedisPubSubListener,其内部会调用到 org.redisson.pubsub.PublishSubscribe#onMessage,具体处理逻辑由子类实现

  • 完成对应的Promise,方便调用方继续后续逻辑

public CompletableFuture<E> subscribe(String entryName, String channelName) {
    AsyncSemaphore semaphore = service.getSemaphore(new ChannelName(channelName));
    CompletableFuture<E> newPromise = new CompletableFuture<>();

    // 设置一个超时检测的定时任务
    int timeout = service.getConnectionManager().getConfig().getTimeout();
    Timeout lockTimeout = service.getConnectionManager().newTimeout(t -> {
        newPromise.completeExceptionally(new RedisTimeoutException(
                "Unable to acquire subscription lock after " + timeout + "ms. " +
                        "Increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters."));
    }, timeout, TimeUnit.MILLISECONDS);

    semaphore.acquire(() -> {
        if (!lockTimeout.cancel()) {
            semaphore.release();
            return;
        }

        E entry = entries.get(entryName);
        if (entry != null) { // 已经订阅过,说明当前节点有其他线程在使用这个订阅(可能锁重入)
            entry.acquire();
            semaphore.release();
            entry.getPromise().whenComplete((r, e) -> {
                if (e != null) {
                    newPromise.completeExceptionally(e);
                    return;
                }
                newPromise.complete(r);
            });
            return;
        }
        // ============= 到这就代表当前客户端是第一次订阅 =====================
        E value = createEntry(newPromise);
        value.acquire(); // 引用计数 + 1

        E oldValue = entries.putIfAbsent(entryName, value);
        if (oldValue != null) {
            // 当前节点内的其他线程抢先 put 成功了,就用老的 entry
            oldValue.acquire();
            semaphore.release();
            oldValue.getPromise().whenComplete((r, e) -> {
                if (e != null) {
                    newPromise.completeExceptionally(e);
                    return;
                }
                newPromise.complete(r);
            });
            return;
        }
        // ===================== 到这一步,说明真正需要发起一次新的 Redis 订阅 =========================

        // 创建监听器:当 Redis 端有消息 publish 到 channel,会触发 listener 回调
        RedisPubSubListener<Object> listener = createListener(channelName, value);
        // 发起订阅请求
        CompletableFuture<PubSubConnectionEntry> s = service.subscribe(LongCodec.INSTANCE, channelName, semaphore,
                listener);

        // 订阅请求完成后,更新 promise 的状态
        s.whenComplete((r, e) -> {
            if (e != null) {
                value.getPromise().completeExceptionally(e);
                return;
            }
            value.getPromise().complete(value);
        });

    });

    return newPromise;
}
Semaphore阻塞

org.redisson.RedissonLock#lock(long, TimeUnit, boolean) 方法中,如果获取锁失败,会进入阻塞等待逻辑。其核心流程如下

  • 订阅锁对应的 channel,即redisson_lock__channel:{锁名} 频道,并同步等待结果
  • 定时阻塞并尝试获取锁
    • 利用了 RedissonLockEntry 内的 Semaphore初始许可为 0),对线程进行阻塞
    • 其他线程释放锁时,会增加 Semaphore 的许可,从而唤醒阻塞线程,再次尝试获取锁
  • 获取锁成功后在finally里尝试取消订阅
    • 如果当前节点上没有其他线程仍在等待,这个订阅就可以取消(内部通过 RedissonLockEntry 的计数器判断是否还需要保留订阅)
 private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {

        // 异步订阅频道:redisson_lock__channel:{锁名}
        CompletableFuture<RedissonLockEntry> future = subscribe(threadId);
        // ============= 同步等待订阅完成  =============
        if (interruptibly) {
            commandExecutor.syncSubscriptionInterrupted(future);
        } else {
            commandExecutor.syncSubscription(future);
        }

        try {
            while (true) {  // 走到这,就代表订阅成功。死循环获取锁,直到获取成功或抛异常才跳出循环
                // 再次尝试获取锁
                ttl = tryAcquire(-1, leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) { // 获取锁成功,直接返回
                    break;
                }

                if (ttl >= 0) { // redis的key存在ttl,定时尝试 + publish的通知 两种方法
                    try {
                        // 利用jdk的Semaphore,定时尝试获取锁,时间为ttl。
                        // 因为获取到锁的客户端可能宕机导致不能publish解锁的消息,所以这里也需要定时ttl的时间来尝试加锁来解决这种情况
                        /*
                            这个Semaphore初始值就是0,是会阻塞的。
                            只有等待获取到锁的线程解锁时,向这个订阅的频道发送一条解锁消息,Semaphore的有效值才加一
                            这时,当前客户端阻塞在这等待获取锁的线程会被立即唤醒一个,继续尝试获取锁 */
                        commandExecutor.getNow(future).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) {
                        if (interruptibly) {
                            throw e;
                        }
                        commandExecutor.getNow(future).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    }
                } else { // redis的key不存在过期时间,只有等待 publish的通知 这一种方法
                    if (interruptibly) {
                        commandExecutor.getNow(future).getLatch().acquire();
                    } else {
                        commandExecutor.getNow(future).getLatch().acquireUninterruptibly();
                    }
                }
            }
        } finally { // 根据当前客户端是否还有其它线程在阻塞获取锁来决定是否需要取消订阅(引用计数思想)
            unsubscribe(commandExecutor.getNow(future), threadId);
        }
    }

解锁

解锁的逻辑相对比较简单,核心如下:

  • 利用之前订阅的 channel 发送解锁通知,所有订阅了这个 channel 的节点都能收到消息。
  • 对于 Lock 来说,LockPubSubRedisPubSubListener 的子类)会收到解锁消息,并将对应 Semaphore 的许可数 (permits) +1,从而唤醒节点内阻塞等待的线程,让它继续尝试获取锁
  • 最后,还需要取消锁的续约任务,避免锁已释放但续约任务仍在运行
解锁脚本

这段解锁脚本的逻辑也简单,逻辑如下

  1. 锁不存在,直接返回null
  2. 锁存在并减少重入次数
    1. 重入次数还是大于0,则返回0表示解锁失败
    2. 重入次数 <= 0,即解锁成功删除锁并再向加锁时订阅的频道发布一个解锁信号(值为0),最终返回1表示解锁成功
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then -- 锁不存在
    return nil;
end; 
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
if (counter > 0) then -- 锁重入
    redis.call('pexpire', KEYS[1], ARGV[2]);
    return 0; 
else -- 删除锁并publish信号:0
    redis.call('del', KEYS[1]); 
    redis.call('publish', KEYS[2], ARGV[1]);
    return 1;
end; 
return nil;
--[[
    keys:
        keys[1]:锁名
        keys[2]:channelName,频道名。格式:redisson_lock__channel:{锁名}

    argv:
        argv[1]:0 (解锁信号)
        argv[2]:30000
        argv[3]:真实的lockName(客户端id:线程id)
]]
代码
public RFuture<Void> unlockAsync(long threadId) {
    // lua进行解锁
    RFuture<Boolean> future = unlockInnerAsync(threadId);

    CompletionStage<Void> f = future.handle((opStatus, e) -> {
        // 无论是否解锁成功,都取消锁续约的定时任务
        // 可能出现redis服务器重启导致锁消失,虽然当前线程获取了锁,但锁已不存在,解锁就会失败
        cancelExpirationRenewal(threadId);

        if (e != null) {
            throw new CompletionException(e);
        }
        if (opStatus == null) {
            IllegalMonitorStateException cause = new IllegalMonitorStateException(
                    "attempt to unlock lock, not locked by current thread by node id: "
                            + id + " thread-id: " + threadId);
            throw new CompletionException(cause);
        }

        return null;
    });

    return new CompletableFutureWrapper<>(f);
}

RCountDownLatch

​ 这是 Redisson 在分布式环境下对 CountDownLatch 功能的实现,实现类是 RedissonCountDownLatch,整体比锁的实现要简单很多。

共享锁的实现无需区分具体由哪个线程获取,因此只需 用 Redis 的 string 存储计数值,同时结合 JDK 的 AQS 和 Redis 的 pub/sub 机制 来完成线程的阻塞与唤醒。在使用前,需要调用 org.redisson.RedissonCountDownLatch#trySetCount 方法来设置初始计数值。而内部基于 AQS 的 ReclosableLatch 还支持 RCountDownLatch 的重复使用

设置初始值

​ 初始时使用org.redisson.RedissonCountDownLatch#trySetCount方法设置计数值,其lua脚本如下。如果已有订阅channel的节点,则其对应的CountDownLatchPubSub收到message后会阻塞ReclosableLatch,已实现重置的功能

lua脚本

-- 如果 RCountDownLatch 对应的 key 在 Redis 中不存在
if redis.call('exists', KEYS[1]) == 0 then  
    -- 设置 key 的初始计数值(ARGV[2] 即 count)
    redis.call('set', KEYS[1], ARGV[2]);

    -- 发送一个 pub/sub 消息,将RCountDownLatch关联的ReclosableLatch设为阻塞状态
    redis.call('publish', KEYS[2], ARGV[1]);  

    -- 返回 1 表示设置成功
    return 1
else
    -- 如果 key 已经存在,则说明计数值已经初始化过了,直接返回 0
    return 0
end

await和countDown

​ 这两个方法实现较为简单,本质套路与 Lock 类似:

  • await:通过 ReclosableLatch 阻塞当前线程,等待计数值归零
  • countDown:递减计数值,当值减至 0 时,会通过 publish 发送一条解锁消息,由 CountDownLatchPubSub 负责处理,从而唤醒所有阻塞在 ReclosableLatch 上的线程,实现同步

RSemaphore

​ RSemaphore 是 Redisson 在分布式环境下对 JDK Semaphore 功能的实现,默认实现类为 RedissonSemaphore。它的机制与 RCountDownLatch 类似:通过 Redis 的 string 存储 permits 数量,再结合 JDK Semaphore 与 Redis pub/sub 机制 来完成节点线程的阻塞与唤醒

​ 整体实现套路和前面介绍的基本一致,所以无需展开分析。只需记住,在使用前要先调用 org.redisson.api.RSemaphore#trySetPermits 来设置初始的 permits 数量