Skip to content

Commit

Permalink
Support handlers with explicit interface implementations
Browse files Browse the repository at this point in the history
(cherry picked from commit 2553b63)

# Conflicts:
#	src/Disruptor/BatchEventProcessorFactory.cs
  • Loading branch information
ocoanet committed Jun 11, 2019
1 parent b8d3fc2 commit 82867fe
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 10 deletions.
25 changes: 20 additions & 5 deletions src/Disruptor.Tests/BatchEventProcessorTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -219,12 +219,13 @@ public bool WaitShutdown(TimeSpan timeSpan)
}
}

[Test]
public void ShouldNotPassZeroSizeToBatchStartAware()
[TestCase(typeof(BatchAwareEventHandler))]
[TestCase(typeof(BatchAwareEventHandlerInternal))]
public void ShouldNotPassZeroSizeToBatchStartAware(Type eventHandlerType)
{
var latch = new CountdownEvent(3);

var eventHandler = new BatchAwareEventHandler(x => latch.Signal());
var eventHandler = (BatchAwareEventHandler)Activator.CreateInstance(eventHandlerType, (Action<StubEvent>)(x => latch.Signal()));

var batchEventProcessor = CreateBatchEventProcessor(_ringBuffer, new DelegatingSequenceBarrier(_sequenceBarrier), eventHandler);

Expand Down Expand Up @@ -283,19 +284,33 @@ public void CheckAlert()

// ReSharper disable once MemberCanBePrivate.Global
// Public to enable dynamic code generation
public class BatchAwareEventHandler : TestEventHandler<StubEvent>, IBatchStartAware
public class BatchAwareEventHandler : IEventHandler<StubEvent>, IBatchStartAware
{
public Action<StubEvent> OnEventAction { get; set; }
public Dictionary<long, int> BatchSizeToCount { get; } = new Dictionary<long, int>();

public BatchAwareEventHandler(Action<StubEvent> onEventAction)
: base(onEventAction)
{
OnEventAction = onEventAction;
}

public void OnBatchStart(long batchSize)
{
BatchSizeToCount[batchSize] = BatchSizeToCount.TryGetValue(batchSize, out var count) ? count + 1 : 1;
}

public void OnEvent(StubEvent data, long sequence, bool endOfBatch)
{
OnEventAction.Invoke(data);
}
}

internal class BatchAwareEventHandlerInternal : BatchAwareEventHandler
{
public BatchAwareEventHandlerInternal(Action<StubEvent> onEventAction)
: base(onEventAction)
{
}
}
}
}
27 changes: 26 additions & 1 deletion src/Disruptor.Tests/Internal/StructProxyTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,25 @@ public void ShouldNotFailForInternalType()
IFoo fooProxy = null;

Assert.DoesNotThrow(() => fooProxy = StructProxy.CreateProxyInstance<IFoo>(foo));
Assert.DoesNotThrow(() => fooProxy.Value = 1);
Assert.DoesNotThrow(() => fooProxy.Compute(1, 2));

Assert.IsNotNull(fooProxy);
Assert.AreEqual(fooProxy, foo);
Assert.AreSame(fooProxy, foo);
}

[Test]
public void ShouldNotFailForExplicitImplementation()
{
var foo = new ExplicitImplementation();
IFoo fooProxy = null;

Assert.DoesNotThrow(() => fooProxy = StructProxy.CreateProxyInstance<IFoo>(foo));
Assert.DoesNotThrow(() => fooProxy.Value = 1);
Assert.DoesNotThrow(() => fooProxy.Compute(1, 2));

Assert.IsNotNull(fooProxy);
Assert.AreSame(fooProxy, foo);
}

[Test]
Expand Down Expand Up @@ -82,6 +98,15 @@ public void Compute(int a, long b)
}
}

public class ExplicitImplementation : IFoo
{
int IFoo.Value { get; set; }

void IFoo.Compute(int a, long b)
{
}
}

public interface IBar<T>
{
}
Expand Down
29 changes: 28 additions & 1 deletion src/Disruptor/BatchEventProcessorFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,44 @@ public static IBatchEventProcessor<T> Create<T>(IDataProvider<T> dataProvider, I
var dataProviderProxy = StructProxy.CreateProxyInstance(dataProvider);
var sequenceBarrierProxy = StructProxy.CreateProxyInstance(sequenceBarrier);
var eventHandlerProxy = StructProxy.CreateProxyInstance(eventHandler);
var batchStartAwareProxy = eventHandler is IBatchStartAware batchStartAware ? StructProxy.CreateProxyInstance(batchStartAware) : new NoopBatchStartAware();
var batchStartAwareProxy = CreateBatchStartAwareProxy(eventHandler);

var batchEventProcessorType = typeof(BatchEventProcessor<,,,,>).MakeGenericType(typeof(T), dataProviderProxy.GetType(), sequenceBarrierProxy.GetType(), eventHandlerProxy.GetType(), batchStartAwareProxy.GetType());
return (IBatchEventProcessor<T>)Activator.CreateInstance(batchEventProcessorType, dataProviderProxy, sequenceBarrierProxy, eventHandlerProxy, batchStartAwareProxy);
}

private static IBatchStartAware CreateBatchStartAwareProxy(object eventHandler)
{
if (!(eventHandler is IBatchStartAware batchStartAware))
return new NoopBatchStartAware();

var proxy = StructProxy.CreateProxyInstance(batchStartAware);
var proxyGenerationFailed = ReferenceEquals(proxy, batchStartAware);

return proxyGenerationFailed ? new DefaultBatchStartAware(batchStartAware) : proxy;
}

private struct NoopBatchStartAware : IBatchStartAware
{
public void OnBatchStart(long batchSize)
{
}
}

private struct DefaultBatchStartAware : IBatchStartAware
{
private readonly IBatchStartAware _target;

public DefaultBatchStartAware(IBatchStartAware target)
{
_target = target;
}

public void OnBatchStart(long batchSize)
{
if (batchSize != 0)
_target.OnBatchStart(batchSize);
}
}
}
}
14 changes: 11 additions & 3 deletions src/Disruptor/Internal/StructProxy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,17 @@ public static TInterface CreateProxyInstance<TInterface>(TInterface target)

private static Type GenerateStructProxyType(Type targetType)
{
if (!targetType.IsVisible)
var interfaceTypes = targetType.GetInterfaces().Where(x => x.IsVisible).ToList();

if (!CanGenerateStructProxy(targetType, interfaceTypes))
return null;

var typeBuilder = _moduleBuilder.DefineType($"StructProxy_{targetType.Name}_{Guid.NewGuid():N}", TypeAttributes.Public, typeof(ValueType));

var field = typeBuilder.DefineField("_target", targetType, FieldAttributes.Private);

GenerateConstructor(targetType, typeBuilder, field);

var interfaceTypes = targetType.GetInterfaces().Where(x => x.IsVisible);

foreach (var interfaceType in interfaceTypes)
{
GenerateInterfaceImplementation(interfaceType, targetType, typeBuilder, field);
Expand All @@ -56,6 +57,13 @@ private static Type GenerateStructProxyType(Type targetType)
return typeBuilder.CreateTypeInfo();
}

private static bool CanGenerateStructProxy(Type targetType, List<Type> interfaceTypes)
{
if (!targetType.IsVisible)
return false;

return interfaceTypes.SelectMany(x => targetType.GetInterfaceMap(x).TargetMethods).All(x => x.IsPublic);
}

private static void GenerateConstructor(Type targetType, TypeBuilder typeBuilder, FieldBuilder field)
{
Expand Down

0 comments on commit 82867fe

Please sign in to comment.