From 7d1bf536ae38d86cfa6616dbeac67221e80a84e7 Mon Sep 17 00:00:00 2001 From: Olivier Coanet Date: Sun, 10 Nov 2024 17:24:18 +0100 Subject: [PATCH] PerfTests: Add option to configure CPU affinity --- src/Disruptor.PerfTests/ILatencyTest.cs | 6 +- src/Disruptor.PerfTests/IThroughputTest.cs | 6 +- ...encyTest_ManualResetValueTaskSourceCore.cs | 8 +- ...neWayAwaitLatencyTest_QueueUserWorkItem.cs | 9 +- ...ayAwaitLatencyTest_TaskCompletionSource.cs | 8 +- .../OneWay/OneWayChannelLatencyTest.cs | 8 +- .../OneWay/OneWaySequencedLatencyTest.cs | 8 +- ...ySequencedLatencyTest_AsyncBatchHandler.cs | 8 +- ...OneWaySequencedLatencyTest_BatchHandler.cs | 8 +- .../OneWaySequencedLatencyTest_Channel.cs | 8 +- ...eWaySequencedLatencyTest_ThreadAffinity.cs | 117 ++++++++++++++++++ ...encyTest_ManualResetValueTaskSourceCore.cs | 8 +- ...gPongAwaitLatencyTest_QueueUserWorkItem.cs | 8 +- ...ngAwaitLatencyTest_TaskCompletionSource.cs | 8 +- .../PingPong/PingPongChannelLatencyTest.cs | 8 +- .../PingPong/PingPongQueueLatencyTest.cs | 8 +- .../PingPong/PingPongSequencedLatencyTest.cs | 38 ++++-- ...gSequencedLatencyTest_AsyncBatchHandler.cs | 8 +- ...ngPongSequencedLatencyTest_BatchHandler.cs | 8 +- .../PingPongSequencedLatencyTest_Multi.cs | 8 +- .../PingPongSequencedLatencyTest_Value.cs | 8 +- .../LatencySessionContext.cs | 28 +++++ src/Disruptor.PerfTests/LatencyTestSession.cs | 98 ++++++--------- src/Disruptor.PerfTests/PerfTestFactory.cs | 57 +++++++++ src/Disruptor.PerfTests/Program.cs | 66 ++++++---- src/Disruptor.PerfTests/ProgramOptions.cs | 82 ++++++++++-- .../Support/AdditionEventHandler.cs | 15 ++- .../Support/ThreadAffinityUtil.cs | 13 +- ...eSequencedThroughputTest_ThreadAffinity.cs | 20 ++- .../ThroughputSessionContext.cs | 18 +-- .../ThroughputTestSession.cs | 93 ++++++-------- 31 files changed, 536 insertions(+), 258 deletions(-) create mode 100644 src/Disruptor.PerfTests/Latency/OneWay/OneWaySequencedLatencyTest_ThreadAffinity.cs create mode 100644 src/Disruptor.PerfTests/LatencySessionContext.cs create mode 100644 src/Disruptor.PerfTests/PerfTestFactory.cs diff --git a/src/Disruptor.PerfTests/ILatencyTest.cs b/src/Disruptor.PerfTests/ILatencyTest.cs index c7b99f97..901a7c4f 100644 --- a/src/Disruptor.PerfTests/ILatencyTest.cs +++ b/src/Disruptor.PerfTests/ILatencyTest.cs @@ -5,7 +5,7 @@ namespace Disruptor.PerfTests; public interface ILatencyTest { - void Run(Stopwatch stopwatch, HistogramBase histogram); - int RequiredProcessorCount { get; } -} \ No newline at end of file + + void Run(LatencySessionContext sessionContext); +} diff --git a/src/Disruptor.PerfTests/IThroughputTest.cs b/src/Disruptor.PerfTests/IThroughputTest.cs index 1f2a1349..40c34ac0 100644 --- a/src/Disruptor.PerfTests/IThroughputTest.cs +++ b/src/Disruptor.PerfTests/IThroughputTest.cs @@ -4,7 +4,7 @@ namespace Disruptor.PerfTests; public interface IThroughputTest { - long Run(ThroughputSessionContext sessionContext); - int RequiredProcessorCount { get; } -} \ No newline at end of file + + long Run(ThroughputSessionContext sessionContext); +} diff --git a/src/Disruptor.PerfTests/Latency/OneWay/OneWayAwaitLatencyTest_ManualResetValueTaskSourceCore.cs b/src/Disruptor.PerfTests/Latency/OneWay/OneWayAwaitLatencyTest_ManualResetValueTaskSourceCore.cs index a0864514..0a677e39 100644 --- a/src/Disruptor.PerfTests/Latency/OneWay/OneWayAwaitLatencyTest_ManualResetValueTaskSourceCore.cs +++ b/src/Disruptor.PerfTests/Latency/OneWay/OneWayAwaitLatencyTest_ManualResetValueTaskSourceCore.cs @@ -24,20 +24,20 @@ public OneWayAwaitLatencyTest_ManualResetValueTaskSourceCore() public int RequiredProcessorCount => 2; - public void Run(Stopwatch stopwatch, HistogramBase histogram) + public void Run(LatencySessionContext sessionContext) { _completed.Reset(); foreach (var waiter in _waiters) { - waiter.Initialize(histogram); + waiter.Initialize(sessionContext.Histogram); waiter.Start(); } var pause = _pause; var next = Stopwatch.GetTimestamp() + pause; - stopwatch.Start(); + sessionContext.Start(); for (int i = 0; i < _iterations; i++) { @@ -64,7 +64,7 @@ public void Run(Stopwatch stopwatch, HistogramBase histogram) waiter.Task.Wait(); } - stopwatch.Stop(); + sessionContext.Stop(); } private class Waiter : IValueTaskSource diff --git a/src/Disruptor.PerfTests/Latency/OneWay/OneWayAwaitLatencyTest_QueueUserWorkItem.cs b/src/Disruptor.PerfTests/Latency/OneWay/OneWayAwaitLatencyTest_QueueUserWorkItem.cs index 74e81783..2001f48d 100644 --- a/src/Disruptor.PerfTests/Latency/OneWay/OneWayAwaitLatencyTest_QueueUserWorkItem.cs +++ b/src/Disruptor.PerfTests/Latency/OneWay/OneWayAwaitLatencyTest_QueueUserWorkItem.cs @@ -21,19 +21,19 @@ public OneWayAwaitLatencyTest_QueueUserWorkItem() public int RequiredProcessorCount => 2; - public void Run(Stopwatch stopwatch, HistogramBase histogram) + public void Run(LatencySessionContext sessionContext) { _completed.Reset(); foreach (var waiter in _waiters) { - waiter.Initialize(histogram); + waiter.Initialize(sessionContext.Histogram); } var pause = _pause; var next = Stopwatch.GetTimestamp() + pause; - stopwatch.Start(); + sessionContext.Start(); for (var i = 0; i < _iterations; i++) { @@ -56,7 +56,7 @@ public void Run(Stopwatch stopwatch, HistogramBase histogram) _completed.Wait(); - stopwatch.Stop(); + sessionContext.Stop(); } private class Waiter : IThreadPoolWorkItem @@ -73,6 +73,7 @@ public Waiter(ManualResetEventSlim completed) public void Initialize(HistogramBase histogram) { _histogram = histogram; + _startTimestamp = 0; } public void Notify(long timestamp) diff --git a/src/Disruptor.PerfTests/Latency/OneWay/OneWayAwaitLatencyTest_TaskCompletionSource.cs b/src/Disruptor.PerfTests/Latency/OneWay/OneWayAwaitLatencyTest_TaskCompletionSource.cs index 22072fe4..69af6153 100644 --- a/src/Disruptor.PerfTests/Latency/OneWay/OneWayAwaitLatencyTest_TaskCompletionSource.cs +++ b/src/Disruptor.PerfTests/Latency/OneWay/OneWayAwaitLatencyTest_TaskCompletionSource.cs @@ -22,20 +22,20 @@ public OneWayAwaitLatencyTest_TaskCompletionSource() public int RequiredProcessorCount => 2; - public void Run(Stopwatch stopwatch, HistogramBase histogram) + public void Run(LatencySessionContext sessionContext) { _completed.Reset(); foreach (var waiter in _waiters) { - waiter.Initialize(histogram); + waiter.Initialize(sessionContext.Histogram); waiter.Start(); } var pause = _pause; var next = Stopwatch.GetTimestamp() + pause; - stopwatch.Start(); + sessionContext.Start(); for (int i = 0; i < _iterations; i++) { @@ -62,7 +62,7 @@ public void Run(Stopwatch stopwatch, HistogramBase histogram) waiter.Task.Wait(); } - stopwatch.Stop(); + sessionContext.Stop(); } private class Waiter diff --git a/src/Disruptor.PerfTests/Latency/OneWay/OneWayChannelLatencyTest.cs b/src/Disruptor.PerfTests/Latency/OneWay/OneWayChannelLatencyTest.cs index 6a7e7c94..5cfe20c5 100644 --- a/src/Disruptor.PerfTests/Latency/OneWay/OneWayChannelLatencyTest.cs +++ b/src/Disruptor.PerfTests/Latency/OneWay/OneWayChannelLatencyTest.cs @@ -31,23 +31,23 @@ public OneWayChannelLatencyTest() public int RequiredProcessorCount => 2; - public void Run(Stopwatch stopwatch, HistogramBase histogram) + public void Run(LatencySessionContext sessionContext) { var startSignal = new ManualResetEventSlim(); var producerTask = _producer.Start(startSignal); - var consumerTask = _consumer.Start(histogram); + var consumerTask = _consumer.Start(sessionContext.Histogram); _producer.Started.Wait(); _consumer.Started.Wait(); - stopwatch.Start(); + sessionContext.Start(); startSignal.Set(); producerTask.Wait(); consumerTask.Wait(); - stopwatch.Stop(); + sessionContext.Stop(); } private class Producer diff --git a/src/Disruptor.PerfTests/Latency/OneWay/OneWaySequencedLatencyTest.cs b/src/Disruptor.PerfTests/Latency/OneWay/OneWaySequencedLatencyTest.cs index 779d5401..f1a65017 100644 --- a/src/Disruptor.PerfTests/Latency/OneWay/OneWaySequencedLatencyTest.cs +++ b/src/Disruptor.PerfTests/Latency/OneWay/OneWaySequencedLatencyTest.cs @@ -27,15 +27,15 @@ public OneWaySequencedLatencyTest() public int RequiredProcessorCount => 2; - public void Run(Stopwatch stopwatch, HistogramBase histogram) + public void Run(LatencySessionContext sessionContext) { - _handler.Initialize(histogram); + _handler.Initialize(sessionContext.Histogram); _handler.Started.Wait(); var pause = _pause; var next = Stopwatch.GetTimestamp() + pause; - stopwatch.Start(); + sessionContext.Start(); var ringBuffer = _disruptor.RingBuffer; @@ -61,7 +61,7 @@ public void Run(Stopwatch stopwatch, HistogramBase histogram) _handler.Completed.Wait(); - stopwatch.Stop(); + sessionContext.Stop(); } public void Dispose() diff --git a/src/Disruptor.PerfTests/Latency/OneWay/OneWaySequencedLatencyTest_AsyncBatchHandler.cs b/src/Disruptor.PerfTests/Latency/OneWay/OneWaySequencedLatencyTest_AsyncBatchHandler.cs index 290822a9..77bfbfdf 100644 --- a/src/Disruptor.PerfTests/Latency/OneWay/OneWaySequencedLatencyTest_AsyncBatchHandler.cs +++ b/src/Disruptor.PerfTests/Latency/OneWay/OneWaySequencedLatencyTest_AsyncBatchHandler.cs @@ -28,15 +28,15 @@ public OneWaySequencedLatencyTest_AsyncBatchHandler() public int RequiredProcessorCount => 2; - public void Run(Stopwatch stopwatch, HistogramBase histogram) + public void Run(LatencySessionContext sessionContext) { - _handler.Initialize(histogram); + _handler.Initialize(sessionContext.Histogram); _handler.Started.Wait(); var pause = _pause; var next = Stopwatch.GetTimestamp() + pause; - stopwatch.Start(); + sessionContext.Start(); var ringBuffer = _disruptor.RingBuffer; @@ -62,7 +62,7 @@ public void Run(Stopwatch stopwatch, HistogramBase histogram) _handler.Completed.Wait(); - stopwatch.Stop(); + sessionContext.Stop(); } public void Dispose() diff --git a/src/Disruptor.PerfTests/Latency/OneWay/OneWaySequencedLatencyTest_BatchHandler.cs b/src/Disruptor.PerfTests/Latency/OneWay/OneWaySequencedLatencyTest_BatchHandler.cs index 819f0e44..7d3752f8 100644 --- a/src/Disruptor.PerfTests/Latency/OneWay/OneWaySequencedLatencyTest_BatchHandler.cs +++ b/src/Disruptor.PerfTests/Latency/OneWay/OneWaySequencedLatencyTest_BatchHandler.cs @@ -28,15 +28,15 @@ public OneWaySequencedLatencyTest_BatchHandler() public int RequiredProcessorCount => 2; - public void Run(Stopwatch stopwatch, HistogramBase histogram) + public void Run(LatencySessionContext sessionContext) { - _handler.Initialize(histogram); + _handler.Initialize(sessionContext.Histogram); _handler.Started.Wait(); var pause = _pause; var next = Stopwatch.GetTimestamp() + pause; - stopwatch.Start(); + sessionContext.Start(); var ringBuffer = _disruptor.RingBuffer; @@ -62,7 +62,7 @@ public void Run(Stopwatch stopwatch, HistogramBase histogram) _handler.Completed.Wait(); - stopwatch.Stop(); + sessionContext.Stop(); } public void Dispose() diff --git a/src/Disruptor.PerfTests/Latency/OneWay/OneWaySequencedLatencyTest_Channel.cs b/src/Disruptor.PerfTests/Latency/OneWay/OneWaySequencedLatencyTest_Channel.cs index d5ac2068..a928e8ad 100644 --- a/src/Disruptor.PerfTests/Latency/OneWay/OneWaySequencedLatencyTest_Channel.cs +++ b/src/Disruptor.PerfTests/Latency/OneWay/OneWaySequencedLatencyTest_Channel.cs @@ -31,9 +31,9 @@ public OneWaySequencedLatencyTest_Channel() public int RequiredProcessorCount => 2; - public void Run(Stopwatch stopwatch, HistogramBase histogram) + public void Run(LatencySessionContext sessionContext) { - _consumer.Initialize(histogram); + _consumer.Initialize(sessionContext.Histogram); var consumerTask = Task.Run(_consumer.Run); @@ -44,7 +44,7 @@ public void Run(Stopwatch stopwatch, HistogramBase histogram) var pause = _pause; var next = Stopwatch.GetTimestamp() + pause; - stopwatch.Start(); + sessionContext.Start(); for (int i = 0; i < _iterations; i++) { @@ -70,7 +70,7 @@ public void Run(Stopwatch stopwatch, HistogramBase histogram) _consumer.Completed.Wait(); - stopwatch.Stop(); + sessionContext.Stop(); consumerTask.Wait(); } diff --git a/src/Disruptor.PerfTests/Latency/OneWay/OneWaySequencedLatencyTest_ThreadAffinity.cs b/src/Disruptor.PerfTests/Latency/OneWay/OneWaySequencedLatencyTest_ThreadAffinity.cs new file mode 100644 index 00000000..87eea3cb --- /dev/null +++ b/src/Disruptor.PerfTests/Latency/OneWay/OneWaySequencedLatencyTest_ThreadAffinity.cs @@ -0,0 +1,117 @@ +using System; +using System.Diagnostics; +using System.Threading; +using Disruptor.Dsl; +using Disruptor.PerfTests.Support; +using Disruptor.Util; +using HdrHistogram; + +namespace Disruptor.PerfTests.Latency.OneWay; + +public class OneWaySequencedLatencyTest_ThreadAffinity : ILatencyTest, IDisposable +{ + private readonly ProgramOptions _options; + private const int _bufferSize = 1024; + private const long _iterations = 100 * 1000 * 30; + private static readonly long _pause = StopwatchUtil.GetTimestampFromMicroseconds(10); + + private readonly Disruptor _disruptor; + private readonly Handler _handler; + + public OneWaySequencedLatencyTest_ThreadAffinity(ProgramOptions options) + { + _options = options; + _disruptor = new Disruptor(() => new PerfEvent(), _bufferSize, new YieldingWaitStrategy()); + _handler = new Handler(options.CpuSet[1]); + _disruptor.HandleEventsWith(_handler); + _disruptor.Start(); + } + + public int RequiredProcessorCount => 2; + + public void Run(LatencySessionContext sessionContext) + { + _handler.Initialize(sessionContext.Histogram); + _handler.Started.Wait(); + + using var _ = ThreadAffinityUtil.SetThreadAffinity(_options.CpuSet[0]); + + Thread.CurrentThread.Priority = ThreadPriority.Highest; + + var pause = _pause; + var next = Stopwatch.GetTimestamp() + pause; + + sessionContext.Start(); + + var ringBuffer = _disruptor.RingBuffer; + + for (int i = 0; i < _iterations; i++) + { + var now = Stopwatch.GetTimestamp(); + while (now < next) + { + Thread.Yield(); + now = Stopwatch.GetTimestamp(); + } + + var s = ringBuffer.Next(); + ringBuffer[s].Value = now; + ringBuffer.Publish(s); + + next = now + pause; + } + + var lastS = ringBuffer.Next(); + ringBuffer[lastS].Value = -1; + ringBuffer.Publish(lastS); + + _handler.Completed.Wait(); + + sessionContext.Stop(); + } + + public void Dispose() + { + _disruptor.Shutdown(); + } + + private class Handler(int cpu) : IEventHandler + { + private HistogramBase _histogram; + private ThreadAffinityUtil.Scope _affinityScope; + + public ManualResetEventSlim Started { get; } = new(); + public ManualResetEventSlim Completed { get; } = new(); + + public void OnStart() + { + _affinityScope = ThreadAffinityUtil.SetThreadAffinity(cpu, ThreadPriority.Highest); + Started.Set(); + } + + public void OnShutdown() + { + _affinityScope.Dispose(); + } + + public void Initialize(HistogramBase histogram) + { + _histogram = histogram; + Completed.Reset(); + } + + public void OnEvent(PerfEvent data, long sequence, bool endOfBatch) + { + if (data.Value == -1) + { + Completed.Set(); + return; + } + + var handlerTimestamp = Stopwatch.GetTimestamp(); + var duration = handlerTimestamp - data.Value; + + _histogram.RecordValue(StopwatchUtil.ToNanoseconds(duration)); + } + } +} diff --git a/src/Disruptor.PerfTests/Latency/PingPong/PingPongAwaitLatencyTest_ManualResetValueTaskSourceCore.cs b/src/Disruptor.PerfTests/Latency/PingPong/PingPongAwaitLatencyTest_ManualResetValueTaskSourceCore.cs index 14ae54f8..53769265 100644 --- a/src/Disruptor.PerfTests/Latency/PingPong/PingPongAwaitLatencyTest_ManualResetValueTaskSourceCore.cs +++ b/src/Disruptor.PerfTests/Latency/PingPong/PingPongAwaitLatencyTest_ManualResetValueTaskSourceCore.cs @@ -20,16 +20,16 @@ public PingPongAwaitLatencyTest_ManualResetValueTaskSourceCore() _pinger = new Pinger(); } - public void Run(Stopwatch stopwatch, HistogramBase histogram) + public void Run(LatencySessionContext sessionContext) { - _pinger.Reset(histogram); + _pinger.Reset(sessionContext.Histogram); - stopwatch.Start(); + sessionContext.Start(); var task = _pinger.Start(); task.Wait(); - stopwatch.Stop(); + sessionContext.Stop(); } public int RequiredProcessorCount => 2; diff --git a/src/Disruptor.PerfTests/Latency/PingPong/PingPongAwaitLatencyTest_QueueUserWorkItem.cs b/src/Disruptor.PerfTests/Latency/PingPong/PingPongAwaitLatencyTest_QueueUserWorkItem.cs index 97150e13..08aeb982 100644 --- a/src/Disruptor.PerfTests/Latency/PingPong/PingPongAwaitLatencyTest_QueueUserWorkItem.cs +++ b/src/Disruptor.PerfTests/Latency/PingPong/PingPongAwaitLatencyTest_QueueUserWorkItem.cs @@ -18,16 +18,16 @@ public PingPongAwaitLatencyTest_QueueUserWorkItem() _pinger = new Pinger(); } - public void Run(Stopwatch stopwatch, HistogramBase histogram) + public void Run(LatencySessionContext sessionContext) { var signal = new ManualResetEvent(false); - _pinger.Reset(signal, histogram); + _pinger.Reset(signal, sessionContext.Histogram); - stopwatch.Start(); + sessionContext.Start(); _pinger.SendPing(); signal.WaitOne(); - stopwatch.Stop(); + sessionContext.Stop(); #if AWAIT_LATENCY_THREAD_COUNT Console.WriteLine($"THREAD COUNT: {Ponger.ThreadCount}"); diff --git a/src/Disruptor.PerfTests/Latency/PingPong/PingPongAwaitLatencyTest_TaskCompletionSource.cs b/src/Disruptor.PerfTests/Latency/PingPong/PingPongAwaitLatencyTest_TaskCompletionSource.cs index 00974a26..d9ae58e9 100644 --- a/src/Disruptor.PerfTests/Latency/PingPong/PingPongAwaitLatencyTest_TaskCompletionSource.cs +++ b/src/Disruptor.PerfTests/Latency/PingPong/PingPongAwaitLatencyTest_TaskCompletionSource.cs @@ -19,16 +19,16 @@ public PingPongAwaitLatencyTest_TaskCompletionSource() _pinger = new Pinger(); } - public void Run(Stopwatch stopwatch, HistogramBase histogram) + public void Run(LatencySessionContext sessionContext) { - _pinger.Reset(histogram); + _pinger.Reset(sessionContext.Histogram); - stopwatch.Start(); + sessionContext.Start(); var task = _pinger.Start(); task.Wait(); - stopwatch.Stop(); + sessionContext.Stop(); } public int RequiredProcessorCount => 2; diff --git a/src/Disruptor.PerfTests/Latency/PingPong/PingPongChannelLatencyTest.cs b/src/Disruptor.PerfTests/Latency/PingPong/PingPongChannelLatencyTest.cs index f63b7978..edc8ae26 100644 --- a/src/Disruptor.PerfTests/Latency/PingPong/PingPongChannelLatencyTest.cs +++ b/src/Disruptor.PerfTests/Latency/PingPong/PingPongChannelLatencyTest.cs @@ -37,11 +37,11 @@ public PingPongChannelLatencyTest() _ponger = new QueuePonger(_pingChannel, _pongChannel); } - public void Run(Stopwatch stopwatch, HistogramBase histogram) + public void Run(LatencySessionContext sessionContext) { var globalSignal = new CountdownEvent(3); - _pinger.Reset(globalSignal, histogram); + _pinger.Reset(globalSignal, sessionContext.Histogram); _ponger.Reset(globalSignal); var pingerTask = _pinger.Start(); @@ -50,10 +50,10 @@ public void Run(Stopwatch stopwatch, HistogramBase histogram) globalSignal.Signal(); globalSignal.Wait(); - stopwatch.Start(); + sessionContext.Start(); pingerTask.Wait(); pongerTask.Wait(); - stopwatch.Stop(); + sessionContext.Stop(); } public int RequiredProcessorCount => 2; diff --git a/src/Disruptor.PerfTests/Latency/PingPong/PingPongQueueLatencyTest.cs b/src/Disruptor.PerfTests/Latency/PingPong/PingPongQueueLatencyTest.cs index a7fb49b5..81224b17 100644 --- a/src/Disruptor.PerfTests/Latency/PingPong/PingPongQueueLatencyTest.cs +++ b/src/Disruptor.PerfTests/Latency/PingPong/PingPongQueueLatencyTest.cs @@ -24,12 +24,12 @@ public PingPongQueueLatencyTest() _ponger = new QueuePonger(_pingQueue, _pongQueue); } - public void Run(Stopwatch stopwatch, HistogramBase histogram) + public void Run(LatencySessionContext sessionContext) { var cancellationToken = new CancellationTokenSource(); var signal = new ManualResetEvent(false); var globalSignal = new CountdownEvent(3); - _pinger.Reset(globalSignal, signal, histogram); + _pinger.Reset(globalSignal, signal, sessionContext.Histogram); _ponger.Reset(globalSignal, cancellationToken.Token); _pinger.Start(); @@ -37,9 +37,9 @@ public void Run(Stopwatch stopwatch, HistogramBase histogram) globalSignal.Signal(); globalSignal.Wait(); - stopwatch.Start(); + sessionContext.Start(); signal.WaitOne(); - stopwatch.Stop(); + sessionContext.Stop(); cancellationToken.Cancel(); } diff --git a/src/Disruptor.PerfTests/Latency/PingPong/PingPongSequencedLatencyTest.cs b/src/Disruptor.PerfTests/Latency/PingPong/PingPongSequencedLatencyTest.cs index 8906e87c..de64201f 100644 --- a/src/Disruptor.PerfTests/Latency/PingPong/PingPongSequencedLatencyTest.cs +++ b/src/Disruptor.PerfTests/Latency/PingPong/PingPongSequencedLatencyTest.cs @@ -23,16 +23,16 @@ public class PingPongSequencedLatencyTest : ILatencyTest private readonly Ponger _ponger; private readonly IEventProcessor _pongProcessor; - public PingPongSequencedLatencyTest() + public PingPongSequencedLatencyTest(ProgramOptions options) { - var pingBuffer = RingBuffer.CreateSingleProducer(PerfEvent.EventFactory, _bufferSize, new BlockingWaitStrategy()); - var pongBuffer = RingBuffer.CreateSingleProducer(PerfEvent.EventFactory, _bufferSize, new BlockingWaitStrategy()); + var pingBuffer = RingBuffer.CreateSingleProducer(PerfEvent.EventFactory, _bufferSize, new YieldingWaitStrategy()); + var pongBuffer = RingBuffer.CreateSingleProducer(PerfEvent.EventFactory, _bufferSize, new YieldingWaitStrategy()); var pingBarrier = pingBuffer.NewBarrier(); var pongBarrier = pongBuffer.NewBarrier(); - _pinger = new Pinger(pingBuffer, _iterations, _pauseNanos); - _ponger = new Ponger(pongBuffer); + _pinger = new Pinger(pingBuffer, options.GetCustomCpu(0), _iterations, _pauseNanos); + _ponger = new Ponger(pongBuffer, options.GetCustomCpu(1)); _pingProcessor = EventProcessorFactory.Create(pongBuffer,pongBarrier, _pinger); _pongProcessor = EventProcessorFactory.Create(pingBuffer,pingBarrier, _ponger); @@ -45,11 +45,11 @@ public PingPongSequencedLatencyTest() /////////////////////////////////////////////////////////////////////////////////////////////// - public void Run(Stopwatch stopwatch, HistogramBase histogram) + public void Run(LatencySessionContext sessionContext) { var globalSignal = new CountdownEvent(3); var signal = new ManualResetEvent(false); - _pinger.Reset(globalSignal, signal, histogram); + _pinger.Reset(globalSignal, signal, sessionContext.Histogram); _ponger.Reset(globalSignal); var processorTask1 = _pongProcessor.Start(); @@ -59,10 +59,10 @@ public void Run(Stopwatch stopwatch, HistogramBase histogram) globalSignal.Signal(); globalSignal.Wait(); - stopwatch.Start(); + sessionContext.Start(); // running here signal.WaitOne(); - stopwatch.Stop(); + sessionContext.Stop(); _pingProcessor.Halt(); _pongProcessor.Halt(); @@ -72,6 +72,7 @@ public void Run(Stopwatch stopwatch, HistogramBase histogram) private class Pinger : IEventHandler { private readonly RingBuffer _buffer; + private readonly int? _cpu; private readonly long _maxEvents; private readonly long _pauseTimeNs; private readonly long _pauseTimeTicks; @@ -80,10 +81,12 @@ private class Pinger : IEventHandler private long _counter; private CountdownEvent _globalSignal; private ManualResetEvent _signal; + private ThreadAffinityUtil.Scope _affinityScope; - public Pinger(RingBuffer buffer, long maxEvents, long pauseTimeNs) + public Pinger(RingBuffer buffer, int? cpu, long maxEvents, long pauseTimeNs) { _buffer = buffer; + _cpu = cpu; _maxEvents = maxEvents; _pauseTimeNs = pauseTimeNs; _pauseTimeTicks = StopwatchUtil.GetTimestampFromNanoseconds(pauseTimeNs); @@ -122,6 +125,9 @@ private void Send() public void OnStart() { + if (_cpu != null) + _affinityScope = ThreadAffinityUtil.SetThreadAffinity(_cpu.Value, ThreadPriority.Highest); + _globalSignal.Signal(); _globalSignal.Wait(); @@ -132,6 +138,8 @@ public void OnStart() public void OnShutdown() { + if (_cpu != null) + _affinityScope.Dispose(); } public void Reset(CountdownEvent globalSignal, ManualResetEvent signal, HistogramBase histogram) @@ -147,11 +155,14 @@ public void Reset(CountdownEvent globalSignal, ManualResetEvent signal, Histogra private class Ponger : IEventHandler { private readonly RingBuffer _buffer; + private readonly int? _cpu; private CountdownEvent _globalSignal; + private ThreadAffinityUtil.Scope _affinityScope; - public Ponger(RingBuffer buffer) + public Ponger(RingBuffer buffer, int? cpu) { _buffer = buffer; + _cpu = cpu; } public void OnEvent(PerfEvent data, long sequence, bool endOfBatch) @@ -163,12 +174,17 @@ public void OnEvent(PerfEvent data, long sequence, bool endOfBatch) public void OnStart() { + if (_cpu != null) + _affinityScope = ThreadAffinityUtil.SetThreadAffinity(_cpu.Value, ThreadPriority.Highest); + _globalSignal.Signal(); _globalSignal.Wait(); } public void OnShutdown() { + if (_cpu != null) + _affinityScope.Dispose(); } public void Reset(CountdownEvent globalSignal) diff --git a/src/Disruptor.PerfTests/Latency/PingPong/PingPongSequencedLatencyTest_AsyncBatchHandler.cs b/src/Disruptor.PerfTests/Latency/PingPong/PingPongSequencedLatencyTest_AsyncBatchHandler.cs index 338e5d04..79724f7c 100644 --- a/src/Disruptor.PerfTests/Latency/PingPong/PingPongSequencedLatencyTest_AsyncBatchHandler.cs +++ b/src/Disruptor.PerfTests/Latency/PingPong/PingPongSequencedLatencyTest_AsyncBatchHandler.cs @@ -44,11 +44,11 @@ public PingPongSequencedLatencyTest_AsyncBatchHandler() /////////////////////////////////////////////////////////////////////////////////////////////// - public void Run(Stopwatch stopwatch, HistogramBase histogram) + public void Run(LatencySessionContext sessionContext) { var startCountdown = new CountdownEvent(3); var completedSignal = new ManualResetEvent(false); - _pinger.Reset(startCountdown, completedSignal, histogram); + _pinger.Reset(startCountdown, completedSignal, sessionContext.Histogram); _ponger.Reset(startCountdown); var processorTask1 = _pongProcessor.Start(); @@ -58,11 +58,11 @@ public void Run(Stopwatch stopwatch, HistogramBase histogram) startCountdown.Signal(); startCountdown.Wait(); - stopwatch.Start(); + sessionContext.Start(); completedSignal.WaitOne(); - stopwatch.Stop(); + sessionContext.Stop(); _pingProcessor.Halt(); _pongProcessor.Halt(); diff --git a/src/Disruptor.PerfTests/Latency/PingPong/PingPongSequencedLatencyTest_BatchHandler.cs b/src/Disruptor.PerfTests/Latency/PingPong/PingPongSequencedLatencyTest_BatchHandler.cs index 26e7240e..51e45745 100644 --- a/src/Disruptor.PerfTests/Latency/PingPong/PingPongSequencedLatencyTest_BatchHandler.cs +++ b/src/Disruptor.PerfTests/Latency/PingPong/PingPongSequencedLatencyTest_BatchHandler.cs @@ -45,11 +45,11 @@ public PingPongSequencedLatencyTest_BatchHandler() /////////////////////////////////////////////////////////////////////////////////////////////// - public void Run(Stopwatch stopwatch, HistogramBase histogram) + public void Run(LatencySessionContext sessionContext) { var globalSignal = new CountdownEvent(3); var signal = new ManualResetEvent(false); - _pinger.Reset(globalSignal, signal, histogram); + _pinger.Reset(globalSignal, signal, sessionContext.Histogram); _ponger.Reset(globalSignal); var processorTask1 = _pongProcessor.Start(); @@ -59,10 +59,10 @@ public void Run(Stopwatch stopwatch, HistogramBase histogram) globalSignal.Signal(); globalSignal.Wait(); - stopwatch.Start(); + sessionContext.Start(); // running here signal.WaitOne(); - stopwatch.Stop(); + sessionContext.Stop(); _pingProcessor.Halt(); _pongProcessor.Halt(); diff --git a/src/Disruptor.PerfTests/Latency/PingPong/PingPongSequencedLatencyTest_Multi.cs b/src/Disruptor.PerfTests/Latency/PingPong/PingPongSequencedLatencyTest_Multi.cs index db5c68d2..f9cd5dfd 100644 --- a/src/Disruptor.PerfTests/Latency/PingPong/PingPongSequencedLatencyTest_Multi.cs +++ b/src/Disruptor.PerfTests/Latency/PingPong/PingPongSequencedLatencyTest_Multi.cs @@ -45,11 +45,11 @@ public PingPongSequencedLatencyTest_Multi() /////////////////////////////////////////////////////////////////////////////////////////////// - public void Run(Stopwatch stopwatch, HistogramBase histogram) + public void Run(LatencySessionContext sessionContext) { var globalSignal = new CountdownEvent(3); var signal = new ManualResetEvent(false); - _pinger.Reset(globalSignal, signal, histogram); + _pinger.Reset(globalSignal, signal, sessionContext.Histogram); _ponger.Reset(globalSignal); var processorTask1 = _pongProcessor.Start(); @@ -59,10 +59,10 @@ public void Run(Stopwatch stopwatch, HistogramBase histogram) globalSignal.Signal(); globalSignal.Wait(); - stopwatch.Start(); + sessionContext.Start(); // running here signal.WaitOne(); - stopwatch.Stop(); + sessionContext.Stop(); _pingProcessor.Halt(); _pongProcessor.Halt(); diff --git a/src/Disruptor.PerfTests/Latency/PingPong/PingPongSequencedLatencyTest_Value.cs b/src/Disruptor.PerfTests/Latency/PingPong/PingPongSequencedLatencyTest_Value.cs index 71b6176c..a3012f55 100644 --- a/src/Disruptor.PerfTests/Latency/PingPong/PingPongSequencedLatencyTest_Value.cs +++ b/src/Disruptor.PerfTests/Latency/PingPong/PingPongSequencedLatencyTest_Value.cs @@ -45,11 +45,11 @@ public PingPongSequencedLatencyTest_Value() /////////////////////////////////////////////////////////////////////////////////////////////// - public void Run(Stopwatch stopwatch, HistogramBase histogram) + public void Run(LatencySessionContext sessionContext) { var globalSignal = new CountdownEvent(3); var signal = new ManualResetEvent(false); - _pinger.Reset(globalSignal, signal, histogram); + _pinger.Reset(globalSignal, signal, sessionContext.Histogram); _ponger.Reset(globalSignal); var processorTask1 = _pongProcessor.Start(); @@ -59,10 +59,10 @@ public void Run(Stopwatch stopwatch, HistogramBase histogram) globalSignal.Signal(); globalSignal.Wait(); - stopwatch.Start(); + sessionContext.Start(); // running here signal.WaitOne(); - stopwatch.Stop(); + sessionContext.Stop(); _pingProcessor.Halt(); _pongProcessor.Halt(); diff --git a/src/Disruptor.PerfTests/LatencySessionContext.cs b/src/Disruptor.PerfTests/LatencySessionContext.cs new file mode 100644 index 00000000..f353e5b7 --- /dev/null +++ b/src/Disruptor.PerfTests/LatencySessionContext.cs @@ -0,0 +1,28 @@ +using System; +using System.Diagnostics; +using HdrHistogram; + +namespace Disruptor.PerfTests; + +public class LatencySessionContext +{ + private readonly Stopwatch _stopwatch = new(); + public LongHistogram Histogram { get; } = new(10000000000L, 4); + public TimeSpan ElapsedTime => _stopwatch.Elapsed; + + public void Reset() + { + Histogram.Reset(); + _stopwatch.Reset(); + } + + public void Start() + { + _stopwatch.Start(); + } + + public void Stop() + { + _stopwatch.Stop(); + } +} diff --git a/src/Disruptor.PerfTests/LatencyTestSession.cs b/src/Disruptor.PerfTests/LatencyTestSession.cs index 08bd8cbe..f3cd0ee4 100644 --- a/src/Disruptor.PerfTests/LatencyTestSession.cs +++ b/src/Disruptor.PerfTests/LatencyTestSession.cs @@ -2,61 +2,48 @@ using System.Collections.Generic; using System.Diagnostics; using System.IO; -using System.Linq; using System.Text; -using HdrHistogram; namespace Disruptor.PerfTests; -public class LatencyTestSession +public class LatencyTestSession : IDisposable { - private readonly Type _perfTestType; + private readonly ILatencyTest _test; private readonly ProgramOptions _options; private readonly string _resultDirectoryPath; - private readonly int _runCount; - public LatencyTestSession(Type perfTestType, ProgramOptions options, string resultDirectoryPath) + public LatencyTestSession(ILatencyTest test, ProgramOptions options, string resultDirectoryPath) { - _perfTestType = perfTestType; + _test = test; _options = options; _resultDirectoryPath = resultDirectoryPath; - _runCount = options.RunCountForLatencyTest; } public void Execute() { - var test = (ILatencyTest)Activator.CreateInstance(_perfTestType); - - try - { - CheckProcessorsRequirements(test); - - var results = Run(test); - Report(test, results); - } - finally - { - if (test is IDisposable disposable) - disposable.Dispose(); - } + var results = Run(); + Report(results); } - public List Run(ILatencyTest test) + public List Run() { - Console.WriteLine($"Latency Test to run => {_perfTestType.FullName}, Runs => {_runCount}"); + Console.Write($"Latency Test to run => {_test.GetType().FullName}, Runs => {_options.RunCountForLatencyTest}"); + if (_options.HasCustomCpuSet) + Console.Write($", Cpus: [{string.Join(", ", _options.CpuSet)}]"); + + Console.WriteLine(); Console.WriteLine("Starting"); var results = new List(); + var context = new LatencySessionContext(); - - for (var i = 0; i < _runCount; i++) + for (var i = 0; i < _options.RunCountForLatencyTest; i++) { - var histogram = new LongHistogram(10000000000L, 4); - var stopwatch = new Stopwatch(); - GC.Collect(); GC.WaitForPendingFinalizers(); + context.Reset(); + var beforeGen0Count = GC.CollectionCount(0); var beforeGen1Count = GC.CollectionCount(1); var beforeGen2Count = GC.CollectionCount(2); @@ -64,13 +51,13 @@ public List Run(ILatencyTest test) LatencyTestSessionResult result; try { - test.Run(stopwatch, histogram); + _test.Run(context); var gen0Count = GC.CollectionCount(0) - beforeGen0Count; var gen1Count = GC.CollectionCount(1) - beforeGen1Count; var gen2Count = GC.CollectionCount(2) - beforeGen2Count; - result = new LatencyTestSessionResult(histogram, stopwatch.Elapsed, gen0Count, gen1Count, gen2Count); + result = new LatencyTestSessionResult(context.Histogram, context.ElapsedTime, gen0Count, gen1Count, gen2Count); } catch (Exception ex) { @@ -84,7 +71,7 @@ public List Run(ILatencyTest test) return results; } - public void Report(ILatencyTest test, List results) + public void Report(List results) { var computerSpecifications = new ComputerSpecifications(); @@ -97,31 +84,21 @@ public void Report(ILatencyTest test, List results) if (!_options.GenerateReport) return; - var path = Path.Combine(_resultDirectoryPath, $"{_perfTestType.Name}-{DateTime.Now:yyyy-MM-dd hh-mm-ss}.html"); + var path = Path.Combine(_resultDirectoryPath, $"{_test.GetType().Name}-{DateTime.Now:yyyy-MM-dd hh-mm-ss}.html"); - File.WriteAllText(path, BuildReport(test, results, computerSpecifications)); + File.WriteAllText(path, BuildReport(results, computerSpecifications)); var totalsPath = Path.Combine(_resultDirectoryPath, $"Totals-{DateTime.Now:yyyy-MM-dd}.csv"); foreach (var result in results) { - File.AppendAllText(totalsPath, FormattableString.Invariant($"{DateTime.Now:HH:mm:ss},{_perfTestType.Name},{result.P(50)},{result.P(90)},{result.P(99)}{Environment.NewLine}")); + File.AppendAllText(totalsPath, FormattableString.Invariant($"{DateTime.Now:HH:mm:ss},{_test.GetType().Name},{result.P(50)},{result.P(90)},{result.P(99)}{Environment.NewLine}")); } if (_options.OpenReport) Process.Start(path); } - private static void CheckProcessorsRequirements(ILatencyTest test) - { - var availableProcessors = Environment.ProcessorCount; - if (test.RequiredProcessorCount <= availableProcessors) - return; - - Console.WriteLine("*** Warning ***: your system has insufficient processors to execute the test efficiently. "); - Console.WriteLine($"Processors required = {test.RequiredProcessorCount}, available = {availableProcessors}"); - } - - private string BuildReport(ILatencyTest test, List results, ComputerSpecifications computerSpecifications) + private string BuildReport(List results, ComputerSpecifications computerSpecifications) { var sb = new StringBuilder(); sb.AppendLine("") @@ -150,19 +127,16 @@ private string BuildReport(ILatencyTest test, List res } sb.AppendLine("

Test configuration

") - .AppendLine(" Test: " + _perfTestType.FullName + "
") - .AppendLine(" Runs: " + _runCount + "
"); - if (test.RequiredProcessorCount > Environment.ProcessorCount) - sb.AppendLine(" Warning ! Test requires: " + test.RequiredProcessorCount + " processors but there is only " + Environment.ProcessorCount + " here
"); - - sb.AppendLine("

Detailed test results

"); - sb.AppendLine(" "); - sb.AppendLine(" "); - sb.AppendLine(" "); - sb.AppendLine(" "); - sb.AppendLine(" "); - sb.AppendLine(" "); - sb.AppendLine(" "); + .AppendLine(" Test: " + _test.GetType().FullName + "
") + .AppendLine(" Runs: " + _options.RunCountForLatencyTest + "
") + .AppendLine("

Detailed test results

") + .AppendLine("
RunLatencies (hdr histogram output)Duration (ms)# GC (0-1-2)
") + .AppendLine(" ") + .AppendLine(" ") + .AppendLine(" ") + .AppendLine(" ") + .AppendLine(" ") + .AppendLine(" "); for (var i = 0; i < results.Count; i++) { @@ -174,4 +148,10 @@ private string BuildReport(ILatencyTest test, List res return sb.ToString(); } + + public void Dispose() + { + if (_test is IDisposable disposable) + disposable.Dispose(); + } } diff --git a/src/Disruptor.PerfTests/PerfTestFactory.cs b/src/Disruptor.PerfTests/PerfTestFactory.cs new file mode 100644 index 00000000..fed51be5 --- /dev/null +++ b/src/Disruptor.PerfTests/PerfTestFactory.cs @@ -0,0 +1,57 @@ +using System; + +namespace Disruptor.PerfTests; + +public static class PerfTestFactory +{ + public static bool TryCreateLatencyTest(Type testType, ProgramOptions options, out ILatencyTest latencyTest) + => TryCreateTest(testType, options, out latencyTest); + + public static bool TryCreateThroughputTest(Type testType, ProgramOptions options, out IThroughputTest throughputTest) + => TryCreateTest(testType, options, out throughputTest); + + private static bool TryCreateTest(Type testType, ProgramOptions options, out T test) + { + if (!typeof(T).IsAssignableFrom(testType)) + { + Console.Error.WriteLine($"Error: the test type is invalid, FullName: {testType.FullName}"); + test = default; + return false; + } + + if (testType.GetConstructor([typeof(ProgramOptions)]) is { } constructor) + { + test = (T)constructor.Invoke([options]); + return true; + } + + test = (T)Activator.CreateInstance(testType); + return test != null; + } + + public static bool CheckProcessorsRequirements(this ILatencyTest latencyTest, ProgramOptions programOptions) + => CheckProcessorsRequirements(programOptions, latencyTest.RequiredProcessorCount); + + public static bool CheckProcessorsRequirements(this IThroughputTest latencyTest, ProgramOptions programOptions) + => CheckProcessorsRequirements(programOptions, latencyTest.RequiredProcessorCount); + + private static bool CheckProcessorsRequirements(ProgramOptions programOptions, int requiredProcessorCount) + { + var availableProcessors = Environment.ProcessorCount; + if (requiredProcessorCount > availableProcessors) + { + Console.Error.WriteLine("Error: your system has insufficient processors to execute the test efficiently."); + Console.Error.WriteLine($"Processors required = {requiredProcessorCount}, available = {availableProcessors}"); + return false; + } + + if (requiredProcessorCount > programOptions.CpuSet.Length) + { + Console.Error.WriteLine("Error: the CPU set is two small to execute the test efficiently."); + Console.Error.WriteLine($"CPU count required = {requiredProcessorCount}, CPU set length = {programOptions.CpuSet.Length}"); + return false; + } + + return true; + } +} diff --git a/src/Disruptor.PerfTests/Program.cs b/src/Disruptor.PerfTests/Program.cs index d65d619c..2631f657 100644 --- a/src/Disruptor.PerfTests/Program.cs +++ b/src/Disruptor.PerfTests/Program.cs @@ -1,6 +1,5 @@ using System; using System.IO; -using System.Text.RegularExpressions; namespace Disruptor.PerfTests; @@ -14,7 +13,7 @@ public static void Main(string[] args) return; } - if (!ValidateOptions(options)) + if (!options.Validate()) return; var selector = new PerfTestTypeSelector(options); @@ -26,23 +25,6 @@ public static void Main(string[] args) } } - private static bool ValidateOptions(ProgramOptions options) - { - if (options.Target != null && !Regex.IsMatch(options.Target, @"\\*\w+(?:.\w+)*\\*")) - { - Console.WriteLine($"Invalid target: [{options.Target}]"); - return false; - } - - if (options.From != null && !Regex.IsMatch(options.From, @"\w+(?:.\w+)*")) - { - Console.WriteLine($"Invalid from: [{options.From}]"); - return false; - } - - return true; - } - private static void RunTestForType(Type perfTestType, ProgramOptions options) { var outputDirectoryPath = Path.Combine(AppContext.BaseDirectory, "results"); @@ -52,19 +34,57 @@ private static void RunTestForType(Type perfTestType, ProgramOptions options) var isThroughputTest = typeof(IThroughputTest).IsAssignableFrom(perfTestType); if (isThroughputTest) { - var session = new ThroughputTestSession(perfTestType, options, outputDirectoryPath); - session.Execute(); + if (TryCreateTest(perfTestType, options, out var test) && ValidateTest(test.RequiredProcessorCount, options)) + { + using var session = new ThroughputTestSession(test, options, outputDirectoryPath); + session.Execute(); + } return; } var isLatencyTest = typeof(ILatencyTest).IsAssignableFrom(perfTestType); if (isLatencyTest) { - var session = new LatencyTestSession(perfTestType, options, outputDirectoryPath); - session.Execute(); + if (TryCreateTest(perfTestType, options, out var test) && ValidateTest(test.RequiredProcessorCount, options)) + { + var session = new LatencyTestSession(test, options, outputDirectoryPath); + session.Execute(); + } return; } throw new NotSupportedException($"Invalid test type: {perfTestType.Name}"); } + + private static bool TryCreateTest(Type testType, ProgramOptions options, out T test) + { + if (testType.GetConstructor([typeof(ProgramOptions)]) is { } constructor) + { + test = (T)constructor.Invoke([options]); + return true; + } + + test = (T)Activator.CreateInstance(testType); + return test != null; + } + + private static bool ValidateTest(int requiredProcessorCount, ProgramOptions options) + { + var availableProcessors = Environment.ProcessorCount; + if (requiredProcessorCount > availableProcessors) + { + Console.Error.WriteLine("Error: your system has insufficient CPUs to execute the test efficiently."); + Console.Error.WriteLine($"CPU count required = {requiredProcessorCount}, available = {availableProcessors}"); + return false; + } + + if (requiredProcessorCount > options.CpuSet.Length) + { + Console.Error.WriteLine("Error: the CPU set is two small to execute the test efficiently."); + Console.Error.WriteLine($"CPU count required = {requiredProcessorCount}, CPU set length = {options.CpuSet.Length}"); + return false; + } + + return true; + } } diff --git a/src/Disruptor.PerfTests/ProgramOptions.cs b/src/Disruptor.PerfTests/ProgramOptions.cs index d4c98aaa..58d82839 100644 --- a/src/Disruptor.PerfTests/ProgramOptions.cs +++ b/src/Disruptor.PerfTests/ProgramOptions.cs @@ -1,21 +1,43 @@ using System; using System.Globalization; +using System.Linq; +using System.Text.RegularExpressions; namespace Disruptor.PerfTests; public class ProgramOptions { - public int? RunCount { get; set; } - public string Target { get; set; } - public bool PrintComputerSpecifications { get; set; } = true; - public bool GenerateReport { get; set; } = true; - public bool OpenReport { get; set; } - public string From { get; set; } - public bool IncludeExternal { get; set; } - public bool IncludeLatency { get; set; } = true; - public bool IncludeThroughput { get; set; } = true; - public int RunCountForLatencyTest => RunCount ?? 3; - public int RunCountForThroughputTest => RunCount ?? 7; + public static int DefaultRunCountForLatencyTest { get; } = 3; + public static int DefaultRunCountForThroughputTest { get; } = 7; + public static int[] DefaultCpuSet { get; } = Enumerable.Range(0, Environment.ProcessorCount).ToArray(); + + private int[] _cpuSet = DefaultCpuSet; + + public int? RunCount { get; private set; } + public string Target { get; private set; } + public bool PrintComputerSpecifications { get; private set; } = true; + public bool GenerateReport { get; private set; } = true; + public bool OpenReport { get; private set; } + public string From { get; private set; } + public bool IncludeExternal { get; private set; } + public bool IncludeLatency { get; private set; } = true; + public bool IncludeThroughput { get; private set; } = true; + + public int[] CpuSet + { + get => _cpuSet; + private set + { + _cpuSet = value ?? DefaultCpuSet; + HasCustomCpuSet = value != null; + } + } + + public bool HasCustomCpuSet { get; private set; } + + public int RunCountForLatencyTest => RunCount ?? DefaultRunCountForLatencyTest; + public int RunCountForThroughputTest => RunCount ?? DefaultRunCountForThroughputTest; + public int? GetCustomCpu(int index) => HasCustomCpuSet ? CpuSet[index] : null; public static bool TryParse(string[] args, out ProgramOptions options) { @@ -115,6 +137,16 @@ public static bool TryParse(string[] args, out ProgramOptions options) continue; } + if (arg.Equals("--cpus", StringComparison.OrdinalIgnoreCase)) + { + if (index + 1 == args.Length || Regex.Match(args[index + 1], @"(?\d+)(?:,(?\d+))*") is not { Success: true} match) + return false; + + options.CpuSet = [int.Parse(match.Groups["cpu0"].Value), ..match.Groups["cpun"].Captures.Select(x => int.Parse(x.Value))]; + index++; + continue; + } + return false; } @@ -133,11 +165,37 @@ public static void PrintUsage() Console.WriteLine($" --report Generates an HTML report file at the end of the test. Default is {options.GenerateReport}."); Console.WriteLine($" --open-report Opens the HTML report file at the end of the test. Default is {options.OpenReport}."); Console.WriteLine($" --print-spec Prints computer specifications. Default is {options.PrintComputerSpecifications}."); - Console.WriteLine($" --runs Number of runs per test. Default is {options.RunCountForThroughputTest} for throughput tests and {options.RunCountForLatencyTest} for latency tests."); + Console.WriteLine($" --runs Number of runs per test. Default is {DefaultRunCountForThroughputTest} for throughput tests and {DefaultRunCountForLatencyTest} for latency tests."); Console.WriteLine($" --from The first test type name to run when running all tests."); Console.WriteLine($" --include-external Includes external tests. Default is {options.IncludeExternal}."); Console.WriteLine($" --include-latency Includes latency tests. Default is {options.IncludeLatency}."); Console.WriteLine($" --include-throughput Includes throughput tests. Default is {options.IncludeThroughput}."); + Console.WriteLine($" --cpus The comma-separated list of CPUs to use for CPU affinity (not supported in all tests)."); Console.WriteLine(); } + + public bool Validate() + { + if (Target != null && !Regex.IsMatch(Target, @"\\*\w+(?:.\w+)*\\*")) + { + Console.WriteLine($"Invalid target: [{Target}]"); + return false; + } + + if (From != null && !Regex.IsMatch(From, @"\w+(?:.\w+)*")) + { + Console.WriteLine($"Invalid from: [{From}]"); + return false; + } + + if (HasCustomCpuSet && CpuSet.Except(DefaultCpuSet).Any()) + { + Console.WriteLine($"Invalid cpus: [{string.Join(", ", CpuSet)}], available CPU range: [0-{Environment.ProcessorCount - 1}]"); + return false; + } + + return true; + } + + } diff --git a/src/Disruptor.PerfTests/Support/AdditionEventHandler.cs b/src/Disruptor.PerfTests/Support/AdditionEventHandler.cs index 4100c37d..cbd47b72 100644 --- a/src/Disruptor.PerfTests/Support/AdditionEventHandler.cs +++ b/src/Disruptor.PerfTests/Support/AdditionEventHandler.cs @@ -3,16 +3,29 @@ namespace Disruptor.PerfTests.Support; -public class AdditionEventHandler : IEventHandler, IValueEventHandler +public class AdditionEventHandler(int? cpu = null) : IEventHandler, IValueEventHandler { private PaddedLong _value; private PaddedLong _batchesProcessed; private long _latchSequence; private readonly ManualResetEvent _latch = new(false); + private ThreadAffinityUtil.Scope _affinityScope; public long Value => _value.Value; public long BatchesProcessed => _batchesProcessed.Value; + public void OnStart() + { + if (cpu != null) + _affinityScope = ThreadAffinityUtil.SetThreadAffinity(cpu.Value, ThreadPriority.Highest); + } + + public void OnShutdown() + { + if (cpu != null) + _affinityScope.Dispose(); + } + public void WaitForSequence() { _latch.WaitOne(); diff --git a/src/Disruptor.PerfTests/Support/ThreadAffinityUtil.cs b/src/Disruptor.PerfTests/Support/ThreadAffinityUtil.cs index 80735ffa..cb66336e 100644 --- a/src/Disruptor.PerfTests/Support/ThreadAffinityUtil.cs +++ b/src/Disruptor.PerfTests/Support/ThreadAffinityUtil.cs @@ -14,14 +14,19 @@ public class ThreadAffinityUtil [DllImport("libc.so.6")] private static extern int sched_setaffinity(int pid, IntPtr cpusetsize, ref ulong cpuset); - public static Scope SetThreadAffinity(int processorIndex) + public static Scope SetThreadAffinity(int processorIndex, ThreadPriority? threadPriority = null) { + var previousThreadPriority = Thread.CurrentThread.Priority; + Thread.BeginThreadAffinity(); var affinity = (1ul << processorIndex); SetProcessorAffinity(affinity); - return new Scope(); + if (threadPriority != null) + Thread.CurrentThread.Priority = threadPriority.Value; + + return new Scope(previousThreadPriority); } private static void RemoveThreadAffinity() @@ -78,11 +83,13 @@ private static ProcessThread GetCurrentProcessThread() throw new InvalidOperationException($"Could not retrieve native thread with ID: {threadId}, current managed thread ID was {threadId}"); } - public readonly ref struct Scope + public readonly struct Scope(ThreadPriority initialThreadPriority) : IDisposable { public void Dispose() { RemoveThreadAffinity(); + + Thread.CurrentThread.Priority = initialThreadPriority; } } } diff --git a/src/Disruptor.PerfTests/Throughput/OneToOne/EventHandler/OneToOneSequencedThroughputTest_ThreadAffinity.cs b/src/Disruptor.PerfTests/Throughput/OneToOne/EventHandler/OneToOneSequencedThroughputTest_ThreadAffinity.cs index 33d084f0..4f401d71 100644 --- a/src/Disruptor.PerfTests/Throughput/OneToOne/EventHandler/OneToOneSequencedThroughputTest_ThreadAffinity.cs +++ b/src/Disruptor.PerfTests/Throughput/OneToOne/EventHandler/OneToOneSequencedThroughputTest_ThreadAffinity.cs @@ -38,14 +38,16 @@ public class OneToOneSequencedThroughputTest_ThreadAffinity : IThroughputTest private const int _bufferSize = 1024 * 64; private const long _iterations = 1000L * 1000L * 100L; + private readonly ProgramOptions _options; private readonly RingBuffer _ringBuffer; private readonly AdditionEventHandler _eventHandler; private readonly long _expectedResult = PerfTestUtil.AccumulatedAddition(_iterations); private readonly IEventProcessor _eventProcessor; - public OneToOneSequencedThroughputTest_ThreadAffinity() + public OneToOneSequencedThroughputTest_ThreadAffinity(ProgramOptions options) { - _eventHandler = new AdditionEventHandler(); + _options = options; + _eventHandler = new AdditionEventHandler(options.CpuSet[1]); _ringBuffer = RingBuffer.CreateSingleProducer(PerfEvent.EventFactory, _bufferSize, new YieldingWaitStrategy()); var sequenceBarrier = _ringBuffer.NewBarrier(); _eventProcessor = EventProcessorFactory.Create(_ringBuffer, sequenceBarrier, _eventHandler); @@ -60,19 +62,11 @@ public long Run(ThroughputSessionContext sessionContext) long expectedCount = _eventProcessor.Sequence.Value + _iterations; _eventHandler.Reset(expectedCount); - - var processorTask = PerfTestUtil.StartLongRunning(() => - { - using var _ = ThreadAffinityUtil.SetThreadAffinity(0); - - Thread.CurrentThread.Priority = ThreadPriority.Highest; - - _eventProcessor.Run(); - }); + var processorTask = _eventProcessor.Start(); _eventProcessor.WaitUntilStarted(TimeSpan.FromSeconds(5)); - using var _ = ThreadAffinityUtil.SetThreadAffinity(1); + using var _ = ThreadAffinityUtil.SetThreadAffinity(_options.CpuSet[0]); Thread.CurrentThread.Priority = ThreadPriority.Highest; @@ -99,4 +93,4 @@ public long Run(ThroughputSessionContext sessionContext) return _iterations; } -} \ No newline at end of file +} diff --git a/src/Disruptor.PerfTests/ThroughputSessionContext.cs b/src/Disruptor.PerfTests/ThroughputSessionContext.cs index d8947ef3..29da03fd 100644 --- a/src/Disruptor.PerfTests/ThroughputSessionContext.cs +++ b/src/Disruptor.PerfTests/ThroughputSessionContext.cs @@ -1,28 +1,32 @@ +using System; using System.Diagnostics; namespace Disruptor.PerfTests; public class ThroughputSessionContext { - public readonly Stopwatch Stopwatch = new(); - public double? BatchPercent; - public double? AverageBatchSize; + private readonly Stopwatch _stopwatch = new(); + + public double? BatchPercent { get; private set; } + public double? AverageBatchSize { get; private set; } + + public TimeSpan ElapsedTime => _stopwatch.Elapsed; public void Reset() { - Stopwatch.Reset(); + _stopwatch.Reset(); BatchPercent = null; AverageBatchSize = null; } public void Start() { - Stopwatch.Start(); + _stopwatch.Start(); } public void Stop() { - Stopwatch.Stop(); + _stopwatch.Stop(); } public void SetBatchData(long batchesProcessedCount, long iterations) @@ -30,4 +34,4 @@ public void SetBatchData(long batchesProcessedCount, long iterations) AverageBatchSize = (double)iterations / batchesProcessedCount; BatchPercent = 1 - (double)batchesProcessedCount / iterations; } -} \ No newline at end of file +} diff --git a/src/Disruptor.PerfTests/ThroughputTestSession.cs b/src/Disruptor.PerfTests/ThroughputTestSession.cs index cabe85e4..df075d43 100644 --- a/src/Disruptor.PerfTests/ThroughputTestSession.cs +++ b/src/Disruptor.PerfTests/ThroughputTestSession.cs @@ -8,48 +8,38 @@ namespace Disruptor.PerfTests; -public class ThroughputTestSession +public class ThroughputTestSession : IDisposable { - private readonly Type _perfTestType; + private readonly IThroughputTest _test; private readonly ProgramOptions _options; private readonly string _resultDirectoryPath; - private readonly int _runCount; - public ThroughputTestSession(Type perfTestType, ProgramOptions options, string resultDirectoryPath) + public ThroughputTestSession(IThroughputTest test, ProgramOptions options, string resultDirectoryPath) { - _perfTestType = perfTestType; + _test = test; _options = options; _resultDirectoryPath = resultDirectoryPath; - _runCount = options.RunCountForThroughputTest; } public void Execute() { - var test = (IThroughputTest)Activator.CreateInstance(_perfTestType); - - try - { - CheckProcessorsRequirements(test); - - var results = Run(test); - Report(test, results); - } - finally - { - if (test is IDisposable disposable) - disposable.Dispose(); - } + var results = Run(); + Report(results); } - private List Run(IThroughputTest test) + private List Run() { - Console.WriteLine($"Throughput Test to run => {_perfTestType.FullName}, Runs => {_runCount}"); + Console.Write($"Throughput Test to run => {_test.GetType().FullName}, Runs => {_options.RunCountForThroughputTest}"); + if (_options.HasCustomCpuSet) + Console.Write($", Cpus: [{string.Join(", ", _options.CpuSet)}]"); + + Console.WriteLine(); Console.WriteLine("Starting"); var results = new List(); var context = new ThroughputSessionContext(); - for (var i = 0; i < _runCount; i++) + for (var i = 0; i < _options.RunCountForThroughputTest; i++) { GC.Collect(); GC.WaitForPendingFinalizers(); @@ -63,13 +53,13 @@ private List Run(IThroughputTest test) ThroughputTestSessionResult result; try { - var totalOperationsInRun = test.Run(context); + var totalOperationsInRun = _test.Run(context); var gen0Count = GC.CollectionCount(0) - beforeGen0Count; var gen1Count = GC.CollectionCount(1) - beforeGen1Count; var gen2Count = GC.CollectionCount(2) - beforeGen2Count; - result = new ThroughputTestSessionResult(totalOperationsInRun, context.Stopwatch.Elapsed, gen0Count, gen1Count, gen2Count, context); + result = new ThroughputTestSessionResult(totalOperationsInRun, context.ElapsedTime, gen0Count, gen1Count, gen2Count, context); } catch (Exception ex) { @@ -83,7 +73,7 @@ private List Run(IThroughputTest test) return results; } - private void Report(IThroughputTest test, List results) + private void Report(List results) { var computerSpecifications = new ComputerSpecifications(); @@ -96,28 +86,18 @@ private void Report(IThroughputTest test, List resu if (!_options.GenerateReport) return; - var path = Path.Combine(_resultDirectoryPath, $"{_perfTestType.Name}-{DateTime.UtcNow:yyyy-MM-dd hh-mm-ss}.html"); - File.WriteAllText(path, BuildReport(test, results, computerSpecifications)); + var path = Path.Combine(_resultDirectoryPath, $"{_test.GetType().Name}-{DateTime.UtcNow:yyyy-MM-dd hh-mm-ss}.html"); + File.WriteAllText(path, BuildReport(results, computerSpecifications)); var totalsPath = Path.Combine(_resultDirectoryPath, $"Totals-{DateTime.Now:yyyy-MM-dd}.csv"); var average = results.Average(x => x.TotalOperationsInRun / x.Duration.TotalSeconds); - File.AppendAllText(totalsPath, FormattableString.Invariant($"{DateTime.Now:HH:mm:ss},{_perfTestType.Name},{average}{Environment.NewLine}")); + File.AppendAllText(totalsPath, FormattableString.Invariant($"{DateTime.Now:HH:mm:ss},{_test.GetType().Name},{average}{Environment.NewLine}")); if (_options.OpenReport) Process.Start(path); } - private void CheckProcessorsRequirements(IThroughputTest test) - { - var availableProcessors = Environment.ProcessorCount; - if (test.RequiredProcessorCount <= availableProcessors) - return; - - Console.WriteLine("*** Warning ***: your system has insufficient processors to execute the test efficiently. "); - Console.WriteLine($"Processors required = {test.RequiredProcessorCount}, available = {availableProcessors}"); - } - - private string BuildReport(IThroughputTest test, List results, ComputerSpecifications computerSpecifications) + private string BuildReport(List results, ComputerSpecifications computerSpecifications) { var sb = new StringBuilder(); sb.AppendLine("") @@ -143,21 +123,18 @@ private string BuildReport(IThroughputTest test, ListTest configuration") - .AppendLine(" Test: " + _perfTestType.FullName + "
") - .AppendLine(" Runs: " + _runCount + "
"); - if (test.RequiredProcessorCount > Environment.ProcessorCount) - sb.AppendLine(" Warning ! Test requires: " + test.RequiredProcessorCount + " processors but there is only " + Environment.ProcessorCount + " here
"); - - sb.AppendLine("

Detailed test results

"); - sb.AppendLine("
RunLatencies (hdr histogram output)Duration (ms)# GC (0-1-2)
"); - sb.AppendLine(" "); - sb.AppendLine(" "); - sb.AppendLine(" "); - sb.AppendLine(" "); - sb.AppendLine(" "); - sb.AppendLine(" "); - sb.AppendLine(" "); + .AppendLine(" Test: " + _test.GetType().FullName + "
") + .AppendLine(" Runs: " + _options.RunCountForThroughputTest + "
") + .AppendLine("

Detailed test results

") + .AppendLine("
RunOperations per secondDuration (ms)# GC (0-1-2)Batch %Average Batch Size"); - sb.AppendLine("
") + .AppendLine(" ") + .AppendLine(" ") + .AppendLine(" ") + .AppendLine(" ") + .AppendLine(" ") + .AppendLine(" ") + .AppendLine(" "); for (var i = 0; i < results.Count; i++) { @@ -169,4 +146,10 @@ private string BuildReport(IThroughputTest test, List
RunOperations per secondDuration (ms)# GC (0-1-2)Batch %Average Batch Size") + .AppendLine("