diff --git a/src/Disruptor.Tests/Processing/AsyncBatchEventProcessorTests.cs b/src/Disruptor.Tests/Processing/AsyncBatchEventProcessorTests.cs index 1f04e92..47c1dbc 100644 --- a/src/Disruptor.Tests/Processing/AsyncBatchEventProcessorTests.cs +++ b/src/Disruptor.Tests/Processing/AsyncBatchEventProcessorTests.cs @@ -190,7 +190,7 @@ public void ShouldAlwaysHalt() var waitStrategy = new AsyncWaitStrategy(); var sequencer = new SingleProducerSequencer(8, waitStrategy); var barrier = new AsyncSequenceBarrier(sequencer, waitStrategy, new DependentSequenceGroup(new Sequence())); - var dp = new ArrayDataProvider(sequencer.BufferSize); + var dp = new ArrayDataProvider(sequencer.BufferSize); var h1 = new LifeCycleHandler(); var p1 = CreateEventProcessor(dp, barrier, h1); @@ -227,6 +227,29 @@ public void ShouldAlwaysHalt() } } + [Test] + public void ShouldInvokeOnStartAndOnShutdown() + { + var handler = new LifeCycleHandler(); + var processor = CreateEventProcessor(_ringBuffer, _sequenceBarrier, handler); + + var task = processor.Start(); + + var wasStarted = handler.WaitStart(TimeSpan.FromMilliseconds(500)); + Assert.IsTrue(wasStarted); + + var wasShutdownAfterStart = handler.WaitShutdown(TimeSpan.FromMilliseconds(10)); + Assert.IsFalse(wasShutdownAfterStart); + + processor.Halt(); + + var stopped = task.Wait(TimeSpan.FromMilliseconds(500)); + Assert.IsTrue(stopped); + + var wasShutdownAfterStop = handler.WaitShutdown(TimeSpan.FromMilliseconds(10)); + Assert.IsTrue(wasShutdownAfterStop); + } + [Test] public void ShouldLimitMaxBatchSize() { @@ -274,12 +297,12 @@ public async ValueTask OnBatch(EventBatch batch, long sequence) } } - private class LifeCycleHandler : IAsyncBatchEventHandler + private class LifeCycleHandler : IAsyncBatchEventHandler { private readonly ManualResetEvent _startedSignal = new(false); private readonly ManualResetEvent _shutdownSignal = new(false); - public async ValueTask OnBatch(EventBatch batch, long sequence) + public async ValueTask OnBatch(EventBatch batch, long sequence) { await Task.Yield(); } diff --git a/src/Disruptor.Tests/Processing/BatchEventProcessorTests.cs b/src/Disruptor.Tests/Processing/BatchEventProcessorTests.cs index 138cdb2..ef97690 100644 --- a/src/Disruptor.Tests/Processing/BatchEventProcessorTests.cs +++ b/src/Disruptor.Tests/Processing/BatchEventProcessorTests.cs @@ -191,7 +191,7 @@ public void ShouldAlwaysHalt() var waitStrategy = new BusySpinWaitStrategy(); var sequencer = new SingleProducerSequencer(8, waitStrategy); var barrier = new SequenceBarrier(sequencer, waitStrategy, new DependentSequenceGroup(new Sequence())); - var dp = new ArrayDataProvider(sequencer.BufferSize); + var dp = new ArrayDataProvider(sequencer.BufferSize); var h1 = new LifeCycleHandler(); var p1 = CreateEventProcessor(dp, barrier, h1); @@ -228,6 +228,29 @@ public void ShouldAlwaysHalt() } } + [Test] + public void ShouldInvokeOnStartAndOnShutdown() + { + var handler = new LifeCycleHandler(); + var processor = CreateEventProcessor(_ringBuffer, _sequenceBarrier, handler); + + var task = processor.Start(); + + var wasStarted = handler.WaitStart(TimeSpan.FromMilliseconds(500)); + Assert.IsTrue(wasStarted); + + var wasShutdownAfterStart = handler.WaitShutdown(TimeSpan.FromMilliseconds(10)); + Assert.IsFalse(wasShutdownAfterStart); + + processor.Halt(); + + var stopped = task.Wait(TimeSpan.FromMilliseconds(500)); + Assert.IsTrue(stopped); + + var wasShutdownAfterStop = handler.WaitShutdown(TimeSpan.FromMilliseconds(10)); + Assert.IsTrue(wasShutdownAfterStop); + } + [Test] public void ShouldLimitMaxBatchSize() { @@ -273,12 +296,12 @@ public void OnBatch(EventBatch batch, long sequence) } } - private class LifeCycleHandler : IBatchEventHandler + private class LifeCycleHandler : IBatchEventHandler { private readonly ManualResetEvent _startedSignal = new(false); private readonly ManualResetEvent _shutdownSignal = new(false); - public void OnBatch(EventBatch batch, long sequence) + public void OnBatch(EventBatch batch, long sequence) { } diff --git a/src/Disruptor.Tests/Processing/EventProcessorTests.cs b/src/Disruptor.Tests/Processing/EventProcessorTests.cs index 4d9720b..2b070ae 100644 --- a/src/Disruptor.Tests/Processing/EventProcessorTests.cs +++ b/src/Disruptor.Tests/Processing/EventProcessorTests.cs @@ -252,7 +252,7 @@ public void ShouldAlwaysHalt() var waitStrategy = new BusySpinWaitStrategy(); var sequencer = new SingleProducerSequencer(8, waitStrategy); var barrier = new SequenceBarrier(sequencer, waitStrategy, new DependentSequenceGroup(new Sequence())); - var dp = new ArrayDataProvider(sequencer.BufferSize); + var dp = new ArrayDataProvider(sequencer.BufferSize); var h1 = new LifeCycleHandler(); var p1 = CreateEventProcessor(dp, barrier, h1); @@ -289,12 +289,35 @@ public void ShouldAlwaysHalt() } } - private class LifeCycleHandler : IEventHandler + [Test] + public void ShouldInvokeOnStartAndOnShutdown() + { + var handler = new LifeCycleHandler(); + var processor = CreateEventProcessor(_ringBuffer, _sequenceBarrier, handler); + + var task = processor.Start(); + + var wasStarted = handler.WaitStart(TimeSpan.FromMilliseconds(500)); + Assert.IsTrue(wasStarted); + + var wasShutdownAfterStart = handler.WaitShutdown(TimeSpan.FromMilliseconds(10)); + Assert.IsFalse(wasShutdownAfterStart); + + processor.Halt(); + + var stopped = task.Wait(TimeSpan.FromMilliseconds(500)); + Assert.IsTrue(stopped); + + var wasShutdownAfterStop = handler.WaitShutdown(TimeSpan.FromMilliseconds(10)); + Assert.IsTrue(wasShutdownAfterStop); + } + + private class LifeCycleHandler : IEventHandler { private readonly ManualResetEvent _startedSignal = new(false); private readonly ManualResetEvent _shutdownSignal = new(false); - public void OnEvent(object data, long sequence, bool endOfBatch) + public void OnEvent(StubEvent data, long sequence, bool endOfBatch) { } diff --git a/src/Disruptor.Tests/Processing/ValueEventProcessorTests.cs b/src/Disruptor.Tests/Processing/ValueEventProcessorTests.cs index ae8fd87..20a3495 100644 --- a/src/Disruptor.Tests/Processing/ValueEventProcessorTests.cs +++ b/src/Disruptor.Tests/Processing/ValueEventProcessorTests.cs @@ -214,7 +214,7 @@ public void ShouldAlwaysHalt() var waitStrategy = new BusySpinWaitStrategy(); var sequencer = new SingleProducerSequencer(8, waitStrategy); var barrier = new SequenceBarrier(sequencer, waitStrategy, new DependentSequenceGroup(new Sequence())); - var dp = new ArrayValueDataProvider(sequencer.BufferSize); + var dp = new ArrayValueDataProvider(sequencer.BufferSize); var h1 = new LifeCycleHandler(); var p1 = CreateEventProcessor(dp, barrier, h1); @@ -251,12 +251,35 @@ public void ShouldAlwaysHalt() } } - private class LifeCycleHandler : IValueEventHandler + [Test] + public void ShouldInvokeOnStartAndOnShutdown() + { + var handler = new LifeCycleHandler(); + var processor = CreateEventProcessor(_ringBuffer, _sequenceBarrier, handler); + + var task = processor.Start(); + + var wasStarted = handler.WaitStart(TimeSpan.FromMilliseconds(500)); + Assert.IsTrue(wasStarted); + + var wasShutdownAfterStart = handler.WaitShutdown(TimeSpan.FromMilliseconds(10)); + Assert.IsFalse(wasShutdownAfterStart); + + processor.Halt(); + + var stopped = task.Wait(TimeSpan.FromMilliseconds(500)); + Assert.IsTrue(stopped); + + var wasShutdownAfterStop = handler.WaitShutdown(TimeSpan.FromMilliseconds(10)); + Assert.IsTrue(wasShutdownAfterStop); + } + + private class LifeCycleHandler : IValueEventHandler { private readonly ManualResetEvent _startedSignal = new(false); private readonly ManualResetEvent _shutdownSignal = new(false); - public void OnEvent(ref long data, long sequence, bool endOfBatch) + public void OnEvent(ref StubValueEvent data, long sequence, bool endOfBatch) { }