Skip to content

Commit

Permalink
Introduce JedisBroadcast to broadcast commands (#3194)
Browse files Browse the repository at this point in the history
* Introduce JedisBroadcast to broadcast commands

* edit

* Use Supplier in broadcast method signatures

instead of BroadcastResponse

* Address review

* more commands

* broadcasted and ping

* edit

* Apply code review suggestions

* flushdb
  • Loading branch information
sazzad16 authored Dec 7, 2022
1 parent 0560ada commit 6092720
Show file tree
Hide file tree
Showing 10 changed files with 312 additions and 0 deletions.
28 changes: 28 additions & 0 deletions src/main/java/redis/clients/jedis/BroadcastResponse.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package redis.clients.jedis;

import java.util.function.Supplier;

/**
* Represents the response from a single node in broadcast mode.
*/
public class BroadcastResponse<T> implements Supplier<T> {

private T response = null;
private RuntimeException exception = null;

public BroadcastResponse(T response) {
this.response = response;
}

public BroadcastResponse(RuntimeException exception) {
this.exception = exception;
}

@Override
public T get() {
if (exception != null) {
throw exception;
}
return response;
}
}
180 changes: 180 additions & 0 deletions src/main/java/redis/clients/jedis/JedisBroadcast.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
package redis.clients.jedis;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

import redis.clients.jedis.Protocol.Command;
import redis.clients.jedis.Protocol.Keyword;
import redis.clients.jedis.args.FlushMode;
import redis.clients.jedis.providers.ConnectionProvider;
import redis.clients.jedis.search.FTCreateParams;
import redis.clients.jedis.search.IndexOptions;
import redis.clients.jedis.search.Schema;
import redis.clients.jedis.search.SearchProtocol.SearchCommand;
import redis.clients.jedis.search.SearchProtocol.SearchKeyword;
import redis.clients.jedis.search.schemafields.SchemaField;
import redis.clients.jedis.util.Pool;

public class JedisBroadcast {

private final ConnectionProvider provider;

public JedisBroadcast(UnifiedJedis jedis) {
this(jedis.provider);
}

public JedisBroadcast(ConnectionProvider provider) {
if (provider == null) {
throw new NullPointerException("ConnectionProvider cannot be null.");
}
this.provider = provider;
}

public final <T> Map<?, Supplier<T>> broadcastCommand(CommandObject<T> commandObject) {
Map<?, ?> connectionMap = provider.getConnectionMap();
Map<Object, Supplier<T>> responseMap = new HashMap<>(connectionMap.size(), 1f);
for (Map.Entry<? extends Object, ? extends Object> entry : connectionMap.entrySet()) {
Object key = entry.getKey();
Object connection = entry.getValue();
try {
responseMap.put(key, new BroadcastResponse<>(executeCommand(connection, commandObject)));
} catch (RuntimeException re) {
responseMap.put(key, new BroadcastResponse<>(re));
}
}
return responseMap;
}

private <T> T executeCommand(Object connection, CommandObject<T> commandObject) {
if (connection instanceof Connection) {
return ((Connection) connection).executeCommand(commandObject);
} else if (connection instanceof Pool) {
try (Connection _conn = ((Pool<Connection>) connection).getResource()) {
return _conn.executeCommand(commandObject);
}
}
throw new IllegalStateException(connection.getClass() + "is not supported.");
}

public Map<?, Supplier<String>> ping() {
return broadcastCommand(new CommandObject<>(new CommandArguments(Command.PING),
BuilderFactory.STRING));
}

public Map<?, Supplier<String>> configSet(final String... parameterValues) {
if (parameterValues.length > 0 && parameterValues.length % 2 != 0) {
throw new IllegalArgumentException("It requires 'pair's of config parameter-values.");
}
CommandArguments args = new CommandArguments(Command.CONFIG).add(Keyword.SET)
.addObjects((Object[]) parameterValues);
return broadcastCommand(new CommandObject<>(args, BuilderFactory.STRING));
}

public Map<?, Supplier<String>> flushDB() {
return broadcastCommand(new CommandObject<>(new CommandArguments(Command.FLUSHDB),
BuilderFactory.STRING));
}

public Map<?, Supplier<String>> flushDB(FlushMode flushMode) {
return broadcastCommand(new CommandObject<>(new CommandArguments(Command.FLUSHDB)
.add(flushMode), BuilderFactory.STRING));
}

public Map<?, Supplier<String>> flushAll() {
return broadcastCommand(new CommandObject<>(new CommandArguments(Command.FLUSHALL),
BuilderFactory.STRING));
}

public Map<?, Supplier<String>> flushAll(FlushMode flushMode) {
return broadcastCommand(new CommandObject<>(new CommandArguments(Command.FLUSHALL)
.add(flushMode), BuilderFactory.STRING));
}

public Map<?, Supplier<Long>> waitReplicas(final int replicas, final long timeout) {
CommandArguments args = new CommandArguments(Command.WAIT)
.add(Protocol.toByteArray(replicas)).add(Protocol.toByteArray(timeout));
return broadcastCommand(new CommandObject<>(args, BuilderFactory.LONG));
}

public Map<?, Supplier<List<Boolean>>> scriptExists(String... sha1) {
CommandObject<List<Boolean>> command = new CommandObject<>(new CommandArguments(Command.SCRIPT)
.add(Keyword.EXISTS).addObjects((Object[]) sha1), BuilderFactory.BOOLEAN_LIST);
return broadcastCommand(command);
}

public Map<?, Supplier<List<Boolean>>> scriptExists(byte[]... sha1) {
CommandObject<List<Boolean>> command = new CommandObject<>(new CommandArguments(Command.SCRIPT)
.add(Keyword.EXISTS).addObjects((Object[]) sha1), BuilderFactory.BOOLEAN_LIST);
return broadcastCommand(command);
}

public Map<?, Supplier<String>> scriptLoad(String script) {
CommandObject<String> command = new CommandObject<>(new CommandArguments(Command.SCRIPT)
.add(Keyword.LOAD).add(script), BuilderFactory.STRING);
return broadcastCommand(command);
}

public Map<?, Supplier<byte[]>> scriptLoad(byte[] script) {
CommandObject<byte[]> command = new CommandObject<>(new CommandArguments(Command.SCRIPT)
.add(Keyword.LOAD).add(script), BuilderFactory.BINARY);
return broadcastCommand(command);
}

public Map<?, Supplier<String>> scriptFlush() {
CommandObject<String> command = new CommandObject<>(new CommandArguments(Command.SCRIPT)
.add(Keyword.FLUSH), BuilderFactory.STRING);
return broadcastCommand(command);
}

public Map<?, Supplier<String>> scriptFlush(FlushMode flushMode) {
CommandObject<String> command = new CommandObject<>(new CommandArguments(Command.SCRIPT)
.add(Keyword.FLUSH).add(flushMode), BuilderFactory.STRING);
return broadcastCommand(command);
}

public Map<?, Supplier<String>> scriptKill() {
CommandObject<String> command = new CommandObject<>(new CommandArguments(Command.SCRIPT)
.add(Keyword.KILL), BuilderFactory.STRING);
return broadcastCommand(command);
}

public Map<?, Supplier<String>> ftCreate(String indexName, IndexOptions indexOptions, Schema schema) {
CommandArguments args = new CommandArguments(SearchCommand.CREATE).add(indexName)
.addParams(indexOptions).add(SearchKeyword.SCHEMA);
schema.fields.forEach(field -> args.addParams(field));
return broadcastCommand(new CommandObject<>(args, BuilderFactory.STRING));
}

public Map<?, Supplier<String>> ftCreate(String indexName, SchemaField... schemaFields) {
return ftCreate(indexName, Arrays.asList(schemaFields));
}

public Map<?, Supplier<String>> ftCreate(String indexName, FTCreateParams createParams, SchemaField... schemaFields) {
return ftCreate(indexName, createParams, Arrays.asList(schemaFields));
}

public Map<?, Supplier<String>> ftCreate(String indexName, Iterable<SchemaField> schemaFields) {
return ftCreate(indexName, FTCreateParams.createParams(), schemaFields);
}

public Map<?, Supplier<String>> ftCreate(String indexName, FTCreateParams createParams,
Iterable<SchemaField> schemaFields) {
CommandArguments args = new CommandArguments(SearchCommand.CREATE).add(indexName)
.addParams(createParams).add(SearchKeyword.SCHEMA);
schemaFields.forEach(field -> args.addParams(field));
return broadcastCommand(new CommandObject<>(args, BuilderFactory.STRING));
}

public Map<?, Supplier<String>> ftDropIndex(String indexName) {
return broadcastCommand(new CommandObject<>(new CommandArguments(SearchCommand.DROPINDEX)
.add(indexName), BuilderFactory.STRING));
}

public Map<?, Supplier<String>> ftDropIndexDD(String indexName) {
return broadcastCommand(new CommandObject<>(new CommandArguments(SearchCommand.DROPINDEX)
.add(indexName).add(SearchKeyword.DD), BuilderFactory.STRING));
}
}
4 changes: 4 additions & 0 deletions src/main/java/redis/clients/jedis/UnifiedJedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -4534,6 +4534,10 @@ public Map<String, Object> graphConfigGet(String configName) {
}
// RedisGraph commands

public JedisBroadcast broadcasted() {
return new JedisBroadcast(this);
}

public Object pipelined() {
if (provider == null) {
throw new IllegalStateException("It is not allowed to create Pipeline from this " + getClass());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,4 +135,9 @@ public Connection getConnectionFromSlot(int slot) {
}
}
}

@Override
public Map<String, ConnectionPool> getConnectionMap() {
return Collections.unmodifiableMap(getNodes());
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package redis.clients.jedis.providers;

import java.util.Collections;
import java.util.Map;
import redis.clients.jedis.CommandArguments;
import redis.clients.jedis.Connection;

Expand All @@ -8,4 +10,9 @@ public interface ConnectionProvider extends AutoCloseable {
Connection getConnection();

Connection getConnection(CommandArguments args);

default Map<?, ?> getConnectionMap() {
final Connection c = getConnection();
return Collections.singletonMap(c.toString(), c);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package redis.clients.jedis.providers;

import java.util.Collections;
import java.util.Map;
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

Expand All @@ -14,27 +16,33 @@
public class PooledConnectionProvider implements ConnectionProvider {

private final Pool<Connection> pool;
private Object connectionMapKey = "";

public PooledConnectionProvider(HostAndPort hostAndPort) {
this(new ConnectionFactory(hostAndPort));
this.connectionMapKey = hostAndPort;
}

public PooledConnectionProvider(HostAndPort hostAndPort, JedisClientConfig clientConfig) {
this(new ConnectionPool(hostAndPort, clientConfig));
this.connectionMapKey = hostAndPort;
}

public PooledConnectionProvider(HostAndPort hostAndPort, JedisClientConfig clientConfig,
GenericObjectPoolConfig<Connection> poolConfig) {
this(new ConnectionFactory(hostAndPort, clientConfig), poolConfig);
this.connectionMapKey = hostAndPort;
}

public PooledConnectionProvider(PooledObjectFactory<Connection> factory) {
this(new ConnectionPool(factory));
this.connectionMapKey = factory;
}

public PooledConnectionProvider(PooledObjectFactory<Connection> factory,
GenericObjectPoolConfig<Connection> poolConfig) {
this(new ConnectionPool(factory, poolConfig));
this.connectionMapKey = factory;
}

private PooledConnectionProvider(Pool<Connection> pool) {
Expand All @@ -59,4 +67,9 @@ public Connection getConnection() {
public Connection getConnection(CommandArguments args) {
return pool.getResource();
}

@Override
public Map<?, Pool<Connection>> getConnectionMap() {
return Collections.singletonMap(connectionMapKey, pool);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -158,4 +158,9 @@ private HostAndPort getNodeFromHash(Long hash) {
}
return tail.get(tail.firstKey());
}

@Override
public Map<String, ConnectionPool> getConnectionMap() {
return Collections.unmodifiableMap(resources);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,14 @@
import static org.junit.Assert.assertTrue;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import org.junit.Test;

import redis.clients.jedis.JedisBroadcast;
import redis.clients.jedis.args.FlushMode;
import redis.clients.jedis.exceptions.JedisClusterOperationException;
import redis.clients.jedis.exceptions.JedisDataException;
Expand Down Expand Up @@ -91,4 +95,32 @@ public void testBinaryScriptExists() {
byte[][] arraySha1 = { sha1 };
assertEquals(Collections.singletonList(Boolean.TRUE), cluster.scriptExists(byteKey, arraySha1));
}

@Test
public void broadcast() {
Map<?, Supplier<String>> stringReplies;
String script_1 = "return 'jedis'", script_2 = "return 79", sha1_1, sha1_2;
JedisBroadcast broadcast = new JedisBroadcast(cluster);

stringReplies = broadcast.scriptLoad(script_1);
assertEquals(3, stringReplies.size());
sha1_1 = stringReplies.values().stream().findAny().get().get();
stringReplies.values().forEach(reply -> assertEquals(sha1_1, reply.get()));

stringReplies = broadcast.scriptLoad(script_2);
assertEquals(3, stringReplies.size());
sha1_2 = stringReplies.values().stream().findAny().get().get();
stringReplies.values().forEach(reply -> assertEquals(sha1_2, reply.get()));

Map<?, Supplier<List<Boolean>>> booleanListReplies;
booleanListReplies = broadcast.scriptExists(sha1_1, sha1_2);
assertEquals(3, booleanListReplies.size());
booleanListReplies.values().forEach(reply -> assertEquals(Arrays.asList(true, true), reply.get()));

broadcast.scriptFlush();

booleanListReplies = broadcast.scriptExists(sha1_1, sha1_2);
assertEquals(3, booleanListReplies.size());
booleanListReplies.values().forEach(reply -> assertEquals(Arrays.asList(false, false), reply.get()));
}
}
Loading

0 comments on commit 6092720

Please sign in to comment.