From 2e87a2ccd1820b3bf14dc82ebaaf2a40a04824cb Mon Sep 17 00:00:00 2001 From: M Sazzadul Hoque <7600764+sazzad16@users.noreply.github.com> Date: Mon, 30 Oct 2023 17:13:26 +0600 Subject: [PATCH] Transaction with multi cluster failover connection --- .../redis/clients/jedis/TransactionBase.java | 6 + .../redis/clients/jedis/UnifiedJedis.java | 4 +- ...ipeline.java => MultiClusterPipeline.java} | 14 +- .../activeactive/MultiClusterTransaction.java | 238 ++++++++++++++++++ 4 files changed, 252 insertions(+), 10 deletions(-) rename src/main/java/redis/clients/jedis/activeactive/{MultiClusterFailoverPipeline.java => MultiClusterPipeline.java} (89%) create mode 100644 src/main/java/redis/clients/jedis/activeactive/MultiClusterTransaction.java diff --git a/src/main/java/redis/clients/jedis/TransactionBase.java b/src/main/java/redis/clients/jedis/TransactionBase.java index 66e828b193..483d82f485 100644 --- a/src/main/java/redis/clients/jedis/TransactionBase.java +++ b/src/main/java/redis/clients/jedis/TransactionBase.java @@ -11,8 +11,14 @@ protected TransactionBase() { public abstract void multi(); + /** + * Must be called before {@link TransactionBase#multi() MULTI}. + */ public abstract String watch(final String... keys); + /** + * Must be called before {@link TransactionBase#multi() MULTI}. + */ public abstract String watch(final byte[]... keys); public abstract String unwatch(); diff --git a/src/main/java/redis/clients/jedis/UnifiedJedis.java b/src/main/java/redis/clients/jedis/UnifiedJedis.java index 0266d49572..70a0651003 100644 --- a/src/main/java/redis/clients/jedis/UnifiedJedis.java +++ b/src/main/java/redis/clients/jedis/UnifiedJedis.java @@ -1,6 +1,6 @@ package redis.clients.jedis; -import redis.clients.jedis.activeactive.MultiClusterFailoverPipeline; +import redis.clients.jedis.activeactive.MultiClusterPipeline; import redis.clients.jedis.activeactive.CircuitBreakerCommandExecutor; import java.net.URI; import java.time.Duration; @@ -4825,7 +4825,7 @@ public PipelineBase pipelined() { if (provider == null) { throw new IllegalStateException("It is not allowed to create Pipeline from this " + getClass()); } else if (provider instanceof MultiClusterPooledConnectionProvider) { - return new MultiClusterFailoverPipeline((MultiClusterPooledConnectionProvider) provider); + return new MultiClusterPipeline((MultiClusterPooledConnectionProvider) provider); } else { return new Pipeline(provider.getConnection(), true); } diff --git a/src/main/java/redis/clients/jedis/activeactive/MultiClusterFailoverPipeline.java b/src/main/java/redis/clients/jedis/activeactive/MultiClusterPipeline.java similarity index 89% rename from src/main/java/redis/clients/jedis/activeactive/MultiClusterFailoverPipeline.java rename to src/main/java/redis/clients/jedis/activeactive/MultiClusterPipeline.java index 6bd3e3670c..8d1dc627c2 100644 --- a/src/main/java/redis/clients/jedis/activeactive/MultiClusterFailoverPipeline.java +++ b/src/main/java/redis/clients/jedis/activeactive/MultiClusterPipeline.java @@ -21,15 +21,14 @@ /** * This is high memory dependent solution as all the appending commands will be hold in memory until - * {@link MultiClusterFailoverPipeline#sync() SYNC} - * (or {@link MultiClusterFailoverPipeline#close() CLOSE}) gets called. + * {@link MultiClusterPipeline#sync() SYNC} (or {@link MultiClusterPipeline#close() CLOSE}) gets called. */ -public class MultiClusterFailoverPipeline extends PipelineBase implements Closeable { +public class MultiClusterPipeline extends PipelineBase implements Closeable { private final CircuitBreakerFailoverConnectionProvider provider; private final Queue>> commands = new LinkedList<>(); - public MultiClusterFailoverPipeline(MultiClusterPooledConnectionProvider provider) { + public MultiClusterPipeline(MultiClusterPooledConnectionProvider provider) { super(new CommandObjects()); try (Connection connection = provider.getConnection()) { // we don't need a healthy connection now RedisProtocol proto = connection.getRedisProtocol(); @@ -40,7 +39,7 @@ public MultiClusterFailoverPipeline(MultiClusterPooledConnectionProvider provide } @Override - public final Response appendCommand(CommandObject commandObject) { + protected final Response appendCommand(CommandObject commandObject) { CommandArguments args = commandObject.getArguments(); Response response = new Response<>(commandObject.getBuilder()); commands.add(KeyValue.of(args, response)); @@ -54,9 +53,8 @@ public void close() { } /** - * Synchronize pipeline by reading all responses. This operation close the pipeline. In order to - * get return values from pipelined commands, capture the different Response<?> of the - * commands you execute. + * Synchronize pipeline by reading all responses. This operation close the pipeline. In order to get return values + * from pipelined commands, capture the different Response<?> of the commands you execute. */ @Override public void sync() { diff --git a/src/main/java/redis/clients/jedis/activeactive/MultiClusterTransaction.java b/src/main/java/redis/clients/jedis/activeactive/MultiClusterTransaction.java new file mode 100644 index 0000000000..08f60cfdad --- /dev/null +++ b/src/main/java/redis/clients/jedis/activeactive/MultiClusterTransaction.java @@ -0,0 +1,238 @@ +package redis.clients.jedis.activeactive; + +import static redis.clients.jedis.Protocol.Command.DISCARD; +import static redis.clients.jedis.Protocol.Command.EXEC; +import static redis.clients.jedis.Protocol.Command.MULTI; +import static redis.clients.jedis.Protocol.Command.UNWATCH; +import static redis.clients.jedis.Protocol.Command.WATCH; + +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.atomic.AtomicInteger; + +import redis.clients.jedis.*; +import redis.clients.jedis.exceptions.JedisDataException; +import redis.clients.jedis.graph.ResultSet; +import redis.clients.jedis.util.KeyValue; + +/** + * This is high memory dependent solution as all the appending commands will be hold in memory. + */ +public class MultiClusterTransaction extends TransactionBase { + + private static final Builder CHEAPEST_BUILDER = BuilderFactory.RAW_OBJECT; + + private final CircuitBreakerFailoverConnectionProvider provider; + private final AtomicInteger extraCommandCount = new AtomicInteger(); + private final Queue>> commands = new LinkedList<>(); + + private boolean inWatch = false; + private boolean inMulti = false; + + /** + * A MULTI command will be added to be sent to server. WATCH/UNWATCH/MULTI commands must not be + * called with this object. + * @param provider + */ + public MultiClusterTransaction(CircuitBreakerFailoverConnectionProvider provider) { + this(provider, true); + } + + /** + * A user wanting to WATCH/UNWATCH keys followed by a call to MULTI ({@link #multi()}) it should + * be {@code doMulti=false}. + * + * @param provider + * @param doMulti {@code false} should be set to enable manual WATCH, UNWATCH and MULTI + */ + public MultiClusterTransaction(CircuitBreakerFailoverConnectionProvider provider, boolean doMulti) { + try (Connection connection = provider.getConnection()) { // we don't need a healthy connection now + RedisProtocol proto = connection.getRedisProtocol(); + if (proto != null) this.commandObjects.setProtocol(proto); + } + + this.provider = provider; + if (doMulti) multi(); + } + + @Override + public final void multi() { + appendCommand(new CommandObject<>(new CommandArguments(MULTI), CHEAPEST_BUILDER)); + extraCommandCount.incrementAndGet(); + inMulti = true; + } + + /** + * @param keys + * @return {@code null} + */ + @Override + public final String watch(String... keys) { + appendCommand(new CommandObject<>(new CommandArguments(WATCH).addObjects((Object[]) keys), CHEAPEST_BUILDER)); + extraCommandCount.incrementAndGet(); + inWatch = true; + return null; + } + + /** + * @param keys + * @return {@code null} + */ + @Override + public final String watch(byte[]... keys) { + appendCommand(new CommandObject<>(new CommandArguments(WATCH).addObjects((Object[]) keys), CHEAPEST_BUILDER)); + extraCommandCount.incrementAndGet(); + inWatch = true; + return null; + } + + /** + * @return {@code null} + */ + @Override + public final String unwatch() { + appendCommand(new CommandObject<>(new CommandArguments(UNWATCH), CHEAPEST_BUILDER)); + extraCommandCount.incrementAndGet(); + inWatch = false; + return null; + } + + @Override + protected final Response appendCommand(CommandObject commandObject) { + CommandArguments args = commandObject.getArguments(); + Response response = new Response<>(commandObject.getBuilder()); + commands.add(KeyValue.of(args, response)); + return response; + } + + @Override + public void close() { + clear(); + } + + private void clear() { + if (inMulti) { + discard(); + } else if (inWatch) { + unwatch(); + } + } + + @Override + public final List exec() { + if (!inMulti) { + throw new IllegalStateException("EXEC without MULTI"); + } + + try (Connection connection = provider.getConnection()) { + + commands.forEach((command) -> connection.sendCommand(command.getKey())); + // following connection.getMany(int) flushes anyway, so no flush here. + + // ignore QUEUED (or ERROR) + connection.getMany(commands.size()); + + connection.sendCommand(EXEC); + + List unformatted = connection.getObjectMultiBulkReply(); + if (unformatted == null) { + commands.clear(); + return null; + } + + List formatted = new ArrayList<>(unformatted.size() - extraCommandCount.get()); + for (int idx = extraCommandCount.get(); idx < unformatted.size(); ++idx) { + try { + Response response = commands.poll().getValue(); + response.set(unformatted.get(idx)); + formatted.add(response.get()); + } catch (JedisDataException e) { + formatted.add(e); + } + } + return formatted; + + } finally { + inMulti = false; + inWatch = false; + } + } + + @Override + public final String discard() { + if (!inMulti) { + throw new IllegalStateException("DISCARD without MULTI"); + } + + try (Connection connection = provider.getConnection()) { + + commands.forEach((command) -> connection.sendCommand(command.getKey())); + // following connection.getMany(int) flushes anyway, so no flush here. + + // ignore QUEUED (or ERROR) + connection.getMany(commands.size()); + + connection.sendCommand(DISCARD); + + return connection.getStatusCodeReply(); + } finally { + inMulti = false; + inWatch = false; + } + } + + // RedisGraph commands + @Override + public Response graphQuery(String name, String query) { + throw new UnsupportedOperationException("Graph commands are not supported."); + } + + @Override + public Response graphReadonlyQuery(String name, String query) { + throw new UnsupportedOperationException("Graph commands are not supported."); + } + + @Override + public Response graphQuery(String name, String query, long timeout) { + throw new UnsupportedOperationException("Graph commands are not supported."); + } + + @Override + public Response graphReadonlyQuery(String name, String query, long timeout) { + throw new UnsupportedOperationException("Graph commands are not supported."); + } + + @Override + public Response graphQuery(String name, String query, Map params) { + throw new UnsupportedOperationException("Graph commands are not supported."); + } + + @Override + public Response graphReadonlyQuery(String name, String query, Map params) { + throw new UnsupportedOperationException("Graph commands are not supported."); + } + + @Override + public Response graphQuery(String name, String query, Map params, long timeout) { + throw new UnsupportedOperationException("Graph commands are not supported."); + } + + @Override + public Response graphReadonlyQuery(String name, String query, Map params, long timeout) { + throw new UnsupportedOperationException("Graph commands are not supported."); + } + + @Override + public Response graphDelete(String name) { + throw new UnsupportedOperationException("Graph commands are not supported."); + } + + @Override + public Response> graphProfile(String graphName, String query) { + throw new UnsupportedOperationException("Graph commands are not supported."); + } + // RedisGraph commands +}