Skip to content

Commit

Permalink
Support IEventProcessorSequenceAware in ValueBatchEventProcessor
Browse files Browse the repository at this point in the history
  • Loading branch information
ocoanet committed Jan 8, 2020
1 parent bd7899d commit ebed19a
Show file tree
Hide file tree
Showing 8 changed files with 90 additions and 46 deletions.
67 changes: 47 additions & 20 deletions src/Disruptor.Tests/SequenceReportingCallbackTests.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Disruptor.Tests.Support;
using NUnit.Framework;

Expand All @@ -7,59 +9,84 @@ namespace Disruptor.Tests
[TestFixture]
public class SequenceReportingCallbackTests
{
private readonly ManualResetEvent _callbackSignal = new ManualResetEvent(false);
private readonly ManualResetEvent _onEndOfBatchSignal = new ManualResetEvent(false);

[Test]
public void ShouldReportProgressByUpdatingSequenceViaCallback()
public void ShouldReportEventHandlerProgressByUpdatingSequenceViaCallback()
{
var ringBuffer = RingBuffer<StubEvent>.CreateMultiProducer(() => new StubEvent(-1), 16);
var sequenceBarrier = ringBuffer.NewBarrier();
ISequenceReportingEventHandler<StubEvent> handler = new TestSequenceReportingEventHandler(_callbackSignal, _onEndOfBatchSignal);
var handler = new TestSequenceReportingEventHandler();
var batchEventProcessor = BatchEventProcessorFactory.Create(ringBuffer, sequenceBarrier, handler);
ringBuffer.AddGatingSequences(batchEventProcessor.Sequence);

var task = Task.Run(batchEventProcessor.Run);

Assert.AreEqual(-1L, batchEventProcessor.Sequence.Value);
ringBuffer.Publish(ringBuffer.Next());

handler.CallbackSignal.WaitOne();
Assert.AreEqual(0L, batchEventProcessor.Sequence.Value);

handler.OnEndOfBatchSignal.Set();
Assert.AreEqual(0L, batchEventProcessor.Sequence.Value);

batchEventProcessor.Halt();
Assert.IsTrue(task.Wait(TimeSpan.FromSeconds(10)));
}

[Test]
public void ShouldReportValueEventHandlerProgressByUpdatingSequenceViaCallback()
{
var ringBuffer = ValueRingBuffer<StubValueEvent>.CreateMultiProducer(() => new StubValueEvent(-1), 16);
var sequenceBarrier = ringBuffer.NewBarrier();
var handler = new TestSequenceReportingEventHandler();
var batchEventProcessor = BatchEventProcessorFactory.Create(ringBuffer, sequenceBarrier, handler);
ringBuffer.AddGatingSequences(batchEventProcessor.Sequence);

var thread = new Thread(batchEventProcessor.Run) {IsBackground = true};
thread.Start();
var task = Task.Run(batchEventProcessor.Run);

Assert.AreEqual(-1L, batchEventProcessor.Sequence.Value);
ringBuffer.Publish(ringBuffer.Next());

_callbackSignal.WaitOne();
handler.CallbackSignal.WaitOne();
Assert.AreEqual(0L, batchEventProcessor.Sequence.Value);

_onEndOfBatchSignal.Set();
handler.OnEndOfBatchSignal.Set();
Assert.AreEqual(0L, batchEventProcessor.Sequence.Value);

batchEventProcessor.Halt();
thread.Join();
Assert.IsTrue(task.Wait(TimeSpan.FromSeconds(10)));
}

private class TestSequenceReportingEventHandler : ISequenceReportingEventHandler<StubEvent>
private class TestSequenceReportingEventHandler : IEventHandler<StubEvent>, IValueEventHandler<StubValueEvent>, IEventProcessorSequenceAware
{
private ISequence _sequenceCallback;
private readonly ManualResetEvent _callbackSignal;
private readonly ManualResetEvent _onEndOfBatchSignal;

public TestSequenceReportingEventHandler(ManualResetEvent callbackSignal, ManualResetEvent onEndOfBatchSignal)
{
_callbackSignal = callbackSignal;
_onEndOfBatchSignal = onEndOfBatchSignal;
}
public ManualResetEvent CallbackSignal { get; } = new ManualResetEvent(false);
public ManualResetEvent OnEndOfBatchSignal { get; } = new ManualResetEvent(false);

public void SetSequenceCallback(ISequence sequenceTrackerCallback)
{
_sequenceCallback = sequenceTrackerCallback;
}

public void OnEvent(ref StubValueEvent data, long sequence, bool endOfBatch)
{
OnEvent(sequence, endOfBatch);
}

public void OnEvent(StubEvent evt, long sequence, bool endOfBatch)
{
OnEvent(sequence, endOfBatch);
}

private void OnEvent(long sequence, bool endOfBatch)
{
_sequenceCallback.SetValue(sequence);
_callbackSignal.Set();
CallbackSignal.Set();

if (endOfBatch)
{
_onEndOfBatchSignal.WaitOne();
OnEndOfBatchSignal.WaitOne();
}
}
}
Expand Down
17 changes: 8 additions & 9 deletions src/Disruptor/BatchEventProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ namespace Disruptor
/// <summary>
/// Convenience class for handling the batching semantics of consuming events from a <see cref="RingBuffer{T}"/>
/// and delegating the available events to an <see cref="IEventHandler{T}"/>.
///
///
/// If the <see cref="IEventHandler{T}"/> also implements <see cref="ILifecycleAware"/> it will be notified just after the thread
/// is started and just before the thread is shutdown.
///
///
/// This class is kept mainly for compatibility reasons.
///
///
/// Consider using <see cref="BatchEventProcessorFactory"/> to create your <see cref="IEventProcessor"/>.
/// </summary>
/// <typeparam name="T">the type of event used.</typeparam>
Expand All @@ -21,7 +21,7 @@ public class BatchEventProcessor<T> : BatchEventProcessor<T, IDataProvider<T>, I
/// <summary>
/// Construct a BatchEventProcessor that will automatically track the progress by updating its sequence when
/// the <see cref="IEventHandler{T}.OnEvent"/> method returns.
///
///
/// Consider using <see cref="BatchEventProcessorFactory"/> to create your <see cref="IEventProcessor"/>.
/// </summary>
/// <param name="dataProvider">dataProvider to which events are published</param>
Expand Down Expand Up @@ -52,7 +52,7 @@ public void OnBatchStart(long batchSize)
/// <summary>
/// Convenience class for handling the batching semantics of consuming events from a <see cref="RingBuffer{T}"/>
/// and delegating the available events to an <see cref="IEventHandler{T}"/>.
///
///
/// If the <see cref="IEventHandler{T}"/> also implements <see cref="ILifecycleAware"/> it will be notified just after the thread
/// is started and just before the thread is shutdown.
/// </summary>
Expand All @@ -63,7 +63,6 @@ public void OnBatchStart(long batchSize)
/// <typeparam name="TBatchStartAware">the type of the <see cref="IBatchStartAware"/> used.</typeparam>
public class BatchEventProcessor<T, TDataProvider, TSequenceBarrier, TEventHandler, TBatchStartAware> : IBatchEventProcessor<T>
where T : class

where TDataProvider : IDataProvider<T>
where TSequenceBarrier : ISequenceBarrier
where TEventHandler : IEventHandler<T>
Expand Down Expand Up @@ -92,7 +91,7 @@ private static class RunningStates
/// <summary>
/// Construct a BatchEventProcessor that will automatically track the progress by updating its sequence when
/// the <see cref="IEventHandler{T}.OnEvent"/> method returns.
///
///
/// Consider using <see cref="BatchEventProcessorFactory"/> to create your <see cref="IEventProcessor"/>.
/// </summary>
/// <param name="dataProvider">dataProvider to which events are published</param>
Expand All @@ -106,8 +105,8 @@ public BatchEventProcessor(TDataProvider dataProvider, TSequenceBarrier sequence
_eventHandler = eventHandler;
_batchStartAware = batchStartAware;

if (eventHandler is ISequenceReportingEventHandler<T> sequenceReportingEventHandler)
sequenceReportingEventHandler.SetSequenceCallback(_sequence);
if (eventHandler is IEventProcessorSequenceAware sequenceAware)
sequenceAware.SetSequenceCallback(_sequence);

_timeoutHandler = eventHandler as ITimeoutHandler;
}
Expand Down
4 changes: 2 additions & 2 deletions src/Disruptor/IBatchStartAware.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
namespace Disruptor
{
/// <summary>
/// Implement this interface in your <see cref="IEventHandler{T}"/> to be notified when a batch is starting.
/// Implement this interface in your event handler to be notified when a batch is starting.
/// </summary>
public interface IBatchStartAware
{
/// <summary>
/// Called on each batch start before the first call to <see cref="IEventHandler{T}.OnEvent"/>.
/// Called on each batch start before the first call to <see cref="IEventHandler{T}.OnEvent"/> or <see cref="IValueEventHandler{T}.OnEvent"/>.
/// </summary>
/// <param name="batchSize">the batch size.</param>
void OnBatchStart(long batchSize);
Expand Down
21 changes: 21 additions & 0 deletions src/Disruptor/IEventProcessorSequenceAware.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
namespace Disruptor
{
/// <summary>
/// Implement this interface in your event handler to obtain the <see cref="IEventProcessor"/> sequence.
///
/// Used by the <see cref="IEventProcessor"/> to set a callback allowing the event handler to notify
/// when it has finished consuming an event if this happens after the OnEvent call.
///
/// Typically this would be used when the handler is performing some sort of batching operation such as writing to an IO
/// device; after the operation has completed, the implementation should set <see cref="Sequence.Value"/> to update the
/// sequence and allow other processes that are dependent on this handler to progress.
/// </summary>
public interface IEventProcessorSequenceAware
{
/// <summary>
/// Call by the <see cref="IEventProcessor"/> to setup the callback.
/// </summary>
/// <param name="sequenceCallback">callback on which to notify the <see cref="IEventProcessor"/> that the sequence has progressed.</param>
void SetSequenceCallback(ISequence sequenceCallback);
}
}
5 changes: 4 additions & 1 deletion src/Disruptor/IEventReleaseAware.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
namespace Disruptor
{
/// <summary>
/// Implement this interface in your <see cref="IWorkHandler{T}"/> to obtain the <see cref="IEventReleaser"/>.
/// </summary>
public interface IEventReleaseAware
{
void SetEventReleaser(IEventReleaser eventReleaser);
}
}
}
5 changes: 2 additions & 3 deletions src/Disruptor/ILifecycleAware.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
namespace Disruptor
{
/// <summary>
/// Implement this interface in your <see cref="IEventHandler{T}"/> to be notified when a thread for the
/// <see cref="IBatchEventProcessor{T}"/> starts and shuts down.
/// Implement this interface in your event handler to be notified when the processing thread starts and shuts down.
/// </summary>
public interface ILifecycleAware
{
Expand All @@ -13,7 +12,7 @@ public interface ILifecycleAware

/// <summary>
/// Called once just before the thread is shutdown.
///
///
/// Sequence event processing will already have stopped before this method is called. No events will
/// be processed after this message.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,13 @@ namespace Disruptor
/// <summary>
/// Used by the <see cref="IBatchEventProcessor{T}"/> to set a callback allowing the <see cref="IEventHandler{T}"/> to notify
/// when it has finished consuming an event if this happens after the <see cref="IEventHandler{T}.OnEvent"/> call.
///
///
/// Typically this would be used when the handler is performing some sort of batching operation such as writing to an IO
/// device; after the operation has completed, the implementation should set <see cref="Sequence.Value"/> to update the
/// sequence and allow other processes that are dependent on this handler to progress.
/// </summary>
/// <typeparam name="T">event implementation storing the data for sharing during exchange or parallel coordination of an event.</typeparam>
public interface ISequenceReportingEventHandler<in T> : IEventHandler<T>
public interface ISequenceReportingEventHandler<in T> : IEventHandler<T>, IEventProcessorSequenceAware
{
/// <summary>
/// Call by the <see cref="IBatchEventProcessor{T}"/> to setup the callback.
/// </summary>
/// <param name="sequenceCallback">callback on which to notify the <see cref="IBatchEventProcessor{T}"/> that the sequence has progressed.</param>
void SetSequenceCallback(ISequence sequenceCallback);
}
}
8 changes: 4 additions & 4 deletions src/Disruptor/ValueBatchEventProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ namespace Disruptor
/// <summary>
/// Convenience class for handling the batching semantics of consuming events from a <see cref="ValueRingBuffer{T}"/>
/// and delegating the available events to an <see cref="IValueEventHandler{T}"/>.
///
///
/// If the <see cref="IValueEventHandler{T}"/> also implements <see cref="ILifecycleAware"/> it will be notified just after the thread
/// is started and just before the thread is shutdown.
/// </summary>
Expand Down Expand Up @@ -46,7 +46,7 @@ private static class RunningStates
/// <summary>
/// Construct a BatchEventProcessor that will automatically track the progress by updating its sequence when
/// the <see cref="IValueEventHandler{T}.OnEvent"/> method returns.
///
///
/// Consider using <see cref="BatchEventProcessorFactory"/> to create your <see cref="IEventProcessor"/>.
/// </summary>
/// <param name="dataProvider">dataProvider to which events are published</param>
Expand All @@ -60,8 +60,8 @@ public ValueBatchEventProcessor(TDataProvider dataProvider, TSequenceBarrier seq
_eventHandler = eventHandler;
_batchStartAware = batchStartAware;

if (eventHandler is ISequenceReportingEventHandler<T> sequenceReportingEventHandler)
sequenceReportingEventHandler.SetSequenceCallback(_sequence);
if (eventHandler is IEventProcessorSequenceAware sequenceAware)
sequenceAware.SetSequenceCallback(_sequence);

_timeoutHandler = eventHandler as ITimeoutHandler;
}
Expand Down

0 comments on commit ebed19a

Please sign in to comment.