Skip to content

Commit

Permalink
PerfTests: Add option to configure CPU affinity
Browse files Browse the repository at this point in the history
  • Loading branch information
ocoanet committed Nov 10, 2024
1 parent 32d43e7 commit 7d1bf53
Show file tree
Hide file tree
Showing 31 changed files with 536 additions and 258 deletions.
6 changes: 3 additions & 3 deletions src/Disruptor.PerfTests/ILatencyTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ namespace Disruptor.PerfTests;

public interface ILatencyTest
{
void Run(Stopwatch stopwatch, HistogramBase histogram);

int RequiredProcessorCount { get; }
}

void Run(LatencySessionContext sessionContext);
}
6 changes: 3 additions & 3 deletions src/Disruptor.PerfTests/IThroughputTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ namespace Disruptor.PerfTests;

public interface IThroughputTest
{
long Run(ThroughputSessionContext sessionContext);

int RequiredProcessorCount { get; }
}

long Run(ThroughputSessionContext sessionContext);
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,20 @@ public OneWayAwaitLatencyTest_ManualResetValueTaskSourceCore()

public int RequiredProcessorCount => 2;

public void Run(Stopwatch stopwatch, HistogramBase histogram)
public void Run(LatencySessionContext sessionContext)
{
_completed.Reset();

foreach (var waiter in _waiters)
{
waiter.Initialize(histogram);
waiter.Initialize(sessionContext.Histogram);
waiter.Start();
}

var pause = _pause;
var next = Stopwatch.GetTimestamp() + pause;

stopwatch.Start();
sessionContext.Start();

for (int i = 0; i < _iterations; i++)
{
Expand All @@ -64,7 +64,7 @@ public void Run(Stopwatch stopwatch, HistogramBase histogram)
waiter.Task.Wait();
}

stopwatch.Stop();
sessionContext.Stop();
}

private class Waiter : IValueTaskSource<bool>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,19 @@ public OneWayAwaitLatencyTest_QueueUserWorkItem()

public int RequiredProcessorCount => 2;

