diff --git a/CHANGELOG.md b/CHANGELOG.md index 1e05fbf..46e1d9a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,7 +5,6 @@ - `Connection.info` (through `ConnectionInfo` class) exposes read-only connection-level information, e.g. acessing access server-provided parameter status values. - Support for binary `pgoutput` replication by [wolframm](https://github.com/Wolframm-Activities-OU). -- Deprecated `TupleDataColumn.data`, use `.value` instead (for binary protocol messages). - **Allowing custom type codecs**: - `Codec` interface is used for encoding/decoding value by type OIDs or Dart values. - `Codec.encode` and `Codec.decode` gets a reference to `CodecContext` which provides @@ -14,6 +13,8 @@ (for values where type is not specified). - `DatabaseInfo` tracks information about relations and oids (currently limited to `RelationMessage` caching). - **Behaviour / soft-breaking changes**: + - Deprecated `TupleDataColumn.data`, use `.value` instead (for binary protocol messages). + - Deprecated some logical replication message parsing method. - Removed `@internal`-annotated methods from the public API of `ServerException` and `Severity`. - `ServerException` may be transformed into `_PgTimeoutException` which is both `PgException` and `TimeoutException` (but no longer `ServerException`). - The `timeout` parameters and the `SessionSettings.queryTimeout` has only a somewhat diff --git a/lib/messages.dart b/lib/messages.dart index 4920d80..a1db8de 100644 --- a/lib/messages.dart +++ b/lib/messages.dart @@ -2,6 +2,7 @@ library messages; export 'src/buffer.dart' show PgByteDataWriter; export 'src/messages/client_messages.dart'; -export 'src/messages/logical_replication_messages.dart'; -export 'src/messages/server_messages.dart'; +export 'src/messages/logical_replication_messages.dart' + hide tryAsyncParseLogicalReplicationMessage; +export 'src/messages/server_messages.dart' hide parseXLogDataMessage; export 'src/messages/shared_messages.dart'; diff --git a/lib/src/message_window.dart b/lib/src/message_window.dart index b7bda95..6b2436b 100644 --- a/lib/src/message_window.dart +++ b/lib/src/message_window.dart @@ -1,3 +1,4 @@ +import 'dart:async'; import 'dart:collection'; import 'dart:typed_data'; @@ -11,7 +12,7 @@ import 'messages/shared_messages.dart'; const int _headerByteSize = 5; -typedef _ServerMessageFn = ServerMessage Function( +typedef _ServerMessageFn = FutureOr Function( PgByteDataReader reader, int length); Map _messageTypeMap = { @@ -50,7 +51,7 @@ class MessageFramer { bool get _isComplete => _expectedLength == 0 || _expectedLength <= _reader.remainingLength; - void addBytes(Uint8List bytes) { + Future addBytes(Uint8List bytes) async { _reader.add(bytes); while (true) { @@ -76,7 +77,7 @@ class MessageFramer { } final targetRemainingLength = _reader.remainingLength - _expectedLength; - final msg = msgMaker(_reader, _expectedLength); + final msg = await msgMaker(_reader, _expectedLength); if (_reader.remainingLength > targetRemainingLength) { throw StateError( 'Message parser consumed more bytes than expected. type=$_type expectedLength=$_expectedLength'); @@ -111,7 +112,8 @@ class MessageFramer { /// such as replication messages. /// Returns a [ReplicationMessage] if the message contains such message. /// Otherwise, it'll just return the provided bytes as [CopyDataMessage]. -ServerMessage _parseCopyDataMessage(PgByteDataReader reader, int length) { +Future _parseCopyDataMessage( + PgByteDataReader reader, int length) async { final code = reader.readUint8(); if (code == ReplicationMessageId.primaryKeepAlive) { return PrimaryKeepAliveMessage.parse(reader); diff --git a/lib/src/messages/logical_replication_messages.dart b/lib/src/messages/logical_replication_messages.dart index cd1f3a4..f220314 100644 --- a/lib/src/messages/logical_replication_messages.dart +++ b/lib/src/messages/logical_replication_messages.dart @@ -1,6 +1,7 @@ import 'dart:typed_data'; import 'package:buffer/buffer.dart'; +import 'package:meta/meta.dart'; import 'package:postgres/src/types/codec.dart'; import '../buffer.dart'; @@ -47,6 +48,8 @@ class XLogDataLogicalMessage implements XLogDataMessage { /// Tries to check if the [bytesList] is a [LogicalReplicationMessage]. If so, /// [LogicalReplicationMessage] is returned, otherwise `null` is returned. +@Deprecated('This method will be removed from public API. ' + 'Please file a new issue on GitHub if you are using it.') LogicalReplicationMessage? tryParseLogicalReplicationMessage( PgByteDataReader reader, int length) { // the first byte is the msg type @@ -69,13 +72,66 @@ LogicalReplicationMessage? tryParseLogicalReplicationMessage( return TypeMessage._parse(reader); case LogicalReplicationMessageTypes.insert: - return InsertMessage._parse(reader); + return InsertMessage._syncParse(reader); case LogicalReplicationMessageTypes.update: - return UpdateMessage._parse(reader); + return UpdateMessage._syncParse(reader); case LogicalReplicationMessageTypes.delete: - return DeleteMessage._parse(reader); + return DeleteMessage._syncParse(reader); + + case LogicalReplicationMessageTypes.truncate: + return TruncateMessage._parse(reader); + + case LogicalReplicationMessageTypes.unsupported: + // wal2json messages starts with `{` as the first byte + if (firstByte == '{'.codeUnits.single) { + // note this needs the full set of bytes unlike other cases + final bb = BytesBuffer(); + bb.addByte(firstByte); + bb.add(reader.read(length - 1)); + try { + return JsonMessage(reader.encoding.decode(bb.toBytes())); + } catch (_) { + // ignore + } + } + return null; + } +} + +/// Tries to check if the [bytesList] is a [LogicalReplicationMessage]. If so, +/// [LogicalReplicationMessage] is returned, otherwise `null` is returned. +@internal +Future tryAsyncParseLogicalReplicationMessage( + PgByteDataReader reader, int length) async { + // the first byte is the msg type + final firstByte = reader.readUint8(); + final msgType = LogicalReplicationMessageTypes.fromByte(firstByte); + switch (msgType) { + case LogicalReplicationMessageTypes.begin: + return BeginMessage._parse(reader); + + case LogicalReplicationMessageTypes.commit: + return CommitMessage._parse(reader); + + case LogicalReplicationMessageTypes.origin: + return OriginMessage._parse(reader); + + case LogicalReplicationMessageTypes.relation: + return RelationMessage._parse(reader); + + case LogicalReplicationMessageTypes.type: + return TypeMessage._parse(reader); + + case LogicalReplicationMessageTypes.insert: + return await InsertMessage._parse(reader); + + case LogicalReplicationMessageTypes.update: + return await UpdateMessage._parse(reader); + + case LogicalReplicationMessageTypes.delete: + return await DeleteMessage._parse(reader); case LogicalReplicationMessageTypes.truncate: return TruncateMessage._parse(reader); @@ -381,7 +437,49 @@ class TupleData { /// TupleData does not consume the entire bytes /// /// It'll read until the types are generated. - factory TupleData._parse(PgByteDataReader reader, int relationId) { + /// + /// NOTE: do not use, will be removed. + factory TupleData._syncParse(PgByteDataReader reader) { + final columnCount = reader.readUint16(); + final columns = []; + for (var i = 0; i < columnCount; i++) { + // reading order matters + final typeId = reader.readUint8(); + final tupleDataType = TupleDataType.fromByte(typeId); + late final int length; + late final String data; + Object? value; + switch (tupleDataType) { + case TupleDataType.text: + case TupleDataType.binary: + length = reader.readUint32(); + data = reader.encoding.decode(reader.read(length)); + value = data; + break; + case TupleDataType.null_: + case TupleDataType.toast: + length = 0; + data = ''; + break; + } + columns.add( + TupleDataColumn( + typeId: typeId, + length: length, + typeOid: null, + data: data, + value: value, + ), + ); + } + return TupleData(columns: columns); + } + + /// TupleData does not consume the entire bytes + /// + /// It'll read until the types are generated. + static Future _parse( + PgByteDataReader reader, int relationId) async { final columnCount = reader.readUint16(); final columns = []; for (var i = 0; i < columnCount; i++) { @@ -390,8 +488,11 @@ class TupleData { final tupleDataType = TupleDataType.fromByte(typeId); late final int length; late final String data; - final typeOid = reader.codecContext.databaseInfo - .getCachedTypeOidForRelationColumn(relationId, i); + final typeOid = await reader.codecContext.databaseInfo + .getColumnTypeOidByRelationIdAndColumnIndex( + relationId: relationId, + columnIndex: i, + ); Object? value; switch (tupleDataType) { case TupleDataType.text: @@ -409,7 +510,7 @@ class TupleData { bytes: bytes, encoding: reader.codecContext.encoding, ) - : reader.codecContext.typeRegistry.decode( + : await reader.codecContext.typeRegistry.decode( EncodedValue.binary( bytes, typeOid: typeOid, @@ -451,13 +552,26 @@ class InsertMessage implements LogicalReplicationMessage { late final int relationId; late final TupleData tuple; - InsertMessage._parse(PgByteDataReader reader) { + InsertMessage._(this.relationId, this.tuple); + + /// NOTE: do not use, will be removed. + InsertMessage._syncParse(PgByteDataReader reader) { relationId = reader.readUint32(); final tupleType = reader.readUint8(); if (tupleType != 'N'.codeUnitAt(0)) { throw Exception("InsertMessage must have 'N' tuple type"); } - tuple = TupleData._parse(reader, relationId); + tuple = TupleData._syncParse(reader); + } + + static Future _parse(PgByteDataReader reader) async { + final relationId = reader.readUint32(); + final tupleType = reader.readUint8(); + if (tupleType != 'N'.codeUnitAt(0)) { + throw Exception("InsertMessage must have 'N' tuple type"); + } + final tuple = await TupleData._parse(reader, relationId); + return InsertMessage._(relationId, tuple); } @override @@ -511,7 +625,11 @@ class UpdateMessage implements LogicalReplicationMessage { /// Byte1('N'): Identifies the following TupleData message as a new tuple. late final TupleData? newTuple; - UpdateMessage._parse(PgByteDataReader reader) { + UpdateMessage._( + this.relationId, this.oldTupleType, this.oldTuple, this.newTuple); + + /// NOTE: do not use, will be removed. + UpdateMessage._syncParse(PgByteDataReader reader) { // reading order matters relationId = reader.readUint32(); var tupleType = UpdateMessageTuple.fromByte(reader.readUint8()); @@ -519,7 +637,32 @@ class UpdateMessage implements LogicalReplicationMessage { if (tupleType == UpdateMessageTuple.oldType || tupleType == UpdateMessageTuple.keyType) { oldTupleType = tupleType; - oldTuple = TupleData._parse(reader, relationId); + oldTuple = TupleData._syncParse(reader); + tupleType = UpdateMessageTuple.fromByte(reader.readUint8()); + } else { + oldTupleType = null; + oldTuple = null; + } + + if (tupleType == UpdateMessageTuple.newType) { + newTuple = TupleData._syncParse(reader); + } else { + throw Exception('Invalid Tuple Type for UpdateMessage'); + } + } + + static Future _parse(PgByteDataReader reader) async { + // reading order matters + final relationId = reader.readUint32(); + UpdateMessageTuple? oldTupleType; + TupleData? oldTuple; + TupleData? newTuple; + var tupleType = UpdateMessageTuple.fromByte(reader.readUint8()); + + if (tupleType == UpdateMessageTuple.oldType || + tupleType == UpdateMessageTuple.keyType) { + oldTupleType = tupleType; + oldTuple = await TupleData._parse(reader, relationId); tupleType = UpdateMessageTuple.fromByte(reader.readUint8()); } else { oldTupleType = null; @@ -527,10 +670,11 @@ class UpdateMessage implements LogicalReplicationMessage { } if (tupleType == UpdateMessageTuple.newType) { - newTuple = TupleData._parse(reader, relationId); + newTuple = await TupleData._parse(reader, relationId); } else { throw Exception('Invalid Tuple Type for UpdateMessage'); } + return UpdateMessage._(relationId, oldTupleType, oldTuple, newTuple); } @override @@ -583,18 +727,36 @@ class DeleteMessage implements LogicalReplicationMessage { /// Byte1('N'): Identifies the following TupleData message as a new tuple. late final TupleData oldTuple; - DeleteMessage._parse(PgByteDataReader reader) { + DeleteMessage._(this.relationId, this.oldTupleType, this.oldTuple); + + /// NOTE: do not use, will be removed. + DeleteMessage._syncParse(PgByteDataReader reader) { relationId = reader.readUint32(); oldTupleType = DeleteMessageTuple.fromByte(reader.readUint8()); switch (oldTupleType) { case DeleteMessageTuple.keyType: case DeleteMessageTuple.oldType: - oldTuple = TupleData._parse(reader, relationId); + oldTuple = TupleData._syncParse(reader); + break; + case DeleteMessageTuple.unknown: + throw Exception('Unknown tuple type for DeleteMessage'); + } + } + + static Future _parse(PgByteDataReader reader) async { + final relationId = reader.readUint32(); + final oldTupleType = DeleteMessageTuple.fromByte(reader.readUint8()); + TupleData? oldTuple; + switch (oldTupleType) { + case DeleteMessageTuple.keyType: + case DeleteMessageTuple.oldType: + oldTuple = await TupleData._parse(reader, relationId); break; case DeleteMessageTuple.unknown: throw Exception('Unknown tuple type for DeleteMessage'); } + return DeleteMessage._(relationId, oldTupleType, oldTuple); } @override diff --git a/lib/src/messages/server_messages.dart b/lib/src/messages/server_messages.dart index 2f43cf2..07b9029 100644 --- a/lib/src/messages/server_messages.dart +++ b/lib/src/messages/server_messages.dart @@ -368,9 +368,8 @@ class XLogDataMessage implements ReplicationMessage, ServerMessage { /// will return a [XLogDataLogicalMessage] with that message. Otherwise, it'll /// return [XLogDataMessage] with raw data. /// - @Deprecated( - 'It is likely that this method signature will change or will be removed in ' - 'an upcoming release. Please file a new issue on GitHub if you are using it.') + @Deprecated('This method will be removed from public API. ' + 'Please file a new issue on GitHub if you are using it.') static XLogDataMessage parse( Uint8List bytes, Encoding encoding, { @@ -409,6 +408,45 @@ class XLogDataMessage implements ReplicationMessage, ServerMessage { 'XLogDataMessage(walStart: $walStart, walEnd: $walEnd, time: $time, data: $data)'; } +/// Parses the XLogDataMessage +/// +/// If [XLogDataMessage.data] is a [LogicalReplicationMessage], then the method +/// will return a [XLogDataLogicalMessage] with that message. Otherwise, it'll +/// return [XLogDataMessage] with raw data. +@internal +Future parseXLogDataMessage( + Uint8List bytes, + Encoding encoding, { + CodecContext? codecContext, +}) async { + final reader = PgByteDataReader( + codecContext: + codecContext ?? CodecContext.withDefaults(encoding: encoding)) + ..add(bytes); + final walStart = LSN(reader.readUint64()); + final walEnd = LSN(reader.readUint64()); + final time = dateTimeFromMicrosecondsSinceY2k(reader.readUint64()); + + final message = await tryAsyncParseLogicalReplicationMessage( + reader, reader.remainingLength); + if (message != null) { + return XLogDataLogicalMessage( + message: message, + bytes: bytes, + time: time, + walEnd: walEnd, + walStart: walStart, + ); + } else { + return XLogDataMessage( + bytes: bytes, + time: time, + walEnd: walEnd, + walStart: walStart, + ); + } +} + class UnknownMessage extends ServerMessage { final int code; final Uint8List bytes; diff --git a/lib/src/types/codec.dart b/lib/src/types/codec.dart index c9a99c1..8591c5d 100644 --- a/lib/src/types/codec.dart +++ b/lib/src/types/codec.dart @@ -1,3 +1,4 @@ +import 'dart:async'; import 'dart:convert'; import 'dart:typed_data'; @@ -60,7 +61,7 @@ enum EncodingFormat { /// Encodes the [input] value and returns an [EncodedValue] object. /// /// May return `null` if the encoder is not able to convert the [input] value. -typedef EncoderFn = EncodedValue? Function( +typedef EncoderFn = FutureOr Function( TypedValue input, CodecContext context); /// Encoder and decoder for a value stored in Postgresql. @@ -68,13 +69,13 @@ abstract class Codec { /// Encodes the [input] value and returns an [EncodedValue] object. /// /// May return `null` if the codec is not able to encode the [input]. - EncodedValue? encode(TypedValue input, CodecContext context); + FutureOr encode(TypedValue input, CodecContext context); /// Decodes the [input] value and returns a Dart value object. /// /// May return [UndecodedBytes] or the same [input] instance if the codec /// is not able to decode the [input]. - Object? decode(EncodedValue input, CodecContext context); + FutureOr decode(EncodedValue input, CodecContext context); } /// Provides access to connection and database information, and also to additional codecs. diff --git a/lib/src/types/type_registry.dart b/lib/src/types/type_registry.dart index 055d550..f10fc77 100644 --- a/lib/src/types/type_registry.dart +++ b/lib/src/types/type_registry.dart @@ -267,12 +267,12 @@ class TypeRegistry { ]); } - EncodedValue? encode(TypedValue input, CodecContext context) { + Future encode(TypedValue input, CodecContext context) async { // check for codec final typeOid = input.type.oid; final codec = typeOid == null ? null : _codecs[typeOid]; if (codec != null) { - final r = codec.encode(input, context); + final r = await codec.encode(input, context); if (r != null) { return r; } @@ -280,7 +280,7 @@ class TypeRegistry { // fallback encoders for (final encoder in _encoders) { - final encoded = encoder(input, context); + final encoded = await encoder(input, context); if (encoded != null) { return encoded; } @@ -288,7 +288,7 @@ class TypeRegistry { throw PgException("Could not infer type of value '${input.value}'."); } - Object? decode(EncodedValue value, CodecContext context) { + Future decode(EncodedValue value, CodecContext context) async { final typeOid = value.typeOid; if (typeOid == null) { throw ArgumentError('`EncodedValue.typeOid` was not provided.'); @@ -297,7 +297,7 @@ class TypeRegistry { // check for codec final codec = _codecs[typeOid]; if (codec != null) { - final r = codec.decode(value, context); + final r = await codec.decode(value, context); if (r != value && r is! UndecodedBytes) { return r; } diff --git a/lib/src/v3/connection.dart b/lib/src/v3/connection.dart index 8f547b4..9c62151 100644 --- a/lib/src/v3/connection.dart +++ b/lib/src/v3/connection.dart @@ -694,16 +694,17 @@ class _PgResultStreamSubscription _scheduleStatement(() async { connection._pending = this; - final encodedValues = []; + final encodedFutures = >[]; final context = connection.codecContext; for (final e in statement.parameters) { if (e.isSqlNull) { - encodedValues.add(null); + encodedFutures.add(Future.value(null)); continue; } final f = context.typeRegistry.encode(e, context); - encodedValues.add(f); + encodedFutures.add(f); } + final encodedValues = await Future.wait(encodedFutures); connection._channel.sink.add(AggregatedClientMessage([ BindMessage( @@ -820,7 +821,7 @@ class _PgResultStreamSubscription sqlNulls ??= List.filled(columnCount, false); sqlNulls[i] = true; } - final futureOr = context.typeRegistry.decode( + final futureValue = context.typeRegistry.decode( EncodedValue( input, format: EncodingFormat.fromBinaryFlag(field.isBinaryEncoding), @@ -828,7 +829,7 @@ class _PgResultStreamSubscription ), context, ); - futures.add(futureOr is Future ? futureOr : Future.value(futureOr)); + futures.add(futureValue); } final values = await Future.wait(futures); diff --git a/lib/src/v3/database_info.dart b/lib/src/v3/database_info.dart index ae24a4e..4af90fa 100644 --- a/lib/src/v3/database_info.dart +++ b/lib/src/v3/database_info.dart @@ -17,14 +17,17 @@ class DatabaseInfo { /// /// Returns `null` if the [relationId] is unknown or the [columnIndex] /// is out of bounds. - int? getCachedTypeOidForRelationColumn(int relationId, int columnIndex) { + Future getColumnTypeOidByRelationIdAndColumnIndex({ + required int relationId, + required int columnIndex, + }) async { + if (columnIndex < 0) { + throw ArgumentError('columnIndex must not be negative'); + } final m = _relationMessages[relationId]; if (m == null) { return null; } - if (columnIndex < 0) { - throw ArgumentError('columnIndex must not be negative'); - } if (columnIndex > m.columns.length) { return null; } diff --git a/lib/src/v3/protocol.dart b/lib/src/v3/protocol.dart index fb8e577..7535b68 100644 --- a/lib/src/v3/protocol.dart +++ b/lib/src/v3/protocol.dart @@ -69,8 +69,8 @@ StreamTransformer _readMessages( } } - void handleChunk(Uint8List bytes) { - framer.addBytes(bytes); + Future handleChunk(Uint8List bytes) async { + await framer.addBytes(bytes); emitFinishedMessages(); } diff --git a/test/framer_test.dart b/test/framer_test.dart index bea6db0..c488226 100644 --- a/test/framer_test.dart +++ b/test/framer_test.dart @@ -14,12 +14,12 @@ void main() { framer = MessageFramer(CodecContext.withDefaults()); }); - tearDown(() { - flush(framer); + tearDown(() async { + await flush(framer); }); - test('Perfectly sized message in one buffer', () { - framer.addBytes(bufferWithMessages([ + test('Perfectly sized message in one buffer', () async { + await framer.addBytes(bufferWithMessages([ messageWithBytes([1, 2, 3], 1) ])); @@ -29,8 +29,8 @@ void main() { ]); }); - test('Two perfectly sized messages in one buffer', () { - framer.addBytes(bufferWithMessages([ + test('Two perfectly sized messages in one buffer', () async { + await framer.addBytes(bufferWithMessages([ messageWithBytes([1, 2, 3], 1), messageWithBytes([1, 2, 3, 4], 2) ])); @@ -42,13 +42,13 @@ void main() { ]); }); - test('Header fragment', () { + test('Header fragment', () async { final message = messageWithBytes([1, 2, 3], 1); final fragments = fragmentedMessageBuffer(message, 2); - framer.addBytes(fragments.first); + await framer.addBytes(fragments.first); expect(framer.messageQueue, isEmpty); - framer.addBytes(fragments.last); + await framer.addBytes(fragments.last); final messages = framer.messageQueue.toList(); expect(messages, [ @@ -56,18 +56,18 @@ void main() { ]); }); - test('Two header fragments', () { + test('Two header fragments', () async { final message = messageWithBytes([1, 2, 3], 1); final fragments = fragmentedMessageBuffer(message, 2); final moreFragments = fragmentedMessageBuffer(fragments.first, 1); - framer.addBytes(moreFragments.first); + await framer.addBytes(moreFragments.first); expect(framer.messageQueue, isEmpty); - framer.addBytes(moreFragments.last); + await framer.addBytes(moreFragments.last); expect(framer.messageQueue, isEmpty); - framer.addBytes(fragments.last); + await framer.addBytes(fragments.last); final messages = framer.messageQueue.toList(); expect(messages, [ @@ -75,16 +75,17 @@ void main() { ]); }); - test('One message + header fragment', () { + test('One message + header fragment', () async { final message1 = messageWithBytes([1, 2, 3], 1); final message2 = messageWithBytes([2, 2, 3], 2); final message2Fragments = fragmentedMessageBuffer(message2, 3); - framer.addBytes(bufferWithMessages([message1, message2Fragments.first])); + await framer + .addBytes(bufferWithMessages([message1, message2Fragments.first])); expect(framer.messageQueue.length, 1); - framer.addBytes(message2Fragments.last); + await framer.addBytes(message2Fragments.last); final messages = framer.messageQueue.toList(); expect(messages, [ @@ -93,16 +94,17 @@ void main() { ]); }); - test('Message + header, missing rest of buffer', () { + test('Message + header, missing rest of buffer', () async { final message1 = messageWithBytes([1, 2, 3], 1); final message2 = messageWithBytes([2, 2, 3], 2); final message2Fragments = fragmentedMessageBuffer(message2, 5); - framer.addBytes(bufferWithMessages([message1, message2Fragments.first])); + await framer + .addBytes(bufferWithMessages([message1, message2Fragments.first])); expect(framer.messageQueue.length, 1); - framer.addBytes(message2Fragments.last); + await framer.addBytes(message2Fragments.last); final messages = framer.messageQueue.toList(); expect(messages, [ @@ -111,13 +113,13 @@ void main() { ]); }); - test('Message body spans two packets', () { + test('Message body spans two packets', () async { final message = messageWithBytes([1, 2, 3, 4, 5, 6, 7], 1); final fragments = fragmentedMessageBuffer(message, 8); - framer.addBytes(fragments.first); + await framer.addBytes(fragments.first); expect(framer.messageQueue, isEmpty); - framer.addBytes(fragments.last); + await framer.addBytes(fragments.last); final messages = framer.messageQueue.toList(); expect(messages, [ @@ -127,15 +129,15 @@ void main() { test( 'Message spans two packets, started in a packet that contained another message', - () { + () async { final earlierMessage = messageWithBytes([1, 2], 0); final message = messageWithBytes([1, 2, 3, 4, 5, 6, 7], 1); - framer.addBytes(bufferWithMessages( + await framer.addBytes(bufferWithMessages( [earlierMessage, fragmentedMessageBuffer(message, 8).first])); expect(framer.messageQueue, hasLength(1)); - framer.addBytes(fragmentedMessageBuffer(message, 8).last); + await framer.addBytes(fragmentedMessageBuffer(message, 8).last); final messages = framer.messageQueue.toList(); expect(messages, [ @@ -144,21 +146,22 @@ void main() { ]); }); - test('Message spans three packets, only part of header in the first', () { + test('Message spans three packets, only part of header in the first', + () async { final earlierMessage = messageWithBytes([1, 2], 0); final message = messageWithBytes([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13], 1); - framer.addBytes(bufferWithMessages( + await framer.addBytes(bufferWithMessages( [earlierMessage, fragmentedMessageBuffer(message, 3).first])); expect(framer.messageQueue, hasLength(1)); - framer.addBytes( + await framer.addBytes( fragmentedMessageBuffer(fragmentedMessageBuffer(message, 3).last, 6) .first); expect(framer.messageQueue, hasLength(1)); - framer.addBytes( + await framer.addBytes( fragmentedMessageBuffer(fragmentedMessageBuffer(message, 3).last, 6) .last); @@ -170,41 +173,43 @@ void main() { ]); }); - test('Frame with no data', () { - framer.addBytes(bufferWithMessages([messageWithBytes([], 10)])); + test('Frame with no data', () async { + await framer.addBytes(bufferWithMessages([messageWithBytes([], 10)])); final messages = framer.messageQueue.toList(); expect(messages, [UnknownMessage(10, Uint8List(0))]); }); - test('Identify CopyDoneMessage with length equals size length (min)', () { + test('Identify CopyDoneMessage with length equals size length (min)', + () async { // min length final length = [0, 0, 0, 4]; // min length (4 bytes) as 32-bit final bytes = Uint8List.fromList([ SharedMessageId.copyDone, ...length, ]); - framer.addBytes(bytes); + await framer.addBytes(bytes); final message = framer.messageQueue.toList().first; expect(message, isA()); expect((message as CopyDoneMessage).length, 4); }); - test('Identify CopyDoneMessage when length larger than size length', () { + test('Identify CopyDoneMessage when length larger than size length', + () async { final length = (ByteData(4)..setUint32(0, 42)).buffer.asUint8List(); final bytes = Uint8List.fromList([ SharedMessageId.copyDone, ...length, ]); - framer.addBytes(bytes); + await framer.addBytes(bytes); final message = framer.messageQueue.toList().first; expect(message, isA()); expect((message as CopyDoneMessage).length, 42); }); - test('Adds XLogDataMessage to queue', () { + test('Adds XLogDataMessage to queue', () async { final bits64 = (ByteData(8)..setUint64(0, 42)).buffer.asUint8List(); // random data bytes final dataBytes = [1, 2, 3, 4, 5, 6, 7, 8]; @@ -227,13 +232,13 @@ void main() { ...xlogDataMessage, ]; - framer.addBytes(Uint8List.fromList(copyDataBytes)); + await framer.addBytes(Uint8List.fromList(copyDataBytes)); final message = framer.messageQueue.toList().first; expect(message, isA()); expect(message, isNot(isA())); }); - test('Adds XLogDataLogicalMessage with JsonMessage to queue', () { + test('Adds XLogDataLogicalMessage with JsonMessage to queue', () async { final bits64 = (ByteData(8)..setUint64(0, 42)).buffer.asUint8List(); /// represent an empty json object so we should get a XLogDataLogicalMessage @@ -259,13 +264,13 @@ void main() { ...xlogDataMessage, ]; - framer.addBytes(Uint8List.fromList(copyDataMessage)); + await framer.addBytes(Uint8List.fromList(copyDataMessage)); final message = framer.messageQueue.toList().first; expect(message, isA()); expect((message as XLogDataLogicalMessage).message, isA()); }); - test('Adds PrimaryKeepAliveMessage to queue', () { + test('Adds PrimaryKeepAliveMessage to queue', () async { final bits64 = (ByteData(8)..setUint64(0, 42)).buffer.asUint8List(); /// This represent a raw [PrimaryKeepAliveMessage] @@ -285,12 +290,12 @@ void main() { ...xlogDataMessage, ]; - framer.addBytes(Uint8List.fromList(copyDataMessage)); + await framer.addBytes(Uint8List.fromList(copyDataMessage)); final message = framer.messageQueue.toList().first; expect(message, isA()); }); - test('Adds raw CopyDataMessage for unknown stream message', () { + test('Adds raw CopyDataMessage for unknown stream message', () async { final xlogDataBytes = [ -1, // unknown id ]; @@ -305,7 +310,7 @@ void main() { ...xlogDataBytes, ]; - framer.addBytes(Uint8List.fromList(copyDataMessage)); + await framer.addBytes(Uint8List.fromList(copyDataMessage)); final message = framer.messageQueue.toList().first; expect(message, isA()); }); @@ -331,9 +336,9 @@ Uint8List bufferWithMessages(List> messages) { return Uint8List.fromList(messages.expand((l) => l).toList()); } -void flush(MessageFramer framer) { +Future flush(MessageFramer framer) async { framer.messageQueue.clear(); - framer.addBytes(bufferWithMessages([ + await framer.addBytes(bufferWithMessages([ messageWithBytes([1, 2, 3], 1) ]));