Skip to content

Commit

Permalink
Merge pull request #18 from FlyInWind1/master
Browse files Browse the repository at this point in the history
取消写死的 redisSerializer,以及对 NullValue 的特殊处理
  • Loading branch information
lltx authored Oct 8, 2022
2 parents 94447b2 + d0261ba commit e155222
Show file tree
Hide file tree
Showing 7 changed files with 163 additions and 60 deletions.
10 changes: 10 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,16 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import com.pig4cloud.plugin.cache.properties.CacheConfigProperties;
import com.pig4cloud.plugin.cache.support.CacheMessageListener;
import com.pig4cloud.plugin.cache.support.RedisCaffeineCacheManager;
import com.pig4cloud.plugin.cache.support.RedisCaffeineCaffeineCacheManagerCustomizer;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
Expand All @@ -15,8 +17,11 @@
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

import java.util.Objects;

/**
* @author fuwei.deng
* @version 1.0.0
Expand All @@ -27,10 +32,15 @@
public class MultilevelCacheAutoConfiguration {

@Bean
@ConditionalOnMissingBean
@ConditionalOnBean(RedisTemplate.class)
public RedisCaffeineCacheManager cacheManager(CacheConfigProperties cacheConfigProperties,
@Qualifier("stringKeyRedisTemplate") RedisTemplate<Object, Object> stringKeyRedisTemplate) {
return new RedisCaffeineCacheManager(cacheConfigProperties, stringKeyRedisTemplate);
@Qualifier("stringKeyRedisTemplate") RedisTemplate<Object, Object> stringKeyRedisTemplate,
ObjectProvider<RedisCaffeineCaffeineCacheManagerCustomizer> cacheManagerCustomizers) {
RedisCaffeineCacheManager cacheManager = new RedisCaffeineCacheManager(cacheConfigProperties,
stringKeyRedisTemplate);
cacheManagerCustomizers.orderedStream().forEach(customizer -> customizer.customize(cacheManager));
return cacheManager;
}

/**
Expand All @@ -47,15 +57,26 @@ public RedisTemplate<Object, Object> stringKeyRedisTemplate(RedisConnectionFacto
}

@Bean
@ConditionalOnMissingBean(name = "cacheMessageListenerContainer")
public RedisMessageListenerContainer cacheMessageListenerContainer(CacheConfigProperties cacheConfigProperties,
@Qualifier("stringKeyRedisTemplate") RedisTemplate<Object, Object> stringKeyRedisTemplate,
RedisCaffeineCacheManager redisCaffeineCacheManager) {
@Qualifier("cacheMessageListener") CacheMessageListener cacheMessageListener) {
RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
redisMessageListenerContainer.setConnectionFactory(stringKeyRedisTemplate.getConnectionFactory());
CacheMessageListener cacheMessageListener = new CacheMessageListener(redisCaffeineCacheManager);
redisMessageListenerContainer
.setConnectionFactory(Objects.requireNonNull(stringKeyRedisTemplate.getConnectionFactory()));
redisMessageListenerContainer.addMessageListener(cacheMessageListener,
new ChannelTopic(cacheConfigProperties.getRedis().getTopic()));
return redisMessageListenerContainer;
}

@Bean
@SuppressWarnings("unchecked")
@ConditionalOnMissingBean(name = "cacheMessageListener")
public CacheMessageListener cacheMessageListener(
@Qualifier("stringKeyRedisTemplate") RedisTemplate<Object, Object> stringKeyRedisTemplate,
RedisCaffeineCacheManager redisCaffeineCacheManager) {
return new CacheMessageListener((RedisSerializer<Object>) stringKeyRedisTemplate.getValueSerializer(),
redisCaffeineCacheManager);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package com.pig4cloud.plugin.cache.config;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
import com.pig4cloud.plugin.cache.support.CacheMessage;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.jackson.Jackson2ObjectMapperBuilderCustomizer;
import org.springframework.cache.support.NullValue;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder;

/**
* 为 jackson 添加序列化和反序列化 NullValue, CacheMessage 支持
*
* @author FlyInWind
*/
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(Jackson2ObjectMapperBuilder.class)
public class CacheJackson2ObjectMapperBuilderCustomizer implements Jackson2ObjectMapperBuilderCustomizer {

@Override
public void customize(Jackson2ObjectMapperBuilder builder) {
// 由于 NullValue 有 final 修饰,而 jackson 配置的是 NON_FINAL
// 会导致 @class 信息不会添加到 json 中,序列化出来的 json 为空,最终报错
// 这里利用 ObjectMapper 的 mixIn 强制添加 @class 信息
builder.mixIn(NullValue.class, UseTypeInfo.class);
// 反序列化会创建新的对象,而由于 NullValue#equal 方法仅通过 == 判断是否相等,会导致 equal 结果为 false
// 这里新建一个 Deserializer 专门返回 NullValue.INSTANCE
builder.deserializers(NullValueDeserializer.INSTANCE);

// CacheMessage需要通过有参构造器构造
builder.mixIn(CacheMessage.class, CacheMessageMix.class);
}

public static class NullValueDeserializer extends StdDeserializer<NullValue> {

public static final NullValueDeserializer INSTANCE = new NullValueDeserializer();

protected NullValueDeserializer() {
super(NullValue.class);
}

@Override
public NullValue deserialize(JsonParser p, DeserializationContext ctx) {
return (NullValue) NullValue.INSTANCE;
}

}

@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS)
public static class UseTypeInfo {

}

