Skip to content

Commit

Permalink
Add tests for OnStart and OnShutdown invocations
Browse files Browse the repository at this point in the history
  • Loading branch information
ocoanet committed Aug 9, 2023
1 parent ed192e4 commit 95a0af3
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 12 deletions.
29 changes: 26 additions & 3 deletions src/Disruptor.Tests/Processing/AsyncBatchEventProcessorTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ public void ShouldAlwaysHalt()
var waitStrategy = new AsyncWaitStrategy();
var sequencer = new SingleProducerSequencer(8, waitStrategy);
var barrier = new AsyncSequenceBarrier(sequencer, waitStrategy, new DependentSequenceGroup(new Sequence()));
var dp = new ArrayDataProvider<object>(sequencer.BufferSize);
var dp = new ArrayDataProvider<StubEvent>(sequencer.BufferSize);

var h1 = new LifeCycleHandler();
var p1 = CreateEventProcessor(dp, barrier, h1);
Expand Down Expand Up @@ -227,6 +227,29 @@ public void ShouldAlwaysHalt()
}
}

[Test]
public void ShouldInvokeOnStartAndOnShutdown()
{
var handler = new LifeCycleHandler();
var processor = CreateEventProcessor(_ringBuffer, _sequenceBarrier, handler);

var task = processor.Start();

var wasStarted = handler.WaitStart(TimeSpan.FromMilliseconds(500));
Assert.IsTrue(wasStarted);

var wasShutdownAfterStart = handler.WaitShutdown(TimeSpan.FromMilliseconds(10));
Assert.IsFalse(wasShutdownAfterStart);

processor.Halt();

var stopped = task.Wait(TimeSpan.FromMilliseconds(500));
Assert.IsTrue(stopped);

var wasShutdownAfterStop = handler.WaitShutdown(TimeSpan.FromMilliseconds(10));
Assert.IsTrue(wasShutdownAfterStop);
}

[Test]
public void ShouldLimitMaxBatchSize()
{
Expand Down Expand Up @@ -274,12 +297,12 @@ public async ValueTask OnBatch(EventBatch<StubEvent> batch, long sequence)
}
}

private class LifeCycleHandler : IAsyncBatchEventHandler<object>
private class LifeCycleHandler : IAsyncBatchEventHandler<StubEvent>
{
private readonly ManualResetEvent _startedSignal = new(false);
private readonly ManualResetEvent _shutdownSignal = new(false);

public async ValueTask OnBatch(EventBatch<object> batch, long sequence)
public async ValueTask OnBatch(EventBatch<StubEvent> batch, long sequence)
{
await Task.Yield();
}
Expand Down
29 changes: 26 additions & 3 deletions src/Disruptor.Tests/Processing/BatchEventProcessorTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ public void ShouldAlwaysHalt()
var waitStrategy = new BusySpinWaitStrategy();
var sequencer = new SingleProducerSequencer(8, waitStrategy);
var barrier = new SequenceBarrier(sequencer, waitStrategy, new DependentSequenceGroup(new Sequence()));
var dp = new ArrayDataProvider<object>(sequencer.BufferSize);
var dp = new ArrayDataProvider<StubEvent>(sequencer.BufferSize);

var h1 = new LifeCycleHandler();
var p1 = CreateEventProcessor(dp, barrier, h1);
Expand Down Expand Up @@ -228,6 +228,29 @@ public void ShouldAlwaysHalt()
}
}

[Test]
public void ShouldInvokeOnStartAndOnShutdown()
{
var handler = new LifeCycleHandler();
var processor = CreateEventProcessor(_ringBuffer, _sequenceBarrier, handler);

var task = processor.Start();

var wasStarted = handler.WaitStart(TimeSpan.FromMilliseconds(500));
Assert.IsTrue(wasStarted);

var wasShutdownAfterStart = handler.WaitShutdown(TimeSpan.FromMilliseconds(10));
Assert.IsFalse(wasShutdownAfterStart);

processor.Halt();

var stopped = task.Wait(TimeSpan.FromMilliseconds(500));
Assert.IsTrue(stopped);

var wasShutdownAfterStop = handler.WaitShutdown(TimeSpan.FromMilliseconds(10));
Assert.IsTrue(wasShutdownAfterStop);
}

[Test]
public void ShouldLimitMaxBatchSize()
{
Expand Down Expand Up @@ -273,12 +296,12 @@ public void OnBatch(EventBatch<StubEvent> batch, long sequence)
}
}

