From df5b186b868de748473468238935d490a8c6964a Mon Sep 17 00:00:00 2001 From: Peter Leibiger Date: Tue, 14 Nov 2023 16:06:40 +0100 Subject: [PATCH] Cancel streamed download cancellation for Http2Adapter --- dio/test/cancel_token_test.dart | 2 + plugins/http2_adapter/CHANGELOG.md | 1 + .../http2_adapter/lib/src/http2_adapter.dart | 16 ++-- plugins/http2_adapter/pubspec.yaml | 1 + plugins/http2_adapter/test/request_test.dart | 94 +++++++++++++++++++ 5 files changed, 107 insertions(+), 7 deletions(-) create mode 100644 plugins/http2_adapter/test/request_test.dart diff --git a/dio/test/cancel_token_test.dart b/dio/test/cancel_token_test.dart index a4e7b1cb8..b81adf383 100644 --- a/dio/test/cancel_token_test.dart +++ b/dio/test/cancel_token_test.dart @@ -45,6 +45,8 @@ void main() { cancelToken.cancel(); }); + expect(response.statusCode, 200); + await expectLater( (response.data as ResponseBody).stream.last, throwsA(predicate((DioException e) => diff --git a/plugins/http2_adapter/CHANGELOG.md b/plugins/http2_adapter/CHANGELOG.md index 8e3ef62ce..467a87137 100644 --- a/plugins/http2_adapter/CHANGELOG.md +++ b/plugins/http2_adapter/CHANGELOG.md @@ -4,6 +4,7 @@ - Implement `sendTimeout` and `receiveTimeout` for the adapter. - Fix redirect not working when requestStream is null. +- Fix cancellation for streamed responses and downloads. ## 2.3.1+1 diff --git a/plugins/http2_adapter/lib/src/http2_adapter.dart b/plugins/http2_adapter/lib/src/http2_adapter.dart index e235d0e4a..a2e80feaf 100644 --- a/plugins/http2_adapter/lib/src/http2_adapter.dart +++ b/plugins/http2_adapter/lib/src/http2_adapter.dart @@ -74,13 +74,6 @@ class Http2Adapter implements HttpClientAdapter { // Creates a new outgoing stream. final stream = transport.makeRequest(headers); - // ignore: unawaited_futures - cancelFuture?.whenComplete(() { - Future(() { - stream.terminate(); - }); - }); - List? list; final hasRequestData = requestStream != null; if (!excludeMethods.contains(options.method) && hasRequestData) { @@ -110,6 +103,7 @@ class Http2Adapter implements HttpClientAdapter { await stream.outgoingMessages.close(); final sc = StreamController(); + final responseHeaders = Headers(); final completer = Completer(); late int statusCode; @@ -158,6 +152,14 @@ class Http2Adapter implements HttpClientAdapter { cancelOnError: true, ); + /// Cancel any up/download streams if the [CancelToken] is cancelled + /// and propagate the [DioException] to the response stream. + cancelFuture?.whenComplete(() { + stream.terminate(); + sc.addError(options.cancelToken!.cancelError!); + subscription.cancel(); + }); + final receiveTimeout = options.receiveTimeout; if (receiveTimeout != null) { await completer.future.timeout( diff --git a/plugins/http2_adapter/pubspec.yaml b/plugins/http2_adapter/pubspec.yaml index 1c5be37b1..fa3726fdb 100644 --- a/plugins/http2_adapter/pubspec.yaml +++ b/plugins/http2_adapter/pubspec.yaml @@ -22,3 +22,4 @@ dev_dependencies: crypto: ^3.0.2 lints: any test: ^1.16.4 + path: ^1.8.0 diff --git a/plugins/http2_adapter/test/request_test.dart b/plugins/http2_adapter/test/request_test.dart new file mode 100644 index 000000000..b66a6e3d4 --- /dev/null +++ b/plugins/http2_adapter/test/request_test.dart @@ -0,0 +1,94 @@ +@TestOn('vm') +import 'dart:io'; + +import 'package:dio/dio.dart'; +import 'package:dio_http2_adapter/dio_http2_adapter.dart'; +import 'package:path/path.dart' as p; +import 'package:test/test.dart'; + +void main() { + late Directory tmp; + + final dio = Dio() + ..httpClientAdapter = Http2Adapter(null) + ..options.baseUrl = 'https://httpbun.com/'; + + setUpAll(() { + tmp = Directory.systemTemp.createTempSync('dio_http_download_test'); + addTearDown(() { + tmp.deleteSync(recursive: true); + }); + }); + + group('requests', () { + test('download README.md', () async { + final path = p.join(tmp.path, 'README.md'); + + int count = 0; + int total = 0; + final source = File('README.md'); + await dio.download( + '/payload', + path, + options: Options( + method: 'POST', + contentType: 'text/plain', + ), + data: source.openRead(), + onReceiveProgress: (c, t) { + total = t; + count = c; + }, + ); + + final f = File(path); + expect(f.readAsStringSync(), source.readAsStringSync()); + expect(count, await source.length()); + + // TODO: disabled pending https://github.com/sharat87/httpbun/issues/13 + // expect(count, total); + }); + + test('cancels request', () async { + final cancelToken = CancelToken(); + + expectLater( + dio.get( + 'get', + cancelToken: cancelToken, + ), + throwsA(predicate((DioException e) => + e.type == DioExceptionType.cancel && + e.message! + .contains('The request was manually cancelled by the user'))), + ); + + Future.delayed(const Duration(milliseconds: 50), () { + cancelToken.cancel(); + }); + }); + + test('cancels streamed responses', () async { + final cancelToken = CancelToken(); + final response = await dio.get( + 'bytes/${1024 * 1024 * 10}', + options: Options(responseType: ResponseType.stream), + cancelToken: cancelToken, + ); + + Future.delayed(const Duration(milliseconds: 750), () { + cancelToken.cancel(); + }); + + expect(response.statusCode, 200); + + await expectLater( + (response.data as ResponseBody).stream.last, + throwsA(predicate((DioException e) => + e.type == DioExceptionType.cancel && + e.message! + .contains('The request was manually cancelled by the user'))), + ); + }); + }); +}