diff --git a/dio/CHANGELOG.md b/dio/CHANGELOG.md index 927826a7f..f1ab769e6 100644 --- a/dio/CHANGELOG.md +++ b/dio/CHANGELOG.md @@ -5,7 +5,7 @@ See the [Migration Guide][] for the complete breaking changes list.** ## Unreleased -*None.* +- Fix `receiveTimeout` throws exception after the request has been cancelled. ## 5.4.1 diff --git a/dio/lib/src/response/response_stream_handler.dart b/dio/lib/src/response/response_stream_handler.dart index 5a5e4b903..a0a6725ee 100644 --- a/dio/lib/src/response/response_stream_handler.dart +++ b/dio/lib/src/response/response_stream_handler.dart @@ -2,6 +2,7 @@ import 'dart:async'; import 'dart:typed_data'; import 'package:dio/dio.dart'; +import 'package:meta/meta.dart'; /// An internal helper which handles functionality /// common to all adapters. This function ensures that @@ -13,8 +14,9 @@ import 'package:dio/dio.dart'; /// - [options.cancelToken] for cancellation while receiving Stream handleResponseStream( RequestOptions options, - ResponseBody response, -) { + ResponseBody response, { + @visibleForTesting void Function()? onReceiveTimeoutWatchCancelled, +}) { final source = response.stream; final responseSink = StreamController(); late StreamSubscription> responseSubscription; @@ -30,31 +32,35 @@ Stream handleResponseStream( Timer? receiveTimer; void stopWatchReceiveTimeout() { + onReceiveTimeoutWatchCancelled?.call(); receiveTimer?.cancel(); receiveTimer = null; - receiveStopwatch.stop(); + receiveStopwatch + ..stop() + ..reset(); } void watchReceiveTimeout() { if (receiveTimeout <= Duration.zero) { return; } - receiveStopwatch.reset(); - if (!receiveStopwatch.isRunning) { - receiveStopwatch.start(); - } + // Not calling `stopWatchReceiveTimeout` to follow the semantic: + // Watching the new receive timeout does not indicate the watch + // has been cancelled. receiveTimer?.cancel(); + receiveStopwatch + ..reset() + ..start(); receiveTimer = Timer(receiveTimeout, () { - responseSink.addError( + stopWatchReceiveTimeout(); + response.close(); + responseSubscription.cancel(); + responseSink.addErrorAndClose( DioException.receiveTimeout( timeout: receiveTimeout, requestOptions: options, ), ); - response.close(); - responseSink.close(); - responseSubscription.cancel(); - stopWatchReceiveTimeout(); }); } @@ -72,8 +78,7 @@ Stream handleResponseStream( }, onError: (error, stackTrace) { stopWatchReceiveTimeout(); - responseSink.addError(error, stackTrace); - responseSink.close(); + responseSink.addErrorAndClose(error, stackTrace); }, onDone: () { stopWatchReceiveTimeout(); @@ -84,13 +89,20 @@ Stream handleResponseStream( ); options.cancelToken?.whenCancel.whenComplete(() { - /// Close the response stream upon a cancellation. - responseSubscription.cancel(); + stopWatchReceiveTimeout(); + // Close the response stream upon a cancellation. response.close(); - if (!responseSink.isClosed) { - responseSink.addError(options.cancelToken!.cancelError!); - responseSink.close(); - } + responseSubscription.cancel(); + responseSink.addErrorAndClose(options.cancelToken!.cancelError!); }); return responseSink.stream; } + +extension on StreamController { + void addErrorAndClose(Object error, [StackTrace? stackTrace]) { + if (!isClosed) { + addError(error, stackTrace); + close(); + } + } +} diff --git a/dio/test/response/response_stream_test.dart b/dio/test/response/response_stream_test.dart index e3b3e03b0..99deae278 100644 --- a/dio/test/response/response_stream_test.dart +++ b/dio/test/response/response_stream_test.dart @@ -234,5 +234,34 @@ void main() { expect(source.hasListener, isFalse); }); }); + + test('not watching the receive timeout after cancelled', () async { + bool timerCancelled = false; + final cancelToken = CancelToken(); + final stream = handleResponseStream( + RequestOptions( + cancelToken: cancelToken, + receiveTimeout: Duration(seconds: 1), + ), + ResponseBody(source.stream, 200), + onReceiveTimeoutWatchCancelled: () => timerCancelled = true, + ); + expect(source.hasListener, isTrue); + expectLater( + stream, + emitsInOrder([ + Uint8List.fromList([0]), + emitsError(matchesDioException( + DioExceptionType.cancel, + stackTraceContains: 'test/response/response_stream_test.dart', + )), + emitsDone, + ]), + ); + source.add(Uint8List.fromList([0])); + cancelToken.cancel(); + await Future.microtask(() {}); + expect(timerCancelled, isTrue); + }); }); }