private class LifeCycleHandler : IBatchEventHandler<object>
private class LifeCycleHandler : IBatchEventHandler<StubEvent>
{
private readonly ManualResetEvent _startedSignal = new(false);
private readonly ManualResetEvent _shutdownSignal = new(false);

public void OnBatch(EventBatch<object> batch, long sequence)
public void OnBatch(EventBatch<StubEvent> batch, long sequence)
{
}

Expand Down
29 changes: 26 additions & 3 deletions src/Disruptor.Tests/Processing/EventProcessorTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ public void ShouldAlwaysHalt()
var waitStrategy = new BusySpinWaitStrategy();
var sequencer = new SingleProducerSequencer(8, waitStrategy);
var barrier = new SequenceBarrier(sequencer, waitStrategy, new DependentSequenceGroup(new Sequence()));
var dp = new ArrayDataProvider<object>(sequencer.BufferSize);
var dp = new ArrayDataProvider<StubEvent>(sequencer.BufferSize);

var h1 = new LifeCycleHandler();
var p1 = CreateEventProcessor(dp, barrier, h1);
Expand Down Expand Up @@ -289,12 +289,35 @@ public void ShouldAlwaysHalt()
}
}

private class LifeCycleHandler : IEventHandler<object>
[Test]
public void ShouldInvokeOnStartAndOnShutdown()
{
var handler = new LifeCycleHandler();
var processor = CreateEventProcessor(_ringBuffer, _sequenceBarrier, handler);

var task = processor.Start();

var wasStarted = handler.WaitStart(TimeSpan.FromMilliseconds(500));
Assert.IsTrue(wasStarted);

var wasShutdownAfterStart = handler.WaitShutdown(TimeSpan.FromMilliseconds(10));
Assert.IsFalse(wasShutdownAfterStart);

processor.Halt();

var stopped = task.Wait(TimeSpan.FromMilliseconds(500));
Assert.IsTrue(stopped);

var wasShutdownAfterStop = handler.WaitShutdown(TimeSpan.FromMilliseconds(10));
Assert.IsTrue(wasShutdownAfterStop);
}

private class LifeCycleHandler : IEventHandler<StubEvent>
{
private readonly ManualResetEvent _startedSignal = new(false);
private readonly ManualResetEvent _shutdownSignal = new(false);

public void OnEvent(object data, long sequence, bool endOfBatch)
public void OnEvent(StubEvent data, long sequence, bool endOfBatch)
{
}

Expand Down
29 changes: 26 additions & 3 deletions src/Disruptor.Tests/Processing/ValueEventProcessorTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ public void ShouldAlwaysHalt()
var waitStrategy = new BusySpinWaitStrategy();
var sequencer = new SingleProducerSequencer(8, waitStrategy);
var barrier = new SequenceBarrier(sequencer, waitStrategy, new DependentSequenceGroup(new Sequence()));
var dp = new ArrayValueDataProvider<long>(sequencer.BufferSize);
var dp = new ArrayValueDataProvider<StubValueEvent>(sequencer.BufferSize);

var h1 = new LifeCycleHandler();
var p1 = CreateEventProcessor(dp, barrier, h1);
Expand Down Expand Up @@ -251,12 +251,35 @@ public void ShouldAlwaysHalt()
}
}

private class LifeCycleHandler : IValueEventHandler<long>
[Test]
public void ShouldInvokeOnStartAndOnShutdown()
{
var handler = new LifeCycleHandler();
var processor = CreateEventProcessor(_ringBuffer, _sequenceBarrier, handler);

var task = processor.Start();

var wasStarted = handler.WaitStart(TimeSpan.FromMilliseconds(500));
Assert.IsTrue(wasStarted);

var wasShutdownAfterStart = handler.WaitShutdown(TimeSpan.FromMilliseconds(10));
Assert.IsFalse(wasShutdownAfterStart);

processor.Halt();

var stopped = task.Wait(TimeSpan.FromMilliseconds(500));
Assert.IsTrue(stopped);

var wasShutdownAfterStop = handler.WaitShutdown(TimeSpan.FromMilliseconds(10));
Assert.IsTrue(wasShutdownAfterStop);
}

private class LifeCycleHandler : IValueEventHandler<StubValueEvent>
{
private readonly ManualResetEvent _startedSignal = new(false);
private readonly ManualResetEvent _shutdownSignal = new(false);

public void OnEvent(ref long data, long sequence, bool endOfBatch)
public void OnEvent(ref StubValueEvent data, long sequence, bool endOfBatch)
{
}

Expand Down

0 comments on commit 95a0af3

Please sign in to comment.