Skip to content

Commit

Permalink
Refactor wait strategies and sequence barriers (breaking change)
Browse files Browse the repository at this point in the history
- Replace IWaitStrategy by ISequenceWaitStrategy and ISequenceWaiter
- Wait strategies now create dedicated waiters for each event processor
- Sequence barriers are now simpler and ISequenceBarrierOptions is removed
- Sequence barriers no longer wait for published sequences by default
- Disruptors now create dedicated sequence barriers for each event processor
  • Loading branch information
ocoanet committed Dec 1, 2024
1 parent 38620ab commit dc423f2
Show file tree
Hide file tree
Showing 95 changed files with 1,413 additions and 1,334 deletions.
2 changes: 1 addition & 1 deletion src/Disruptor.Benchmarks/Disruptor.Benchmarks.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="BenchmarkDotNet" Version="0.13.2" />
<PackageReference Include="BenchmarkDotNet" Version="0.14.0" />
<PackageReference Include="Fody" Version="6.6.4" PrivateAssets="all" />
<PackageReference Include="InlineIL.Fody" Version="1.7.3" PrivateAssets="all" />
<PackageReference Include="ObjectLayoutInspector" Version="0.1.4" />
Expand Down
148 changes: 16 additions & 132 deletions src/Disruptor.Benchmarks/Processing/EventProcessorBenchmarks_Wait.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
using System;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Runtime.CompilerServices;
using BenchmarkDotNet.Attributes;
using Disruptor.Processing;
using Disruptor.Util;

namespace Disruptor.Benchmarks.Processing;
Expand All @@ -14,20 +13,15 @@ public class EventProcessorBenchmarks_Wait
private const int _operationsPerInvoke = 1000;

private readonly IPartialEventProcessor _processor1;
private readonly IPartialEventProcessor _processor2;

