Skip to content

Commit

Permalink
Add FutureGroup.addCancelable()
Browse files Browse the repository at this point in the history
  • Loading branch information
nex3 committed Nov 26, 2024
1 parent 1de8372 commit 6bc6363
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 7 deletions.
4 changes: 4 additions & 0 deletions pkgs/async/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## 2.13.0

* Add `FutureGroup.addCancelable()`.

## 2.12.0

- Require Dart 3.4.
Expand Down
30 changes: 24 additions & 6 deletions pkgs/async/lib/src/future_group.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@

import 'dart:async';

import 'cancelable_operation.dart';

/// A sentinel object indicating that a member of a [FutureGroup] was canceled
/// rather than completing normally.
const _canceledResult = Object();

/// A collection of futures waits until all added [Future]s complete.
///
/// Futures are added to the group with [add]. Once you're finished adding
Expand Down Expand Up @@ -61,12 +67,21 @@ class FutureGroup<T> implements Sink<Future<T>> {
/// The values emitted by the futures that have been added to the group, in
/// the order they were added.
///
/// The slots for futures that haven't completed yet are `null`.
final _values = <T?>[];
/// This is type `Object?` rather than `T?` so it can contain
/// [_canceledResult]. The slots for futures that haven't completed yet are
/// `null`.
final _values = <Object?>[];

/// Wait for [task] to complete.
@override
void add(Future<T> task) {
void add(Future<T> task) =>
addCancelable(CancelableOperation.fromFuture(task));

/// Wait for [task] to complete.
///
/// If [task] is canceled, it's removed from the group without adding a value
/// to [future].
void addCancelable(CancelableOperation<T> task) {
if (_closed) throw StateError('The FutureGroup is closed.');

// Ensure that future values are put into [values] in the same order they're
Expand All @@ -76,19 +91,22 @@ class FutureGroup<T> implements Sink<Future<T>> {
_values.add(null);

_pending++;
task.then((value) {
task.valueOrCancellation().then((value) {
if (_completer.isCompleted) return null;

_pending--;
_values[index] = value;
_values[index] = task.isCanceled ? _canceledResult : value;

if (_pending != 0) return null;
var onIdleController = _onIdleController;
if (onIdleController != null) onIdleController.add(null);

if (!_closed) return null;
if (onIdleController != null) onIdleController.close();
_completer.complete(_values.whereType<T>().toList());
_completer.complete([
for (var value in _values)
if (value != _canceledResult && value is T) value
]);
}).catchError((Object error, StackTrace stackTrace) {
if (_completer.isCompleted) return null;
_completer.completeError(error, stackTrace);
Expand Down
2 changes: 1 addition & 1 deletion pkgs/async/pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: async
version: 2.12.0
version: 2.13.0
description: Utility functions and classes related to the 'dart:async' library.
repository: https://github.com/dart-lang/core/tree/main/pkgs/async
issue_tracker: https://github.com/dart-lang/core/issues?q=is%3Aissue+is%3Aopen+label%3Apackage%3Aasync
Expand Down
17 changes: 17 additions & 0 deletions pkgs/async/test/future_group_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import 'dart:async';

import 'package:async/src/cancelable_operation.dart';
import 'package:async/src/future_group.dart';
import 'package:test/test.dart';

Expand Down Expand Up @@ -92,6 +93,22 @@ void main() {
expect(completed, isTrue);
});

test('a canceled operation doesn\'t block completion', () {
var completer1 = Completer<int>();
var completer2 = CancelableCompleter<int>();
var completer3 = Completer<int>();

futureGroup.add(completer1.future);
futureGroup.addCancelable(completer2.operation);
futureGroup.add(completer3.future);
futureGroup.close();

completer3.complete(3);
completer2.operation.cancel();
completer1.complete(1);
expect(futureGroup.future, completion(equals([1, 3])));
});

test('completes to the values of the futures in order of addition', () {
var completer1 = Completer<int>();
var completer2 = Completer<int>();
Expand Down

0 comments on commit 6bc6363

Please sign in to comment.