Redisson — 分布式本地缓存RLocalCachedMap的实现

​ 基于 Redisson 3.16.8,对 RedissonLocalCachedMap 的功能及核心方法进行了深入解析,并重点探讨其关键参数对整体缓存行为的影响,为后续定向优化提供参考

RLocalCachedMap

​ 在看 Redisson 源码时发现一个挺有意思的工具:RLocalCachedMap,它的主要实现类是 RedissonLocalCachedMap。简单来说,它是一个既在本地内存缓存数据、又能和 Redis 同步的 Map,也可以理解成 带本地缓存的分布式 Map

它的核心思路很简单:

  • 读操作:优先从 JVM 本地内存获取,如果本地没有,就fallback到从远程Redis中获取,再把结果缓存到本地。这样大多数情况下直接走内存,不走网络IO,速度非常快
  • 写操作(新增、更新、删除):不仅写入 Redis,还会通过 Redis 的 Pub/Sub 通知其他节点,把它们本地的副本中对应的缓存数据更新掉,从而保持多节点数据一致性

​ 换句话说,就是用客户端内存换取网络开销,非常适合那些读多写少缓存数据量不大的分布式场景。比如:经常访问的配置、热点字典数据等等。接下来我重点分析下其实现类RedissonLocalCachedMap的一些核心方法

RedissonLocalCachedMap

核心字段

// 本地缓存更新日志的有效期(毫秒),默认 10 分钟。
// 用于 LOAD 类型的重连策略,判断缓存是否需要全量清理。
private long cacheUpdateLogTime = TimeUnit.MINUTES.toMillis(10);

// 当前RedissonLocalCachedMap实例的id
private byte[] instanceId;

// 本地缓存,提供了三方的caffeine缓存和redisson内部自己设计的缓存
// 默认用redisson自己设计的缓存:org.redisson.api.LocalCachedMapOptions.EvictionPolicy
private ConcurrentMap<CacheKey, CacheValue> cache;
// 默认为1
private int invalidateEntryOnChange;
// 同步策略,默认为SyncStrategy#INVALIDATE。 即变更时删除本地缓存,下一次访问再从 Redis 拉取,属于懒加载策略
private SyncStrategy syncStrategy;

// 存储模式,决定数据存放的位置
// 默认为org.redisson.api.LocalCachedMapOptions.StoreMode#LOCALCACHE_REDIS,即本地缓存和redis缓存都使用
private LocalCachedMapOptions.StoreMode storeMode;

// 是否缓存 value 为 null 的结果,默认 false(不缓存空值)
private boolean storeCacheMiss;

// 负责订阅 Redis 通知并处理本地缓存同步的监听器
private LocalCacheListener listener;

// 提供本地缓存的只读视图,用于遍历和调试
private LocalCacheView<K, V> localCacheView;

构造方法

