Skip to content

Commit

Permalink
Add ReactiveCommand<TInput, TOutput>
Browse files Browse the repository at this point in the history
  • Loading branch information
neuecc committed Aug 20, 2024
1 parent 7ebf09c commit f541cca
Show file tree
Hide file tree
Showing 3 changed files with 219 additions and 17 deletions.
27 changes: 11 additions & 16 deletions sandbox/ConsoleApp1/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,22 @@
using System.Xml.Serialization;


var b = new Subject<bool>();


var doScan = Observable.FromAsync(async (token) =>
{
Console.WriteLine("scan start");
await Task.Delay(TimeSpan.FromSeconds(3));
Console.WriteLine("scan end");
return 5;
});

var doCalc = Observable.FromAsync(async (token) =>
var rp = new ReactiveCommand<int, string>(async (x, ct) =>
{
Console.WriteLine("calc start");
await Task.Delay(TimeSpan.FromSeconds(3), token);
Console.WriteLine("calc end");
return 10;
await Task.Delay(TimeSpan.FromSeconds(1));
return x + "foo";
});

var countDown = Observable.Interval(TimeSpan.FromMilliseconds(300)).Index().Select(v => v > 9 ? 9 : v);

var work = doScan.Select(_ => doCalc).Switch().Replay(1).RefCount();
countDown.TakeUntil(work.LastAsync()).Concat(work.TakeLast(1)).Subscribe(v => Console.WriteLine($"progress: {v}"));
rp.Subscribe(x => Console.WriteLine("a:" + x));
rp.Subscribe(x => Console.WriteLine("b:" + x));

rp.Execute(0);
rp.Execute(1);

Console.ReadLine();

rp.Dispose();
2 changes: 1 addition & 1 deletion sandbox/WpfApp1/MainWindow.xaml.cs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,6 @@ public CommandViewModel()

public void Dispose()
{
Disposable.Combine(OnCheck, ShowMessageBox);
Disposable.Dispose(OnCheck, ShowMessageBox);
}
}
207 changes: 207 additions & 0 deletions src/R3/ReactiveCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,197 @@ public void Dispose()
}
}