public EventProcessorBenchmarks_Wait()
{
var waitStrategy = new YieldingWaitStrategy();
var sequencer = new SingleProducerSequencer(64, waitStrategy);
var cursorSequence = new Sequence();
var sequenceBarrier = new SequenceBarrier(sequencer, waitStrategy, new DependentSequenceGroup(cursorSequence));
var sequenceBarrierClass = new SequenceBarrierClass(sequencer, waitStrategy, new DependentSequenceGroup(cursorSequence));
var sequenceBarrierProxy = StructProxy.CreateProxyInstance(sequenceBarrierClass);
var eventProcessorType = typeof(PartialEventProcessor<,>).MakeGenericType(typeof(ISequenceBarrierOptions.IsDependentSequencePublished), sequenceBarrierProxy.GetType());
_processor1 = (IPartialEventProcessor)Activator.CreateInstance(eventProcessorType, sequenceBarrier, sequenceBarrierProxy);

_processor2 = new PartialEventProcessor<ISequenceBarrierOptions.IsDependentSequencePublished, SequenceBarrierStruct>(sequenceBarrier, new SequenceBarrierStruct(sequencer, waitStrategy, new DependentSequenceGroup(cursorSequence)));
var sequenceWaiter = waitStrategy.NewSequenceWaiter(null, new DependentSequenceGroup(cursorSequence));
var sequenceBarrier = new SequenceBarrier(sequencer, sequenceWaiter);
_processor1 = new PartialEventProcessor<EventProcessorHelpers.NoopPublishedSequenceReader>(sequenceBarrier, new EventProcessorHelpers.NoopPublishedSequenceReader());

sequencer.Publish(42);
cursorSequence.SetValue(42);
Expand All @@ -36,76 +30,43 @@ public EventProcessorBenchmarks_Wait()
[Benchmark(Baseline = true, OperationsPerInvoke = _operationsPerInvoke)]
public void ProcessingLoop_Default()
{
_processor1.ProcessingLoop_Options(42);
}

[Benchmark(OperationsPerInvoke = _operationsPerInvoke)]
public void ProcessingLoop_Typed_Class_Proxy()
{
_processor1.ProcessingLoop_Typed(42);
}

[Benchmark(OperationsPerInvoke = _operationsPerInvoke)]
public void ProcessingLoop_Typed_Struct()
{
_processor2.ProcessingLoop_Typed(42);
_processor1.ProcessingLoop(42);
}

public interface IPartialEventProcessor
{
void ProcessingLoop_Options(long nextSequence);
void ProcessingLoop_Typed(long nextSequence);
void ProcessingLoop(long nextSequence);
}

/// <summary>
/// Partial copy of <see cref="EventProcessor{T, TDataProvider, TSequenceBarrier, TEventHandler, TBatchStartAware}"/>
/// Partial copy of <see cref="EventProcessor{T, TDataProvider, TPublishedSequenceReader, TEventHandler, TOnBatchStartEvaluator, TBatchSizeLimiter}"/>
/// </summary>
public sealed class PartialEventProcessor<TSequenceBarrierOptions, TSequenceBarrier> : IPartialEventProcessor
where TSequenceBarrierOptions : ISequenceBarrierOptions
where TSequenceBarrier : ISequenceBarrier
public sealed class PartialEventProcessor<TPublishedSequenceReader> : IPartialEventProcessor
where TPublishedSequenceReader : struct, IPublishedSequenceReader
{
private readonly Sequence _sequence = new();
private readonly SequenceBarrier _sequenceBarrier;
private TSequenceBarrier _typedSequenceBarrier;
private readonly TPublishedSequenceReader _publishedSequenceReader;

public PartialEventProcessor(SequenceBarrier sequenceBarrier, TSequenceBarrier typedSequenceBarrier)
public PartialEventProcessor(SequenceBarrier sequenceBarrier, TPublishedSequenceReader publishedSequenceReader)
{
_sequenceBarrier = sequenceBarrier;
_typedSequenceBarrier = typedSequenceBarrier;
}

[MethodImpl(MethodImplOptions.NoInlining | Constants.AggressiveOptimization)]
public void ProcessingLoop_Options(long nextSequence)
{
for (var i = 0; i < _operationsPerInvoke; i++)
{
var waitResult = _sequenceBarrier.WaitFor<TSequenceBarrierOptions>(nextSequence);
if (waitResult.IsTimeout)
{
HandleTimeout(_sequence.Value);
return;
}

var availableSequence = waitResult.UnsafeAvailableSequence;
Process(availableSequence);

_sequence.SetValue(availableSequence);
}
_publishedSequenceReader = publishedSequenceReader;
}

[MethodImpl(MethodImplOptions.NoInlining | Constants.AggressiveOptimization)]
public void ProcessingLoop_Typed(long nextSequence)
public void ProcessingLoop(long nextSequence)
{
for (var i = 0; i < _operationsPerInvoke; i++)
{
var waitResult = _typedSequenceBarrier.WaitFor(nextSequence);
var waitResult = _sequenceBarrier.WaitFor(nextSequence);
if (waitResult.IsTimeout)
{
HandleTimeout(_sequence.Value);
return;
}

var availableSequence = waitResult.UnsafeAvailableSequence;
var availableSequence = _publishedSequenceReader.GetHighestPublishedSequence(nextSequence, waitResult.UnsafeAvailableSequence);
Process(availableSequence);

_sequence.SetValue(availableSequence);
Expand All @@ -122,81 +83,4 @@ private void Process(long sequence)
{
}
}

public interface ISequenceBarrier
{
SequenceWaitResult WaitFor(long sequence);
}

public sealed class SequenceBarrierClass : ISequenceBarrier
{
private readonly ISequencer _sequencer;
private readonly IWaitStrategy _waitStrategy;
private readonly DependentSequenceGroup _dependentSequences;
private CancellationTokenSource _cancellationTokenSource;

public SequenceBarrierClass(ISequencer sequencer, IWaitStrategy waitStrategy, DependentSequenceGroup dependentSequences)
{
_sequencer = sequencer;
_waitStrategy = waitStrategy;
_dependentSequences = dependentSequences;
_cancellationTokenSource = new CancellationTokenSource();
}

[MethodImpl(MethodImplOptions.AggressiveInlining | Constants.AggressiveOptimization)]
public SequenceWaitResult WaitFor(long sequence)
{
_cancellationTokenSource.Token.ThrowIfCancellationRequested();

var availableSequence = _dependentSequences.Value;
if (availableSequence >= sequence)
{
return availableSequence;
}

return InvokeWaitStrategy(sequence);
}

[MethodImpl(MethodImplOptions.NoInlining)]
private SequenceWaitResult InvokeWaitStrategy(long sequence)
{
return _waitStrategy.WaitFor(sequence, _dependentSequences, _cancellationTokenSource.Token);
}
}

public struct SequenceBarrierStruct : ISequenceBarrier
{
private readonly ISequencer _sequencer;
private readonly IWaitStrategy _waitStrategy;
private readonly DependentSequenceGroup _dependentSequences;
private CancellationTokenSource _cancellationTokenSource;

public SequenceBarrierStruct(ISequencer sequencer, IWaitStrategy waitStrategy, DependentSequenceGroup dependentSequences)
{
_sequencer = sequencer;
_waitStrategy = waitStrategy;
_dependentSequences = dependentSequences;
_cancellationTokenSource = new CancellationTokenSource();
}

[MethodImpl(MethodImplOptions.AggressiveInlining | Constants.AggressiveOptimization)]
public SequenceWaitResult WaitFor(long sequence)
{
_cancellationTokenSource.Token.ThrowIfCancellationRequested();

var availableSequence = _dependentSequences.Value;
if (availableSequence >= sequence)
{
return availableSequence;
}

return InvokeWaitStrategy(sequence);
}

[MethodImpl(MethodImplOptions.NoInlining)]
private SequenceWaitResult InvokeWaitStrategy(long sequence)
{
return _waitStrategy.WaitFor(sequence, _dependentSequences, _cancellationTokenSource.Token);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,14 @@ internal class EventProcessorBenchmarks_WaitRaw
{
private const int _operationsPerInvoke = 500;

private readonly CancellationTokenSource _cancellationTokenSource = new();
private readonly Sequence _sequence = new();
private readonly Sequence _cursor = new();
private readonly YieldingWaitStrategy _waitStrategy = new();
private readonly DependentSequenceGroup _dependentSequences;
private readonly SequenceBarrier _sequenceBarrier;

public EventProcessorBenchmarks_WaitRaw()
{
_dependentSequences = new DependentSequenceGroup(_cursor);
_cursor.SetValue(42);
var sequencer = new SingleProducerSequencer(1024, new YieldingWaitStrategy());
_sequenceBarrier = sequencer.NewBarrier(null);
sequencer.Publish(42);
}

public int NextSequence { get; set; } = 42;
Expand All @@ -30,7 +28,7 @@ public void WaitWithCheck()
{
for (var i = 0; i < _operationsPerInvoke; i++)
{
var waitResult = _waitStrategy.WaitFor(NextSequence, _dependentSequences, _cancellationTokenSource.Token);
var waitResult = _sequenceBarrier.WaitFor(NextSequence);
if (waitResult.IsTimeout)
{
HandleTimeout(_sequence.Value);
Expand All @@ -49,7 +47,7 @@ public void WaitWithoutCheck()
{
for (var i = 0; i < _operationsPerInvoke; i++)
{
var waitResult = _waitStrategy.WaitFor(NextSequence, _dependentSequences, _cancellationTokenSource.Token);
var waitResult = _sequenceBarrier.WaitFor(NextSequence);

var availableSequence = waitResult.UnsafeAvailableSequence;
Process(availableSequence);
Expand Down
Loading

0 comments on commit dc423f2

Please sign in to comment.