Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix binary variants of XRANGE and XREAD commands #3571

Merged
merged 1 commit into from
Oct 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions src/main/java/redis/clients/jedis/CommandObjects.java
Original file line number Diff line number Diff line change
Expand Up @@ -2402,24 +2402,24 @@ public final CommandObject<List<StreamEntry>> xrevrange(String key, String end,
return new CommandObject<>(commandArguments(XREVRANGE).key(key).add(end).add(start).add(COUNT).add(count), BuilderFactory.STREAM_ENTRY_LIST);
}

public final CommandObject<List<byte[]>> xrange(byte[] key, byte[] start, byte[] end) {
public final CommandObject<List<Object>> xrange(byte[] key, byte[] start, byte[] end) {
return new CommandObject<>(commandArguments(XRANGE).key(key).add(start == null ? "-" : start).add(end == null ? "+" : end),
BuilderFactory.BINARY_LIST);
BuilderFactory.RAW_OBJECT_LIST);
}

public final CommandObject<List<byte[]>> xrange(byte[] key, byte[] start, byte[] end, int count) {
public final CommandObject<List<Object>> xrange(byte[] key, byte[] start, byte[] end, int count) {
return new CommandObject<>(commandArguments(XRANGE).key(key).add(start == null ? "-" : start).add(end == null ? "+" : end)
.add(COUNT).add(count), BuilderFactory.BINARY_LIST);
.add(COUNT).add(count), BuilderFactory.RAW_OBJECT_LIST);
}

public final CommandObject<List<byte[]>> xrevrange(byte[] key, byte[] end, byte[] start) {
public final CommandObject<List<Object>> xrevrange(byte[] key, byte[] end, byte[] start) {
return new CommandObject<>(commandArguments(XREVRANGE).key(key).add(end == null ? "+" : end).add(start == null ? "-" : start),
BuilderFactory.BINARY_LIST);
BuilderFactory.RAW_OBJECT_LIST);
}

public final CommandObject<List<byte[]>> xrevrange(byte[] key, byte[] end, byte[] start, int count) {
public final CommandObject<List<Object>> xrevrange(byte[] key, byte[] end, byte[] start, int count) {
return new CommandObject<>(commandArguments(XREVRANGE).key(key).add(end == null ? "+" : end).add(start == null ? "-" : start)
.add(COUNT).add(count), BuilderFactory.BINARY_LIST);
.add(COUNT).add(count), BuilderFactory.RAW_OBJECT_LIST);
}

public final CommandObject<Long> xack(String key, String group, StreamEntryID... ids) {
Expand Down Expand Up @@ -2660,18 +2660,18 @@ public final CommandObject<List<Map.Entry<String, List<StreamEntry>>>> xreadGrou
return new CommandObject<>(args, BuilderFactory.STREAM_READ_RESPONSE);
}

