Skip to content

Commit

Permalink
Cleanup WorkerPool
Browse files Browse the repository at this point in the history
- Create dedicated sequence barriers for processors
- Remove constructor that creates a new ring buffer
- Fix Start type
  • Loading branch information
ocoanet committed Dec 7, 2024
1 parent 7c0c517 commit 89728c5
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public long Run(ThroughputSessionContext sessionContext)
{
ResetCounters();

var workerPool = new WorkerPool<PerfEvent>(_ringBuffer, _ringBuffer.NewBarrier(), new FatalExceptionHandler<PerfEvent>(), _handlers);
var workerPool = new WorkerPool<PerfEvent>(_ringBuffer, Array.Empty<Sequence>(), new FatalExceptionHandler<PerfEvent>(), _handlers);
_ringBuffer.AddGatingSequences(workerPool.GetWorkerSequences());

workerPool.Start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public long Run(ThroughputSessionContext sessionContext)
{
ResetCounters();

var workerPool = new WorkerPool<PerfEvent>(_ringBuffer, _ringBuffer.NewBarrier(), new FatalExceptionHandler<PerfEvent>(), _handlers);
var workerPool = new WorkerPool<PerfEvent>(_ringBuffer, Array.Empty<Sequence>(), new FatalExceptionHandler<PerfEvent>(), _handlers);

_ringBuffer.AddGatingSequences(workerPool.GetWorkerSequences());

Expand Down
17 changes: 11 additions & 6 deletions src/Disruptor.Tests/Processing/WorkerPoolTests.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Threading;
using System;
using System.Threading;
using Disruptor.Processing;
using NUnit.Framework;

Expand All @@ -10,12 +11,14 @@ public class WorkerPoolTests
[Test]
public void ShouldProcessEachMessageByOnlyOneWorker()
{
var pool = new WorkerPool<AtomicLong>(() => new AtomicLong(),
var ringBuffer = RingBuffer<AtomicLong>.CreateMultiProducer(() => new AtomicLong(), 1024, new BlockingWaitStrategy());
var pool = new WorkerPool<AtomicLong>(ringBuffer,
Array.Empty<Sequence>(),
new FatalExceptionHandler<AtomicLong>(),
new AtomicLongWorkHandler(),
new AtomicLongWorkHandler());

var ringBuffer = pool.Start();
pool.Start();

ringBuffer.Next();
ringBuffer.Next();
Expand All @@ -31,12 +34,14 @@ public void ShouldProcessEachMessageByOnlyOneWorker()
[Test]
public void ShouldProcessOnlyOnceItHasBeenPublished()
{
var pool = new WorkerPool<AtomicLong>(() => new AtomicLong(),
var ringBuffer = RingBuffer<AtomicLong>.CreateMultiProducer(() => new AtomicLong(), 1024, new BlockingWaitStrategy());
var pool = new WorkerPool<AtomicLong>(ringBuffer,
Array.Empty<Sequence>(),
new FatalExceptionHandler<AtomicLong>(),
new AtomicLongWorkHandler(),
new AtomicLongWorkHandler());

var ringBuffer = pool.Start();
pool.Start();

ringBuffer.Next();
ringBuffer.Next();
Expand Down Expand Up @@ -66,4 +71,4 @@ public void OnEvent(AtomicLong evt)
evt.Increment();
}
}
}
}
5 changes: 2 additions & 3 deletions src/Disruptor/Dsl/Disruptor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -509,10 +509,9 @@ IEventProcessor CreateEventProcessor(EventProcessorCreator<T> processorFactory)

internal EventHandlerGroup<T> CreateWorkerPool(Sequence[] barrierSequences, IWorkHandler<T>[] workHandlers)
{
var sequenceBarrier = _ringBuffer.NewBarrier(barrierSequences);
var workerPool = new WorkerPool<T>(_ringBuffer, sequenceBarrier, _exceptionHandler, workHandlers);
var workerPool = new WorkerPool<T>(_ringBuffer, barrierSequences, _exceptionHandler, workHandlers);

_consumerRepository.Add(workerPool, sequenceBarrier.DependentSequences);
_consumerRepository.Add(workerPool, workerPool.DependentSequences);

var workerSequences = workerPool.GetWorkerSequences();

Expand Down
2 changes: 2 additions & 0 deletions src/Disruptor/Processing/WorkProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ public WorkProcessor(RingBuffer<T> ringBuffer, SequenceBarrier sequenceBarrier,
/// <inheritdoc/>
public Sequence Sequence => _sequence;

public DependentSequenceGroup DependentSequences => _sequenceBarrier.DependentSequences;

/// <inheritdoc/>
public void Halt()
{
Expand Down
66 changes: 27 additions & 39 deletions src/Disruptor/Processing/WorkerPool.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

Expand All @@ -17,7 +18,6 @@ public sealed class WorkerPool<T> where T : class
private volatile int _runState = ProcessorRunStates.Idle;
private readonly Sequence _workSequence = new();
private readonly RingBuffer<T> _ringBuffer;
// WorkProcessors are created to wrap each of the provided WorkHandlers
private readonly WorkProcessor<T>[] _workProcessors;

/// <summary>
Expand All @@ -27,56 +27,46 @@ public sealed class WorkerPool<T> where T : class
/// called before the work pool is started.
/// </summary>
/// <param name="ringBuffer">ringBuffer of events to be consumed.</param>
/// <param name="sequenceBarrier">sequenceBarrier on which the workers will depend.</param>
/// <param name="barrierSequences">sequences of the processors that must run before</param>
/// <param name="exceptionHandler">exceptionHandler to callback when an error occurs which is not handled by the <see cref="IWorkHandler{T}"/>s.</param>
/// <param name="workHandlers">workHandlers to distribute the work load across.</param>
public WorkerPool(RingBuffer<T> ringBuffer, SequenceBarrier sequenceBarrier, IExceptionHandler<T> exceptionHandler, params IWorkHandler<T>[] workHandlers)
public WorkerPool(RingBuffer<T> ringBuffer, Sequence[] barrierSequences, IExceptionHandler<T> exceptionHandler, params IWorkHandler<T>[] workHandlers)
{
if (workHandlers.Length == 0)
throw new ArgumentException("Unable to create worker pool without any work handlers.");

_ringBuffer = ringBuffer;
_workProcessors = new WorkProcessor<T>[workHandlers.Length];
_workProcessors = workHandlers.Select(x => CreateWorkProcessor(x, _workSequence)).ToArray();

DependentSequences = _workProcessors[0].DependentSequences;

for (var i = 0; i < workHandlers.Length; i++)
WorkProcessor<T> CreateWorkProcessor(IWorkHandler<T> workHandler, Sequence workSequence)
{
_workProcessors[i] = new WorkProcessor<T>(
ringBuffer,
sequenceBarrier,
workHandlers[i],
exceptionHandler,
_workSequence);
var sequenceBarrier = ringBuffer.NewBarrier(barrierSequences);
return new WorkProcessor<T>(ringBuffer, sequenceBarrier, workHandler, exceptionHandler, workSequence);
}
}

/// <summary>
/// Construct a work pool with an internal <see cref="RingBuffer{T}"/> for convenience.
/// Create a worker pool to enable an array of <see cref="IWorkHandler{T}"/>s to consume published sequences.
///
/// This option does not require <see cref="ISequencer.AddGatingSequences"/> to be called before the work pool is started.
/// This option requires a pre-configured <see cref="RingBuffer{T}"/> which must have <see cref="ISequencer.AddGatingSequences"/>
/// called before the work pool is started.
/// </summary>
/// <param name="eventFactory">eventFactory for filling the <see cref="RingBuffer{T}"/></param>
/// <param name="exceptionHandler">exceptionHandler to callback when an error occurs which is not handled by the <see cref="IWorkHandler{T}"/>s.</param>
/// <param name="workHandlers">workHandlers to distribute the work load across.</param>
public WorkerPool(Func<T> eventFactory, IExceptionHandler<T> exceptionHandler, params IWorkHandler<T>[] workHandlers)
/// <param name="ringBuffer">ringBuffer of events to be consumed.</param>
/// <param name="workProcessors">event processors of the target <see cref="IWorkHandler{T}"/>.</param>
public WorkerPool(RingBuffer<T> ringBuffer, WorkProcessor<T>[] workProcessors)
{
_ringBuffer = RingBuffer<T>.CreateMultiProducer(eventFactory, 1024, new BlockingWaitStrategy());
var barrier = _ringBuffer.NewBarrier();
_workProcessors = new WorkProcessor<T>[workHandlers.Length];
if (workProcessors.Length == 0)
throw new ArgumentException("Unable to create worker pool without any work processors.");

for (var i = 0; i < workHandlers.Length; i++)
{
_workProcessors[i] = new WorkProcessor<T>(
_ringBuffer,
barrier,
workHandlers[i],
exceptionHandler,
_workSequence);
}
_ringBuffer = ringBuffer;
_workProcessors = workProcessors;

_ringBuffer.AddGatingSequences(GetWorkerSequences());
DependentSequences = _workProcessors[0].DependentSequences;
}

/// <summary>
/// The <see cref="RingBuffer{T}"/> used by this worker pool.
/// </summary>
public RingBuffer<T> RingBuffer => _ringBuffer;
public DependentSequenceGroup DependentSequences { get; }

/// <summary>
/// Get an array of <see cref="Sequence"/>s representing the progress of the workers.
Expand Down Expand Up @@ -113,16 +103,16 @@ public void WaitUntilStarted(TimeSpan timeout)
/// Start the worker pool processing events in sequence.
/// </summary>
/// <exception cref="InvalidOperationException">if the pool is already started or halted</exception>
public RingBuffer<T> Start()
public void Start()
{
return Start(TaskScheduler.Default);
Start(TaskScheduler.Default);
}

/// <summary>
/// Start the worker pool processing events in sequence.
/// </summary>
/// <exception cref="InvalidOperationException">if the pool is already started or halted</exception>
public RingBuffer<T> Start(TaskScheduler taskScheduler)
public void Start(TaskScheduler taskScheduler)
{
var previousRunState = Interlocked.CompareExchange(ref _runState, ProcessorRunStates.Running, ProcessorRunStates.Idle);
if (previousRunState == ProcessorRunStates.Running)
Expand All @@ -143,8 +133,6 @@ public RingBuffer<T> Start(TaskScheduler taskScheduler)
workProcessor.Sequence.SetValue(cursor);
workProcessor.StartLongRunning(taskScheduler);
}

return _ringBuffer;
}

/// <summary>
Expand Down

0 comments on commit 89728c5

Please sign in to comment.