Skip to content

Commit

Permalink
Transaction with multi cluster failover connection
Browse files Browse the repository at this point in the history
  • Loading branch information
sazzad16 committed Oct 30, 2023
1 parent 2f95ce8 commit 2e87a2c
Show file tree
Hide file tree
Showing 4 changed files with 252 additions and 10 deletions.
6 changes: 6 additions & 0 deletions src/main/java/redis/clients/jedis/TransactionBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,14 @@ protected TransactionBase() {

public abstract void multi();

/**
* Must be called before {@link TransactionBase#multi() MULTI}.
*/
public abstract String watch(final String... keys);

/**
* Must be called before {@link TransactionBase#multi() MULTI}.
*/
public abstract String watch(final byte[]... keys);

public abstract String unwatch();
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/redis/clients/jedis/UnifiedJedis.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package redis.clients.jedis;

import redis.clients.jedis.activeactive.MultiClusterFailoverPipeline;
import redis.clients.jedis.activeactive.MultiClusterPipeline;
import redis.clients.jedis.activeactive.CircuitBreakerCommandExecutor;
import java.net.URI;
import java.time.Duration;
Expand Down Expand Up @@ -4825,7 +4825,7 @@ public PipelineBase pipelined() {
if (provider == null) {
throw new IllegalStateException("It is not allowed to create Pipeline from this " + getClass());
} else if (provider instanceof MultiClusterPooledConnectionProvider) {
return new MultiClusterFailoverPipeline((MultiClusterPooledConnectionProvider) provider);
return new MultiClusterPipeline((MultiClusterPooledConnectionProvider) provider);
} else {
return new Pipeline(provider.getConnection(), true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,14 @@

/**
* This is high memory dependent solution as all the appending commands will be hold in memory until
* {@link MultiClusterFailoverPipeline#sync() SYNC}
* (or {@link MultiClusterFailoverPipeline#close() CLOSE}) gets called.
* {@link MultiClusterPipeline#sync() SYNC} (or {@link MultiClusterPipeline#close() CLOSE}) gets called.
*/
public class MultiClusterFailoverPipeline extends PipelineBase implements Closeable {
public class MultiClusterPipeline extends PipelineBase implements Closeable {

private final CircuitBreakerFailoverConnectionProvider provider;
private final Queue<KeyValue<CommandArguments, Response<?>>> commands = new LinkedList<>();

public MultiClusterFailoverPipeline(MultiClusterPooledConnectionProvider provider) {
public MultiClusterPipeline(MultiClusterPooledConnectionProvider provider) {
super(new CommandObjects());
try (Connection connection = provider.getConnection()) { // we don't need a healthy connection now
RedisProtocol proto = connection.getRedisProtocol();
Expand All @@ -40,7 +39,7 @@ public MultiClusterFailoverPipeline(MultiClusterPooledConnectionProvider provide
}

@Override
public final <T> Response<T> appendCommand(CommandObject<T> commandObject) {
protected final <T> Response<T> appendCommand(CommandObject<T> commandObject) {
CommandArguments args = commandObject.getArguments();
Response<T> response = new Response<>(commandObject.getBuilder());
commands.add(KeyValue.of(args, response));
Expand All @@ -54,9 +53,8 @@ public void close() {
}

/**
* Synchronize pipeline by reading all responses. This operation close the pipeline. In order to
* get return values from pipelined commands, capture the different Response&lt;?&gt; of the
* commands you execute.
* Synchronize pipeline by reading all responses. This operation close the pipeline. In order to get return values
* from pipelined commands, capture the different Response&lt;?&gt; of the commands you execute.
*/
@Override
public void sync() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
package redis.clients.jedis.activeactive;

import static redis.clients.jedis.Protocol.Command.DISCARD;
import static redis.clients.jedis.Protocol.Command.EXEC;
import static redis.clients.jedis.Protocol.Command.MULTI;
import static redis.clients.jedis.Protocol.Command.UNWATCH;
import static redis.clients.jedis.Protocol.Command.WATCH;

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;

import redis.clients.jedis.*;
import redis.clients.jedis.exceptions.JedisDataException;
import redis.clients.jedis.graph.ResultSet;
import redis.clients.jedis.util.KeyValue;

/**
* This is high memory dependent solution as all the appending commands will be hold in memory.
*/
public class MultiClusterTransaction extends TransactionBase {

private static final Builder<?> CHEAPEST_BUILDER = BuilderFactory.RAW_OBJECT;

private final CircuitBreakerFailoverConnectionProvider provider;
private final AtomicInteger extraCommandCount = new AtomicInteger();
private final Queue<KeyValue<CommandArguments, Response<?>>> commands = new LinkedList<>();

private boolean inWatch = false;
private boolean inMulti = false;

/**
* A MULTI command will be added to be sent to server. WATCH/UNWATCH/MULTI commands must not be
* called with this object.
* @param provider
*/
public MultiClusterTransaction(CircuitBreakerFailoverConnectionProvider provider) {
this(provider, true);
}

/**
* A user wanting to WATCH/UNWATCH keys followed by a call to MULTI ({@link #multi()}) it should
* be {@code doMulti=false}.
*
* @param provider
* @param doMulti {@code false} should be set to enable manual WATCH, UNWATCH and MULTI
*/
public MultiClusterTransaction(CircuitBreakerFailoverConnectionProvider provider, boolean doMulti) {
try (Connection connection = provider.getConnection()) { // we don't need a healthy connection now
RedisProtocol proto = connection.getRedisProtocol();
if (proto != null) this.commandObjects.setProtocol(proto);
}

this.provider = provider;
if (doMulti) multi();
}

@Override
public final void multi() {
appendCommand(new CommandObject<>(new CommandArguments(MULTI), CHEAPEST_BUILDER));
extraCommandCount.incrementAndGet();
inMulti = true;
}

/**
* @param keys
* @return {@code null}
*/
@Override
public final String watch(String... keys) {
appendCommand(new CommandObject<>(new CommandArguments(WATCH).addObjects((Object[]) keys), CHEAPEST_BUILDER));
extraCommandCount.incrementAndGet();
inWatch = true;
return null;
}

/**
* @param keys
* @return {@code null}
*/
@Override
public final String watch(byte[]... keys) {
appendCommand(new CommandObject<>(new CommandArguments(WATCH).addObjects((Object[]) keys), CHEAPEST_BUILDER));
extraCommandCount.incrementAndGet();
inWatch = true;
return null;
}

/**
* @return {@code null}
*/
@Override
public final String unwatch() {
appendCommand(new CommandObject<>(new CommandArguments(UNWATCH), CHEAPEST_BUILDER));
extraCommandCount.incrementAndGet();
inWatch = false;
return null;
}

@Override
protected final <T> Response<T> appendCommand(CommandObject<T> commandObject) {
CommandArguments args = commandObject.getArguments();
Response<T> response = new Response<>(commandObject.getBuilder());
commands.add(KeyValue.of(args, response));
return response;
}

@Override
public void close() {
clear();
}

private void clear() {
if (inMulti) {
discard();
} else if (inWatch) {
unwatch();
}
}

@Override
public final List<Object> exec() {
if (!inMulti) {
throw new IllegalStateException("EXEC without MULTI");
}

try (Connection connection = provider.getConnection()) {

commands.forEach((command) -> connection.sendCommand(command.getKey()));
// following connection.getMany(int) flushes anyway, so no flush here.

// ignore QUEUED (or ERROR)
connection.getMany(commands.size());

connection.sendCommand(EXEC);

List<Object> unformatted = connection.getObjectMultiBulkReply();
if (unformatted == null) {
commands.clear();
return null;
}

List<Object> formatted = new ArrayList<>(unformatted.size() - extraCommandCount.get());
for (int idx = extraCommandCount.get(); idx < unformatted.size(); ++idx) {
try {
Response<?> response = commands.poll().getValue();
response.set(unformatted.get(idx));
formatted.add(response.get());
} catch (JedisDataException e) {
formatted.add(e);
}
}
return formatted;

} finally {
inMulti = false;
inWatch = false;
}
}

@Override
public final String discard() {
if (!inMulti) {
throw new IllegalStateException("DISCARD without MULTI");
}

try (Connection connection = provider.getConnection()) {

commands.forEach((command) -> connection.sendCommand(command.getKey()));
// following connection.getMany(int) flushes anyway, so no flush here.

// ignore QUEUED (or ERROR)
connection.getMany(commands.size());

connection.sendCommand(DISCARD);

return connection.getStatusCodeReply();
} finally {
inMulti = false;
inWatch = false;
}
}

// RedisGraph commands
@Override
public Response<ResultSet> graphQuery(String name, String query) {
throw new UnsupportedOperationException("Graph commands are not supported.");
}

@Override
public Response<ResultSet> graphReadonlyQuery(String name, String query) {
throw new UnsupportedOperationException("Graph commands are not supported.");
}

@Override
public Response<ResultSet> graphQuery(String name, String query, long timeout) {
throw new UnsupportedOperationException("Graph commands are not supported.");
}

@Override
public Response<ResultSet> graphReadonlyQuery(String name, String query, long timeout) {
throw new UnsupportedOperationException("Graph commands are not supported.");
}

@Override
public Response<ResultSet> graphQuery(String name, String query, Map<String, Object> params) {
throw new UnsupportedOperationException("Graph commands are not supported.");
}

@Override
public Response<ResultSet> graphReadonlyQuery(String name, String query, Map<String, Object> params) {
throw new UnsupportedOperationException("Graph commands are not supported.");
}

@Override
public Response<ResultSet> graphQuery(String name, String query, Map<String, Object> params, long timeout) {
throw new UnsupportedOperationException("Graph commands are not supported.");
}

@Override
public Response<ResultSet> graphReadonlyQuery(String name, String query, Map<String, Object> params, long timeout) {
throw new UnsupportedOperationException("Graph commands are not supported.");
}

@Override
public Response<String> graphDelete(String name) {
throw new UnsupportedOperationException("Graph commands are not supported.");
}

@Override
public Response<List<String>> graphProfile(String graphName, String query) {
throw new UnsupportedOperationException("Graph commands are not supported.");
}
// RedisGraph commands
}

0 comments on commit 2e87a2c

Please sign in to comment.