Skip to content

Commit

Permalink
Store and use TypeCodecContext in PgBytesPgByteDataReader (#357)
Browse files Browse the repository at this point in the history
  • Loading branch information
isoos authored Sep 4, 2024
1 parent f93856e commit 2d44655
Show file tree
Hide file tree
Showing 8 changed files with 72 additions and 26 deletions.
1 change: 1 addition & 0 deletions analysis_options.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ analyzer:
unused_import: error
unused_local_variable: error
dead_code: error
deprecated_member_use_from_same_package: ignore

linter:
rules:
Expand Down
9 changes: 6 additions & 3 deletions lib/src/buffer.dart
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import 'dart:convert';

import 'package:buffer/buffer.dart';
import 'package:postgres/src/types/type_codec.dart';

/// This class doesn't add much over using `List<int>` instead, however,
/// it creates a nice explicit type difference from both `String` and `List<int>`,
Expand Down Expand Up @@ -42,17 +43,19 @@ class PgByteDataWriter extends ByteDataWriter {
const _emptyString = '';

class PgByteDataReader extends ByteDataReader {
final Encoding encoding;
final TypeCodecContext typeCodecContext;

PgByteDataReader({
required this.encoding,
required this.typeCodecContext,
});

Encoding get encoding => typeCodecContext.encoding;

String readNullTerminatedString() {
final bytes = readUntilTerminatingByte(0);
if (bytes.isEmpty) {
return _emptyString;
}
return encoding.decode(bytes);
return typeCodecContext.encoding.decode(bytes);
}
}
14 changes: 9 additions & 5 deletions lib/src/message_window.dart
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import 'dart:collection';
import 'dart:convert';
import 'dart:typed_data';

import 'package:buffer/buffer.dart';
import 'package:charcode/ascii.dart';
import 'package:postgres/src/types/type_codec.dart';

import 'buffer.dart';
import 'messages/server_messages.dart';
Expand Down Expand Up @@ -35,11 +35,11 @@ Map<int, _ServerMessageFn> _messageTypeMap = {
};

class MessageFramer {
final Encoding _encoding;
late final _reader = PgByteDataReader(encoding: _encoding);
final TypeCodecContext _typeCodecContext;
late final _reader = PgByteDataReader(typeCodecContext: _typeCodecContext);
final messageQueue = Queue<ServerMessage>();

MessageFramer(this._encoding);
MessageFramer(this._typeCodecContext);

int? _type;
int _expectedLength = 0;
Expand Down Expand Up @@ -116,7 +116,11 @@ ServerMessage _parseCopyDataMessage(PgByteDataReader reader, int length) {
if (code == ReplicationMessageId.primaryKeepAlive) {
return PrimaryKeepAliveMessage.parse(reader);
} else if (code == ReplicationMessageId.xLogData) {
return XLogDataMessage.parse(reader.read(length - 1), reader.encoding);
return XLogDataMessage.parse(
reader.read(length - 1),
reader.encoding,
typeCodecContext: reader.typeCodecContext,
);
} else {
final bb = BytesBuffer();
bb.addByte(code);
Expand Down
16 changes: 14 additions & 2 deletions lib/src/messages/server_messages.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import 'dart:convert';
import 'dart:typed_data';

import 'package:meta/meta.dart';
import 'package:postgres/src/types/type_codec.dart';

import '../buffer.dart';
import '../time_converters.dart';
Expand Down Expand Up @@ -366,8 +367,19 @@ class XLogDataMessage implements ReplicationMessage, ServerMessage {
/// If [XLogDataMessage.data] is a [LogicalReplicationMessage], then the method
/// will return a [XLogDataLogicalMessage] with that message. Otherwise, it'll
/// return [XLogDataMessage] with raw data.
static XLogDataMessage parse(Uint8List bytes, Encoding encoding) {
final reader = PgByteDataReader(encoding: encoding)..add(bytes);
///
@Deprecated(
'It is likely that this method signature will change or will be removed in '
'an upcoming release. Please file a new issue on GitHub if you are using it.')
static XLogDataMessage parse(
Uint8List bytes,
Encoding encoding, {
TypeCodecContext? typeCodecContext,
}) {
final reader = PgByteDataReader(
typeCodecContext: typeCodecContext ??
TypeCodecContext.withDefaults(encoding: encoding))
..add(bytes);
final walStart = LSN(reader.readUint64());
final walEnd = LSN(reader.readUint64());
final time = dateTimeFromMicrosecondsSinceY2k(reader.readUint64());
Expand Down
16 changes: 15 additions & 1 deletion lib/src/types/type_codec.dart
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,22 @@ class TypeCodecContext {
required this.typeRegistry,
});

factory TypeCodecContext.withDefaults({
Encoding? encoding,
RelationTracker? relationTracker,
RuntimeParameters? runtimeParameters,
TypeRegistry? typeRegistry,
}) {
return TypeCodecContext(
encoding: encoding ?? utf8,
relationTracker: relationTracker ?? RelationTracker(),
runtimeParameters: runtimeParameters ?? RuntimeParameters(),
typeRegistry: typeRegistry ?? TypeRegistry(),
);
}

PgByteDataReader newPgByteDataReader([Uint8List? bytes]) {
final reader = PgByteDataReader(encoding: encoding);
final reader = PgByteDataReader(typeCodecContext: this);
if (bytes != null) {
reader.add(bytes);
}
Expand Down
25 changes: 18 additions & 7 deletions lib/src/v3/connection.dart
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,18 @@ class PgConnectionImplementation extends _PgSessionBase implements Connection {
final settings = connectionSettings is ResolvedConnectionSettings
? connectionSettings
: ResolvedConnectionSettings(connectionSettings, null);
var (channel, secure) = await _connect(endpoint, settings);
final typeCodecContext = TypeCodecContext(
encoding: settings.encoding,
// TODO: share this between pooled connections
relationTracker: RelationTracker(),
runtimeParameters: RuntimeParameters(),
typeRegistry: settings.typeRegistry,
);
var (channel, secure) = await _connect(
endpoint,
settings,
typeCodecContext: typeCodecContext,
);

if (_debugLog) {
channel = channel.transform(StreamChannelTransformer(
Expand All @@ -226,9 +237,8 @@ class PgConnectionImplementation extends _PgSessionBase implements Connection {
settings,
channel,
secure,
// TODO: share this between pooled connections
relationTracker: RelationTracker(),
runtimeParameters: RuntimeParameters(),
relationTracker: typeCodecContext.relationTracker,
runtimeParameters: typeCodecContext.runtimeParameters,
);
await connection._startup();
if (connection._settings.onOpen != null) {
Expand All @@ -239,8 +249,9 @@ class PgConnectionImplementation extends _PgSessionBase implements Connection {

static Future<(StreamChannel<Message>, bool)> _connect(
Endpoint endpoint,
ResolvedConnectionSettings settings,
) async {
ResolvedConnectionSettings settings, {
required TypeCodecContext typeCodecContext,
}) async {
final host = endpoint.host;
final port = endpoint.port;

Expand Down Expand Up @@ -331,7 +342,7 @@ class PgConnectionImplementation extends _PgSessionBase implements Connection {

return (
StreamChannel<List<int>>(adaptedStream, outgoingSocket)
.transform(messageTransformer(settings.encoding)),
.transform(messageTransformer(typeCodecContext)),
secure,
);
}
Expand Down
13 changes: 7 additions & 6 deletions lib/src/v3/protocol.dart
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import 'dart:async';
import 'dart:convert';
import 'dart:typed_data';

import 'package:async/async.dart';
import 'package:postgres/src/types/type_codec.dart';
import 'package:stream_channel/stream_channel.dart';

import '../buffer.dart';
Expand Down Expand Up @@ -34,9 +34,9 @@ class AggregatedClientMessage extends ClientMessage {
}

StreamChannelTransformer<Message, List<int>> messageTransformer(
Encoding encoding) {
TypeCodecContext typeCodecContext) {
return StreamChannelTransformer(
_readMessages(encoding),
_readMessages(typeCodecContext),
StreamSinkTransformer.fromHandlers(
handleData: (message, out) {
if (message is! ClientMessage) {
Expand All @@ -47,16 +47,17 @@ StreamChannelTransformer<Message, List<int>> messageTransformer(
return;
}

out.add(message.asBytes(encoding: encoding));
out.add(message.asBytes(encoding: typeCodecContext.encoding));
},
),
);
}

StreamTransformer<Uint8List, ServerMessage> _readMessages(Encoding encoding) {
StreamTransformer<Uint8List, ServerMessage> _readMessages(
TypeCodecContext typeCodecContext) {
return StreamTransformer.fromBind((rawStream) {
return Stream.multi((listener) {
final framer = MessageFramer(encoding);
final framer = MessageFramer(typeCodecContext);

var paused = false;

Expand Down
4 changes: 2 additions & 2 deletions test/framer_test.dart
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
import 'dart:convert';
import 'dart:typed_data';

import 'package:buffer/buffer.dart';
import 'package:postgres/src/message_window.dart';
import 'package:postgres/src/messages/logical_replication_messages.dart';
import 'package:postgres/src/messages/server_messages.dart';
import 'package:postgres/src/messages/shared_messages.dart';
import 'package:postgres/src/types/type_codec.dart';
import 'package:test/test.dart';

void main() {
late MessageFramer framer;
setUp(() {
framer = MessageFramer(utf8);
framer = MessageFramer(TypeCodecContext.withDefaults());
});

tearDown(() {
Expand Down

0 comments on commit 2d44655

Please sign in to comment.