Redisson — 分布式本地缓存RLocalCachedMap的实现
基于 Redisson 3.16.8,对 RedissonLocalCachedMap 的功能及核心方法进行了深入解析,并重点探讨其关键参数对整体缓存行为的影响,为后续定向优化提供参考
RLocalCachedMap
在看 Redisson 源码时发现一个挺有意思的工具:RLocalCachedMap,它的主要实现类是 RedissonLocalCachedMap。简单来说,它是一个既在本地内存缓存数据、又能和 Redis 同步的 Map,也可以理解成 带本地缓存的分布式 Map
它的核心思路很简单:
- 读操作:优先从 JVM 本地内存获取,如果本地没有,就fallback到从远程Redis中获取,再把结果缓存到本地。这样大多数情况下直接走内存,不走网络IO,速度非常快
- 写操作(新增、更新、删除):不仅写入 Redis,还会通过 Redis 的 Pub/Sub 通知其他节点,把它们本地的副本中对应的缓存数据更新掉,从而保持多节点数据一致性
换句话说,就是用客户端内存换取网络开销,非常适合那些读多写少、缓存数据量不大的分布式场景。比如:经常访问的配置、热点字典数据等等。接下来我重点分析下其实现类RedissonLocalCachedMap的一些核心方法
RedissonLocalCachedMap
核心字段
// 本地缓存更新日志的有效期(毫秒),默认 10 分钟。
// 用于 LOAD 类型的重连策略,判断缓存是否需要全量清理。
private long cacheUpdateLogTime = TimeUnit.MINUTES.toMillis(10);
// 当前RedissonLocalCachedMap实例的id
private byte[] instanceId;
// 本地缓存,提供了三方的caffeine缓存和redisson内部自己设计的缓存
// 默认用redisson自己设计的缓存:org.redisson.api.LocalCachedMapOptions.EvictionPolicy
private ConcurrentMap<CacheKey, CacheValue> cache;
// 默认为1
private int invalidateEntryOnChange;
// 同步策略,默认为SyncStrategy#INVALIDATE。 即变更时删除本地缓存,下一次访问再从 Redis 拉取,属于懒加载策略
private SyncStrategy syncStrategy;
// 存储模式,决定数据存放的位置
// 默认为org.redisson.api.LocalCachedMapOptions.StoreMode#LOCALCACHE_REDIS,即本地缓存和redis缓存都使用
private LocalCachedMapOptions.StoreMode storeMode;
// 是否缓存 value 为 null 的结果,默认 false(不缓存空值)
private boolean storeCacheMiss;
// 负责订阅 Redis 通知并处理本地缓存同步的监听器
private LocalCacheListener listener;
// 提供本地缓存的只读视图,用于遍历和调试
private LocalCacheView<K, V> localCacheView;
构造方法
在 RedissonLocalCachedMap 的构造方法里,init 会创建一个 LocalCacheListener,这是非常核心的组件。我们重点看它的 add 方法,它负责订阅 Redis Channel 并注册消息监听器,后续所有写操作发出的通知都会通过这个 Channel,被 MessageListener 接收并最终更新本地缓存。核心逻辑如下:
根据 ReconnectionStrategy 决定是否添加 StatusListener
当客户端订阅 Redis Channel 成功(包括断开重连)时,会触发
onSubscribe方法。RLocalCachedMap 会根据策略处理本地缓存,策略行为如下- ReconnectionStrategy.CLEAR:重连后直接清空本地缓存,不保留任何旧数据
- ReconnectionStrategy.LOAD:如果上次更新距今超过 10 分钟,则直接清空本地缓存;否则只清空断开期间有变动的 key(用了一个Redis ScoredSortedSet数据结构和ScoredSetEvictionTask定时任务保存最近11分钟的操作key,用于增量失效)
只有同步策略不是 SyncStrategy.NONE 才会添加 MessageListener,内部处理不同消息的逻辑如下
LocalCachedMapDisable:禁用指定 keyLocalCachedMapEnable:启用指定 keyLocalCachedMapClear:清空本地缓存LocalCachedMapInvalidate:删除本地缓存中的某些 keyLocalCachedMapUpdate:增加或更新本地缓存
public void add(Map<?, ?> cache) {
this.cache = cache;
// 使用专属的LocalCachedMessageCodec编解码器处理channel里发布的消息
// topic格式为,默认也是channelName -> {RLocalCachedMap的name}:topic
invalidationTopic = RedissonTopic.createRaw(LocalCachedMessageCodec.INSTANCE, commandExecutor,getInvalidationTopicName());
// 如果设置了重连策略,则在重新订阅时触发处理
if (options.getReconnectionStrategy() != ReconnectionStrategy.NONE) {
reconnectionListenerId = invalidationTopic.addListener(new BaseStatusListener() {
@Override
public void onSubscribe(String channel) {
if (options.getReconnectionStrategy() == ReconnectionStrategy.CLEAR) {
cache.clear();
}
if (options.getReconnectionStrategy() == ReconnectionStrategy.LOAD && lastInvalidate > 0) {
loadAfterReconnection();
}
}
});
}
// 如果启用同步策略,则注册消息监听器
if (options.getSyncStrategy() != SyncStrategy.NONE) {
// 开始subscribe redis的channel并注册对应的listener
syncListenerId = invalidationTopic.addListener(Object.class, new MessageListener<Object>() {
@Override
public void onMessage(CharSequence channel, Object msg) {
if (msg instanceof LocalCachedMapDisable) {
LocalCachedMapDisable m = (LocalCachedMapDisable) msg;
String requestId = m.getRequestId();
Set<CacheKey> keysToDisable = new HashSet<CacheKey>();
for (byte[] keyHash : ((LocalCachedMapDisable) msg).getKeyHashes()) {
CacheKey key = new CacheKey(keyHash);
keysToDisable.add(key);
}
disableKeys(requestId, keysToDisable, m.getTimeout());
RedissonTopic topic = RedissonTopic.createRaw(LocalCachedMessageCodec.INSTANCE,
commandExecutor, RedissonObject.suffixName(name, requestId + DISABLED_ACK_SUFFIX));
topic.publishAsync(new LocalCachedMapDisableAck());
}
if (msg instanceof LocalCachedMapEnable) {
LocalCachedMapEnable m = (LocalCachedMapEnable) msg;
for (byte[] keyHash : m.getKeyHashes()) {
CacheKey key = new CacheKey(keyHash);
disabledKeys.remove(key, m.getRequestId());
}
}
// 清空缓存的消息。比如Map的clear和delete等方法
if (msg instanceof LocalCachedMapClear) {
LocalCachedMapClear clearMsg = (LocalCachedMapClear) msg;
if (!Arrays.equals(clearMsg.getExcludedId(), instanceId)) { // 排除指定的实例。因为发起方客户端已经操作过了,不再需要从订阅中获取到消息后再处理一次
cache.clear();
if (clearMsg.isReleaseSemaphore()) {
RSemaphore semaphore = getClearSemaphore(clearMsg.getRequestId());
semaphore.releaseAsync();
}
}
}
// 删除key的消息
if (msg instanceof LocalCachedMapInvalidate) {
LocalCachedMapInvalidate invalidateMsg = (LocalCachedMapInvalidate) msg;
if (!Arrays.equals(invalidateMsg.getExcludedId(), instanceId)) { // 首先排除指定的实例
for (byte[] keyHash : invalidateMsg.getKeyHashes()) {
CacheKey key = new CacheKey(keyHash);
cache.remove(key);
}
}
}
// 添加、更新key的消息(操作本地缓存)
if (msg instanceof LocalCachedMapUpdate) {
LocalCachedMapUpdate updateMsg = (LocalCachedMapUpdate) msg;
if (!Arrays.equals(updateMsg.getExcludedId(), instanceId)) { // 首先排除指定的实例
for (LocalCachedMapUpdate.Entry entry : updateMsg.getEntries()) {
ByteBuf keyBuf = Unpooled.wrappedBuffer(entry.getKey());
ByteBuf valueBuf = Unpooled.wrappedBuffer(entry.getValue());
try {
updateCache(keyBuf, valueBuf);
} catch (IOException e) {
log.error("Can't decode map entry", e);
} finally {
keyBuf.release();
valueBuf.release();
}
}
}
}
// 更新 lastInvalidate 时间,用于 LOAD 重连策略判断
if (options.getReconnectionStrategy() == ReconnectionStrategy.LOAD) {
lastInvalidate = System.currentTimeMillis();
}
}
});
String disabledKeysName = RedissonObject.suffixName(name, DISABLED_KEYS_SUFFIX);
RListMultimapCache<LocalCachedMapDisabledKey, String> multimap = new RedissonListMultimapCache<LocalCachedMapDisabledKey, String>(
null, codec, commandExecutor, disabledKeysName);
for (LocalCachedMapDisabledKey key : multimap.readAllKeySet()) {
Set<CacheKey> keysToDisable = new HashSet<CacheKey>();
for (String hash : multimap.getAll(key)) {
CacheKey cacheKey = new CacheKey(ByteBufUtil.decodeHexDump(hash));
keysToDisable.add(cacheKey);
}
disableKeys(key.getRequestId(), keysToDisable, key.getTimeout());
}
}
}
查询
get
get 方法整体逻辑很直接:先从本地缓存拿,如果有就直接返回;如果本地没有,再根据 storeMode 判断是否需要去远程 Redis 获取数据。拿到远程数据后,会顺便缓存到本地,方便下次直接读取。
public RFuture<V> getAsync(Object key) {
checkKey(key);
CacheKey cacheKey = localCacheView.toCacheKey(key);
// 1. 优先从本地缓存获取
CacheValue cacheValue = cache.get(cacheKey);
if (cacheValue != null && (storeCacheMiss || cacheValue.getValue() != null)) {
return RedissonPromise.newSucceededFuture((V) cacheValue.getValue());
}
// 2. 仅使用本地缓存模式的情况(StoreMode.LOCALCACHE)来构建结果
if (storeMode == LocalCachedMapOptions.StoreMode.LOCALCACHE) {
if (hasNoLoader()) { // 无Loader直接构建null
return RedissonPromise.newSucceededFuture(null);
}
// 有Loader:异步加载并写入本地缓存
CompletableFuture<V> future = loadValue((K) key, false);
CompletableFuture<V> f = future.thenApply(value -> {
if (storeCacheMiss || value != null) {
cachePut(cacheKey, key, value);
}
return value;
});
return new CompletableFutureWrapper<>(f);
}
// 3. 从远程Redis获取并缓存到本地
RPromise<V> result = new RedissonPromise<>();
RFuture<V> future = super.getAsync((K) key);
future.onComplete((value, e) -> {
if (e != null) {
result.tryFailure(e);
return;
}
// 缓存到本地
if (storeCacheMiss || value != null) {
cachePut(cacheKey, key, value);
}
result.trySuccess(value);
});
return result;
}
添加
put
源码里 put 的整体流程是:先写本地缓存,再根据 syncStrategy 构建要发布的消息(LocalCachedMapUpdate或LocalCachedMapInvalidate),最后通过 Lua 脚本操作 Redis。重点逻辑如下:
- 在 Redis 的哈希表里设置对应字段的值
- 根据
invalidateEntryOnChange判断:- 如果是
1,只发布消息通知其他节点更新本地缓存 - 如果是
2,除了发布消息,还会把 key 添加到 ScoredSortedSet,用于断线重连时增量更新
- 如果是
- 返回字段原来的旧值
local v = redis.call('hget', KEYS[1], ARGV[1]); -- 从 Redis 哈希表中获取字段的当前值
redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); -- 将哈希表中的字段设置为新值
if ARGV[4] == '1' then -- invalidateEntryOnChange == 1,则只发布消息
redis.call('publish', KEYS[2], ARGV[3]);
end;
if ARGV[4] == '2' then -- invalidateEntryOnChange == 2,则向ScoredSortedSet添加元素,用于后续断连恢复数据;再发布消息
redis.call('zadd', KEYS[3], ARGV[5], ARGV[6]);
redis.call('publish', KEYS[2], ARGV[3]);
end;
return v; -- 返回旧值
--[[
KEYS[1]: RLocalCachedMap的name
KEYS[2]: 订阅的channel,即 -> {RLocalCachedMap的name}:topic
KEYS[3]: ScoredSortedSet的keyName
ARGV[1]:mapKey
ARGV[2]:mapValue
ARGV[3]:向Channel发送的消息
ARGV[4]:invalidateEntryOnChange
ARGV[5]:System.currentTimeMillis()
ARGV[6]:entryId,可定位到mapKey
]]
fastPut
fastPut的代码逻辑和put一样,只是lua脚本略有区别。它是先发布消息,再写 Redis,并在最后只返回 0/1 表示值是更新还是新增,从而加快整体的响应
if ARGV[4] == '1' then -- invalidateEntryOnChange == 1
redis.call('publish', KEYS[2], ARGV[3]); -- 向频道 KEYS[2] 发布消息 ARGV[3]
end;
if ARGV[4] == '2' then -- invalidateEntryOnChange == 2
redis.call('zadd', KEYS[3], ARGV[5], ARGV[6]); -- 向有序集合 KEYS[3] 添加元素 ARGV[6],分值为 ARGV[5]
redis.call('publish', KEYS[2], ARGV[3]); -- 向频道 KEYS[2] 发布消息 ARGV[3]
end;
if redis.call('hset', KEYS[1], ARGV[1], ARGV[2]) == 0 then -- 将哈希表 KEYS[1] 的字段 ARGV[1] 设置为 ARGV[2]。如果返回 0 表示是更新
return 0; -- 更新操作
end;
return 1; -- 添加操作
--[[
KEYS[1]: RLocalCachedMap的name
KEYS[2]: 订阅的channel,即 -> {RLocalCachedMap的name}:topic
KEYS[3]: ScoredSortedSet的keyName
ARGV[1]:mapKey
ARGV[2]:mapValue
ARGV[3]:向Channel发送的消息
ARGV[4]:invalidateEntryOnChange
ARGV[5]:System.currentTimeMillis()
ARGV[6]:entryId,可定位到mapKey
]]
删除
remove
remove 的整体流程是:先删除本地缓存,然后根据 storeMode 决定是否只发消息还是同时操作 Redis。Lua逻辑如下:
- 删除 Redis 的哈希表里对应字段
- 删除成功则根据
invalidateEntryOnChange判断:- 如果是
1,只发布LocalCachedMapInvalidate消息,通知其他节点删除本地缓存 - 如果是
2,除了发布消息,还会把 key 添加到 ScoredSortedSet,用于断线重连时增量操作
- 如果是
- 返回字段原来的旧值
local v = redis.call('hget', KEYS[1], ARGV[1]); -- 获取哈希表 KEYS[1] 中字段 ARGV[1] 的值
if redis.call('hdel', KEYS[1], ARGV[1]) == 1 then -- 成功删除哈希表 KEYS[1] 中字段 ARGV[1]
if ARGV[3] == '1' then -- invalidateEntryOnChange == 1
redis.call('publish', KEYS[2], ARGV[2]); -- 向频道 KEYS[2] 发布消息 ARGV[2]
end;
if ARGV[3] == '2' then -- invalidateEntryOnChange == 2
redis.call('zadd', KEYS[3], ARGV[4], ARGV[5]); -- 向有序集合 KEYS[3] 添加一个元素 ARGV[5],分数为 ARGV[4]
redis.call('publish', KEYS[2], ARGV[2]); -- 向频道 KEYS[2] 发布消息 ARGV[2]
end;
end;
return v -- 返回旧值
--[[
KEYS[1]: RLocalCachedMap的name
KEYS[2]: 订阅的channel,即 -> {RLocalCachedMap的name}:topic
KEYS[3]: ScoredSortedSet的keyName
ARGV[1]:mapKey
ARGV[2]:向Channel发送的消息
ARGV[3]:invalidateEntryOnChange
ARGV[4]:System.currentTimeMillis()
ARGV[5]:entryId,可定位到mapKey
]]
fastRemove
fastRemove 支持批量删除,核心逻辑和 remove 类似,优化操作和fastPut类似:不返回旧值,只返回删除成功的数量,源码就没必要分析了。适合只关心操作结果而不需要旧数据的场景
清空
clear
clear 非常直接:先清空本地缓存,然后删除 Redis 中整个 hash key,并通过 Channel 发布一条 LocalCachedMapClear 消息,让其他节点也清空它们的本地缓存
总结
整体来看这个工具实现还是挺健壮的,ReconnectionStrategy 搭配 ScoredSortedSet,很好地处理了断线重连后的本地缓存策略;本地缓存可以选择性能更强的 CAFFEINE,或者使用 Redisson 自带的多种策略实现;如果只想用本地缓存,可以设置为 StoreMode.LOCALCACHE;再加上丰富的 API,操作也很灵活。总的来说,这个工具还是非常值得一试
构建示例
SpringBoot场景下,简单创建一个CAFFEINE作为本地缓存,缓存策略为本地 + 远程,断线重连策略为 LOAD,同步策略为 INVALIDATE的分布式本地缓存Map
@Bean
public RLocalCachedMap<String, String> buildRLocalCachedMap(RedissonClient redisson) {
LocalCachedMapCacheOptions<String, String> options = LocalCachedMapCacheOptions.<String, String>defaults()
// 断线重连策略:10分钟内决定是否clear操作
.reconnectionStrategy(LocalCachedMapCacheOptions.ReconnectionStrategy.LOAD)
// 使用CAFFEINE作为本地缓存
.cacheProvider(LocalCachedMapCacheOptions.CacheProvider.CAFFEINE)
.syncStrategy(LocalCachedMapCacheOptions.SyncStrategy.INVALIDATE);
return redisson.getLocalCachedMapCache("countryDictCache", options);
}