Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 Fix receiveTimeout for streamed responses #2050

Merged
merged 13 commits into from
Dec 3, 2023
1 change: 1 addition & 0 deletions dio/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ See the [Migration Guide][] for the complete breaking changes list.**
## Unreleased

- Provide fix suggestions for `dart fix`.
- Fix `receiveTimeout` for streamed responses.

## 5.4.0

Expand Down
46 changes: 28 additions & 18 deletions dio/lib/src/adapters/io_adapter.dart
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,10 @@ class IOHttpClientAdapter implements HttpClientAdapter {
}
}

// Use a StreamController to explicitly handle receive timeouts.
final responseSink = StreamController<Uint8List>();
late StreamSubscription<List<int>> responseSubscription;

final receiveStopwatch = Stopwatch();
Timer? receiveTimer;

Expand All @@ -211,7 +215,7 @@ class IOHttpClientAdapter implements HttpClientAdapter {
receiveStopwatch.stop();
}

void watchReceiveTimeout(EventSink<Uint8List> sink) {
void watchReceiveTimeout() {
if (receiveTimeout <= Duration.zero) {
return;
}
Expand All @@ -221,40 +225,46 @@ class IOHttpClientAdapter implements HttpClientAdapter {
}
receiveTimer?.cancel();
receiveTimer = Timer(receiveTimeout, () {
sink.addError(
responseSink.addError(
DioException.receiveTimeout(
timeout: receiveTimeout,
requestOptions: options,
),
);
sink.close();
responseSink.close();
responseSubscription.cancel();
responseStream.detachSocket().then((socket) => socket.destroy());
stopWatchReceiveTimeout();
});
}

final stream = responseStream.transform<Uint8List>(
StreamTransformer.fromHandlers(
handleData: (data, sink) {
watchReceiveTimeout(sink);
// Always true if the receive timeout was not set.
if (receiveStopwatch.elapsed <= receiveTimeout) {
sink.add(data is Uint8List ? data : Uint8List.fromList(data));
}
},
handleDone: (sink) {
stopWatchReceiveTimeout();
sink.close();
},
),
responseSubscription = responseStream.cast<Uint8List>().listen(
(data) {
watchReceiveTimeout();
// Always true if the receive timeout was not set.
if (receiveStopwatch.elapsed <= receiveTimeout) {
responseSink.add(data);
}
},
onError: (error, stackTrace) {
stopWatchReceiveTimeout();
responseSink.addError(error, stackTrace);
responseSink.close();
},
onDone: () {
stopWatchReceiveTimeout();
responseSubscription.cancel();
responseSink.close();
},
cancelOnError: true,
);

final headers = <String, List<String>>{};
responseStream.headers.forEach((key, values) {
headers[key] = values;
});
return ResponseBody(
stream,
responseSink.stream,
responseStream.statusCode,
headers: headers,
isRedirect:
Expand Down
3 changes: 1 addition & 2 deletions dio/test/options_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -561,8 +561,7 @@ void main() {
when(response.reasonPhrase).thenReturn('OK');
when(response.isRedirect).thenReturn(false);
when(response.redirects).thenReturn([]);
when(response.transform(any))
.thenAnswer((_) => Stream<Uint8List>.empty());
when(response.cast()).thenAnswer((_) => Stream<Uint8List>.empty());
return Future.value(request);
});

Expand Down
155 changes: 94 additions & 61 deletions dio/test/timeout_test.dart
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import 'dart:async';
import 'dart:io';

import 'package:dio/dio.dart';
Expand All @@ -12,49 +13,102 @@ void main() {
dio.options.baseUrl = 'https://httpbun.com/';
});

test('catch DioException when connectTimeout', () {
dio.options.connectTimeout = Duration(milliseconds: 3);
group('Timeout exception of', () {
group('connectTimeout', () {
test('with response', () async {
dio.options.connectTimeout = Duration(milliseconds: 3);
await expectLater(
dio.get('/'),
allOf(
throwsA(isA<DioException>()),
throwsA(predicate((DioException e) =>
e.type == DioExceptionType.connectionTimeout &&
e.message!.contains('${dio.options.connectTimeout}'))),
),
);
});

expectLater(
dio.get('/drip-lines?delay=2'),
allOf(
throwsA(isA<DioException>()),
throwsA(predicate((DioException e) =>
e.type == DioExceptionType.connectionTimeout &&
e.message!.contains('0:00:00.003000'))),
),
);
});
test('update between calls', () async {
final client = HttpClient();
final dio = Dio()
..options.baseUrl = 'https://httpbun.com'
..httpClientAdapter = IOHttpClientAdapter(
createHttpClient: () => client,
);

test('catch DioException when receiveTimeout', () async {
dio.options.receiveTimeout = Duration(seconds: 1);
dio.options.connectTimeout = Duration(milliseconds: 5);
await dio
.get('/')
.catchError((e) => Response(requestOptions: RequestOptions()));
expect(client.connectionTimeout, dio.options.connectTimeout);
dio.options.connectTimeout = Duration(milliseconds: 10);
await dio
.get('/')
.catchError((e) => Response(requestOptions: RequestOptions()));
expect(client.connectionTimeout, dio.options.connectTimeout);
}, testOn: 'vm');
});

final matcher = allOf([
throwsA(isA<DioException>()),
throwsA(
predicate<DioException>(
(e) => e.type == DioExceptionType.receiveTimeout,
),
),
throwsA(
predicate<DioException>((e) => e.message!.contains('0:00:01.000000')),
),
]);
await expectLater(
dio.get(
'/drip',
queryParameters: {'delay': 2},
),
matcher,
);
await expectLater(
dio.get(
'/drip',
queryParameters: {'delay': 0, 'duration:': 1},
options: Options(responseType: ResponseType.stream),
),
matcher,
);
group('receiveTimeout', () {
test('with normal response', () async {
dio.options.receiveTimeout = Duration(seconds: 1);
await expectLater(
dio.get('/drip', queryParameters: {'delay': 2}),
allOf([
throwsA(isA<DioException>()),
throwsA(
predicate<DioException>(
(e) => e.type == DioExceptionType.receiveTimeout,
),
),
throwsA(
predicate<DioException>(
(e) => e.message!.contains('${dio.options.receiveTimeout}'),
),
),
]),
);
});

test('with streamed response', () async {
dio.options.receiveTimeout = Duration(seconds: 1);
final completer = Completer<void>();
final streamedResponse = await dio.get(
'/drip',
queryParameters: {'delay': 0, 'duration': 20},
options: Options(responseType: ResponseType.stream),
);
(streamedResponse.data as ResponseBody).stream.listen(
(event) {},
onError: (error) {
if (!completer.isCompleted) {
completer.completeError(error);
}
},
onDone: () {
if (!completer.isCompleted) {
completer.complete();
}
},
);
await expectLater(
completer.future,
allOf([
throwsA(isA<DioException>()),
throwsA(
predicate<DioException>(
(e) => e.type == DioExceptionType.receiveTimeout,
),
),
throwsA(
predicate<DioException>(
(e) => e.message!.contains('${dio.options.receiveTimeout}'),
),
),
]),
);
}, testOn: 'vm');
});
});

test('no DioException when receiveTimeout > request duration', () async {
Expand All @@ -63,27 +117,6 @@ void main() {
await dio.get('/drip?delay=1&numbytes=1');
});

test('change connectTimeout in run time ', () async {
final dio = Dio();
final adapter = IOHttpClientAdapter();
final http = HttpClient();

adapter.createHttpClient = () => http;
dio.httpClientAdapter = adapter;
dio.options.connectTimeout = Duration(milliseconds: 200);

try {
await dio.get('/');
} on DioException catch (_) {}
expect(http.connectionTimeout?.inMilliseconds == 200, isTrue);

try {
dio.options.connectTimeout = Duration(seconds: 1);
await dio.get('/');
} on DioException catch (_) {}
expect(http.connectionTimeout?.inSeconds == 1, isTrue);
}, testOn: 'vm');

test('ignores zero duration timeouts', () async {
final dio = Dio(
BaseOptions(
Expand Down
36 changes: 21 additions & 15 deletions plugins/http2_adapter/lib/src/http2_adapter.dart
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,10 @@ class Http2Adapter implements HttpClientAdapter {
}
await stream.outgoingMessages.close();

final sc = StreamController<Uint8List>();
final responseSink = StreamController<Uint8List>();
final responseHeaders = Headers();
final responseCompleter = Completer<void>();
late StreamSubscription responseSubscription;
bool needRedirect = false;
bool needResponse = false;

Expand All @@ -139,21 +140,21 @@ class Http2Adapter implements HttpClientAdapter {
}
receiveTimer?.cancel();
receiveTimer = Timer(receiveTimeout, () {
sc.addError(
responseSink.addError(
DioException.receiveTimeout(
timeout: receiveTimeout,
requestOptions: options,
),
);
sc.close();
responseSink.close();
responseSubscription.cancel();
stream.terminate();
stopWatchReceiveTimeout();
});
}

late int statusCode;
late StreamSubscription subscription;
subscription = stream.incomingMessages.listen(
responseSubscription = stream.incomingMessages.listen(
(StreamMessage message) async {
if (message is HeadersStreamMessage) {
for (final header in message.headers) {
Expand All @@ -175,24 +176,20 @@ class Http2Adapter implements HttpClientAdapter {
} else if (message is DataStreamMessage) {
if (needResponse) {
watchReceiveTimeout();
sc.add(
responseSink.add(
message.bytes is Uint8List
? message.bytes as Uint8List
: Uint8List.fromList(message.bytes),
);
} else {
stopWatchReceiveTimeout();
subscription.cancel().whenComplete(() {
responseSubscription.cancel().whenComplete(() {
stream.terminate();
sc.close();
responseSink.close();
});
}
}
},
onDone: () {
stopWatchReceiveTimeout();
sc.close();
},
onError: (Object error, StackTrace stackTrace) {
// If connection is being forcefully terminated, remove the connection.
if (error is TransportConnectionException) {
Expand All @@ -201,9 +198,16 @@ class Http2Adapter implements HttpClientAdapter {
if (!responseCompleter.isCompleted) {
responseCompleter.completeError(error, stackTrace);
} else {
sc.addError(error, stackTrace);
responseSink.addError(error, stackTrace);
}
stopWatchReceiveTimeout();
responseSubscription.cancel();
responseSink.close();
},
onDone: () {
stopWatchReceiveTimeout();
responseSubscription.cancel();
responseSink.close();
},
cancelOnError: true,
);
Expand All @@ -213,7 +217,9 @@ class Http2Adapter implements HttpClientAdapter {
responseFuture = responseFuture.timeout(
receiveTimeout,
onTimeout: () {
subscription.cancel().whenComplete(() => sc.close());
responseSubscription
.cancel()
.whenComplete(() => responseSink.close());
throw DioException.receiveTimeout(
timeout: receiveTimeout,
requestOptions: options,
Expand All @@ -237,7 +243,7 @@ class Http2Adapter implements HttpClientAdapter {
);
}
return ResponseBody(
sc.stream,
responseSink.stream,
statusCode,
headers: responseHeaders.map,
redirects: redirects,
Expand Down
Loading