​ 在 RedissonLocalCachedMap 的构造方法里,init 会创建一个 LocalCacheListener,这是非常核心的组件。我们重点看它的 add 方法,它负责订阅 Redis Channel 并注册消息监听器,后续所有写操作发出的通知都会通过这个 Channel,被 MessageListener 接收并最终更新本地缓存。核心逻辑如下:

  • 根据 ReconnectionStrategy 决定是否添加 StatusListener

    ​ 当客户端订阅 Redis Channel 成功(包括断开重连)时,会触发 onSubscribe 方法。RLocalCachedMap 会根据策略处理本地缓存,策略行为如下

    • ReconnectionStrategy.CLEAR:重连后直接清空本地缓存,不保留任何旧数据
    • ReconnectionStrategy.LOAD:如果上次更新距今超过 10 分钟,则直接清空本地缓存否则只清空断开期间有变动的 key(用了一个Redis ScoredSortedSet数据结构和ScoredSetEvictionTask定时任务保存最近11分钟的操作key,用于增量失效
  • 只有同步策略不是 SyncStrategy.NONE 才会添加 MessageListener,内部处理不同消息的逻辑如下

    • LocalCachedMapDisable禁用指定 key
    • LocalCachedMapEnable启用指定 key
    • LocalCachedMapClear清空本地缓存
    • LocalCachedMapInvalidate删除本地缓存中的某些 key
    • LocalCachedMapUpdate增加或更新本地缓存
public void add(Map<?, ?> cache) {
    this.cache = cache;
    // 使用专属的LocalCachedMessageCodec编解码器处理channel里发布的消息
    // topic格式为,默认也是channelName -> {RLocalCachedMap的name}:topic
    invalidationTopic = RedissonTopic.createRaw(LocalCachedMessageCodec.INSTANCE, commandExecutor,getInvalidationTopicName());

    // 如果设置了重连策略,则在重新订阅时触发处理
    if (options.getReconnectionStrategy() != ReconnectionStrategy.NONE) {
        reconnectionListenerId = invalidationTopic.addListener(new BaseStatusListener() {
            @Override
            public void onSubscribe(String channel) {
                if (options.getReconnectionStrategy() == ReconnectionStrategy.CLEAR) {
                    cache.clear();
                }
                if (options.getReconnectionStrategy() == ReconnectionStrategy.LOAD && lastInvalidate > 0) {
                    loadAfterReconnection();
                }
            }
        });
    }

    // 如果启用同步策略,则注册消息监听器
    if (options.getSyncStrategy() != SyncStrategy.NONE) {
        // 开始subscribe redis的channel并注册对应的listener
        syncListenerId = invalidationTopic.addListener(Object.class, new MessageListener<Object>() {
            @Override
            public void onMessage(CharSequence channel, Object msg) {
                if (msg instanceof LocalCachedMapDisable) {
                    LocalCachedMapDisable m = (LocalCachedMapDisable) msg;
                    String requestId = m.getRequestId();
                    Set<CacheKey> keysToDisable = new HashSet<CacheKey>();
                    for (byte[] keyHash : ((LocalCachedMapDisable) msg).getKeyHashes()) {
                        CacheKey key = new CacheKey(keyHash);
                        keysToDisable.add(key);
                    }

                    disableKeys(requestId, keysToDisable, m.getTimeout());

                    RedissonTopic topic = RedissonTopic.createRaw(LocalCachedMessageCodec.INSTANCE,
                            commandExecutor, RedissonObject.suffixName(name, requestId + DISABLED_ACK_SUFFIX));
                    topic.publishAsync(new LocalCachedMapDisableAck());
                }

                if (msg instanceof LocalCachedMapEnable) {
                    LocalCachedMapEnable m = (LocalCachedMapEnable) msg;
                    for (byte[] keyHash : m.getKeyHashes()) {
                        CacheKey key = new CacheKey(keyHash);
                        disabledKeys.remove(key, m.getRequestId());
                    }
                }

                // 清空缓存的消息。比如Map的clear和delete等方法
                if (msg instanceof LocalCachedMapClear) {
                    LocalCachedMapClear clearMsg = (LocalCachedMapClear) msg;
                    if (!Arrays.equals(clearMsg.getExcludedId(), instanceId)) { // 排除指定的实例。因为发起方客户端已经操作过了,不再需要从订阅中获取到消息后再处理一次
                        cache.clear();

                        if (clearMsg.isReleaseSemaphore()) {
                            RSemaphore semaphore = getClearSemaphore(clearMsg.getRequestId());
                            semaphore.releaseAsync();
                        }
                    }
                }
                // 删除key的消息
                if (msg instanceof LocalCachedMapInvalidate) {
                    LocalCachedMapInvalidate invalidateMsg = (LocalCachedMapInvalidate) msg;
                    if (!Arrays.equals(invalidateMsg.getExcludedId(), instanceId)) { // 首先排除指定的实例
                        for (byte[] keyHash : invalidateMsg.getKeyHashes()) {
                            CacheKey key = new CacheKey(keyHash);
                            cache.remove(key);
                        }
                    }
                }
                // 添加、更新key的消息(操作本地缓存)
                if (msg instanceof LocalCachedMapUpdate) {
                    LocalCachedMapUpdate updateMsg = (LocalCachedMapUpdate) msg;

                    if (!Arrays.equals(updateMsg.getExcludedId(), instanceId)) { // 首先排除指定的实例
                        for (LocalCachedMapUpdate.Entry entry : updateMsg.getEntries()) {
                            ByteBuf keyBuf = Unpooled.wrappedBuffer(entry.getKey());
                            ByteBuf valueBuf = Unpooled.wrappedBuffer(entry.getValue());
                            try {
                                updateCache(keyBuf, valueBuf);
                            } catch (IOException e) {
                                log.error("Can't decode map entry", e);
                            } finally {
                                keyBuf.release();
                                valueBuf.release();
                            }
                        }
                    }
                }

                // 更新 lastInvalidate 时间,用于 LOAD 重连策略判断
                if (options.getReconnectionStrategy() == ReconnectionStrategy.LOAD) {
                    lastInvalidate = System.currentTimeMillis();
                }
            }

        });

        String disabledKeysName = RedissonObject.suffixName(name, DISABLED_KEYS_SUFFIX);
        RListMultimapCache<LocalCachedMapDisabledKey, String> multimap = new RedissonListMultimapCache<LocalCachedMapDisabledKey, String>(
                null, codec, commandExecutor, disabledKeysName);

        for (LocalCachedMapDisabledKey key : multimap.readAllKeySet()) {
            Set<CacheKey> keysToDisable = new HashSet<CacheKey>();
            for (String hash : multimap.getAll(key)) {
                CacheKey cacheKey = new CacheKey(ByteBufUtil.decodeHexDump(hash));
                keysToDisable.add(cacheKey);
            }

            disableKeys(key.getRequestId(), keysToDisable, key.getTimeout());
        }
    }
}

查询

get

get 方法整体逻辑很直接:先从本地缓存拿,如果有就直接返回;如果本地没有,再根据 storeMode 判断是否需要去远程 Redis 获取数据。拿到远程数据后,会顺便缓存到本地,方便下次直接读取。

public RFuture<V> getAsync(Object key) {
    checkKey(key);

    CacheKey cacheKey = localCacheView.toCacheKey(key);
    // 1. 优先从本地缓存获取
    CacheValue cacheValue = cache.get(cacheKey);
    if (cacheValue != null && (storeCacheMiss || cacheValue.getValue() != null)) {
        return RedissonPromise.newSucceededFuture((V) cacheValue.getValue());
    }

    // 2. 仅使用本地缓存模式的情况(StoreMode.LOCALCACHE)来构建结果
    if (storeMode == LocalCachedMapOptions.StoreMode.LOCALCACHE) {
        if (hasNoLoader()) { // 无Loader直接构建null
            return RedissonPromise.newSucceededFuture(null);
        }

        // 有Loader:异步加载并写入本地缓存
        CompletableFuture<V> future = loadValue((K) key, false);
        CompletableFuture<V> f = future.thenApply(value -> {
            if (storeCacheMiss || value != null) {
                cachePut(cacheKey, key, value);
            }
            return value;
        });
        return new CompletableFutureWrapper<>(f);
    }

    // 3. 从远程Redis获取并缓存到本地
    RPromise<V> result = new RedissonPromise<>();
    RFuture<V> future = super.getAsync((K) key);
    future.onComplete((value, e) -> {
        if (e != null) {
            result.tryFailure(e);
            return;
        }

        // 缓存到本地
        if (storeCacheMiss || value != null) {
            cachePut(cacheKey, key, value);
        }
        result.trySuccess(value);
    });
    return result;
}

添加

put

​ 源码里 put 的整体流程是:先写本地缓存,再根据 syncStrategy 构建要发布的消息(LocalCachedMapUpdate或LocalCachedMapInvalidate),最后通过 Lua 脚本操作 Redis。重点逻辑如下:

  1. 在 Redis 的哈希表里设置对应字段的值
  2. 根据 invalidateEntryOnChange 判断:
    • 如果1,只发布消息通知其他节点更新本地缓存
    • 如果2,除了发布消息,还会把 key 添加到 ScoredSortedSet,用于断线重连时增量更新
  3. 返回字段原来的旧值
local v = redis.call('hget', KEYS[1], ARGV[1]); -- 从 Redis 哈希表中获取字段的当前值
redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); -- 将哈希表中的字段设置为新值
if ARGV[4] == '1' then -- invalidateEntryOnChange == 1,则只发布消息
    redis.call('publish', KEYS[2], ARGV[3]);