public class ReactiveCommand<TInput, TOutput> : Observable<TOutput>, ICommand, IDisposable
{
FreeListCore<Subscription> list; // struct(array, int)
CompleteState completeState; // struct(int, IntPtr)
bool canExecute; // set from observable sequence
IDisposable subscription;

readonly Func<TInput, TOutput>? convert; // for sync
SingleAssignmentSubject<TInput>? asyncInput; // for async

public event EventHandler? CanExecuteChanged;

public ReactiveCommand(Func<TInput, TOutput> convert)
{
this.list = new FreeListCore<Subscription>(this);
this.canExecute = true;
this.convert = convert;
this.subscription = Disposable.Empty;
}

public ReactiveCommand(Func<TInput, CancellationToken, ValueTask<TOutput>> convertAsync, AwaitOperation awaitOperation = AwaitOperation.Sequential, bool configureAwait = true, bool cancelOnCompleted = false, int maxSequential = -1)
{
this.list = new FreeListCore<Subscription>(this);
this.canExecute = true;
this.asyncInput = new SingleAssignmentSubject<TInput>();
this.subscription = asyncInput.SelectAwait(convertAsync, awaitOperation, configureAwait, cancelOnCompleted, maxSequential).Subscribe(this, static (x, state) =>
{
if (state.completeState.IsCompleted) return;

foreach (var subscription in state.list.AsSpan())
{
subscription?.observer.OnNext(x);
}
});
}

public ReactiveCommand(Observable<bool> canExecuteSource, bool initialCanExecute, Func<TInput, TOutput> convert)
{
this.list = new FreeListCore<Subscription>(this);
this.canExecute = initialCanExecute;
this.convert = convert;
this.subscription = canExecuteSource.Subscribe(this, static (newCanExecute, state) =>
{
state.ChangeCanExecute(newCanExecute);
});
}

public ReactiveCommand(Observable<bool> canExecuteSource, bool initialCanExecute, Func<TInput, CancellationToken, ValueTask<TOutput>> convertAsync, AwaitOperation awaitOperation = AwaitOperation.Sequential, bool configureAwait = true, bool cancelOnCompleted = false, int maxSequential = -1)
{
this.list = new FreeListCore<Subscription>(this);
this.canExecute = initialCanExecute;
var subscription1 = canExecuteSource.Subscribe(this, static (newCanExecute, state) =>
{
state.ChangeCanExecute(newCanExecute);
});

this.asyncInput = new SingleAssignmentSubject<TInput>();
var subscription2 = asyncInput.SelectAwait(convertAsync, awaitOperation, configureAwait, cancelOnCompleted, maxSequential).Subscribe(this, static (x, state) =>
{
if (state.completeState.IsCompleted) return;

foreach (var subscription in state.list.AsSpan())
{
subscription?.observer.OnNext(x);
}
});

this.subscription = Disposable.Combine(subscription1, subscription2);
}

bool ICommand.CanExecute(object? _) // parameter is ignored
{
return CanExecute();
}

void ICommand.Execute(object? parameter)
{
if (typeof(TInput) == typeof(Unit))
{
Execute(Unsafe.As<Unit, TInput>(ref Unsafe.AsRef(in Unit.Default)));
}
else
{
Execute((TInput)parameter!);
}
}

public void ChangeCanExecute(bool canExecute)
{
if (this.canExecute == canExecute) return;
this.canExecute = canExecute;
CanExecuteChanged?.Invoke(this, EventArgs.Empty);
}

public bool IsDisabled => !CanExecute();

public bool CanExecute()
{
return canExecute;
}

public void Execute(TInput parameter)
{
if (completeState.IsCompleted) return;

if (convert != null)
{
// sync
foreach (var subscription in list.AsSpan())
{
subscription?.observer.OnNext(convert(parameter));
}
}
else if (asyncInput != null)
{
// async
asyncInput.OnNext(parameter);
}
}

protected override IDisposable SubscribeCore(Observer<TOutput> observer)
{
var result = completeState.TryGetResult();
if (result != null)
{
observer.OnCompleted(result.Value);
return Disposable.Empty;
}

var subscription = new Subscription(this, observer); // create subscription and add observer to list.

// need to check called completed during adding
result = completeState.TryGetResult();
if (result != null)
{
subscription.observer.OnCompleted(result.Value);
subscription.Dispose();
return Disposable.Empty;
}

return subscription;
}

public void Dispose()
{
Dispose(true);
}

public void Dispose(bool callOnCompleted)
{
if (completeState.TrySetDisposed(out var alreadyCompleted))
{
if (callOnCompleted && !alreadyCompleted)
{
// not yet disposed so can call list iteration
foreach (var subscription in list.AsSpan())
{
subscription?.observer.OnCompleted();
}
}

list.Dispose();
subscription?.Dispose();
asyncInput?.Dispose();
}
}

sealed class Subscription : IDisposable
{
public readonly Observer<TOutput> observer;
readonly int removeKey;
ReactiveCommand<TInput, TOutput>? parent;

public Subscription(ReactiveCommand<TInput, TOutput> parent, Observer<TOutput> observer)
{
this.parent = parent;
this.observer = observer;
parent.list.Add(this, out removeKey); // for the thread-safety, add and set removeKey in same lock.
}

public void Dispose()
{
var p = Interlocked.Exchange(ref parent, null);
if (p == null) return;

// removeKey is index, will reuse if remove completed so only allows to call from here and must not call twice.
p.list.Remove(removeKey);
}
}
}

public static class ReactiveCommandExtensions
{
public static ReactiveCommand<T> ToReactiveCommand<T>(this Observable<bool> canExecuteSource, bool initialCanExecute = true)
Expand All @@ -178,6 +369,12 @@ public static ReactiveCommand<T> ToReactiveCommand<T>(this Observable<bool> canE
return command;
}

public static ReactiveCommand<TInput, TOutput> ToReactiveCommand<TInput, TOutput>(this Observable<bool> canExecuteSource, Func<TInput, TOutput> convert, bool initialCanExecute = true)
{
var command = new ReactiveCommand<TInput, TOutput>(canExecuteSource, initialCanExecute, convert);
return command;
}

public static ReactiveCommand<Unit> ToReactiveCommand(this Observable<bool> canExecuteSource, bool initialCanExecute = true)
{
var command = new ReactiveCommand<Unit>(canExecuteSource, initialCanExecute);
Expand Down Expand Up @@ -207,4 +404,14 @@ public static ReactiveCommand<T> ToReactiveCommand<T>(

return command;
}

public static ReactiveCommand<TInput, TOutput> ToReactiveCommand<TInput, TOutput>(
this Observable<bool> canExecuteSource, Func<TInput, CancellationToken, ValueTask<TOutput>> convertAsync,
bool initialCanExecute = true,
AwaitOperation awaitOperation = AwaitOperation.Sequential, bool configureAwait = true,
bool cancelOnCompleted = false, int maxSequential = -1)
{
var command = new ReactiveCommand<TInput, TOutput>(canExecuteSource, initialCanExecute, convertAsync, awaitOperation, configureAwait, cancelOnCompleted, maxSequential);
return command;
}
}

0 comments on commit f541cca

Please sign in to comment.