diff --git a/README.md b/README.md index b323cfd7..351842af 100644 --- a/README.md +++ b/README.md @@ -539,7 +539,9 @@ public enum AwaitOperation /// All values are sent immediately to the asynchronous method. Parallel, /// All values are sent immediately to the asynchronous method, but the results are queued and passed to the next operator in order. - SequentialParallel + SequentialParallel, + /// Only the latest value is queued, and the next value waits for the completion of the asynchronous method. + Latest, } ``` @@ -559,9 +561,9 @@ Additionally, the following time-related filtering methods can also accept async | Name | ReturnType | | --- | --- | -| **Debounce**(this `Observable` source, `Func` throttleDurationSelector, `Boolean` configureAwait = true) | `Observable` | -| **ThrottleFirst**(this `Observable` source, `Func` sampler, `Boolean` configureAwait = true) | `Observable` | -| **ThrottleLast**(this `Observable` source, `Func` sampler, `Boolean` configureAwait = true) | `Observable` | +| **Debounce**(this `Observable` source, `Func` throttleDurationSelector, `Boolean` configureAwait = true) | `Observable` | +| **ThrottleFirst**(this `Observable` source, `Func` sampler, `Boolean` configureAwait = true) | `Observable` | +| **ThrottleLast**(this `Observable` source, `Func` sampler, `Boolean` configureAwait = true) | `Observable` | Concurrency Policy --- @@ -576,8 +578,8 @@ It has `ObservableList`, `ObservableDictionary`, `ObservableHas ```csharp Observable> IObservableCollection.ObserveAdd() Observable> IObservableCollection.ObserveRemove() -Observable> IObservableCollection.ObserveReplace() -Observable> IObservableCollection.ObserveMove() +Observable> IObservableCollection.ObserveReplace() +Observable> IObservableCollection.ObserveMove() Observable> IObservableCollection.ObserveReset() ``` @@ -1412,99 +1414,99 @@ For default time based operations that do not take a provider, `ObservableSystem Factory methods are defined as static methods in the static class `Observable`. -| Name(Parameter) | ReturnType | -| --- | --- | -| **Amb**(params `Observable[]` sources) | `Observable` | -| **Amb**(`IEnumerable>` sources) | `Observable` | -| **CombineLatest**(params `Observable[]` sources) | `Observable` | -| **CombineLatest**(`IEnumerable>` sources) | `Observable` | -| **Concat**(params `Observable[]` sources) | `Observable` | -| **Concat**(`IEnumerable>` sources) | `Observable` | -| **Concat**(this `Observable>` sources) | `Observable` | -| **Create**(`Func, IDisposable>` subscribe, `Boolean` rawObserver = false) | `Observable` | -| **Create**(`TState` state, `Func, TState, IDisposable>` subscribe, `Boolean` rawObserver = false) | `Observable` | -| **Create**(`Func, CancellationToken, ValueTask>` subscribe, `Boolean` rawObserver = false) | `Observable` | -| **Create**(`TState` state, `Func, TState, CancellationToken, ValueTask>` subscribe, `Boolean` rawObserver = false) | `Observable` | -| **CreateFrom**(`Func>` factory) | `Observable` | -| **CreateFrom**(`TState` state, `Func>` factory) | `Observable` | -| **Defer**(`Func>` observableFactory) | `Observable` | -| **Empty**() | `Observable` | -| **Empty**(`TimeProvider` timeProvider) | `Observable` | -| **Empty**(`TimeSpan` dueTime, `TimeProvider` timeProvider) | `Observable` | -| **EveryUpdate**() | `Observable` | -| **EveryUpdate**(`CancellationToken` cancellationToken) | `Observable` | -| **EveryUpdate**(`FrameProvider` frameProvider) | `Observable` | -| **EveryUpdate**(`FrameProvider` frameProvider, `CancellationToken` cancellationToken) | `Observable` | -| **EveryValueChanged**(`TSource` source, `Func` propertySelector, `CancellationToken` cancellationToken = default) | `Observable` | -| **EveryValueChanged**(`TSource` source, `Func` propertySelector, `FrameProvider` frameProvider, `CancellationToken` cancellationToken = default) | `Observable` | -| **EveryValueChanged**(`TSource` source, `Func` propertySelector, `EqualityComparer` equalityComparer, `CancellationToken` cancellationToken = default) | `Observable` | -| **EveryValueChanged**(`TSource` source, `Func` propertySelector, `FrameProvider` frameProvider, `EqualityComparer` equalityComparer, `CancellationToken` cancellationToken = default) | `Observable` | -| **FromAsync**(`Func` asyncFactory, `Boolean` configureAwait = true) | `Observable` | -| **FromAsync**(`Func>` asyncFactory, `Boolean` configureAwait = true) | `Observable` | -| **FromEvent**(`Action` addHandler, `Action` removeHandler, `CancellationToken` cancellationToken = default) | `Observable` | -| **FromEvent**(`Action>` addHandler, `Action>` removeHandler, `CancellationToken` cancellationToken = default) | `Observable` | -| **FromEvent**(`Func` conversion, `Action` addHandler, `Action` removeHandler, `CancellationToken` cancellationToken = default) | `Observable` | -| **FromEvent**(`Func, TDelegate>` conversion, `Action` addHandler, `Action` removeHandler, `CancellationToken` cancellationToken = default) | `Observable` | -| **FromEventHandler**(`Action` addHandler, `Action` removeHandler, `CancellationToken` cancellationToken = default) | `Observable>` | -| **FromEventHandler**(`Action>` addHandler, `Action>` removeHandler, `CancellationToken` cancellationToken = default) | `Observable>` | -| **Interval**(`TimeSpan` period, `CancellationToken` cancellationToken = default) | `Observable` | -| **Interval**(`TimeSpan` period, `TimeProvider` timeProvider, `CancellationToken` cancellationToken = default) | `Observable` | -| **IntervalFrame**(`Int32` periodFrame, `CancellationToken` cancellationToken = default) | `Observable` | -| **IntervalFrame**(`Int32` periodFrame, `FrameProvider` frameProvider, `CancellationToken` cancellationToken = default) | `Observable` | -| **Merge**(params `Observable[]` sources) | `Observable` | -| **Merge**(`IEnumerable>` sources) | `Observable` | -| **Merge**(this `Observable>` sources) | `Observable` | -| **Never**() | `Observable` | -| **NextFrame**(`CancellationToken` cancellationToken = default) | `Observable` | -| **NextFrame**(`FrameProvider` frameProvider, `CancellationToken` cancellationToken = default) | `Observable` | -| **Range**(`Int32` start, `Int32` count) | `Observable` | -| **Range**(`Int32` start, `Int32` count, `CancellationToken` cancellationToken) | `Observable` | -| **Repeat**(`T` value, `Int32` count) | `Observable` | -| **Repeat**(`T` value, `Int32` count, `CancellationToken` cancellationToken) | `Observable` | -| **Return**(`T` value) | `Observable` | -| **Return**(`T` value, `TimeProvider` timeProvider, `CancellationToken` cancellationToken = default) | `Observable` | -| **Return**(`T` value, `TimeSpan` dueTime, `TimeProvider` timeProvider, `CancellationToken` cancellationToken = default) | `Observable` | -| **Return**(`Unit` value) | `Observable` | -| **Return**(`Boolean` value) | `Observable` | -| **Return**(`Int32` value) | `Observable` | -| **ReturnFrame**(`T` value, `CancellationToken` cancellationToken = default) | `Observable` | -| **ReturnFrame**(`T` value, `FrameProvider` frameProvider, `CancellationToken` cancellationToken = default) | `Observable` | -| **ReturnFrame**(`T` value, `Int32` dueTimeFrame, `CancellationToken` cancellationToken = default) | `Observable` | -| **ReturnFrame**(`T` value, `Int32` dueTimeFrame, `FrameProvider` frameProvider, `CancellationToken` cancellationToken = default) | `Observable` | -| **ReturnOnCompleted**(`Result` result) | `Observable` | -| **ReturnOnCompleted**(`Result` result, `TimeProvider` timeProvider) | `Observable` | -| **ReturnOnCompleted**(`Result` result, `TimeSpan` dueTime, `TimeProvider` timeProvider) | `Observable` | -| **ReturnUnit**() | `Observable` | -| **Throw**(`Exception` exception) | `Observable` | -| **Throw**(`Exception` exception, `TimeProvider` timeProvider) | `Observable` | -| **Throw**(`Exception` exception, `TimeSpan` dueTime, `TimeProvider` timeProvider) | `Observable` | -| **Timer**(`TimeSpan` dueTime, `CancellationToken` cancellationToken = default) | `Observable` | -| **Timer**(`DateTimeOffset` dueTime, `CancellationToken` cancellationToken = default) | `Observable` | -| **Timer**(`TimeSpan` dueTime, `TimeSpan` period, `CancellationToken` cancellationToken = default) | `Observable` | -| **Timer**(`DateTimeOffset` dueTime, `TimeSpan` period, `CancellationToken` cancellationToken = default) | `Observable` | -| **Timer**(`TimeSpan` dueTime, `TimeProvider` timeProvider, `CancellationToken` cancellationToken = default) | `Observable` | -| **Timer**(`DateTimeOffset` dueTime, `TimeProvider` timeProvider, `CancellationToken` cancellationToken = default) | `Observable` | -| **Timer**(`TimeSpan` dueTime, `TimeSpan` period, `TimeProvider` timeProvider, `CancellationToken` cancellationToken = default) | `Observable` | -| **Timer**(`DateTimeOffset` dueTime, `TimeSpan` period, `TimeProvider` timeProvider, `CancellationToken` cancellationToken = default) | `Observable` | -| **TimerFrame**(`Int32` dueTimeFrame, `CancellationToken` cancellationToken = default) | `Observable` | -| **TimerFrame**(`Int32` dueTimeFrame, `Int32` periodFrame, `CancellationToken` cancellationToken = default) | `Observable` | -| **TimerFrame**(`Int32` dueTimeFrame, `FrameProvider` frameProvider, `CancellationToken` cancellationToken = default) | `Observable` | -| **TimerFrame**(`Int32` dueTimeFrame, `Int32` periodFrame, `FrameProvider` frameProvider, `CancellationToken` cancellationToken = default) | `Observable` | -| **ToObservable**(this `Task` task, `Boolean` configureAwait = true) | `Observable` | -| **ToObservable**(this `Task` task, `Boolean` configureAwait = true) | `Observable` | -| **ToObservable**(this `ValueTask` task, `Boolean` configureAwait = true) | `Observable` | -| **ToObservable**(this `ValueTask` task, `Boolean` configureAwait = true) | `Observable` | -| **ToObservable**(this `IEnumerable` source, `CancellationToken` cancellationToken = default) | `Observable` | -| **ToObservable**(this `IAsyncEnumerable` source) | `Observable` | -| **ToObservable**(this `IObservable` source) | `Observable` | -| **Yield**(`CancellationToken` cancellationToken = default) | `Observable` | -| **Yield**(`TimeProvider` timeProvider, `CancellationToken` cancellationToken = default) | `Observable` | -| **YieldFrame**(`CancellationToken` cancellationToken = default) | `Observable` | -| **YieldFrame**(`FrameProvider` frameProvider, `CancellationToken` cancellationToken = default) | `Observable` | -| **Zip**(params `Observable[]` sources) | `Observable` | -| **Zip**(`IEnumerable>` sources) | `Observable` | -| **ZipLatest**(params `Observable[]` sources) | `Observable` | -| **ZipLatest**(`IEnumerable>` sources) | `Observable` | +| Name(Parameter) | ReturnType | +| --- | --- | +| **Amb**(params `Observable[]` sources) | `Observable` | +| **Amb**(`IEnumerable>` sources) | `Observable` | +| **CombineLatest**(params `Observable[]` sources) | `Observable` | +| **CombineLatest**(`IEnumerable>` sources) | `Observable` | +| **Concat**(params `Observable[]` sources) | `Observable` | +| **Concat**(`IEnumerable>` sources) | `Observable` | +| **Concat**(this `Observable>` sources) | `Observable` | +| **Create**(`Func, IDisposable>` subscribe, `Boolean` rawObserver = false) | `Observable` | +| **Create**(`TState` state, `Func, TState, IDisposable>` subscribe, `Boolean` rawObserver = false) | `Observable` | +| **Create**(`Func, CancellationToken, ValueTask>` subscribe, `Boolean` rawObserver = false) | `Observable` | +| **Create**(`TState` state, `Func, TState, CancellationToken, ValueTask>` subscribe, `Boolean` rawObserver = false) | `Observable` | +| **CreateFrom**(`Func>` factory) | `Observable` | +| **CreateFrom**(`TState` state, `Func>` factory) | `Observable` | +| **Defer**(`Func>` observableFactory) | `Observable` | +| **Empty**() | `Observable` | +| **Empty**(`TimeProvider` timeProvider) | `Observable` | +| **Empty**(`TimeSpan` dueTime, `TimeProvider` timeProvider) | `Observable` | +| **EveryUpdate**() | `Observable` | +| **EveryUpdate**(`CancellationToken` cancellationToken) | `Observable` | +| **EveryUpdate**(`FrameProvider` frameProvider) | `Observable` | +| **EveryUpdate**(`FrameProvider` frameProvider, `CancellationToken` cancellationToken) | `Observable` | +| **EveryValueChanged**(`TSource` source, `Func` propertySelector, `CancellationToken` cancellationToken = default) | `Observable` | +| **EveryValueChanged**(`TSource` source, `Func` propertySelector, `FrameProvider` frameProvider, `CancellationToken` cancellationToken = default) | `Observable` | +| **EveryValueChanged**(`TSource` source, `Func` propertySelector, `EqualityComparer` equalityComparer, `CancellationToken` cancellationToken = default) | `Observable` | +| **EveryValueChanged**(`TSource` source, `Func` propertySelector, `FrameProvider` frameProvider, `EqualityComparer` equalityComparer, `CancellationToken` cancellationToken = default) | `Observable` | +| **FromAsync**(`Func` asyncFactory, `Boolean` configureAwait = true) | `Observable` | +| **FromAsync**(`Func>` asyncFactory, `Boolean` configureAwait = true) | `Observable` | +| **FromEvent**(`Action` addHandler, `Action` removeHandler, `CancellationToken` cancellationToken = default) | `Observable` | +| **FromEvent**(`Action>` addHandler, `Action>` removeHandler, `CancellationToken` cancellationToken = default) | `Observable` | +| **FromEvent**(`Func` conversion, `Action` addHandler, `Action` removeHandler, `CancellationToken` cancellationToken = default) | `Observable` | +| **FromEvent**(`Func, TDelegate>` conversion, `Action` addHandler, `Action` removeHandler, `CancellationToken` cancellationToken = default) | `Observable` | +| **FromEventHandler**(`Action` addHandler, `Action` removeHandler, `CancellationToken` cancellationToken = default) | `Observable>` | +| **FromEventHandler**(`Action>` addHandler, `Action>` removeHandler, `CancellationToken` cancellationToken = default) | `Observable>` | +| **Interval**(`TimeSpan` period, `CancellationToken` cancellationToken = default) | `Observable` | +| **Interval**(`TimeSpan` period, `TimeProvider` timeProvider, `CancellationToken` cancellationToken = default) | `Observable` | +| **IntervalFrame**(`Int32` periodFrame, `CancellationToken` cancellationToken = default) | `Observable` | +| **IntervalFrame**(`Int32` periodFrame, `FrameProvider` frameProvider, `CancellationToken` cancellationToken = default) | `Observable` | +| **Merge**(params `Observable[]` sources) | `Observable` | +| **Merge**(`IEnumerable>` sources) | `Observable` | +| **Merge**(this `Observable>` sources) | `Observable` | +| **Never**() | `Observable` | +| **NextFrame**(`CancellationToken` cancellationToken = default) | `Observable` | +| **NextFrame**(`FrameProvider` frameProvider, `CancellationToken` cancellationToken = default) | `Observable` | +| **Range**(`Int32` start, `Int32` count) | `Observable` | +| **Range**(`Int32` start, `Int32` count, `CancellationToken` cancellationToken) | `Observable` | +| **Repeat**(`T` value, `Int32` count) | `Observable` | +| **Repeat**(`T` value, `Int32` count, `CancellationToken` cancellationToken) | `Observable` | +| **Return**(`T` value) | `Observable` | +| **Return**(`T` value, `TimeProvider` timeProvider, `CancellationToken` cancellationToken = default) | `Observable` | +| **Return**(`T` value, `TimeSpan` dueTime, `TimeProvider` timeProvider, `CancellationToken` cancellationToken = default) | `Observable` | +| **Return**(`Unit` value) | `Observable` | +| **Return**(`Boolean` value) | `Observable` | +| **Return**(`Int32` value) | `Observable` | +| **ReturnFrame**(`T` value, `CancellationToken` cancellationToken = default) | `Observable` | +| **ReturnFrame**(`T` value, `FrameProvider` frameProvider, `CancellationToken` cancellationToken = default) | `Observable` | +| **ReturnFrame**(`T` value, `Int32` dueTimeFrame, `CancellationToken` cancellationToken = default) | `Observable` | +| **ReturnFrame**(`T` value, `Int32` dueTimeFrame, `FrameProvider` frameProvider, `CancellationToken` cancellationToken = default) | `Observable` | +| **ReturnOnCompleted**(`Result` result) | `Observable` | +| **ReturnOnCompleted**(`Result` result, `TimeProvider` timeProvider) | `Observable` | +| **ReturnOnCompleted**(`Result` result, `TimeSpan` dueTime, `TimeProvider` timeProvider) | `Observable` | +| **ReturnUnit**() | `Observable` | +| **Throw**(`Exception` exception) | `Observable` | +| **Throw**(`Exception` exception, `TimeProvider` timeProvider) | `Observable` | +| **Throw**(`Exception` exception, `TimeSpan` dueTime, `TimeProvider` timeProvider) | `Observable` | +| **Timer**(`TimeSpan` dueTime, `CancellationToken` cancellationToken = default) | `Observable` | +| **Timer**(`DateTimeOffset` dueTime, `CancellationToken` cancellationToken = default) | `Observable` | +| **Timer**(`TimeSpan` dueTime, `TimeSpan` period, `CancellationToken` cancellationToken = default) | `Observable` | +| **Timer**(`DateTimeOffset` dueTime, `TimeSpan` period, `CancellationToken` cancellationToken = default) | `Observable` | +| **Timer**(`TimeSpan` dueTime, `TimeProvider` timeProvider, `CancellationToken` cancellationToken = default) | `Observable` | +| **Timer**(`DateTimeOffset` dueTime, `TimeProvider` timeProvider, `CancellationToken` cancellationToken = default) | `Observable` | +| **Timer**(`TimeSpan` dueTime, `TimeSpan` period, `TimeProvider` timeProvider, `CancellationToken` cancellationToken = default) | `Observable` | +| **Timer**(`DateTimeOffset` dueTime, `TimeSpan` period, `TimeProvider` timeProvider, `CancellationToken` cancellationToken = default) | `Observable` | +| **TimerFrame**(`Int32` dueTimeFrame, `CancellationToken` cancellationToken = default) | `Observable` | +| **TimerFrame**(`Int32` dueTimeFrame, `Int32` periodFrame, `CancellationToken` cancellationToken = default) | `Observable` | +| **TimerFrame**(`Int32` dueTimeFrame, `FrameProvider` frameProvider, `CancellationToken` cancellationToken = default) | `Observable` | +| **TimerFrame**(`Int32` dueTimeFrame, `Int32` periodFrame, `FrameProvider` frameProvider, `CancellationToken` cancellationToken = default) | `Observable` | +| **ToObservable**(this `Task` task, `Boolean` configureAwait = true) | `Observable` | +| **ToObservable**(this `Task` task, `Boolean` configureAwait = true) | `Observable` | +| **ToObservable**(this `ValueTask` task, `Boolean` configureAwait = true) | `Observable` | +| **ToObservable**(this `ValueTask` task, `Boolean` configureAwait = true) | `Observable` | +| **ToObservable**(this `IEnumerable` source, `CancellationToken` cancellationToken = default) | `Observable` | +| **ToObservable**(this `IAsyncEnumerable` source) | `Observable` | +| **ToObservable**(this `IObservable` source) | `Observable` | +| **Yield**(`CancellationToken` cancellationToken = default) | `Observable` | +| **Yield**(`TimeProvider` timeProvider, `CancellationToken` cancellationToken = default) | `Observable` | +| **YieldFrame**(`CancellationToken` cancellationToken = default) | `Observable` | +| **YieldFrame**(`FrameProvider` frameProvider, `CancellationToken` cancellationToken = default) | `Observable` | +| **Zip**(params `Observable[]` sources) | `Observable` | +| **Zip**(`IEnumerable>` sources) | `Observable` | +| **ZipLatest**(params `Observable[]` sources) | `Observable` | +| **ZipLatest**(`IEnumerable>` sources) | `Observable` | Methods that accept a `CancellationToken` will emit `OnCompleted` when a Cancel is issued. This allows you to unsubscribe all subscriptions from the event source. @@ -1537,295 +1539,295 @@ Among our custom frame-based methods, `EveryUpdate` emits values every frame. `Y Operator methods are defined as extension methods to `Observable` in the static class `ObservableExtensions`. -| Name(Parameter) | ReturnType | -| --- | --- | -| **AggregateAsync**(this `Observable` source, `Func` func, `CancellationToken` cancellationToken = default) | `Task` | -| **AggregateAsync**(this `Observable` source, `TResult` seed, `Func` func, `CancellationToken` cancellationToken = default) | `Task` | -| **AggregateAsync**(this `Observable` source, `TAccumulate` seed, `Func` func, `Func` resultSelector, `CancellationToken` cancellationToken = default) | `Task` | -| **AggregateByAsync**(this `Observable` source, `Func` keySelector, `TAccumulate` seed, `Func` func, `IEqualityComparer` keyComparer = default, `CancellationToken` cancellationToken = default) | `Task>>` | -| **AggregateByAsync**(this `Observable` source, `Func` keySelector, `Func` seedSelector, `Func` func, `IEqualityComparer` keyComparer = default, `CancellationToken` cancellationToken = default) | `Task>>` | -| **AllAsync**(this `Observable` source, `Func` predicate, `CancellationToken` cancellationToken = default) | `Task` | -| **Amb**(this `Observable` source, `Observable` second) | `Observable` | -| **AnyAsync**(this `Observable` source, `CancellationToken` cancellationToken = default) | `Task` | -| **AnyAsync**(this `Observable` source, `Func` predicate, `CancellationToken` cancellationToken = default) | `Task` | -| **Append**(this `Observable` source, `T` value) | `Observable` | -| **AsObservable**(this `Observable` source) | `Observable` | -| **AsSystemObservable**(this `Observable` source) | `IObservable` | -| **AsUnitObservable**(this `Observable` source) | `Observable` | -| **AverageAsync**(this `Observable` source, `CancellationToken` cancellationToken = default) | `Task` | -| **AverageAsync**(this `Observable` source, `Func` selector, `CancellationToken` cancellationToken = default) | `Task` | -| **AverageAsync**(this `Observable` source, `CancellationToken` cancellationToken = default) | `Task` | -| **AverageAsync**(this `Observable` source, `Func` selector, `CancellationToken` cancellationToken = default) | `Task` | -| **AverageAsync**(this `Observable` source, `CancellationToken` cancellationToken = default) | `Task` | -| **AverageAsync**(this `Observable` source, `Func` selector, `CancellationToken` cancellationToken = default) | `Task` | -| **AverageAsync**(this `Observable` source, `CancellationToken` cancellationToken = default) | `Task` | -| **AverageAsync**(this `Observable` source, `Func` selector, `CancellationToken` cancellationToken = default) | `Task` | -| **AverageAsync**(this `Observable` source, `CancellationToken` cancellationToken = default) | `Task` | -| **AverageAsync**(this `Observable` source, `Func` selector, `CancellationToken` cancellationToken = default) | `Task` | -| **AverageAsync**(this `Observable` source, `CancellationToken` cancellationToken = default) | `Task` | -| **AverageAsync**(this `Observable` source, `Func` selector, `CancellationToken` cancellationToken = default) | `Task` | -| **Cast**(this `Observable` source) | `Observable` | -| **Catch**(this `Observable` source, `Observable` second) | `Observable` | -| **Catch**(this `Observable` source, `Func>` errorHandler) | `Observable` | -| **Chunk**(this `Observable` source, `Int32` count) | `Observable` | -| **Chunk**(this `Observable` source, `TimeSpan` timeSpan) | `Observable` | -| **Chunk**(this `Observable` source, `TimeSpan` timeSpan, `TimeProvider` timeProvider) | `Observable` | -| **Chunk**(this `Observable` source, `TimeSpan` timeSpan, `Int32` count) | `Observable` | -| **Chunk**(this `Observable` source, `TimeSpan` timeSpan, `Int32` count, `TimeProvider` timeProvider) | `Observable` | -| **Chunk**(this `Observable` source, `Observable` windowBoundaries) | `Observable` | -| **ChunkFrame**(this `Observable` source) | `Observable` | -| **ChunkFrame**(this `Observable` source, `Int32` frameCount) | `Observable` | -| **ChunkFrame**(this `Observable` source, `Int32` frameCount, `FrameProvider` frameProvider) | `Observable` | -| **ChunkFrame**(this `Observable` source, `Int32` frameCount, `Int32` count) | `Observable` | -| **ChunkFrame**(this `Observable` source, `Int32` frameCount, `Int32` count, `FrameProvider` frameProvider) | `Observable` | -| **CombineLatest**(this `Observable` source1, `Observable` source2, `Func` resultSelector) | `Observable` | -| **CombineLatest**(this `Observable` source1, `Observable` source2, `Observable` source3, `Func` resultSelector) | `Observable` | -| **CombineLatest**(this `Observable` source1, `Observable` source2, `Observable` source3, `Observable` source4, `Func` resultSelector) | `Observable` | -| **CombineLatest**(this `Observable` source1, `Observable` source2, `Observable` source3, `Observable` source4, `Observable` source5, `Func` resultSelector) | `Observable` | -| **CombineLatest**(this `Observable` source1, `Observable` source2, `Observable` source3, `Observable` source4, `Observable` source5, `Observable` source6, `Func` resultSelector) | `Observable` | -| **CombineLatest**(this `Observable` source1, `Observable` source2, `Observable` source3, `Observable` source4, `Observable` source5, `Observable` source6, `Observable` source7, `Func` resultSelector) | `Observable` | -| **CombineLatest**(this `Observable` source1, `Observable` source2, `Observable` source3, `Observable` source4, `Observable` source5, `Observable` source6, `Observable` source7, `Observable` source8, `Func` resultSelector) | `Observable` | -| **CombineLatest**(this `Observable` source1, `Observable` source2, `Observable` source3, `Observable` source4, `Observable` source5, `Observable` source6, `Observable` source7, `Observable` source8, `Observable` source9, `Func` resultSelector) | `Observable` | -| **CombineLatest**(this `Observable` source1, `Observable` source2, `Observable` source3, `Observable` source4, `Observable` source5, `Observable` source6, `Observable` source7, `Observable` source8, `Observable` source9, `Observable` source10, `Func` resultSelector) | `Observable` | -| **CombineLatest**(this `Observable` source1, `Observable` source2, `Observable` source3, `Observable` source4, `Observable` source5, `Observable` source6, `Observable` source7, `Observable` source8, `Observable` source9, `Observable` source10, `Observable` source11, `Func` resultSelector) | `Observable` | -| **CombineLatest**(this `Observable` source1, `Observable` source2, `Observable` source3, `Observable` source4, `Observable` source5, `Observable` source6, `Observable` source7, `Observable` source8, `Observable` source9, `Observable` source10, `Observable` source11, `Observable` source12, `Func` resultSelector) | `Observable` | -| **CombineLatest**(this `Observable` source1, `Observable` source2, `Observable` source3, `Observable` source4, `Observable` source5, `Observable` source6, `Observable` source7, `Observable` source8, `Observable` source9, `Observable` source10, `Observable` source11, `Observable` source12, `Observable` source13, `Func` resultSelector) | `Observable` | -| **CombineLatest**(this `Observable` source1, `Observable` source2, `Observable` source3, `Observable` source4, `Observable` source5, `Observable` source6, `Observable` source7, `Observable` source8, `Observable` source9, `Observable` source10, `Observable` source11, `Observable` source12, `Observable` source13, `Observable` source14, `Func` resultSelector) | `Observable` | -| **CombineLatest**(this `Observable` source1, `Observable` source2, `Observable` source3, `Observable` source4, `Observable` source5, `Observable` source6, `Observable` source7, `Observable` source8, `Observable` source9, `Observable` source10, `Observable` source11, `Observable` source12, `Observable` source13, `Observable` source14, `Observable` source15, `Func` resultSelector) | `Observable` | -| **Concat**(this `Observable` source, `Observable` second) | `Observable` | -| **ContainsAsync**(this `Observable` source, `T` value, `CancellationToken` cancellationToken = default) | `Task` | -| **ContainsAsync**(this `Observable` source, `T` value, `IEqualityComparer` equalityComparer, `CancellationToken` cancellationToken = default) | `Task` | -| **CountAsync**(this `Observable` source, `CancellationToken` cancellationToken = default) | `Task` | -| **CountAsync**(this `Observable` source, `Func` predicate, `CancellationToken` cancellationToken = default) | `Task` | -| **Debounce**(this `Observable` source, `TimeSpan` timeSpan) | `Observable` | -| **Debounce**(this `Observable` source, `TimeSpan` timeSpan, `TimeProvider` timeProvider) | `Observable` | -| **Debounce**(this `Observable` source, `Func` throttleDurationSelector, `Boolean` configureAwait = true) | `Observable` | -| **DebounceFrame**(this `Observable` source, `Int32` frameCount) | `Observable` | -| **DebounceFrame**(this `Observable` source, `Int32` frameCount, `FrameProvider` frameProvider) | `Observable` | -| **DefaultIfEmpty**(this `Observable` source) | `Observable` | -| **DefaultIfEmpty**(this `Observable` source, `T` defaultValue) | `Observable` | -| **Delay**(this `Observable` source, `TimeSpan` dueTime) | `Observable` | -| **Delay**(this `Observable` source, `TimeSpan` dueTime, `TimeProvider` timeProvider) | `Observable` | -| **DelayFrame**(this `Observable` source, `Int32` frameCount) | `Observable` | -| **DelayFrame**(this `Observable` source, `Int32` frameCount, `FrameProvider` frameProvider) | `Observable` | -| **DelaySubscription**(this `Observable` source, `TimeSpan` dueTime) | `Observable` | -| **DelaySubscription**(this `Observable` source, `TimeSpan` dueTime, `TimeProvider` timeProvider) | `Observable` | -| **DelaySubscriptionFrame**(this `Observable` source, `Int32` frameCount) | `Observable` | -| **DelaySubscriptionFrame**(this `Observable` source, `Int32` frameCount, `FrameProvider` frameProvider) | `Observable` | -| **Dematerialize**(this `Observable>` source) | `Observable` | -| **Distinct**(this `Observable` source) | `Observable` | -| **Distinct**(this `Observable` source, `IEqualityComparer` comparer) | `Observable` | -| **DistinctBy**(this `Observable` source, `Func` keySelector) | `Observable` | -| **DistinctBy**(this `Observable` source, `Func` keySelector, `IEqualityComparer` comparer) | `Observable` | -| **DistinctUntilChanged**(this `Observable` source) | `Observable` | -| **DistinctUntilChanged**(this `Observable` source, `IEqualityComparer` comparer) | `Observable` | -| **DistinctUntilChangedBy**(this `Observable` source, `Func` keySelector) | `Observable` | -| **DistinctUntilChangedBy**(this `Observable` source, `Func` keySelector, `IEqualityComparer` comparer) | `Observable` | -| **Do**(this `Observable` source, `Action` onNext = default, `Action` onErrorResume = default, `Action` onCompleted = default, `Action` onDispose = default, `Action` onSubscribe = default) | `Observable` | -| **Do**(this `Observable` source, `TState` state, `Action` onNext = default, `Action` onErrorResume = default, `Action` onCompleted = default, `Action` onDispose = default, `Action` onSubscribe = default) | `Observable` | -| **DoCancelOnCompleted**(this `Observable` source, `CancellationTokenSource` cancellationTokenSource) | `Observable` | -| **ElementAtAsync**(this `Observable` source, `Int32` index, `CancellationToken` cancellationToken = default) | `Task` | -| **ElementAtAsync**(this `Observable` source, `Index` index, `CancellationToken` cancellationToken = default) | `Task` | -| **ElementAtOrDefaultAsync**(this `Observable` source, `Int32` index, `T` defaultValue = default, `CancellationToken` cancellationToken = default) | `Task` | -| **ElementAtOrDefaultAsync**(this `Observable` source, `Index` index, `T` defaultValue = default, `CancellationToken` cancellationToken = default) | `Task` | -| **FirstAsync**(this `Observable` source, `CancellationToken` cancellationToken = default) | `Task` | -| **FirstAsync**(this `Observable` source, `Func` predicate, `CancellationToken` cancellationToken = default) | `Task` | -| **FirstOrDefaultAsync**(this `Observable` source, `T` defaultValue = default, `CancellationToken` cancellationToken = default) | `Task` | -| **FirstOrDefaultAsync**(this `Observable` source, `Func` predicate, `T` defaultValue = default, `CancellationToken` cancellationToken = default) | `Task` | -| **ForEachAsync**(this `Observable` source, `Action` action, `CancellationToken` cancellationToken = default) | `Task` | -| **ForEachAsync**(this `Observable` source, `Action` action, `CancellationToken` cancellationToken = default) | `Task` | -| **IgnoreElements**(this `Observable` source) | `Observable` | -| **IgnoreElements**(this `Observable` source, `Action` doOnNext) | `Observable` | -| **IgnoreOnErrorResume**(this `Observable` source) | `Observable` | -| **IgnoreOnErrorResume**(this `Observable` source, `Action` doOnErrorResume) | `Observable` | -| **Index**(this `Observable` source) | `Observable` | -| **Index**(this `Observable` source) | `Observable>` | -| **IsEmptyAsync**(this `Observable` source, `CancellationToken` cancellationToken = default) | `Task` | -| **LastAsync**(this `Observable` source, `CancellationToken` cancellationToken = default) | `Task` | -| **LastAsync**(this `Observable` source, `Func` predicate, `CancellationToken` cancellationToken = default) | `Task` | -| **LastOrDefaultAsync**(this `Observable` source, `T` defaultValue = default, `CancellationToken` cancellationToken = default) | `Task` | -| **LastOrDefaultAsync**(this `Observable` source, `Func` predicate, `T` defaultValue = default, `CancellationToken` cancellationToken = default) | `Task` | -| **LongCountAsync**(this `Observable` source, `CancellationToken` cancellationToken = default) | `Task` | -| **LongCountAsync**(this `Observable` source, `Func` predicate, `CancellationToken` cancellationToken = default) | `Task` | -| **Materialize**(this `Observable` source) | `Observable>` | -| **MaxAsync**(this `Observable` source, `CancellationToken` cancellationToken = default) | `Task` | -| **MaxAsync**(this `Observable` source, `IComparer` comparer, `CancellationToken` cancellationToken = default) | `Task` | -| **MaxAsync**(this `Observable` source, `Func` selector, `CancellationToken` cancellationToken = default) | `Task` | -| **MaxAsync**(this `Observable` source, `Func` selector, `IComparer` comparer, `CancellationToken` cancellationToken = default) | `Task` | -| **MaxByAsync**(this `Observable` source, `Func` keySelector, `CancellationToken` cancellationToken = default) | `Task` | -| **MaxByAsync**(this `Observable` source, `Func` keySelector, `IComparer` comparer, `CancellationToken` cancellationToken = default) | `Task` | -| **Merge**(this `Observable` source, `Observable` second) | `Observable` | -| **MinAsync**(this `Observable` source, `CancellationToken` cancellationToken = default) | `Task` | -| **MinAsync**(this `Observable` source, `IComparer` comparer, `CancellationToken` cancellationToken = default) | `Task` | -| **MinAsync**(this `Observable` source, `Func` selector, `CancellationToken` cancellationToken = default) | `Task` | -| **MinAsync**(this `Observable` source, `Func` selector, `IComparer` comparer, `CancellationToken` cancellationToken = default) | `Task` | -| **MinByAsync**(this `Observable` source, `Func` keySelector, `CancellationToken` cancellationToken = default) | `Task` | -| **MinByAsync**(this `Observable` source, `Func` keySelector, `IComparer` comparer, `CancellationToken` cancellationToken = default) | `Task` | -| **MinMaxAsync**(this `Observable` source, `CancellationToken` cancellationToken = default) | `Task>` | -| **MinMaxAsync**(this `Observable` source, `IComparer` comparer, `CancellationToken` cancellationToken = default) | `Task>` | -| **MinMaxAsync**(this `Observable` source, `Func` selector, `CancellationToken` cancellationToken = default) | `Task>` | -| **MinMaxAsync**(this `Observable` source, `Func` selector, `IComparer` comparer, `CancellationToken` cancellationToken = default) | `Task>` | -| **Multicast**(this `Observable` source, `ISubject` subject) | `ConnectableObservable` | -| **ObserveOn**(this `Observable` source, `SynchronizationContext` synchronizationContext) | `Observable` | -| **ObserveOn**(this `Observable` source, `TimeProvider` timeProvider) | `Observable` | -| **ObserveOn**(this `Observable` source, `FrameProvider` frameProvider) | `Observable` | -| **ObserveOnCurrentSynchronizationContext**(this `Observable` source) | `Observable` | -| **ObserveOnThreadPool**(this `Observable` source) | `Observable` | -| **OfType**(this `Observable` source) | `Observable` | -| **OnErrorResumeAsFailure**(this `Observable` source) | `Observable` | -| **Pairwise**(this `Observable` source) | `Observable>` | -| **Prepend**(this `Observable` source, `T` value) | `Observable` | -| **Publish**(this `Observable` source) | `ConnectableObservable` | -| **Publish**(this `Observable` source, `T` initialValue) | `ConnectableObservable` | -| **RefCount**(this `ConnectableObservable` source) | `Observable` | -| **Replay**(this `Observable` source) | `ConnectableObservable` | -| **Replay**(this `Observable` source, `Int32` bufferSize) | `ConnectableObservable` | -| **Replay**(this `Observable` source, `TimeSpan` window) | `ConnectableObservable` | -| **Replay**(this `Observable` source, `TimeSpan` window, `TimeProvider` timeProvider) | `ConnectableObservable` | -| **Replay**(this `Observable` source, `Int32` bufferSize, `TimeSpan` window) | `ConnectableObservable` | -| **Replay**(this `Observable` source, `Int32` bufferSize, `TimeSpan` window, `TimeProvider` timeProvider) | `ConnectableObservable` | -| **ReplayFrame**(this `Observable` source, `Int32` window) | `ConnectableObservable` | -| **ReplayFrame**(this `Observable` source, `Int32` window, `FrameProvider` frameProvider) | `ConnectableObservable` | -| **ReplayFrame**(this `Observable` source, `Int32` bufferSize, `Int32` window) | `ConnectableObservable` | -| **ReplayFrame**(this `Observable` source, `Int32` bufferSize, `Int32` window, `FrameProvider` frameProvider) | `ConnectableObservable` | -| **Scan**(this `Observable` source, `Func` accumulator) | `Observable` | -| **Scan**(this `Observable` source, `TAccumulate` seed, `Func` accumulator) | `Observable` | -| **Select**(this `Observable` source, `Func` selector) | `Observable` | -| **Select**(this `Observable` source, `Func` selector) | `Observable` | -| **Select**(this `Observable` source, `TState` state, `Func` selector) | `Observable` | -| **Select**(this `Observable` source, `TState` state, `Func` selector) | `Observable` | -| **SelectAwait**(this `Observable` source, `Func>` selector, `AwaitOperation` awaitOperations = AwaitOperation.Sequential, `Boolean` configureAwait = true, `Int32` maxConcurrent = -1) | `Observable` | -| **SelectMany**(this `Observable` source, `Func>` selector) | `Observable` | -| **SelectMany**(this `Observable` source, `Func>` collectionSelector, `Func` resultSelector) | `Observable` | -| **SelectMany**(this `Observable` source, `Func>` selector) | `Observable` | -| **SelectMany**(this `Observable` source, `Func>` collectionSelector, `Func` resultSelector) | `Observable` | -| **SequenceEqualAsync**(this `Observable` source, `Observable` second, `CancellationToken` cancellationToken = default) | `Task` | -| **SequenceEqualAsync**(this `Observable` source, `Observable` second, `IEqualityComparer` equalityComparer, `CancellationToken` cancellationToken = default) | `Task` | -| **Share**(this `Observable` source) | `Observable` | -| **SingleAsync**(this `Observable` source, `CancellationToken` cancellationToken = default) | `Task` | -| **SingleAsync**(this `Observable` source, `Func` predicate, `CancellationToken` cancellationToken = default) | `Task` | -| **SingleOrDefaultAsync**(this `Observable` source, `T` defaultValue = default, `CancellationToken` cancellationToken = default) | `Task` | -| **SingleOrDefaultAsync**(this `Observable` source, `Func` predicate, `T` defaultValue = default, `CancellationToken` cancellationToken = default) | `Task` | -| **Skip**(this `Observable` source, `Int32` count) | `Observable` | -| **Skip**(this `Observable` source, `TimeSpan` duration) | `Observable` | -| **Skip**(this `Observable` source, `TimeSpan` duration, `TimeProvider` timeProvider) | `Observable` | -| **SkipFrame**(this `Observable` source, `Int32` frameCount) | `Observable` | -| **SkipFrame**(this `Observable` source, `Int32` frameCount, `FrameProvider` frameProvider) | `Observable` | -| **SkipLast**(this `Observable` source, `Int32` count) | `Observable` | -| **SkipLast**(this `Observable` source, `TimeSpan` duration) | `Observable` | -| **SkipLast**(this `Observable` source, `TimeSpan` duration, `TimeProvider` timeProvider) | `Observable` | -| **SkipLastFrame**(this `Observable` source, `Int32` frameCount) | `Observable` | -| **SkipLastFrame**(this `Observable` source, `Int32` frameCount, `FrameProvider` frameProvider) | `Observable` | -| **SkipUntil**(this `Observable` source, `Observable` other) | `Observable` | -| **SkipUntil**(this `Observable` source, `CancellationToken` cancellationToken) | `Observable` | -| **SkipUntil**(this `Observable` source, `Task` task) | `Observable` | -| **SkipWhile**(this `Observable` source, `Func` predicate) | `Observable` | -| **SkipWhile**(this `Observable` source, `Func` predicate) | `Observable` | -| **SubscribeAwait**(this `Observable` source, `Func` onNextAsync, `AwaitOperation` awaitOperations = AwaitOperation.Sequential, `Boolean` configureAwait = true, `Int32` maxConcurrent = -1) | `IDisposable` | -| **SubscribeAwait**(this `Observable` source, `Func` onNextAsync, `Action` onCompleted, `AwaitOperation` awaitOperations = AwaitOperation.Sequential, `Boolean` configureAwait = true, `Int32` maxConcurrent = -1) | `IDisposable` | -| **SubscribeAwait**(this `Observable` source, `Func` onNextAsync, `Action` onErrorResume, `Action` onCompleted, `AwaitOperation` awaitOperations = AwaitOperation.Sequential, `Boolean` configureAwait = true, `Int32` maxConcurrent = -1) | `IDisposable` | -| **SubscribeOn**(this `Observable` source, `SynchronizationContext` synchronizationContext) | `Observable` | -| **SubscribeOn**(this `Observable` source, `TimeProvider` timeProvider) | `Observable` | -| **SubscribeOn**(this `Observable` source, `FrameProvider` frameProvider) | `Observable` | -| **SubscribeOnCurrentSynchronizationContext**(this `Observable` source) | `Observable` | -| **SubscribeOnThreadPool**(this `Observable` source) | `Observable` | -| **SumAsync**(this `Observable` source, `CancellationToken` cancellationToken = default) | `Task` | -| **SumAsync**(this `Observable` source, `Func` selector, `CancellationToken` cancellationToken = default) | `Task` | -| **SumAsync**(this `Observable` source, `CancellationToken` cancellationToken = default) | `Task` | -| **SumAsync**(this `Observable` source, `Func` selector, `CancellationToken` cancellationToken = default) | `Task` | -| **SumAsync**(this `Observable` source, `CancellationToken` cancellationToken = default) | `Task` | -| **SumAsync**(this `Observable` source, `Func` selector, `CancellationToken` cancellationToken = default) | `Task` | -| **SumAsync**(this `Observable` source, `CancellationToken` cancellationToken = default) | `Task` | -| **SumAsync**(this `Observable` source, `Func` selector, `CancellationToken` cancellationToken = default) | `Task` | -| **SumAsync**(this `Observable` source, `CancellationToken` cancellationToken = default) | `Task` | -| **SumAsync**(this `Observable` source, `Func` selector, `CancellationToken` cancellationToken = default) | `Task` | -| **SumAsync**(this `Observable` source, `CancellationToken` cancellationToken = default) | `Task` | -| **SumAsync**(this `Observable` source, `Func` selector, `CancellationToken` cancellationToken = default) | `Task` | -| **Switch**(this `Observable>` sources) | `Observable` | -| **Synchronize**(this `Observable` source) | `Observable` | -| **Synchronize**(this `Observable` source, `Object` gate) | `Observable` | -| **Take**(this `Observable` source, `Int32` count) | `Observable` | -| **Take**(this `Observable` source, `TimeSpan` duration) | `Observable` | -| **Take**(this `Observable` source, `TimeSpan` duration, `TimeProvider` timeProvider) | `Observable` | -| **TakeFrame**(this `Observable` source, `Int32` frameCount) | `Observable` | -| **TakeFrame**(this `Observable` source, `Int32` frameCount, `FrameProvider` frameProvider) | `Observable` | -| **TakeLast**(this `Observable` source, `Int32` count) | `Observable` | -| **TakeLast**(this `Observable` source, `TimeSpan` duration) | `Observable` | -| **TakeLast**(this `Observable` source, `TimeSpan` duration, `TimeProvider` timeProvider) | `Observable` | -| **TakeLastFrame**(this `Observable` source, `Int32` frameCount) | `Observable` | -| **TakeLastFrame**(this `Observable` source, `Int32` frameCount, `FrameProvider` frameProvider) | `Observable` | -| **TakeUntil**(this `Observable` source, `Observable` other) | `Observable` | -| **TakeUntil**(this `Observable` source, `CancellationToken` cancellationToken) | `Observable` | -| **TakeUntil**(this `Observable` source, `Task` task) | `Observable` | -| **TakeWhile**(this `Observable` source, `Func` predicate) | `Observable` | -| **TakeWhile**(this `Observable` source, `Func` predicate) | `Observable` | -| **ThrottleFirst**(this `Observable` source, `TimeSpan` timeSpan) | `Observable` | -| **ThrottleFirst**(this `Observable` source, `TimeSpan` timeSpan, `TimeProvider` timeProvider) | `Observable` | -| **ThrottleFirst**(this `Observable` source, `Observable` sampler) | `Observable` | -| **ThrottleFirst**(this `Observable` source, `Func` sampler, `Boolean` configureAwait = true) | `Observable` | -| **ThrottleFirstFrame**(this `Observable` source, `Int32` frameCount) | `Observable` | -| **ThrottleFirstFrame**(this `Observable` source, `Int32` frameCount, `FrameProvider` frameProvider) | `Observable` | -| **ThrottleLast**(this `Observable` source, `TimeSpan` timeSpan) | `Observable` | -| **ThrottleLast**(this `Observable` source, `TimeSpan` timeSpan, `TimeProvider` timeProvider) | `Observable` | -| **ThrottleLast**(this `Observable` source, `Observable` sampler) | `Observable` | -| **ThrottleLast**(this `Observable` source, `Func` sampler, `Boolean` configureAwait = true) | `Observable` | -| **ThrottleLastFrame**(this `Observable` source, `Int32` frameCount) | `Observable` | -| **ThrottleLastFrame**(this `Observable` source, `Int32` frameCount, `FrameProvider` frameProvider) | `Observable` | -| **Timeout**(this `Observable` source, `TimeSpan` dueTime) | `Observable` | -| **Timeout**(this `Observable` source, `TimeSpan` dueTime, `TimeProvider` timeProvider) | `Observable` | -| **TimeoutFrame**(this `Observable` source, `Int32` frameCount) | `Observable` | -| **TimeoutFrame**(this `Observable` source, `Int32` frameCount, `FrameProvider` frameProvider) | `Observable` | -| **ToArrayAsync**(this `Observable` source, `CancellationToken` cancellationToken = default) | `Task` | -| **ToAsyncEnumerable**(this `Observable` source, `CancellationToken` cancellationToken = default) | `IAsyncEnumerable` | -| **ToDictionaryAsync**(this `Observable` source, `Func` keySelector, `CancellationToken` cancellationToken = default) | `Task>` | -| **ToDictionaryAsync**(this `Observable` source, `Func` keySelector, `IEqualityComparer` keyComparer, `CancellationToken` cancellationToken = default) | `Task>` | -| **ToDictionaryAsync**(this `Observable` source, `Func` keySelector, `Func` elementSelector, `CancellationToken` cancellationToken = default) | `Task>` | -| **ToDictionaryAsync**(this `Observable` source, `Func` keySelector, `Func` elementSelector, `IEqualityComparer` keyComparer, `CancellationToken` cancellationToken = default) | `Task>` | -| **ToHashSetAsync**(this `Observable` source, `CancellationToken` cancellationToken = default) | `Task>` | -| **ToHashSetAsync**(this `Observable` source, `IEqualityComparer` comparer, `CancellationToken` cancellationToken = default) | `Task>` | -| **ToListAsync**(this `Observable` source, `CancellationToken` cancellationToken = default) | `Task>` | -| **ToLiveList**(this `Observable` source) | `LiveList` | -| **ToLiveList**(this `Observable` source, `Int32` bufferSize) | `LiveList` | -| **ToLookupAsync**(this `Observable` source, `Func` keySelector, `CancellationToken` cancellationToken = default) | `Task>` | -| **ToLookupAsync**(this `Observable` source, `Func` keySelector, `IEqualityComparer` keyComparer, `CancellationToken` cancellationToken = default) | `Task>` | -| **ToLookupAsync**(this `Observable` source, `Func` keySelector, `Func` elementSelector, `CancellationToken` cancellationToken = default) | `Task>` | -| **ToLookupAsync**(this `Observable` source, `Func` keySelector, `Func` elementSelector, `IEqualityComparer` keyComparer, `CancellationToken` cancellationToken = default) | `Task>` | -| **Trampoline**(this `Observable` source) | `Observable` | -| **WaitAsync**(this `Observable` source, `CancellationToken` cancellationToken = default) | `Task` | -| **Where**(this `Observable` source, `Func` predicate) | `Observable` | -| **Where**(this `Observable` source, `Func` predicate) | `Observable` | -| **Where**(this `Observable` source, `TState` state, `Func` predicate) | `Observable` | -| **Where**(this `Observable` source, `TState` state, `Func` predicate) | `Observable` | -| **WhereAwait**(this `Observable` source, `Func>` predicate, `AwaitOperation` awaitOperations = AwaitOperation.Sequential, `Boolean` configureAwait = true, `Int32` maxConcurrent = -1) | `Observable` | -| **WithLatestFrom**(this `Observable` first, `Observable` second, `Func` resultSelector) | `Observable` | -| **Zip**(this `Observable` source1, `Observable` source2, `Func` resultSelector) | `Observable` | -| **Zip**(this `Observable` source1, `Observable` source2, `Observable` source3, `Func` resultSelector) | `Observable` | -| **Zip**(this `Observable` source1, `Observable` source2, `Observable` source3, `Observable` source4, `Func` resultSelector) | `Observable` | -| **Zip**(this `Observable` source1, `Observable` source2, `Observable` source3, `Observable` source4, `Observable` source5, `Func` resultSelector) | `Observable` | -| **Zip**(this `Observable` source1, `Observable` source2, `Observable` source3, `Observable` source4, `Observable` source5, `Observable` source6, `Func` resultSelector) | `Observable` | -| **Zip**(this `Observable` source1, `Observable` source2, `Observable` source3, `Observable` source4, `Observable` source5, `Observable` source6, `Observable` source7, `Func` resultSelector) | `Observable` | -| **Zip**(this `Observable` source1, `Observable` source2, `Observable` source3, `Observable` source4, `Observable` source5, `Observable` source6, `Observable` source7, `Observable` source8, `Func` resultSelector) | `Observable` | -| **Zip**(this `Observable` source1, `Observable` source2, `Observable` source3, `Observable` source4, `Observable` source5, `Observable` source6, `Observable` source7, `Observable` source8, `Observable` source9, `Func` resultSelector) | `Observable` | -| **Zip**(this `Observable` source1, `Observable` source2, `Observable` source3, `Observable` source4, `Observable` source5, `Observable` source6, `Observable` source7, `Observable` source8, `Observable` source9, `Observable` source10, `Func` resultSelector) | `Observable` | -| **Zip**(this `Observable` source1, `Observable` source2, `Observable` source3, `Observable` source4, `Observable` source5, `Observable` source6, `Observable` source7, `Observable` source8, `Observable` source9, `Observable` source10, `Observable` source11, `Func` resultSelector) | `Observable` | -| **Zip**(this `Observable` source1, `Observable` source2, `Observable` source3, `Observable` source4, `Observable` source5, `Observable` source6, `Observable` source7, `Observable` source8, `Observable` source9, `Observable` source10, `Observable` source11, `Observable` source12, `Func` resultSelector) | `Observable` | -| **Zip**(this `Observable` source1, `Observable` source2, `Observable` source3, `Observable` source4, `Observable` source5, `Observable` source6, `Observable` source7, `Observable` source8, `Observable` source9, `Observable` source10, `Observable` source11, `Observable` source12, `Observable` source13, `Func` resultSelector) | `Observable` | -| **Zip**(this `Observable` source1, `Observable` source2, `Observable` source3, `Observable` source4, `Observable` source5, `Observable` source6, `Observable` source7, `Observable` source8, `Observable` source9, `Observable` source10, `Observable` source11, `Observable` source12, `Observable` source13, `Observable` source14, `Func` resultSelector) | `Observable` | -| **Zip**(this `Observable` source1, `Observable` source2, `Observable` source3, `Observable` source4, `Observable` source5, `Observable` source6, `Observable` source7, `Observable` source8, `Observable` source9, `Observable` source10, `Observable` source11, `Observable` source12, `Observable` source13, `Observable` source14, `Observable` source15, `Func` resultSelector) | `Observable` | -| **ZipLatest**(this `Observable` source1, `Observable` source2, `Func` resultSelector) | `Observable` | -| **ZipLatest**(this `Observable` source1, `Observable` source2, `Observable` source3, `Func` resultSelector) | `Observable` | -| **ZipLatest**(this `Observable` source1, `Observable` source2, `Observable` source3, `Observable` source4, `Func` resultSelector) | `Observable` | -| **ZipLatest**(this `Observable` source1, `Observable` source2, `Observable` source3, `Observable` source4, `Observable` source5, `Func` resultSelector) | `Observable` | -| **ZipLatest**(this `Observable` source1, `Observable` source2, `Observable` source3, `Observable` source4, `Observable` source5, `Observable` source6, `Func` resultSelector) | `Observable` | -| **ZipLatest**(this `Observable` source1, `Observable` source2, `Observable` source3, `Observable` source4, `Observable` source5, `Observable` source6, `Observable` source7, `Func` resultSelector) | `Observable` | -| **ZipLatest**(this `Observable` source1, `Observable` source2, `Observable` source3, `Observable` source4, `Observable` source5, `Observable` source6, `Observable` source7, `Observable` source8, `Func` resultSelector) | `Observable` | -| **ZipLatest**(this `Observable` source1, `Observable` source2, `Observable` source3, `Observable` source4, `Observable` source5, `Observable` source6, `Observable` source7, `Observable` source8, `Observable` source9, `Func` resultSelector) | `Observable` | -| **ZipLatest**(this `Observable` source1, `Observable` source2, `Observable` source3, `Observable` source4, `Observable` source5, `Observable` source6, `Observable` source7, `Observable` source8, `Observable` source9, `Observable` source10, `Func` resultSelector) | `Observable` | -| **ZipLatest**(this `Observable` source1, `Observable` source2, `Observable` source3, `Observable` source4, `Observable` source5, `Observable` source6, `Observable` source7, `Observable` source8, `Observable` source9, `Observable` source10, `Observable` source11, `Func` resultSelector) | `Observable` | -| **ZipLatest**(this `Observable` source1, `Observable` source2, `Observable` source3, `Observable` source4, `Observable` source5, `Observable` source6, `Observable` source7, `Observable` source8, `Observable` source9, `Observable` source10, `Observable` source11, `Observable` source12, `Func` resultSelector) | `Observable` | -| **ZipLatest**(this `Observable` source1, `Observable` source2, `Observable` source3, `Observable` source4, `Observable` source5, `Observable` source6, `Observable` source7, `Observable` source8, `Observable` source9, `Observable` source10, `Observable` source11, `Observable` source12, `Observable` source13, `Func` resultSelector) | `Observable` | -| **ZipLatest**(this `Observable` source1, `Observable` source2, `Observable` source3, `Observable` source4, `Observable` source5, `Observable` source6, `Observable` source7, `Observable` source8, `Observable` source9, `Observable` source10, `Observable` source11, `Observable` source12, `Observable` source13, `Observable` source14, `Func` resultSelector) | `Observable` | -| **ZipLatest**(this `Observable` source1, `Observable` source2, `Observable` source3, `Observable` source4, `Observable` source5, `Observable` source6, `Observable` source7, `Observable` source8, `Observable` source9, `Observable` source10, `Observable` source11, `Observable` source12, `Observable` source13, `Observable` source14, `Observable` source15, `Func` resultSelector) | `Observable` | +| Name(Parameter) | ReturnType | +| --- | --- | +| **AggregateAsync**(this `Observable` source, `Func` func, `CancellationToken` cancellationToken = default) | `Task` | +| **AggregateAsync**(this `Observable` source, `TResult` seed, `Func` func, `CancellationToken` cancellationToken = default) | `Task` | +| **AggregateAsync**(this `Observable` source, `TAccumulate` seed, `Func` func, `Func` resultSelector, `CancellationToken` cancellationToken = default) | `Task` | +| **AggregateByAsync**(this `Observable` source, `Func` keySelector, `TAccumulate` seed, `Func` func, `IEqualityComparer` keyComparer = default, `CancellationToken` cancellationToken = default) | `Task>>` | +| **AggregateByAsync**(this `Observable` source, `Func` keySelector, `Func` seedSelector, `Func` func, `IEqualityComparer` keyComparer = default, `CancellationToken` cancellationToken = default) | `Task>>` | +| **AllAsync**(this `Observable` source, `Func` predicate, `CancellationToken` cancellationToken = default) | `Task` | +| **Amb**(this `Observable` source, `Observable` second) | `Observable` | +| **AnyAsync**(this `Observable` source, `CancellationToken` cancellationToken = default) | `Task` | +| **AnyAsync**(this `Observable` source, `Func` predicate, `CancellationToken` cancellationToken = default) | `Task` | +| **Append**(this `Observable` source, `T` value) | `Observable` | +| **AsObservable**(this `Observable` source) | `Observable` | +| **AsSystemObservable**(this `Observable` source) | `IObservable` | +| **AsUnitObservable**(this `Observable` source) | `Observable` | +| **AverageAsync**(this `Observable` source, `CancellationToken` cancellationToken = default) | `Task` | +| **AverageAsync**(this `Observable` source, `Func` selector, `CancellationToken` cancellationToken = default) | `Task` | +| **AverageAsync**(this `Observable` source, `CancellationToken` cancellationToken = default) | `Task` | +| **AverageAsync**(this `Observable` source, `Func` selector, `CancellationToken` cancellationToken = default) | `Task` | +| **AverageAsync**(this `Observable` source, `CancellationToken` cancellationToken = default) | `Task` | +| **AverageAsync**(this `Observable` source, `Func` selector, `CancellationToken` cancellationToken = default) | `Task` | +| **AverageAsync**(this `Observable` source, `CancellationToken` cancellationToken = default) | `Task` | +| **AverageAsync**(this `Observable` source, `Func` selector, `CancellationToken` cancellationToken = default) | `Task` | +| **AverageAsync**(this `Observable` source, `CancellationToken` cancellationToken = default) | `Task` | +| **AverageAsync**(this `Observable` source, `Func` selector, `CancellationToken` cancellationToken = default) | `Task` | +| **AverageAsync**(this `Observable` source, `CancellationToken` cancellationToken = default) | `Task` | +| **AverageAsync**(this `Observable` source, `Func` selector, `CancellationToken` cancellationToken = default) | `Task` | +| **Cast**(this `Observable` source) | `Observable` | +| **Catch**(this `Observable` source, `Observable` second) | `Observable` | +| **Catch**(this `Observable` source, `Func>` errorHandler) | `Observable` | +| **Chunk**(this `Observable` source, `Int32` count) | `Observable` | +| **Chunk**(this `Observable` source, `TimeSpan` timeSpan) | `Observable` | +| **Chunk**(this `Observable` source, `TimeSpan` timeSpan, `TimeProvider` timeProvider) | `Observable` | +| **Chunk**(this `Observable` source, `TimeSpan` timeSpan, `Int32` count) | `Observable` | +| **Chunk**(this `Observable` source, `TimeSpan` timeSpan, `Int32` count, `TimeProvider` timeProvider) | `Observable` | +| **Chunk**(this `Observable` source, `Observable` windowBoundaries) | `Observable` | +| **ChunkFrame**(this `Observable` source) | `Observable` | +| **ChunkFrame**(this `Observable` source, `Int32` frameCount) | `Observable` | +| **ChunkFrame**(this `Observable` source, `Int32` frameCount, `FrameProvider` frameProvider) | `Observable` | +| **ChunkFrame**(this `Observable` source, `Int32` frameCount, `Int32` count) | `Observable` | +| **ChunkFrame**(this `Observable` source, `Int32` frameCount, `Int32` count, `FrameProvider` frameProvider) | `Observable` | +| **CombineLatest**(this `Observable` source1, `Observable` source2, `Func` resultSelector) | `Observable` | +| **CombineLatest**(this `Observable` source1, `Observable` source2, `Observable` source3, `Func` resultSelector) | `Observable` | +| **CombineLatest**(this `Observable` source1, `Observable` source2, `Observable` source3, `Observable` source4, `Func` resultSelector) | `Observable` | +| **CombineLatest**(this `Observable` source1, `Observable` source2, `Observable` source3, `Observable` source4, `Observable` source5, `Func` resultSelector) | `Observable` | +| **CombineLatest**(this `Observable` source1, `Observable` source2, `Observable` source3, `Observable` source4, `Observable` source5, `Observable` source6, `Func` resultSelector) | `Observable` | +| **CombineLatest**(this `Observable` source1, `Observable` source2, `Observable` source3, `Observable` source4, `Observable` source5, `Observable` source6, `Observable` source7, `Func` resultSelector) | `Observable` | +| **CombineLatest**(this `Observable` source1, `Observable` source2, `Observable` source3, `Observable` source4, `Observable` source5, `Observable` source6, `Observable` source7, `Observable` source8, `Func` resultSelector) | `Observable` | +| **CombineLatest**(this `Observable` source1, `Observable` source2, `Observable` source3, `Observable` source4, `Observable` source5, `Observable` source6, `Observable` source7, `Observable` source8, `Observable` source9, `Func` resultSelector) | `Observable` | +| **CombineLatest**(this `Observable` source1, `Observable` source2, `Observable` source3, `Observable` source4, `Observable` source5, `Observable` source6, `Observable` source7, `Observable` source8, `Observable` source9, `Observable` source10, `Func` resultSelector) | `Observable` | +| **CombineLatest**(this `Observable` source1, `Observable` source2, `Observable` source3, `Observable` source4, `Observable` source5, `Observable` source6, `Observable` source7, `Observable` source8, `Observable` source9, `Observable` source10, `Observable` source11, `Func` resultSelector) | `Observable` | +| **CombineLatest**(this `Observable` source1, `Observable` source2, `Observable` source3, `Observable` source4, `Observable` source5, `Observable` source6, `Observable` source7, `Observable` source8, `Observable` source9, `Observable` source10, `Observable` source11, `Observable` source12, `Func` resultSelector) | `Observable` | +| **CombineLatest**(this `Observable` source1, `Observable` source2, `Observable` source3, `Observable` source4, `Observable` source5, `Observable` source6, `Observable` source7, `Observable` source8, `Observable` source9, `Observable` source10, `Observable` source11, `Observable` source12, `Observable` source13, `Func` resultSelector) | `Observable` | +| **CombineLatest**(this `Observable` source1, `Observable` source2, `Observable` source3, `Observable` source4, `Observable` source5, `Observable` source6, `Observable` source7, `Observable` source8, `Observable` source9, `Observable` source10, `Observable` source11, `Observable` source12, `Observable` source13, `Observable` source14, `Func` resultSelector) | `Observable` | +| **CombineLatest**(this `Observable` source1, `Observable` source2, `Observable` source3, `Observable` source4, `Observable` source5, `Observable` source6, `Observable` source7, `Observable` source8, `Observable` source9, `Observable` source10, `Observable` source11, `Observable` source12, `Observable` source13, `Observable` source14, `Observable` source15, `Func` resultSelector) | `Observable` | +| **Concat**(this `Observable` source, `Observable` second) | `Observable` | +| **ContainsAsync**(this `Observable` source, `T` value, `CancellationToken` cancellationToken = default) | `Task` | +| **ContainsAsync**(this `Observable` source, `T` value, `IEqualityComparer` equalityComparer, `CancellationToken` cancellationToken = default) | `Task` | +| **CountAsync**(this `Observable` source, `CancellationToken` cancellationToken = default) | `Task` | +| **CountAsync**(this `Observable` source, `Func` predicate, `CancellationToken` cancellationToken = default) | `Task` | +| **Debounce**(this `Observable` source, `TimeSpan` timeSpan) | `Observable` | +| **Debounce**(this `Observable` source, `TimeSpan` timeSpan, `TimeProvider` timeProvider) | `Observable` | +| **Debounce**(this `Observable` source, `Func` throttleDurationSelector, `Boolean` configureAwait = true) | `Observable` | +| **DebounceFrame**(this `Observable` source, `Int32` frameCount) | `Observable` | +| **DebounceFrame**(this `Observable` source, `Int32` frameCount, `FrameProvider` frameProvider) | `Observable` | +| **DefaultIfEmpty**(this `Observable` source) | `Observable` | +| **DefaultIfEmpty**(this `Observable` source, `T` defaultValue) | `Observable` | +| **Delay**(this `Observable` source, `TimeSpan` dueTime) | `Observable` | +| **Delay**(this `Observable` source, `TimeSpan` dueTime, `TimeProvider` timeProvider) | `Observable` | +| **DelayFrame**(this `Observable` source, `Int32` frameCount) | `Observable` | +| **DelayFrame**(this `Observable` source, `Int32` frameCount, `FrameProvider` frameProvider) | `Observable` | +| **DelaySubscription**(this `Observable` source, `TimeSpan` dueTime) | `Observable` | +| **DelaySubscription**(this `Observable` source, `TimeSpan` dueTime, `TimeProvider` timeProvider) | `Observable` | +| **DelaySubscriptionFrame**(this `Observable` source, `Int32` frameCount) | `Observable` | +| **DelaySubscriptionFrame**(this `Observable` source, `Int32` frameCount, `FrameProvider` frameProvider) | `Observable` | +| **Dematerialize**(this `Observable>` source) | `Observable` | +| **Distinct**(this `Observable` source) | `Observable` | +| **Distinct**(this `Observable` source, `IEqualityComparer` comparer) | `Observable` | +| **DistinctBy**(this `Observable` source, `Func` keySelector) | `Observable` | +| **DistinctBy**(this `Observable` source, `Func` keySelector, `IEqualityComparer` comparer) | `Observable` | +| **DistinctUntilChanged**(this `Observable` source) | `Observable` | +| **DistinctUntilChanged**(this `Observable` source, `IEqualityComparer` comparer) | `Observable` | +| **DistinctUntilChangedBy**(this `Observable` source, `Func` keySelector) | `Observable` | +| **DistinctUntilChangedBy**(this `Observable` source, `Func` keySelector, `IEqualityComparer` comparer) | `Observable` | +| **Do**(this `Observable` source, `Action` onNext = default, `Action` onErrorResume = default, `Action` onCompleted = default, `Action` onDispose = default, `Action` onSubscribe = default) | `Observable` | +| **Do**(this `Observable` source, `TState` state, `Action` onNext = default, `Action` onErrorResume = default, `Action` onCompleted = default, `Action` onDispose = default, `Action` onSubscribe = default) | `Observable` | +| **DoCancelOnCompleted**(this `Observable` source, `CancellationTokenSource` cancellationTokenSource) | `Observable` | +| **ElementAtAsync**(this `Observable` source, `Int32` index, `CancellationToken` cancellationToken = default) | `Task` | +| **ElementAtAsync**(this `Observable` source, `Index` index, `CancellationToken` cancellationToken = default) | `Task` | +| **ElementAtOrDefaultAsync**(this `Observable` source, `Int32` index, `T` defaultValue = default, `CancellationToken` cancellationToken = default) | `Task` | +| **ElementAtOrDefaultAsync**(this `Observable` source, `Index` index, `T` defaultValue = default, `CancellationToken` cancellationToken = default) | `Task` | +| **FirstAsync**(this `Observable` source, `CancellationToken` cancellationToken = default) | `Task` | +| **FirstAsync**(this `Observable` source, `Func` predicate, `CancellationToken` cancellationToken = default) | `Task` | +| **FirstOrDefaultAsync**(this `Observable` source, `T` defaultValue = default, `CancellationToken` cancellationToken = default) | `Task` | +| **FirstOrDefaultAsync**(this `Observable` source, `Func` predicate, `T` defaultValue = default, `CancellationToken` cancellationToken = default) | `Task` | +| **ForEachAsync**(this `Observable` source, `Action` action, `CancellationToken` cancellationToken = default) | `Task` | +| **ForEachAsync**(this `Observable` source, `Action` action, `CancellationToken` cancellationToken = default) | `Task` | +| **IgnoreElements**(this `Observable` source) | `Observable` | +| **IgnoreElements**(this `Observable` source, `Action` doOnNext) | `Observable` | +| **IgnoreOnErrorResume**(this `Observable` source) | `Observable` | +| **IgnoreOnErrorResume**(this `Observable` source, `Action` doOnErrorResume) | `Observable` | +| **Index**(this `Observable` source) | `Observable` | +| **Index**(this `Observable` source) | `Observable>` | +| **IsEmptyAsync**(this `Observable` source, `CancellationToken` cancellationToken = default) | `Task` | +| **LastAsync**(this `Observable` source, `CancellationToken` cancellationToken = default) | `Task` | +| **LastAsync**(this `Observable` source, `Func` predicate, `CancellationToken` cancellationToken = default) | `Task` | +| **LastOrDefaultAsync**(this `Observable` source, `T` defaultValue = default, `CancellationToken` cancellationToken = default) | `Task` | +| **LastOrDefaultAsync**(this `Observable` source, `Func` predicate, `T` defaultValue = default, `CancellationToken` cancellationToken = default) | `Task` | +| **LongCountAsync**(this `Observable` source, `CancellationToken` cancellationToken = default) | `Task` | +| **LongCountAsync**(this `Observable` source, `Func` predicate, `CancellationToken` cancellationToken = default) | `Task` | +| **Materialize**(this `Observable` source) | `Observable>` | +| **MaxAsync**(this `Observable` source, `CancellationToken` cancellationToken = default) | `Task` | +| **MaxAsync**(this `Observable` source, `IComparer` comparer, `CancellationToken` cancellationToken = default) | `Task` | +| **MaxAsync**(this `Observable` source, `Func` selector, `CancellationToken` cancellationToken = default) | `Task` | +| **MaxAsync**(this `Observable` source, `Func` selector, `IComparer` comparer, `CancellationToken` cancellationToken = default) | `Task` | +| **MaxByAsync**(this `Observable` source, `Func` keySelector, `CancellationToken` cancellationToken = default) | `Task` | +| **MaxByAsync**(this `Observable` source, `Func` keySelector, `IComparer` comparer, `CancellationToken` cancellationToken = default) | `Task` | +| **Merge**(this `Observable` source, `Observable` second) | `Observable` | +| **MinAsync**(this `Observable` source, `CancellationToken` cancellationToken = default) | `Task` | +| **MinAsync**(this `Observable` source, `IComparer` comparer, `CancellationToken` cancellationToken = default) | `Task` | +| **MinAsync**(this `Observable` source, `Func` selector, `CancellationToken` cancellationToken = default) | `Task` | +| **MinAsync**(this `Observable` source, `Func` selector, `IComparer` comparer, `CancellationToken` cancellationToken = default) | `Task` | +| **MinByAsync**(this `Observable` source, `Func` keySelector, `CancellationToken` cancellationToken = default) | `Task` | +| **MinByAsync**(this `Observable` source, `Func` keySelector, `IComparer` comparer, `CancellationToken` cancellationToken = default) | `Task` | +| **MinMaxAsync**(this `Observable` source, `CancellationToken` cancellationToken = default) | `Task>` | +| **MinMaxAsync**(this `Observable` source, `IComparer` comparer, `CancellationToken` cancellationToken = default) | `Task>` | +| **MinMaxAsync**(this `Observable` source, `Func` selector, `CancellationToken` cancellationToken = default) | `Task>` | +| **MinMaxAsync**(this `Observable` source, `Func` selector, `IComparer` comparer, `CancellationToken` cancellationToken = default) | `Task>` | +| **Multicast**(this `Observable` source, `ISubject` subject) | `ConnectableObservable` | +| **ObserveOn**(this `Observable` source, `SynchronizationContext` synchronizationContext) | `Observable` | +| **ObserveOn**(this `Observable` source, `TimeProvider` timeProvider) | `Observable` | +| **ObserveOn**(this `Observable` source, `FrameProvider` frameProvider) | `Observable` | +| **ObserveOnCurrentSynchronizationContext**(this `Observable` source) | `Observable` | +| **ObserveOnThreadPool**(this `Observable` source) | `Observable` | +| **OfType**(this `Observable` source) | `Observable` | +| **OnErrorResumeAsFailure**(this `Observable` source) | `Observable` | +| **Pairwise**(this `Observable` source) | `Observable>` | +| **Prepend**(this `Observable` source, `T` value) | `Observable` | +| **Publish**(this `Observable` source) | `ConnectableObservable` | +| **Publish**(this `Observable` source, `T` initialValue) | `ConnectableObservable` | +| **RefCount**(this `ConnectableObservable` source) | `Observable` | +| **Replay**(this `Observable` source) | `ConnectableObservable` | +| **Replay**(this `Observable` source, `Int32` bufferSize) | `ConnectableObservable` | +| **Replay**(this `Observable` source, `TimeSpan` window) | `ConnectableObservable` | +| **Replay**(this `Observable` source, `TimeSpan` window, `TimeProvider` timeProvider) | `ConnectableObservable` | +| **Replay**(this `Observable` source, `Int32` bufferSize, `TimeSpan` window) | `ConnectableObservable` | +| **Replay**(this `Observable` source, `Int32` bufferSize, `TimeSpan` window, `TimeProvider` timeProvider) | `ConnectableObservable` | +| **ReplayFrame**(this `Observable` source, `Int32` window) | `ConnectableObservable` | +| **ReplayFrame**(this `Observable` source, `Int32` window, `FrameProvider` frameProvider) | `ConnectableObservable` | +| **ReplayFrame**(this `Observable` source, `Int32` bufferSize, `Int32` window) | `ConnectableObservable` | +| **ReplayFrame**(this `Observable` source, `Int32` bufferSize, `Int32` window, `FrameProvider` frameProvider) | `ConnectableObservable` | +| **Scan**(this `Observable` source, `Func` accumulator) | `Observable` | +| **Scan**(this `Observable` source, `TAccumulate` seed, `Func` accumulator) | `Observable` | +| **Select**(this `Observable` source, `Func` selector) | `Observable` | +| **Select**(this `Observable` source, `Func` selector) | `Observable` | +| **Select**(this `Observable` source, `TState` state, `Func` selector) | `Observable` | +| **Select**(this `Observable` source, `TState` state, `Func` selector) | `Observable` | +| **SelectAwait**(this `Observable` source, `Func>` selector, `AwaitOperation` awaitOperations = AwaitOperation.Sequential, `Boolean` configureAwait = true, `Int32` maxConcurrent = -1) | `Observable` | +| **SelectMany**(this `Observable` source, `Func>` selector) | `Observable` | +| **SelectMany**(this `Observable` source, `Func>` collectionSelector, `Func` resultSelector) | `Observable` | +| **SelectMany**(this `Observable` source, `Func>` selector) | `Observable` | +| **SelectMany**(this `Observable` source, `Func>` collectionSelector, `Func` resultSelector) | `Observable` | +| **SequenceEqualAsync**(this `Observable` source, `Observable` second, `CancellationToken` cancellationToken = default) | `Task` | +| **SequenceEqualAsync**(this `Observable` source, `Observable` second, `IEqualityComparer` equalityComparer, `CancellationToken` cancellationToken = default) | `Task` | +| **Share**(this `Observable` source) | `Observable` | +| **SingleAsync**(this `Observable` source, `CancellationToken` cancellationToken = default) | `Task` | +| **SingleAsync**(this `Observable` source, `Func` predicate, `CancellationToken` cancellationToken = default) | `Task` | +| **SingleOrDefaultAsync**(this `Observable` source, `T` defaultValue = default, `CancellationToken` cancellationToken = default) | `Task` | +| **SingleOrDefaultAsync**(this `Observable` source, `Func` predicate, `T` defaultValue = default, `CancellationToken` cancellationToken = default) | `Task` | +| **Skip**(this `Observable` source, `Int32` count) | `Observable` | +| **Skip**(this `Observable` source, `TimeSpan` duration) | `Observable` | +| **Skip**(this `Observable` source, `TimeSpan` duration, `TimeProvider` timeProvider) | `Observable` | +| **SkipFrame**(this `Observable` source, `Int32` frameCount) | `Observable` | +| **SkipFrame**(this `Observable` source, `Int32` frameCount, `FrameProvider` frameProvider) | `Observable` | +| **SkipLast**(this `Observable` source, `Int32` count) | `Observable` | +| **SkipLast**(this `Observable` source, `TimeSpan` duration) | `Observable` | +| **SkipLast**(this `Observable` source, `TimeSpan` duration, `TimeProvider` timeProvider) | `Observable` | +| **SkipLastFrame**(this `Observable` source, `Int32` frameCount) | `Observable` | +| **SkipLastFrame**(this `Observable` source, `Int32` frameCount, `FrameProvider` frameProvider) | `Observable` | +| **SkipUntil**(this `Observable` source, `Observable` other) | `Observable` | +| **SkipUntil**(this `Observable` source, `CancellationToken` cancellationToken) | `Observable` | +| **SkipUntil**(this `Observable` source, `Task` task) | `Observable` | +| **SkipWhile**(this `Observable` source, `Func` predicate) | `Observable` | +| **SkipWhile**(this `Observable` source, `Func` predicate) | `Observable` | +| **SubscribeAwait**(this `Observable` source, `Func` onNextAsync, `AwaitOperation` awaitOperations = AwaitOperation.Sequential, `Boolean` configureAwait = true, `Int32` maxConcurrent = -1) | `IDisposable` | +| **SubscribeAwait**(this `Observable` source, `Func` onNextAsync, `Action` onCompleted, `AwaitOperation` awaitOperations = AwaitOperation.Sequential, `Boolean` configureAwait = true, `Int32` maxConcurrent = -1) | `IDisposable` | +| **SubscribeAwait**(this `Observable` source, `Func` onNextAsync, `Action` onErrorResume, `Action` onCompleted, `AwaitOperation` awaitOperations = AwaitOperation.Sequential, `Boolean` configureAwait = true, `Int32` maxConcurrent = -1) | `IDisposable` | +| **SubscribeOn**(this `Observable` source, `SynchronizationContext` synchronizationContext) | `Observable` | +| **SubscribeOn**(this `Observable` source, `TimeProvider` timeProvider) | `Observable` | +| **SubscribeOn**(this `Observable` source, `FrameProvider` frameProvider) | `Observable` | +| **SubscribeOnCurrentSynchronizationContext**(this `Observable` source) | `Observable` | +| **SubscribeOnThreadPool**(this `Observable` source) | `Observable` | +| **SumAsync**(this `Observable` source, `CancellationToken` cancellationToken = default) | `Task` | +| **SumAsync**(this `Observable` source, `Func` selector, `CancellationToken` cancellationToken = default) | `Task` | +| **SumAsync**(this `Observable` source, `CancellationToken` cancellationToken = default) | `Task` | +| **SumAsync**(this `Observable` source, `Func` selector, `CancellationToken` cancellationToken = default) | `Task` | +| **SumAsync**(this `Observable` source, `CancellationToken` cancellationToken = default) | `Task` | +| **SumAsync**(this `Observable` source, `Func` selector, `CancellationToken` cancellationToken = default) | `Task` | +| **SumAsync**(this `Observable` source, `CancellationToken` cancellationToken = default) | `Task` | +| **SumAsync**(this `Observable` source, `Func` selector, `CancellationToken` cancellationToken = default) | `Task` | +| **SumAsync**(this `Observable` source, `CancellationToken` cancellationToken = default) | `Task` | +| **SumAsync**(this `Observable` source, `Func` selector, `CancellationToken` cancellationToken = default) | `Task` | +| **SumAsync**(this `Observable` source, `CancellationToken` cancellationToken = default) | `Task` | +| **SumAsync**(this `Observable` source, `Func` selector, `CancellationToken` cancellationToken = default) | `Task` | +| **Switch**(this `Observable>` sources) | `Observable` | +| **Synchronize**(this `Observable` source) | `Observable` | +| **Synchronize**(this `Observable` source, `Object` gate) | `Observable` | +| **Take**(this `Observable` source, `Int32` count) | `Observable` | +| **Take**(this `Observable` source, `TimeSpan` duration) | `Observable` | +| **Take**(this `Observable` source, `TimeSpan` duration, `TimeProvider` timeProvider) | `Observable` | +| **TakeFrame**(this `Observable` source, `Int32` frameCount) | `Observable` | +| **TakeFrame**(this `Observable` source, `Int32` frameCount, `FrameProvider` frameProvider) | `Observable` | +| **TakeLast**(this `Observable` source, `Int32` count) | `Observable` | +| **TakeLast**(this `Observable` source, `TimeSpan` duration) | `Observable` | +| **TakeLast**(this `Observable` source, `TimeSpan` duration, `TimeProvider` timeProvider) | `Observable` | +| **TakeLastFrame**(this `Observable` source, `Int32` frameCount) | `Observable` | +| **TakeLastFrame**(this `Observable` source, `Int32` frameCount, `FrameProvider` frameProvider) | `Observable` | +| **TakeUntil**(this `Observable` source, `Observable` other) | `Observable` | +| **TakeUntil**(this `Observable` source, `CancellationToken` cancellationToken) | `Observable` | +| **TakeUntil**(this `Observable` source, `Task` task) | `Observable` | +| **TakeWhile**(this `Observable` source, `Func` predicate) | `Observable` | +| **TakeWhile**(this `Observable` source, `Func` predicate) | `Observable` | +| **ThrottleFirst**(this `Observable` source, `TimeSpan` timeSpan) | `Observable` | +| **ThrottleFirst**(this `Observable` source, `TimeSpan` timeSpan, `TimeProvider` timeProvider) | `Observable` | +| **ThrottleFirst**(this `Observable` source, `Observable` sampler) | `Observable` | +| **ThrottleFirst**(this `Observable` source, `Func` sampler, `Boolean` configureAwait = true) | `Observable` | +| **ThrottleFirstFrame**(this `Observable` source, `Int32` frameCount) | `Observable` | +| **ThrottleFirstFrame**(this `Observable` source, `Int32` frameCount, `FrameProvider` frameProvider) | `Observable` | +| **ThrottleLast**(this `Observable` source, `TimeSpan` timeSpan) | `Observable` | +| **ThrottleLast**(this `Observable` source, `TimeSpan` timeSpan, `TimeProvider` timeProvider) | `Observable` | +| **ThrottleLast**(this `Observable` source, `Observable` sampler) | `Observable` | +| **ThrottleLast**(this `Observable` source, `Func` sampler, `Boolean` configureAwait = true) | `Observable` | +| **ThrottleLastFrame**(this `Observable` source, `Int32` frameCount) | `Observable` | +| **ThrottleLastFrame**(this `Observable` source, `Int32` frameCount, `FrameProvider` frameProvider) | `Observable` | +| **Timeout**(this `Observable` source, `TimeSpan` dueTime) | `Observable` | +| **Timeout**(this `Observable` source, `TimeSpan` dueTime, `TimeProvider` timeProvider) | `Observable` | +| **TimeoutFrame**(this `Observable` source, `Int32` frameCount) | `Observable` | +| **TimeoutFrame**(this `Observable` source, `Int32` frameCount, `FrameProvider` frameProvider) | `Observable` | +| **ToArrayAsync**(this `Observable` source, `CancellationToken` cancellationToken = default) | `Task` | +| **ToAsyncEnumerable**(this `Observable` source, `CancellationToken` cancellationToken = default) | `IAsyncEnumerable` | +| **ToDictionaryAsync**(this `Observable` source, `Func` keySelector, `CancellationToken` cancellationToken = default) | `Task>` | +| **ToDictionaryAsync**(this `Observable` source, `Func` keySelector, `IEqualityComparer` keyComparer, `CancellationToken` cancellationToken = default) | `Task>` | +| **ToDictionaryAsync**(this `Observable` source, `Func` keySelector, `Func` elementSelector, `CancellationToken` cancellationToken = default) | `Task>` | +| **ToDictionaryAsync**(this `Observable` source, `Func` keySelector, `Func` elementSelector, `IEqualityComparer` keyComparer, `CancellationToken` cancellationToken = default) | `Task>` | +| **ToHashSetAsync**(this `Observable` source, `CancellationToken` cancellationToken = default) | `Task>` | +| **ToHashSetAsync**(this `Observable` source, `IEqualityComparer` comparer, `CancellationToken` cancellationToken = default) | `Task>` | +| **ToListAsync**(this `Observable` source, `CancellationToken` cancellationToken = default) | `Task>` | +| **ToLiveList**(this `Observable` source) | `LiveList` | +| **ToLiveList**(this `Observable` source, `Int32` bufferSize) | `LiveList` | +| **ToLookupAsync**(this `Observable` source, `Func` keySelector, `CancellationToken` cancellationToken = default) | `Task>` | +| **ToLookupAsync**(this `Observable` source, `Func` keySelector, `IEqualityComparer` keyComparer, `CancellationToken` cancellationToken = default) | `Task>` | +| **ToLookupAsync**(this `Observable` source, `Func` keySelector, `Func` elementSelector, `CancellationToken` cancellationToken = default) | `Task>` | +| **ToLookupAsync**(this `Observable` source, `Func` keySelector, `Func` elementSelector, `IEqualityComparer` keyComparer, `CancellationToken` cancellationToken = default) | `Task>` | +| **Trampoline**(this `Observable` source) | `Observable` | +| **WaitAsync**(this `Observable` source, `CancellationToken` cancellationToken = default) | `Task` | +| **Where**(this `Observable` source, `Func` predicate) | `Observable` | +| **Where**(this `Observable` source, `Func` predicate) | `Observable` | +| **Where**(this `Observable` source, `TState` state, `Func` predicate) | `Observable` | +| **Where**(this `Observable` source, `TState` state, `Func` predicate) | `Observable` | +| **WhereAwait**(this `Observable` source, `Func>` predicate, `AwaitOperation` awaitOperations = AwaitOperation.Sequential, `Boolean` configureAwait = true, `Int32` maxConcurrent = -1) | `Observable` | +| **WithLatestFrom**(this `Observable` first, `Observable` second, `Func` resultSelector) | `Observable` | +| **Zip**(this `Observable` source1, `Observable` source2, `Func` resultSelector) | `Observable` | +| **Zip**(this `Observable` source1, `Observable` source2, `Observable` source3, `Func` resultSelector) | `Observable` | +| **Zip**(this `Observable` source1, `Observable` source2, `Observable` source3, `Observable` source4, `Func` resultSelector) | `Observable` | +| **Zip**(this `Observable` source1, `Observable` source2, `Observable` source3, `Observable` source4, `Observable` source5, `Func` resultSelector) | `Observable` | +| **Zip**(this `Observable` source1, `Observable` source2, `Observable` source3, `Observable` source4, `Observable` source5, `Observable` source6, `Func` resultSelector) | `Observable` | +| **Zip**(this `Observable` source1, `Observable` source2, `Observable` source3, `Observable` source4, `Observable` source5, `Observable` source6, `Observable` source7, `Func` resultSelector) | `Observable` | +| **Zip**(this `Observable` source1, `Observable` source2, `Observable` source3, `Observable` source4, `Observable` source5, `Observable` source6, `Observable` source7, `Observable` source8, `Func` resultSelector) | `Observable` | +| **Zip**(this `Observable` source1, `Observable` source2, `Observable` source3, `Observable` source4, `Observable` source5, `Observable` source6, `Observable` source7, `Observable` source8, `Observable` source9, `Func` resultSelector) | `Observable` | +| **Zip**(this `Observable` source1, `Observable` source2, `Observable` source3, `Observable` source4, `Observable` source5, `Observable` source6, `Observable` source7, `Observable` source8, `Observable` source9, `Observable` source10, `Func` resultSelector) | `Observable` | +| **Zip**(this `Observable` source1, `Observable` source2, `Observable` source3, `Observable` source4, `Observable` source5, `Observable` source6, `Observable` source7, `Observable` source8, `Observable` source9, `Observable` source10, `Observable` source11, `Func` resultSelector) | `Observable` | +| **Zip**(this `Observable` source1, `Observable` source2, `Observable` source3, `Observable` source4, `Observable` source5, `Observable` source6, `Observable` source7, `Observable` source8, `Observable` source9, `Observable` source10, `Observable` source11, `Observable` source12, `Func` resultSelector) | `Observable` | +| **Zip**(this `Observable` source1, `Observable` source2, `Observable` source3, `Observable` source4, `Observable` source5, `Observable` source6, `Observable` source7, `Observable` source8, `Observable` source9, `Observable` source10, `Observable` source11, `Observable` source12, `Observable` source13, `Func` resultSelector) | `Observable` | +| **Zip**(this `Observable` source1, `Observable` source2, `Observable` source3, `Observable` source4, `Observable` source5, `Observable` source6, `Observable` source7, `Observable` source8, `Observable` source9, `Observable` source10, `Observable` source11, `Observable` source12, `Observable` source13, `Observable` source14, `Func` resultSelector) | `Observable` | +| **Zip**(this `Observable` source1, `Observable` source2, `Observable` source3, `Observable` source4, `Observable` source5, `Observable` source6, `Observable` source7, `Observable` source8, `Observable` source9, `Observable` source10, `Observable` source11, `Observable` source12, `Observable` source13, `Observable` source14, `Observable` source15, `Func` resultSelector) | `Observable` | +| **ZipLatest**(this `Observable` source1, `Observable` source2, `Func` resultSelector) | `Observable` | +| **ZipLatest**(this `Observable` source1, `Observable` source2, `Observable` source3, `Func` resultSelector) | `Observable` | +| **ZipLatest**(this `Observable` source1, `Observable` source2, `Observable` source3, `Observable` source4, `Func` resultSelector) | `Observable` | +| **ZipLatest**(this `Observable` source1, `Observable` source2, `Observable` source3, `Observable` source4, `Observable` source5, `Func` resultSelector) | `Observable` | +| **ZipLatest**(this `Observable` source1, `Observable` source2, `Observable` source3, `Observable` source4, `Observable` source5, `Observable` source6, `Func` resultSelector) | `Observable` | +| **ZipLatest**(this `Observable` source1, `Observable` source2, `Observable` source3, `Observable` source4, `Observable` source5, `Observable` source6, `Observable` source7, `Func` resultSelector) | `Observable` | +| **ZipLatest**(this `Observable` source1, `Observable` source2, `Observable` source3, `Observable` source4, `Observable` source5, `Observable` source6, `Observable` source7, `Observable` source8, `Func` resultSelector) | `Observable` | +| **ZipLatest**(this `Observable` source1, `Observable` source2, `Observable` source3, `Observable` source4, `Observable` source5, `Observable` source6, `Observable` source7, `Observable` source8, `Observable` source9, `Func` resultSelector) | `Observable` | +| **ZipLatest**(this `Observable` source1, `Observable` source2, `Observable` source3, `Observable` source4, `Observable` source5, `Observable` source6, `Observable` source7, `Observable` source8, `Observable` source9, `Observable` source10, `Func` resultSelector) | `Observable` | +| **ZipLatest**(this `Observable` source1, `Observable` source2, `Observable` source3, `Observable` source4, `Observable` source5, `Observable` source6, `Observable` source7, `Observable` source8, `Observable` source9, `Observable` source10, `Observable` source11, `Func` resultSelector) | `Observable` | +| **ZipLatest**(this `Observable` source1, `Observable` source2, `Observable` source3, `Observable` source4, `Observable` source5, `Observable` source6, `Observable` source7, `Observable` source8, `Observable` source9, `Observable` source10, `Observable` source11, `Observable` source12, `Func` resultSelector) | `Observable` | +| **ZipLatest**(this `Observable` source1, `Observable` source2, `Observable` source3, `Observable` source4, `Observable` source5, `Observable` source6, `Observable` source7, `Observable` source8, `Observable` source9, `Observable` source10, `Observable` source11, `Observable` source12, `Observable` source13, `Func` resultSelector) | `Observable` | +| **ZipLatest**(this `Observable` source1, `Observable` source2, `Observable` source3, `Observable` source4, `Observable` source5, `Observable` source6, `Observable` source7, `Observable` source8, `Observable` source9, `Observable` source10, `Observable` source11, `Observable` source12, `Observable` source13, `Observable` source14, `Func` resultSelector) | `Observable` | +| **ZipLatest**(this `Observable` source1, `Observable` source2, `Observable` source3, `Observable` source4, `Observable` source5, `Observable` source6, `Observable` source7, `Observable` source8, `Observable` source9, `Observable` source10, `Observable` source11, `Observable` source12, `Observable` source13, `Observable` source14, `Observable` source15, `Func` resultSelector) | `Observable` | In dotnet/reactive, methods that return a single `IObservable` (such as `First`) are all provided only as `***Async`, returning `Task`. Additionally, to align with the naming of Enumerable, `Buffer` has been changed to `Chunk`. @@ -1857,4 +1859,4 @@ Similar to `IObservable`, if you want to stop the stream when an `OnErrorResu License --- -This library is under the MIT License. \ No newline at end of file +This library is under the MIT License. diff --git a/src/R3/AwaitOperation.cs b/src/R3/AwaitOperation.cs index 067d1eb9..b4dca6b7 100644 --- a/src/R3/AwaitOperation.cs +++ b/src/R3/AwaitOperation.cs @@ -13,7 +13,9 @@ public enum AwaitOperation /// All values are sent immediately to the asynchronous method. Parallel, /// All values are sent immediately to the asynchronous method, but the results are queued and passed to the next operator in order. - SequentialParallel + SequentialParallel, + /// Only the latest value is queued, and the next value waits for the completion of the asynchronous method. + Latest, } internal abstract class AwaitOperationSequentialObserver : Observer @@ -664,3 +666,89 @@ async void RunQueueWorker() // don't(can't) wait so use async void } } } + +internal abstract class AwaitOperationLatestObserver : Observer +{ + readonly CancellationTokenSource cancellationTokenSource; + readonly bool configureAwait; // continueOnCapturedContext + readonly Channel channel; + bool completed; + + protected override bool AutoDisposeOnCompleted => false; // disable auto-dispose + + public AwaitOperationLatestObserver(bool configureAwait) + { + this.cancellationTokenSource = new CancellationTokenSource(); + this.configureAwait = configureAwait; + this.channel = ChannelUtility.CreateSingleReadeWriterSingularBounded(); + + RunQueueWorker(); + } + + protected override sealed void OnNextCore(T value) + { + channel.Writer.TryWrite(value); + } + + protected override sealed void OnCompletedCore(Result result) + { + if (result.IsFailure) + { + PublishOnCompleted(result); + Dispose(); + return; + } + + Volatile.Write(ref completed, true); + channel.Writer.TryComplete(); // exit wait read loop + } + + protected override sealed void DisposeCore() + { + channel.Writer.TryComplete(); // complete writing + cancellationTokenSource.Cancel(); // stop selector await. + } + + protected abstract ValueTask OnNextAsync(T value, CancellationToken cancellationToken, bool configureAwait); + protected abstract void PublishOnCompleted(Result result); + + async void RunQueueWorker() // don't(can't) wait so use async void + { + var reader = channel.Reader; + var token = cancellationTokenSource.Token; + + try + { + while (await reader.WaitToReadAsync(/* don't pass CancellationToken, uses WriterComplete */).ConfigureAwait(configureAwait)) + { + while (reader.TryRead(out var item)) + { + try + { + if (token.IsCancellationRequested) return; + + await OnNextAsync(item, token, configureAwait).ConfigureAwait(configureAwait); + } + catch (Exception ex) + { + if (ex is OperationCanceledException) + { + return; + } + OnErrorResume(ex); + } + } + } + + if (Volatile.Read(ref completed)) + { + PublishOnCompleted(Result.Success); + Dispose(); + } + } + catch (Exception ex) when (ex is not OperationCanceledException) + { + ObservableSystem.GetUnhandledExceptionHandler().Invoke(ex); + } + } +} diff --git a/src/R3/Internal/ChannelUtility.cs b/src/R3/Internal/ChannelUtility.cs index 1c898818..3dae9899 100644 --- a/src/R3/Internal/ChannelUtility.cs +++ b/src/R3/Internal/ChannelUtility.cs @@ -11,8 +11,21 @@ internal static class ChannelUtility AllowSynchronousContinuations = true // if false, uses TaskCreationOptions.RunContinuationsAsynchronously so avoid it. }; + static readonly BoundedChannelOptions singularBoundedOptions = new BoundedChannelOptions(1) + { + SingleWriter = true, // in Rx operator, OnNext gurantees synchronous + SingleReader = true, // almostly uses single reader loop + AllowSynchronousContinuations = true, // if false, uses TaskCreationOptions.RunContinuationsAsynchronously so avoid it. + FullMode = BoundedChannelFullMode.DropOldest, // This will ensure that the latest item to come in is always added + }; + internal static Channel CreateSingleReadeWriterUnbounded() { return Channel.CreateUnbounded(options); } + + internal static Channel CreateSingleReadeWriterSingularBounded() + { + return Channel.CreateBounded(singularBoundedOptions); + } } diff --git a/src/R3/Operators/SelectAwait.cs b/src/R3/Operators/SelectAwait.cs index cd63e1fe..6ce08675 100644 --- a/src/R3/Operators/SelectAwait.cs +++ b/src/R3/Operators/SelectAwait.cs @@ -43,6 +43,8 @@ protected override IDisposable SubscribeCore(Observer observer) if (maxConcurrent == 0 || maxConcurrent < -1) throw new ArgumentException("maxConcurrent must be a -1 or greater than 1."); return source.Subscribe(new SelectAwaitSequentialParallelConcurrentLimit(observer, selector, configureAwait, maxConcurrent)); } + case AwaitOperation.Latest: + return source.Subscribe(new SelectAwaitLatest(observer, selector, configureAwait)); default: throw new ArgumentException(); } @@ -248,4 +250,27 @@ protected override void PublishOnCompleted(Result result) observer.OnCompleted(result); } } + + sealed class SelectAwaitLatest(Observer observer, Func> selector, bool configureAwait) + : AwaitOperationLatestObserver(configureAwait) + { +#if NET6_0_OR_GREATER + [AsyncMethodBuilderAttribute(typeof(PoolingAsyncValueTaskMethodBuilder))] +#endif + protected override async ValueTask OnNextAsync(T value, CancellationToken cancellationToken, bool configureAwait) + { + var v = await selector(value, cancellationToken).ConfigureAwait(configureAwait); + observer.OnNext(v); + } + + protected override void OnErrorResumeCore(Exception error) + { + observer.OnErrorResume(error); + } + + protected override void PublishOnCompleted(Result result) + { + observer.OnCompleted(result); + } + } } diff --git a/src/R3/Operators/SubscribeAwait.cs b/src/R3/Operators/SubscribeAwait.cs index 65b6dabc..16ffee51 100644 --- a/src/R3/Operators/SubscribeAwait.cs +++ b/src/R3/Operators/SubscribeAwait.cs @@ -40,6 +40,8 @@ public static IDisposable SubscribeAwait(this Observable source, Func(onNextAsync, onErrorResume, onCompleted, configureAwait)); case AwaitOperation.SequentialParallel: throw new ArgumentException("SubscribeAwait does not support SequentialParallel. Use Sequential for sequential operation, use parallel for parallel operation instead."); + case AwaitOperation.Latest: + return source.Subscribe(new SubscribeAwaitLatest(onNextAsync, onErrorResume, onCompleted, configureAwait)); default: throw new ArgumentException(); } @@ -158,3 +160,22 @@ protected override void PublishOnCompleted(Result result) } } } + +internal sealed class SubscribeAwaitLatest(Func onNextAsync, Action onErrorResume, Action onCompleted, bool configureAwait) + : AwaitOperationLatestObserver(configureAwait) +{ + protected override ValueTask OnNextAsync(T value, CancellationToken cancellationToken, bool configureAwait) + { + return onNextAsync(value, cancellationToken); + } + + protected override void OnErrorResumeCore(Exception error) + { + onErrorResume(error); + } + + protected override void PublishOnCompleted(Result result) + { + onCompleted(result); + } +} diff --git a/src/R3/Operators/WhereAwait.cs b/src/R3/Operators/WhereAwait.cs index 2386cfe4..1c519ab3 100644 --- a/src/R3/Operators/WhereAwait.cs +++ b/src/R3/Operators/WhereAwait.cs @@ -11,7 +11,8 @@ public static Observable WhereAwait(this Observable source, Func(Observable source, Func> predicate, AwaitOperation awaitOperations, bool configureAwait, int maxConcurrent) : Observable +internal sealed class WhereAwait(Observable source, Func> predicate, AwaitOperation awaitOperations, bool configureAwait, int maxConcurrent) + : Observable { protected override IDisposable SubscribeCore(Observer observer) { @@ -34,7 +35,7 @@ protected override IDisposable SubscribeCore(Observer observer) return source.Subscribe(new WhereAwaitParallelConcurrentLimit(observer, predicate, configureAwait, maxConcurrent)); } - + case AwaitOperation.SequentialParallel: if (maxConcurrent == -1) { @@ -45,12 +46,15 @@ protected override IDisposable SubscribeCore(Observer observer) if (maxConcurrent == 0 || maxConcurrent < -1) throw new ArgumentException("maxConcurrent must be a -1 or greater than 1."); return source.Subscribe(new WhereAwaitSequentialParallelConcurrentLimit(observer, predicate, configureAwait, maxConcurrent)); } + case AwaitOperation.Latest: + return source.Subscribe(new WhereAwaitLatest(observer, predicate, configureAwait)); default: throw new ArgumentException(); } } - sealed class WhereAwaitSequential(Observer observer, Func> predicate, bool configureAwait) : AwaitOperationSequentialObserver(configureAwait) + sealed class WhereAwaitSequential(Observer observer, Func> predicate, bool configureAwait) + : AwaitOperationSequentialObserver(configureAwait) { #if NET6_0_OR_GREATER @@ -78,7 +82,8 @@ protected override void PublishOnCompleted(Result result) } } - sealed class WhereAwaitDrop(Observer observer, Func> predicate, bool configureAwait) : AwaitOperationDropObserver(configureAwait) + sealed class WhereAwaitDrop(Observer observer, Func> predicate, bool configureAwait) + : AwaitOperationDropObserver(configureAwait) { #if NET6_0_OR_GREATER @@ -103,7 +108,8 @@ protected override void PublishOnCompleted(Result result) } } - sealed class WhereAwaitParallel(Observer observer, Func> predicate, bool configureAwait) : AwaitOperationParallelObserver(configureAwait) + sealed class WhereAwaitParallel(Observer observer, Func> predicate, bool configureAwait) + : AwaitOperationParallelObserver(configureAwait) { #if NET6_0_OR_GREATER @@ -268,4 +274,34 @@ protected override void PublishOnCompleted(Result result) observer.OnCompleted(result); } } + + sealed class WhereAwaitLatest(Observer observer, Func> predicate, bool configureAwait) + : AwaitOperationLatestObserver(configureAwait) + { + +#if NET6_0_OR_GREATER + [AsyncMethodBuilderAttribute(typeof(PoolingAsyncValueTaskMethodBuilder))] +#endif + protected override async ValueTask OnNextAsync(T value, CancellationToken cancellationToken, bool configureAwait) + { + if (await predicate(value, cancellationToken).ConfigureAwait(configureAwait)) + { + if (!cancellationToken.IsCancellationRequested) + { + observer.OnNext(value); + } + } + } + + protected override void OnErrorResumeCore(Exception error) + { + observer.OnErrorResume(error); + } + + protected override void PublishOnCompleted(Result result) + { + observer.OnCompleted(result); + } + } + } diff --git a/tests/R3.Tests/OperatorTests/SelectAwaitTest.cs b/tests/R3.Tests/OperatorTests/SelectAwaitTest.cs index 0cd5bc23..af6318b5 100644 --- a/tests/R3.Tests/OperatorTests/SelectAwaitTest.cs +++ b/tests/R3.Tests/OperatorTests/SelectAwaitTest.cs @@ -495,4 +495,98 @@ public void SequentialParallelLimit() liveList.AssertIsCompleted(); } + + [Fact] + public void Latest() + { + SynchronizationContext.SetSynchronizationContext(null); // xUnit insert fucking SynchronizationContext so ignore it. + + var subject = new Subject(); + var timeProvider = new FakeTimeProvider(); + + using var liveList = subject + .SelectAwait(async (x, ct) => + { + await Task.Delay(TimeSpan.FromSeconds(3), timeProvider, ct); + return x * 100; + }, AwaitOperation.Latest, configureAwait: false) + .ToLiveList(); + + subject.OnNext(1); + subject.OnNext(2); + subject.OnNext(3); + subject.OnNext(4); + subject.OnNext(5); + + liveList.AssertEqual([]); + + timeProvider.Advance(2); + liveList.AssertEqual([]); + + timeProvider.Advance(1); + liveList.AssertEqual([100]); + + timeProvider.Advance(3); + liveList.AssertEqual([100,500]); + + subject.OnNext(6); + subject.OnNext(7); + subject.OnNext(8); + subject.OnNext(9); + + timeProvider.Advance(1); + liveList.AssertEqual([100, 500]); + + timeProvider.Advance(2); + liveList.AssertEqual([100, 500, 600]); + + timeProvider.Advance(3); + liveList.AssertEqual([100, 500, 600, 900]); + + subject.OnCompleted(); + + liveList.AssertIsCompleted(); + } + + [Fact] + public async Task LatestCancel() + { + SynchronizationContext.SetSynchronizationContext(null); // xUnit insert fucking SynchronizationContext so ignore it. + + var subject = new Subject(); + var timeProvider = new FakeTimeProvider(); + + bool canceled = false; + using var liveList = subject + .SelectAwait(async (x, ct) => + { + try + { + await Task.Delay(TimeSpan.FromSeconds(3), timeProvider, ct); + return x * 100; + } + catch (OperationCanceledException) + { + canceled = true; + throw; + } + }, AwaitOperation.Latest) + .ToLiveList(); + + subject.OnNext(1); + subject.OnNext(2); + + liveList.AssertEqual([]); + + timeProvider.Advance(3); + liveList.AssertEqual([100]); + + canceled.Should().BeFalse(); + + liveList.Dispose(); + + await Task.Delay(TimeSpan.FromSeconds(1)); + + canceled.Should().BeTrue(); + } } diff --git a/tests/R3.Tests/OperatorTests/SubscribeAwaitTest.cs b/tests/R3.Tests/OperatorTests/SubscribeAwaitTest.cs index 613e4262..824de088 100644 --- a/tests/R3.Tests/OperatorTests/SubscribeAwaitTest.cs +++ b/tests/R3.Tests/OperatorTests/SubscribeAwaitTest.cs @@ -205,4 +205,51 @@ public void ParallelLimit() subject.OnCompleted(); } + + [Fact] + public void Latest() + { + SynchronizationContext.SetSynchronizationContext(null); // xUnit insert fucking SynchronizationContext so ignore it. + + var subject = new Subject(); + var timeProvider = new FakeTimeProvider(); + + var liveList = new List(); + using var _ = subject + .SubscribeAwait(async (x, ct) => + { + await Task.Delay(TimeSpan.FromSeconds(3), timeProvider, ct); + liveList.Add(x * 100); + }, AwaitOperation.Latest); + + subject.OnNext(1); + subject.OnNext(2); + subject.OnNext(3); + subject.OnNext(4); + subject.OnNext(5); + + liveList.Should().Equal([]); + + timeProvider.Advance(2); + liveList.Should().Equal([]); + + timeProvider.Advance(1); + liveList.Should().Equal([100]); + + timeProvider.Advance(3); + liveList.Should().Equal([100,500]); + + subject.OnNext(6); + subject.OnNext(7); + subject.OnNext(8); + subject.OnNext(9); + + timeProvider.Advance(3); + liveList.Should().Equal([100,500,600]); + + timeProvider.Advance(3); + liveList.Should().Equal([100,500,600,900]); + + subject.OnCompleted(); + } } diff --git a/tests/R3.Tests/OperatorTests/WhereAwaitTest.cs b/tests/R3.Tests/OperatorTests/WhereAwaitTest.cs index 8baaab2f..99f15e15 100644 --- a/tests/R3.Tests/OperatorTests/WhereAwaitTest.cs +++ b/tests/R3.Tests/OperatorTests/WhereAwaitTest.cs @@ -323,4 +323,55 @@ public void SequentialParallelLimit() liveList.AssertIsCompleted(); } + [Fact] + public void Latest() + { + SynchronizationContext.SetSynchronizationContext(null); // xUnit insert fucking SynchronizationContext so ignore it. + + var subject = new Subject(); + var timeProvider = new FakeTimeProvider(); + + using var liveList = subject + .WhereAwait(async (x, ct) => + { + await Task.Delay(TimeSpan.FromSeconds(3), timeProvider, ct); + return x % 2 != 0; + }, AwaitOperation.Latest) + .Select(x => x * 100) + .ToLiveList(); + + subject.OnNext(1); + subject.OnNext(2); + subject.OnNext(3); + subject.OnNext(4); + subject.OnNext(5); + liveList.AssertEqual([]); + + timeProvider.Advance(2); + liveList.AssertEqual([]); + + timeProvider.Advance(1); + liveList.AssertEqual([100]); + + timeProvider.Advance(3); + liveList.AssertEqual([100,500]); + + subject.OnNext(6); + subject.OnNext(7); + subject.OnNext(8); + subject.OnNext(9); + subject.OnNext(10); + subject.OnNext(11); + + timeProvider.Advance(3); + liveList.AssertEqual([100,500]); + + timeProvider.Advance(3); + liveList.AssertEqual([100,500,1100]); + + subject.OnCompleted(); + + liveList.AssertIsCompleted(); + } + }