Skip to content

Commit

Permalink
feat: fluent builders for linear pipelines (#67)
Browse files Browse the repository at this point in the history
- support for mapper, filter, plucker, splitter
- support for sink and source
- support for aggregators and parsers
  • Loading branch information
Seddryck authored Sep 14, 2024
1 parent 4d45700 commit b953810
Show file tree
Hide file tree
Showing 67 changed files with 1,155 additions and 223 deletions.
2 changes: 1 addition & 1 deletion Streamistry.Core/Aggregator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public class Aggregator<TSource, TAccumulate, TResult> : SingleRouterPipe<TSourc
public TAccumulate? State { get; set; }
private TAccumulate? Seed { get; }

public Aggregator(IChainablePipe<TSource> upstream, Func<TAccumulate?, TSource?, TAccumulate?> accumulator, Func<TAccumulate?, TResult?> selector, TAccumulate? seed = default, Expression<Action<Aggregator<TSource, TAccumulate, TResult>>>? completion = null)
public Aggregator(IChainablePort<TSource>? upstream, Func<TAccumulate?, TSource?, TAccumulate?> accumulator, Func<TAccumulate?, TResult?> selector, TAccumulate? seed = default, Expression<Action<Aggregator<TSource, TAccumulate, TResult>>>? completion = null)
: base(upstream)
{
(Accumulator, Selector, State, Seed) = (accumulator, selector, seed, seed);
Expand Down
13 changes: 12 additions & 1 deletion Streamistry.Core/ChainablePipe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,25 @@ public abstract class ChainablePipe<T> : ObservablePipe, IChainablePipe<T>
public MainOutputPort<T> Main { get; }
protected Action? Completion { get; set; }
public IChainablePipe Pipe { get => this; }

public Pipeline? Pipeline { get; protected set; }

protected ChainablePipe(ObservabilityProvider? observability)
: base(observability)
{
Main = new(this);
}

protected ChainablePipe(IChainablePipe? upstream)
: base(upstream?.GetObservabilityProvider())
{
Main = new(this);
if (upstream is not null)
if (upstream is Pipeline pipeline)
Pipeline = pipeline;
else
Pipeline = upstream.Pipeline;
}

public void RegisterDownstream(Action<T?> downstream, Action? completion)
{
RegisterDownstream(downstream);
Expand Down
2 changes: 1 addition & 1 deletion Streamistry.Core/Combinator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public abstract class Combinator<TFirst, TSecond, TResult> : ChainablePipe<TResu
protected IChainablePort<TSecond> SecondUpstream { get; }

public Combinator(IChainablePort<TFirst> firstUpstream, IChainablePort<TSecond> secondUpstream, Func<TFirst?, TSecond?, TResult?> function)
: base(firstUpstream.Pipe.GetObservabilityProvider())
: base(firstUpstream.Pipe)
{
firstUpstream.RegisterDownstream(EmitFirst);
firstUpstream.Pipe.RegisterOnCompleted(Complete);
Expand Down
8 changes: 4 additions & 4 deletions Streamistry.Core/DualRouterPipe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ public abstract class DualRouterPipe<TInput, TOutput> : ChainablePipe<TOutput>,
public OutputPort<TInput> Alternate { get; }
public new OutputPort<TOutput> Main { get => base.Main; }

public DualRouterPipe(IChainablePort<TInput> upstream)
: base(upstream.Pipe.GetObservabilityProvider())
public DualRouterPipe(IChainablePort<TInput>? upstream)
: base(upstream?.Pipe)
{
Alternate = new(this, "Alternate");
upstream.RegisterDownstream(Emit);
upstream.Pipe.RegisterOnCompleted(Complete);
upstream?.RegisterDownstream(Emit);
upstream?.Pipe.RegisterOnCompleted(Complete);
}

[Meter]
Expand Down
11 changes: 10 additions & 1 deletion Streamistry.Core/ExceptionMapper.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
Expand All @@ -10,12 +11,20 @@ public class ExceptionMapper<TInput, TOutput> : ExceptionRouterPipe<TInput, TOut
{
public Func<TInput?, TOutput?> Function { get; init; }

public ExceptionMapper(IChainablePort<TInput> upstream, Func<TInput?, TOutput?> function)
protected ExceptionMapper(Func<TInput?, TOutput?> function, IChainablePort<TInput> ? upstream)
: base(upstream)
{
Function = function;
}

public ExceptionMapper(IChainablePort<TInput> upstream, Func<TInput?, TOutput?> function)
: this(function, upstream)
{ }

public ExceptionMapper(Func<TInput?, TOutput?> function)
: this(function, null)
{ }

protected override TOutput? Invoke(TInput? obj)
=> Function.Invoke(obj);
}
2 changes: 1 addition & 1 deletion Streamistry.Core/ExceptionRouterPipe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
namespace Streamistry;
public abstract class ExceptionRouterPipe<TInput, TOutput> : DualRouterPipe<TInput, TOutput>
{
public ExceptionRouterPipe(IChainablePort<TInput> upstream)
public ExceptionRouterPipe(IChainablePort<TInput>? upstream)
: base(upstream)
{ }

Expand Down
10 changes: 8 additions & 2 deletions Streamistry.Core/Filter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,17 @@ public class Filter<TInput> : BaseSingleRouterPipe<TInput, TInput>
{
public Func<TInput?, bool> Predicate { get; init; }

public Filter(Func<TInput?, bool> predicate)
: this(predicate, null)
{ }

public Filter(IChainablePort<TInput> upstream, Func<TInput?, bool> predicate)
: this(predicate, upstream)
{ }

public Filter(Func<TInput?, bool> predicate, IChainablePort<TInput>? upstream = null)
: base(upstream)
{
upstream.RegisterDownstream(Emit);
upstream.Pipe.RegisterOnCompleted(Complete);
Predicate = predicate;
}

Expand Down
77 changes: 77 additions & 0 deletions Streamistry.Core/Fluent/AggregatorBuilder.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection.PortableExecutable;
using System.Text;
using System.Threading.Tasks;
using Streamistry.Pipes.Aggregators;

namespace Streamistry.Fluent;
internal class AggregatorBuilder<TInput, TAccumulate, TOutput>
{
protected IPipeBuilder<TInput> Upstream { get; }

public AggregatorBuilder(IPipeBuilder<TInput> upstream)
=> Upstream = upstream;

public SpecializedAggregatorBuilder<TInput, TAccumulate, TOutput> AsMax()
=> new SpecializedAggregatorBuilder<TInput, TAccumulate, TOutput>(Upstream, typeof(Max<>), [typeof(TInput)]);
public SpecializedAggregatorBuilder<TInput, TAccumulate, TOutput> AsMin()
=> new SpecializedAggregatorBuilder<TInput, TAccumulate, TOutput>(Upstream, typeof(Min<>), [typeof(TInput)]);
public SpecializedAggregatorBuilder<TInput, TAccumulate, TOutput> AsAverage()
=> new SpecializedAggregatorBuilder<TInput, TAccumulate, TOutput>(Upstream, typeof(Average<,>), [typeof(TInput), typeof(TOutput)]);
public SpecializedAggregatorBuilder<TInput, TAccumulate, TOutput> AsMedian()
=> new SpecializedAggregatorBuilder<TInput, TAccumulate, TOutput>(Upstream, typeof(Median<,>), [typeof(TInput), typeof(TOutput)]);
public SpecializedAggregatorBuilder<TInput, TAccumulate, TOutput> AsSum()
=> new SpecializedAggregatorBuilder<TInput, TAccumulate, TOutput>(Upstream, typeof(Sum<,>), [typeof(TInput), typeof(TOutput)]);
public SpecializedAggregatorBuilder<TInput, TAccumulate, TOutput> AsCount()
=> new SpecializedAggregatorBuilder<TInput, TAccumulate, TOutput>(Upstream, typeof(Count<,>), [typeof(TInput), typeof(TOutput)]);

}

internal class SpecializedAggregatorBuilder<TInput, TAccumulate, TOutput> : PipeElementBuilder<TInput, TOutput>
{
protected Type Type { get; }
protected Type[] GenericTypeParameters { get; } = [typeof(int)];
public SpecializedAggregatorBuilder(IPipeBuilder<TInput> upstream, Type type, Type[] genericTypeParameters)
: base(upstream)
=> (Type, GenericTypeParameters) = (type, genericTypeParameters);

public override IChainablePort<TOutput> OnBuildPort()
{
var t = Type.MakeGenericType(GenericTypeParameters);
return (IChainablePort<TOutput>)Activator.CreateInstance(t, Upstream.BuildPort(), null)!;
}
}

internal class UniversalAggregatorBuilder<TInput, TAccumulate, TOutput> : PipeElementBuilder<TInput, TOutput>
{
protected Func<TAccumulate?, TInput?, TAccumulate?>? Accumulator { get; }
protected Func<TAccumulate?, TOutput?>? Selector { get; set; } = x => (TOutput?)Convert.ChangeType(x, typeof(TOutput));
protected TAccumulate? Seed { get; set; } = default;

public UniversalAggregatorBuilder(IPipeBuilder<TInput> upstream, Func<TAccumulate?, TInput?, TAccumulate?> accumulator)
: base(upstream)
=> (Accumulator) = (accumulator);

public UniversalAggregatorBuilder<TInput, TAccumulate, TOutput> WithSelector(Func<TAccumulate?, TOutput?>? selector)
{
Selector = selector;
return this;
}

public UniversalAggregatorBuilder<TInput, TAccumulate, TOutput> WithSeed(TAccumulate? seed)
{
Seed = seed;
return this;
}

public override IChainablePort<TOutput> OnBuildPort()
=> new Aggregator<TInput, TAccumulate, TOutput>(
Upstream.BuildPort()
, Accumulator ?? throw new InvalidOperationException()
, Selector ?? throw new InvalidOperationException()
, Seed
);

}
51 changes: 51 additions & 0 deletions Streamistry.Core/Fluent/BasePipeBuilder.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Text;
using System.Threading.Tasks;

namespace Streamistry.Fluent;

internal abstract class BasePipeBuilder<TOutput> : IPipeBuilder<TOutput>
{
protected IChainablePort<TOutput>? Instance { get; set; }

public abstract IChainablePort<TOutput> OnBuildPort();

public IChainablePort<TOutput> BuildPort()
=> Instance ??= OnBuildPort();

public Pipeline Build()
{
BuildPort();
return Instance!.Pipe.Pipeline!;
}

public SinkBuilder<TOutput> Sink()
=> new(this);

public FilterBuilder<TOutput> Filter(Func<TOutput?, bool>? function)
=> new(this, function);
public MapperBuilder<TOutput, TNext> Map<TNext>(Func<TOutput?, TNext?>? function)
=> new(this, function);
public PluckerBuilder<TOutput, TNext> Pluck<TNext>(Expression<Func<TOutput, TNext?>> expr)
=> new(this, expr);
public SplitterBuilder<TOutput, TNext> Split<TNext>(Func<TOutput?, TNext[]?>? function)
=> new(this, function);

public UniversalAggregatorBuilder<TOutput, TAccumulate, TNext> Aggregate<TAccumulate, TNext>(Func<TAccumulate?, TOutput?, TAccumulate?> accumulator)
=> new(this, accumulator);
public UniversalAggregatorBuilder<TOutput, TNext, TNext> Aggregate<TNext>(Func<TNext?, TOutput?, TNext?> accumulator)
=> new(this, accumulator);
public UniversalAggregatorBuilder<TOutput, TOutput, TOutput> Aggregate(Func<TOutput?, TOutput?, TOutput?> accumulator)
=> new(this, accumulator);
public AggregatorBuilder<TOutput, TOutput, TOutput> Aggregate()
=> new(this);

public ParserBuilder<TOutput, TNext> Parse<TNext>(ParserDelegate<TOutput, TNext> parser)
=> new(this, parser);
public ParserBuilder<TOutput> Parse()
=> new(this);

}
21 changes: 21 additions & 0 deletions Streamistry.Core/Fluent/FilterBuilder.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Streamistry.Fluent;
internal class FilterBuilder<TInput> : PipeElementBuilder<TInput, TInput>, IPipeBuilder<TInput>
{
protected Func<TInput?, bool>? Function { get; }

public FilterBuilder(IPipeBuilder<TInput> upstream, Func<TInput?, bool>? function)
:base(upstream)
=> (Function) = (function);

public override IChainablePort<TInput> OnBuildPort()
=> new Filter<TInput>(
Upstream.BuildPort()
, Function ?? throw new InvalidOperationException()
);
}
15 changes: 15 additions & 0 deletions Streamistry.Core/Fluent/IPipeBuilder.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Streamistry.Fluent;
internal interface IPipeBuilder<T> : IBuilder<IChainablePort<T>>
{ }

internal interface IBuilder<T>
{
T BuildPort();
T OnBuildPort();
}
21 changes: 21 additions & 0 deletions Streamistry.Core/Fluent/MapperBuilder.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Streamistry.Fluent;
internal class MapperBuilder<TInput, TOutput> : PipeElementBuilder<TInput, TOutput>
{
protected Func<TInput?, TOutput?>? Function { get; set; }

public MapperBuilder(IPipeBuilder<TInput> upstream, Func<TInput?, TOutput?>? function)
: base(upstream)
=> (Function) = (function);

public override IChainablePort<TOutput> OnBuildPort()
=> new Mapper<TInput, TOutput>(
Upstream.BuildPort()
, Function ?? throw new InvalidOperationException()
);
}
Loading

0 comments on commit b953810

Please sign in to comment.