Skip to content

Chaining

Frans Lytzen edited this page Oct 30, 2020 · 1 revision

A call to SafeParallelAsyncWithResult will return an IAsyncEnumerable of result objects. The result object will contain both the input and the output (if one is returned by your Func) as well as any errors.

You can safely chain these in order to build a simple, parallel processing pipeline. The methods will respect the maximum parallelism for each "bit" and will effectively run at the pace of the slowest participant. The inputs and outputs will go out of scope as you run, so you can safely use chaining, even with millions of objects. An example usecase is reading data out of one data store, modifying it and writing it to another data store. As long as you ensure your initial data reader returns an Enumerable and reads data from the data store as it is being requested, you can safely do this without worrying about running out of memory.

Example

var idList = Enumerable.Range(1, 1000000);
IDatabaseReader dbReader = new FakeDatabaseReader(500);
IDatabaseWriter dbWriter = new FakeDatabaseWriter(200);
IQueueWriter queueWriter = new FakeQueueWriter(25);

// Note that IAsyncEnumerable has a nifty "WithCancellation" method, which you can just call at the end and 
// it will apply down through the stack. However, you can only do this when the result is an IAsyncEnumerable.
// In this example, I  

var cancellationToken = new CancellationToken();

var process = idList.SafeParallelAsyncWithResult(id => dbReader.ReadData(id), 30)
                .SafeParallelAsyncWithResult(readResult => dbWriter.WriteData(readResult.Output), 100)
                .SafeParallelAsync(writeResult => queueWriter.Write(writeResult.Output.SomeDescription), 50, cancellationToken);

await process;

In this case, you will notice that a CancellationToken is passed in to the last participant. This is entirely optional. If provided on the last call to any of the SafeParallelAsync/SafeParallelAsyncWithResult methods, the cancellation token is automatically passed to the other calls so you only need to specify it the once.

Clone this wiki locally