From c74941f04041d5abdf531313264315df0e549c8c Mon Sep 17 00:00:00 2001 From: Peter Leibiger Date: Mon, 13 Nov 2023 14:17:46 +0100 Subject: [PATCH] Cancel streamed downloads & fix progress for downloads --- dio/CHANGELOG.md | 1 + dio/lib/src/adapters/io_adapter.dart | 59 +++++++++++++++++++--------- dio/test/cancel_token_test.dart | 27 +++++++++++++ dio/test/download_test.dart | 15 ++++++- 4 files changed, 82 insertions(+), 20 deletions(-) diff --git a/dio/CHANGELOG.md b/dio/CHANGELOG.md index b361c1b93..897483d4b 100644 --- a/dio/CHANGELOG.md +++ b/dio/CHANGELOG.md @@ -8,6 +8,7 @@ See the [Migration Guide][] for the complete breaking changes list.** - Raise warning for `Map`s other than `Map` when encoding request data. - Improve exception messages - Allow `ResponseDecoder` and `RequestEncoder` to be async +- Fix cancellation and missing progress handling for streamed responses and downloads when using `IOHttpClientAdapter`. ## 5.3.3 diff --git a/dio/lib/src/adapters/io_adapter.dart b/dio/lib/src/adapters/io_adapter.dart index 65e9ac6cf..e89a927d0 100644 --- a/dio/lib/src/adapters/io_adapter.dart +++ b/dio/lib/src/adapters/io_adapter.dart @@ -75,9 +75,7 @@ class IOHttpClientAdapter implements HttpClientAdapter { requestStream, cancelFuture, )); - if (cancelFuture != null) { - cancelFuture.whenComplete(() => operation.cancel()); - } + cancelFuture?.whenComplete(() => operation.cancel()); return operation.value; } @@ -106,9 +104,7 @@ class IOHttpClientAdapter implements HttpClientAdapter { request = await reqFuture; } - if (cancelFuture != null) { - cancelFuture.whenComplete(() => request.abort()); - } + cancelFuture?.whenComplete(() => request.abort()); // Set Headers options.headers.forEach((k, v) { @@ -189,23 +185,48 @@ class IOHttpClientAdapter implements HttpClientAdapter { } } + /// Get the cancellation exception here so it can be used + /// to cancel the stream on the next stream event. + DioException? cancellation; + cancelFuture?.whenComplete(() async { + cancellation = await (cancelFuture as Future); + }); + + /// A helper function to add an error to the stream + /// and close the underlying socket. + void close(EventSink sink, DioException exception) { + sink.addError(exception); + responseStream.detachSocket().then((socket) => socket.destroy()); + } + + int dataLength = 0; final stream = responseStream.transform( StreamTransformer.fromHandlers( handleData: (data, sink) { - stopwatch.stop(); - final duration = stopwatch.elapsed; - final receiveTimeout = options.receiveTimeout; - if (receiveTimeout != null && duration > receiveTimeout) { - sink.addError( - DioException.receiveTimeout( - timeout: receiveTimeout, - requestOptions: options, - ), - ); - responseStream.detachSocket().then((socket) => socket.destroy()); - } else { - sink.add(Uint8List.fromList(data)); + if (cancellation != null) { + /// Close the stream upon a cancellation. + return close(sink, cancellation!); } + if (stopwatch.isRunning) { + stopwatch.stop(); + final duration = stopwatch.elapsed; + final receiveTimeout = options.receiveTimeout; + if (receiveTimeout != null && duration > receiveTimeout) { + /// Close the stream upon a timeout. + return close( + sink, + DioException.receiveTimeout( + timeout: receiveTimeout, + requestOptions: options, + ), + ); + } + } + options.onReceiveProgress?.call( + dataLength += data.length, + responseStream.contentLength, + ); + sink.add(Uint8List.fromList(data)); }, ), ); diff --git a/dio/test/cancel_token_test.dart b/dio/test/cancel_token_test.dart index c6dc0da8c..a4e7b1cb8 100644 --- a/dio/test/cancel_token_test.dart +++ b/dio/test/cancel_token_test.dart @@ -29,6 +29,33 @@ void main() { CancelToken().cancel(); }); + test( + 'cancels streamed responses', + () async { + final dio = Dio()..options.baseUrl = 'https://httpbun.com/'; + + final cancelToken = CancelToken(); + final response = await dio.get( + 'bytes/${1024 * 1024 * 10}', + options: Options(responseType: ResponseType.stream), + cancelToken: cancelToken, + ); + + Future.delayed(const Duration(milliseconds: 750), () { + cancelToken.cancel(); + }); + + await expectLater( + (response.data as ResponseBody).stream.last, + throwsA(predicate((DioException e) => + e.type == DioExceptionType.cancel && + e.message! + .contains('The request was manually cancelled by the user'))), + ); + }, + testOn: 'vm', + ); + test('cancels multiple requests', () async { final client = MockHttpClient(); final token = CancelToken(); diff --git a/dio/test/download_test.dart b/dio/test/download_test.dart index 9f341ab14..2e19289eb 100644 --- a/dio/test/download_test.dart +++ b/dio/test/download_test.dart @@ -11,10 +11,23 @@ import 'utils.dart'; void main() { setUp(startServer); tearDown(stopServer); + test('download1', () async { const savePath = 'test/_download_test.md'; final dio = Dio()..options.baseUrl = serverUrl.toString(); - await dio.download('/download', savePath); + + int? total; + int? count; + await dio.download( + '/download', + savePath, + onReceiveProgress: (c, t) { + total = t; + count = c; + }, + ); + + expect(count, total); final f = File(savePath); expect(f.readAsStringSync(), equals('I am a text file'));