Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Revert "Asynchronous Codec methods + updated (deprecated) message processing."" #376

Merged
merged 2 commits into from
Sep 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
- `Connection.info` (through `ConnectionInfo` class) exposes read-only connection-level information,
e.g. acessing access server-provided parameter status values.
- Support for binary `pgoutput` replication by [wolframm](https://github.com/Wolframm-Activities-OU).
- Deprecated `TupleDataColumn.data`, use `.value` instead (for binary protocol messages).
- **Allowing custom type codecs**:
- `Codec` interface is used for encoding/decoding value by type OIDs or Dart values.
- `Codec.encode` and `Codec.decode` gets a reference to `CodecContext` which provides
Expand All @@ -14,6 +13,8 @@
(for values where type is not specified).
- `DatabaseInfo` tracks information about relations and oids (currently limited to `RelationMessage` caching).
- **Behaviour / soft-breaking changes**:
- Deprecated `TupleDataColumn.data`, use `.value` instead (for binary protocol messages).
- Deprecated some logical replication message parsing method.
- Removed `@internal`-annotated methods from the public API of `ServerException` and `Severity`.
- `ServerException` may be transformed into `_PgTimeoutException` which is both `PgException` and `TimeoutException` (but no longer `ServerException`).
- The `timeout` parameters and the `SessionSettings.queryTimeout` has only a somewhat
Expand Down
5 changes: 3 additions & 2 deletions lib/messages.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ library messages;

export 'src/buffer.dart' show PgByteDataWriter;
export 'src/messages/client_messages.dart';
export 'src/messages/logical_replication_messages.dart';
export 'src/messages/server_messages.dart';
export 'src/messages/logical_replication_messages.dart'
hide tryAsyncParseLogicalReplicationMessage;
export 'src/messages/server_messages.dart' hide parseXLogDataMessage;
export 'src/messages/shared_messages.dart';
10 changes: 6 additions & 4 deletions lib/src/message_window.dart
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import 'dart:async';
import 'dart:collection';
import 'dart:typed_data';

Expand All @@ -11,7 +12,7 @@ import 'messages/shared_messages.dart';

const int _headerByteSize = 5;

typedef _ServerMessageFn = ServerMessage Function(
typedef _ServerMessageFn = FutureOr<ServerMessage> Function(
PgByteDataReader reader, int length);

Map<int, _ServerMessageFn> _messageTypeMap = {
Expand Down Expand Up @@ -50,7 +51,7 @@ class MessageFramer {
bool get _isComplete =>
_expectedLength == 0 || _expectedLength <= _reader.remainingLength;

void addBytes(Uint8List bytes) {
Future<void> addBytes(Uint8List bytes) async {
_reader.add(bytes);

while (true) {
Expand All @@ -76,7 +77,7 @@ class MessageFramer {
}

final targetRemainingLength = _reader.remainingLength - _expectedLength;
final msg = msgMaker(_reader, _expectedLength);
final msg = await msgMaker(_reader, _expectedLength);
if (_reader.remainingLength > targetRemainingLength) {
throw StateError(
'Message parser consumed more bytes than expected. type=$_type expectedLength=$_expectedLength');
Expand Down Expand Up @@ -111,7 +112,8 @@ class MessageFramer {
/// such as replication messages.
/// Returns a [ReplicationMessage] if the message contains such message.
/// Otherwise, it'll just return the provided bytes as [CopyDataMessage].
ServerMessage _parseCopyDataMessage(PgByteDataReader reader, int length) {
Future<ServerMessage> _parseCopyDataMessage(
PgByteDataReader reader, int length) async {
final code = reader.readUint8();
if (code == ReplicationMessageId.primaryKeepAlive) {
return PrimaryKeepAliveMessage.parse(reader);
Expand Down
190 changes: 176 additions & 14 deletions lib/src/messages/logical_replication_messages.dart
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import 'dart:typed_data';

import 'package:buffer/buffer.dart';
import 'package:meta/meta.dart';
import 'package:postgres/src/types/codec.dart';

import '../buffer.dart';
Expand Down Expand Up @@ -47,6 +48,8 @@ class XLogDataLogicalMessage implements XLogDataMessage {

/// Tries to check if the [bytesList] is a [LogicalReplicationMessage]. If so,
/// [LogicalReplicationMessage] is returned, otherwise `null` is returned.
@Deprecated('This method will be removed from public API. '
'Please file a new issue on GitHub if you are using it.')
LogicalReplicationMessage? tryParseLogicalReplicationMessage(
PgByteDataReader reader, int length) {
// the first byte is the msg type
Expand All @@ -69,13 +72,66 @@ LogicalReplicationMessage? tryParseLogicalReplicationMessage(
return TypeMessage._parse(reader);

case LogicalReplicationMessageTypes.insert:
return InsertMessage._parse(reader);
return InsertMessage._syncParse(reader);

case LogicalReplicationMessageTypes.update:
return UpdateMessage._parse(reader);
return UpdateMessage._syncParse(reader);

case LogicalReplicationMessageTypes.delete:
return DeleteMessage._parse(reader);
return DeleteMessage._syncParse(reader);

case LogicalReplicationMessageTypes.truncate:
return TruncateMessage._parse(reader);

case LogicalReplicationMessageTypes.unsupported:
// wal2json messages starts with `{` as the first byte
if (firstByte == '{'.codeUnits.single) {
// note this needs the full set of bytes unlike other cases
final bb = BytesBuffer();
bb.addByte(firstByte);
bb.add(reader.read(length - 1));
try {
return JsonMessage(reader.encoding.decode(bb.toBytes()));
} catch (_) {
// ignore
}
}
return null;
}
}

/// Tries to check if the [bytesList] is a [LogicalReplicationMessage]. If so,
/// [LogicalReplicationMessage] is returned, otherwise `null` is returned.
@internal
Future<LogicalReplicationMessage?> tryAsyncParseLogicalReplicationMessage(
PgByteDataReader reader, int length) async {
// the first byte is the msg type
final firstByte = reader.readUint8();
final msgType = LogicalReplicationMessageTypes.fromByte(firstByte);
switch (msgType) {
case LogicalReplicationMessageTypes.begin:
return BeginMessage._parse(reader);

case LogicalReplicationMessageTypes.commit:
return CommitMessage._parse(reader);

case LogicalReplicationMessageTypes.origin:
return OriginMessage._parse(reader);

case LogicalReplicationMessageTypes.relation:
return RelationMessage._parse(reader);

case LogicalReplicationMessageTypes.type:
return TypeMessage._parse(reader);

case LogicalReplicationMessageTypes.insert:
return await InsertMessage._parse(reader);

case LogicalReplicationMessageTypes.update:
return await UpdateMessage._parse(reader);

case LogicalReplicationMessageTypes.delete:
return await DeleteMessage._parse(reader);

case LogicalReplicationMessageTypes.truncate:
return TruncateMessage._parse(reader);
Expand Down Expand Up @@ -381,7 +437,49 @@ class TupleData {
/// TupleData does not consume the entire bytes
///
/// It'll read until the types are generated.
factory TupleData._parse(PgByteDataReader reader, int relationId) {
///
/// NOTE: do not use, will be removed.
factory TupleData._syncParse(PgByteDataReader reader) {
final columnCount = reader.readUint16();
final columns = <TupleDataColumn>[];
for (var i = 0; i < columnCount; i++) {
// reading order matters
final typeId = reader.readUint8();
final tupleDataType = TupleDataType.fromByte(typeId);
late final int length;
late final String data;
Object? value;
switch (tupleDataType) {
case TupleDataType.text:
case TupleDataType.binary:
length = reader.readUint32();
data = reader.encoding.decode(reader.read(length));
value = data;
break;
case TupleDataType.null_:
case TupleDataType.toast:
length = 0;
data = '';
break;
}
columns.add(
TupleDataColumn(
typeId: typeId,
length: length,
typeOid: null,
data: data,
value: value,
),
);
}
return TupleData(columns: columns);
}

/// TupleData does not consume the entire bytes
///
/// It'll read until the types are generated.
static Future<TupleData> _parse(
PgByteDataReader reader, int relationId) async {
final columnCount = reader.readUint16();
final columns = <TupleDataColumn>[];
for (var i = 0; i < columnCount; i++) {
Expand All @@ -390,8 +488,11 @@ class TupleData {
final tupleDataType = TupleDataType.fromByte(typeId);
late final int length;
late final String data;
final typeOid = reader.codecContext.databaseInfo
.getCachedTypeOidForRelationColumn(relationId, i);
final typeOid = await reader.codecContext.databaseInfo
.getColumnTypeOidByRelationIdAndColumnIndex(
relationId: relationId,
columnIndex: i,
);
Object? value;
switch (tupleDataType) {
case TupleDataType.text:
Expand All @@ -409,7 +510,7 @@ class TupleData {
bytes: bytes,
encoding: reader.codecContext.encoding,
)
: reader.codecContext.typeRegistry.decode(
: await reader.codecContext.typeRegistry.decode(
EncodedValue.binary(
bytes,
typeOid: typeOid,
Expand Down Expand Up @@ -451,13 +552,26 @@ class InsertMessage implements LogicalReplicationMessage {
late final int relationId;
late final TupleData tuple;

InsertMessage._parse(PgByteDataReader reader) {
InsertMessage._(this.relationId, this.tuple);

/// NOTE: do not use, will be removed.
InsertMessage._syncParse(PgByteDataReader reader) {
relationId = reader.readUint32();
final tupleType = reader.readUint8();
if (tupleType != 'N'.codeUnitAt(0)) {
throw Exception("InsertMessage must have 'N' tuple type");
}
tuple = TupleData._parse(reader, relationId);
tuple = TupleData._syncParse(reader);
}

static Future<InsertMessage> _parse(PgByteDataReader reader) async {
final relationId = reader.readUint32();
final tupleType = reader.readUint8();
if (tupleType != 'N'.codeUnitAt(0)) {
throw Exception("InsertMessage must have 'N' tuple type");
}
final tuple = await TupleData._parse(reader, relationId);
return InsertMessage._(relationId, tuple);
}

@override
Expand Down Expand Up @@ -511,26 +625,56 @@ class UpdateMessage implements LogicalReplicationMessage {
/// Byte1('N'): Identifies the following TupleData message as a new tuple.
late final TupleData? newTuple;

UpdateMessage._parse(PgByteDataReader reader) {
UpdateMessage._(
this.relationId, this.oldTupleType, this.oldTuple, this.newTuple);

/// NOTE: do not use, will be removed.
UpdateMessage._syncParse(PgByteDataReader reader) {
// reading order matters
relationId = reader.readUint32();
var tupleType = UpdateMessageTuple.fromByte(reader.readUint8());

if (tupleType == UpdateMessageTuple.oldType ||
tupleType == UpdateMessageTuple.keyType) {
oldTupleType = tupleType;
oldTuple = TupleData._parse(reader, relationId);
oldTuple = TupleData._syncParse(reader);
tupleType = UpdateMessageTuple.fromByte(reader.readUint8());
} else {
oldTupleType = null;
oldTuple = null;
}

if (tupleType == UpdateMessageTuple.newType) {
newTuple = TupleData._syncParse(reader);
} else {
throw Exception('Invalid Tuple Type for UpdateMessage');
}
}

static Future<UpdateMessage> _parse(PgByteDataReader reader) async {
// reading order matters
final relationId = reader.readUint32();
UpdateMessageTuple? oldTupleType;
TupleData? oldTuple;
TupleData? newTuple;
var tupleType = UpdateMessageTuple.fromByte(reader.readUint8());

if (tupleType == UpdateMessageTuple.oldType ||
tupleType == UpdateMessageTuple.keyType) {
oldTupleType = tupleType;
oldTuple = await TupleData._parse(reader, relationId);
tupleType = UpdateMessageTuple.fromByte(reader.readUint8());
} else {
oldTupleType = null;
oldTuple = null;
}

if (tupleType == UpdateMessageTuple.newType) {
newTuple = TupleData._parse(reader, relationId);
newTuple = await TupleData._parse(reader, relationId);
} else {
throw Exception('Invalid Tuple Type for UpdateMessage');
}
return UpdateMessage._(relationId, oldTupleType, oldTuple, newTuple);
}

@override
Expand Down Expand Up @@ -583,18 +727,36 @@ class DeleteMessage implements LogicalReplicationMessage {
/// Byte1('N'): Identifies the following TupleData message as a new tuple.
late final TupleData oldTuple;

DeleteMessage._parse(PgByteDataReader reader) {
DeleteMessage._(this.relationId, this.oldTupleType, this.oldTuple);

/// NOTE: do not use, will be removed.
DeleteMessage._syncParse(PgByteDataReader reader) {
relationId = reader.readUint32();
oldTupleType = DeleteMessageTuple.fromByte(reader.readUint8());

switch (oldTupleType) {
case DeleteMessageTuple.keyType:
case DeleteMessageTuple.oldType:
oldTuple = TupleData._parse(reader, relationId);
oldTuple = TupleData._syncParse(reader);
break;
case DeleteMessageTuple.unknown:
throw Exception('Unknown tuple type for DeleteMessage');
}
}

static Future<DeleteMessage> _parse(PgByteDataReader reader) async {
final relationId = reader.readUint32();
final oldTupleType = DeleteMessageTuple.fromByte(reader.readUint8());
TupleData? oldTuple;
switch (oldTupleType) {
case DeleteMessageTuple.keyType:
case DeleteMessageTuple.oldType:
oldTuple = await TupleData._parse(reader, relationId);
break;
case DeleteMessageTuple.unknown:
throw Exception('Unknown tuple type for DeleteMessage');
}
return DeleteMessage._(relationId, oldTupleType, oldTuple);
}

@override
Expand Down
Loading
Loading