Skip to content

Commit

Permalink
🐛 Fix receiveTimeout for streamed responses (#2050)
Browse files Browse the repository at this point in the history
Due to httpbun issues, I found a fake test with timeouts which tested
with inappropriate queries (wrote `duration:` instead of `duration`),
and also found an incorrect implementation of the response stream.

### New Pull Request Checklist

- [x] I have read the
[Documentation](https://pub.dev/documentation/dio/latest/)
- [x] I have searched for a similar pull request in the
[project](https://github.com/cfug/dio/pulls) and found none
- [x] I have updated this branch with the latest `main` branch to avoid
conflicts (via merge from master or rebase)
- [x] I have added the required tests to prove the fix/feature I'm
adding
- [x] I have updated the documentation (if necessary)
- [x] I have run the tests without failures
- [x] I have updated the `CHANGELOG.md` in the corresponding package

### Additional context and info (if any)

- Once `EventSink` is called with `add` or `addError`, it cannot be used
again to make these calls, but it won't throw exceptions.
- `HttpClient` seems reused even if you created a new one.

---------

Signed-off-by: Alex Li <[email protected]>
  • Loading branch information
AlexV525 authored Dec 3, 2023
1 parent ff46ffa commit aff5760
Show file tree
Hide file tree
Showing 6 changed files with 222 additions and 137 deletions.
1 change: 1 addition & 0 deletions dio/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ See the [Migration Guide][] for the complete breaking changes list.**
## Unreleased

- Provide fix suggestions for `dart fix`.
- Fix `receiveTimeout` for streamed responses.

## 5.4.0

Expand Down
46 changes: 28 additions & 18 deletions dio/lib/src/adapters/io_adapter.dart
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,10 @@ class IOHttpClientAdapter implements HttpClientAdapter {
}
}

// Use a StreamController to explicitly handle receive timeouts.
final responseSink = StreamController<Uint8List>();
late StreamSubscription<List<int>> responseSubscription;

final receiveStopwatch = Stopwatch();
Timer? receiveTimer;

Expand All @@ -211,7 +215,7 @@ class IOHttpClientAdapter implements HttpClientAdapter {
receiveStopwatch.stop();
}

void watchReceiveTimeout(EventSink<Uint8List> sink) {
void watchReceiveTimeout() {
if (receiveTimeout <= Duration.zero) {
return;
}
Expand All @@ -221,40 +225,46 @@ class IOHttpClientAdapter implements HttpClientAdapter {
}
receiveTimer?.cancel();
receiveTimer = Timer(receiveTimeout, () {
sink.addError(
responseSink.addError(
DioException.receiveTimeout(
timeout: receiveTimeout,
requestOptions: options,
),
);
sink.close();
responseSink.close();
responseSubscription.cancel();
responseStream.detachSocket().then((socket) => socket.destroy());
stopWatchReceiveTimeout();
});
}

final stream = responseStream.transform<Uint8List>(
StreamTransformer.fromHandlers(
handleData: (data, sink) {
watchReceiveTimeout(sink);
// Always true if the receive timeout was not set.
if (receiveStopwatch.elapsed <= receiveTimeout) {
sink.add(data is Uint8List ? data : Uint8List.fromList(data));
}
},
handleDone: (sink) {
stopWatchReceiveTimeout();
sink.close();
},
),
responseSubscription = responseStream.cast<Uint8List>().listen(
(data) {
watchReceiveTimeout();
// Always true if the receive timeout was not set.
if (receiveStopwatch.elapsed <= receiveTimeout) {
responseSink.add(data);
}
},
onError: (error, stackTrace) {
stopWatchReceiveTimeout();
responseSink.addError(error, stackTrace);
responseSink.close();
},
onDone: () {
stopWatchReceiveTimeout();
responseSubscription.cancel();
responseSink.close();
},
cancelOnError: true,
);

final headers = <String, List<String>>{};
responseStream.headers.forEach((key, values) {
headers[key] = values;
});
return ResponseBody(
stream,
responseSink.stream,
responseStream.statusCode,
headers: headers,
isRedirect:
Expand Down
3 changes: 1 addition & 2 deletions dio/test/options_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -561,8 +561,7 @@ void main() {
when(response.reasonPhrase).thenReturn('OK');
when(response.isRedirect).thenReturn(false);
when(response.redirects).thenReturn([]);
when(response.transform(any))
.thenAnswer((_) => Stream<Uint8List>.empty());
when(response.cast()).thenAnswer((_) => Stream<Uint8List>.empty());
return Future.value(request);
});

Expand Down
155 changes: 94 additions & 61 deletions dio/test/timeout_test.dart
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import 'dart:async';
import 'dart:io';

import 'package:dio/dio.dart';
Expand All @@ -12,49 +13,102 @@ void main() {
dio.options.baseUrl = 'https://httpbun.com/';
});

test('catch DioException when connectTimeout', () {
dio.options.connectTimeout = Duration(milliseconds: 3);
group('Timeout exception of', () {
group('connectTimeout', () {
test('with response', () async {
dio.options.connectTimeout = Duration(milliseconds: 3);
await expectLater(
dio.get('/'),
allOf(
throwsA(isA<DioException>()),
throwsA(predicate((DioException e) =>
e.type == DioExceptionType.connectionTimeout &&
e.message!.contains('${dio.options.connectTimeout}'))),
),
);
});

expectLater(
dio.get('/drip-lines?delay=2'),
allOf(
throwsA(isA<DioException>()),
throwsA(predicate((DioException e) =>
e.type == DioExceptionType.connectionTimeout &&
e.message!.contains('0:00:00.003000'))),
),
);
});
test('update between calls', () async {
final client = HttpClient();
final dio = Dio()
..options.baseUrl = 'https://httpbun.com'
..httpClientAdapter = IOHttpClientAdapter(
createHttpClient: () => client,
);

test('catch DioException when receiveTimeout', () async {
dio.options.receiveTimeout = Duration(seconds: 1);
dio.options.connectTimeout = Duration(milliseconds: 5);
await dio
.get('/')
.catchError((e) => Response(requestOptions: RequestOptions()));
expect(client.connectionTimeout, dio.options.connectTimeout);
dio.options.connectTimeout = Duration(milliseconds: 10);
await dio
.get('/')
.catchError((e) => Response(requestOptions: RequestOptions()));
expect(client.connectionTimeout, dio.options.connectTimeout);
}, testOn: 'vm');
});

final matcher = allOf([
throwsA(isA<DioException>()),
throwsA(
predicate<DioException>(
(e) => e.type == DioExceptionType.receiveTimeout,
),
),
throwsA(
predicate<DioException>((e) => e.message!.contains('0:00:01.000000')),
),
]);
await expectLater(
dio.get(
'/drip',
queryParameters: {'delay': 2},
),
matcher,
);
await expectLater(
dio.get(
'/drip',
queryParameters: {'delay': 0, 'duration:': 1},
options: Options(responseType: ResponseType.stream),
),
matcher,
);
group('receiveTimeout', () {
test('with normal response', () async {
dio.options.receiveTimeout = Duration(seconds: 1);
await expectLater(
dio.get('/drip', queryParameters: {'delay': 2}),
allOf([
throwsA(isA<DioException>()),
throwsA(
predicate<DioException>(
(e) => e.type == DioExceptionType.receiveTimeout,
),
),
throwsA(
predicate<DioException>(
(e) => e.message!.contains('${dio.options.receiveTimeout}'),
),
),
]),
);
});

test('with streamed response', () async {
dio.options.receiveTimeout = Duration(seconds: 1);
final completer = Completer<void>();
final streamedResponse = await dio.get(
'/drip',
queryParameters: {'delay': 0, 'duration': 20},
options: Options(responseType: ResponseType.stream),
);
(streamedResponse.data as ResponseBody).stream.listen(
(event) {},
onError: (error) {
if (!completer.isCompleted) {
completer.completeError(error);
}
},
onDone: () {
if (!completer.isCompleted) {
completer.complete();
}
},
);
await expectLater(
completer.future,
allOf([
throwsA(isA<DioException>()),
throwsA(
predicate<DioException>(
(e) => e.type == DioExceptionType.receiveTimeout,
),
),
throwsA(
predicate<DioException>(
(e) => e.message!.contains('${dio.options.receiveTimeout}'),
),
),
]),
);
}, testOn: 'vm');
});
});

test('no DioException when receiveTimeout > request duration', () async {
Expand All @@ -63,27 +117,6 @@ void main() {
await dio.get('/drip?delay=1&numbytes=1');
});

test('change connectTimeout in run time ', () async {
final dio = Dio();
final adapter = IOHttpClientAdapter();
final http = HttpClient();

adapter.createHttpClient = () => http;
dio.httpClientAdapter = adapter;
dio.options.connectTimeout = Duration(milliseconds: 200);

try {
await dio.get('/');
} on DioException catch (_) {}
expect(http.connectionTimeout?.inMilliseconds == 200, isTrue);

try {
dio.options.connectTimeout = Duration(seconds: 1);
await dio.get('/');
} on DioException catch (_) {}
expect(http.connectionTimeout?.inSeconds == 1, isTrue);
}, testOn: 'vm');

test('ignores zero duration timeouts', () async {
final dio = Dio(
BaseOptions(
Expand Down
36 changes: 21 additions & 15 deletions plugins/http2_adapter/lib/src/http2_adapter.dart
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,10 @@ class Http2Adapter implements HttpClientAdapter {
}
await stream.outgoingMessages.close();

final sc = StreamController<Uint8List>();
final responseSink = StreamController<Uint8List>();
final responseHeaders = Headers();
final responseCompleter = Completer<void>();
late StreamSubscription responseSubscription;
bool needRedirect = false;
bool needResponse = false;

Expand All @@ -139,21 +140,21 @@ class Http2Adapter implements HttpClientAdapter {
}
receiveTimer?.cancel();
receiveTimer = Timer(receiveTimeout, () {
sc.addError(
responseSink.addError(
DioException.receiveTimeout(
timeout: receiveTimeout,
requestOptions: options,
),
);
sc.close();
responseSink.close();
responseSubscription.cancel();
stream.terminate();
stopWatchReceiveTimeout();
});
}

late int statusCode;
late StreamSubscription subscription;
subscription = stream.incomingMessages.listen(
responseSubscription = stream.incomingMessages.listen(
(StreamMessage message) async {
if (message is HeadersStreamMessage) {
for (final header in message.headers) {
Expand All @@ -175,24 +176,20 @@ class Http2Adapter implements HttpClientAdapter {
} else if (message is DataStreamMessage) {
if (needResponse) {
watchReceiveTimeout();
sc.add(
responseSink.add(
message.bytes is Uint8List
? message.bytes as Uint8List
: Uint8List.fromList(message.bytes),
);
} else {
stopWatchReceiveTimeout();
subscription.cancel().whenComplete(() {
responseSubscription.cancel().whenComplete(() {
stream.terminate();
sc.close();
responseSink.close();
});
}
}
},
onDone: () {
stopWatchReceiveTimeout();
sc.close();
},
onError: (Object error, StackTrace stackTrace) {
// If connection is being forcefully terminated, remove the connection.
if (error is TransportConnectionException) {
Expand All @@ -201,9 +198,16 @@ class Http2Adapter implements HttpClientAdapter {
if (!responseCompleter.isCompleted) {
responseCompleter.completeError(error, stackTrace);
} else {
sc.addError(error, stackTrace);
responseSink.addError(error, stackTrace);
}
stopWatchReceiveTimeout();
responseSubscription.cancel();
responseSink.close();
},
onDone: () {
stopWatchReceiveTimeout();
responseSubscription.cancel();
responseSink.close();
},
cancelOnError: true,
);
Expand All @@ -213,7 +217,9 @@ class Http2Adapter implements HttpClientAdapter {
responseFuture = responseFuture.timeout(
receiveTimeout,
onTimeout: () {
subscription.cancel().whenComplete(() => sc.close());
responseSubscription
.cancel()
.whenComplete(() => responseSink.close());
throw DioException.receiveTimeout(
timeout: receiveTimeout,
requestOptions: options,
Expand All @@ -237,7 +243,7 @@ class Http2Adapter implements HttpClientAdapter {
);
}
return ResponseBody(
sc.stream,
responseSink.stream,
statusCode,
headers: responseHeaders.map,
redirects: redirects,
Expand Down
Loading

0 comments on commit aff5760

Please sign in to comment.