Skip to content

Commit

Permalink
实现 caffeine 的 cache 接口,提供 getAll, putAll, invalidateAll 批处理功能
Browse files Browse the repository at this point in the history
  • Loading branch information
FlyInWind1 committed Oct 27, 2022
1 parent 78d709a commit 5b63f92
Show file tree
Hide file tree
Showing 11 changed files with 465 additions and 26 deletions.
18 changes: 18 additions & 0 deletions src/main/java/com/pig4cloud/plugin/cache/enums/CacheOperation.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.pig4cloud.plugin.cache.enums;

import com.fasterxml.jackson.annotation.JsonFormat;

@JsonFormat(shape = JsonFormat.Shape.NUMBER)
public enum CacheOperation {

/**
* 清除 key
*/
EVICT,

/**
* 批量清除
*/
EVICT_BATCH

}
14 changes: 14 additions & 0 deletions src/main/java/com/pig4cloud/plugin/cache/support/CacheManager.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.pig4cloud.plugin.cache.support;

import com.github.benmanes.caffeine.cache.Cache;

public interface CacheManager extends org.springframework.cache.CacheManager {

/**
* 实现 caffeine 的 {@link Cache} 接口的缓存
* @param name 缓存名
* @return {@link Cache} 的实现
*/
<K, V> Cache<K, V> getCaffeineCache(String name);

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.pig4cloud.plugin.cache.support;

import com.pig4cloud.plugin.cache.enums.CacheOperation;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
Expand All @@ -19,6 +20,8 @@ public class CacheMessage implements Serializable {

private String cacheName;

private CacheOperation operation;

private Object key;

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@ public class CacheMessageListener implements MessageListener {
public void onMessage(Message message, byte[] pattern) {
CacheMessage cacheMessage = (CacheMessage) redisSerializer.deserialize(message.getBody());
if (!Objects.equals(cacheMessage.getServerId(), redisCaffeineCacheManager.getServerId())) {
log.debug("receive a redis topic message, clear local cache, the cacheName is {}, the key is {}",
cacheMessage.getCacheName(), cacheMessage.getKey());
redisCaffeineCacheManager.clearLocal(cacheMessage.getCacheName(), cacheMessage.getKey());
log.debug(
"receive a redis topic message, clear local cache, the cacheName is {}, operation is {}, the key is {}",
cacheMessage.getCacheName(), cacheMessage.getOperation(), cacheMessage.getKey());
redisCaffeineCacheManager.clearLocal(cacheMessage.getCacheName(), cacheMessage.getKey(),
cacheMessage.getOperation());
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,29 +1,41 @@
package com.pig4cloud.plugin.cache.support;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Policy;
import com.github.benmanes.caffeine.cache.stats.CacheStats;
import com.pig4cloud.plugin.cache.enums.CacheOperation;
import com.pig4cloud.plugin.cache.properties.CacheConfigProperties;
import com.pig4cloud.plugin.cache.util.CollUtil;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.index.qual.NonNegative;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.springframework.cache.support.AbstractValueAdaptingCache;
import org.springframework.cache.support.NullValue;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.core.RedisOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.SessionCallback;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

import java.time.Duration;
import java.util.Map;
import java.util.Set;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

/**
* @author fuwei.deng
* @version 1.0.0
*/
@Slf4j
@Getter
public class RedisCaffeineCache extends AbstractValueAdaptingCache {
public class RedisCaffeineCache extends AbstractValueAdaptingCache implements Cache<Object, Object> {

private final String name;

Expand Down Expand Up @@ -147,7 +159,7 @@ public void clear() {
stringKeyRedisTemplate.delete(keys);
}

push(null);
push((Object) null);

caffeineCache.invalidateAll();
}
Expand Down Expand Up @@ -187,7 +199,11 @@ protected Duration getExpire(Object value) {
}

protected void push(Object key) {
push(new CacheMessage(this.serverId, this.name, key));
push(key, CacheOperation.EVICT);
}

protected void push(Object key, CacheOperation operation) {
push(new CacheMessage(this.serverId, this.name, operation, key));
}

/**
Expand Down Expand Up @@ -218,12 +234,22 @@ public void clearLocal(Object key) {
}
}

public void clearLocalBatch(Iterable<Object> keys) {
log.debug("clear local cache, the keys is : {}", keys);
caffeineCache.invalidateAll(keys);
}

protected void setRedisValue(Object key, Object value, Duration expire) {
setRedisValue(key, value, expire, stringKeyRedisTemplate.opsForValue());
}

protected void setRedisValue(Object key, Object value, Duration expire,
ValueOperations<Object, Object> valueOperations) {
if (!expire.isNegative() && !expire.isZero()) {
stringKeyRedisTemplate.opsForValue().set(getKey(key), value, expire);
valueOperations.set(getKey(key), value, expire);
}
else {
stringKeyRedisTemplate.opsForValue().set(getKey(key), value);
valueOperations.set(getKey(key), value);
}
}

Expand All @@ -239,4 +265,188 @@ protected Object getCaffeineValue(Object key) {
return caffeineCache.getIfPresent(key);
}

// ---------- 对 Caffeine Cache 接口的实现

@Override
public @Nullable Object getIfPresent(@NonNull Object key) {
ValueWrapper valueWrapper = get(key);
if (valueWrapper == null) {
return null;
}
return valueWrapper.get();
}

@Override
public @Nullable Object get(@NonNull Object key, @NonNull Function<? super Object, ?> mappingFunction) {
return get(key, (Callable<Object>) () -> mappingFunction.apply(key));
}

@Override
@SuppressWarnings("unchecked")
public @NonNull Map<@NonNull Object, @NonNull Object> getAllPresent(@NonNull Iterable<@NonNull ?> keys) {
GetAllContext context = new GetAllContext((Iterable<Object>) keys);
doGetAll(context);
Map<Object, Object> cachedKeyValues = context.cachedKeyValues;
Map<Object, Object> result = new HashMap<>(cachedKeyValues.size(), 1);
cachedKeyValues.forEach((k, v) -> result.put(k, fromStoreValue(v)));
return result;
}

@Override
@SuppressWarnings("unchecked")
public @NonNull Map<Object, Object> getAll(@NonNull Iterable<?> keys,
@NonNull Function<Iterable<?>, @NonNull Map<Object, Object>> mappingFunction) {
GetAllContext context = new GetAllContext((Iterable<Object>) keys);
context.saveRedisAbsentKeys = true;
doGetAll(context);
int redisAbsentCount = context.redisAbsentCount;
Map<Object, Object> cachedKeyValues = context.cachedKeyValues;
if (redisAbsentCount == 0) {
// 所有 key 全部命中缓存
Map<Object, Object> result = new HashMap<>(cachedKeyValues.size(), 1);
cachedKeyValues.forEach((k, v) -> result.put(k, fromStoreValue(v)));
return result;
}
// 从 mappingFunction 中获取值
Map<Object, Object> mappingKeyValues = mappingFunction.apply(context.redisAbsentKeys);
putAll(mappingKeyValues);
Map<Object, Object> result = new HashMap<>(cachedKeyValues.size() + mappingKeyValues.size(), 1);
cachedKeyValues.forEach((k, v) -> result.put(k, fromStoreValue(v)));
result.putAll(mappingKeyValues);
return result;
}

@SuppressWarnings("unchecked")
protected void doGetAll(GetAllContext context) {
context.cachedKeyValues = caffeineCache.getAll(context.allKeys, keyIterable -> {
Collection<Object> caffeineAbsentKeys = CollUtil.toCollection((Iterable<Object>) keyIterable);
Collection<Object> redisKeys = CollUtil.trans(caffeineAbsentKeys, this::getKey);
// 从 redis 批量获取
List<Object> redisValues = stringKeyRedisTemplate.opsForValue().multiGet(redisKeys);
Objects.requireNonNull(redisValues);
// 统计 redis 中没有的 key 数量
int redisAbsentCount = 0;
for (Object value : redisValues) {
if (value == null) {
redisAbsentCount++;
}
}
context.redisAbsentCount = redisAbsentCount;
HashMap<Object, Object> result = new HashMap<>(caffeineAbsentKeys.size() - redisAbsentCount, 1);
boolean saveCacheAbsentKeys = context.saveRedisAbsentKeys;
if (saveCacheAbsentKeys) {
// mappingFunction 的参数
context.redisAbsentKeys = new ArrayList<>(redisAbsentCount);
}
int index = 0;
for (Object key : caffeineAbsentKeys) {
Object redisValue = redisValues.get(index);
if (redisValue != null) {
result.put(key, redisValue);
}
else if (saveCacheAbsentKeys) {
context.redisAbsentKeys.add(key);
}
index++;
}
return result;
});
}

protected static class GetAllContext {

public GetAllContext(Iterable<Object> allKeys) {
this.allKeys = allKeys;
}

protected Iterable<Object> allKeys;

/**
* 是否将redis未查询到的key保存到 {@link #redisAbsentKeys}
*/
protected boolean saveRedisAbsentKeys = false;

/**
* redis中未查询到的key
*/
protected List<Object> redisAbsentKeys;

/**
* redis中未查询到的key数量
*/
protected int redisAbsentCount;

/**
* caffeine和redis中缓存的键值,未经过{@link #fromStoreValue}转换
*/
protected Map<Object, Object> cachedKeyValues;

}

@Override
public void putAll(@NonNull Map<?, ?> map) {
stringKeyRedisTemplate.executePipelined(new SessionCallback<Object>() {
@Override
@SuppressWarnings("unchecked")
public <K, V> Object execute(@NonNull RedisOperations<K, V> operations) throws DataAccessException {
ValueOperations<Object, Object> valueOperations = (ValueOperations<Object, Object>) operations
.opsForValue();
map.forEach((k, v) -> {
Object o = toStoreValue(v);
Duration expire = getExpire(o);
setRedisValue(k, o, expire, valueOperations);
setCaffeineValue(k, o);
});
return null;
}
});
push(new ArrayList<>(map.keySet()), CacheOperation.EVICT_BATCH);
}

@Override
public void invalidate(@NonNull Object key) {
evict(key);
}

@Override
public void invalidateAll(@NonNull Iterable<@NonNull ?> keys) {
Collection<?> keysColl = CollUtil.toCollection(keys);
Collection<Object> redisKeys = CollUtil.trans(keysColl, this::getKey);
stringKeyRedisTemplate.delete(redisKeys);
push(keysColl, CacheOperation.EVICT_BATCH);
caffeineCache.invalidateAll(keysColl);
}

@Override
public void invalidateAll() {
this.clear();
}

// ---------- 单纯的代理 caffeineCache

@Override
public @NonNegative long estimatedSize() {
return caffeineCache.estimatedSize();
}

@Override
public @NonNull CacheStats stats() {
return caffeineCache.stats();
}

@Override
public @NonNull ConcurrentMap<@NonNull Object, @NonNull Object> asMap() {
return caffeineCache.asMap();
}

@Override
public void cleanUp() {
caffeineCache.cleanUp();
}

@Override
public @NonNull Policy<Object, Object> policy() {
return caffeineCache.policy();
}

}
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package com.pig4cloud.plugin.cache.support;

import com.github.benmanes.caffeine.cache.Caffeine;
import com.pig4cloud.plugin.cache.enums.CacheOperation;
import com.pig4cloud.plugin.cache.properties.CacheConfigProperties;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cache.Cache;
import org.springframework.cache.CacheManager;
import org.springframework.data.redis.core.RedisTemplate;

import java.time.Duration;
Expand Down Expand Up @@ -63,6 +63,12 @@ public Cache getCache(String name) {
return oldCache == null ? cache : oldCache;
}

@Override
@SuppressWarnings("unchecked")
public <K, V> com.github.benmanes.caffeine.cache.Cache<K, V> getCaffeineCache(String name) {
return (com.github.benmanes.caffeine.cache.Cache<K, V>) getCache(name);
}

public RedisCaffeineCache createCache(String name) {
return new RedisCaffeineCache(name, stringKeyRedisTemplate, caffeineCache(), cacheConfigProperties);
}
Expand Down Expand Up @@ -118,13 +124,23 @@ public Collection<String> getCacheNames() {
}

public void clearLocal(String cacheName, Object key) {
clearLocal(cacheName, key, CacheOperation.EVICT);
}

@SuppressWarnings("unchecked")
public void clearLocal(String cacheName, Object key, CacheOperation operation) {
Cache cache = cacheMap.get(cacheName);
if (cache == null) {
return;
}

RedisCaffeineCache redisCaffeineCache = (RedisCaffeineCache) cache;
redisCaffeineCache.clearLocal(key);
if (CacheOperation.EVICT_BATCH.equals(operation)) {
redisCaffeineCache.clearLocalBatch((Iterable<Object>) key);
}
else {
redisCaffeineCache.clearLocal(key);
}
}

}
Loading

0 comments on commit 5b63f92

Please sign in to comment.