Skip to content

Commit

Permalink
Fix binary variants of XRANGE and XREAD commands
Browse files Browse the repository at this point in the history
where each individual element in the list is actually a complex object rather than a simple byte array.
  • Loading branch information
sazzad16 committed Oct 9, 2023
1 parent d9b8750 commit d384d61
Show file tree
Hide file tree
Showing 8 changed files with 60 additions and 59 deletions.
24 changes: 12 additions & 12 deletions src/main/java/redis/clients/jedis/CommandObjects.java
Original file line number Diff line number Diff line change
Expand Up @@ -2402,24 +2402,24 @@ public final CommandObject<List<StreamEntry>> xrevrange(String key, String end,
return new CommandObject<>(commandArguments(XREVRANGE).key(key).add(end).add(start).add(COUNT).add(count), BuilderFactory.STREAM_ENTRY_LIST);
}

public final CommandObject<List<byte[]>> xrange(byte[] key, byte[] start, byte[] end) {
public final CommandObject<List<Object>> xrange(byte[] key, byte[] start, byte[] end) {
return new CommandObject<>(commandArguments(XRANGE).key(key).add(start == null ? "-" : start).add(end == null ? "+" : end),
BuilderFactory.BINARY_LIST);
BuilderFactory.RAW_OBJECT_LIST);
}

public final CommandObject<List<byte[]>> xrange(byte[] key, byte[] start, byte[] end, int count) {
public final CommandObject<List<Object>> xrange(byte[] key, byte[] start, byte[] end, int count) {
return new CommandObject<>(commandArguments(XRANGE).key(key).add(start == null ? "-" : start).add(end == null ? "+" : end)
.add(COUNT).add(count), BuilderFactory.BINARY_LIST);
.add(COUNT).add(count), BuilderFactory.RAW_OBJECT_LIST);
}

public final CommandObject<List<byte[]>> xrevrange(byte[] key, byte[] end, byte[] start) {
public final CommandObject<List<Object>> xrevrange(byte[] key, byte[] end, byte[] start) {
return new CommandObject<>(commandArguments(XREVRANGE).key(key).add(end == null ? "+" : end).add(start == null ? "-" : start),
BuilderFactory.BINARY_LIST);
BuilderFactory.RAW_OBJECT_LIST);
}

public final CommandObject<List<byte[]>> xrevrange(byte[] key, byte[] end, byte[] start, int count) {
public final CommandObject<List<Object>> xrevrange(byte[] key, byte[] end, byte[] start, int count) {
return new CommandObject<>(commandArguments(XREVRANGE).key(key).add(end == null ? "+" : end).add(start == null ? "-" : start)
.add(COUNT).add(count), BuilderFactory.BINARY_LIST);
.add(COUNT).add(count), BuilderFactory.RAW_OBJECT_LIST);
}

public final CommandObject<Long> xack(String key, String group, StreamEntryID... ids) {
Expand Down Expand Up @@ -2660,18 +2660,18 @@ public final CommandObject<List<Map.Entry<String, List<StreamEntry>>>> xreadGrou
return new CommandObject<>(args, BuilderFactory.STREAM_READ_RESPONSE);
}

