Skip to content

Commit

Permalink
Add base interface for event handlers
Browse files Browse the repository at this point in the history
  • Loading branch information
ocoanet committed Dec 1, 2024
1 parent 5c29270 commit 38620ab
Show file tree
Hide file tree
Showing 13 changed files with 69 additions and 194 deletions.
18 changes: 6 additions & 12 deletions src/Disruptor/Dsl/ConsumerRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,11 @@ namespace Disruptor.Dsl;

internal class ConsumerRepository : IEnumerable<IConsumerInfo>
{
private readonly Dictionary<object, EventProcessorInfo> _eventProcessorInfoByEventHandler;
private readonly Dictionary<Sequence, IConsumerInfo> _eventProcessorInfoBySequence;
private readonly Dictionary<IEventHandler, EventProcessorInfo> _eventProcessorInfoByEventHandler = new(new IdentityComparer<IEventHandler>());
private readonly Dictionary<Sequence, IConsumerInfo> _eventProcessorInfoBySequence = new(new IdentityComparer<Sequence>());
private readonly List<IConsumerInfo> _consumerInfos = new();

public ConsumerRepository()
{
_eventProcessorInfoByEventHandler = new Dictionary<object, EventProcessorInfo>(new IdentityComparer<object>());
_eventProcessorInfoBySequence = new Dictionary<Sequence, IConsumerInfo>(new IdentityComparer<Sequence>());
}

public void Add(IEventProcessor eventProcessor, object eventHandler, DependentSequenceGroup dependentSequences)
public void Add(IEventProcessor eventProcessor, IEventHandler eventHandler, DependentSequenceGroup dependentSequences)
{
var consumerInfo = new EventProcessorInfo(eventProcessor, eventHandler, dependentSequences);
_eventProcessorInfoByEventHandler[eventHandler] = consumerInfo;
Expand Down Expand Up @@ -63,7 +57,7 @@ public bool HasBacklog(long cursor, bool includeStopped)
return false;
}

public IEventProcessor GetEventProcessorFor(object eventHandler)
public IEventProcessor GetEventProcessorFor(IEventHandler eventHandler)
{
var found = _eventProcessorInfoByEventHandler.TryGetValue(eventHandler, out var eventProcessorInfo);
if(!found)
Expand All @@ -74,7 +68,7 @@ public IEventProcessor GetEventProcessorFor(object eventHandler)
return eventProcessorInfo!.EventProcessor;
}

public Sequence GetSequenceFor(object eventHandler)
public Sequence GetSequenceFor(IEventHandler eventHandler)
{
return GetEventProcessorFor(eventHandler).Sequence;
}
Expand All @@ -90,7 +84,7 @@ public void UnMarkEventProcessorsAsEndOfChain(params Sequence[] barrierEventProc
}
}

