Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Client-side caching by hashing command arguments #3700

Merged
merged 16 commits into from
Feb 15, 2024
Merged
22 changes: 22 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,27 @@
<version>2.10.1</version>
</dependency>

<!-- Optional dependencies -->
<!-- Client-side caching -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>33.0.0-jre</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>2.9.3</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>net.openhft</groupId>
<artifactId>zero-allocation-hashing</artifactId>
<version>0.16</version>
<optional>true</optional>
</dependency>

<!-- UNIX socket connection support -->
<dependency>
<groupId>com.kohlschutter.junixsocket</groupId>
Expand All @@ -90,6 +111,7 @@
<version>1.19.0</version>
<scope>test</scope>
</dependency>

<!-- test -->
<dependency>
<groupId>junit</groupId>
Expand Down
108 changes: 68 additions & 40 deletions src/main/java/redis/clients/jedis/ClientSideCache.java
Original file line number Diff line number Diff line change
@@ -1,71 +1,99 @@
package redis.clients.jedis;

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;

import redis.clients.jedis.exceptions.JedisException;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import redis.clients.jedis.util.SafeEncoder;

public class ClientSideCache {
public abstract class ClientSideCache {
sazzad16 marked this conversation as resolved.
Show resolved Hide resolved

private final Map<ByteBuffer, Object> cache;
private final Map<ByteBuffer, Set<Long>> keyHashes;
private final ReentrantLock writeLock = new ReentrantLock();

public ClientSideCache() {
this.cache = new HashMap<>();
protected ClientSideCache() {
this.keyHashes = new ConcurrentHashMap<>();
}

/**
* For testing purpose only.
* @param map
*/
ClientSideCache(Map<ByteBuffer, Object> map) {
this.cache = map;
}
public abstract void invalidateAll();

public final void clear() {
cache.clear();
}
protected abstract void invalidateAll(Iterable<Long> hashes);
chayim marked this conversation as resolved.
Show resolved Hide resolved

public final void invalidateKeys(List list) {
final void invalidate(List list) {
sazzad16 marked this conversation as resolved.
Show resolved Hide resolved
if (list == null) {
clear();
invalidateAll();
return;
}

list.forEach(this::invalidateKey);
list.forEach(this::invalidate0);
}

private void invalidateKey(Object key) {
if (key instanceof byte[]) {
cache.remove(convertKey((byte[]) key));
} else {
throw new JedisException("" + key.getClass().getSimpleName() + " is not supported. Value: " + String.valueOf(key));
private void invalidate0(Object key) {
sazzad16 marked this conversation as resolved.
Show resolved Hide resolved
if (!(key instanceof byte[])) {
throw new AssertionError("" + key.getClass().getSimpleName() + " is not supported. Value: " + String.valueOf(key));
}
}

protected void setKey(Object key, Object value) {
cache.put(getMapKey(key), value);
}
final ByteBuffer mapKey = makeKey((byte[]) key);

protected <T> T getValue(Object key) {
return (T) getMapValue(key);
Set<Long> hashes = keyHashes.get(mapKey);
if (hashes != null) {
writeLock.lock();
sazzad16 marked this conversation as resolved.
Show resolved Hide resolved
try {
invalidateAll(hashes);
keyHashes.remove(mapKey);
} finally {
writeLock.unlock();
}
}
}

private Object getMapValue(Object key) {
return cache.get(getMapKey(key));
}
protected abstract void put(long hash, Object value);

protected abstract Object get(long hash);

private ByteBuffer getMapKey(Object key) {
if (key instanceof byte[]) {
return convertKey((byte[]) key);
} else {
return convertKey(SafeEncoder.encode(String.valueOf(key)));
final <T> T getValue(Function<CommandObject<T>, T> loader, CommandObject<T> command, String... keys) {

final long hash = getHash(command);

T value = (T) get(hash);
if (value != null) {
return value;
}

value = loader.apply(command);
if (value != null) {
writeLock.lock();
try {
put(hash, value);
for (String key : keys) {
ByteBuffer mapKey = makeKey(key);
if (keyHashes.containsKey(mapKey)) {
keyHashes.get(mapKey).add(hash);
} else {
Set<Long> set = new HashSet<>();
set.add(hash);
keyHashes.put(mapKey, set);
}
}
} finally {
writeLock.unlock();
}
}

return value;
}

protected abstract long getHash(CommandObject command);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why have different ways of hashing a command? Also, why bind the hashing algorithm to the cache library that we use? Why not allow using Caffeine as a cache store with Guava's hashing routine? That would mean another interface for hash computation.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not allow using Caffeine as a cache store with Guava's hashing routine?

@gerzse This is exactly what I'm trying to achieve.

Guava has a nice hash solution where we can do newHasher().put(a).put(b).put(c).hash().
But most other library only allows function.hash(a). So we have to do function.hash(array[function.hash(a), function.hash(b), function.hash(c)]).
We can add an interface for later approach. But it makes the append style approach of Guava unusable (hashing is still possible in second way).
On the other hand, if we create an interface to support the first approach, it makes most hashing libraries unusable.
Now, what is best approach we can do?


private ByteBuffer makeKey(String key) {
return makeKey(SafeEncoder.encode(key));
}

private static ByteBuffer convertKey(byte[] b) {
private static ByteBuffer makeKey(byte[] b) {
sazzad16 marked this conversation as resolved.
Show resolved Hide resolved
return ByteBuffer.wrap(b);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ private void discoverClusterSlots(Connection jedis) {
Arrays.fill(slots, null);
Arrays.fill(slotNodes, null);
if (clientSideCache != null) {
clientSideCache.clear();
clientSideCache.invalidateAll();
}
Set<String> hostAndPortKeys = new HashSet<>();

Expand Down
2 changes: 1 addition & 1 deletion src/main/java/redis/clients/jedis/Protocol.java
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ private static void processPush(final RedisInputStream is, ClientSideCache cache
//System.out.println("PUSH: " + SafeEncoder.encodeObject(list));
if (list.size() == 2 && list.get(0) instanceof byte[]
&& Arrays.equals(INVALIDATE_BYTES, (byte[]) list.get(0))) {
cache.invalidateKeys((List) list.get(1));
cache.invalidate((List) list.get(1));
}
}

Expand Down
18 changes: 9 additions & 9 deletions src/main/java/redis/clients/jedis/UnifiedJedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,14 @@ public void setBroadcastAndRoundRobinConfig(JedisBroadcastAndRoundRobinConfig co
this.commandObjects.setBroadcastAndRoundRobinConfig(this.broadcastAndRoundRobinConfig);
}

private <T> T executeClientSideCacheCommand(CommandObject<T> command, String... keys) {
if (clientSideCache == null) {
return executeCommand(command);
}

return clientSideCache.getValue((cmd) -> executeCommand(cmd), command, keys);
}

public String ping() {
return checkAndBroadcastCommand(commandObjects.ping());
}
Expand Down Expand Up @@ -749,15 +757,7 @@ public String set(String key, String value, SetParams params) {

@Override
public String get(String key) {
if (clientSideCache != null) {
String cachedValue = clientSideCache.getValue(key);
if (cachedValue != null) return cachedValue;

String value = executeCommand(commandObjects.get(key));
if (value != null) clientSideCache.setKey(key, value);
return value;
}
return executeCommand(commandObjects.get(key));
return executeClientSideCacheCommand(commandObjects.get(key), key);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ private void initMaster(HostAndPort master) {
pool = newPool;
LOG.info("Created connection pool to master at {}.", master);
if (clientSideCache != null) {
clientSideCache.clear();
clientSideCache.invalidateAll();
}

if (existingPool != null) {
Expand Down
95 changes: 95 additions & 0 deletions src/main/java/redis/clients/jedis/util/CaffeineCSC.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package redis.clients.jedis.util;
sazzad16 marked this conversation as resolved.
Show resolved Hide resolved

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.util.concurrent.TimeUnit;
import net.openhft.hashing.LongHashFunction;
import redis.clients.jedis.ClientSideCache;
import redis.clients.jedis.CommandObject;
import redis.clients.jedis.args.Rawable;

public class CaffeineCSC extends ClientSideCache {

private static final int DEFAULT_MAXIMUM_SIZE = 10_000;
private static final int DEFAULT_EXPIRE_SECONDS = 100;
private static final LongHashFunction DEFAULT_HASH_FUNCTION = LongHashFunction.xx3();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMHO let's share with users a comment on why this function. Be kind to our future selves and community.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure... we'll also need why Caffeine, why Guava, why not only one, etc.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

100%


private final Cache<Long, Object> cache;
private final LongHashFunction function;

public CaffeineCSC(Cache<Long, Object> caffeineCache, LongHashFunction hashFunction) {
this.cache = caffeineCache;
this.function = hashFunction;
}

@Override
public final void invalidateAll() {
cache.invalidateAll();
}

@Override
protected void invalidateAll(Iterable<Long> hashes) {
cache.invalidateAll(hashes);
}

@Override
protected void put(long hash, Object value) {
cache.put(hash, value);
}

@Override
protected Object get(long hash) {
return cache.getIfPresent(hash);
}

@Override
protected final long getHash(CommandObject command) {
long[] nums = new long[command.getArguments().size() + 1];
int idx = 0;
for (Rawable raw : command.getArguments()) {
nums[idx++] = function.hashBytes(raw.getRaw());
}
nums[idx] = function.hashInt(command.getBuilder().hashCode());
return function.hashLongs(nums);
}

public static Builder builder() {
return new Builder();
}

public static class Builder {

private long maximumSize = DEFAULT_MAXIMUM_SIZE;
private long expireTime = DEFAULT_EXPIRE_SECONDS;
private final TimeUnit expireTimeUnit = TimeUnit.SECONDS;

private LongHashFunction hashFunction = DEFAULT_HASH_FUNCTION;

private Builder() { }

public Builder maximumSize(int size) {
this.maximumSize = size;
return this;
}

public Builder ttl(int seconds) {
this.expireTime = seconds;
return this;
}

public Builder hashFunction(LongHashFunction function) {
this.hashFunction = function;
return this;
}

public CaffeineCSC build() {
Caffeine cb = Caffeine.newBuilder();

cb.maximumSize(maximumSize);

cb.expireAfterWrite(expireTime, expireTimeUnit);

return new CaffeineCSC(cb.build(), hashFunction);
}
}
}
Loading
Loading