public static class CacheMessageMix {

@JsonCreator
public CacheMessageMix(@JsonProperty("cacheName") String cacheName, @JsonProperty("key") Object key) {
}

}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.pig4cloud.plugin.cache.support;

import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.Message;
Expand All @@ -11,21 +12,18 @@
* @version 1.0.0
*/
@Slf4j
@Getter
@RequiredArgsConstructor
public class CacheMessageListener implements MessageListener {

private RedisSerializer<Object> javaSerializer = RedisSerializer.java();
private final RedisSerializer<Object> redisSerializer;

private final RedisCaffeineCacheManager redisCaffeineCacheManager;

@Override
public void onMessage(Message message, byte[] pattern) {

/**
* 发送端固定了jdk序列户方式,接收端同样固定了jdk序列化方式进行反序列化。
*/
CacheMessage cacheMessage = (CacheMessage) javaSerializer.deserialize(message.getBody());
log.debug("recevice a redis topic message, clear local cache, the cacheName is {}, the key is {}",
CacheMessage cacheMessage = (CacheMessage) redisSerializer.deserialize(message.getBody());
log.debug("receive a redis topic message, clear local cache, the cacheName is {}, the key is {}",
cacheMessage.getCacheName(), cacheMessage.getKey());
redisCaffeineCacheManager.clearLocal(cacheMessage.getCacheName(), cacheMessage.getKey());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
import org.springframework.cache.support.AbstractValueAdaptingCache;
import org.springframework.cache.support.NullValue;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

Expand All @@ -24,12 +22,11 @@
* @version 1.0.0
*/
@Slf4j
@Getter
public class RedisCaffeineCache extends AbstractValueAdaptingCache {

@Getter
private final String name;

@Getter
private final Cache<Object, Object> caffeineCache;

private final RedisTemplate<Object, Object> stringKeyRedisTemplate;
Expand All @@ -46,10 +43,6 @@ public class RedisCaffeineCache extends AbstractValueAdaptingCache {

private final Map<String, ReentrantLock> keyLockMap = new ConcurrentHashMap<>();

private RedisSerializer<String> stringSerializer = RedisSerializer.string();

private RedisSerializer<Object> javaSerializer = RedisSerializer.java();

public RedisCaffeineCache(String name, RedisTemplate<Object, Object> stringKeyRedisTemplate,
Cache<Object, Object> caffeineCache, CacheConfigProperties cacheConfigProperties) {
super(cacheConfigProperties.isCacheNullValues());
Expand Down Expand Up @@ -129,7 +122,7 @@ private void doPut(Object key, Object value) {

push(new CacheMessage(this.name, key));

caffeineCache.put(key, value);
setCaffeineValue(key, value);
}

@Override
Expand Down Expand Up @@ -159,7 +152,7 @@ public void clear() {
@Override
protected Object lookup(Object key) {
Object cacheKey = getKey(key);
Object value = caffeineCache.getIfPresent(key);
Object value = getCaffeineValue(key);
if (value != null) {
log.debug("get cache from caffeine, the key is : {}", cacheKey);
return value;
Expand All @@ -169,17 +162,17 @@ protected Object lookup(Object key) {

if (value != null) {
log.debug("get cache from redis and put in caffeine, the key is : {}", cacheKey);
caffeineCache.put(key, value);
setCaffeineValue(key, value);
}
return value;
}

private Object getKey(Object key) {
protected Object getKey(Object key) {
return this.name.concat(":").concat(
StringUtils.isEmpty(cachePrefix) ? key.toString() : cachePrefix.concat(":").concat(key.toString()));
StringUtils.hasLength(cachePrefix) ? cachePrefix.concat(":").concat(key.toString()) : key.toString());
}

private Duration getExpire(Object value) {
protected Duration getExpire(Object value) {
Duration cacheNameExpire = expires.get(this.name);
if (cacheNameExpire == null) {
cacheNameExpire = defaultExpiration;
Expand All @@ -197,20 +190,8 @@ private Duration getExpire(Object value) {
* @date 2018年1月31日 下午3:20:28
* @version 1.0.0
*/
private void push(CacheMessage message) {

/**
* 为了能自定义redisTemplate,发布订阅的序列化方式固定为jdk序列化方式。
*/
Assert.hasText(topic, "a non-empty channel is required");
byte[] rawChannel = stringSerializer.serialize(topic);
byte[] rawMessage = javaSerializer.serialize(message);
stringKeyRedisTemplate.execute((connection) -> {
connection.publish(rawChannel, rawMessage);
return null;
}, true);

// stringKeyRedisTemplate.convertAndSend(topic, message);
protected void push(CacheMessage message) {
stringKeyRedisTemplate.convertAndSend(topic, message);
}

/**
Expand All @@ -230,29 +211,25 @@ public void clearLocal(Object key) {
}
}

private void setRedisValue(Object key, Object value, Duration expire) {

Object convertValue = value;
if (value == null || value == NullValue.INSTANCE) {
convertValue = RedisNullValue.REDISNULLVALUE;
}

protected void setRedisValue(Object key, Object value, Duration expire) {
if (!expire.isNegative() && !expire.isZero()) {
stringKeyRedisTemplate.opsForValue().set(getKey(key), convertValue, expire);
stringKeyRedisTemplate.opsForValue().set(getKey(key), value, expire);
}
else {
stringKeyRedisTemplate.opsForValue().set(getKey(key), convertValue);
stringKeyRedisTemplate.opsForValue().set(getKey(key), value);
}
}

private Object getRedisValue(Object key) {
protected Object getRedisValue(Object key) {
return stringKeyRedisTemplate.opsForValue().get(getKey(key));
}

// NullValue在不同序列化方式中存在问题,因此自定义了RedisNullValue做个转化。
Object value = stringKeyRedisTemplate.opsForValue().get(getKey(key));
if (value != null && value instanceof RedisNullValue) {
value = NullValue.INSTANCE;
}
return value;
protected void setCaffeineValue(Object key, Object value) {
caffeineCache.put(key, value);
}

protected Object getCaffeineValue(Object key) {
return caffeineCache.getIfPresent(key);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import com.github.benmanes.caffeine.cache.Caffeine;
import com.pig4cloud.plugin.cache.properties.CacheConfigProperties;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cache.Cache;
import org.springframework.cache.CacheManager;
Expand All @@ -19,9 +21,11 @@
* @version 1.0.0
*/
@Slf4j
@Getter
@Setter
public class RedisCaffeineCacheManager implements CacheManager {

private ConcurrentMap<String, Cache> cacheMap = new ConcurrentHashMap<String, Cache>();
private ConcurrentMap<String, Cache> cacheMap = new ConcurrentHashMap<>();

private CacheConfigProperties cacheConfigProperties;

Expand All @@ -47,16 +51,24 @@ public Cache getCache(String name) {
return cache;
}
if (!dynamic && !cacheNames.contains(name)) {
return cache;
return null;
}

cache = new RedisCaffeineCache(name, stringKeyRedisTemplate, caffeineCache(), cacheConfigProperties);
cache = createCache(name);
Cache oldCache = cacheMap.putIfAbsent(name, cache);
log.debug("create cache instance, the cache name is : {}", name);
return oldCache == null ? cache : oldCache;
}

public RedisCaffeineCache createCache(String name) {
return new RedisCaffeineCache(name, stringKeyRedisTemplate, caffeineCache(), cacheConfigProperties);
}

public com.github.benmanes.caffeine.cache.Cache<Object, Object> caffeineCache() {
return caffeineCacheBuilder().build();
}

public Caffeine<Object, Object> caffeineCacheBuilder() {
Caffeine<Object, Object> cacheBuilder = Caffeine.newBuilder();
doIfPresent(cacheConfigProperties.getCaffeine().getExpireAfterAccess(), cacheBuilder::expireAfterAccess);
doIfPresent(cacheConfigProperties.getCaffeine().getExpireAfterWrite(), cacheBuilder::expireAfterWrite);
Expand Down Expand Up @@ -84,10 +96,11 @@ public com.github.benmanes.caffeine.cache.Cache<Object, Object> caffeineCache()
break;
case SOFT:
cacheBuilder.softValues();
break;
default:
}
}
return cacheBuilder.build();
return cacheBuilder;
}

protected static void doIfPresent(Duration duration, Consumer<Duration> consumer) {
Expand Down
Loading

0 comments on commit e155222

Please sign in to comment.