end;
if ARGV[4] == '2' then -- invalidateEntryOnChange == 2,则向ScoredSortedSet添加元素,用于后续断连恢复数据;再发布消息
    redis.call('zadd', KEYS[3], ARGV[5], ARGV[6]);
    redis.call('publish', KEYS[2], ARGV[3]);
end;
return v; -- 返回旧值

--[[ 
    KEYS[1]: RLocalCachedMap的name
    KEYS[2]: 订阅的channel,即 -> {RLocalCachedMap的name}:topic
    KEYS[3]: ScoredSortedSet的keyName
    
    ARGV[1]:mapKey
    ARGV[2]:mapValue
    ARGV[3]:向Channel发送的消息
    ARGV[4]:invalidateEntryOnChange
    ARGV[5]:System.currentTimeMillis()
    ARGV[6]:entryId,可定位到mapKey
]]
fastPut

​ fastPut的代码逻辑和put一样,只是lua脚本略有区别。它是先发布消息,再写 Redis,并在最后只返回 0/1 表示值是更新还是新增,从而加快整体的响应

if ARGV[4] == '1' then -- invalidateEntryOnChange == 1
    redis.call('publish', KEYS[2], ARGV[3]); -- 向频道 KEYS[2] 发布消息 ARGV[3]
end;
if ARGV[4] == '2' then -- invalidateEntryOnChange == 2
    redis.call('zadd', KEYS[3], ARGV[5], ARGV[6]); -- 向有序集合 KEYS[3] 添加元素 ARGV[6],分值为 ARGV[5]
    redis.call('publish', KEYS[2], ARGV[3]); -- 向频道 KEYS[2] 发布消息 ARGV[3]
