Skip to content

Commit

Permalink
[WIP] RESP3 Support (#3293)
Browse files Browse the repository at this point in the history
Co-authored-by: Chayim <[email protected]>
  • Loading branch information
sazzad16 and chayim authored Mar 28, 2023
1 parent 4a8b9e7 commit 6e9177b
Show file tree
Hide file tree
Showing 31 changed files with 499 additions and 166 deletions.
135 changes: 102 additions & 33 deletions src/main/java/redis/clients/jedis/BuilderFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,12 @@
import java.util.*;
import java.util.stream.Collectors;

import redis.clients.jedis.exceptions.JedisException;
import redis.clients.jedis.resps.StreamConsumerFullInfo;
import redis.clients.jedis.resps.StreamFullInfo;
import redis.clients.jedis.resps.StreamGroupFullInfo;
import redis.clients.jedis.resps.LCSMatchResult.MatchedPosition;
import redis.clients.jedis.resps.LCSMatchResult.Position;
import redis.clients.jedis.resps.*;
import redis.clients.jedis.search.aggr.AggregationResult;
import redis.clients.jedis.timeseries.TSKeyedElements;
import redis.clients.jedis.timeseries.TSElement;
import redis.clients.jedis.timeseries.TSKeyValue;
import redis.clients.jedis.util.DoublePrecision;
import redis.clients.jedis.util.JedisByteHashMap;
import redis.clients.jedis.util.KeyValue;
Expand Down Expand Up @@ -116,7 +111,9 @@ public String toString() {
public static final Builder<Double> DOUBLE = new Builder<Double>() {
@Override
public Double build(Object data) {
return DoublePrecision.parseFloatingPointNumber(STRING.build(data));
if (data == null) return null;
else if (data instanceof Double) return (Double) data;
else return DoublePrecision.parseFloatingPointNumber(STRING.build(data));
}

@Override
Expand All @@ -129,15 +126,8 @@ public String toString() {
@Override
@SuppressWarnings("unchecked")
public List<Double> build(Object data) {
if (null == data) {
return null;
}
List<byte[]> values = (List<byte[]>) data;
List<Double> doubles = new ArrayList<>(values.size());
for (byte[] value : values) {
doubles.add(DOUBLE.build(value));
}
return doubles;
if (null == data) return null;
return ((List<Object>) data).stream().map(DOUBLE::build).collect(Collectors.toList());
}

@Override
Expand Down Expand Up @@ -210,6 +200,7 @@ public String toString() {
}
};

// TODO: remove
public static final Builder<byte[]> BYTE_ARRAY = new Builder<byte[]>() {
@Override
public byte[] build(Object data) {
Expand All @@ -222,6 +213,7 @@ public String toString() {
}
};

// TODO: remove
public static final Builder<List<byte[]>> BYTE_ARRAY_LIST = new Builder<List<byte[]>>() {
@Override
@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -300,6 +292,26 @@ public String toString() {
}
};

public static final Builder<Map<byte[], byte[]>> BINARY_MAP_FROM_PAIRS = new Builder<Map<byte[], byte[]>>() {
@Override
@SuppressWarnings("unchecked")
public Map<byte[], byte[]> build(Object data) {
final List<Object> list = (List<Object>) data;
final Map<byte[], byte[]> map = new JedisByteHashMap();
for (Object object : list) {
final List<byte[]> flat = (List<byte[]>) object;
map.put(flat.get(0), flat.get(1));
}

return map;
}

@Override
public String toString() {
return "Map<String, String>";
}
};

public static final Builder<String> STRING = new Builder<String>() {
@Override
public String build(Object data) {
Expand Down Expand Up @@ -445,7 +457,6 @@ public Tuple build(Object data) {
public String toString() {
return "Tuple";
}

};

public static final Builder<KeyedZSetElement> KEYED_ZSET_ELEMENT = new Builder<KeyedZSetElement>() {
Expand Down Expand Up @@ -487,6 +498,20 @@ public String toString() {
}
};

public static final Builder<List<Tuple>> TUPLE_LIST_RESP3 = new Builder<List<Tuple>>() {
@Override
@SuppressWarnings("unchecked")
public List<Tuple> build(Object data) {
if (null == data) return null;
return ((List<Object>) data).stream().map(TUPLE::build).collect(Collectors.toList());
}

@Override
public String toString() {
return "List<Tuple>";
}
};

public static final Builder<Set<Tuple>> TUPLE_ZSET = new Builder<Set<Tuple>>() {
@Override
@SuppressWarnings("unchecked")
Expand All @@ -509,14 +534,26 @@ public String toString() {
}
};

public static final Builder<Set<Tuple>> TUPLE_ZSET_RESP3 = new Builder<Set<Tuple>>() {
@Override
@SuppressWarnings("unchecked")
public Set<Tuple> build(Object data) {
if (null == data) return null;
return ((List<Object>) data).stream().map(TUPLE::build).collect(Collectors.toCollection(LinkedHashSet::new));
}

@Override
public String toString() {
return "ZSet<Tuple>";
}
};

private static final Builder<List<Tuple>> TUPLE_LIST_FROM_PAIRS = new Builder<List<Tuple>>() {
@Override
@SuppressWarnings("unchecked")
public List<Tuple> build(Object data) {
if (data == null) return null;
return ((List<Object>) data).stream()
.map(o -> (List<Object>) o).map(p -> TUPLE.build(p))
.collect(Collectors.toList());
return ((List<List<Object>>) data).stream().map(TUPLE::build).collect(Collectors.toList());
}

@Override
Expand Down Expand Up @@ -911,7 +948,8 @@ public String toString() {
/**
* Create an Access Control Log Entry Result of ACL LOG command
*/
public static final Builder<List<AccessControlLogEntry>> ACCESS_CONTROL_LOG_ENTRY_LIST = new Builder<List<AccessControlLogEntry>>() {
public static final Builder<List<AccessControlLogEntry>> ACCESS_CONTROL_LOG_ENTRY_LIST
= new Builder<List<AccessControlLogEntry>>() {

private final Map<String, Builder> mappingFunctions = createDecoderMap();

Expand All @@ -923,7 +961,7 @@ private Map<String, Builder> createDecoderMap() {
tempMappingFunctions.put(AccessControlLogEntry.CONTEXT, STRING);
tempMappingFunctions.put(AccessControlLogEntry.OBJECT, STRING);
tempMappingFunctions.put(AccessControlLogEntry.USERNAME, STRING);
tempMappingFunctions.put(AccessControlLogEntry.AGE_SECONDS, STRING);
// tempMappingFunctions.put(AccessControlLogEntry.AGE_SECONDS, STRING);
tempMappingFunctions.put(AccessControlLogEntry.CLIENT_INFO, STRING);

return tempMappingFunctions;
Expand All @@ -941,7 +979,8 @@ public List<AccessControlLogEntry> build(Object data) {
for (List<Object> logEntryData : logEntries) {
Iterator<Object> logEntryDataIterator = logEntryData.iterator();
AccessControlLogEntry accessControlLogEntry = new AccessControlLogEntry(
createMapFromDecodingFunctions(logEntryDataIterator, mappingFunctions));
createMapFromDecodingFunctions(logEntryDataIterator, mappingFunctions,
BACKUP_BUILDERS_FOR_DECODING_FUNCTIONS));
list.add(accessControlLogEntry);
}
return list;
Expand Down Expand Up @@ -1108,17 +1147,39 @@ public String toString() {
= new Builder<List<Map.Entry<String, List<StreamEntry>>>>() {
@Override
public List<Map.Entry<String, List<StreamEntry>>> build(Object data) {
if (data == null) {
return null;
}
List<Object> streams = (List<Object>) data;
if (data == null) return null;
List streamObjects = (List) data;

List<Map.Entry<String, List<StreamEntry>>> result = new ArrayList<>(streams.size());
for (Object streamObj : streams) {
List<Map.Entry<String, List<StreamEntry>>> result = new ArrayList<>(streamObjects.size());
for (Object streamObj : streamObjects) {
List<Object> stream = (List<Object>) streamObj;
String streamId = SafeEncoder.encode((byte[]) stream.get(0));
List<StreamEntry> streamEntries = BuilderFactory.STREAM_ENTRY_LIST.build(stream.get(1));
result.add(new AbstractMap.SimpleEntry<>(streamId, streamEntries));
String streamKey = STRING.build(stream.get(0));
List<StreamEntry> streamEntries = STREAM_ENTRY_LIST.build(stream.get(1));
result.add(new AbstractMap.SimpleEntry<>(streamKey, streamEntries));
}

return result;
}

@Override
public String toString() {
return "List<Entry<String, List<StreamEntry>>>";
}
};

public static final Builder<List<Map.Entry<String, List<StreamEntry>>>> STREAM_READ_RESPONSE_RESP3
= new Builder<List<Map.Entry<String, List<StreamEntry>>>>() {
@Override
public List<Map.Entry<String, List<StreamEntry>>> build(Object data) {
if (data == null) return null;
List streamObjects = (List) data;

List<Map.Entry<String, List<StreamEntry>>> result = new ArrayList<>(streamObjects.size() / 2);
Iterator iter = streamObjects.iterator();
while (iter.hasNext()) {
String streamKey = STRING.build(iter.next());
List<StreamEntry> streamEntries = STREAM_ENTRY_LIST.build(iter.next());
result.add(new AbstractMap.SimpleEntry<>(streamKey, streamEntries));
}

return result;
Expand Down Expand Up @@ -1436,8 +1497,16 @@ public String toString() {
}
};

private static final List<Builder> BACKUP_BUILDERS_FOR_DECODING_FUNCTIONS
= Arrays.asList(STRING, LONG, DOUBLE);

private static Map<String, Object> createMapFromDecodingFunctions(Iterator<Object> iterator,
Map<String, Builder> mappingFunctions) {
return createMapFromDecodingFunctions(iterator, mappingFunctions, null);
}

private static Map<String, Object> createMapFromDecodingFunctions(Iterator<Object> iterator,
Map<String, Builder> mappingFunctions, Collection<Builder> backupBuilders) {

Map<String, Object> resultMap = new HashMap<>();
while (iterator.hasNext()) {
Expand All @@ -1447,13 +1516,13 @@ private static Map<String, Object> createMapFromDecodingFunctions(Iterator<Objec
resultMap.put(mapKey, mappingFunctions.get(mapKey).build(iterator.next()));
} else { // For future - if we don't find an element in our builder map
Object unknownData = iterator.next();
for (Builder b : mappingFunctions.values()) {
Collection<Builder> builders = backupBuilders != null ? backupBuilders : mappingFunctions.values();
for (Builder b : builders) {
try {
resultMap.put(mapKey, b.build(unknownData));
break;
} catch (ClassCastException e) {
// We continue with next builder

}
}
}
Expand Down
1 change: 1 addition & 0 deletions src/main/java/redis/clients/jedis/ClusterPipeline.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import redis.clients.jedis.providers.ClusterConnectionProvider;
import redis.clients.jedis.util.IOUtils;

// TODO: RESP3
public class ClusterPipeline extends MultiNodePipelineBase {

private final ClusterConnectionProvider provider;
Expand Down
Loading

0 comments on commit 6e9177b

Please sign in to comment.