Skip to content

Commit

Permalink
Cancel streamed downloads & fix progress for downloads
Browse files Browse the repository at this point in the history
  • Loading branch information
kuhnroyal committed Nov 14, 2023
1 parent 78f3813 commit c74941f
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 20 deletions.
1 change: 1 addition & 0 deletions dio/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ See the [Migration Guide][] for the complete breaking changes list.**
- Raise warning for `Map`s other than `Map<String, dynamic>` when encoding request data.
- Improve exception messages
- Allow `ResponseDecoder` and `RequestEncoder` to be async
- Fix cancellation and missing progress handling for streamed responses and downloads when using `IOHttpClientAdapter`.

## 5.3.3

Expand Down
59 changes: 40 additions & 19 deletions dio/lib/src/adapters/io_adapter.dart
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,7 @@ class IOHttpClientAdapter implements HttpClientAdapter {
requestStream,
cancelFuture,
));
if (cancelFuture != null) {
cancelFuture.whenComplete(() => operation.cancel());
}
cancelFuture?.whenComplete(() => operation.cancel());
return operation.value;
}

Expand Down Expand Up @@ -106,9 +104,7 @@ class IOHttpClientAdapter implements HttpClientAdapter {
request = await reqFuture;
}

if (cancelFuture != null) {
cancelFuture.whenComplete(() => request.abort());
}
cancelFuture?.whenComplete(() => request.abort());

// Set Headers
options.headers.forEach((k, v) {
Expand Down Expand Up @@ -189,23 +185,48 @@ class IOHttpClientAdapter implements HttpClientAdapter {
}
}

/// Get the cancellation exception here so it can be used
/// to cancel the stream on the next stream event.
DioException? cancellation;
cancelFuture?.whenComplete(() async {
cancellation = await (cancelFuture as Future<DioException>);
});

/// A helper function to add an error to the stream
/// and close the underlying socket.
void close(EventSink<Uint8List> sink, DioException exception) {
sink.addError(exception);
responseStream.detachSocket().then((socket) => socket.destroy());
}

int dataLength = 0;
final stream = responseStream.transform<Uint8List>(
StreamTransformer.fromHandlers(
handleData: (data, sink) {
stopwatch.stop();
final duration = stopwatch.elapsed;
final receiveTimeout = options.receiveTimeout;
if (receiveTimeout != null && duration > receiveTimeout) {
sink.addError(
DioException.receiveTimeout(
timeout: receiveTimeout,
requestOptions: options,
),
);
responseStream.detachSocket().then((socket) => socket.destroy());
} else {
sink.add(Uint8List.fromList(data));
if (cancellation != null) {
/// Close the stream upon a cancellation.
return close(sink, cancellation!);
}
if (stopwatch.isRunning) {
stopwatch.stop();
final duration = stopwatch.elapsed;
final receiveTimeout = options.receiveTimeout;
if (receiveTimeout != null && duration > receiveTimeout) {
/// Close the stream upon a timeout.
return close(
sink,
DioException.receiveTimeout(
timeout: receiveTimeout,
requestOptions: options,
),
);
}
}
options.onReceiveProgress?.call(
dataLength += data.length,
responseStream.contentLength,
);
sink.add(Uint8List.fromList(data));
},
),
);
Expand Down
27 changes: 27 additions & 0 deletions dio/test/cancel_token_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,33 @@ void main() {
CancelToken().cancel();
});

test(
'cancels streamed responses',
() async {
final dio = Dio()..options.baseUrl = 'https://httpbun.com/';

final cancelToken = CancelToken();
final response = await dio.get(
'bytes/${1024 * 1024 * 10}',
options: Options(responseType: ResponseType.stream),
cancelToken: cancelToken,
);

Future.delayed(const Duration(milliseconds: 750), () {
cancelToken.cancel();
});

await expectLater(
(response.data as ResponseBody).stream.last,
throwsA(predicate((DioException e) =>
e.type == DioExceptionType.cancel &&
e.message!
.contains('The request was manually cancelled by the user'))),
);
},
testOn: 'vm',
);

test('cancels multiple requests', () async {
final client = MockHttpClient();
final token = CancelToken();
Expand Down
15 changes: 14 additions & 1 deletion dio/test/download_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,23 @@ import 'utils.dart';
void main() {
setUp(startServer);
tearDown(stopServer);

test('download1', () async {
const savePath = 'test/_download_test.md';
final dio = Dio()..options.baseUrl = serverUrl.toString();
await dio.download('/download', savePath);

int? total;
int? count;
await dio.download(
'/download',
savePath,
onReceiveProgress: (c, t) {
total = t;
count = c;
},
);

expect(count, total);

final f = File(savePath);
expect(f.readAsStringSync(), equals('I am a text file'));
Expand Down

0 comments on commit c74941f

Please sign in to comment.