end;
if redis.call('hset', KEYS[1], ARGV[1], ARGV[2]) == 0 then -- 将哈希表 KEYS[1] 的字段 ARGV[1] 设置为 ARGV[2]。如果返回 0 表示是更新
    return 0; -- 更新操作
end;
return 1; -- 添加操作

--[[ 
    KEYS[1]: RLocalCachedMap的name
    KEYS[2]: 订阅的channel,即 -> {RLocalCachedMap的name}:topic
    KEYS[3]: ScoredSortedSet的keyName
    
    ARGV[1]:mapKey
    ARGV[2]:mapValue
    ARGV[3]:向Channel发送的消息
    ARGV[4]:invalidateEntryOnChange
    ARGV[5]:System.currentTimeMillis()
    ARGV[6]:entryId,可定位到mapKey
]]

删除

remove

remove 的整体流程是:先删除本地缓存,然后根据 storeMode 决定是否只发消息还是同时操作 Redis。Lua逻辑如下:

  1. 删除 Redis 的哈希表里对应字段
  2. 删除成功则根据 invalidateEntryOnChange 判断:
    • 如果1,只发布LocalCachedMapInvalidate消息,通知其他节点删除本地缓存
    • 如果2,除了发布消息,还会把 key 添加到 ScoredSortedSet,用于断线重连时增量操作
  3. 返回字段原来的旧值
local v = redis.call('hget', KEYS[1], ARGV[1]); -- 获取哈希表 KEYS[1] 中字段 ARGV[1] 的值
if redis.call('hdel', KEYS[1], ARGV[1]) == 1 then -- 成功删除哈希表 KEYS[1] 中字段 ARGV[1]
    if ARGV[3] == '1' then -- invalidateEntryOnChange == 1
        redis.call('publish', KEYS[2], ARGV[2]); -- 向频道 KEYS[2] 发布消息 ARGV[2]
    end;
    if ARGV[3] == '2' then -- invalidateEntryOnChange == 2
        redis.call('zadd', KEYS[3], ARGV[4], ARGV[5]); -- 向有序集合 KEYS[3] 添加一个元素 ARGV[5],分数为 ARGV[4]
        redis.call('publish', KEYS[2], ARGV[2]); -- 向频道 KEYS[2] 发布消息 ARGV[2]
    end;
end;
return v -- 返回旧值

--[[ 
    KEYS[1]: RLocalCachedMap的name
    KEYS[2]: 订阅的channel,即 -> {RLocalCachedMap的name}:topic
    KEYS[3]: ScoredSortedSet的keyName
    
    ARGV[1]:mapKey
    ARGV[2]:向Channel发送的消息
    ARGV[3]:invalidateEntryOnChange
    ARGV[4]:System.currentTimeMillis()
    ARGV[5]:entryId,可定位到mapKey
]]
fastRemove

fastRemove 支持批量删除,核心逻辑和 remove 类似,优化操作和fastPut类似:不返回旧值,只返回删除成功的数量,源码就没必要分析了。适合只关心操作结果而不需要旧数据的场景

清空

clear

clear 非常直接:先清空本地缓存,然后删除 Redis 中整个 hash key,并通过 Channel 发布一条 LocalCachedMapClear 消息,让其他节点也清空它们的本地缓存

总结

​ 整体来看这个工具实现还是挺健壮的,ReconnectionStrategy 搭配 ScoredSortedSet,很好地处理了断线重连后的本地缓存策略;本地缓存可以选择性能更强的 CAFFEINE,或者使用 Redisson 自带的多种策略实现;如果只想用本地缓存,可以设置为 StoreMode.LOCALCACHE;再加上丰富的 API,操作也很灵活。总的来说,这个工具还是非常值得一试

构建示例

​ SpringBoot场景下,简单创建一个CAFFEINE作为本地缓存,缓存策略为本地 + 远程,断线重连策略为 LOAD,同步策略为 INVALIDATE的分布式本地缓存Map

@Bean
public RLocalCachedMap<String, String> buildRLocalCachedMap(RedissonClient redisson) {
    LocalCachedMapCacheOptions<String, String> options = LocalCachedMapCacheOptions.<String, String>defaults()
            // 断线重连策略:10分钟内决定是否clear操作
            .reconnectionStrategy(LocalCachedMapCacheOptions.ReconnectionStrategy.LOAD)
            // 使用CAFFEINE作为本地缓存
            .cacheProvider(LocalCachedMapCacheOptions.CacheProvider.CAFFEINE)
            .syncStrategy(LocalCachedMapCacheOptions.SyncStrategy.INVALIDATE);
    return redisson.getLocalCachedMapCache("countryDictCache", options);
}