From 5c29270ee56157f422b7985bd7e6c93b389140a2 Mon Sep 17 00:00:00 2001 From: Olivier Coanet Date: Sat, 30 Nov 2024 22:55:51 +0100 Subject: [PATCH] Make CPU affinity configurable in perf tests --- src/Disruptor.Benchmarks/Program.cs | 2 - .../OneWay/OneWaySequencedLatencyTest.cs | 26 +++- ...OneWaySequencedLatencyTest_BatchHandler.cs | 24 +++- ...eWaySequencedLatencyTest_ThreadAffinity.cs | 117 ------------------ .../PingPong/PingPongSequencedLatencyTest.cs | 30 ++--- ...gSequencedLatencyTest_AsyncBatchHandler.cs | 10 +- ...ngPongSequencedLatencyTest_BatchHandler.cs | 26 ++-- .../PingPongSequencedLatencyTest_Multi.cs | 36 +++--- .../PingPongSequencedLatencyTest_Value.cs | 32 +++-- .../Support/AdditionEventHandler.cs | 8 +- .../Support/LongArrayEventHandler.cs | 13 +- .../Support/ThreadAffinityScope.cs | 16 +++ .../Support/ThreadAffinityUtil.cs | 39 +++--- .../OneToOneSequencedThroughputTest.cs | 11 +- ...eSequencedThroughputTest_BatchPublisher.cs | 13 +- ...neToOneSequencedThroughputTest_Blocking.cs | 9 +- ...eToOneSequencedThroughputTest_LongArray.cs | 14 ++- .../OneToOneSequencedThroughputTest_Multi.cs | 11 +- ...eSequencedThroughputTest_ThreadAffinity.cs | 96 -------------- 19 files changed, 208 insertions(+), 325 deletions(-) delete mode 100644 src/Disruptor.PerfTests/Latency/OneWay/OneWaySequencedLatencyTest_ThreadAffinity.cs create mode 100644 src/Disruptor.PerfTests/Support/ThreadAffinityScope.cs delete mode 100644 src/Disruptor.PerfTests/Throughput/OneToOne/EventHandler/OneToOneSequencedThroughputTest_ThreadAffinity.cs diff --git a/src/Disruptor.Benchmarks/Program.cs b/src/Disruptor.Benchmarks/Program.cs index eda083e..bf96840 100644 --- a/src/Disruptor.Benchmarks/Program.cs +++ b/src/Disruptor.Benchmarks/Program.cs @@ -16,8 +16,6 @@ public static void Main(string[] args) config: DefaultConfig.Instance.WithOption(ConfigOptions.JoinSummary, true), args: args ); - - Console.ReadLine(); } public static async Task MainTests() diff --git a/src/Disruptor.PerfTests/Latency/OneWay/OneWaySequencedLatencyTest.cs b/src/Disruptor.PerfTests/Latency/OneWay/OneWaySequencedLatencyTest.cs index f1a6501..5f094fd 100644 --- a/src/Disruptor.PerfTests/Latency/OneWay/OneWaySequencedLatencyTest.cs +++ b/src/Disruptor.PerfTests/Latency/OneWay/OneWaySequencedLatencyTest.cs @@ -12,15 +12,17 @@ public class OneWaySequencedLatencyTest : ILatencyTest, IDisposable { private const int _bufferSize = 1024; private const long _iterations = 100 * 1000 * 30; - private static readonly long _pause = StopwatchUtil.GetTimestampFromMicroseconds(10); + private static readonly long _pause = StopwatchUtil.GetTimestampFromMicroseconds(10); + private readonly ProgramOptions _options; private readonly Disruptor _disruptor; private readonly Handler _handler; - public OneWaySequencedLatencyTest() + public OneWaySequencedLatencyTest(ProgramOptions options) { - _disruptor = new Disruptor(() => new PerfEvent(), _bufferSize, new BlockingWaitStrategy()); - _handler = new Handler(); + _options = options; + _disruptor = new Disruptor(() => new PerfEvent(), _bufferSize, new YieldingWaitStrategy()); + _handler = new Handler(options.GetCustomCpu(1)); _disruptor.HandleEventsWith(_handler); _disruptor.Start(); } @@ -32,6 +34,8 @@ public void Run(LatencySessionContext sessionContext) _handler.Initialize(sessionContext.Histogram); _handler.Started.Wait(); + using var _ = ThreadAffinityUtil.SetThreadAffinity(_options.GetCustomCpu(0), ThreadPriority.Highest); + var pause = _pause; var next = Stopwatch.GetTimestamp() + pause; @@ -71,16 +75,30 @@ public void Dispose() private class Handler : IEventHandler { + private readonly int? _cpu; private HistogramBase _histogram; + private ThreadAffinityScope _affinityScope; + + public Handler(int? cpu) + { + _cpu = cpu; + } public ManualResetEventSlim Started { get; } = new(); public ManualResetEventSlim Completed { get; } = new(); public void OnStart() { + _affinityScope = ThreadAffinityUtil.SetThreadAffinity(_cpu, ThreadPriority.Highest); + Started.Set(); } + public void OnShutdown() + { + _affinityScope.Dispose(); + } + public void Initialize(HistogramBase histogram) { _histogram = histogram; diff --git a/src/Disruptor.PerfTests/Latency/OneWay/OneWaySequencedLatencyTest_BatchHandler.cs b/src/Disruptor.PerfTests/Latency/OneWay/OneWaySequencedLatencyTest_BatchHandler.cs index 7d3752f..850ee48 100644 --- a/src/Disruptor.PerfTests/Latency/OneWay/OneWaySequencedLatencyTest_BatchHandler.cs +++ b/src/Disruptor.PerfTests/Latency/OneWay/OneWaySequencedLatencyTest_BatchHandler.cs @@ -14,14 +14,15 @@ public class OneWaySequencedLatencyTest_BatchHandler : ILatencyTest, IDisposable private const long _iterations = 100 * 1000 * 30; private static readonly long _pause = StopwatchUtil.GetTimestampFromMicroseconds(10); + private readonly ProgramOptions _options; private readonly Disruptor _disruptor; private readonly Handler _handler; - public OneWaySequencedLatencyTest_BatchHandler() + public OneWaySequencedLatencyTest_BatchHandler(ProgramOptions options) { - _disruptor = new Disruptor(() => new PerfEvent(), _bufferSize, new BlockingWaitStrategy()); - // _disruptor = new Disruptor(() => new PerfEvent(), _bufferSize, new YieldingWaitStrategy()); - _handler = new Handler(); + _options = options; + _disruptor = new Disruptor(() => new PerfEvent(), _bufferSize, new YieldingWaitStrategy()); + _handler = new Handler(options.GetCustomCpu(1)); _disruptor.HandleEventsWith(_handler); _disruptor.Start(); } @@ -33,6 +34,8 @@ public void Run(LatencySessionContext sessionContext) _handler.Initialize(sessionContext.Histogram); _handler.Started.Wait(); + using var _ = ThreadAffinityUtil.SetThreadAffinity(_options.GetCustomCpu(0), ThreadPriority.Highest); + var pause = _pause; var next = Stopwatch.GetTimestamp() + pause; @@ -72,16 +75,29 @@ public void Dispose() private class Handler : IBatchEventHandler { + private readonly int? _cpu; private HistogramBase _histogram; + private ThreadAffinityScope _affinityScope; + + public Handler(int? cpu) + { + _cpu = cpu; + } public ManualResetEventSlim Started { get; } = new(); public ManualResetEventSlim Completed { get; } = new(); public void OnStart() { + _affinityScope = ThreadAffinityUtil.SetThreadAffinity(_cpu, ThreadPriority.Highest); Started.Set(); } + public void OnShutdown() + { + _affinityScope.Dispose(); + } + public void Initialize(HistogramBase histogram) { _histogram = histogram; diff --git a/src/Disruptor.PerfTests/Latency/OneWay/OneWaySequencedLatencyTest_ThreadAffinity.cs b/src/Disruptor.PerfTests/Latency/OneWay/OneWaySequencedLatencyTest_ThreadAffinity.cs deleted file mode 100644 index 87eea3c..0000000 --- a/src/Disruptor.PerfTests/Latency/OneWay/OneWaySequencedLatencyTest_ThreadAffinity.cs +++ /dev/null @@ -1,117 +0,0 @@ -using System; -using System.Diagnostics; -using System.Threading; -using Disruptor.Dsl; -using Disruptor.PerfTests.Support; -using Disruptor.Util; -using HdrHistogram; - -namespace Disruptor.PerfTests.Latency.OneWay; - -public class OneWaySequencedLatencyTest_ThreadAffinity : ILatencyTest, IDisposable -{ - private readonly ProgramOptions _options; - private const int _bufferSize = 1024; - private const long _iterations = 100 * 1000 * 30; - private static readonly long _pause = StopwatchUtil.GetTimestampFromMicroseconds(10); - - private readonly Disruptor _disruptor; - private readonly Handler _handler; - - public OneWaySequencedLatencyTest_ThreadAffinity(ProgramOptions options) - { - _options = options; - _disruptor = new Disruptor(() => new PerfEvent(), _bufferSize, new YieldingWaitStrategy()); - _handler = new Handler(options.CpuSet[1]); - _disruptor.HandleEventsWith(_handler); - _disruptor.Start(); - } - - public int RequiredProcessorCount => 2; - - public void Run(LatencySessionContext sessionContext) - { - _handler.Initialize(sessionContext.Histogram); - _handler.Started.Wait(); - - using var _ = ThreadAffinityUtil.SetThreadAffinity(_options.CpuSet[0]); - - Thread.CurrentThread.Priority = ThreadPriority.Highest; - - var pause = _pause; - var next = Stopwatch.GetTimestamp() + pause; - - sessionContext.Start(); - - var ringBuffer = _disruptor.RingBuffer; - - for (int i = 0; i < _iterations; i++) - { - var now = Stopwatch.GetTimestamp(); - while (now < next) - { - Thread.Yield(); - now = Stopwatch.GetTimestamp(); - } - - var s = ringBuffer.Next(); - ringBuffer[s].Value = now; - ringBuffer.Publish(s); - - next = now + pause; - } - - var lastS = ringBuffer.Next(); - ringBuffer[lastS].Value = -1; - ringBuffer.Publish(lastS); - - _handler.Completed.Wait(); - - sessionContext.Stop(); - } - - public void Dispose() - { - _disruptor.Shutdown(); - } - - private class Handler(int cpu) : IEventHandler - { - private HistogramBase _histogram; - private ThreadAffinityUtil.Scope _affinityScope; - - public ManualResetEventSlim Started { get; } = new(); - public ManualResetEventSlim Completed { get; } = new(); - - public void OnStart() - { - _affinityScope = ThreadAffinityUtil.SetThreadAffinity(cpu, ThreadPriority.Highest); - Started.Set(); - } - - public void OnShutdown() - { - _affinityScope.Dispose(); - } - - public void Initialize(HistogramBase histogram) - { - _histogram = histogram; - Completed.Reset(); - } - - public void OnEvent(PerfEvent data, long sequence, bool endOfBatch) - { - if (data.Value == -1) - { - Completed.Set(); - return; - } - - var handlerTimestamp = Stopwatch.GetTimestamp(); - var duration = handlerTimestamp - data.Value; - - _histogram.RecordValue(StopwatchUtil.ToNanoseconds(duration)); - } - } -} diff --git a/src/Disruptor.PerfTests/Latency/PingPong/PingPongSequencedLatencyTest.cs b/src/Disruptor.PerfTests/Latency/PingPong/PingPongSequencedLatencyTest.cs index de64201..a3aff79 100644 --- a/src/Disruptor.PerfTests/Latency/PingPong/PingPongSequencedLatencyTest.cs +++ b/src/Disruptor.PerfTests/Latency/PingPong/PingPongSequencedLatencyTest.cs @@ -31,7 +31,7 @@ public PingPongSequencedLatencyTest(ProgramOptions options) var pingBarrier = pingBuffer.NewBarrier(); var pongBarrier = pongBuffer.NewBarrier(); - _pinger = new Pinger(pingBuffer, options.GetCustomCpu(0), _iterations, _pauseNanos); + _pinger = new Pinger(pingBuffer, options.GetCustomCpu(0)); _ponger = new Ponger(pongBuffer, options.GetCustomCpu(1)); _pingProcessor = EventProcessorFactory.Create(pongBuffer,pongBarrier, _pinger); @@ -73,32 +73,28 @@ private class Pinger : IEventHandler { private readonly RingBuffer _buffer; private readonly int? _cpu; - private readonly long _maxEvents; - private readonly long _pauseTimeNs; private readonly long _pauseTimeTicks; private HistogramBase _histogram; private long _t0; private long _counter; private CountdownEvent _globalSignal; private ManualResetEvent _signal; - private ThreadAffinityUtil.Scope _affinityScope; + private ThreadAffinityScope _affinityScope; - public Pinger(RingBuffer buffer, int? cpu, long maxEvents, long pauseTimeNs) + public Pinger(RingBuffer buffer, int? cpu) { _buffer = buffer; _cpu = cpu; - _maxEvents = maxEvents; - _pauseTimeNs = pauseTimeNs; - _pauseTimeTicks = StopwatchUtil.GetTimestampFromNanoseconds(pauseTimeNs); + _pauseTimeTicks = StopwatchUtil.GetTimestampFromNanoseconds(_pauseNanos); } public void OnEvent(PerfEvent data, long sequence, bool endOfBatch) { var t1 = Stopwatch.GetTimestamp(); - _histogram.RecordValueWithExpectedInterval(StopwatchUtil.ToNanoseconds(t1 - _t0), _pauseTimeNs); + _histogram.RecordValueWithExpectedInterval(StopwatchUtil.ToNanoseconds(t1 - _t0), _pauseNanos); - if (data.Value < _maxEvents) + if (data.Value < _iterations) { while (_pauseTimeTicks > (Stopwatch.GetTimestamp() - t1)) { @@ -125,8 +121,7 @@ private void Send() public void OnStart() { - if (_cpu != null) - _affinityScope = ThreadAffinityUtil.SetThreadAffinity(_cpu.Value, ThreadPriority.Highest); + _affinityScope = ThreadAffinityUtil.SetThreadAffinity(_cpu, ThreadPriority.Highest); _globalSignal.Signal(); _globalSignal.Wait(); @@ -138,8 +133,7 @@ public void OnStart() public void OnShutdown() { - if (_cpu != null) - _affinityScope.Dispose(); + _affinityScope.Dispose(); } public void Reset(CountdownEvent globalSignal, ManualResetEvent signal, HistogramBase histogram) @@ -157,7 +151,7 @@ private class Ponger : IEventHandler private readonly RingBuffer _buffer; private readonly int? _cpu; private CountdownEvent _globalSignal; - private ThreadAffinityUtil.Scope _affinityScope; + private ThreadAffinityScope _affinityScope; public Ponger(RingBuffer buffer, int? cpu) { @@ -174,8 +168,7 @@ public void OnEvent(PerfEvent data, long sequence, bool endOfBatch) public void OnStart() { - if (_cpu != null) - _affinityScope = ThreadAffinityUtil.SetThreadAffinity(_cpu.Value, ThreadPriority.Highest); + _affinityScope = ThreadAffinityUtil.SetThreadAffinity(_cpu, ThreadPriority.Highest); _globalSignal.Signal(); _globalSignal.Wait(); @@ -183,8 +176,7 @@ public void OnStart() public void OnShutdown() { - if (_cpu != null) - _affinityScope.Dispose(); + _affinityScope.Dispose(); } public void Reset(CountdownEvent globalSignal) diff --git a/src/Disruptor.PerfTests/Latency/PingPong/PingPongSequencedLatencyTest_AsyncBatchHandler.cs b/src/Disruptor.PerfTests/Latency/PingPong/PingPongSequencedLatencyTest_AsyncBatchHandler.cs index 79724f7..68dbfc0 100644 --- a/src/Disruptor.PerfTests/Latency/PingPong/PingPongSequencedLatencyTest_AsyncBatchHandler.cs +++ b/src/Disruptor.PerfTests/Latency/PingPong/PingPongSequencedLatencyTest_AsyncBatchHandler.cs @@ -30,7 +30,7 @@ public PingPongSequencedLatencyTest_AsyncBatchHandler() var pingBarrier = pingBuffer.NewAsyncBarrier(); var pongBarrier = pongBuffer.NewAsyncBarrier(); - _pinger = new Pinger(pongBuffer, _pauseNanos); + _pinger = new Pinger(pongBuffer); _ponger = new Ponger(pingBuffer); _pingProcessor = EventProcessorFactory.Create(pingBuffer, pingBarrier, _pinger); @@ -77,7 +77,6 @@ private class PingPongEvent private class Pinger : IAsyncBatchEventHandler { private readonly RingBuffer _buffer; - private readonly long _pauseTimeNs; private readonly long _pauseTimeTicks; private HistogramBase _histogram; private CountdownEvent _startCountdown; @@ -85,11 +84,10 @@ private class Pinger : IAsyncBatchEventHandler private long _t0; private long _counter; - public Pinger(RingBuffer buffer, long pauseTimeNs) + public Pinger(RingBuffer buffer) { _buffer = buffer; - _pauseTimeNs = pauseTimeNs; - _pauseTimeTicks = StopwatchUtil.GetTimestampFromNanoseconds(pauseTimeNs); + _pauseTimeTicks = StopwatchUtil.GetTimestampFromNanoseconds(_pauseNanos); } public ValueTask OnBatch(EventBatch batch, long sequence) @@ -97,7 +95,7 @@ public ValueTask OnBatch(EventBatch batch, long sequence) foreach (var data in batch) { var t1 = Stopwatch.GetTimestamp(); - _histogram.RecordValueWithExpectedInterval(StopwatchUtil.ToNanoseconds(t1 - _t0), _pauseTimeNs); + _histogram.RecordValueWithExpectedInterval(StopwatchUtil.ToNanoseconds(t1 - _t0), _pauseNanos); if (data.Counter == _iterations) { diff --git a/src/Disruptor.PerfTests/Latency/PingPong/PingPongSequencedLatencyTest_BatchHandler.cs b/src/Disruptor.PerfTests/Latency/PingPong/PingPongSequencedLatencyTest_BatchHandler.cs index 51e4574..e583690 100644 --- a/src/Disruptor.PerfTests/Latency/PingPong/PingPongSequencedLatencyTest_BatchHandler.cs +++ b/src/Disruptor.PerfTests/Latency/PingPong/PingPongSequencedLatencyTest_BatchHandler.cs @@ -23,16 +23,16 @@ public class PingPongSequencedLatencyTest_BatchHandler : ILatencyTest private readonly Ponger _ponger; private readonly IEventProcessor _pongProcessor; - public PingPongSequencedLatencyTest_BatchHandler() + public PingPongSequencedLatencyTest_BatchHandler(ProgramOptions options) { - var pingBuffer = RingBuffer.CreateSingleProducer(PerfEvent.EventFactory, _bufferSize, new BlockingWaitStrategy()); - var pongBuffer = RingBuffer.CreateSingleProducer(PerfEvent.EventFactory, _bufferSize, new BlockingWaitStrategy()); + var pingBuffer = RingBuffer.CreateSingleProducer(PerfEvent.EventFactory, _bufferSize, new YieldingWaitStrategy()); + var pongBuffer = RingBuffer.CreateSingleProducer(PerfEvent.EventFactory, _bufferSize, new YieldingWaitStrategy()); var pingBarrier = pingBuffer.NewBarrier(); var pongBarrier = pongBuffer.NewBarrier(); - _pinger = new Pinger(pingBuffer, _iterations, _pauseNanos); - _ponger = new Ponger(pongBuffer); + _pinger = new Pinger(pingBuffer, options.GetCustomCpu(0), _iterations, _pauseNanos); + _ponger = new Ponger(pongBuffer, options.GetCustomCpu(1)); _pingProcessor = EventProcessorFactory.Create(pongBuffer,pongBarrier, _pinger); _pongProcessor = EventProcessorFactory.Create(pingBuffer,pingBarrier, _ponger); @@ -72,6 +72,7 @@ public void Run(LatencySessionContext sessionContext) private class Pinger : IBatchEventHandler { private readonly RingBuffer _buffer; + private readonly int? _cpu; private readonly long _maxEvents; private readonly long _pauseTimeNs; private readonly long _pauseTimeTicks; @@ -80,10 +81,12 @@ private class Pinger : IBatchEventHandler private long _counter; private CountdownEvent _globalSignal; private ManualResetEvent _signal; + private ThreadAffinityScope _affinityScope; - public Pinger(RingBuffer buffer, long maxEvents, long pauseTimeNs) + public Pinger(RingBuffer buffer, int? cpu, long maxEvents, long pauseTimeNs) { _buffer = buffer; + _cpu = cpu; _maxEvents = maxEvents; _pauseTimeNs = pauseTimeNs; _pauseTimeTicks = StopwatchUtil.GetTimestampFromNanoseconds(pauseTimeNs); @@ -125,6 +128,8 @@ private void Send() public void OnStart() { + _affinityScope = ThreadAffinityUtil.SetThreadAffinity(_cpu, ThreadPriority.Highest); + _globalSignal.Signal(); _globalSignal.Wait(); @@ -135,6 +140,7 @@ public void OnStart() public void OnShutdown() { + _affinityScope.Dispose(); } public void Reset(CountdownEvent globalSignal, ManualResetEvent signal, HistogramBase histogram) @@ -150,11 +156,14 @@ public void Reset(CountdownEvent globalSignal, ManualResetEvent signal, Histogra private class Ponger : IBatchEventHandler { private readonly RingBuffer _buffer; + private readonly int? _cpu; private CountdownEvent _globalSignal; + private ThreadAffinityScope _affinityScope; - public Ponger(RingBuffer buffer) + public Ponger(RingBuffer buffer, int? cpu) { _buffer = buffer; + _cpu = cpu; } public void OnBatch(EventBatch batch, long sequence) @@ -169,12 +178,15 @@ public void OnBatch(EventBatch batch, long sequence) public void OnStart() { + _affinityScope = ThreadAffinityUtil.SetThreadAffinity(_cpu, ThreadPriority.Highest); + _globalSignal.Signal(); _globalSignal.Wait(); } public void OnShutdown() { + _affinityScope.Dispose(); } public void Reset(CountdownEvent globalSignal) diff --git a/src/Disruptor.PerfTests/Latency/PingPong/PingPongSequencedLatencyTest_Multi.cs b/src/Disruptor.PerfTests/Latency/PingPong/PingPongSequencedLatencyTest_Multi.cs index f9cd5df..f4420f3 100644 --- a/src/Disruptor.PerfTests/Latency/PingPong/PingPongSequencedLatencyTest_Multi.cs +++ b/src/Disruptor.PerfTests/Latency/PingPong/PingPongSequencedLatencyTest_Multi.cs @@ -23,16 +23,16 @@ public class PingPongSequencedLatencyTest_Multi : ILatencyTest private readonly Ponger _ponger; private readonly IEventProcessor _pongProcessor; - public PingPongSequencedLatencyTest_Multi() + public PingPongSequencedLatencyTest_Multi(ProgramOptions options) { - var pingBuffer = RingBuffer.CreateMultiProducer(PerfEvent.EventFactory, _bufferSize, new BlockingWaitStrategy()); - var pongBuffer = RingBuffer.CreateMultiProducer(PerfEvent.EventFactory, _bufferSize, new BlockingWaitStrategy()); + var pingBuffer = RingBuffer.CreateMultiProducer(PerfEvent.EventFactory, _bufferSize, new YieldingWaitStrategy()); + var pongBuffer = RingBuffer.CreateMultiProducer(PerfEvent.EventFactory, _bufferSize, new YieldingWaitStrategy()); var pingBarrier = pingBuffer.NewBarrier(); var pongBarrier = pongBuffer.NewBarrier(); - _pinger = new Pinger(pingBuffer, _iterations, _pauseNanos); - _ponger = new Ponger(pongBuffer); + _pinger = new Pinger(pingBuffer, options.GetCustomCpu(0)); + _ponger = new Ponger(pongBuffer, options.GetCustomCpu(1)); _pingProcessor = EventProcessorFactory.Create(pongBuffer,pongBarrier, _pinger); _pongProcessor = EventProcessorFactory.Create(pingBuffer,pingBarrier, _ponger); @@ -74,30 +74,29 @@ public void Run(LatencySessionContext sessionContext) private class Pinger : IEventHandler { private readonly RingBuffer _buffer; - private readonly long _maxEvents; - private readonly long _pauseTimeNs; + private readonly int? _cpu; private readonly long _pauseTimeTicks; private HistogramBase _histogram; private long _t0; private long _counter; private CountdownEvent _globalSignal; private ManualResetEvent _signal; + private ThreadAffinityScope _affinityScope; - public Pinger(RingBuffer buffer, long maxEvents, long pauseTimeNs) + public Pinger(RingBuffer buffer, int? cpu) { _buffer = buffer; - _maxEvents = maxEvents; - _pauseTimeNs = pauseTimeNs; - _pauseTimeTicks = StopwatchUtil.GetTimestampFromNanoseconds(pauseTimeNs); + _cpu = cpu; + _pauseTimeTicks = StopwatchUtil.GetTimestampFromNanoseconds(_pauseNanos); } public void OnEvent(PerfEvent data, long sequence, bool endOfBatch) { var t1 = Stopwatch.GetTimestamp(); - _histogram.RecordValueWithExpectedInterval(StopwatchUtil.ToNanoseconds(t1 - _t0), _pauseTimeNs); + _histogram.RecordValueWithExpectedInterval(StopwatchUtil.ToNanoseconds(t1 - _t0), _pauseNanos); - if (data.Value < _maxEvents) + if (data.Value < _iterations) { while (_pauseTimeTicks > (Stopwatch.GetTimestamp() - t1)) { @@ -124,6 +123,8 @@ private void Send() public void OnStart() { + _affinityScope = ThreadAffinityUtil.SetThreadAffinity(_cpu, ThreadPriority.Highest); + _globalSignal.Signal(); _globalSignal.Wait(); @@ -134,6 +135,7 @@ public void OnStart() public void OnShutdown() { + _affinityScope.Dispose(); } public void Reset(CountdownEvent globalSignal, ManualResetEvent signal, HistogramBase histogram) @@ -149,11 +151,14 @@ public void Reset(CountdownEvent globalSignal, ManualResetEvent signal, Histogra private class Ponger : IEventHandler { private readonly RingBuffer _buffer; + private readonly int? _cpu; private CountdownEvent _globalSignal; + private ThreadAffinityScope _affinityScope; - public Ponger(RingBuffer buffer) + public Ponger(RingBuffer buffer, int? cpu) { _buffer = buffer; + _cpu = cpu; } public void OnEvent(PerfEvent data, long sequence, bool endOfBatch) @@ -165,12 +170,15 @@ public void OnEvent(PerfEvent data, long sequence, bool endOfBatch) public void OnStart() { + _affinityScope = ThreadAffinityUtil.SetThreadAffinity(_cpu, ThreadPriority.Highest); + _globalSignal.Signal(); _globalSignal.Wait(); } public void OnShutdown() { + _affinityScope.Dispose(); } public void Reset(CountdownEvent globalSignal) diff --git a/src/Disruptor.PerfTests/Latency/PingPong/PingPongSequencedLatencyTest_Value.cs b/src/Disruptor.PerfTests/Latency/PingPong/PingPongSequencedLatencyTest_Value.cs index a3012f5..7405eb8 100644 --- a/src/Disruptor.PerfTests/Latency/PingPong/PingPongSequencedLatencyTest_Value.cs +++ b/src/Disruptor.PerfTests/Latency/PingPong/PingPongSequencedLatencyTest_Value.cs @@ -23,7 +23,7 @@ public class PingPongSequencedLatencyTest_Value : ILatencyTest private readonly Ponger _ponger; private readonly IValueEventProcessor _pongProcessor; - public PingPongSequencedLatencyTest_Value() + public PingPongSequencedLatencyTest_Value(ProgramOptions options) { var pingBuffer = ValueRingBuffer.CreateSingleProducer(PerfValueEvent.EventFactory, _bufferSize, new BlockingWaitStrategy()); var pongBuffer = ValueRingBuffer.CreateSingleProducer(PerfValueEvent.EventFactory, _bufferSize, new BlockingWaitStrategy()); @@ -31,8 +31,8 @@ public PingPongSequencedLatencyTest_Value() var pingBarrier = pingBuffer.NewBarrier(); var pongBarrier = pongBuffer.NewBarrier(); - _pinger = new Pinger(pingBuffer, _iterations, _pauseNanos); - _ponger = new Ponger(pongBuffer); + _pinger = new Pinger(pingBuffer, options.GetCustomCpu(0)); + _ponger = new Ponger(pongBuffer, options.GetCustomCpu(1)); _pingProcessor = EventProcessorFactory.Create(pongBuffer,pongBarrier, _pinger); _pongProcessor = EventProcessorFactory.Create(pingBuffer,pingBarrier, _ponger); @@ -72,30 +72,29 @@ public void Run(LatencySessionContext sessionContext) private class Pinger : IValueEventHandler { private readonly ValueRingBuffer _buffer; - private readonly long _maxEvents; - private readonly long _pauseTimeNs; + private readonly int? _cpu; private readonly long _pauseTimeTicks; private HistogramBase _histogram; private long _t0; private long _counter; private CountdownEvent _globalSignal; private ManualResetEvent _signal; + private ThreadAffinityScope _affinityScope; - public Pinger(ValueRingBuffer buffer, long maxEvents, long pauseTimeNs) + public Pinger(ValueRingBuffer buffer, int? cpu) { _buffer = buffer; - _maxEvents = maxEvents; - _pauseTimeNs = pauseTimeNs; - _pauseTimeTicks = StopwatchUtil.GetTimestampFromNanoseconds(pauseTimeNs); + _cpu = cpu; + _pauseTimeTicks = StopwatchUtil.GetTimestampFromNanoseconds(_pauseNanos); } public void OnEvent(ref PerfValueEvent data, long sequence, bool endOfBatch) { var t1 = Stopwatch.GetTimestamp(); - _histogram.RecordValueWithExpectedInterval(StopwatchUtil.ToNanoseconds(t1 - _t0), _pauseTimeNs); + _histogram.RecordValueWithExpectedInterval(StopwatchUtil.ToNanoseconds(t1 - _t0), _pauseNanos); - if (data.Value < _maxEvents) + if (data.Value < _iterations) { while (_pauseTimeTicks > (Stopwatch.GetTimestamp() - t1)) { @@ -122,6 +121,8 @@ private void Send() public void OnStart() { + _affinityScope = ThreadAffinityUtil.SetThreadAffinity(_cpu, ThreadPriority.Highest); + _globalSignal.Signal(); _globalSignal.Wait(); @@ -132,6 +133,7 @@ public void OnStart() public void OnShutdown() { + _affinityScope.Dispose(); } public void Reset(CountdownEvent globalSignal, ManualResetEvent signal, HistogramBase histogram) @@ -147,11 +149,14 @@ public void Reset(CountdownEvent globalSignal, ManualResetEvent signal, Histogra private class Ponger : IValueEventHandler { private readonly ValueRingBuffer _buffer; + private readonly int? _cpu; private CountdownEvent _globalSignal; + private ThreadAffinityScope _affinityScope; - public Ponger(ValueRingBuffer buffer) + public Ponger(ValueRingBuffer buffer, int? cpu) { _buffer = buffer; + _cpu = cpu; } public void OnEvent(ref PerfValueEvent data, long sequence, bool endOfBatch) @@ -163,12 +168,15 @@ public void OnEvent(ref PerfValueEvent data, long sequence, bool endOfBatch) public void OnStart() { + _affinityScope = ThreadAffinityUtil.SetThreadAffinity(_cpu, ThreadPriority.Highest); + _globalSignal.Signal(); _globalSignal.Wait(); } public void OnShutdown() { + _affinityScope.Dispose(); } public void Reset(CountdownEvent globalSignal) diff --git a/src/Disruptor.PerfTests/Support/AdditionEventHandler.cs b/src/Disruptor.PerfTests/Support/AdditionEventHandler.cs index cbd47b7..8ff6c38 100644 --- a/src/Disruptor.PerfTests/Support/AdditionEventHandler.cs +++ b/src/Disruptor.PerfTests/Support/AdditionEventHandler.cs @@ -9,21 +9,19 @@ public class AdditionEventHandler(int? cpu = null) : IEventHandler, I private PaddedLong _batchesProcessed; private long _latchSequence; private readonly ManualResetEvent _latch = new(false); - private ThreadAffinityUtil.Scope _affinityScope; + private ThreadAffinityScope _affinityScope; public long Value => _value.Value; public long BatchesProcessed => _batchesProcessed.Value; public void OnStart() { - if (cpu != null) - _affinityScope = ThreadAffinityUtil.SetThreadAffinity(cpu.Value, ThreadPriority.Highest); + _affinityScope = ThreadAffinityUtil.SetThreadAffinity(cpu, ThreadPriority.Highest); } public void OnShutdown() { - if (cpu != null) - _affinityScope.Dispose(); + _affinityScope.Dispose(); } public void WaitForSequence() diff --git a/src/Disruptor.PerfTests/Support/LongArrayEventHandler.cs b/src/Disruptor.PerfTests/Support/LongArrayEventHandler.cs index aa7dd1b..bec69ac 100644 --- a/src/Disruptor.PerfTests/Support/LongArrayEventHandler.cs +++ b/src/Disruptor.PerfTests/Support/LongArrayEventHandler.cs @@ -3,16 +3,27 @@ namespace Disruptor.PerfTests.Support; -public class LongArrayEventHandler : IEventHandler +public class LongArrayEventHandler(int? cpu = null) : IEventHandler { private PaddedLong _value; private PaddedLong _batchesProcessed; private long _count; private ManualResetEvent _signal; + private ThreadAffinityScope _affinityScope; public long Value => _value.Value; public long BatchesProcessed => _batchesProcessed.Value; + public void OnStart() + { + _affinityScope = ThreadAffinityUtil.SetThreadAffinity(cpu, ThreadPriority.Highest); + } + + public void OnShutdown() + { + _affinityScope.Dispose(); + } + public void Reset(ManualResetEvent signal, long expectedCount) { _value.Value = 0; diff --git a/src/Disruptor.PerfTests/Support/ThreadAffinityScope.cs b/src/Disruptor.PerfTests/Support/ThreadAffinityScope.cs new file mode 100644 index 0000000..3769c4e --- /dev/null +++ b/src/Disruptor.PerfTests/Support/ThreadAffinityScope.cs @@ -0,0 +1,16 @@ +using System; +using System.Threading; + +namespace Disruptor.PerfTests.Support; + +public readonly struct ThreadAffinityScope(bool hasAffinity, ThreadPriority? initialThreadPriority) : IDisposable +{ + public void Dispose() + { + if (hasAffinity) // It would be cleaner to restore previous affinity. + ThreadAffinityUtil.RemoveThreadAffinity(); + + if (initialThreadPriority != null) + Thread.CurrentThread.Priority = initialThreadPriority.Value; + } +} diff --git a/src/Disruptor.PerfTests/Support/ThreadAffinityUtil.cs b/src/Disruptor.PerfTests/Support/ThreadAffinityUtil.cs index cb66336..5924b56 100644 --- a/src/Disruptor.PerfTests/Support/ThreadAffinityUtil.cs +++ b/src/Disruptor.PerfTests/Support/ThreadAffinityUtil.cs @@ -14,30 +14,35 @@ public class ThreadAffinityUtil [DllImport("libc.so.6")] private static extern int sched_setaffinity(int pid, IntPtr cpusetsize, ref ulong cpuset); - public static Scope SetThreadAffinity(int processorIndex, ThreadPriority? threadPriority = null) + public static ThreadAffinityScope SetThreadAffinity(int? cpuId, ThreadPriority? threadPriority = null) { - var previousThreadPriority = Thread.CurrentThread.Priority; - - Thread.BeginThreadAffinity(); - - var affinity = (1ul << processorIndex); - SetProcessorAffinity(affinity); + if (cpuId != null) + { + var affinity = (1ul << cpuId.Value); + SetProcessorAffinity(affinity); + } + ThreadPriority? previousThreadPriority; if (threadPriority != null) + { + previousThreadPriority = Thread.CurrentThread.Priority; Thread.CurrentThread.Priority = threadPriority.Value; + } + else + { + previousThreadPriority = null; + } - return new Scope(previousThreadPriority); + return new ThreadAffinityScope(cpuId != null, previousThreadPriority); } - private static void RemoveThreadAffinity() + public static void RemoveThreadAffinity() { var affinity = (1ul << Environment.ProcessorCount) - 1; SetProcessorAffinity(affinity); - - Thread.EndThreadAffinity(); } - private static void SetProcessorAffinity(ulong mask) + public static void SetProcessorAffinity(ulong mask) { #if NETFRAMEWORK SetProcessorAffinityWindows(mask); @@ -82,14 +87,4 @@ private static ProcessThread GetCurrentProcessThread() throw new InvalidOperationException($"Could not retrieve native thread with ID: {threadId}, current managed thread ID was {threadId}"); } - - public readonly struct Scope(ThreadPriority initialThreadPriority) : IDisposable - { - public void Dispose() - { - RemoveThreadAffinity(); - - Thread.CurrentThread.Priority = initialThreadPriority; - } - } } diff --git a/src/Disruptor.PerfTests/Throughput/OneToOne/EventHandler/OneToOneSequencedThroughputTest.cs b/src/Disruptor.PerfTests/Throughput/OneToOne/EventHandler/OneToOneSequencedThroughputTest.cs index 199f2df..d6740e4 100644 --- a/src/Disruptor.PerfTests/Throughput/OneToOne/EventHandler/OneToOneSequencedThroughputTest.cs +++ b/src/Disruptor.PerfTests/Throughput/OneToOne/EventHandler/OneToOneSequencedThroughputTest.cs @@ -1,5 +1,6 @@ using System; using System.Runtime.CompilerServices; +using System.Threading; using Disruptor.PerfTests.Support; using Disruptor.Processing; @@ -36,14 +37,16 @@ public class OneToOneSequencedThroughputTest : IThroughputTest private const int _bufferSize = 1024 * 64; private const long _iterations = 1000L * 1000L * 100L; + private static readonly long _expectedResult = PerfTestUtil.AccumulatedAddition(_iterations); + private readonly ProgramOptions _options; private readonly RingBuffer _ringBuffer; private readonly AdditionEventHandler _eventHandler; - private readonly long _expectedResult = PerfTestUtil.AccumulatedAddition(_iterations); private readonly IEventProcessor _eventProcessor; - public OneToOneSequencedThroughputTest() + public OneToOneSequencedThroughputTest(ProgramOptions options) { - _eventHandler = new AdditionEventHandler(); + _options = options; + _eventHandler = new AdditionEventHandler(options.GetCustomCpu(1)); _ringBuffer = RingBuffer.CreateSingleProducer(PerfEvent.EventFactory, _bufferSize, new YieldingWaitStrategy()); var sequenceBarrier = _ringBuffer.NewBarrier(); _eventProcessor = EventProcessorFactory.Create(_ringBuffer, sequenceBarrier, _eventHandler); @@ -62,6 +65,8 @@ public long Run(ThroughputSessionContext sessionContext) _eventProcessor.WaitUntilStarted(TimeSpan.FromSeconds(5)); + using var _ = ThreadAffinityUtil.SetThreadAffinity(_options.GetCustomCpu(0), ThreadPriority.Highest); + sessionContext.Start(); var ringBuffer = _ringBuffer; diff --git a/src/Disruptor.PerfTests/Throughput/OneToOne/EventHandler/OneToOneSequencedThroughputTest_BatchPublisher.cs b/src/Disruptor.PerfTests/Throughput/OneToOne/EventHandler/OneToOneSequencedThroughputTest_BatchPublisher.cs index cf23466..937d626 100644 --- a/src/Disruptor.PerfTests/Throughput/OneToOne/EventHandler/OneToOneSequencedThroughputTest_BatchPublisher.cs +++ b/src/Disruptor.PerfTests/Throughput/OneToOne/EventHandler/OneToOneSequencedThroughputTest_BatchPublisher.cs @@ -1,4 +1,5 @@ using System; +using System.Threading; using Disruptor.PerfTests.Support; using Disruptor.Processing; @@ -38,19 +39,21 @@ public class OneToOneSequencedThroughputTest_BatchPublisher : IThroughputTest private const int _batchSize = 10; private const int _bufferSize = 1024 * 64; private const long _iterations = 1000L * 1000L * 100L; - private static readonly long _expectedResult = PerfTestUtil.AccumulatedAddition(_iterations) * _batchSize; /////////////////////////////////////////////////////////////////////////////////////////////// + private static readonly long _expectedResult = PerfTestUtil.AccumulatedAddition(_iterations) * _batchSize; + private readonly ProgramOptions _options; private readonly RingBuffer _ringBuffer; private readonly AdditionEventHandler _handler; private readonly IEventProcessor _eventProcessor; - public OneToOneSequencedThroughputTest_BatchPublisher() + public OneToOneSequencedThroughputTest_BatchPublisher(ProgramOptions options) { + _options = options; + _handler = new AdditionEventHandler(options.GetCustomCpu(1)); _ringBuffer = RingBuffer.CreateSingleProducer(PerfEvent.EventFactory, _bufferSize, new YieldingWaitStrategy()); var sequenceBarrier = _ringBuffer.NewBarrier(); - _handler = new AdditionEventHandler(); _eventProcessor = EventProcessorFactory.Create(_ringBuffer, sequenceBarrier, _handler); _ringBuffer.AddGatingSequences(_eventProcessor.Sequence); } @@ -66,6 +69,8 @@ public long Run(ThroughputSessionContext sessionContext) var processorTask = _eventProcessor.Start(); _eventProcessor.WaitUntilStarted(TimeSpan.FromSeconds(5)); + using var _ = ThreadAffinityUtil.SetThreadAffinity(_options.GetCustomCpu(0), ThreadPriority.Highest); + sessionContext.Start(); var ringBuffer = _ringBuffer; @@ -92,4 +97,4 @@ public long Run(ThroughputSessionContext sessionContext) return _batchSize * _iterations; } -} \ No newline at end of file +} diff --git a/src/Disruptor.PerfTests/Throughput/OneToOne/EventHandler/OneToOneSequencedThroughputTest_Blocking.cs b/src/Disruptor.PerfTests/Throughput/OneToOne/EventHandler/OneToOneSequencedThroughputTest_Blocking.cs index c26ee6d..09305d5 100644 --- a/src/Disruptor.PerfTests/Throughput/OneToOne/EventHandler/OneToOneSequencedThroughputTest_Blocking.cs +++ b/src/Disruptor.PerfTests/Throughput/OneToOne/EventHandler/OneToOneSequencedThroughputTest_Blocking.cs @@ -1,5 +1,6 @@ using System; using System.Runtime.CompilerServices; +using System.Threading; using Disruptor.PerfTests.Support; using Disruptor.Processing; @@ -34,6 +35,7 @@ namespace Disruptor.PerfTests.Throughput.OneToOne.EventHandler; /// public class OneToOneSequencedThroughputTest_Blocking : IThroughputTest { + private readonly ProgramOptions _options; private const int _bufferSize = 1024 * 64; private const long _iterations = 1000L * 1000L * 100L; @@ -42,9 +44,10 @@ public class OneToOneSequencedThroughputTest_Blocking : IThroughputTest private readonly long _expectedResult = PerfTestUtil.AccumulatedAddition(_iterations); private readonly IEventProcessor _eventProcessor; - public OneToOneSequencedThroughputTest_Blocking() + public OneToOneSequencedThroughputTest_Blocking(ProgramOptions options) { - _eventHandler = new AdditionEventHandler(); + _options = options; + _eventHandler = new AdditionEventHandler(options.GetCustomCpu(1)); _ringBuffer = RingBuffer.CreateSingleProducer(PerfEvent.EventFactory, _bufferSize, new BlockingWaitStrategy()); var sequenceBarrier = _ringBuffer.NewBarrier(); _eventProcessor = EventProcessorFactory.Create(_ringBuffer, sequenceBarrier, _eventHandler); @@ -63,6 +66,8 @@ public long Run(ThroughputSessionContext sessionContext) _eventProcessor.WaitUntilStarted(TimeSpan.FromSeconds(5)); + using var _ = ThreadAffinityUtil.SetThreadAffinity(_options.GetCustomCpu(0), ThreadPriority.Highest); + sessionContext.Start(); var ringBuffer = _ringBuffer; diff --git a/src/Disruptor.PerfTests/Throughput/OneToOne/EventHandler/OneToOneSequencedThroughputTest_LongArray.cs b/src/Disruptor.PerfTests/Throughput/OneToOne/EventHandler/OneToOneSequencedThroughputTest_LongArray.cs index 68bebb5..d98daae 100644 --- a/src/Disruptor.PerfTests/Throughput/OneToOne/EventHandler/OneToOneSequencedThroughputTest_LongArray.cs +++ b/src/Disruptor.PerfTests/Throughput/OneToOne/EventHandler/OneToOneSequencedThroughputTest_LongArray.cs @@ -39,15 +39,17 @@ public class OneToOneSequencedThroughputTest_LongArray : IThroughputTest /////////////////////////////////////////////////////////////////////////////////////////////// + private readonly ProgramOptions _options; private readonly RingBuffer _ringBuffer; private readonly LongArrayEventHandler _handler; private readonly IEventProcessor _eventProcessor; - public OneToOneSequencedThroughputTest_LongArray() + public OneToOneSequencedThroughputTest_LongArray(ProgramOptions options) { + _options = options; + _handler = new LongArrayEventHandler(options.GetCustomCpu(1)); _ringBuffer = RingBuffer.CreateSingleProducer(() => new long[_arraySize], _bufferSize, new YieldingWaitStrategy()); var sequenceBarrier = _ringBuffer.NewBarrier(); - _handler = new LongArrayEventHandler(); _eventProcessor = EventProcessorFactory.Create(_ringBuffer, sequenceBarrier, _handler); _ringBuffer.AddGatingSequences(_eventProcessor.Sequence); } @@ -58,12 +60,16 @@ public OneToOneSequencedThroughputTest_LongArray() public long Run(ThroughputSessionContext sessionContext) { - var signal = new ManualResetEvent(false); var expectedCount = _eventProcessor.Sequence.Value + _iterations; + + var signal = new ManualResetEvent(false); _handler.Reset(signal, _iterations); var processorTask = _eventProcessor.Start(); + _eventProcessor.WaitUntilStarted(TimeSpan.FromSeconds(5)); + using var _ = ThreadAffinityUtil.SetThreadAffinity(_options.GetCustomCpu(0), ThreadPriority.Highest); + sessionContext.Start(); var ringBuffer = _ringBuffer; @@ -98,4 +104,4 @@ private void WaitForEventProcessorSequence(long expectedCount) Thread.Sleep(1); } } -} \ No newline at end of file +} diff --git a/src/Disruptor.PerfTests/Throughput/OneToOne/EventHandler/OneToOneSequencedThroughputTest_Multi.cs b/src/Disruptor.PerfTests/Throughput/OneToOne/EventHandler/OneToOneSequencedThroughputTest_Multi.cs index c187e01..d75a6d0 100644 --- a/src/Disruptor.PerfTests/Throughput/OneToOne/EventHandler/OneToOneSequencedThroughputTest_Multi.cs +++ b/src/Disruptor.PerfTests/Throughput/OneToOne/EventHandler/OneToOneSequencedThroughputTest_Multi.cs @@ -1,5 +1,6 @@ using System; using System.Runtime.CompilerServices; +using System.Threading; using Disruptor.PerfTests.Support; using Disruptor.Processing; @@ -37,14 +38,16 @@ public class OneToOneSequencedThroughputTest_Multi : IThroughputTest private const int _bufferSize = 1024 * 64; private const long _iterations = 1000L * 1000L * 100L; + private static readonly long _expectedResult = PerfTestUtil.AccumulatedAddition(_iterations); + private readonly ProgramOptions _options; private readonly RingBuffer _ringBuffer; private readonly AdditionEventHandler _eventHandler; - private readonly long _expectedResult = PerfTestUtil.AccumulatedAddition(_iterations); private readonly IEventProcessor _eventProcessor; - public OneToOneSequencedThroughputTest_Multi() + public OneToOneSequencedThroughputTest_Multi(ProgramOptions options) { - _eventHandler = new AdditionEventHandler(); + _options = options; + _eventHandler = new AdditionEventHandler(options.GetCustomCpu(1)); _ringBuffer = RingBuffer.CreateMultiProducer(PerfEvent.EventFactory, _bufferSize, new YieldingWaitStrategy()); var sequenceBarrier = _ringBuffer.NewBarrier(); _eventProcessor = EventProcessorFactory.Create(_ringBuffer, sequenceBarrier, _eventHandler); @@ -63,6 +66,8 @@ public long Run(ThroughputSessionContext sessionContext) _eventProcessor.WaitUntilStarted(TimeSpan.FromSeconds(5)); + using var _ = ThreadAffinityUtil.SetThreadAffinity(_options.GetCustomCpu(0), ThreadPriority.Highest); + sessionContext.Start(); var ringBuffer = _ringBuffer; diff --git a/src/Disruptor.PerfTests/Throughput/OneToOne/EventHandler/OneToOneSequencedThroughputTest_ThreadAffinity.cs b/src/Disruptor.PerfTests/Throughput/OneToOne/EventHandler/OneToOneSequencedThroughputTest_ThreadAffinity.cs deleted file mode 100644 index 4f401d7..0000000 --- a/src/Disruptor.PerfTests/Throughput/OneToOne/EventHandler/OneToOneSequencedThroughputTest_ThreadAffinity.cs +++ /dev/null @@ -1,96 +0,0 @@ -using System; -using System.Runtime.CompilerServices; -using System.Threading; -using Disruptor.PerfTests.Support; -using Disruptor.Processing; - -namespace Disruptor.PerfTests.Throughput.OneToOne.EventHandler; - -/// -/// Unicast a series of items between 1 publisher and 1 event processor. -/// Use thread-affinity to pin publishing and consuming threads on different cores. -/// -/// +----+ +-----+ -/// | P1 |--->| EP1 | -/// +----+ +-----+ -/// -/// Disruptor: -/// ========== -/// track to prevent wrap -/// +------------------+ -/// | | -/// | v -/// +----+ +====+ +====+ +-----+ -/// | P1 |---\| RB |/---| SB | | EP1 | -/// +----+ +====+ +====+ +-----+ -/// claim get ^ | -/// | | -/// +--------+ -/// waitFor -/// -/// P1 - Publisher 1 -/// RB - RingBuffer -/// SB - SequenceBarrier -/// EP1 - EventProcessor 1 -/// -public class OneToOneSequencedThroughputTest_ThreadAffinity : IThroughputTest -{ - private const int _bufferSize = 1024 * 64; - private const long _iterations = 1000L * 1000L * 100L; - - private readonly ProgramOptions _options; - private readonly RingBuffer _ringBuffer; - private readonly AdditionEventHandler _eventHandler; - private readonly long _expectedResult = PerfTestUtil.AccumulatedAddition(_iterations); - private readonly IEventProcessor _eventProcessor; - - public OneToOneSequencedThroughputTest_ThreadAffinity(ProgramOptions options) - { - _options = options; - _eventHandler = new AdditionEventHandler(options.CpuSet[1]); - _ringBuffer = RingBuffer.CreateSingleProducer(PerfEvent.EventFactory, _bufferSize, new YieldingWaitStrategy()); - var sequenceBarrier = _ringBuffer.NewBarrier(); - _eventProcessor = EventProcessorFactory.Create(_ringBuffer, sequenceBarrier, _eventHandler); - _ringBuffer.AddGatingSequences(_eventProcessor.Sequence); - } - - public int RequiredProcessorCount => 2; - - [MethodImpl(512)] - public long Run(ThroughputSessionContext sessionContext) - { - long expectedCount = _eventProcessor.Sequence.Value + _iterations; - - _eventHandler.Reset(expectedCount); - var processorTask = _eventProcessor.Start(); - - _eventProcessor.WaitUntilStarted(TimeSpan.FromSeconds(5)); - - using var _ = ThreadAffinityUtil.SetThreadAffinity(_options.CpuSet[0]); - - Thread.CurrentThread.Priority = ThreadPriority.Highest; - - sessionContext.Start(); - - var ringBuffer = _ringBuffer; - - for (long i = 0; i < _iterations; i++) - { - var s = ringBuffer.Next(); - ringBuffer[s].Value = i; - ringBuffer.Publish(s); - } - - _eventHandler.WaitForSequence(); - sessionContext.Stop(); - PerfTestUtil.WaitForEventProcessorSequence(expectedCount, _eventProcessor); - _eventProcessor.Halt(); - processorTask.Wait(2000); - - sessionContext.SetBatchData(_eventHandler.BatchesProcessed, _iterations); - - PerfTestUtil.FailIfNot(_expectedResult, _eventHandler.Value, $"Handler should have processed {_expectedResult} events, but was: {_eventHandler.Value}"); - - return _iterations; - } -}