public DependentSequenceGroup? GetDependentSequencesFor(object eventHandler)
public DependentSequenceGroup? GetDependentSequencesFor(IEventHandler eventHandler)
{
return _eventProcessorInfoByEventHandler.TryGetValue(eventHandler, out var eventProcessorInfo) ? eventProcessorInfo.DependentSequences : null;
}
Expand Down
4 changes: 2 additions & 2 deletions src/Disruptor/Dsl/EventProcessorInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace Disruptor.Dsl;
/// </summary>
internal class EventProcessorInfo : IConsumerInfo
{
public EventProcessorInfo(IEventProcessor eventProcessor, object? eventHandler, DependentSequenceGroup? dependentSequences)
public EventProcessorInfo(IEventProcessor eventProcessor, IEventHandler? eventHandler, DependentSequenceGroup? dependentSequences)
{
EventProcessor = eventProcessor;
Handler = eventHandler;
Expand All @@ -22,7 +22,7 @@ public EventProcessorInfo(IEventProcessor eventProcessor, object? eventHandler,

public Sequence[] Sequences => new[] { EventProcessor.Sequence };

public object? Handler { get; }
public IEventHandler? Handler { get; }

public DependentSequenceGroup? DependentSequences { get; }

Expand Down
4 changes: 2 additions & 2 deletions src/Disruptor/Dsl/ExceptionHandlerSetting.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ namespace Disruptor.Dsl;
/// <typeparam name="T">the type of event being handled.</typeparam>
public class ExceptionHandlerSetting<T> where T : class
{
private readonly object _eventHandler;
private readonly IEventHandler _eventHandler;
private readonly ConsumerRepository _consumerRepository;

internal ExceptionHandlerSetting(object eventHandler, ConsumerRepository consumerRepository)
internal ExceptionHandlerSetting(IEventHandler eventHandler, ConsumerRepository consumerRepository)
{
_eventHandler = eventHandler;
_consumerRepository = consumerRepository;
Expand Down
39 changes: 2 additions & 37 deletions src/Disruptor/IAsyncBatchEventHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,9 @@ namespace Disruptor;
/// </para>
/// </remarks>
/// <typeparam name="T">Type of events for sharing during exchange or parallel coordination of an event.</typeparam>
public interface IAsyncBatchEventHandler<T> where T : class
public interface IAsyncBatchEventHandler<T> : IEventHandler
where T : class
{
/// <summary>
/// Limits the size of event batches.
/// </summary>
/// <remarks>
/// The value will be read only once on start, thus dynamically changing the max batch size is not supported.
/// </remarks>
int? MaxBatchSize => null;

/// <summary>
/// Called when a publisher has committed events to the <see cref="RingBuffer{T}"/>. The <see cref="IAsyncBatchEventHandler{T}"/> will
/// read messages from the <see cref="RingBuffer{T}"/> in batches, where a batch is all of the events available to be
Expand All @@ -41,32 +34,4 @@ public interface IAsyncBatchEventHandler<T> where T : class
/// <param name="batch">Batch of events committed to the <see cref="RingBuffer{T}"/></param>
/// <param name="sequence">Sequence number of the first event of the batch</param>
ValueTask OnBatch(EventBatch<T> batch, long sequence);

///<summary>
/// Called once on thread start before first event is available.
///</summary>
void OnStart()
{
}

/// <summary>
/// Called once just before the thread is shutdown.
/// </summary>
/// <remarks>
/// Sequence event processing will already have stopped before this method is called. No events will
/// be processed after this message.
/// </remarks>
void OnShutdown()
{
}

/// <summary>
/// Invoked when the wait strategy timeouts.
/// </summary>
/// <remarks>
/// This only happens if the current wait strategy can return timeouts (e.g.: <see cref="TimeoutBlockingWaitStrategy"/>).
/// </remarks>
void OnTimeout(long sequence)
{
}
}
39 changes: 1 addition & 38 deletions src/Disruptor/IBatchEventHandler.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
using System;
using Disruptor.Processing;

namespace Disruptor;
Expand All @@ -20,17 +19,9 @@ namespace Disruptor;
/// </para>
/// </remarks>
/// <typeparam name="T">Type of events for sharing during exchange or parallel coordination of an event.</typeparam>
public interface IBatchEventHandler<T>
public interface IBatchEventHandler<T> : IEventHandler
where T : class
{
/// <summary>
/// Limits the size of event batches.
/// </summary>
/// <remarks>
/// The value will be read only once on start, thus dynamically changing the max batch size is not supported.
/// </remarks>
int? MaxBatchSize => null;

/// <summary>
/// Called when a publisher has committed events to the <see cref="RingBuffer{T}"/>. The <see cref="IEventProcessor{T}"/> will
/// read messages from the <see cref="RingBuffer{T}"/> in batches, where a batch is all of the events available to be
Expand All @@ -40,32 +31,4 @@ public interface IBatchEventHandler<T>
/// <param name="batch">Batch of events committed to the <see cref="RingBuffer{T}"/></param>
/// <param name="sequence">Sequence number of the first event of the batch</param>
void OnBatch(EventBatch<T> batch, long sequence);

///<summary>
/// Called once on thread start before first event is available.
///</summary>
void OnStart()
{
}

/// <summary>
/// Called once just before the thread is shutdown.
/// </summary>
/// <remarks>
/// Sequence event processing will already have stopped before this method is called. No events will
/// be processed after this message.
/// </remarks>
void OnShutdown()
{
}

/// <summary>
/// Invoked when the wait strategy timeouts.
/// </summary>
/// <remarks>
/// This only happens if the current wait strategy can return timeouts (e.g.: <see cref="TimeoutBlockingWaitStrategy"/>).
/// </remarks>
void OnTimeout(long sequence)
{
}
}
38 changes: 4 additions & 34 deletions src/Disruptor/IEventHandler.cs
Original file line number Diff line number Diff line change
@@ -1,17 +1,12 @@
using Disruptor.Dsl;
using Disruptor.Processing;

namespace Disruptor;
namespace Disruptor;

/// <summary>
/// Callback interface to be implemented for processing events as they become available in the <see cref="RingBuffer{T}"/>
/// Marker interface for event handler interfaces.
/// </summary>
/// <typeparam name="T">Type of events for sharing during exchange or parallel coordination of an event.</typeparam>
/// <remarks>
/// See <see cref="Dsl.Disruptor{T}.SetDefaultExceptionHandler"/> or <see cref="Disruptor{T}.HandleExceptionsFor(IEventHandler{T})"/>
/// if you want to handle exceptions propagated out of the handler.
/// See: <see cref="IEventHandler{T}"/>, <see cref="IValueEventHandler{T}"/>, <see cref="IBatchEventHandler{T}"/>, <see cref="IAsyncBatchEventHandler{T}"/>.
/// </remarks>
public interface IEventHandler<in T>
public interface IEventHandler
{
/// <summary>
/// Limits the size of event batches.
Expand All @@ -21,31 +16,6 @@ public interface IEventHandler<in T>
/// </remarks>
int? MaxBatchSize => null;

/// <summary>
/// Called when a publisher has committed events to the <see cref="RingBuffer{T}"/>. The <see cref="IEventProcessor{T}"/> will
/// read messages from the <see cref="RingBuffer{T}"/> in batches, where a batch is all of the events available to be
/// processed without having to wait for any new event to arrive. This can be useful for event handlers that need
/// to do slower operations like I/O as they can group together the data from multiple events into a single
/// operation. Implementations should ensure that the operation is always performed when endOfBatch is true as
/// the time between that message an the next one is indeterminate.
/// </summary>
/// <param name="data">Data committed to the <see cref="RingBuffer{T}"/></param>
/// <param name="sequence">Sequence number committed to the <see cref="RingBuffer{T}"/></param>
/// <param name="endOfBatch">flag to indicate if this is the last event in a batch from the <see cref="RingBuffer{T}"/></param>
void OnEvent(T data, long sequence, bool endOfBatch);

/// <summary>
/// Called on each batch start before the first call to <see cref="IEventHandler{T}.OnEvent"/>.
/// </summary>
/// <param name="batchSize">the batch size.</param>
/// <remarks>
/// If your handler needs to explicitly process batches, please consider using <see cref="IBatchEventHandler{T}"/>
/// instead of implementing this method.
/// </remarks>
void OnBatchStart(long batchSize)
{
}

///<summary>
/// Called once on thread start before first event is available.
///</summary>
Expand Down
40 changes: 40 additions & 0 deletions src/Disruptor/IEventHandler`1.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
using Disruptor.Dsl;
using Disruptor.Processing;

namespace Disruptor;

/// <summary>
/// Callback interface to be implemented for processing events as they become available in the <see cref="RingBuffer{T}"/>
/// </summary>
/// <typeparam name="T">Type of events for sharing during exchange or parallel coordination of an event.</typeparam>
/// <remarks>
/// See <see cref="Dsl.Disruptor{T}.SetDefaultExceptionHandler"/> or <see cref="Disruptor{T}.HandleExceptionsFor(IEventHandler{T})"/>
/// if you want to handle exceptions propagated out of the handler.
/// </remarks>
public interface IEventHandler<in T> : IEventHandler
{
/// <summary>
/// Called when a publisher has committed events to the <see cref="RingBuffer{T}"/>. The <see cref="IEventProcessor{T}"/> will
/// read messages from the <see cref="RingBuffer{T}"/> in batches, where a batch is all of the events available to be
/// processed without having to wait for any new event to arrive. This can be useful for event handlers that need
/// to do slower operations like I/O as they can group together the data from multiple events into a single
/// operation. Implementations should ensure that the operation is always performed when endOfBatch is true as
/// the time between that message an the next one is indeterminate.
/// </summary>
/// <param name="data">Data committed to the <see cref="RingBuffer{T}"/></param>
/// <param name="sequence">Sequence number committed to the <see cref="RingBuffer{T}"/></param>
/// <param name="endOfBatch">flag to indicate if this is the last event in a batch from the <see cref="RingBuffer{T}"/></param>
void OnEvent(T data, long sequence, bool endOfBatch);

/// <summary>
/// Called on each batch start before the first call to <see cref="IEventHandler{T}.OnEvent"/>.
/// </summary>
/// <param name="batchSize">the batch size.</param>
/// <remarks>
/// If your handler needs to explicitly process batches, please consider using <see cref="IBatchEventHandler{T}"/>
/// instead of implementing this method.
/// </remarks>
void OnBatchStart(long batchSize)
{
}
}
10 changes: 5 additions & 5 deletions src/Disruptor/IExceptionHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ public interface IExceptionHandler<T>
void HandleEventException(Exception ex, long sequence, T evt);

/// <summary>
/// Callback to notify of an exception during <see cref="IEventHandler{T}.OnTimeout"/>
/// Callback to notify of an exception during <see cref="IEventHandler.OnTimeout"/>
/// </summary>
/// <param name="ex">ex throw during the starting process.</param>
/// <param name="sequence">sequence of the event which cause the exception.</param>
void HandleOnTimeoutException(Exception ex, long sequence);

/// <summary>
/// Strategy for handling uncaught exceptions when processing an event.
/// Strategy for handling uncaught exceptions when processing an event batch.
///
/// If the strategy wishes to terminate further processing by the <see cref="IEventProcessor{T}"/>
/// then it should throw a <see cref="ApplicationException"/>
Expand All @@ -39,14 +39,14 @@ public interface IExceptionHandler<T>
void HandleEventException(Exception ex, long sequence, EventBatch<T> batch);

/// <summary>
/// Callback to notify of an exception during <see cref="IEventHandler{T}.OnStart"/>
/// Callback to notify of an exception during <see cref="IEventHandler.OnStart"/>
/// </summary>
/// <param name="ex">ex throw during the starting process.</param>
void HandleOnStartException(Exception ex);

/// <summary>
/// Callback to notify of an exception during <see cref="IEventHandler{T}.OnShutdown"/>
/// Callback to notify of an exception during <see cref="IEventHandler.OnShutdown"/>
/// </summary>
/// <param name="ex">ex throw during the shutdown process.</param>
void HandleOnShutdownException(Exception ex);
}
}
38 changes: 1 addition & 37 deletions src/Disruptor/IValueEventHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,9 @@ namespace Disruptor;
/// See <see cref="Dsl.ValueDisruptor{T,TRingBuffer}.SetDefaultExceptionHandler"/> or <see cref="ValueDisruptor{T,TRingBuffer}.HandleExceptionsFor"/>
/// if you want to handle exceptions propagated out of the handler.
/// </remarks>
public interface IValueEventHandler<T>
public interface IValueEventHandler<T> : IEventHandler
where T : struct
{
/// <summary>
/// Limits the size of event batches.
/// </summary>
/// <remarks>
/// The value will be read only once on start, thus dynamically changing the max batch size is not supported.
/// </remarks>
int? MaxBatchSize => null;

/// <summary>
/// Called when a publisher has committed an event to the <see cref="ValueRingBuffer{T}"/>. The <see cref="IValueEventProcessor{T}"/> will
/// read messages from the <see cref="ValueRingBuffer{T}"/> in batches, where a batch is all of the events available to be
Expand All @@ -42,32 +34,4 @@ public interface IValueEventHandler<T>
void OnBatchStart(long batchSize)
{
}

///<summary>
/// Called once on thread start before first event is available.
///</summary>
void OnStart()
{
}

/// <summary>
/// Called once just before the thread is shutdown.
/// </summary>
/// <remarks>
/// Sequence event processing will already have stopped before this method is called. No events will
/// be processed after this message.
/// </remarks>
void OnShutdown()
{
}

/// <summary>
/// Invoked when the wait strategy timeouts.
/// </summary>
/// <remarks>
/// This only happens if the current wait strategy can return timeouts (e.g.: <see cref="TimeoutBlockingWaitStrategy"/>).
/// </remarks>
void OnTimeout(long sequence)
{
}
}
8 changes: 4 additions & 4 deletions src/Disruptor/IValueExceptionHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,21 @@ public interface IValueExceptionHandler<T> where T : struct
void HandleEventException(Exception ex, long sequence, ref T evt);

/// <summary>
/// Callback to notify of an exception during <see cref="IValueEventHandler{T}.OnTimeout"/>
/// Callback to notify of an exception during <see cref="IEventHandler.OnTimeout"/>
/// </summary>
/// <param name="ex">ex throw during the starting process.</param>
/// <param name="sequence">sequence of the event which cause the exception.</param>
void HandleOnTimeoutException(Exception ex, long sequence);

/// <summary>
/// Callback to notify of an exception during <see cref="IValueEventHandler{T}.OnStart"/>
/// Callback to notify of an exception during <see cref="IEventHandler.OnStart"/>
/// </summary>
/// <param name="ex">ex throw during the starting process.</param>
void HandleOnStartException(Exception ex);

/// <summary>
/// Callback to notify of an exception during <see cref="IValueEventHandler{T}.OnShutdown"/>
/// Callback to notify of an exception during <see cref="IEventHandler.OnShutdown"/>
/// </summary>
/// <param name="ex">ex throw during the shutdown process.</param>
void HandleOnShutdownException(Exception ex);
}
}
Loading

0 comments on commit 38620ab

Please sign in to comment.