From 2d44655ce322a810213e2cb8c2c532d0875abd39 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Istv=C3=A1n=20So=C3=B3s?= Date: Wed, 4 Sep 2024 22:25:19 +0200 Subject: [PATCH] Store and use TypeCodecContext in PgBytesPgByteDataReader (#357) --- analysis_options.yaml | 1 + lib/src/buffer.dart | 9 ++++++--- lib/src/message_window.dart | 14 +++++++++----- lib/src/messages/server_messages.dart | 16 ++++++++++++++-- lib/src/types/type_codec.dart | 16 +++++++++++++++- lib/src/v3/connection.dart | 25 ++++++++++++++++++------- lib/src/v3/protocol.dart | 13 +++++++------ test/framer_test.dart | 4 ++-- 8 files changed, 72 insertions(+), 26 deletions(-) diff --git a/analysis_options.yaml b/analysis_options.yaml index 3011fa50..e5825823 100644 --- a/analysis_options.yaml +++ b/analysis_options.yaml @@ -11,6 +11,7 @@ analyzer: unused_import: error unused_local_variable: error dead_code: error + deprecated_member_use_from_same_package: ignore linter: rules: diff --git a/lib/src/buffer.dart b/lib/src/buffer.dart index a776b114..376324c6 100644 --- a/lib/src/buffer.dart +++ b/lib/src/buffer.dart @@ -1,6 +1,7 @@ import 'dart:convert'; import 'package:buffer/buffer.dart'; +import 'package:postgres/src/types/type_codec.dart'; /// This class doesn't add much over using `List` instead, however, /// it creates a nice explicit type difference from both `String` and `List`, @@ -42,17 +43,19 @@ class PgByteDataWriter extends ByteDataWriter { const _emptyString = ''; class PgByteDataReader extends ByteDataReader { - final Encoding encoding; + final TypeCodecContext typeCodecContext; PgByteDataReader({ - required this.encoding, + required this.typeCodecContext, }); + Encoding get encoding => typeCodecContext.encoding; + String readNullTerminatedString() { final bytes = readUntilTerminatingByte(0); if (bytes.isEmpty) { return _emptyString; } - return encoding.decode(bytes); + return typeCodecContext.encoding.decode(bytes); } } diff --git a/lib/src/message_window.dart b/lib/src/message_window.dart index 0c96f234..5165b6e4 100644 --- a/lib/src/message_window.dart +++ b/lib/src/message_window.dart @@ -1,9 +1,9 @@ import 'dart:collection'; -import 'dart:convert'; import 'dart:typed_data'; import 'package:buffer/buffer.dart'; import 'package:charcode/ascii.dart'; +import 'package:postgres/src/types/type_codec.dart'; import 'buffer.dart'; import 'messages/server_messages.dart'; @@ -35,11 +35,11 @@ Map _messageTypeMap = { }; class MessageFramer { - final Encoding _encoding; - late final _reader = PgByteDataReader(encoding: _encoding); + final TypeCodecContext _typeCodecContext; + late final _reader = PgByteDataReader(typeCodecContext: _typeCodecContext); final messageQueue = Queue(); - MessageFramer(this._encoding); + MessageFramer(this._typeCodecContext); int? _type; int _expectedLength = 0; @@ -116,7 +116,11 @@ ServerMessage _parseCopyDataMessage(PgByteDataReader reader, int length) { if (code == ReplicationMessageId.primaryKeepAlive) { return PrimaryKeepAliveMessage.parse(reader); } else if (code == ReplicationMessageId.xLogData) { - return XLogDataMessage.parse(reader.read(length - 1), reader.encoding); + return XLogDataMessage.parse( + reader.read(length - 1), + reader.encoding, + typeCodecContext: reader.typeCodecContext, + ); } else { final bb = BytesBuffer(); bb.addByte(code); diff --git a/lib/src/messages/server_messages.dart b/lib/src/messages/server_messages.dart index 77edda7a..e70ca0fb 100644 --- a/lib/src/messages/server_messages.dart +++ b/lib/src/messages/server_messages.dart @@ -2,6 +2,7 @@ import 'dart:convert'; import 'dart:typed_data'; import 'package:meta/meta.dart'; +import 'package:postgres/src/types/type_codec.dart'; import '../buffer.dart'; import '../time_converters.dart'; @@ -366,8 +367,19 @@ class XLogDataMessage implements ReplicationMessage, ServerMessage { /// If [XLogDataMessage.data] is a [LogicalReplicationMessage], then the method /// will return a [XLogDataLogicalMessage] with that message. Otherwise, it'll /// return [XLogDataMessage] with raw data. - static XLogDataMessage parse(Uint8List bytes, Encoding encoding) { - final reader = PgByteDataReader(encoding: encoding)..add(bytes); + /// + @Deprecated( + 'It is likely that this method signature will change or will be removed in ' + 'an upcoming release. Please file a new issue on GitHub if you are using it.') + static XLogDataMessage parse( + Uint8List bytes, + Encoding encoding, { + TypeCodecContext? typeCodecContext, + }) { + final reader = PgByteDataReader( + typeCodecContext: typeCodecContext ?? + TypeCodecContext.withDefaults(encoding: encoding)) + ..add(bytes); final walStart = LSN(reader.readUint64()); final walEnd = LSN(reader.readUint64()); final time = dateTimeFromMicrosecondsSinceY2k(reader.readUint64()); diff --git a/lib/src/types/type_codec.dart b/lib/src/types/type_codec.dart index 5786500e..9c728fa2 100644 --- a/lib/src/types/type_codec.dart +++ b/lib/src/types/type_codec.dart @@ -104,8 +104,22 @@ class TypeCodecContext { required this.typeRegistry, }); + factory TypeCodecContext.withDefaults({ + Encoding? encoding, + RelationTracker? relationTracker, + RuntimeParameters? runtimeParameters, + TypeRegistry? typeRegistry, + }) { + return TypeCodecContext( + encoding: encoding ?? utf8, + relationTracker: relationTracker ?? RelationTracker(), + runtimeParameters: runtimeParameters ?? RuntimeParameters(), + typeRegistry: typeRegistry ?? TypeRegistry(), + ); + } + PgByteDataReader newPgByteDataReader([Uint8List? bytes]) { - final reader = PgByteDataReader(encoding: encoding); + final reader = PgByteDataReader(typeCodecContext: this); if (bytes != null) { reader.add(bytes); } diff --git a/lib/src/v3/connection.dart b/lib/src/v3/connection.dart index 97e4d386..26dcb7eb 100644 --- a/lib/src/v3/connection.dart +++ b/lib/src/v3/connection.dart @@ -200,7 +200,18 @@ class PgConnectionImplementation extends _PgSessionBase implements Connection { final settings = connectionSettings is ResolvedConnectionSettings ? connectionSettings : ResolvedConnectionSettings(connectionSettings, null); - var (channel, secure) = await _connect(endpoint, settings); + final typeCodecContext = TypeCodecContext( + encoding: settings.encoding, + // TODO: share this between pooled connections + relationTracker: RelationTracker(), + runtimeParameters: RuntimeParameters(), + typeRegistry: settings.typeRegistry, + ); + var (channel, secure) = await _connect( + endpoint, + settings, + typeCodecContext: typeCodecContext, + ); if (_debugLog) { channel = channel.transform(StreamChannelTransformer( @@ -226,9 +237,8 @@ class PgConnectionImplementation extends _PgSessionBase implements Connection { settings, channel, secure, - // TODO: share this between pooled connections - relationTracker: RelationTracker(), - runtimeParameters: RuntimeParameters(), + relationTracker: typeCodecContext.relationTracker, + runtimeParameters: typeCodecContext.runtimeParameters, ); await connection._startup(); if (connection._settings.onOpen != null) { @@ -239,8 +249,9 @@ class PgConnectionImplementation extends _PgSessionBase implements Connection { static Future<(StreamChannel, bool)> _connect( Endpoint endpoint, - ResolvedConnectionSettings settings, - ) async { + ResolvedConnectionSettings settings, { + required TypeCodecContext typeCodecContext, + }) async { final host = endpoint.host; final port = endpoint.port; @@ -331,7 +342,7 @@ class PgConnectionImplementation extends _PgSessionBase implements Connection { return ( StreamChannel>(adaptedStream, outgoingSocket) - .transform(messageTransformer(settings.encoding)), + .transform(messageTransformer(typeCodecContext)), secure, ); } diff --git a/lib/src/v3/protocol.dart b/lib/src/v3/protocol.dart index e436ed7d..49e03293 100644 --- a/lib/src/v3/protocol.dart +++ b/lib/src/v3/protocol.dart @@ -1,8 +1,8 @@ import 'dart:async'; -import 'dart:convert'; import 'dart:typed_data'; import 'package:async/async.dart'; +import 'package:postgres/src/types/type_codec.dart'; import 'package:stream_channel/stream_channel.dart'; import '../buffer.dart'; @@ -34,9 +34,9 @@ class AggregatedClientMessage extends ClientMessage { } StreamChannelTransformer> messageTransformer( - Encoding encoding) { + TypeCodecContext typeCodecContext) { return StreamChannelTransformer( - _readMessages(encoding), + _readMessages(typeCodecContext), StreamSinkTransformer.fromHandlers( handleData: (message, out) { if (message is! ClientMessage) { @@ -47,16 +47,17 @@ StreamChannelTransformer> messageTransformer( return; } - out.add(message.asBytes(encoding: encoding)); + out.add(message.asBytes(encoding: typeCodecContext.encoding)); }, ), ); } -StreamTransformer _readMessages(Encoding encoding) { +StreamTransformer _readMessages( + TypeCodecContext typeCodecContext) { return StreamTransformer.fromBind((rawStream) { return Stream.multi((listener) { - final framer = MessageFramer(encoding); + final framer = MessageFramer(typeCodecContext); var paused = false; diff --git a/test/framer_test.dart b/test/framer_test.dart index cc400108..0eba460f 100644 --- a/test/framer_test.dart +++ b/test/framer_test.dart @@ -1,4 +1,3 @@ -import 'dart:convert'; import 'dart:typed_data'; import 'package:buffer/buffer.dart'; @@ -6,12 +5,13 @@ import 'package:postgres/src/message_window.dart'; import 'package:postgres/src/messages/logical_replication_messages.dart'; import 'package:postgres/src/messages/server_messages.dart'; import 'package:postgres/src/messages/shared_messages.dart'; +import 'package:postgres/src/types/type_codec.dart'; import 'package:test/test.dart'; void main() { late MessageFramer framer; setUp(() { - framer = MessageFramer(utf8); + framer = MessageFramer(TypeCodecContext.withDefaults()); }); tearDown(() {