Skip to content

Commit

Permalink
Update BatchEventProcessorBenchmarks
Browse files Browse the repository at this point in the history
  • Loading branch information
ocoanet committed Oct 4, 2019
1 parent 99d2934 commit 806219a
Showing 1 changed file with 43 additions and 25 deletions.
68 changes: 43 additions & 25 deletions src/Disruptor.Benchmarks/BatchEventProcessorBenchmarks.cs
Original file line number Diff line number Diff line change
@@ -1,39 +1,67 @@
using BenchmarkDotNet.Attributes;
using System;
using System.Runtime.CompilerServices;
using BenchmarkDotNet.Attributes;

namespace Disruptor.Benchmarks
{
public class BatchEventProcessorBenchmarks
{
private readonly Sequence _sequence = new Sequence();
private readonly RingBuffer<TestEvent> _ringBuffer;
private readonly IBatchEventProcessor<TestEvent> _processor;
private readonly TestEventHandler _eventHandler;
private readonly ISequenceBarrier _sequenceBarrier;

public BatchEventProcessorBenchmarks()
{
_ringBuffer = new RingBuffer<TestEvent>(() => new TestEvent(), new SingleProducerSequencer(4096, new SpinWaitWaitStrategy()));
_eventHandler = new TestEventHandler();
_processor = BatchEventProcessorFactory.Create(_ringBuffer, _ringBuffer.NewBarrier(), _eventHandler);
_sequenceBarrier = _ringBuffer.NewBarrier();

_eventHandler.Processor = _processor;
_eventHandler.RingBuffer = _ringBuffer;
_ringBuffer.AddGatingSequences(_processor.Sequence);
_ringBuffer.PublishEvent().Dispose();
}

[IterationSetup]
public void BeforeRun()
public volatile int Running;

[Benchmark]
public long ProcessEvent()
{
var sequence = _ringBuffer.Next(4096);
for (var i = 0; i < 4096; i++)
var nextSequence = 0L;
try
{
var availableSequence = _sequenceBarrier.WaitFor(nextSequence);

while (nextSequence <= availableSequence)
{
var evt = _ringBuffer[nextSequence];
_eventHandler.OnEvent(evt, nextSequence, nextSequence == availableSequence);
nextSequence++;
}

_sequence.SetValue(availableSequence);
}
catch (TimeoutException)
{
NotifyTimeout(_sequence.Value);
}
catch (AlertException)
{
if (Running != 2)
{
return nextSequence;
}
}
catch (Exception)
{
_ringBuffer[sequence + i].Data = i;
_sequence.SetValue(nextSequence);
nextSequence++;
}
_ringBuffer.Publish(sequence);

return nextSequence;
}

[Benchmark(OperationsPerInvoke = 4096)]
public void Run()
[MethodImpl(MethodImplOptions.NoInlining)]
private void NotifyTimeout(long sequenceValue)
{
_processor.Run();
}

public class TestEvent
Expand All @@ -43,18 +71,8 @@ public class TestEvent

public class TestEventHandler : IEventHandler<TestEvent>
{
public long Sum { get; set; }
public RingBuffer<TestEvent> RingBuffer { get; set; }
public IBatchEventProcessor<TestEvent> Processor { get; set; }

public void OnEvent(TestEvent data, long sequence, bool endOfBatch)
{
Sum += data.Data;

if (data.Data == 4095)
Processor.Halt();
else
RingBuffer.Publish(sequence + 1);
}
}
}
Expand Down

0 comments on commit 806219a

Please sign in to comment.