Skip to content

Commit

Permalink
Extract receiveTimeout handling from adapters to response stream handler
Browse files Browse the repository at this point in the history
  • Loading branch information
kuhnroyal committed Dec 19, 2023
1 parent 6a62c8a commit f5cebf8
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 155 deletions.
62 changes: 1 addition & 61 deletions dio/lib/src/adapters/io_adapter.dart
Original file line number Diff line number Diff line change
Expand Up @@ -198,69 +198,12 @@ class IOHttpClientAdapter implements HttpClientAdapter {
}
}

// Use a StreamController to explicitly handle receive timeouts.
final responseSink = StreamController<Uint8List>();
late StreamSubscription<List<int>> responseSubscription;

final receiveStopwatch = Stopwatch();
Timer? receiveTimer;

void stopWatchReceiveTimeout() {
receiveTimer?.cancel();
receiveTimer = null;
receiveStopwatch.stop();
}

void watchReceiveTimeout() {
if (receiveTimeout <= Duration.zero) {
return;
}
receiveStopwatch.reset();
if (!receiveStopwatch.isRunning) {
receiveStopwatch.start();
}
receiveTimer?.cancel();
receiveTimer = Timer(receiveTimeout, () {
responseSink.addError(
DioException.receiveTimeout(
timeout: receiveTimeout,
requestOptions: options,
),
);
responseSink.close();
responseSubscription.cancel();
responseStream.detachSocket().then((socket) => socket.destroy());
stopWatchReceiveTimeout();
});
}

responseSubscription = responseStream.cast<Uint8List>().listen(
(data) {
watchReceiveTimeout();
// Always true if the receive timeout was not set.
if (receiveStopwatch.elapsed <= receiveTimeout) {
responseSink.add(data);
}
},
onError: (error, stackTrace) {
stopWatchReceiveTimeout();
responseSink.addError(error, stackTrace);
responseSink.close();
},
onDone: () {
stopWatchReceiveTimeout();
responseSubscription.cancel();
responseSink.close();
},
cancelOnError: true,
);

