Skip to content

Commit

Permalink
Add cancellation tokens
Browse files Browse the repository at this point in the history
  • Loading branch information
mariusz96 committed Sep 20, 2024
1 parent bd73c5c commit a979ef0
Show file tree
Hide file tree
Showing 13 changed files with 355 additions and 37 deletions.
48 changes: 41 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,16 @@ dotnet add package PipelineNet
<!-- DON'T EDIT THIS SECTION, INSTEAD RE-RUN doctoc TO UPDATE -->
**Table of Contents** *generated with [DocToc](https://github.com/thlorenz/doctoc)*

- [Simple example](#simple-example)
- [Pipeline vs Chain of responsibility](#pipeline-vs-chain-of-responsibility)
- [Middleware](#middleware)
- [Pipelines](#pipelines)
- [Chains of responsibility](#chains-of-responsibility)
- [Middleware resolver](#middleware-resolver)
- [License](#license)
- [Simple example](#simple-example)
- [Pipeline vs Chain of responsibility](#pipeline-vs-chain-of-responsibility)
- [Middleware](#middleware)
- [Pipelines](#pipelines)
- [Chains of responsibility](#chains-of-responsibility)
- [Cancellation tokens](#cancellation-tokens)
- [Middleware resolver](#middleware-resolver)
- [ServiceProvider implementation](#serviceprovider-implementation)
- [Unity implementation](#unity-implementation)
- [License](#license)

<!-- END doctoc generated TOC please keep comment here to allow auto update -->

Expand Down Expand Up @@ -194,6 +197,37 @@ result = await exceptionHandlersChain.Execute(new ArgumentException()); // Resul
result = await exceptionHandlersChain.Execute(new InvalidOperationException()); // Result will be false
```

## Cancellation tokens
If you want to pass the cancellation token to your asynchronous pipeline middleware, you can do so by implementing the `ICancellableAsyncMiddleware<TParameter>` interface
and passing the cancellation token argument to the `IAsyncPipeline<TParameter>.Execute` method:
```C#
var pipeline = new AsyncPipeline<Bitmap>(new ActivatorMiddlewareResolver())
.AddCancellable<RoudCornersCancellableAsyncMiddleware>()
.Add<AddTransparencyAsyncMiddleware>() // You can mix both kinds of asynchronous middleware
.AddCancellable<AddWatermarkCancellableAsyncMiddleware>();

Bitmap image = (Bitmap) Image.FromFile("party-photo.png");
CancellationToken cancellationToken = CancellationToken.None;
await pipeline.Execute(image, cancellationToken);

public class RoudCornersCancellableAsyncMiddleware : ICancellableAsyncMiddleware<Bitmap>
{
public async Task Run(Bitmap parameter, Func<Bitmap, Task> next, CancellationToken cancellationToken)
{
await RoundCournersAsync(parameter, cancellationToken);
await next(parameter);
}

private async Task RoudCournersAsync(Bitmap bitmap, CancellationToken cancellationToken)
{
// Handle somehow
await Task.CompletedTask;
}
}
```
And to pass the cancellation token to your asynchronous chain of responsibility middleware, you can implement the `ICancellableAsyncMiddleware<TParameter, TReturn>` interface
and pass the cancellation token argument to the `IAsynchChainOfResponsibility<TParamete, TReturnr>.Execute` method.

## Middleware resolver
You may be wondering what is all this `ActivatorMiddlewareResolver` class being passed to every instance of pipeline and chain of responsibility.
This is a default implementation of the `IMiddlewareResolver`, which is used to create instances of the middleware types.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,15 @@ public async Task<bool> Run(Exception exception, Func<Exception, Task<bool>> exe
return await executeNext(exception);
}
}

public class ThrowIfCancellationRequestedMiddleware : ICancellableAsyncMiddleware<Exception, bool>
{
public async Task<bool> Run(Exception exception, Func<Exception, Task<bool>> executeNext, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
return await executeNext(exception);
}
}
#endregion

[Fact]
Expand Down Expand Up @@ -184,5 +193,25 @@ public void Execute_SynchronousChainOfResponsibility_SuccessfullyExecute()

Assert.Equal("Test with spaces and new lines", result);
}

[Fact]
public async Task Execute_ChainOfMiddlewareWithCancellableMiddleware_CancellableMiddlewareIsExecuted()
{
var responsibilityChain = new AsyncResponsibilityChain<Exception, bool>(new ActivatorMiddlewareResolver())
.Chain<UnavailableResourcesExceptionHandler>()
.Chain(typeof(InvalidateDataExceptionHandler))
.Chain<MyExceptionHandler>()
.ChainCancellable<ThrowIfCancellationRequestedMiddleware>();

// Creates an ArgumentNullException. The 'ThrowIfCancellationRequestedMiddleware'
// middleware should be the last one to execute.
var exception = new ArgumentNullException();

// Create the cancellation token in the canceled state.
var cancellationToken = new CancellationToken(canceled: true);

// The 'ThrowIfCancellationRequestedMiddleware' should throw 'OperationCanceledException'.
await Assert.ThrowsAsync<OperationCanceledException>(() => responsibilityChain.Execute(exception, cancellationToken));
}
}
}
37 changes: 37 additions & 0 deletions src/PipelineNet.Tests/Pipelines/AsyncPipelineTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,15 @@ public async Task Run(PersonModel context, Func<PersonModel, Task> executeNext)
await executeNext(context);
}
}

public class ThrowIfCancellationRequestedMiddleware : ICancellableAsyncMiddleware<PersonModel>
{
public async Task Run(PersonModel context, Func<PersonModel, Task> executeNext, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
await executeNext(context);
}
}
#endregion

[Fact]
Expand Down Expand Up @@ -162,5 +171,33 @@ public void Add_AddTypeThatIsNotAMiddleware_ThrowsException()
pipeline.Add(typeof(AsyncPipelineTests));
});
}

[Fact]
public async Task Execute_RunPipelineWithCancellableMiddleware_CancellableMiddlewareIsExecuted()
{
var pipeline = new AsyncPipeline<PersonModel>(new ActivatorMiddlewareResolver())
.Add<PersonWithEvenId>()
.Add<PersonWithOddId>()
.Add<PersonWithEmailName>()
.Add<PersonWithGenderProperty>()
.AddCancellable<ThrowIfCancellationRequestedMiddleware>();

// Create a new instance with a 'Gender' property. The 'ThrowIfCancellationRequestedMiddleware'
// middleware should be the last one to execute.
var personModel = new PersonModel
{
Name = "[email protected]",
Gender = Gender.Other
};

// Create the cancellation token in the canceled state.
var cancellationToken = new CancellationToken(canceled: true);

// Check if 'ThrowIfCancellationRequestedMiddleware' threw 'OperationCanceledException'.
await Assert.ThrowsAsync<OperationCanceledException>(() => pipeline.Execute(personModel, cancellationToken));

// Check if the level of 'personModel' is 4, which is configured by 'PersonWithGenderProperty' middleware.
Assert.Equal(4, personModel.Level);
}
}
}
64 changes: 64 additions & 0 deletions src/PipelineNet/AsyncBaseMiddlewareFlow.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
using PipelineNet.MiddlewareResolver;
using System;
using System.Collections.Generic;
using System.Reflection;

namespace PipelineNet
{
/// <summary>
/// Defines the base class for asynchronous middleware flows.
/// </summary>
/// <typeparam name="TMiddleware">The middleware type.</typeparam>
/// <typeparam name="TCancellableMiddleware">The cancellable middleware type.</typeparam>
public abstract class AsyncBaseMiddlewareFlow<TMiddleware, TCancellableMiddleware>
{
/// <summary>
/// The list of middleware types.
/// </summary>
protected IList<Type> MiddlewareTypes { get; private set; }

/// <summary>
/// The resolver used to create the middleware types.
/// </summary>
protected IMiddlewareResolver MiddlewareResolver { get; private set; }

internal AsyncBaseMiddlewareFlow(IMiddlewareResolver middlewareResolver)
{
MiddlewareResolver = middlewareResolver ?? throw new ArgumentNullException("middlewareResolver",
"An instance of IMiddlewareResolver must be provided. You can use ActivatorMiddlewareResolver.");
MiddlewareTypes = new List<Type>();
}

/// <summary>
/// Stores the <see cref="TypeInfo"/> of the middleware type.
/// </summary>
private static readonly TypeInfo MiddlewareTypeInfo = typeof(TMiddleware).GetTypeInfo();

/// <summary>
/// Stores the <see cref="TypeInfo"/> of the cancellable middleware type.
/// </summary>
private static readonly TypeInfo CancellableMiddlewareTypeInfo = typeof(TCancellableMiddleware).GetTypeInfo();


/// <summary>
/// Adds a new middleware type to the internal list of types.
/// Middleware will be executed in the same order they are added.
/// </summary>
/// <param name="middlewareType">The middleware type to be executed.</param>
/// <exception cref="ArgumentException">Thrown if the <paramref name="middlewareType"/> is
/// not an implementation of <typeparamref name="TMiddleware"/> or <see cref="TCancellableMiddleware"/>.</exception>
/// <exception cref="ArgumentNullException">Thrown if <paramref name="middlewareType"/> is null.</exception>
protected void AddMiddleware(Type middlewareType)
{
if (middlewareType == null) throw new ArgumentNullException("middlewareType");

bool isAssignableFromMiddleware = MiddlewareTypeInfo.IsAssignableFrom(middlewareType.GetTypeInfo())
|| CancellableMiddlewareTypeInfo.IsAssignableFrom(middlewareType.GetTypeInfo());
if (!isAssignableFromMiddleware)
throw new ArgumentException(
$"The middleware type must implement \"{typeof(TMiddleware)}\" or \"{typeof(TCancellableMiddleware)}\".");

this.MiddlewareTypes.Add(middlewareType);
}
}
}
51 changes: 42 additions & 9 deletions src/PipelineNet/ChainsOfResponsibility/AsyncResponsibilityChain.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using PipelineNet.Middleware;
using PipelineNet.MiddlewareResolver;
using System;
using System.Threading;
using System.Threading.Tasks;

namespace PipelineNet.ChainsOfResponsibility
Expand All @@ -10,7 +11,7 @@ namespace PipelineNet.ChainsOfResponsibility
/// </summary>
/// <typeparam name="TParameter">The input type for the chain.</typeparam>
/// <typeparam name="TReturn">The return type of the chain.</typeparam>
public class AsyncResponsibilityChain<TParameter, TReturn> : BaseMiddlewareFlow<IAsyncMiddleware<TParameter, TReturn>>,
public class AsyncResponsibilityChain<TParameter, TReturn> : AsyncBaseMiddlewareFlow<IAsyncMiddleware<TParameter, TReturn>, ICancellableAsyncMiddleware<TParameter, TReturn>>,
IAsyncResponsibilityChain<TParameter, TReturn>
{
private Func<TParameter, Task<TReturn>> _finallyFunc;
Expand All @@ -35,13 +36,25 @@ public IAsyncResponsibilityChain<TParameter, TReturn> Chain<TMiddleware>() where
return this;
}

/// <summary>
/// Chains a new cancellable middleware to the chain of responsibility.
/// Middleware will be executed in the same order they are added.
/// </summary>
/// <typeparam name="TCancellableMiddleware">The new middleware being added.</typeparam>
/// <returns>The current instance of <see cref="IAsyncResponsibilityChain{TParameter, TReturn}"/>.</returns>
public IAsyncResponsibilityChain<TParameter, TReturn> ChainCancellable<TCancellableMiddleware>() where TCancellableMiddleware : ICancellableAsyncMiddleware<TParameter, TReturn>
{
MiddlewareTypes.Add(typeof(TCancellableMiddleware));
return this;
}

/// <summary>
/// Chains a new middleware type to the chain of responsibility.
/// Middleware will be executed in the same order they are added.
/// </summary>
/// <param name="middlewareType">The middleware type to be executed.</param>
/// <exception cref="ArgumentException">Thrown if the <paramref name="middlewareType"/> is
/// not an implementation of <see cref="IAsyncMiddleware{TParameter, TReturn}"/>.</exception>
/// not an implementation of <see cref="IAsyncMiddleware{TParameter, TReturn}"/> or <see cref="ICancellableAsyncMiddleware{TParameter, TReturn}"/>.</exception>
/// <exception cref="ArgumentNullException">Thrown if <paramref name="middlewareType"/> is null.</exception>
/// <returns>The current instance of <see cref="IAsyncResponsibilityChain{TParameter, TReturn}"/>.</returns>
public IAsyncResponsibilityChain<TParameter, TReturn> Chain(Type middlewareType)
Expand All @@ -54,7 +67,15 @@ public IAsyncResponsibilityChain<TParameter, TReturn> Chain(Type middlewareType)
/// Executes the configured chain of responsibility.
/// </summary>
/// <param name="parameter"></param>
public async Task<TReturn> Execute(TParameter parameter)
public async Task<TReturn> Execute(TParameter parameter) =>
await Execute(parameter, default).ConfigureAwait(false);

/// <summary>
/// Executes the configured chain of responsibility.
/// </summary>
/// <param name="parameter"></param>
/// <param name="cancellationToken">The cancellation token that will be passed to all middleware.</param>
public async Task<TReturn> Execute(TParameter parameter, CancellationToken cancellationToken)
{
if (MiddlewareTypes.Count == 0)
return default(TReturn);
Expand All @@ -68,7 +89,6 @@ public async Task<TReturn> Execute(TParameter parameter)
{
var type = MiddlewareTypes[index];
resolverResult = MiddlewareResolver.Resolve(type);
var middleware = (IAsyncMiddleware<TParameter, TReturn>)resolverResult.Middleware;

index++;
// If the current instance of middleware is the last one in the list,
Expand All @@ -77,20 +97,33 @@ public async Task<TReturn> Execute(TParameter parameter)
if (index == MiddlewareTypes.Count)
func = this._finallyFunc ?? ((p) => Task.FromResult(default(TReturn)));

if (resolverResult.IsDisposable && !(middleware is IDisposable
if (resolverResult == null || resolverResult.Middleware == null)
{
throw new InvalidOperationException($"'{MiddlewareResolver.GetType()}' failed to resolve middleware of type '{type}'.");
}

if (resolverResult.IsDisposable && !(resolverResult.Middleware is IDisposable
#if NETSTANDARD2_1_OR_GREATER
|| middleware is IAsyncDisposable
|| resolverResult.Middleware is IAsyncDisposable
#endif
))
{
throw new InvalidOperationException($"'{middleware.GetType().FullName}' type does not implement IDisposable" +
throw new InvalidOperationException($"'{resolverResult.Middleware.GetType()}' type does not implement IDisposable" +
#if NETSTANDARD2_1_OR_GREATER
" or IAsyncDisposable" +
#endif
".");
}

return await middleware.Run(param, func).ConfigureAwait(false);
if (resolverResult.Middleware is ICancellableAsyncMiddleware<TParameter, TReturn> cancellableMiddleware)
{
return await cancellableMiddleware.Run(param, func, cancellationToken).ConfigureAwait(false);
}
else
{
var middleware = (IAsyncMiddleware<TParameter, TReturn>)resolverResult.Middleware;
return await middleware.Run(param, func).ConfigureAwait(false);
}
}
finally
{
Expand Down Expand Up @@ -121,7 +154,7 @@ public async Task<TReturn> Execute(TParameter parameter)
/// <summary>
/// Sets the function to be executed at the end of the chain as a fallback.
/// A chain can only have one finally function. Calling this method more
/// a second time will just replace the existing finally <see cref="Func{TParameter, TResult}<"/>.
/// a second time will just replace the existing finally <see cref="Func{TParameter, TResult}"/>.
/// </summary>
/// <param name="finallyFunc">The function that will be execute at the end of chain.</param>
/// <returns>The current instance of <see cref="IAsyncResponsibilityChain{TParameter, TReturn}"/>.</returns>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using PipelineNet.Middleware;
using System;
using System.Threading;
using System.Threading.Tasks;

namespace PipelineNet.ChainsOfResponsibility
Expand Down Expand Up @@ -29,13 +30,22 @@ public interface IAsyncResponsibilityChain<TParameter, TReturn>
IAsyncResponsibilityChain<TParameter, TReturn> Chain<TMiddleware>()
where TMiddleware : IAsyncMiddleware<TParameter, TReturn>;

/// <summary>
/// Chains a new cancellable middleware to the chain of responsibility.
/// Middleware will be executed in the same order they are added.
/// </summary>
/// <typeparam name="TCancellableMiddleware">The new cancellable middleware being added.</typeparam>
/// <returns>The current instance of <see cref="IAsyncResponsibilityChain{TParameter, TReturn}"/>.</returns>
IAsyncResponsibilityChain<TParameter, TReturn> ChainCancellable<TCancellableMiddleware>()
where TCancellableMiddleware : ICancellableAsyncMiddleware<TParameter, TReturn>;

/// <summary>
/// Chains a new middleware type to the chain of responsibility.
/// Middleware will be executed in the same order they are added.
/// </summary>
/// <param name="middlewareType">The middleware type to be executed.</param>
/// <exception cref="ArgumentException">Thrown if the <paramref name="middlewareType"/> is
/// not an implementation of <see cref="IAsyncMiddleware{TParameter, TReturn}"/>.</exception>
/// not an implementation of <see cref="IAsyncMiddleware{TParameter, TReturn}"/> or <see cref="ICancellableAsyncMiddleware{TParameter, TReturn}"/>.</exception>
/// <exception cref="ArgumentNullException">Thrown if <paramref name="middlewareType"/> is null.</exception>
/// <returns>The current instance of <see cref="IAsyncResponsibilityChain{TParameter, TReturn}"/>.</returns>
IAsyncResponsibilityChain<TParameter, TReturn> Chain(Type middlewareType);
Expand All @@ -45,5 +55,12 @@ IAsyncResponsibilityChain<TParameter, TReturn> Chain<TMiddleware>()
/// </summary>
/// <param name="parameter"></param>
Task<TReturn> Execute(TParameter parameter);

/// <summary>
/// Executes the configured chain of responsibility.
/// </summary>
/// <param name="parameter"></param>
/// <param name="cancellationToken">The cancellation token that will be passed to all middleware.</param>
Task<TReturn> Execute(TParameter parameter, CancellationToken cancellationToken);
}
}
Loading

0 comments on commit a979ef0

Please sign in to comment.