Skip to content

Commit

Permalink
JedisCluster, JedisPool, etc
Browse files Browse the repository at this point in the history
  • Loading branch information
sazzad16 committed Nov 3, 2021
1 parent 3a0f61a commit 2874412
Show file tree
Hide file tree
Showing 70 changed files with 7,250 additions and 6,151 deletions.
5 changes: 5 additions & 0 deletions src/main/java/redis/clients/jedis/ClusterPipeline.java
Original file line number Diff line number Diff line change
Expand Up @@ -2859,4 +2859,9 @@ public Response<SearchResult> ftSearch(byte[] indexName, Query query) {
throw new UnsupportedOperationException("Not supported yet.");
//return appendCommand(provider.getNode(key), commandObjects.ftCreate(indexName, query));
}

@Override
public Response<Long> waitReplicas(int replicas, long timeout) {
throw new UnsupportedOperationException("Not supported yet.");
}
}
19 changes: 16 additions & 3 deletions src/main/java/redis/clients/jedis/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public class Connection implements Closeable {
private Socket socket;
private RedisOutputStream outputStream;
private RedisInputStream inputStream;
private int soTimeout = 0;
private int infiniteSoTimeout = 0;
private boolean broken = false;

Expand All @@ -40,7 +41,7 @@ public Connection(final String host, final int port) {
}

public Connection(final HostAndPort hostAndPort) {
this(hostAndPort, DefaultJedisClientConfig.builder().build());
this(new DefaultJedisSocketFactory(hostAndPort));
}

public Connection(final HostAndPort hostAndPort, final JedisClientConfig clientConfig) {
Expand All @@ -55,6 +56,7 @@ public Connection(final JedisSocketFactory socketFactory, JedisClientConfig clie

public Connection(final JedisSocketFactory socketFactory, JedisClientConfig clientConfig, Pool<Connection> pool) {
this.socketFactory = socketFactory;
this.soTimeout = clientConfig.getSocketTimeoutMillis();
this.infiniteSoTimeout = clientConfig.getBlockingSocketTimeoutMillis();
initializeFromClientConfig(clientConfig);
this.memberOf = pool;
Expand All @@ -70,8 +72,13 @@ public String toString() {
return "Connection{" + socketFactory + "}";
}

public int getSoTimeout() {
return soTimeout;
}

public void setSoTimeout(int soTimeout) {
socketFactory.setSocketTimeout(soTimeout);
// socketFactory.setSocketTimeout(soTimeout);
this.soTimeout = soTimeout;
if (this.socket != null) {
try {
this.socket.setSoTimeout(soTimeout);
Expand All @@ -96,7 +103,8 @@ public void setTimeoutInfinite() {

public void rollbackTimeout() {
try {
socket.setSoTimeout(socketFactory.getSocketTimeout());
// socket.setSoTimeout(socketFactory.getSocketTimeout());
socket.setSoTimeout(this.soTimeout);
} catch (SocketException ex) {
broken = true;
throw new JedisConnectionException(ex);
Expand Down Expand Up @@ -202,6 +210,7 @@ public void connect() throws JedisConnectionException {
if (!isConnected()) {
try {
socket = socketFactory.createSocket();
soTimeout = socket.getSoTimeout(); //?

outputStream = new RedisOutputStream(socket.getOutputStream());
inputStream = new RedisInputStream(socket.getInputStream());
Expand Down Expand Up @@ -317,6 +326,10 @@ public Object getOne() {
return readProtocolWithCheckingBroken();
}

public void setBroken() {
broken = true;
}

public boolean isBroken() {
return broken;
}
Expand Down
20 changes: 10 additions & 10 deletions src/main/java/redis/clients/jedis/DefaultJedisSocketFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -122,16 +122,16 @@ protected HostAndPort getSocketHostAndPort() {
}
return hap;
}

@Override
public void setSocketTimeout(int socketTimeout) {
this.socketTimeout = socketTimeout;
}

@Override
public int getSocketTimeout() {
return socketTimeout;
}
//
// @Override
// public void setSocketTimeout(int socketTimeout) {
// this.socketTimeout = socketTimeout;
// }
//
// @Override
// public int getSocketTimeout() {
// return socketTimeout;
// }

@Override
public String toString() {
Expand Down
131 changes: 84 additions & 47 deletions src/main/java/redis/clients/jedis/Jedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

import java.io.Closeable;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashSet;
Expand Down Expand Up @@ -40,6 +39,8 @@ public class Jedis implements ServerCommands, DatabaseCommands, JedisCommands, J
private final RedisCommandObjects commandObjects = new RedisCommandObjects();
private int db = 0;
private Transaction transaction = null;
private boolean isInMulti = false;
private boolean isInWatch = false;
private Pipeline pipeline = null;
protected static final byte[][] DUMMY_ARRAY = new byte[0][];

Expand Down Expand Up @@ -69,7 +70,7 @@ public Jedis(final String uriString) {
}

public Jedis(final HostAndPort hp) {
this(hp, DefaultJedisClientConfig.builder().build());
connection = new Connection(hp);
}

public Jedis(final String host, final int port) {
Expand Down Expand Up @@ -302,7 +303,26 @@ public Jedis(final Connection connection) {

@Override
public String toString() {
return "BinaryJedis{" + connection + '}';
return "Jedis{" + connection + '}';
}

// Legacy
public Connection getClient() {
return getConnection();
}

public Connection getConnection() {
return connection;
}

// Legacy
public void connect() {
connection.connect();
}

// Legacy
public void disconnect() {
connection.disconnect();
}

public boolean isConnected() {
Expand All @@ -324,6 +344,11 @@ public void resetState() {
}
//
// connection.resetState();
if (isInWatch) {
connection.sendCommand(UNWATCH);
connection.getStatusCodeReply();
isInWatch = false;
}
}

transaction = null;
Expand All @@ -349,12 +374,33 @@ public void close() {
}
}

public Connection getConnection() {
return connection;
// Legacy
public Transaction multi() {
// client.multi();
// client.getOne(); // expected OK
// transaction = new Transaction(client);
transaction = new Transaction(this);
return transaction;
}

public Connection getClient() {
return getConnection();
// Legacy
public Pipeline pipelined() {
// pipeline = new Pipeline();
// pipeline.setClient(connection);
pipeline = new Pipeline(this);
return pipeline;
}

// Legacy
protected void checkIsInMultiOrPipeline() {
// if (connection.isInMulti()) {
if (transaction != null) {
throw new IllegalStateException(
"Cannot use Jedis when in Multi. Please use Transaction or reset jedis state.");
} else if (pipeline != null && pipeline.hasPipelinedResponse()) {
throw new IllegalStateException(
"Cannot use Jedis when in Pipeline. Please use Pipeline or reset jedis state.");
}
}

public int getDB() {
Expand Down Expand Up @@ -483,6 +529,7 @@ public String quit() {
connection.sendCommand(QUIT);
String quitReturn = connection.getStatusCodeReply();
connection.disconnect();
connection.setBroken();
return quitReturn;
}

Expand Down Expand Up @@ -2189,35 +2236,13 @@ public Set<Tuple> zpopmin(final byte[] key, final int count) {
return connection.executeCommand(commandObjects.zpopmin(key, count));
}

public Pipeline pipelined() {
// pipeline = new Pipeline();
// pipeline.setClient(connection);
pipeline = new Pipeline(connection);
return pipeline;
}

public Transaction multi() {
connection.sendCommand(MULTI);
connection.getOne(); // expected OK
transaction = new Transaction(connection);
return transaction;
}

protected void checkIsInMultiOrPipeline() {
// if (connection.isInMulti()) {
if (transaction != null) {
throw new IllegalStateException(
"Cannot use Jedis when in Multi. Please use Transaction or reset jedis state.");
} else if (pipeline != null && pipeline.hasPipelinedResponse()) {
throw new IllegalStateException(
"Cannot use Jedis when in Pipeline. Please use Pipeline or reset jedis state.");
}
}

public String watch(final byte[]... keys) {
checkIsInMultiOrPipeline();
connection.sendCommand(WATCH, keys);
return connection.getStatusCodeReply();
// return connection.getStatusCodeReply();
String status = connection.getStatusCodeReply();
isInWatch = true;
return status;
}

public String unwatch() {
Expand Down Expand Up @@ -3260,6 +3285,7 @@ public String shutdown() throws JedisException {
} catch (JedisConnectionException jce) {
// expected
status = null;
connection.setBroken();
}
return status;
}
Expand All @@ -3271,6 +3297,7 @@ public void shutdown(final SaveMode saveMode) throws JedisException {
throw new JedisException(connection.getStatusCodeReply());
} catch (JedisConnectionException jce) {
// expected
connection.setBroken();
}
}

Expand Down Expand Up @@ -3751,19 +3778,19 @@ public List<Object> slowlogGetBinary(final long entries) {

@Override
public Long objectRefcount(final byte[] key) {
connection.sendCommand(OBJECT, REFCOUNT);
connection.sendCommand(OBJECT, REFCOUNT.getRaw(), key);
return connection.getIntegerReply();
}

@Override
public byte[] objectEncoding(final byte[] key) {
connection.sendCommand(OBJECT, ENCODING);
connection.sendCommand(OBJECT, ENCODING.getRaw(), key);
return connection.getBinaryBulkReply();
}

@Override
public Long objectIdletime(final byte[] key) {
connection.sendCommand(OBJECT, IDLETIME);
connection.sendCommand(OBJECT, IDLETIME.getRaw(), key);
return connection.getIntegerReply();
}

Expand All @@ -3775,7 +3802,7 @@ public List<byte[]> objectHelpBinary() {

@Override
public Long objectFreq(final byte[] key) {
connection.sendCommand(OBJECT, FREQ);
connection.sendCommand(OBJECT, FREQ.getRaw(), key);
return connection.getIntegerReply();
}

Expand Down Expand Up @@ -3886,13 +3913,20 @@ public Long memoryUsage(final byte[] key) {
@Override
public Long memoryUsage(final byte[] key, final int samples) {
checkIsInMultiOrPipeline();
connection.sendCommand(MEMORY, USAGE.getRaw(), key, toByteArray(samples));
connection.sendCommand(MEMORY, USAGE.getRaw(), key, SAMPLES.getRaw(), toByteArray(samples));
return connection.getIntegerReply();
}

@Override
public String failover() {
return failover(null);
checkIsInMultiOrPipeline();
connection.sendCommand(FAILOVER);
connection.setTimeoutInfinite();
try {
return connection.getStatusCodeReply();
} finally {
connection.rollbackTimeout();
}
}

@Override
Expand Down Expand Up @@ -4142,7 +4176,7 @@ public String migrate(final String host, final int port, final int destinationDB
final int timeout, final MigrateParams params, final byte[]... keys) {
checkIsInMultiOrPipeline();
CommandArguments args = new CommandArguments(MIGRATE).add(host).add(port).add(new byte[0]).add(destinationDB)
.add(timeout).addParams(params).keys((Object[]) keys);
.add(timeout).addParams(params).add(Keyword.KEYS).keys((Object[]) keys);
connection.sendCommand(args);
return connection.getStatusCodeReply();
}
Expand Down Expand Up @@ -6250,7 +6284,10 @@ public Set<Tuple> zpopmin(final String key, final int count) {
public String watch(final String... keys) {
checkIsInMultiOrPipeline();
connection.sendCommand(WATCH, keys);
return connection.getStatusCodeReply();
// return connection.getStatusCodeReply();
String status = connection.getStatusCodeReply();
isInWatch = true;
return status;
}

/**
Expand Down Expand Up @@ -7490,19 +7527,19 @@ public List<Slowlog> slowlogGet(final long entries) {

@Override
public Long objectRefcount(final String key) {
connection.sendCommand(OBJECT, REFCOUNT);
connection.sendCommand(OBJECT, REFCOUNT.name(), key);
return connection.getIntegerReply();
}

@Override
public String objectEncoding(final String key) {
connection.sendCommand(OBJECT, ENCODING);
connection.sendCommand(OBJECT, ENCODING.name(), key);
return connection.getBulkReply();
}

@Override
public Long objectIdletime(final String key) {
connection.sendCommand(OBJECT, IDLETIME);
connection.sendCommand(OBJECT, IDLETIME.name(), key);
return connection.getIntegerReply();
}

Expand All @@ -7514,7 +7551,7 @@ public List<String> objectHelp() {

@Override
public Long objectFreq(final String key) {
connection.sendCommand(OBJECT, FREQ);
connection.sendCommand(OBJECT, FREQ.name(), key);
return connection.getIntegerReply();
}

Expand Down Expand Up @@ -7900,7 +7937,7 @@ public String clientList() {
@Override
public String clientList(ClientType type) {
checkIsInMultiOrPipeline();
connection.sendCommand(CLIENT, LIST.getRaw(), type.getRaw());
connection.sendCommand(CLIENT, LIST.getRaw(), Keyword.TYPE.getRaw(), type.getRaw());
return connection.getBulkReply();
}

Expand Down Expand Up @@ -7939,7 +7976,7 @@ public String migrate(final String host, final int port, final int destinationDB
final int timeout, final MigrateParams params, final String... keys) {
checkIsInMultiOrPipeline();
CommandArguments args = new CommandArguments(MIGRATE).add(host).add(port).add(new byte[0]).add(destinationDB)
.add(timeout).addParams(params).keys((Object[]) keys);
.add(timeout).addParams(params).add(Keyword.KEYS).keys((Object[]) keys);
connection.sendCommand(args);
return connection.getStatusCodeReply();
}
Expand Down
Loading

0 comments on commit 2874412

Please sign in to comment.