Skip to content

Commit

Permalink
Protocol v2 support (#17)
Browse files Browse the repository at this point in the history
  • Loading branch information
quinchs authored Nov 1, 2023
1 parent dd661c6 commit 8db19fc
Show file tree
Hide file tree
Showing 138 changed files with 3,314 additions and 1,450 deletions.
16 changes: 16 additions & 0 deletions examples/java-examples/src/main/resources/logback.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<?xml version="1.0" encoding="UTF-8"?>

<!-- Log level: OFF, ERROR, WARN, INFO, DEBUG, TRACE, ALL -->

<configuration>
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<layout class="ch.qos.logback.classic.PatternLayout">
<Pattern>
%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n
</Pattern>
</layout>
</appender>
<root level="ALL">
<appender-ref ref="CONSOLE" />
</root>
</configuration>
56 changes: 35 additions & 21 deletions src/driver/src/main/java/com/edgedb/driver/binary/PacketReader.java
Original file line number Diff line number Diff line change
@@ -1,19 +1,16 @@
package com.edgedb.driver.binary;

import com.edgedb.driver.binary.codecs.Codec;
import com.edgedb.driver.binary.codecs.CodecContext;
import com.edgedb.driver.binary.packets.shared.Annotation;
import com.edgedb.driver.binary.packets.shared.KeyValue;
import com.edgedb.driver.exceptions.EdgeDBException;
import com.edgedb.driver.binary.protocol.common.Annotation;
import com.edgedb.driver.binary.protocol.common.KeyValue;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.joou.UByte;
import org.joou.UInteger;
import org.joou.ULong;
import org.joou.UShort;

import javax.naming.OperationNotSupportedException;
import java.lang.reflect.Array;
import java.nio.charset.StandardCharsets;
import java.util.EnumSet;
Expand All @@ -25,7 +22,21 @@
import static org.joou.Unsigned.*;

public class PacketReader {
private final @NotNull ByteBuf buffer;
public static final class ScopedReader extends PacketReader implements AutoCloseable {
public final boolean isNoData;

public ScopedReader(@Nullable ByteBuf buffer) {
super(buffer == null ? Unpooled.EMPTY_BUFFER : buffer);
isNoData = buffer == null;
}

@Override
public void close() {
buffer.release();
}
}

protected final @NotNull ByteBuf buffer;
private static final @NotNull Map<Class<?>, Function<PacketReader, ? extends Number>> numberReaderMap;

private final int initPos;
Expand Down Expand Up @@ -80,20 +91,6 @@ public boolean isEmpty() {
return this.buffer.readableBytes() == 0;
}

public <T> @Nullable T deserializeByteArray(@NotNull Codec<T> codec, CodecContext context) throws EdgeDBException, OperationNotSupportedException {
var buff = readByteArray();

if(buff == null) {
return null;
}

try {
return codec.deserialize(new PacketReader(buff), context);
} finally {
buff.release();
}
}

public byte[] consumeByteArray() {
var arr = new byte[this.buffer.readableBytes()];
this.buffer.readBytes(arr);
Expand Down Expand Up @@ -171,6 +168,23 @@ public short readInt16() {
return arr;
}

/**
* Reads the {@code length} number of bytes and creates a new {@linkplain ScopedReader} wrapping the bytes.
* @param length The number of bytes to read.
* @return A scoped reader whose close method releases the read bytes.
*/
public ScopedReader scopedSlice(int length) {
return new ScopedReader(readBytes(length));
}

/**
* Calls {@code readByteArray()} and creates a new {@linkplain ScopedReader} wrapping the bytes.
* @return A scoped reader whose close method releases the read bytes.
*/
public ScopedReader scopedSlice() {
return new ScopedReader(readByteArray());
}

public @Nullable ByteBuf readByteArray() {
var len = readInt32();

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package com.edgedb.driver.binary;

import com.edgedb.driver.binary.packets.ServerMessageType;
import com.edgedb.driver.binary.packets.receivable.*;
import com.edgedb.driver.binary.packets.sendables.Sendable;
import com.edgedb.driver.binary.protocol.ServerMessageType;
import com.edgedb.driver.binary.protocol.Receivable;
import com.edgedb.driver.binary.protocol.Sendable;
import com.edgedb.driver.clients.EdgeDBBinaryClient;
import com.edgedb.driver.exceptions.ConnectionFailedException;
import com.edgedb.driver.exceptions.EdgeDBException;
import com.edgedb.driver.util.HexUtils;
Expand All @@ -24,33 +25,12 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Flow;
import java.util.function.Function;
import java.util.stream.Collectors;

public class PacketSerializer {
private static final Logger logger = LoggerFactory.getLogger(PacketSerializer.class);
private static final @NotNull Map<ServerMessageType, Function<PacketReader, Receivable>> deserializerMap;
private static final Map<Class<?>, Map<Number, Enum<?>>> binaryEnumMap = new HashMap<>();

static {
deserializerMap = new HashMap<>();

deserializerMap.put(ServerMessageType.AUTHENTICATION, AuthenticationStatus::new);
deserializerMap.put(ServerMessageType.COMMAND_COMPLETE, CommandComplete::new);
deserializerMap.put(ServerMessageType.COMMAND_DATA_DESCRIPTION, CommandDataDescription::new);
deserializerMap.put(ServerMessageType.DATA, Data::new);
deserializerMap.put(ServerMessageType.DUMP_BLOCK, DumpBlock::new);
deserializerMap.put(ServerMessageType.DUMP_HEADER, DumpHeader::new);
deserializerMap.put(ServerMessageType.ERROR_RESPONSE, ErrorResponse::new);
deserializerMap.put(ServerMessageType.LOG_MESSAGE, LogMessage::new);
deserializerMap.put(ServerMessageType.PARAMETER_STATUS, ParameterStatus::new);
deserializerMap.put(ServerMessageType.READY_FOR_COMMAND, ReadyForCommand::new);
deserializerMap.put(ServerMessageType.RESTORE_READY, RestoreReady::new);
deserializerMap.put(ServerMessageType.SERVER_HANDSHAKE, ServerHandshake::new);
deserializerMap.put(ServerMessageType.SERVER_KEY_DATA, ServerKeyData::new);
deserializerMap.put(ServerMessageType.STATE_DATA_DESCRIPTION, StateDataDescription::new);
}

public static <T extends Enum<?> & BinaryEnum<U>, U extends Number> void registerBinaryEnum(Class<T> cls, T @NotNull [] values) {
binaryEnumMap.put(cls, Arrays.stream(values).collect(Collectors.toMap(BinaryEnum::getValue, v -> v)));
}
Expand All @@ -64,7 +44,7 @@ public static <T extends Enum<T> & BinaryEnum<U>, U extends Number> T getEnumVal
return (T)binaryEnumMap.get(enumCls).get(raw);
}

public static @NotNull MessageToMessageDecoder<ByteBuf> createDecoder() {
public static @NotNull MessageToMessageDecoder<ByteBuf> createDecoder(EdgeDBBinaryClient client) {
return new MessageToMessageDecoder<>() {
private final Map<Channel, PacketContract> contracts = new HashMap<>();

Expand Down Expand Up @@ -96,7 +76,7 @@ protected void decode(@NotNull ChannelHandlerContext ctx, @NotNull ByteBuf msg,

// can we read this packet?
if (msg.readableBytes() >= length) {
var packet = PacketSerializer.deserialize(type, length, msg.readSlice((int) length));
var packet = PacketSerializer.deserialize(client, type, length, msg.readSlice((int) length));

if(packet == null) {
logger.error("Got null result for packet type {}", type);
Expand Down Expand Up @@ -175,7 +155,7 @@ public boolean tryComplete(@NotNull ByteBuf other) {

if (data.readableBytes() >= length) {
// read
packet = PacketSerializer.deserialize(messageType, length, data, false);
packet = PacketSerializer.deserialize(client, messageType, length, data, false);

return true;
}
Expand Down Expand Up @@ -218,42 +198,36 @@ protected void encode(@NotNull ChannelHandlerContext ctx, @NotNull Sendable msg,
}

public static @Nullable Receivable deserialize(
ServerMessageType messageType, long length, @NotNull ByteBuf buffer
EdgeDBBinaryClient client, ServerMessageType messageType, long length, @NotNull ByteBuf buffer
) {
var reader = new PacketReader(buffer);
return deserializeSingle(messageType, length, reader, true);
return deserializeSingle(client, messageType, length, reader, true);
}

public static @Nullable Receivable deserialize(
ServerMessageType messageType, long length, @NotNull ByteBuf buffer, boolean verifyEmpty
EdgeDBBinaryClient client, ServerMessageType messageType, long length, @NotNull ByteBuf buffer, boolean verifyEmpty
) {
var reader = new PacketReader(buffer);
return deserializeSingle(messageType, length, reader, verifyEmpty);
return deserializeSingle(client, messageType, length, reader, verifyEmpty);
}

public static @Nullable Receivable deserializeSingle(PacketReader reader) {
public static @Nullable Receivable deserializeSingle(EdgeDBBinaryClient client, PacketReader reader) {
var messageType = reader.readEnum(ServerMessageType.class, Byte.TYPE);
var length = reader.readUInt32().longValue();

return deserializeSingle(messageType, length, reader, false);
return deserializeSingle(client, messageType, length, reader, false);
}

public static @Nullable Receivable deserializeSingle(
ServerMessageType type, long length, @NotNull PacketReader reader,
EdgeDBBinaryClient client, ServerMessageType type, long length, @NotNull PacketReader reader,
boolean verifyEmpty
) {
if(!deserializerMap.containsKey(type)) {
logger.error("Unknown packet type {}", type);
reader.skip(length);
return null;
}

try {
return deserializerMap.get(type).apply(reader);
return client.getProtocolProvider().readPacket(type, (int)length, reader);
}
catch (Exception x) {
logger.error("Failed to deserialize packet", x);
throw x;
return null;
}
finally {
// ensure we read the entire packet
Expand All @@ -263,9 +237,15 @@ protected void encode(@NotNull ChannelHandlerContext ctx, @NotNull Sendable msg,
}
}

public static HttpResponse.BodyHandler<List<Receivable>> PACKET_BODY_HANDLER = new PacketBodyHandler();

public static HttpResponse.BodyHandler<List<Receivable>> createHandler(EdgeDBBinaryClient client) {
return new PacketBodyHandler(client);
}
private static class PacketBodyHandler implements HttpResponse.BodyHandler<List<Receivable>> {
private final EdgeDBBinaryClient client;
public PacketBodyHandler(EdgeDBBinaryClient client) {
this.client = client;
}

@Override
public HttpResponse.BodySubscriber<List<Receivable>> apply(HttpResponse.ResponseInfo responseInfo) {
// ensure success
Expand All @@ -276,7 +256,7 @@ public HttpResponse.BodySubscriber<List<Receivable>> apply(HttpResponse.Response
: new PacketBodySubscriber(responseInfo.statusCode());
}

private static class PacketBodySubscriber implements HttpResponse.BodySubscriber<List<Receivable>> {
private class PacketBodySubscriber implements HttpResponse.BodySubscriber<List<Receivable>> {
private final @Nullable List<@NotNull ByteBuf> buffers;
private final CompletableFuture<List<Receivable>> promise;

Expand Down Expand Up @@ -334,7 +314,7 @@ public void onComplete() {
var data = new ArrayList<Receivable>();

while(completeBuffer.readableBytes() > 0) {
var packet = deserializeSingle(reader);
var packet = deserializeSingle(client, reader);

if(packet == null && completeBuffer.readableBytes() > 0) {
promise.completeExceptionally(
Expand Down
Loading

0 comments on commit 8db19fc

Please sign in to comment.