Ensure client.close() cleans up any pending async operations (#655)
* Avoid leaving pending timers when client.close() is called

* Prevent rate-limited gateway messages from keeping the client from closing

* Add test ensuring client disposes all async resources

* Increase timeout for cleaning up async objects to 50ms

I think this keeps failing on my laptop because its specs are too low, and it takes more than 10ms for the isolates to talk to each other.
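
The core of the change is replacing Future.delayed with a Timer that completes a tracked Completer, so pending delays can be cancelled or failed when close() is called. A minimal standalone sketch of that pattern (names are illustrative, not part of the nyxx API):

    import 'dart:async';

    /// A delay that, unlike Future.delayed, can be ended early.
    /// A pending Future.delayed keeps the program alive until it fires,
    /// even if nothing is awaiting it.
    class CancellableDelay {
      final Completer<void> _completer = Completer<void>();
      late final Timer _timer;

      CancellableDelay(Duration duration) {
        _timer = Timer(duration, _completer.complete);
      }

      /// Completes when the delay elapses or cancel() is called.
      Future<void> get future => _completer.future;

      /// Stops the timer and releases the delay immediately.
      void cancel() {
        _timer.cancel();
        if (!_completer.isCompleted) _completer.complete();
      }
    }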
abitofevrything authored May 2, 2024
1 parent 55c94c3 commit e8d9d5d
Showing 6 changed files with 247 additions and 16 deletions.
3 changes: 2 additions & 1 deletion lib/nyxx.dart
@@ -14,7 +14,8 @@ export 'src/errors.dart'
IntegrationNotFoundException,
AlreadyAcknowledgedError,
AlreadyRespondedError,
PluginError;
PluginError,
ClientClosedError;

export 'src/builders/builder.dart' show Builder, CreateBuilder, UpdateBuilder;
export 'src/builders/image.dart' show ImageBuilder;
6 changes: 6 additions & 0 deletions lib/src/errors.dart
@@ -139,3 +139,9 @@ class PluginError extends Error {
@override
String toString() => message;
}

/// An error thrown when the client is closed while an operation is pending, or when an already closed client is used.
class ClientClosedError extends Error {
@override
String toString() => 'Client is closed';
}
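
With this change, an in-flight request can fail with ClientClosedError when close() is called while it is waiting. A hypothetical caller might handle it like this (sketch only; fetchCurrentUser is not part of the library):

    import 'package:nyxx/nyxx.dart';

    Future<void> fetchCurrentUser(NyxxRest client) async {
      try {
        final user = await client.user.fetch();
        print(user.username);
      } on ClientClosedError {
        // The client was closed while the request was pending; there is nothing to retry.
      }
    }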
23 changes: 18 additions & 5 deletions lib/src/gateway/gateway.dart
@@ -93,7 +93,7 @@ class Gateway extends GatewayManager with EventParser {
/// See [Shard.latency] for details on how the latency is calculated.
Duration get latency => shards.fold(Duration.zero, (previousValue, element) => previousValue + (element.latency ~/ shards.length));

final List<Timer> _startTimers = [];
final Set<Timer> _startOrIdentifyTimers = {};

/// Create a new [Gateway].
Gateway(this.client, this.gatewayBot, this.shards, this.totalShards, this.shardIds) : super.create() {
@@ -112,10 +112,15 @@ class Gateway extends GatewayManager with EventParser {
final rateLimitKey = shard.id % maxConcurrency;

// Delay the shard starting until it is (approximately) also ready to identify.
_startTimers.add(Timer(identifyDelay * (shard.id ~/ maxConcurrency), () {
// This avoids opening many websocket connections simultaneously just to have most
// of them wait for their identify request.
late final Timer startTimer;
startTimer = Timer(identifyDelay * (shard.id ~/ maxConcurrency), () {
logger.fine('Starting shard ${shard.id}');
shard.add(StartShard());
}));
_startOrIdentifyTimers.remove(startTimer);
});
_startOrIdentifyTimers.add(startTimer);

shard.listen(
(event) {
@@ -137,7 +142,15 @@

remainingIdentifyRequests--;
shard.add(Identify());
return await Future.delayed(identifyDelay);

// Don't use Future.delayed so that we can exit early if close() is called.
// If we use Future.delayed, the program will remain alive until it is complete, even if nothing is waiting on it.
// This code is roughly equivalent to `await Future.delayed(identifyDelay)`
final delayCompleter = Completer<void>();
final delayTimer = Timer(identifyDelay, delayCompleter.complete);
_startOrIdentifyTimers.add(delayTimer);
await delayCompleter.future;
_startOrIdentifyTimers.remove(delayTimer);
});
}
},
@@ -199,7 +212,7 @@
Future<void> close() async {
_closing = true;
// Make sure we don't start any shards after we have closed.
for (final timer in _startTimers) {
for (final timer in _startOrIdentifyTimers) {
timer.cancel();
}
await Future.wait(shards.map((shard) => shard.close()));
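The gateway now keeps every start/identify timer in a set and removes each timer from within its own callback once it fires, relying on a late final self-reference. A minimal sketch of that idiom outside the Gateway class (pendingTimers and scheduleOnce are illustrative names):

    import 'dart:async';

    final Set<Timer> pendingTimers = {};

    void scheduleOnce(Duration delay, void Function() action) {
      // `late final` lets the callback refer to the timer being created.
      late final Timer timer;
      timer = Timer(delay, () {
        action();
        pendingTimers.remove(timer); // The timer removes itself once it has fired.
      });
      pendingTimers.add(timer);
    }

    void cancelPending() {
      for (final timer in pendingTimers) {
        timer.cancel(); // Cancelling a timer that has already fired is a no-op.
      }
      pendingTimers.clear();
    }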
29 changes: 21 additions & 8 deletions lib/src/gateway/shard_runner.dart
@@ -69,8 +69,13 @@ class ShardRunner {
await connection!.add(e);
} catch (error, s) {
controller.add(ErrorReceived(error: error, stackTrace: s));
// Try to send the event again.
sendController.add(e);

// Prevent the recursive call to add() from looping too often.
await Future.delayed(Duration(milliseconds: 100));
// Try to send the event again, unless we are disposing (in which case the controller will be closed).
if (!sendController.isClosed) {
sendController.add(e);
}
}
})
..pause();
@@ -122,6 +127,8 @@
while (true) {
try {
// Check for dispose requests. If we should be disposing, exit the loop.
// Do this now instead of after the connection is closed in case we get
// a dispose request before the shard is even started.
if (disposing) {
controller.add(Disconnecting(reason: 'Dispose requested'));
return;
@@ -130,11 +137,6 @@
// Initialize lastHeartbeatAcked to `true` so we don't immediately disconnect in heartbeat().
lastHeartbeatAcked = true;

// Pause the send subscription until we are connected.
if (!sendHandler.isPaused) {
sendHandler.pause();
}

// Open the websocket connection.
connection = await ShardConnection.connect(gatewayUri.toString(), this);
connection!.onSent.listen(controller.add);
@@ -232,6 +234,12 @@
// Prevents the while-true loop from looping too often when no internet is available.
await Future.delayed(Duration(milliseconds: 100));
} finally {
// Pause the send subscription until we are connected again.
// The handler may already be paused if the error occurred before we had identified.
if (!sendHandler.isPaused) {
sendHandler.pause();
}

// Reset connection properties.
await connection?.close(4000);
connection = null;
@@ -400,7 +408,12 @@ class ShardConnection extends Stream<GatewayEvent> implements StreamSink<Send> {

final rateLimitLimit = event.opcode == Opcode.heartbeat ? 0 : rateLimitHeartbeatReservation;
while (rateLimitCount - _currentRateLimitCount <= rateLimitLimit) {
await _currentRateLimitEnd.future;
try {
await _currentRateLimitEnd.future;
} catch (e) {
// Swap out stack trace so the error message makes more sense.
Error.throwWithStackTrace(e, StackTrace.current);
}
}

if (event.opcode == Opcode.heartbeat) {
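The rate limiter above rethrows errors received through the completer with Error.throwWithStackTrace, so the reported trace points at the waiting call site rather than at wherever the error was originally created. A small standalone sketch of that technique (awaitWithLocalTrace is a hypothetical helper):

    import 'dart:async';

    /// Awaits a future that may be failed elsewhere (for example by close()),
    /// rethrowing any error with the current stack trace so the report points
    /// at this await site instead of the original throw site.
    Future<void> awaitWithLocalTrace(Future<void> pending) async {
      try {
        await pending;
      } catch (error) {
        Error.throwWithStackTrace(error, StackTrace.current);
      }
    }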
41 changes: 39 additions & 2 deletions lib/src/http/handler.dart
@@ -5,6 +5,7 @@ import 'package:http/http.dart' hide MultipartRequest;
import 'package:logging/logging.dart';
import 'package:nyxx/src/api_options.dart';
import 'package:nyxx/src/client.dart';
import 'package:nyxx/src/errors.dart';
import 'package:nyxx/src/http/bucket.dart';
import 'package:nyxx/src/http/request.dart';
import 'package:nyxx/src/http/response.dart';
@@ -94,6 +95,8 @@ class HttpHandler {
/// If no requests have been completed, this getter returns [Duration.zero].
Duration get realLatency => _realLatencies.isEmpty ? Duration.zero : (_realLatencies.reduce((a, b) => a + b) ~/ _realLatencies.length);

final Set<Completer<void>> _pendingRateLimits = {};

/// Create a new [HttpHandler].
///
/// {@macro http_handler}
@@ -204,7 +207,19 @@
if (waitTime > Duration.zero) {
logger.finer('Holding ${request.loggingId} for $waitTime');
_onRateLimitController.add((request: request, delay: waitTime, isGlobal: isGlobal, isAnticipated: true));
await Future.delayed(waitTime);

// Don't use Future.delayed so that we can exit early if close() is called.
// If we use Future.delayed, the program will remain alive until it is complete, even if nothing is waiting on it.
// This is roughly equivalent to `await Future.delayed(waitTime)`
final completer = Completer<void>();
final timer = Timer(waitTime, completer.complete);
_pendingRateLimits.add(completer);
try {
await completer.future;
} finally {
_pendingRateLimits.remove(completer);
timer.cancel();
}
}
} while (waitTime > Duration.zero);

@@ -281,7 +296,20 @@
}

_onRateLimitController.add((request: request, delay: retryAfter, isGlobal: isGlobal, isAnticipated: false));
return Future.delayed(retryAfter, () => execute(request));

// Don't use Future.delayed so that we can exit early if close() is called.
// If we use Future.delayed, the program will remain alive until it is complete, even if nothing is waiting on it.
// This is roughly equivalent to `return Future.delayed(retryAfter, () => execute(request))`
final completer = Completer<void>();
final timer = Timer(retryAfter, completer.complete);
_pendingRateLimits.add(completer);
try {
await completer.future;
return execute(request);
} finally {
_pendingRateLimits.remove(completer);
timer.cancel();
}
} on TypeError {
logger.shout('Invalid rate limit body for ${request.loggingId}! Your client is probably cloudflare banned!');
}
@@ -302,6 +330,15 @@
}

void close() {
// Timers associated with these completers will be cancelled in
// the finally block from the try/catch that the completer is awaited in.
for (final completer in _pendingRateLimits) {
completer.completeError(
ClientClosedError(),
StackTrace.current,
);
}

httpClient.close();
_onRequestController.close();
_onResponseController.close();
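Taken together, the handler's shutdown pattern is: every rate-limit wait registers a Completer, the wait removes itself and cancels its timer in a finally block, and close() fails whatever is still pending. A condensed sketch with assumed names (waitUnlessClosed, failPendingWaits):

    import 'dart:async';

    import 'package:nyxx/nyxx.dart'; // For ClientClosedError.

    final Set<Completer<void>> _pendingWaits = {};

    /// A rate-limit delay that failPendingWaits() can abort with a ClientClosedError.
    Future<void> waitUnlessClosed(Duration delay) async {
      final completer = Completer<void>();
      final timer = Timer(delay, completer.complete);
      _pendingWaits.add(completer);
      try {
        await completer.future;
      } finally {
        _pendingWaits.remove(completer);
        timer.cancel();
      }
    }

    void failPendingWaits() {
      // Awaiting code resumes in a later microtask, so removing completers from
      // the set does not interfere with this synchronous loop.
      for (final completer in _pendingWaits) {
        completer.completeError(ClientClosedError(), StackTrace.current);
      }
    }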
161 changes: 161 additions & 0 deletions test/integration/async_dispose_test.dart
@@ -0,0 +1,161 @@
import 'dart:async';
import 'dart:io';
import 'dart:isolate';

import 'package:nyxx/nyxx.dart';
import 'package:test/test.dart';

void main() {
final testToken = Platform.environment['TEST_TOKEN'];

test('client.close() disposes all async resources', skip: testToken != null ? false : 'No test token provided', () async {
final receivePort = ReceivePort();

Future<void> createAndDisposeClient(SendPort sendPort) async {
// Re-declare so we don't attempt to copy the outer context to the isolate.
final testToken = Platform.environment['TEST_TOKEN'];
final testGuild = Platform.environment['TEST_GUILD'];

final client = await Nyxx.connectGatewayWithOptions(GatewayApiOptions(
token: testToken!,
intents: GatewayIntents.allUnprivileged,
totalShards: 10, // Use many shards to ensure the client is still connecting some shards when we close it.
));

final user = await client.user.get();

await client.onReady.first;

// Queue many shard messages to ensure the rate limiter schedules them at a later time.
for (int i = 0; i < 200; i++) {
client.gateway.updatePresence(PresenceBuilder(status: CurrentUserStatus.online, isAfk: false));
}

// Get a handle to all async resources exposed on the client.

// Also handles all the onXXX streams, as they are derived from onEvent.
final clientEvents = client.onEvent.listen((_) {});
final gatewayMessages = client.gateway.messages.listen((_) {});
final gatewayEvents = client.gateway.events.listen((_) {});
final membersStream = (testGuild == null ? Stream.empty() : client.gateway.listGuildMembers(Snowflake.parse(testGuild))).listen((_) {});
final shards = [
for (final shard in client.gateway.shards) shard.listen((_) {}),
];
final shardReceiveStreams = [
for (final shard in client.gateway.shards) shard.receiveStream.listen((_) {}),
];
final requests = client.httpHandler.onRequest.listen((_) {});
final responses = client.httpHandler.onResponse.listen((_) {});
final rateLimits = client.httpHandler.onRateLimit.listen((_) {});
final assetStream = user.avatar.fetchStreamed().listen((_) {});

// This single request should be representative of all methods on managers that
// create a request from a known route, execute it using httpHandler.executeSafe,
// and then parse the result (the vast majority of all manager methods).
final userRequest = client.user.fetch();
final assetRequest = user.avatar.fetch();
final shardsDone = [
for (final shard in client.gateway.shards) shard.done,
];
final rateLimitedRequest = Future.wait([
// GET /gateway/bot seems to have high rate limits. 10 requests should be enough to trigger it.
for (int i = 0; i < 10; i++) client.gateway.fetchGatewayBot(),
]);

sendPort.send('closing');

// Create the future before calling close so that any error handlers are installed before
// the close happens, in order to avoid any uncaught errors.
final disposedFuture = Future.wait([
// Streams
clientEvents.asFuture(),
gatewayMessages.asFuture(),
gatewayEvents.asFuture(),
membersStream.asFuture(),
...shards.map((s) => s.asFuture()),
...shardReceiveStreams.map((s) => s.asFuture()),
requests.asFuture(),
responses.asFuture(),
rateLimits.asFuture(),
assetStream.asFuture(),

// Futures
userRequest,
assetRequest,
...shardsDone,
rateLimitedRequest,
]);

await client.close();

sendPort.send('closed');

// Expect all the async operations to finish in some way or another.
try {
await disposedFuture;
} catch (e) {
// Erroring is an accepted way of dealing with `client.close()` being called during an async operation.
}

sendPort.send('done');
}

final isolate = await Isolate.spawn(createAndDisposeClient, receivePort.sendPort, paused: true);
isolate.addOnExitListener(receivePort.sendPort, response: 'exited');
isolate.resume(isolate.pauseCapability!);

var isDone = false;
Timer? failTimer;

final subscription = receivePort.listen((message) {
if (message == 'closing') {
failTimer = Timer(Duration(seconds: 1), () {
receivePort.close();

// Plugins can intercept calls to close(), but we should try to make calls to close() without any plugins as
// fast as possible.
// There is some networking involved (closing the WS connection has to send a close frame), so we allow this
// to take _some_ time. Just not too much.
fail('Client took more than a second to close');
});
} else if (message == 'closed') {
failTimer!.cancel();
failTimer = Timer(Duration(milliseconds: 50), () {
receivePort.close();

fail('Pending async operations did not complete in 50ms');
});
} else if (message == 'done') {
isDone = true;

failTimer!.cancel();
failTimer = Timer(Duration(milliseconds: 500), () {
receivePort.close();

// If any async operations are still pending in the isolate (which they shouldn't), they will keep it alive.
// Therefore we expect the isolate to exit immediately after client.close() completes, since that should be
// the last async operation performed.
// We allow up to 500ms of "wiggle room" to account for cross-isolate communication and isolate shutdown time.
// This delay may not be enough to prevent this test from failing on very slow devices, or if the OS schedules
// the threads' execution in an unfortunate order. If this test is failing, be absolutely sure it's not failing
// for some other reason before increasing this delay.
fail('Isolate did not shut down in 500ms');
});
} else {
assert(message == 'exited');

failTimer!.cancel();

// Completes the test.
receivePort.close();

// If isDone is false, then we didn't properly dispose of all async resources and left some "hanging", so
// awaiting them caused the isolate to exit prematurely.
// This also fails the test if the isolate exits because of an error.
expect(isDone, isTrue, reason: 'isolate entrypoint should run to completion');
}
});

await subscription.asFuture();
});
}
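
The test's key trick is running the client inside an isolate: any leaked timer or unfinished future keeps the isolate alive, so checking that the isolate exits shortly after close() verifies the cleanup. A minimal sketch of that verification technique (exitsPromptly is a hypothetical helper, not part of the test):

    import 'dart:async';
    import 'dart:isolate';

    /// Runs [body] in an isolate and reports whether the isolate exits within
    /// [limit]. Leaked async resources (timers, pending futures) keep the
    /// isolate alive and cause this to return false.
    Future<bool> exitsPromptly(Future<void> Function(SendPort) body, Duration limit) async {
      final port = ReceivePort();
      // Spawn paused so the exit listener is registered before the body runs.
      final isolate = await Isolate.spawn(body, port.sendPort, paused: true);
      isolate.addOnExitListener(port.sendPort, response: 'exited');
      isolate.resume(isolate.pauseCapability!);

      try {
        await port.firstWhere((message) => message == 'exited').timeout(limit);
        return true;
      } on TimeoutException {
        return false;
      } finally {
        port.close();
      }
    }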
