Skip to content

Commit

Permalink
Cancel streamed download cancellation for Http2Adapter
Browse files Browse the repository at this point in the history
  • Loading branch information
kuhnroyal committed Nov 14, 2023
1 parent 33ddefe commit df5b186
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 7 deletions.
2 changes: 2 additions & 0 deletions dio/test/cancel_token_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ void main() {
cancelToken.cancel();
});

expect(response.statusCode, 200);

await expectLater(
(response.data as ResponseBody).stream.last,
throwsA(predicate((DioException e) =>
Expand Down
1 change: 1 addition & 0 deletions plugins/http2_adapter/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

- Implement `sendTimeout` and `receiveTimeout` for the adapter.
- Fix redirect not working when requestStream is null.
- Fix cancellation for streamed responses and downloads.

## 2.3.1+1

Expand Down
16 changes: 9 additions & 7 deletions plugins/http2_adapter/lib/src/http2_adapter.dart
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,6 @@ class Http2Adapter implements HttpClientAdapter {
// Creates a new outgoing stream.
final stream = transport.makeRequest(headers);

// ignore: unawaited_futures
cancelFuture?.whenComplete(() {
Future(() {
stream.terminate();
});
});

List<Uint8List>? list;
final hasRequestData = requestStream != null;
if (!excludeMethods.contains(options.method) && hasRequestData) {
Expand Down Expand Up @@ -110,6 +103,7 @@ class Http2Adapter implements HttpClientAdapter {
await stream.outgoingMessages.close();

final sc = StreamController<Uint8List>();

final responseHeaders = Headers();
final completer = Completer();
late int statusCode;
Expand Down Expand Up @@ -158,6 +152,14 @@ class Http2Adapter implements HttpClientAdapter {
cancelOnError: true,
);

/// Cancel any up/download streams if the [CancelToken] is cancelled
/// and propagate the [DioException] to the response stream.
cancelFuture?.whenComplete(() {
stream.terminate();
sc.addError(options.cancelToken!.cancelError!);
subscription.cancel();
});

final receiveTimeout = options.receiveTimeout;
if (receiveTimeout != null) {
await completer.future.timeout(
Expand Down
1 change: 1 addition & 0 deletions plugins/http2_adapter/pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,4 @@ dev_dependencies:
crypto: ^3.0.2
lints: any
test: ^1.16.4
path: ^1.8.0
94 changes: 94 additions & 0 deletions plugins/http2_adapter/test/request_test.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
@TestOn('vm')
import 'dart:io';

import 'package:dio/dio.dart';
import 'package:dio_http2_adapter/dio_http2_adapter.dart';
import 'package:path/path.dart' as p;
import 'package:test/test.dart';

void main() {
late Directory tmp;

final dio = Dio()
..httpClientAdapter = Http2Adapter(null)
..options.baseUrl = 'https://httpbun.com/';

setUpAll(() {
tmp = Directory.systemTemp.createTempSync('dio_http_download_test');
addTearDown(() {
tmp.deleteSync(recursive: true);
});
});

group('requests', () {
test('download README.md', () async {
final path = p.join(tmp.path, 'README.md');

int count = 0;
int total = 0;
final source = File('README.md');
await dio.download(
'/payload',
path,
options: Options(
method: 'POST',
contentType: 'text/plain',
),
data: source.openRead(),
onReceiveProgress: (c, t) {
total = t;
count = c;
},
);

final f = File(path);
expect(f.readAsStringSync(), source.readAsStringSync());
expect(count, await source.length());

// TODO: disabled pending https://github.com/sharat87/httpbun/issues/13
// expect(count, total);
});

test('cancels request', () async {
final cancelToken = CancelToken();

expectLater(
dio.get(
'get',
cancelToken: cancelToken,
),
throwsA(predicate((DioException e) =>
e.type == DioExceptionType.cancel &&
e.message!
.contains('The request was manually cancelled by the user'))),
);

Future.delayed(const Duration(milliseconds: 50), () {
cancelToken.cancel();
});
});

test('cancels streamed responses', () async {
final cancelToken = CancelToken();
final response = await dio.get(
'bytes/${1024 * 1024 * 10}',
options: Options(responseType: ResponseType.stream),
cancelToken: cancelToken,
);

Future.delayed(const Duration(milliseconds: 750), () {
cancelToken.cancel();
});

expect(response.statusCode, 200);

await expectLater(
(response.data as ResponseBody).stream.last,
throwsA(predicate((DioException e) =>
e.type == DioExceptionType.cancel &&
e.message!
.contains('The request was manually cancelled by the user'))),
);
});
});
}

0 comments on commit df5b186

Please sign in to comment.