From e8d9d5d8c5cfd07ac005bb790186303a269855cd Mon Sep 17 00:00:00 2001
From: Abitofevrything <54505189+abitofevrything@users.noreply.github.com>
Date: Thu, 2 May 2024 17:33:03 +0200
Subject: [PATCH] Ensure client.close() cleans up any pending async operations
 (#655)

* Avoid leaving pending timers when client.close() is called

* Stop rate-limited gateway messages from preventing the client from closing

* Add a test ensuring the client disposes all async resources

* Increase timeout for cleaning up async objects to 50ms

I think this keeps failing on my laptop because its specs are too low, and
it takes more than 10ms for isolates to talk to each other.
---
 lib/nyxx.dart                            |   3 +-
 lib/src/errors.dart                      |   6 +
 lib/src/gateway/gateway.dart             |  23 +++-
 lib/src/gateway/shard_runner.dart        |  29 ++--
 lib/src/http/handler.dart                |  41 +++++-
 test/integration/async_dispose_test.dart | 161 +++++++++++++++++++++++
 6 files changed, 247 insertions(+), 16 deletions(-)
 create mode 100644 test/integration/async_dispose_test.dart

diff --git a/lib/nyxx.dart b/lib/nyxx.dart
index dae1d438b..cfea65c54 100644
--- a/lib/nyxx.dart
+++ b/lib/nyxx.dart
@@ -14,7 +14,8 @@ export 'src/errors.dart'
         IntegrationNotFoundException,
         AlreadyAcknowledgedError,
         AlreadyRespondedError,
-        PluginError;
+        PluginError,
+        ClientClosedError;
 
 export 'src/builders/builder.dart' show Builder, CreateBuilder, UpdateBuilder;
 export 'src/builders/image.dart' show ImageBuilder;
diff --git a/lib/src/errors.dart b/lib/src/errors.dart
index c94dfef36..358a025f0 100644
--- a/lib/src/errors.dart
+++ b/lib/src/errors.dart
@@ -139,3 +139,9 @@ class PluginError extends Error {
   @override
   String toString() => message;
 }
+
+/// An error thrown when the client is closed while an operation is pending, or when an already closed client is used.
+class ClientClosedError extends Error {
+  @override
+  String toString() => 'Client is closed';
+}
diff --git a/lib/src/gateway/gateway.dart b/lib/src/gateway/gateway.dart
index 15c1dd212..084c3d6e9 100644
--- a/lib/src/gateway/gateway.dart
+++ b/lib/src/gateway/gateway.dart
@@ -93,7 +93,7 @@ class Gateway extends GatewayManager with EventParser {
   /// See [Shard.latency] for details on how the latency is calculated.
   Duration get latency => shards.fold(Duration.zero, (previousValue, element) => previousValue + (element.latency ~/ shards.length));
 
-  final List<Timer> _startTimers = [];
+  final Set<Timer> _startOrIdentifyTimers = {};
 
   /// Create a new [Gateway].
   Gateway(this.client, this.gatewayBot, this.shards, this.totalShards, this.shardIds) : super.create() {
@@ -112,10 +112,15 @@
       final rateLimitKey = shard.id % maxConcurrency;
 
       // Delay the shard starting until it is (approximately) also ready to identify.
-      _startTimers.add(Timer(identifyDelay * (shard.id ~/ maxConcurrency), () {
+      // This avoids opening many websocket connections simultaneously just to have most
+      // of them wait for their identify request.
+      late final Timer startTimer;
+      startTimer = Timer(identifyDelay * (shard.id ~/ maxConcurrency), () {
        logger.fine('Starting shard ${shard.id}');
        shard.add(StartShard());
-      }));
+        _startOrIdentifyTimers.remove(startTimer);
+      });
+      _startOrIdentifyTimers.add(startTimer);
 
       shard.listen(
         (event) {
@@ -137,7 +142,15 @@
 
               remainingIdentifyRequests--;
               shard.add(Identify());
-              return await Future.delayed(identifyDelay);
+
+              // Don't use Future.delayed so that we can exit early if close() is called.
+              // If we use Future.delayed, the program will remain alive until it is complete, even if nothing is waiting on it.
+              // This code is roughly equivalent to `await Future.delayed(identifyDelay)`
+              final delayCompleter = Completer<void>();
+              final delayTimer = Timer(identifyDelay, delayCompleter.complete);
+              _startOrIdentifyTimers.add(delayTimer);
+              await delayCompleter.future;
+              _startOrIdentifyTimers.remove(delayTimer);
             });
           }
         },
@@ -199,7 +212,7 @@
   Future<void> close() async {
     _closing = true;
     // Make sure we don't start any shards after we have closed.
-    for (final timer in _startTimers) {
+    for (final timer in _startOrIdentifyTimers) {
       timer.cancel();
     }
     await Future.wait(shards.map((shard) => shard.close()));
diff --git a/lib/src/gateway/shard_runner.dart b/lib/src/gateway/shard_runner.dart
index 00992a7df..756e08577 100644
--- a/lib/src/gateway/shard_runner.dart
+++ b/lib/src/gateway/shard_runner.dart
@@ -69,8 +69,13 @@ class ShardRunner {
         await connection!.add(e);
       } catch (error, s) {
         controller.add(ErrorReceived(error: error, stackTrace: s));
-        // Try to send the event again.
-        sendController.add(e);
+
+        // Prevent the recursive call to add() from looping too often.
+        await Future.delayed(Duration(milliseconds: 100));
+        // Try to send the event again, unless we are disposing (in which case the controller will be closed).
+        if (!sendController.isClosed) {
+          sendController.add(e);
+        }
       }
     })
       ..pause();
@@ -122,6 +127,8 @@
     while (true) {
       try {
         // Check for dispose requests. If we should be disposing, exit the loop.
+        // Do this now instead of after the connection is closed in case we get
+        // a dispose request before the shard is even started.
         if (disposing) {
           controller.add(Disconnecting(reason: 'Dispose requested'));
           return;
@@ -130,11 +137,6 @@
         // Initialize lastHeartbeatAcked to `true` so we don't immediately disconnect in heartbeat().
         lastHeartbeatAcked = true;
 
-        // Pause the send subscription until we are connected.
-        if (!sendHandler.isPaused) {
-          sendHandler.pause();
-        }
-
         // Open the websocket connection.
         connection = await ShardConnection.connect(gatewayUri.toString(), this);
         connection!.onSent.listen(controller.add);
@@ -232,6 +234,12 @@
         // Prevents the while-true loop from looping too often when no internet is available.
         await Future.delayed(Duration(milliseconds: 100));
       } finally {
+        // Pause the send subscription until we are connected again.
+        // The handler may already be paused if the error occurred before we had identified.
+        if (!sendHandler.isPaused) {
+          sendHandler.pause();
+        }
+
         // Reset connection properties.
         await connection?.close(4000);
         connection = null;
@@ -400,7 +408,12 @@ class ShardConnection extends Stream implements StreamSink {
 
     final rateLimitLimit = event.opcode == Opcode.heartbeat ? 0 : rateLimitHeartbeatReservation;
     while (rateLimitCount - _currentRateLimitCount <= rateLimitLimit) {
-      await _currentRateLimitEnd.future;
+      try {
+        await _currentRateLimitEnd.future;
+      } catch (e) {
+        // Swap out the stack trace so the error message makes more sense.
+        Error.throwWithStackTrace(e, StackTrace.current);
+      }
     }
 
     if (event.opcode == Opcode.heartbeat) {
diff --git a/lib/src/http/handler.dart b/lib/src/http/handler.dart
index 268baf1c9..2bbc34494 100644
--- a/lib/src/http/handler.dart
+++ b/lib/src/http/handler.dart
@@ -5,6 +5,7 @@ import 'package:http/http.dart' hide MultipartRequest;
 import 'package:logging/logging.dart';
 import 'package:nyxx/src/api_options.dart';
 import 'package:nyxx/src/client.dart';
+import 'package:nyxx/src/errors.dart';
 import 'package:nyxx/src/http/bucket.dart';
 import 'package:nyxx/src/http/request.dart';
 import 'package:nyxx/src/http/response.dart';
@@ -94,6 +95,8 @@
   /// If no requests have been completed, this getter returns [Duration.zero].
   Duration get realLatency => _realLatencies.isEmpty ? Duration.zero : (_realLatencies.reduce((a, b) => a + b) ~/ _realLatencies.length);
 
+  final Set<Completer<void>> _pendingRateLimits = {};
+
   /// Create a new [HttpHandler].
   ///
   /// {@macro http_handler}
@@ -204,7 +207,19 @@
       if (waitTime > Duration.zero) {
         logger.finer('Holding ${request.loggingId} for $waitTime');
         _onRateLimitController.add((request: request, delay: waitTime, isGlobal: isGlobal, isAnticipated: true));
-        await Future.delayed(waitTime);
+
+        // Don't use Future.delayed so that we can exit early if close() is called.
+        // If we use Future.delayed, the program will remain alive until it is complete, even if nothing is waiting on it.
+        // This is roughly equivalent to `await Future.delayed(waitTime)`
+        final completer = Completer<void>();
+        final timer = Timer(waitTime, completer.complete);
+        _pendingRateLimits.add(completer);
+        try {
+          await completer.future;
+        } finally {
+          _pendingRateLimits.remove(completer);
+          timer.cancel();
+        }
       }
     } while (waitTime > Duration.zero);
 
@@ -281,7 +296,20 @@
       }
 
       _onRateLimitController.add((request: request, delay: retryAfter, isGlobal: isGlobal, isAnticipated: false));
-      return Future.delayed(retryAfter, () => execute(request));
+
+      // Don't use Future.delayed so that we can exit early if close() is called.
+      // If we use Future.delayed, the program will remain alive until it is complete, even if nothing is waiting on it.
+      // This is roughly equivalent to `return Future.delayed(retryAfter, () => execute(request))`
+      final completer = Completer<void>();
+      final timer = Timer(retryAfter, completer.complete);
+      _pendingRateLimits.add(completer);
+      try {
+        await completer.future;
+        return execute(request);
+      } finally {
+        _pendingRateLimits.remove(completer);
+        timer.cancel();
+      }
     } on TypeError {
       logger.shout('Invalid rate limit body for ${request.loggingId}! Your client is probably cloudflare banned!');
     }
@@ -302,6 +330,15 @@
   }
 
   void close() {
+    // Timers associated with these completers will be cancelled in the
+    // finally block of the try/finally that awaits the completer.
+    for (final completer in _pendingRateLimits) {
+      completer.completeError(
+        ClientClosedError(),
+        StackTrace.current,
+      );
+    }
+
     httpClient.close();
     _onRequestController.close();
     _onResponseController.close();
diff --git a/test/integration/async_dispose_test.dart b/test/integration/async_dispose_test.dart
new file mode 100644
index 000000000..4fc65e7a8
--- /dev/null
+++ b/test/integration/async_dispose_test.dart
@@ -0,0 +1,161 @@
+import 'dart:async';
+import 'dart:io';
+import 'dart:isolate';
+
+import 'package:nyxx/nyxx.dart';
+import 'package:test/test.dart';
+
+void main() {
+  final testToken = Platform.environment['TEST_TOKEN'];
+
+  test('client.close() disposes all async resources', skip: testToken != null ? false : 'No test token provided', () async {
+    final receivePort = ReceivePort();
+
+    Future<void> createAndDisposeClient(SendPort sendPort) async {
+      // Re-declare so we don't attempt to copy the outer context to the isolate.
+      final testToken = Platform.environment['TEST_TOKEN'];
+      final testGuild = Platform.environment['TEST_GUILD'];
+
+      final client = await Nyxx.connectGatewayWithOptions(GatewayApiOptions(
+        token: testToken!,
+        intents: GatewayIntents.allUnprivileged,
+        totalShards: 10, // Use many shards to ensure the client is still connecting some shards when we close it.
+      ));
+
+      final user = await client.user.get();
+
+      await client.onReady.first;
+
+      // Queue many shard messages to ensure the rate limiter schedules them at a later time.
+      for (int i = 0; i < 200; i++) {
+        client.gateway.updatePresence(PresenceBuilder(status: CurrentUserStatus.online, isAfk: false));
+      }
+
+      // Get a handle to all async resources exposed on the client.
+
+      // Also handles all the onXXX streams, as they are derived from onEvent.
+      final clientEvents = client.onEvent.listen((_) {});
+      final gatewayMessages = client.gateway.messages.listen((_) {});
+      final gatewayEvents = client.gateway.events.listen((_) {});
+      final membersStream = (testGuild == null ? Stream.empty() : client.gateway.listGuildMembers(Snowflake.parse(testGuild))).listen((_) {});
+      final shards = [
+        for (final shard in client.gateway.shards) shard.listen((_) {}),
+      ];
+      final shardReceiveStreams = [
+        for (final shard in client.gateway.shards) shard.receiveStream.listen((_) {}),
+      ];
+      final requests = client.httpHandler.onRequest.listen((_) {});
+      final responses = client.httpHandler.onResponse.listen((_) {});
+      final rateLimits = client.httpHandler.onRateLimit.listen((_) {});
+      final assetStream = user.avatar.fetchStreamed().listen((_) {});
+
+      // This single request should be representative of all methods on managers that
+      // create a request from a known route, execute it using httpHandler.executeSafe,
+      // and then parse the result (the vast majority of all manager methods).
+      final userRequest = client.user.fetch();
+      final assetRequest = user.avatar.fetch();
+      final shardsDone = [
+        for (final shard in client.gateway.shards) shard.done,
+      ];
+      final rateLimitedRequest = Future.wait([
+        // GET /gateway/bot seems to have high rate limits. 10 requests should be enough to trigger it.
+        for (int i = 0; i < 10; i++) client.gateway.fetchGatewayBot(),
+      ]);
+
+      sendPort.send('closing');
+
+      // Create the future before calling close so that any error handlers are installed before
+      // the close happens, in order to avoid any uncaught errors.
+      final disposedFuture = Future.wait([
+        // Streams
+        clientEvents.asFuture(),
+        gatewayMessages.asFuture(),
+        gatewayEvents.asFuture(),
+        membersStream.asFuture(),
+        ...shards.map((s) => s.asFuture()),
+        ...shardReceiveStreams.map((s) => s.asFuture()),
+        requests.asFuture(),
+        responses.asFuture(),
+        rateLimits.asFuture(),
+        assetStream.asFuture(),
+
+        // Futures
+        userRequest,
+        assetRequest,
+        ...shardsDone,
+        rateLimitedRequest,
+      ]);
+
+      await client.close();
+
+      sendPort.send('closed');
+
+      // Expect all the async operations to finish in some way or another.
+      try {
+        await disposedFuture;
+      } catch (e) {
+        // Erroring is an accepted way of dealing with `client.close()` being called during an async operation.
+      }
+
+      sendPort.send('done');
+    }
+
+    final isolate = await Isolate.spawn(createAndDisposeClient, receivePort.sendPort, paused: true);
+    isolate.addOnExitListener(receivePort.sendPort, response: 'exited');
+    isolate.resume(isolate.pauseCapability!);
+
+    var isDone = false;
+    Timer? failTimer;
+
+    final subscription = receivePort.listen((message) {
+      if (message == 'closing') {
+        failTimer = Timer(Duration(seconds: 1), () {
+          receivePort.close();
+
+          // Plugins can intercept calls to close(), but we should try to make calls to close() without any plugins as
+          // fast as possible.
+          // There is some networking involved (closing the WS connection has to send a close frame), so we allow this
+          // to take _some_ time. Just not too much.
+          fail('Client took more than a second to close');
+        });
+      } else if (message == 'closed') {
+        failTimer!.cancel();
+        failTimer = Timer(Duration(milliseconds: 50), () {
+          receivePort.close();
+
+          fail('Pending async operations did not complete in 50ms');
+        });
+      } else if (message == 'done') {
+        isDone = true;
+
+        failTimer!.cancel();
+        failTimer = Timer(Duration(milliseconds: 500), () {
+          receivePort.close();
+
+          // If any async operations are still pending in the isolate (which they shouldn't be), they will keep it alive.
+          // Therefore we expect the isolate to exit immediately after client.close() completes, since that should be
+          // the last async operation performed.
+          // We allow up to 500ms of "wiggle room" to account for cross-isolate communication and isolate shutdown time.
+          // This delay may not be enough to prevent this test from failing on very slow devices, or if the OS schedules
+          // the thread's execution in an unfortunate order. If this test is failing, be absolutely sure it's not failing
+          // for some other reason before increasing this delay.
+          fail('Isolate did not shut down in 500ms');
+        });
+      } else {
+        assert(message == 'exited');
+
+        failTimer!.cancel();
+
+        // Completes the test.
+        receivePort.close();
+
+        // If isDone is false, then we didn't properly dispose of all async resources and left some "hanging", so
+        // awaiting them caused the isolate to exit prematurely.
+        // This also fails the test if the isolate exits because of an error.
+        expect(isDone, isTrue, reason: 'isolate entrypoint should run to completion');
+      }
+    });
+
+    await subscription.asFuture();
+  });
+}
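
The pattern this patch repeats in Gateway and HttpHandler (replace Future.delayed with a Timer driving a Completer, track the pending work, tear it down in close()) can be shown in isolation. The sketch below is illustrative only: ClosableDelayer and its members are made-up names, it throws a plain StateError where the patch throws ClientClosedError, and it is not part of the nyxx API.

import 'dart:async';

/// Illustrative stand-in for the pattern used in this patch; not part of nyxx.
class ClosableDelayer {
  final Set<Completer<void>> _pending = {};
  final Set<Timer> _timers = {};

  /// Roughly `Future.delayed(duration)`, except that it does not keep the VM
  /// alive once [close] has run and can be aborted early.
  Future<void> delay(Duration duration) async {
    final completer = Completer<void>();
    final timer = Timer(duration, completer.complete);

    _pending.add(completer);
    _timers.add(timer);
    try {
      await completer.future;
    } finally {
      _pending.remove(completer);
      _timers.remove(timer);
      timer.cancel();
    }
  }

  /// Cancels all pending delays; callers currently awaiting [delay] receive an
  /// error (the patch uses ClientClosedError here, StateError is for brevity).
  void close() {
    for (final timer in _timers) {
      timer.cancel();
    }
    for (final completer in _pending) {
      if (!completer.isCompleted) {
        completer.completeError(StateError('Closed while waiting'), StackTrace.current);
      }
    }
  }
}

Future<void> main() async {
  final delayer = ClosableDelayer();

  // Install the error handler before close() is called, mirroring how the test
  // builds disposedFuture before awaiting client.close().
  final pending = delayer.delay(Duration(minutes: 1)).catchError((Object _) {});

  delayer.close();
  await pending; // Completes immediately; no timer is left keeping the VM alive.
}

As in the patch, the awaiting side installs its error handler before close() runs, so completing the pending completers with an error does not surface as an uncaught exception, and no live Timer remains to keep the process (or, in the test, the isolate) alive.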