public final CommandObject<List<byte[]>> xread(XReadParams xReadParams, Map.Entry<byte[], byte[]>... streams) {
public final CommandObject<List<Object>> xread(XReadParams xReadParams, Map.Entry<byte[], byte[]>... streams) {
CommandArguments args = commandArguments(XREAD).addParams(xReadParams).add(STREAMS);
for (Map.Entry<byte[], byte[]> entry : streams) {
args.key(entry.getKey());
}
for (Map.Entry<byte[], byte[]> entry : streams) {
args.add(entry.getValue());
}
return new CommandObject<>(args, BuilderFactory.BINARY_LIST);
return new CommandObject<>(args, BuilderFactory.RAW_OBJECT_LIST);
}

public final CommandObject<List<byte[]>> xreadGroup(byte[] groupName, byte[] consumer,
public final CommandObject<List<Object>> xreadGroup(byte[] groupName, byte[] consumer,
XReadGroupParams xReadGroupParams, Map.Entry<byte[], byte[]>... streams) {
CommandArguments args = commandArguments(XREADGROUP)
.add(GROUP).add(groupName).add(consumer)
Expand All @@ -2682,7 +2682,7 @@ public final CommandObject<List<byte[]>> xreadGroup(byte[] groupName, byte[] con
for (Map.Entry<byte[], byte[]> entry : streams) {
args.add(entry.getValue());
}
return new CommandObject<>(args, BuilderFactory.BINARY_LIST);
return new CommandObject<>(args, BuilderFactory.RAW_OBJECT_LIST);
}
// Stream commands

Expand Down
12 changes: 6 additions & 6 deletions src/main/java/redis/clients/jedis/Jedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -4646,13 +4646,13 @@ public long hstrlen(final byte[] key, final byte[] field) {
}

@Override
public List<byte[]> xread(XReadParams xReadParams, Entry<byte[], byte[]>... streams) {
public List<Object> xread(XReadParams xReadParams, Entry<byte[], byte[]>... streams) {
checkIsInMultiOrPipeline();
return connection.executeCommand(commandObjects.xread(xReadParams, streams));
}

@Override
public List<byte[]> xreadGroup(byte[] groupName, byte[] consumer,
public List<Object> xreadGroup(byte[] groupName, byte[] consumer,
XReadGroupParams xReadGroupParams, Entry<byte[], byte[]>... streams) {
checkIsInMultiOrPipeline();
return connection.executeCommand(commandObjects.xreadGroup(groupName, consumer, xReadGroupParams, streams));
Expand All @@ -4671,25 +4671,25 @@ public long xlen(byte[] key) {
}

@Override
public List<byte[]> xrange(byte[] key, byte[] start, byte[] end) {
public List<Object> xrange(byte[] key, byte[] start, byte[] end) {
checkIsInMultiOrPipeline();
return connection.executeCommand(commandObjects.xrange(key, start, end));
}

@Override
public List<byte[]> xrange(byte[] key, byte[] start, byte[] end, int count) {
public List<Object> xrange(byte[] key, byte[] start, byte[] end, int count) {
checkIsInMultiOrPipeline();
return connection.executeCommand(commandObjects.xrange(key, start, end, count));
}

@Override
public List<byte[]> xrevrange(byte[] key, byte[] end, byte[] start) {
public List<Object> xrevrange(byte[] key, byte[] end, byte[] start) {
checkIsInMultiOrPipeline();
return connection.executeCommand(commandObjects.xrevrange(key, end, start));
}

@Override
public List<byte[]> xrevrange(byte[] key, byte[] end, byte[] start, int count) {
public List<Object> xrevrange(byte[] key, byte[] end, byte[] start, int count) {
checkIsInMultiOrPipeline();
return connection.executeCommand(commandObjects.xrevrange(key, end, start, count));
}
Expand Down
13 changes: 7 additions & 6 deletions src/main/java/redis/clients/jedis/PipelineBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -2968,22 +2968,22 @@ public Response<Long> xlen(byte[] key) {
}

@Override
public Response<List<byte[]>> xrange(byte[] key, byte[] start, byte[] end) {
public Response<List<Object>> xrange(byte[] key, byte[] start, byte[] end) {
return appendCommand(commandObjects.xrange(key, start, end));
}

@Override
public Response<List<byte[]>> xrange(byte[] key, byte[] start, byte[] end, int count) {
public Response<List<Object>> xrange(byte[] key, byte[] start, byte[] end, int count) {
return appendCommand(commandObjects.xrange(key, start, end, count));
}

@Override
public Response<List<byte[]>> xrevrange(byte[] key, byte[] end, byte[] start) {
public Response<List<Object>> xrevrange(byte[] key, byte[] end, byte[] start) {
return appendCommand(commandObjects.xrevrange(key, end, start));
}

@Override
public Response<List<byte[]>> xrevrange(byte[] key, byte[] end, byte[] start, int count) {
public Response<List<Object>> xrevrange(byte[] key, byte[] end, byte[] start, int count) {
return appendCommand(commandObjects.xrevrange(key, end, start, count));
}

Expand Down Expand Up @@ -3088,12 +3088,13 @@ public Response<List<Object>> xinfoConsumers(byte[] key, byte[] group) {
}

@Override
public Response<List<byte[]>> xread(XReadParams xReadParams, Map.Entry<byte[], byte[]>... streams) {
public Response<List<Object>> xread(XReadParams xReadParams, Map.Entry<byte[], byte[]>... streams) {
return appendCommand(commandObjects.xread(xReadParams, streams));
}

@Override
public Response<List<byte[]>> xreadGroup(byte[] groupName, byte[] consumer, XReadGroupParams xReadGroupParams, Map.Entry<byte[], byte[]>... streams) {
public Response<List<Object>> xreadGroup(byte[] groupName, byte[] consumer,
XReadGroupParams xReadGroupParams, Map.Entry<byte[], byte[]>... streams) {
return appendCommand(commandObjects.xreadGroup(groupName, consumer, xReadGroupParams, streams));
}

Expand Down
12 changes: 6 additions & 6 deletions src/main/java/redis/clients/jedis/TransactionBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -3135,22 +3135,22 @@ public Response<Long> xlen(byte[] key) {
}

@Override
public Response<List<byte[]>> xrange(byte[] key, byte[] start, byte[] end) {
public Response<List<Object>> xrange(byte[] key, byte[] start, byte[] end) {
return appendCommand(commandObjects.xrange(key, start, end));
}

@Override
public Response<List<byte[]>> xrange(byte[] key, byte[] start, byte[] end, int count) {
public Response<List<Object>> xrange(byte[] key, byte[] start, byte[] end, int count) {
return appendCommand(commandObjects.xrange(key, start, end, count));
}

@Override
public Response<List<byte[]>> xrevrange(byte[] key, byte[] end, byte[] start) {
public Response<List<Object>> xrevrange(byte[] key, byte[] end, byte[] start) {
return appendCommand(commandObjects.xrevrange(key, end, start));
}

@Override
public Response<List<byte[]>> xrevrange(byte[] key, byte[] end, byte[] start, int count) {
public Response<List<Object>> xrevrange(byte[] key, byte[] end, byte[] start, int count) {
return appendCommand(commandObjects.xrevrange(key, end, start, count));
}

Expand Down Expand Up @@ -3255,12 +3255,12 @@ public Response<List<Object>> xinfoConsumers(byte[] key, byte[] group) {
}

@Override
public Response<List<byte[]>> xread(XReadParams xReadParams, Map.Entry<byte[], byte[]>... streams) {
public Response<List<Object>> xread(XReadParams xReadParams, Map.Entry<byte[], byte[]>... streams) {
return appendCommand(commandObjects.xread(xReadParams, streams));
}

@Override
public Response<List<byte[]>> xreadGroup(byte[] groupName, byte[] consumer, XReadGroupParams xReadGroupParams, Map.Entry<byte[], byte[]>... streams) {
public Response<List<Object>> xreadGroup(byte[] groupName, byte[] consumer, XReadGroupParams xReadGroupParams, Map.Entry<byte[], byte[]>... streams) {
return appendCommand(commandObjects.xreadGroup(groupName, consumer, xReadGroupParams, streams));
}

Expand Down
12 changes: 6 additions & 6 deletions src/main/java/redis/clients/jedis/UnifiedJedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -3058,22 +3058,22 @@ public long xlen(byte[] key) {
}

@Override
public List<byte[]> xrange(byte[] key, byte[] start, byte[] end) {
public List<Object> xrange(byte[] key, byte[] start, byte[] end) {
return executeCommand(commandObjects.xrange(key, start, end));
}

@Override
public List<byte[]> xrange(byte[] key, byte[] start, byte[] end, int count) {
public List<Object> xrange(byte[] key, byte[] start, byte[] end, int count) {
return executeCommand(commandObjects.xrange(key, start, end, count));
}

@Override
public List<byte[]> xrevrange(byte[] key, byte[] end, byte[] start) {
public List<Object> xrevrange(byte[] key, byte[] end, byte[] start) {
return executeCommand(commandObjects.xrevrange(key, end, start));
}

@Override
public List<byte[]> xrevrange(byte[] key, byte[] end, byte[] start, int count) {
public List<Object> xrevrange(byte[] key, byte[] end, byte[] start, int count) {
return executeCommand(commandObjects.xrevrange(key, end, start, count));
}

Expand Down Expand Up @@ -3178,12 +3178,12 @@ public List<Object> xinfoConsumers(byte[] key, byte[] group) {
}

@Override
public List<byte[]> xread(XReadParams xReadParams, Map.Entry<byte[], byte[]>... streams) {
public List<Object> xread(XReadParams xReadParams, Map.Entry<byte[], byte[]>... streams) {
return executeCommand(commandObjects.xread(xReadParams, streams));
}

@Override
public List<byte[]> xreadGroup(byte[] groupName, byte[] consumer, XReadGroupParams xReadGroupParams, Map.Entry<byte[], byte[]>... streams) {
public List<Object> xreadGroup(byte[] groupName, byte[] consumer, XReadGroupParams xReadGroupParams, Map.Entry<byte[], byte[]>... streams) {
return executeCommand(commandObjects.xreadGroup(groupName, consumer, xReadGroupParams, streams));
}
// Stream commands
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@ default byte[] xadd(byte[] key, Map<byte[], byte[]> hash, XAddParams params) {

long xlen(byte[] key);

List<byte[]> xrange(byte[] key, byte[] start, byte[] end);
List<Object> xrange(byte[] key, byte[] start, byte[] end);

List<byte[]> xrange(byte[] key, byte[] start, byte[] end, int count);
List<Object> xrange(byte[] key, byte[] start, byte[] end, int count);

List<byte[]> xrevrange(byte[] key, byte[] end, byte[] start);
List<Object> xrevrange(byte[] key, byte[] end, byte[] start);

List<byte[]> xrevrange(byte[] key, byte[] end, byte[] start, int count);
List<Object> xrevrange(byte[] key, byte[] end, byte[] start, int count);

long xack(byte[] key, byte[] group, byte[]... ids);

Expand Down Expand Up @@ -74,9 +74,9 @@ List<Object> xautoclaimJustId(byte[] key, byte[] groupName, byte[] consumerName,

List<Object> xinfoConsumers(byte[] key, byte[] group);

List<byte[]> xread(XReadParams xReadParams, Map.Entry<byte[], byte[]>... streams);
List<Object> xread(XReadParams xReadParams, Map.Entry<byte[], byte[]>... streams);

List<byte[]> xreadGroup(byte[] groupName, byte[] consumer, XReadGroupParams xReadGroupParams,
List<Object> xreadGroup(byte[] groupName, byte[] consumer, XReadGroupParams xReadGroupParams,
Map.Entry<byte[], byte[]>... streams);

}
20 changes: 10 additions & 10 deletions src/main/java/redis/clients/jedis/commands/StreamCommands.java
Original file line number Diff line number Diff line change
Expand Up @@ -123,16 +123,6 @@ default StreamEntryID xadd(String key, Map<String, String> hash, XAddParams para
*/
long xgroupDelConsumer(String key, String groupName, String consumerName);

/**
* XPENDING key group
*/
StreamPendingSummary xpending(String key, String groupName);

/**
* XPENDING key group [[IDLE min-idle-time] start end count [consumer]]
*/
List<StreamPendingEntry> xpending(String key, String groupName, XPendingParams params);

/**
* XDEL key ID [ID ...]
*/
Expand All @@ -148,6 +138,16 @@ default StreamEntryID xadd(String key, Map<String, String> hash, XAddParams para
*/
long xtrim(String key, XTrimParams params);

/**
* XPENDING key group
*/
StreamPendingSummary xpending(String key, String groupName);

/**
* XPENDING key group [[IDLE min-idle-time] start end count [consumer]]
*/
List<StreamPendingEntry> xpending(String key, String groupName, XPendingParams params);

/**
* {@code XCLAIM key group consumer min-idle-time <ID-1> ... <ID-N>
* [IDLE <milliseconds>] [TIME <mstime>] [RETRYCOUNT <count>]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ default Response<byte[]> xadd(byte[] key, Map<byte[], byte[]> hash, XAddParams p

Response<Long> xlen(byte[] key);

Response<List<byte[]>> xrange(byte[] key, byte[] start, byte[] end);
Response<List<Object>> xrange(byte[] key, byte[] start, byte[] end);

Response<List<byte[]>> xrange(byte[] key, byte[] start, byte[] end, int count);
Response<List<Object>> xrange(byte[] key, byte[] start, byte[] end, int count);

Response<List<byte[]>> xrevrange(byte[] key, byte[] end, byte[] start);
Response<List<Object>> xrevrange(byte[] key, byte[] end, byte[] start);

Response<List<byte[]>> xrevrange(byte[] key, byte[] end, byte[] start, int count);
Response<List<Object>> xrevrange(byte[] key, byte[] end, byte[] start, int count);

Response<Long> xack(byte[] key, byte[] group, byte[]... ids);

Expand Down Expand Up @@ -75,9 +75,9 @@ Response<List<Object>> xautoclaimJustId(byte[] key, byte[] groupName, byte[] con

Response<List<Object>> xinfoConsumers(byte[] key, byte[] group);

Response<List<byte[]>> xread(XReadParams xReadParams, Map.Entry<byte[], byte[]>... streams);
Response<List<Object>> xread(XReadParams xReadParams, Map.Entry<byte[], byte[]>... streams);

Response<List<byte[]>> xreadGroup(byte[] groupName, byte[] consumer, XReadGroupParams xReadGroupParams,
Map.Entry<byte[], byte[]>... streams);
Response<List<Object>> xreadGroup(byte[] groupName, byte[] consumer,
XReadGroupParams xReadGroupParams, Map.Entry<byte[], byte[]>... streams);

}

0 comments on commit d384d61

Please sign in to comment.