final headers = <String, List<String>>{};
responseStream.headers.forEach((key, values) {
headers[key] = values;
});
return ResponseBody(
responseSink.stream,
responseStream.cast(),
responseStream.statusCode,
headers: headers,
isRedirect:
Expand All @@ -270,10 +213,7 @@ class IOHttpClientAdapter implements HttpClientAdapter {
.toList(),
statusMessage: responseStream.reasonPhrase,
onClose: () {
responseSink.close();
responseSubscription.cancel();
responseStream.detachSocket().then((socket) => socket.destroy());
stopWatchReceiveTimeout();
},
);
}
Expand Down
7 changes: 3 additions & 4 deletions dio/lib/src/dio_mixin.dart
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,8 @@ abstract class DioMixin implements Dio {
);
final statusOk = reqOpt.validateStatus(responseBody.statusCode);
if (statusOk || reqOpt.receiveDataWhenStatusError == true) {
responseBody.stream = handleResponseStream(reqOpt, responseBody);

Object? data = await transformer.transformResponse(
reqOpt,
responseBody,
Expand All @@ -547,13 +549,10 @@ abstract class DioMixin implements Dio {
T != String &&
reqOpt.responseType == ResponseType.json) {
data = null;
} else if (reqOpt.responseType == ResponseType.stream &&
data is ResponseBody) {
data.stream = handleResponseStream(reqOpt, data);
}
ret.data = data;
} else {
await responseBody.stream.listen(null).cancel();
responseBody.close();
}
checkCancelled(cancelToken);
if (statusOk) {
Expand Down
63 changes: 53 additions & 10 deletions dio/lib/src/response/response_stream_handler.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3,37 +3,80 @@ import 'dart:typed_data';

import 'package:dio/dio.dart';

/// An internal helper function to handle things around
/// streamed responses.
/// An internal helper which handles functionality
/// common to all adapters. This function ensures that
/// all resources are closed when the request is finished
/// or cancelled.
///
/// - [options.receiveTimeout] between received chunks
/// - [options.onReceiveProgress] progress for received chunks
/// - [options.cancelToken] for cancellation while receiving
Stream<Uint8List> handleResponseStream(
RequestOptions options,
ResponseBody response,
) {
final source = response.stream;

// Use a StreamController to explicitly handle receive timeouts.
final responseSink = StreamController<Uint8List>();
late StreamSubscription<List<int>> responseSubscription;

late int totalLength;
int receivedLength = 0;
if (options.onReceiveProgress != null) {
totalLength = response.contentLength;
}

int dataLength = 0;
final receiveTimeout = options.receiveTimeout ?? Duration.zero;
final receiveStopwatch = Stopwatch();
Timer? receiveTimer;

void stopWatchReceiveTimeout() {
receiveTimer?.cancel();
receiveTimer = null;
receiveStopwatch.stop();
}

void watchReceiveTimeout() {
if (receiveTimeout <= Duration.zero) {
return;
}
receiveStopwatch.reset();
if (!receiveStopwatch.isRunning) {
receiveStopwatch.start();
}
receiveTimer?.cancel();
receiveTimer = Timer(receiveTimeout, () {
responseSink.addError(
DioException.receiveTimeout(
timeout: receiveTimeout,
requestOptions: options,
),
);
response.close();
responseSink.close();
responseSubscription.cancel();
stopWatchReceiveTimeout();
});
}

responseSubscription = source.listen(
(data) {
responseSink.add(data);
options.onReceiveProgress?.call(
dataLength += data.length,
totalLength,
);
watchReceiveTimeout();
// Always true if the receive timeout was not set.
if (receiveStopwatch.elapsed <= receiveTimeout) {
responseSink.add(data);
options.onReceiveProgress?.call(
receivedLength += data.length,
totalLength,
);
}
},
onError: (error, stackTrace) {
stopWatchReceiveTimeout();
responseSink.addError(error, stackTrace);
responseSink.close();
},
onDone: () {
stopWatchReceiveTimeout();
responseSubscription.cancel();
responseSink.close();
},
Expand Down
39 changes: 2 additions & 37 deletions dio/lib/src/transformers/sync_transformer.dart
Original file line number Diff line number Diff line change
Expand Up @@ -62,43 +62,8 @@ class SyncTransformer extends Transformer {
return responseBody;
}

final int totalLength;
if (options.onReceiveProgress != null) {
totalLength = responseBody.contentLength;
} else {
totalLength = 0;
}

final streamCompleter = Completer<void>();
int finalLength = 0;
// Keep references to the data chunks and concatenate them later.
final chunks = <Uint8List>[];
final subscription = responseBody.stream.listen(
(Uint8List chunk) {
finalLength += chunk.length;
chunks.add(chunk);
options.onReceiveProgress?.call(finalLength, totalLength);
},
onError: (Object error, StackTrace stackTrace) {
streamCompleter.completeError(error, stackTrace);
},
onDone: () {
streamCompleter.complete();
},
cancelOnError: true,
);
options.cancelToken?.whenCancel.then((_) {
return subscription.cancel();
});
await streamCompleter.future;

// Copy all chunks into the final bytes.
final responseBytes = Uint8List(finalLength);
int chunkOffset = 0;
for (final chunk in chunks) {
responseBytes.setAll(chunkOffset, chunk);
chunkOffset += chunk.length;
}
final chunks = await responseBody.stream.toList();
final responseBytes = Uint8List.fromList(chunks.expand((c) => c).toList());

// Return the finalized bytes if the response type is bytes.
if (responseType == ResponseType.bytes) {
Expand Down
47 changes: 39 additions & 8 deletions dio/test/response/response_stream_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@ void main() {

test('completes', () async {
final stream = handleResponseStream(
RequestOptions(
cancelToken: CancelToken(),
),
RequestOptions(),
ResponseBody(
source.stream,
200,
Expand Down Expand Up @@ -80,7 +78,6 @@ void main() {

final stream = handleResponseStream(
RequestOptions(
cancelToken: CancelToken(),
onReceiveProgress: (c, t) {
count = c;
total = t;
Expand Down Expand Up @@ -132,7 +129,6 @@ void main() {

final stream = handleResponseStream(
RequestOptions(
cancelToken: CancelToken(),
onReceiveProgress: (c, t) {
count = c;
total = t;
Expand Down Expand Up @@ -177,9 +173,7 @@ void main() {

test('emits error on source error', () async {
final stream = handleResponseStream(
RequestOptions(
cancelToken: CancelToken(),
),
RequestOptions(),
ResponseBody(
source.stream,
200,
Expand All @@ -203,5 +197,42 @@ void main() {
expect(source.hasListener, isFalse);
});
});

test('emits error on receiveTimeout', () async {
final stream = handleResponseStream(
RequestOptions(
receiveTimeout: Duration(milliseconds: 100),
),
ResponseBody(
source.stream,
200,
),
);

expectLater(
stream,
emitsInOrder([
Uint8List.fromList([0]),
Uint8List.fromList([1]),
emitsError(matchesDioException(
DioExceptionType.receiveTimeout,
stackTraceContains: 'test/response/response_stream_test.dart',
)),
emitsDone,
]),
);

source.add(Uint8List.fromList([0]));
await Future.delayed(Duration(milliseconds: 90), () {
source.add(Uint8List.fromList([1]));
});
await Future.delayed(Duration(milliseconds: 110), () {
source.add(Uint8List.fromList([2]));
});

await Future.delayed(Duration(milliseconds: 100), () {
expect(source.hasListener, isFalse);
});
});
});
}
35 changes: 0 additions & 35 deletions plugins/http2_adapter/lib/src/http2_adapter.dart
Original file line number Diff line number Diff line change
Expand Up @@ -121,37 +121,6 @@ class Http2Adapter implements HttpClientAdapter {
bool needResponse = false;

final receiveTimeout = options.receiveTimeout ?? Duration.zero;
final receiveStopwatch = Stopwatch();
Timer? receiveTimer;

void stopWatchReceiveTimeout() {
receiveTimer?.cancel();
receiveTimer = null;
receiveStopwatch.stop();
}

void watchReceiveTimeout() {
if (receiveTimeout <= Duration.zero) {
return;
}
receiveStopwatch.reset();
if (!receiveStopwatch.isRunning) {
receiveStopwatch.start();
}
receiveTimer?.cancel();
receiveTimer = Timer(receiveTimeout, () {
responseSink.addError(
DioException.receiveTimeout(
timeout: receiveTimeout,
requestOptions: options,
),
);
responseSink.close();
responseSubscription.cancel();
stream.terminate();
stopWatchReceiveTimeout();
});
}

late int statusCode;
responseSubscription = stream.incomingMessages.listen(
Expand All @@ -175,14 +144,12 @@ class Http2Adapter implements HttpClientAdapter {
}
} else if (message is DataStreamMessage) {
if (needResponse) {
watchReceiveTimeout();
responseSink.add(
message.bytes is Uint8List
? message.bytes as Uint8List
: Uint8List.fromList(message.bytes),
);
} else {
stopWatchReceiveTimeout();
responseSubscription.cancel().whenComplete(() {
stream.terminate();
responseSink.close();
Expand All @@ -200,12 +167,10 @@ class Http2Adapter implements HttpClientAdapter {
} else {
responseSink.addError(error, stackTrace);
}
stopWatchReceiveTimeout();
responseSubscription.cancel();
responseSink.close();
},
onDone: () {
stopWatchReceiveTimeout();
responseSubscription.cancel();
responseSink.close();
},
Expand Down

0 comments on commit f5cebf8

Please sign in to comment.