public final CommandObject<List<byte[]>> xread(XReadParams xReadParams, Map.Entry<byte[], byte[]>... streams) {
public final CommandObject<List<Object>> xread(XReadParams xReadParams, Map.Entry<byte[], byte[]>... streams) {
CommandArguments args = commandArguments(XREAD).addParams(xReadParams).add(STREAMS);
for (Map.Entry<byte[], byte[]> entry : streams) {
args.key(entry.getKey());
}
for (Map.Entry<byte[], byte[]> entry : streams) {
args.add(entry.getValue());
}
return new CommandObject<>(args, BuilderFactory.BINARY_LIST);
return new CommandObject<>(args, BuilderFactory.RAW_OBJECT_LIST);
}

public final CommandObject<List<byte[]>> xreadGroup(byte[] groupName, byte[] consumer,
public final CommandObject<List<Object>> xreadGroup(byte[] groupName, byte[] consumer,
XReadGroupParams xReadGroupParams, Map.Entry<byte[], byte[]>... streams) {
CommandArguments args = commandArguments(XREADGROUP)
.add(GROUP).add(groupName).add(consumer)
Expand All @@ -2682,7 +2682,7 @@ public final CommandObject<List<byte[]>> xreadGroup(byte[] groupName, byte[] con
for (Map.Entry<byte[], byte[]> entry : streams) {
args.add(entry.getValue());
}
return new CommandObject<>(args, BuilderFactory.BINARY_LIST);
return new CommandObject<>(args, BuilderFactory.RAW_OBJECT_LIST);
}
// Stream commands

Expand Down
12 changes: 6 additions & 6 deletions src/main/java/redis/clients/jedis/Jedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -4646,13 +4646,13 @@ public long hstrlen(final byte[] key, final byte[] field) {
}

@Override
public List<byte[]> xread(XReadParams xReadParams, Entry<byte[], byte[]>... streams) {
public List<Object> xread(XReadParams xReadParams, Entry<byte[], byte[]>... streams) {
checkIsInMultiOrPipeline();
return connection.executeCommand(commandObjects.xread(xReadParams, streams));
}

@Override
public List<byte[]> xreadGroup(byte[] groupName, byte[] consumer,
public List<Object> xreadGroup(byte[] groupName, byte[] consumer,
XReadGroupParams xReadGroupParams, Entry<byte[], byte[]>... streams) {
checkIsInMultiOrPipeline();
return connection.executeCommand(commandObjects.xreadGroup(groupName, consumer, xReadGroupParams, streams));
Expand All @@ -4671,25 +4671,25 @@ public long xlen(byte[] key) {
}

@Override
public List<byte[]> xrange(byte[] key, byte[] start, byte[] end) {
public List<Object> xrange(byte[] key, byte[] start, byte[] end) {
checkIsInMultiOrPipeline();
return connection.executeCommand(commandObjects.xrange(key, start, end));
}

@Override
public List<byte[]> xrange(byte[] key, byte[] start, byte[] end, int count) {
public List<Object> xrange(byte[] key, byte[] start, byte[] end, int count) {
checkIsInMultiOrPipeline();
return connection.executeCommand(commandObjects.xrange(key, start, end, count));
}

@Override
public List<byte[]> xrevrange(byte[] key, byte[] end, byte[] start) {
public List<Object> xrevrange(byte[] key, byte[] end, byte[] start) {
checkIsInMultiOrPipeline();
return connection.executeCommand(commandObjects.xrevrange(key, end, start));
}

@Override
public List<byte[]> xrevrange(byte[] key, byte[] end, byte[] start, int count) {
public List<Object> xrevrange(byte[] key, byte[] end, byte[] start, int count) {
checkIsInMultiOrPipeline();
return connection.executeCommand(commandObjects.xrevrange(key, end, start, count));
}
Expand Down
13 changes: 7 additions & 6 deletions src/main/java/redis/clients/jedis/PipelineBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -2968,22 +2968,22 @@ public Response<Long> xlen(byte[] key) {
}

@Override
public Response<List<byte[]>> xrange(byte[] key, byte[] start, byte[] end) {
public Response<List<Object>> xrange(byte[] key, byte[] start, byte[] end) {
return appendCommand(commandObjects.xrange(key, start, end));
}

@Override
public Response<List<byte[]>> xrange(byte[] key, byte[] start, byte[] end, int count) {
public Response<List<Object>> xrange(byte[] key, byte[] start, byte[] end, int count) {
return appendCommand(commandObjects.xrange(key, start, end, count));
}

@Override
public Response<List<byte[]>> xrevrange(byte[] key, byte[] end, byte[] start) {
public Response<List<Object>> xrevrange(byte[] key, byte[] end, byte[] start) {
return appendCommand(commandObjects.xrevrange(key, end, start));
}

@Override
public Response<List<byte[]>> xrevrange(byte[] key, byte[] end, byte[] start, int count) {
public Response<List<Object>> xrevrange(byte[] key, byte[] end, byte[] start, int count) {
return appendCommand(commandObjects.xrevrange(key, end, start, count));
}

Expand Down Expand Up @@ -3088,12 +3088,13 @@ public Response<List<Object>> xinfoConsumers(byte[] key, byte[] group) {
}

@Override
public Response<List<byte[]>> xread(XReadParams xReadParams, Map.Entry<byte[], byte[]>... streams) {
public Response<List<Object>> xread(XReadParams xReadParams, Map.Entry<byte[], byte[]>... streams) {
return appendCommand(commandObjects.xread(xReadParams, streams));
}

@Override
public Response<List<byte[]>> xreadGroup(byte[] groupName, byte[] consumer, XReadGroupParams xReadGroupParams, Map.Entry<byte[], byte[]>... streams) {
public Response<List<Object>> xreadGroup(byte[] groupName, byte[] consumer,
XReadGroupParams xReadGroupParams, Map.Entry<byte[], byte[]>... streams) {
return appendCommand(commandObjects.xreadGroup(groupName, consumer, xReadGroupParams, streams));
}

Expand Down
12 changes: 6 additions & 6 deletions src/main/java/redis/clients/jedis/TransactionBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -3135,22 +3135,22 @@ public Response<Long> xlen(byte[] key) {
}

@Override
public Response<List<byte[]>> xrange(byte[] key, byte[] start, byte[] end) {
public Response<List<Object>> xrange(byte[] key, byte[] start, byte[] end) {
return appendCommand(commandObjects.xrange(key, start, end));
}

@Override
public Response<List<byte[]>> xrange(byte[] key, byte[] start, byte[] end, int count) {
public Response<List<Object>> xrange(byte[] key, byte[] start, byte[] end, int count) {
return appendCommand(commandObjects.xrange(key, start, end, count));
}

@Override
public Response<List<byte[]>> xrevrange(byte[] key, byte[] end, byte[] start) {
public Response<List<Object>> xrevrange(byte[] key, byte[] end, byte[] start) {
return appendCommand(commandObjects.xrevrange(key, end, start));
}

@Override
public Response<List<byte[]>> xrevrange(byte[] key, byte[] end, byte[] start, int count) {
public Response<List<Object>> xrevrange(byte[] key, byte[] end, byte[] start, int count) {
return appendCommand(commandObjects.xrevrange(key, end, start, count));
}

Expand Down Expand Up @@ -3255,12 +3255,12 @@ public Response<List<Object>> xinfoConsumers(byte[] key, byte[] group) {
}

@Override
public Response<List<byte[]>> xread(XReadParams xReadParams, Map.Entry<byte[], byte[]>... streams) {
public Response<List<Object>> xread(XReadParams xReadParams, Map.Entry<byte[], byte[]>... streams) {
return appendCommand(commandObjects.xread(xReadParams, streams));
}

@Override
public Response<List<byte[]>> xreadGroup(byte[] groupName, byte[] consumer, XReadGroupParams xReadGroupParams, Map.Entry<byte[], byte[]>... streams) {
public Response<List<Object>> xreadGroup(byte[] groupName, byte[] consumer, XReadGroupParams xReadGroupParams, Map.Entry<byte[], byte[]>... streams) {
return appendCommand(commandObjects.xreadGroup(groupName, consumer, xReadGroupParams, streams));
}

Expand Down
12 changes: 6 additions & 6 deletions src/main/java/redis/clients/jedis/UnifiedJedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -3058,22 +3058,22 @@ public long xlen(byte[] key) {
}

@Override
public List<byte[]> xrange(byte[] key, byte[] start, byte[] end) {
public List<Object> xrange(byte[] key, byte[] start, byte[] end) {
return executeCommand(commandObjects.xrange(key, start, end));
}

@Override
public List<byte[]> xrange(byte[] key, byte[] start, byte[] end, int count) {
public List<Object> xrange(byte[] key, byte[] start, byte[] end, int count) {
return executeCommand(commandObjects.xrange(key, start, end, count));
}

@Override
public List<byte[]> xrevrange(byte[] key, byte[] end, byte[] start) {
public List<Object> xrevrange(byte[] key, byte[] end, byte[] start) {
return executeCommand(commandObjects.xrevrange(key, end, start));
}

@Override
public List<byte[]> xrevrange(byte[] key, byte[] end, byte[] start, int count) {
public List<Object> xrevrange(byte[] key, byte[] end, byte[] start, int count) {
return executeCommand(commandObjects.xrevrange(key, end, start, count));
}

Expand Down Expand Up @@ -3178,12 +3178,12 @@ public List<Object> xinfoConsumers(byte[] key, byte[] group) {
}

@Override
public List<byte[]> xread(XReadParams xReadParams, Map.Entry<byte[], byte[]>... streams) {
public List<Object> xread(XReadParams xReadParams, Map.Entry<byte[], byte[]>... streams) {
return executeCommand(commandObjects.xread(xReadParams, streams));
}

@Override
public List<byte[]> xreadGroup(byte[] groupName, byte[] consumer, XReadGroupParams xReadGroupParams, Map.Entry<byte[], byte[]>... streams) {
public List<Object> xreadGroup(byte[] groupName, byte[] consumer, XReadGroupParams xReadGroupParams, Map.Entry<byte[], byte[]>... streams) {
return executeCommand(commandObjects.xreadGroup(groupName, consumer, xReadGroupParams, streams));
}
// Stream commands
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@ default byte[] xadd(byte[] key, Map<byte[], byte[]> hash, XAddParams params) {

long xlen(byte[] key);

List<byte[]> xrange(byte[] key, byte[] start, byte[] end);
List<Object> xrange(byte[] key, byte[] start, byte[] end);

List<byte[]> xrange(byte[] key, byte[] start, byte[] end, int count);
List<Object> xrange(byte[] key, byte[] start, byte[] end, int count);

List<byte[]> xrevrange(byte[] key, byte[] end, byte[] start);
List<Object> xrevrange(byte[] key, byte[] end, byte[] start);

List<byte[]> xrevrange(byte[] key, byte[] end, byte[] start, int count);
List<Object> xrevrange(byte[] key, byte[] end, byte[] start, int count);

long xack(byte[] key, byte[] group, byte[]... ids);

Expand Down Expand Up @@ -74,9 +74,9 @@ List<Object> xautoclaimJustId(byte[] key, byte[] groupName, byte[] consumerName,

List<Object> xinfoConsumers(byte[] key, byte[] group);

List<byte[]> xread(XReadParams xReadParams, Map.Entry<byte[], byte[]>... streams);
List<Object> xread(XReadParams xReadParams, Map.Entry<byte[], byte[]>... streams);

List<byte[]> xreadGroup(byte[] groupName, byte[] consumer, XReadGroupParams xReadGroupParams,
List<Object> xreadGroup(byte[] groupName, byte[] consumer, XReadGroupParams xReadGroupParams,
Map.Entry<byte[], byte[]>... streams);

}
20 changes: 10 additions & 10 deletions src/main/java/redis/clients/jedis/commands/StreamCommands.java
Original file line number Diff line number Diff line change
Expand Up @@ -123,16 +123,6 @@ default StreamEntryID xadd(String key, Map<String, String> hash, XAddParams para
*/
long xgroupDelConsumer(String key, String groupName, String consumerName);

/**
* XPENDING key group
*/
StreamPendingSummary xpending(String key, String groupName);

/**
* XPENDING key group [[IDLE min-idle-time] start end count [consumer]]
*/
List<StreamPendingEntry> xpending(String key, String groupName, XPendingParams params);

/**
* XDEL key ID [ID ...]
*/
Expand All @@ -148,6 +138,16 @@ default StreamEntryID xadd(String key, Map<String, String> hash, XAddParams para
*/
long xtrim(String key, XTrimParams params);

/**
* XPENDING key group
*/
StreamPendingSummary xpending(String key, String groupName);

/**
* XPENDING key group [[IDLE min-idle-time] start end count [consumer]]
*/
List<StreamPendingEntry> xpending(String key, String groupName, XPendingParams params);

/**
* {@code XCLAIM key group consumer min-idle-time <ID-1> ... <ID-N>
* [IDLE <milliseconds>] [TIME <mstime>] [RETRYCOUNT <count>]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ default Response<byte[]> xadd(byte[] key, Map<byte[], byte[]> hash, XAddParams p

Response<Long> xlen(byte[] key);

Response<List<byte[]>> xrange(byte[] key, byte[] start, byte[] end);
Response<List<Object>> xrange(byte[] key, byte[] start, byte[] end);

Response<List<byte[]>> xrange(byte[] key, byte[] start, byte[] end, int count);
Response<List<Object>> xrange(byte[] key, byte[] start, byte[] end, int count);

Response<List<byte[]>> xrevrange(byte[] key, byte[] end, byte[] start);
Response<List<Object>> xrevrange(byte[] key, byte[] end, byte[] start);

Response<List<byte[]>> xrevrange(byte[] key, byte[] end, byte[] start, int count);
Response<List<Object>> xrevrange(byte[] key, byte[] end, byte[] start, int count);

Response<Long> xack(byte[] key, byte[] group, byte[]... ids);

Expand Down Expand Up @@ -75,9 +75,9 @@ Response<List<Object>> xautoclaimJustId(byte[] key, byte[] groupName, byte[] con

Response<List<Object>> xinfoConsumers(byte[] key, byte[] group);

Response<List<byte[]>> xread(XReadParams xReadParams, Map.Entry<byte[], byte[]>... streams);
Response<List<Object>> xread(XReadParams xReadParams, Map.Entry<byte[], byte[]>... streams);

Response<List<byte[]>> xreadGroup(byte[] groupName, byte[] consumer, XReadGroupParams xReadGroupParams,
Map.Entry<byte[], byte[]>... streams);
Response<List<Object>> xreadGroup(byte[] groupName, byte[] consumer,
XReadGroupParams xReadGroupParams, Map.Entry<byte[], byte[]>... streams);

}