Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix streamed response cancellation and progress #2068

Merged
merged 9 commits into from
Dec 22, 2023
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions dio/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ See the [Migration Guide][] for the complete breaking changes list.**

- Provide fix suggestions for `dart fix`.
- Fix `receiveTimeout` for streamed responses.
- Fix cancellation for streamed responses and downloads when using `IOHttpClientAdapter`.
- Fix receive progress for streamed responses and downloads when using `IOHttpClientAdapter`.
kuhnroyal marked this conversation as resolved.
Show resolved Hide resolved

## 5.4.0

Expand Down
25 changes: 22 additions & 3 deletions dio/lib/src/adapter.dart
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import 'dart:convert';
import 'dart:typed_data';

import 'package:meta/meta.dart';

import 'headers.dart';
import 'options.dart';
import 'redirect_record.dart';

Expand Down Expand Up @@ -57,28 +60,34 @@ class ResponseBody {
this.statusMessage,
this.isRedirect = false,
this.redirects,
void Function()? onClose,
Map<String, List<String>>? headers,
}) : headers = headers ?? {};
}) : headers = headers ?? {},
_onClose = onClose;

ResponseBody.fromString(
String text,
this.statusCode, {
this.statusMessage,
this.isRedirect = false,
void Function()? onClose,
Map<String, List<String>>? headers,
}) : stream = Stream.value(Uint8List.fromList(utf8.encode(text))),
headers = headers ?? {};
headers = headers ?? {},
_onClose = onClose;

ResponseBody.fromBytes(
List<int> bytes,
this.statusCode, {
this.statusMessage,
this.isRedirect = false,
void Function()? onClose,
Map<String, List<String>>? headers,
}) : stream = Stream.value(
bytes is Uint8List ? bytes : Uint8List.fromList(bytes),
),
headers = headers ?? {};
headers = headers ?? {},
_onClose = onClose;

/// Whether this response is a redirect.
final bool isRedirect;
Expand All @@ -89,6 +98,10 @@ class ResponseBody {
/// HTTP status code.
int statusCode;

/// Content length of the response or -1 if not specified
int get contentLength =>
int.parse(headers[Headers.contentLengthHeader]?.first ?? '-1');

/// Returns the reason phrase corresponds to the status code.
/// The message can be [HttpRequest.statusText]
/// or [HttpClientResponse.reasonPhrase].
Expand All @@ -102,4 +115,10 @@ class ResponseBody {

/// The extra field which will pass-through to the [Response.extra].
Map<String, dynamic> extra = {};

final void Function()? _onClose;

/// Closes the request & frees the underlying resources.
@internal
void close() => _onClose?.call();
}
70 changes: 6 additions & 64 deletions dio/lib/src/adapters/io_adapter.dart
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,7 @@ class IOHttpClientAdapter implements HttpClientAdapter {
requestStream,
cancelFuture,
));
if (cancelFuture != null) {
cancelFuture.whenComplete(() => operation.cancel());
}
cancelFuture?.whenComplete(() => operation.cancel());
return operation.value;
}

Expand Down Expand Up @@ -106,9 +104,7 @@ class IOHttpClientAdapter implements HttpClientAdapter {
request = await reqFuture;
}

if (cancelFuture != null) {
cancelFuture.whenComplete(() => request.abort());
}
cancelFuture?.whenComplete(() => request.abort());