public void Run(Stopwatch stopwatch, HistogramBase histogram)
public void Run(LatencySessionContext sessionContext)
{
_completed.Reset();

foreach (var waiter in _waiters)
{
waiter.Initialize(histogram);
waiter.Initialize(sessionContext.Histogram);
}

var pause = _pause;
var next = Stopwatch.GetTimestamp() + pause;

stopwatch.Start();
sessionContext.Start();

for (var i = 0; i < _iterations; i++)
{
Expand All @@ -56,7 +56,7 @@ public void Run(Stopwatch stopwatch, HistogramBase histogram)

_completed.Wait();

stopwatch.Stop();
sessionContext.Stop();
}

private class Waiter : IThreadPoolWorkItem
Expand All @@ -73,6 +73,7 @@ public Waiter(ManualResetEventSlim completed)
public void Initialize(HistogramBase histogram)
{
_histogram = histogram;
_startTimestamp = 0;
}

public void Notify(long timestamp)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,20 @@ public OneWayAwaitLatencyTest_TaskCompletionSource()

public int RequiredProcessorCount => 2;

public void Run(Stopwatch stopwatch, HistogramBase histogram)
public void Run(LatencySessionContext sessionContext)
{
_completed.Reset();

foreach (var waiter in _waiters)
{
waiter.Initialize(histogram);
waiter.Initialize(sessionContext.Histogram);
waiter.Start();
}

var pause = _pause;
var next = Stopwatch.GetTimestamp() + pause;

stopwatch.Start();
sessionContext.Start();

for (int i = 0; i < _iterations; i++)
{
Expand All @@ -62,7 +62,7 @@ public void Run(Stopwatch stopwatch, HistogramBase histogram)
waiter.Task.Wait();
}

stopwatch.Stop();
sessionContext.Stop();
}

private class Waiter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,23 +31,23 @@ public OneWayChannelLatencyTest()

public int RequiredProcessorCount => 2;

public void Run(Stopwatch stopwatch, HistogramBase histogram)
public void Run(LatencySessionContext sessionContext)
{
var startSignal = new ManualResetEventSlim();
var producerTask = _producer.Start(startSignal);
var consumerTask = _consumer.Start(histogram);
var consumerTask = _consumer.Start(sessionContext.Histogram);

_producer.Started.Wait();
_consumer.Started.Wait();

stopwatch.Start();
sessionContext.Start();

startSignal.Set();

producerTask.Wait();
consumerTask.Wait();

stopwatch.Stop();
sessionContext.Stop();
}

private class Producer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@ public OneWaySequencedLatencyTest()

public int RequiredProcessorCount => 2;

public void Run(Stopwatch stopwatch, HistogramBase histogram)
public void Run(LatencySessionContext sessionContext)
{
_handler.Initialize(histogram);
_handler.Initialize(sessionContext.Histogram);
_handler.Started.Wait();

var pause = _pause;
var next = Stopwatch.GetTimestamp() + pause;

stopwatch.Start();
sessionContext.Start();

var ringBuffer = _disruptor.RingBuffer;

Expand All @@ -61,7 +61,7 @@ public void Run(Stopwatch stopwatch, HistogramBase histogram)

_handler.Completed.Wait();

stopwatch.Stop();
sessionContext.Stop();
}

public void Dispose()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@ public OneWaySequencedLatencyTest_AsyncBatchHandler()

public int RequiredProcessorCount => 2;

public void Run(Stopwatch stopwatch, HistogramBase histogram)
public void Run(LatencySessionContext sessionContext)
{
_handler.Initialize(histogram);
_handler.Initialize(sessionContext.Histogram);
_handler.Started.Wait();

var pause = _pause;
var next = Stopwatch.GetTimestamp() + pause;

stopwatch.Start();
sessionContext.Start();

var ringBuffer = _disruptor.RingBuffer;

Expand All @@ -62,7 +62,7 @@ public void Run(Stopwatch stopwatch, HistogramBase histogram)

_handler.Completed.Wait();

stopwatch.Stop();
sessionContext.Stop();
}

public void Dispose()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@ public OneWaySequencedLatencyTest_BatchHandler()

public int RequiredProcessorCount => 2;

public void Run(Stopwatch stopwatch, HistogramBase histogram)
public void Run(LatencySessionContext sessionContext)
{
_handler.Initialize(histogram);
_handler.Initialize(sessionContext.Histogram);
_handler.Started.Wait();

var pause = _pause;
var next = Stopwatch.GetTimestamp() + pause;

stopwatch.Start();
sessionContext.Start();

var ringBuffer = _disruptor.RingBuffer;

Expand All @@ -62,7 +62,7 @@ public void Run(Stopwatch stopwatch, HistogramBase histogram)

_handler.Completed.Wait();

stopwatch.Stop();
sessionContext.Stop();
}

public void Dispose()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ public OneWaySequencedLatencyTest_Channel()

public int RequiredProcessorCount => 2;

public void Run(Stopwatch stopwatch, HistogramBase histogram)
public void Run(LatencySessionContext sessionContext)
{
_consumer.Initialize(histogram);
_consumer.Initialize(sessionContext.Histogram);

var consumerTask = Task.Run(_consumer.Run);

Expand All @@ -44,7 +44,7 @@ public void Run(Stopwatch stopwatch, HistogramBase histogram)
var pause = _pause;
var next = Stopwatch.GetTimestamp() + pause;

stopwatch.Start();
sessionContext.Start();

for (int i = 0; i < _iterations; i++)
{
Expand All @@ -70,7 +70,7 @@ public void Run(Stopwatch stopwatch, HistogramBase histogram)

_consumer.Completed.Wait();

stopwatch.Stop();
sessionContext.Stop();

consumerTask.Wait();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
using System;
using System.Diagnostics;
using System.Threading;
using Disruptor.Dsl;
using Disruptor.PerfTests.Support;
using Disruptor.Util;
using HdrHistogram;

namespace Disruptor.PerfTests.Latency.OneWay;

public class OneWaySequencedLatencyTest_ThreadAffinity : ILatencyTest, IDisposable
{
private readonly ProgramOptions _options;
private const int _bufferSize = 1024;
private const long _iterations = 100 * 1000 * 30;
private static readonly long _pause = StopwatchUtil.GetTimestampFromMicroseconds(10);

private readonly Disruptor<PerfEvent> _disruptor;
private readonly Handler _handler;

public OneWaySequencedLatencyTest_ThreadAffinity(ProgramOptions options)
{
_options = options;
_disruptor = new Disruptor<PerfEvent>(() => new PerfEvent(), _bufferSize, new YieldingWaitStrategy());
_handler = new Handler(options.CpuSet[1]);
_disruptor.HandleEventsWith(_handler);
_disruptor.Start();
}

public int RequiredProcessorCount => 2;

public void Run(LatencySessionContext sessionContext)
{
_handler.Initialize(sessionContext.Histogram);
_handler.Started.Wait();

using var _ = ThreadAffinityUtil.SetThreadAffinity(_options.CpuSet[0]);

Thread.CurrentThread.Priority = ThreadPriority.Highest;

var pause = _pause;
var next = Stopwatch.GetTimestamp() + pause;

sessionContext.Start();

var ringBuffer = _disruptor.RingBuffer;

for (int i = 0; i < _iterations; i++)
{
var now = Stopwatch.GetTimestamp();
while (now < next)
{
Thread.Yield();
now = Stopwatch.GetTimestamp();
}

var s = ringBuffer.Next();
ringBuffer[s].Value = now;
ringBuffer.Publish(s);

next = now + pause;
}

var lastS = ringBuffer.Next();
ringBuffer[lastS].Value = -1;
ringBuffer.Publish(lastS);

_handler.Completed.Wait();

sessionContext.Stop();
}

public void Dispose()
{
_disruptor.Shutdown();
}

private class Handler(int cpu) : IEventHandler<PerfEvent>
{
private HistogramBase _histogram;
private ThreadAffinityUtil.Scope _affinityScope;

public ManualResetEventSlim Started { get; } = new();
public ManualResetEventSlim Completed { get; } = new();

public void OnStart()
{
_affinityScope = ThreadAffinityUtil.SetThreadAffinity(cpu, ThreadPriority.Highest);
Started.Set();
}

public void OnShutdown()
{
_affinityScope.Dispose();
}

public void Initialize(HistogramBase histogram)
{
_histogram = histogram;
Completed.Reset();
}

public void OnEvent(PerfEvent data, long sequence, bool endOfBatch)
{
if (data.Value == -1)
{
Completed.Set();
return;
}

var handlerTimestamp = Stopwatch.GetTimestamp();
var duration = handlerTimestamp - data.Value;

_histogram.RecordValue(StopwatchUtil.ToNanoseconds(duration));
}
}
}
Loading

0 comments on commit 7d1bf53

Please sign in to comment.