Skip to content

Commit

Permalink
Throw exception if connection-level query executation is called while…
Browse files Browse the repository at this point in the history
… inside a transaction callback. (#378)
  • Loading branch information
isoos authored Sep 13, 2024
1 parent 80b9078 commit 6691c3d
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 72 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
(for values where type is not specified).
- `DatabaseInfo` tracks information about relations and oids (currently limited to `RelationMessage` caching).
- **Behaviour / soft-breaking changes**:
- Preparing/executing a stamement on the main connection while in a `runTx` callback will throw an exception.
- Deprecated `TupleDataColumn.data`, use `.value` instead (for binary protocol messages).
- Deprecated some logical replication message parsing method.
- Removed `@internal`-annotated methods from the public API of `ServerException` and `Severity`.
Expand Down
17 changes: 13 additions & 4 deletions lib/src/v3/connection.dart
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,17 @@ abstract class _PgSessionBase implements Session {
@override
Future<void> get closed => _sessionClosedCompleter.future;

void _verifyStateBeforeQuery() {
if (_connection._isClosing || _sessionClosed) {
throw PgException(
'Attempting to execute query, but connection is not open.');
}
if (this == _connection && _connection._activeTransaction != null) {
throw PgException(
'Attempting to execute query on connection while inside a `runTx` call.');
}
}

@override
Future<Result> execute(
Object query, {
Expand All @@ -113,10 +124,7 @@ abstract class _PgSessionBase implements Session {
QueryMode? queryMode,
Duration? timeout,
}) async {
if (_connection._isClosing || _sessionClosed) {
throw PgException(
'Attempting to execute query, but connection is not open.');
}
_verifyStateBeforeQuery();
final description = InternalQueryDescription.wrap(
query,
typeRegistry: _connection._settings.typeRegistry,
Expand Down Expand Up @@ -174,6 +182,7 @@ abstract class _PgSessionBase implements Session {
Object query, {
Duration? timeout,
}) async {
_verifyStateBeforeQuery();
return await _prepare(query, timeout: timeout);
}

Expand Down
6 changes: 3 additions & 3 deletions test/timeout_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,16 @@ void main() {
expect(await conn.execute('SELECT * from t'), hasLength(0));
});

test(
'Query on parent context for transaction completes (with error) after timeout',
test('Timeout is ignored when new statement is run on parent context',
() async {
try {
await conn.runTx((ctx) async {
await conn.execute('SELECT 1', timeout: Duration(seconds: 1));
await ctx.execute('INSERT INTO t (id) VALUES (1)');
});
fail('unreachable');
} on TimeoutException {
} on PgException catch (e) {
expect(e.message, contains('runTx'));
// ignore
}

Expand Down
43 changes: 12 additions & 31 deletions test/transaction_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -50,38 +50,19 @@ void main() {
]);
});

test('Query during transaction must wait until transaction is finished',
() async {
final orderEnsurer = [];
final nextCompleter = Completer.sync();
final outResult = conn.runTx((c) async {
orderEnsurer.add(1);
await c.execute('INSERT INTO t (id) VALUES (1)');
orderEnsurer.add(2);
nextCompleter.complete();
final result = await c.execute('SELECT id FROM t');
orderEnsurer.add(3);

return result;
});

await nextCompleter.future;
orderEnsurer.add(11);
await conn.execute('INSERT INTO t (id) VALUES (2)');
orderEnsurer.add(12);
final laterResults = await conn.execute('SELECT id FROM t');
orderEnsurer.add(13);

final firstResult = await outResult;
test('Connection query during transaction will throw exception.', () async {
try {
await conn.runTx((ctx) async {
await conn.execute('SELECT 1');
await ctx.execute('INSERT INTO t (id) VALUES (1)');
});
fail('unreachable');
} on PgException catch (e) {
expect(e.message, contains('runTx'));
// ignore
}

expect(orderEnsurer, [1, 2, 11, 3, 12, 13]);
expect(firstResult, [
[1]
]);
expect(laterResults, [
[1],
[2]
]);
expect(await conn.execute('SELECT * from t'), hasLength(0));
});

test('Make sure two simultaneous transactions cannot be interwoven',
Expand Down
34 changes: 0 additions & 34 deletions test/v3_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -249,40 +249,6 @@ void main() {
]);
});

test('Query during transaction must wait until transaction is finished',
() async {
final orderEnsurer = [];
final nextCompleter = Completer.sync();
final outResult = connection.runTx((c) async {
orderEnsurer.add(1);
await c.execute('INSERT INTO t (id) VALUES (1)');
orderEnsurer.add(2);
nextCompleter.complete();
final result = await c.execute('SELECT id FROM t');
orderEnsurer.add(3);

return result;
});

await nextCompleter.future;
orderEnsurer.add(11);
await connection.execute('INSERT INTO t (id) VALUES (2)');
orderEnsurer.add(12);
final laterResults = await connection.execute('SELECT id FROM t');
orderEnsurer.add(13);

final firstResult = await outResult;

expect(orderEnsurer, [1, 2, 11, 3, 12, 13]);
expect(firstResult, [
[1]
]);
expect(laterResults, [
[1],
[2]
]);
});

test('Make sure two simultaneous transactions cannot be interwoven',
() async {
final orderEnsurer = [];
Expand Down

0 comments on commit 6691c3d

Please sign in to comment.