Skip to content

Commit

Permalink
🐛 Stop watching the receive timeout after cancelled (#2117)
Browse files Browse the repository at this point in the history
Fixes #2115

### New Pull Request Checklist

- [x] I have read the
[Documentation](https://pub.dev/documentation/dio/latest/)
- [x] I have searched for a similar pull request in the
[project](https://github.com/cfug/dio/pulls) and found none
- [x] I have updated this branch with the latest `main` branch to avoid
conflicts (via merge from master or rebase)
- [x] I have added the required tests to prove the fix/feature I'm
adding
- [ ] I have updated the documentation (if necessary)
- [x] I have run the tests without failures
- [x] I have updated the `CHANGELOG.md` in the corresponding package
  • Loading branch information
AlexV525 authored Feb 20, 2024
1 parent d85a980 commit a0304b2
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 21 deletions.
2 changes: 1 addition & 1 deletion dio/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ See the [Migration Guide][] for the complete breaking changes list.**

## Unreleased

*None.*
- Fix `receiveTimeout` throws exception after the request has been cancelled.

## 5.4.1

Expand Down
52 changes: 32 additions & 20 deletions dio/lib/src/response/response_stream_handler.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import 'dart:async';
import 'dart:typed_data';

import 'package:dio/dio.dart';
import 'package:meta/meta.dart';

/// An internal helper which handles functionality
/// common to all adapters. This function ensures that
Expand All @@ -13,8 +14,9 @@ import 'package:dio/dio.dart';
/// - [options.cancelToken] for cancellation while receiving
Stream<Uint8List> handleResponseStream(
RequestOptions options,
ResponseBody response,
) {
ResponseBody response, {
@visibleForTesting void Function()? onReceiveTimeoutWatchCancelled,
}) {
final source = response.stream;
final responseSink = StreamController<Uint8List>();
late StreamSubscription<List<int>> responseSubscription;
Expand All @@ -30,31 +32,35 @@ Stream<Uint8List> handleResponseStream(
Timer? receiveTimer;

void stopWatchReceiveTimeout() {
onReceiveTimeoutWatchCancelled?.call();
receiveTimer?.cancel();
receiveTimer = null;
receiveStopwatch.stop();
receiveStopwatch
..stop()
..reset();
}

void watchReceiveTimeout() {
if (receiveTimeout <= Duration.zero) {
return;
}
receiveStopwatch.reset();
if (!receiveStopwatch.isRunning) {
receiveStopwatch.start();
}
// Not calling `stopWatchReceiveTimeout` to follow the semantic:
// Watching the new receive timeout does not indicate the watch
// has been cancelled.
receiveTimer?.cancel();
receiveStopwatch
..reset()
..start();
receiveTimer = Timer(receiveTimeout, () {
responseSink.addError(
stopWatchReceiveTimeout();
response.close();
responseSubscription.cancel();
responseSink.addErrorAndClose(
DioException.receiveTimeout(
timeout: receiveTimeout,
requestOptions: options,
),
);
response.close();
responseSink.close();
responseSubscription.cancel();
stopWatchReceiveTimeout();
});
}

Expand All @@ -72,8 +78,7 @@ Stream<Uint8List> handleResponseStream(
},
onError: (error, stackTrace) {
stopWatchReceiveTimeout();
responseSink.addError(error, stackTrace);
responseSink.close();
responseSink.addErrorAndClose(error, stackTrace);
},
onDone: () {
stopWatchReceiveTimeout();
Expand All @@ -84,13 +89,20 @@ Stream<Uint8List> handleResponseStream(
);

options.cancelToken?.whenCancel.whenComplete(() {
/// Close the response stream upon a cancellation.
responseSubscription.cancel();
stopWatchReceiveTimeout();
// Close the response stream upon a cancellation.
response.close();
if (!responseSink.isClosed) {
responseSink.addError(options.cancelToken!.cancelError!);
responseSink.close();
}
responseSubscription.cancel();
responseSink.addErrorAndClose(options.cancelToken!.cancelError!);
});
return responseSink.stream;
}

extension on StreamController<Uint8List> {
void addErrorAndClose(Object error, [StackTrace? stackTrace]) {
if (!isClosed) {
addError(error, stackTrace);
close();
}
}
}
29 changes: 29 additions & 0 deletions dio/test/response/response_stream_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -234,5 +234,34 @@ void main() {
expect(source.hasListener, isFalse);
});
});

test('not watching the receive timeout after cancelled', () async {
bool timerCancelled = false;
final cancelToken = CancelToken();
final stream = handleResponseStream(
RequestOptions(
cancelToken: cancelToken,
receiveTimeout: Duration(seconds: 1),
),
ResponseBody(source.stream, 200),
onReceiveTimeoutWatchCancelled: () => timerCancelled = true,
);
expect(source.hasListener, isTrue);
expectLater(
stream,
emitsInOrder([
Uint8List.fromList([0]),
emitsError(matchesDioException(
DioExceptionType.cancel,
stackTraceContains: 'test/response/response_stream_test.dart',
)),
emitsDone,
]),
);
source.add(Uint8List.fromList([0]));
cancelToken.cancel();
await Future.microtask(() {});
expect(timerCancelled, isTrue);
});
});
}

0 comments on commit a0304b2

Please sign in to comment.