Skip to content

Commit

Permalink
Extract progress & cancellation for streamed responses to DioMixin
Browse files Browse the repository at this point in the history
  • Loading branch information
kuhnroyal committed Dec 17, 2023
1 parent f6ee2cc commit 6f814c6
Show file tree
Hide file tree
Showing 12 changed files with 360 additions and 58 deletions.
2 changes: 1 addition & 1 deletion dio/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ See the [Migration Guide][] for the complete breaking changes list.**

- Provide fix suggestions for `dart fix`.
- Fix `receiveTimeout` for streamed responses.
- Fix cancellation and missing progress handling for streamed responses and downloads when using `IOHttpClientAdapter`.
- Fix cancellation and missing progress handling for streamed responses when using `IOHttpClientAdapter`.

## 5.4.0

Expand Down
5 changes: 5 additions & 0 deletions dio/lib/src/adapter.dart
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import 'dart:convert';
import 'dart:typed_data';

import 'headers.dart';
import 'options.dart';
import 'redirect_record.dart';

Expand Down Expand Up @@ -89,6 +90,10 @@ class ResponseBody {
/// HTTP status code.
int statusCode;

/// Content length of the response or -1 if not specified
int get contentLength =>
int.parse(headers[Headers.contentLengthHeader]?.first ?? '-1');

/// Returns the reason phrase corresponds to the status code.
/// The message can be [HttpRequest.statusText]
/// or [HttpClientResponse.reasonPhrase].
Expand Down
11 changes: 0 additions & 11 deletions dio/lib/src/adapters/io_adapter.dart
Original file line number Diff line number Diff line change
Expand Up @@ -255,17 +255,6 @@ class IOHttpClientAdapter implements HttpClientAdapter {
cancelOnError: true,
);

cancelFuture?.whenComplete(() {
/// Close the stream upon a cancellation.
responseSubscription.cancel();
if (!responseSink.isClosed) {
/// If the request was aborted via [Request.abort], then the
/// [responseSubscription] may have emitted a done event already.
responseSink.addError(options.cancelToken!.cancelError!);
responseSink.close();
}
});

final headers = <String, List<String>>{};
responseStream.headers.forEach((key, values) {
headers[key] = values;
Expand Down
4 changes: 4 additions & 0 deletions dio/lib/src/dio_mixin.dart
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import 'headers.dart';
import 'interceptors/imply_content_type.dart';
import 'options.dart';
import 'response.dart';
import 'response/response_stream_handler.dart';
import 'transformer.dart';
import 'transformers/background_transformer.dart';

Expand Down Expand Up @@ -546,6 +547,9 @@ abstract class DioMixin implements Dio {
T != String &&
reqOpt.responseType == ResponseType.json) {
data = null;
} else if (reqOpt.responseType == ResponseType.stream &&
data is ResponseBody) {
data.stream = handleResponseStream(reqOpt, data);
}
ret.data = data;
} else {
Expand Down
52 changes: 52 additions & 0 deletions dio/lib/src/response/response_stream_handler.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import 'dart:async';
import 'dart:typed_data';

import 'package:dio/dio.dart';

/// An internal helper function to handle things around
/// streamed responses.
Stream<Uint8List> handleResponseStream(
RequestOptions options,
ResponseBody response,
) {
final source = response.stream;

// Use a StreamController to explicitly handle receive timeouts.
final responseSink = StreamController<Uint8List>();
late StreamSubscription<List<int>> responseSubscription;

late int totalLength;
if (options.onReceiveProgress != null) {
totalLength = response.contentLength;
}

int dataLength = 0;
responseSubscription = source.listen(
(data) {
responseSink.add(data);
options.onReceiveProgress?.call(
dataLength += data.length,
totalLength,
);
},
onError: (error, stackTrace) {
responseSink.addError(error, stackTrace);
responseSink.close();
},
onDone: () {
responseSubscription.cancel();
responseSink.close();
},
cancelOnError: true,
);

options.cancelToken?.whenCancel.whenComplete(() {
/// Close the response stream upon a cancellation.
responseSubscription.cancel();
if (!responseSink.isClosed) {
responseSink.addError(options.cancelToken!.cancelError!);
responseSink.close();
}
});
return responseSink.stream;
}
4 changes: 1 addition & 3 deletions dio/lib/src/transformers/sync_transformer.dart
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,7 @@ class SyncTransformer extends Transformer {

final int totalLength;
if (options.onReceiveProgress != null) {
totalLength = int.parse(
responseBody.headers[Headers.contentLengthHeader]?.first ?? '-1',
);
totalLength = responseBody.contentLength;
} else {
totalLength = 0;
}
Expand Down
16 changes: 7 additions & 9 deletions dio/test/cancel_token_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -36,25 +36,23 @@ void main() {
final dio = Dio()..options.baseUrl = 'https://httpbun.com/';

final cancelToken = CancelToken();

final response = await dio.get(
'bytes/${1024 * 1024 * 10}',
'bytes/${1024 * 1024 * 100}',
options: Options(responseType: ResponseType.stream),
cancelToken: cancelToken,
onReceiveProgress: (c, t) {
if (c > 5000) {
cancelToken.cancel();
}
},
);

expect(response.statusCode, 200);

Future.delayed(const Duration(milliseconds: 750), () {
cancelToken.cancel();
});

await expectLater(
(response.data as ResponseBody).stream.last,
throwsDioException(
DioExceptionType.cancel,
stackTraceContains: 'test/cancel_token_test.dart',
matcher: (DioException e) => e.message!
.contains('The request was manually cancelled by the user'),
),
);
},
Expand Down
4 changes: 2 additions & 2 deletions dio/test/download_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@ void main() {
},
);

expect(count, total);

final f = File(savePath);
expect(f.readAsStringSync(), equals('I am a text file'));
expect(count, f.readAsBytesSync().length);
expect(count, total);
});

test('download2', () async {
Expand Down
207 changes: 207 additions & 0 deletions dio/test/response/response_stream_test.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
import 'dart:async';
import 'dart:typed_data';

import 'package:dio/dio.dart';
import 'package:dio/src/response/response_stream_handler.dart';
import 'package:test/test.dart';

import '../utils.dart';

void main() {
group(handleResponseStream, () {
late StreamController<Uint8List> source;

setUp(() {
source = StreamController<Uint8List>();
});

test('completes', () async {
final stream = handleResponseStream(
RequestOptions(
cancelToken: CancelToken(),
),
ResponseBody(
source.stream,
200,
),
);

expectLater(
stream,
emitsInOrder([
Uint8List.fromList([0]),
Uint8List.fromList([1, 2]),
emitsDone,
]),
);

source.add(Uint8List.fromList([0]));
source.add(Uint8List.fromList([1, 2]));
source.close();
});

test('unsubscribes from source on cancel', () async {
final cancelToken = CancelToken();
final stream = handleResponseStream(
RequestOptions(
cancelToken: cancelToken,
),
ResponseBody(
source.stream,
200,
),
);

expectLater(
stream,
emitsInOrder([
Uint8List.fromList([0]),
emitsError(matchesDioException(
DioExceptionType.cancel,
stackTraceContains: 'test/response/response_stream_test.dart',
)),
emitsDone,
]),
);

source.add(Uint8List.fromList([0]));

expect(source.hasListener, isTrue);
cancelToken.cancel();

await Future.delayed(Duration(milliseconds: 100), () {
expect(source.hasListener, isFalse);
});
});

test('sends progress with total', () async {
int count = 0;
int total = 0;

final stream = handleResponseStream(
RequestOptions(
cancelToken: CancelToken(),
onReceiveProgress: (c, t) {
count = c;
total = t;
},
),
ResponseBody(
source.stream,
200,
headers: {
Headers.contentLengthHeader: ['6'],
},
),
);

expectLater(
stream,
emitsInOrder([
Uint8List.fromList([0]),
Uint8List.fromList([1, 2]),
Uint8List.fromList([3, 4, 5]),
emitsDone,
]),
);

source.add(Uint8List.fromList([0]));
await Future.delayed(Duration(milliseconds: 100), () {
expect(count, 1);
expect(total, 6);
});

source.add(Uint8List.fromList([1, 2]));
await Future.delayed(Duration(milliseconds: 100), () {
expect(count, 3);
expect(total, 6);
});

source.add(Uint8List.fromList([3, 4, 5]));
await Future.delayed(Duration(milliseconds: 100), () {
expect(count, 6);
expect(total, 6);
});

source.close();
});

test('sends progress without total', () async {
int count = 0;
int total = 0;

final stream = handleResponseStream(
RequestOptions(
cancelToken: CancelToken(),
onReceiveProgress: (c, t) {
count = c;
total = t;
},
),
ResponseBody(
source.stream,
200,
),
);

expectLater(
stream,
emitsInOrder([
Uint8List.fromList([0]),
Uint8List.fromList([1, 2]),
Uint8List.fromList([3, 4, 5]),
emitsDone,
]),
);

source.add(Uint8List.fromList([0]));
await Future.delayed(Duration(milliseconds: 100), () {
expect(count, 1);
expect(total, -1);
});

source.add(Uint8List.fromList([1, 2]));
await Future.delayed(Duration(milliseconds: 100), () {
expect(count, 3);
expect(total, -1);
});

source.add(Uint8List.fromList([3, 4, 5]));
await Future.delayed(Duration(milliseconds: 100), () {
expect(count, 6);
expect(total, -1);
});

source.close();
});

test('emits error on source error', () async {
final stream = handleResponseStream(
RequestOptions(
cancelToken: CancelToken(),
),
ResponseBody(
source.stream,
200,
),
);

expectLater(
stream,
emitsInOrder([
Uint8List.fromList([0]),
emitsError(isA<FormatException>()),
emitsDone,
]),
);

source.add(Uint8List.fromList([0]));
source.addError(FormatException());
source.close();

await Future.delayed(Duration(milliseconds: 100), () {
expect(source.hasListener, isFalse);
});
});
});
}
Loading

0 comments on commit 6f814c6

Please sign in to comment.