// Set Headers
options.headers.forEach((key, value) {
Expand Down Expand Up @@ -202,69 +198,12 @@ class IOHttpClientAdapter implements HttpClientAdapter {
}
}

// Use a StreamController to explicitly handle receive timeouts.
final responseSink = StreamController<Uint8List>();
late StreamSubscription<List<int>> responseSubscription;

final receiveStopwatch = Stopwatch();
Timer? receiveTimer;

void stopWatchReceiveTimeout() {
receiveTimer?.cancel();
receiveTimer = null;
receiveStopwatch.stop();
}

void watchReceiveTimeout() {
if (receiveTimeout <= Duration.zero) {
return;
}
receiveStopwatch.reset();
if (!receiveStopwatch.isRunning) {
receiveStopwatch.start();
}
receiveTimer?.cancel();
receiveTimer = Timer(receiveTimeout, () {
responseSink.addError(
DioException.receiveTimeout(
timeout: receiveTimeout,
requestOptions: options,
),
);
responseSink.close();
responseSubscription.cancel();
responseStream.detachSocket().then((socket) => socket.destroy());
stopWatchReceiveTimeout();
});
}

responseSubscription = responseStream.cast<Uint8List>().listen(
(data) {
watchReceiveTimeout();
// Always true if the receive timeout was not set.
if (receiveStopwatch.elapsed <= receiveTimeout) {
responseSink.add(data);
}
},
onError: (error, stackTrace) {
stopWatchReceiveTimeout();
responseSink.addError(error, stackTrace);
responseSink.close();
},
onDone: () {
stopWatchReceiveTimeout();
responseSubscription.cancel();
responseSink.close();
},
cancelOnError: true,
);

final headers = <String, List<String>>{};
responseStream.headers.forEach((key, values) {
headers[key] = values;
});
return ResponseBody(
responseSink.stream,
responseStream.cast(),
responseStream.statusCode,
headers: headers,
isRedirect:
Expand All @@ -273,6 +212,9 @@ class IOHttpClientAdapter implements HttpClientAdapter {
.map((e) => RedirectRecord(e.statusCode, e.method, e.location))
.toList(),
statusMessage: responseStream.reasonPhrase,
onClose: () {
responseStream.detachSocket().then((socket) => socket.destroy());
},
);
}

Expand Down
2 changes: 1 addition & 1 deletion dio/lib/src/dio/dio_for_native.dart
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class DioForNative with DioMixin implements Dio {
data: data,
options: options,
queryParameters: queryParameters,
cancelToken: cancelToken ?? CancelToken(),
cancelToken: cancelToken,
);
} on DioException catch (e) {
if (e.type == DioExceptionType.badResponse) {
Expand Down
5 changes: 4 additions & 1 deletion dio/lib/src/dio_mixin.dart
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import 'headers.dart';
import 'interceptors/imply_content_type.dart';
import 'options.dart';
import 'response.dart';
import 'response/response_stream_handler.dart';
import 'transformer.dart';
import 'transformers/background_transformer.dart';

Expand Down Expand Up @@ -535,6 +536,8 @@ abstract class DioMixin implements Dio {
);
final statusOk = reqOpt.validateStatus(responseBody.statusCode);
if (statusOk || reqOpt.receiveDataWhenStatusError == true) {
responseBody.stream = handleResponseStream(reqOpt, responseBody);

Object? data = await transformer.transformResponse(
reqOpt,
responseBody,
Expand All @@ -549,7 +552,7 @@ abstract class DioMixin implements Dio {
}
ret.data = data;
} else {
await responseBody.stream.listen(null).cancel();
responseBody.close();
}
checkCancelled(cancelToken);
if (statusOk) {
Expand Down
96 changes: 96 additions & 0 deletions dio/lib/src/response/response_stream_handler.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import 'dart:async';
import 'dart:typed_data';

import 'package:dio/dio.dart';

/// An internal helper which handles functionality
/// common to all adapters. This function ensures that
/// all resources are closed when the request is finished
/// or cancelled.
///
/// - [options.receiveTimeout] between received chunks
/// - [options.onReceiveProgress] progress for received chunks
/// - [options.cancelToken] for cancellation while receiving
Stream<Uint8List> handleResponseStream(
RequestOptions options,
ResponseBody response,
) {
final source = response.stream;
final responseSink = StreamController<Uint8List>();
late StreamSubscription<List<int>> responseSubscription;

late int totalLength;
int receivedLength = 0;
if (options.onReceiveProgress != null) {
totalLength = response.contentLength;
}

final receiveTimeout = options.receiveTimeout ?? Duration.zero;
final receiveStopwatch = Stopwatch();
Timer? receiveTimer;

void stopWatchReceiveTimeout() {
receiveTimer?.cancel();
receiveTimer = null;
receiveStopwatch.stop();
}

void watchReceiveTimeout() {
if (receiveTimeout <= Duration.zero) {
return;
}
receiveStopwatch.reset();
if (!receiveStopwatch.isRunning) {
receiveStopwatch.start();
}
receiveTimer?.cancel();
receiveTimer = Timer(receiveTimeout, () {
responseSink.addError(
DioException.receiveTimeout(
timeout: receiveTimeout,
requestOptions: options,
),
);
response.close();
responseSink.close();
responseSubscription.cancel();
stopWatchReceiveTimeout();
});
}

responseSubscription = source.listen(
(data) {
watchReceiveTimeout();
// Always true if the receive timeout was not set.
if (receiveStopwatch.elapsed <= receiveTimeout) {
responseSink.add(data);
options.onReceiveProgress?.call(
receivedLength += data.length,
totalLength,
);
}
},
onError: (error, stackTrace) {
stopWatchReceiveTimeout();
responseSink.addError(error, stackTrace);
responseSink.close();
},
onDone: () {
stopWatchReceiveTimeout();
responseSubscription.cancel();
responseSink.close();
},
cancelOnError: true,
);

options.cancelToken?.whenCancel.whenComplete(() {
/// Close the response stream upon a cancellation.
responseSubscription.cancel();
response.close();
if (!responseSink.isClosed) {
responseSink.addError(options.cancelToken!.cancelError!);
responseSink.close();
}
});
return responseSink.stream;
}
41 changes: 2 additions & 39 deletions dio/lib/src/transformers/sync_transformer.dart
Original file line number Diff line number Diff line change
Expand Up @@ -62,45 +62,8 @@ class SyncTransformer extends Transformer {
return responseBody;
}

final int totalLength;
if (options.onReceiveProgress != null) {
totalLength = int.parse(
responseBody.headers[Headers.contentLengthHeader]?.first ?? '-1',
);
} else {
totalLength = 0;
}

final streamCompleter = Completer<void>();
int finalLength = 0;
// Keep references to the data chunks and concatenate them later.
final chunks = <Uint8List>[];
final subscription = responseBody.stream.listen(
(Uint8List chunk) {
finalLength += chunk.length;
chunks.add(chunk);
options.onReceiveProgress?.call(finalLength, totalLength);
},
onError: (Object error, StackTrace stackTrace) {
streamCompleter.completeError(error, stackTrace);
},
onDone: () {
streamCompleter.complete();
},
cancelOnError: true,
);
options.cancelToken?.whenCancel.then((_) {
return subscription.cancel();
});
await streamCompleter.future;

// Copy all chunks into the final bytes.
final responseBytes = Uint8List(finalLength);
int chunkOffset = 0;
for (final chunk in chunks) {
responseBytes.setAll(chunkOffset, chunk);
chunkOffset += chunk.length;
}
final chunks = await responseBody.stream.toList();
final responseBytes = Uint8List.fromList(chunks.expand((c) => c).toList());

// Return the finalized bytes if the response type is bytes.
if (responseType == ResponseType.bytes) {
Expand Down
Loading