Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Remove allocations from AsyncWaitStrategy
Browse files Browse the repository at this point in the history
ocoanet committed Feb 14, 2024
1 parent d610e62 commit abf370d
Showing 11 changed files with 207 additions and 117 deletions.
Original file line number Diff line number Diff line change
@@ -13,14 +13,14 @@ public class PingPongAsyncWaitStrategyBenchmarks : IDisposable
private readonly AsyncWaitStrategy _pongWaitStrategy = new();
private readonly Sequence _pingCursor = new();
private readonly Sequence _pongCursor = new();
private readonly AsyncWaitState _pingAsyncWaitState;
private readonly AsyncWaitState _pongAsyncWaitState;
private readonly Task _pongTask;
private readonly DependentSequenceGroup _pingDependentSequences;
private readonly DependentSequenceGroup _pongDependentSequences;

public PingPongAsyncWaitStrategyBenchmarks()
{
_pingDependentSequences = new DependentSequenceGroup(_pingCursor);
_pongDependentSequences = new DependentSequenceGroup(_pongCursor);
_pingAsyncWaitState = new AsyncWaitState(new DependentSequenceGroup(_pingCursor), _cancellationTokenSource.Token);
_pongAsyncWaitState = new AsyncWaitState(new DependentSequenceGroup(_pongCursor), _cancellationTokenSource.Token);
_pongTask = Task.Run(RunPong);
}

@@ -40,8 +40,10 @@ private async Task RunPong()
{
sequence++;

await _pingWaitStrategy.WaitForAsync(sequence, _pingDependentSequences, _cancellationTokenSource.Token).ConfigureAwait(false);
// Wait for ping
await _pingWaitStrategy.WaitForAsync(sequence, _pingAsyncWaitState).ConfigureAwait(false);

// Publish pong
_pongCursor.SetValue(sequence);
_pongWaitStrategy.SignalAllWhenBlocking();
}
@@ -62,10 +64,12 @@ public async Task Run()

for (var s = start; s < end; s++)
{
// Publish ping
_pingCursor.SetValue(s);
_pingWaitStrategy.SignalAllWhenBlocking();

await _pongWaitStrategy.WaitForAsync(s, _pongDependentSequences, _cancellationTokenSource.Token).ConfigureAwait(false);
// Wait for pong
await _pongWaitStrategy.WaitForAsync(s, _pongAsyncWaitState).ConfigureAwait(false);
}
}
}
16 changes: 8 additions & 8 deletions src/Disruptor.Tests/AsyncWaitStrategyTests.cs
Original file line number Diff line number Diff line change
@@ -19,12 +19,12 @@ public void ShouldWaitFromMultipleThreadsAsync()

var waitTask1 = Task.Run(async () =>
{
waitResult1.SetResult(await waitStrategy.WaitForAsync(10, new DependentSequenceGroup(Cursor), CancellationToken));
waitResult1.SetResult(await waitStrategy.WaitForAsync(10, new AsyncWaitState(new DependentSequenceGroup(Cursor), CancellationToken)));
Thread.Sleep(1);
sequence1.SetValue(10);
});

var waitTask2 = Task.Run(async () => waitResult2.SetResult(await waitStrategy.WaitForAsync(10, new DependentSequenceGroup(Cursor, sequence1), CancellationToken)));
var waitTask2 = Task.Run(async () => waitResult2.SetResult(await waitStrategy.WaitForAsync(10, new AsyncWaitState(new DependentSequenceGroup(Cursor, sequence1), CancellationToken))));

// Ensure waiting tasks are blocked
AssertIsNotCompleted(waitResult1.Task);
@@ -62,12 +62,12 @@ public void ShouldWaitFromMultipleThreadsSyncAndAsync()

var waitTask2 = Task.Run(async () =>
{
waitResult2.SetResult(await waitStrategy.WaitForAsync(10, new DependentSequenceGroup(Cursor, sequence1), CancellationToken));
waitResult2.SetResult(await waitStrategy.WaitForAsync(10, new AsyncWaitState(new DependentSequenceGroup(Cursor, sequence1), CancellationToken)));
Thread.Sleep(1);
sequence2.SetValue(10);
});

var waitTask3 = Task.Run(async () => waitResult3.SetResult(await waitStrategy.WaitForAsync(10, new DependentSequenceGroup(Cursor, sequence2), CancellationToken)));
var waitTask3 = Task.Run(async () => waitResult3.SetResult(await waitStrategy.WaitForAsync(10, new AsyncWaitState(new DependentSequenceGroup(Cursor, sequence2), CancellationToken))));

// Ensure waiting tasks are blocked
AssertIsNotCompleted(waitResult1.Task);
@@ -103,7 +103,7 @@ public void ShouldWaitAfterCancellationAsync()
{
try
{
await waitStrategy.WaitForAsync(10, new DependentSequenceGroup(Cursor, dependentSequence), CancellationToken);
await waitStrategy.WaitForAsync(10, new AsyncWaitState(new DependentSequenceGroup(Cursor, dependentSequence), CancellationToken));
}
catch (Exception e)
{
@@ -129,7 +129,7 @@ public void ShouldUnblockAfterCancellationAsync()
{
try
{
await waitStrategy.WaitForAsync(10, new DependentSequenceGroup(Cursor, dependentSequence), CancellationToken);
await waitStrategy.WaitForAsync(10, new AsyncWaitState(new DependentSequenceGroup(Cursor, dependentSequence), CancellationToken));
}
catch (Exception e)
{
@@ -165,7 +165,7 @@ public void ShouldWaitMultipleTimesAsync()

for (var i = 0; i < 500; i++)
{
await waitStrategy.WaitForAsync(i, dependentSequences, cancellationTokenSource.Token).ConfigureAwait(false);
await waitStrategy.WaitForAsync(i, new AsyncWaitState(dependentSequences, cancellationTokenSource.Token)).ConfigureAwait(false);
sequence1.SetValue(i);
}
});
@@ -177,7 +177,7 @@ public void ShouldWaitMultipleTimesAsync()

for (var i = 0; i < 500; i++)
{
await waitStrategy.WaitForAsync(i, dependentSequences, cancellationTokenSource.Token).ConfigureAwait(false);
await waitStrategy.WaitForAsync(i, new AsyncWaitState(dependentSequences, cancellationTokenSource.Token)).ConfigureAwait(false);
}
});

4 changes: 2 additions & 2 deletions src/Disruptor.Tests/AsyncWaitStrategyTestsWithTimeout.cs
Original file line number Diff line number Diff line change
@@ -66,12 +66,12 @@ public void ShouldWaitFromMultipleThreadsWithTimeoutsAsync()

var waitTask1 = Task.Run(async () =>
{
waitResult1.SetResult(await waitStrategy.WaitForAsync(10, new DependentSequenceGroup(Cursor), CancellationToken));
waitResult1.SetResult(await waitStrategy.WaitForAsync(10, new AsyncWaitState(new DependentSequenceGroup(Cursor), CancellationToken)));
Thread.Sleep(1);
sequence1.SetValue(10);
});

var waitTask2 = Task.Run(async () => waitResult2.SetResult(await waitStrategy.WaitForAsync(10, new DependentSequenceGroup(Cursor, sequence1), CancellationToken)));
var waitTask2 = Task.Run(async () => waitResult2.SetResult(await waitStrategy.WaitForAsync(10, new AsyncWaitState(new DependentSequenceGroup(Cursor, sequence1), CancellationToken))));

// Ensure waiting tasks are blocked
AssertIsNotCompleted(waitResult1.Task);
53 changes: 18 additions & 35 deletions src/Disruptor.Tests/DisruptorStressTest.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Disruptor.Dsl;
@@ -27,7 +28,7 @@ public void ShouldHandleLotsOfThreads_AsyncBatchEventHandler()
ShouldHandleLotsOfThreads<TestAsyncBatchEventHandler>(new AsyncWaitStrategy(), 2_000_000);
}

private static void ShouldHandleLotsOfThreads<T>(IWaitStrategy waitStrategy, int iterations) where T : IHandler, new()
private static void ShouldHandleLotsOfThreads<T>(IWaitStrategy waitStrategy, int iterations) where T : ITestHandler, new()
{
var disruptor = new Disruptor<TestEvent>(TestEvent.Factory, 65_536, TaskScheduler.Current, ProducerType.Multi, waitStrategy);
var ringBuffer = disruptor.RingBuffer;
@@ -36,7 +37,6 @@ public void ShouldHandleLotsOfThreads_AsyncBatchEventHandler()
var publisherCount = Math.Clamp(Environment.ProcessorCount / 2, 1, 8);
var handlerCount = Math.Clamp(Environment.ProcessorCount / 2, 1, 8);

var end = new CountdownEvent(publisherCount);
var start = new CountdownEvent(publisherCount);

var handlers = new T[handlerCount];
@@ -50,26 +50,15 @@ public void ShouldHandleLotsOfThreads_AsyncBatchEventHandler()
var publishers = new Publisher[publisherCount];
for (var i = 0; i < publishers.Length; i++)
{
publishers[i] = new Publisher(ringBuffer, iterations, start, end);
publishers[i] = new Publisher(ringBuffer, iterations, start);
}

disruptor.Start();

foreach (var publisher in publishers)
{
Task.Run(publisher.Run);
}

end.Wait();

var spinWait = new SpinWait();
var publisherTasks = publishers.Select(x => Task.Run(x.Run)).ToArray();
Task.WaitAll(publisherTasks);

while (ringBuffer.Cursor < (iterations - 1))
{
spinWait.SpinOnce();
}

disruptor.Shutdown();
disruptor.Shutdown(TimeSpan.FromSeconds(10));

foreach (var publisher in publishers)
{
@@ -78,20 +67,20 @@ public void ShouldHandleLotsOfThreads_AsyncBatchEventHandler()

foreach (var handler in handlers)
{
Assert.That(handler.MessagesSeen, Is.Not.EqualTo(0));
Assert.That(handler.MessagesSeen, Is.EqualTo(iterations * publishers.Length));
Assert.That(handler.FailureCount, Is.EqualTo(0));
}
}

private interface IHandler
private interface ITestHandler
{
int FailureCount { get; }
int MessagesSeen { get; }

void Register(Disruptor<TestEvent> disruptor);
}

private class TestEventHandler : IEventHandler<TestEvent>, IHandler
private class TestEventHandler : IEventHandler<TestEvent>, ITestHandler
{
public int FailureCount { get; private set; }
public int MessagesSeen { get; private set; }
@@ -112,7 +101,7 @@ public void OnEvent(TestEvent @event, long sequence, bool endOfBatch)
}
}

private class TestBatchEventHandler : IBatchEventHandler<TestEvent>, IHandler
private class TestBatchEventHandler : IBatchEventHandler<TestEvent>, ITestHandler
{
public int FailureCount { get; private set; }
public int MessagesSeen { get; private set; }
@@ -139,7 +128,7 @@ public void OnBatch(EventBatch<TestEvent> batch, long sequence)
}
}

private class TestAsyncBatchEventHandler : IAsyncBatchEventHandler<TestEvent>, IHandler
private class TestAsyncBatchEventHandler : IAsyncBatchEventHandler<TestEvent>, ITestHandler
{
public int FailureCount { get; private set; }
public int MessagesSeen { get; private set; }
@@ -171,16 +160,14 @@ public async ValueTask OnBatch(EventBatch<TestEvent> batch, long sequence)
private class Publisher
{
private readonly RingBuffer<TestEvent> _ringBuffer;
private readonly CountdownEvent _end;
private readonly CountdownEvent _start;
private readonly int _iterations;

public bool Failed;

public Publisher(RingBuffer<TestEvent> ringBuffer, int iterations, CountdownEvent start, CountdownEvent end)
public Publisher(RingBuffer<TestEvent> ringBuffer, int iterations, CountdownEvent start)
{
_ringBuffer = ringBuffer;
_end = end;
_start = start;
_iterations = iterations;
}
@@ -195,22 +182,18 @@ public void Run()
var i = _iterations;
while (--i != -1)
{
var next = _ringBuffer.Next();
var testEvent = _ringBuffer[next];
testEvent.Sequence = next;
testEvent.A = next + 13;
testEvent.B = next - 7;
_ringBuffer.Publish(next);
var sequence = _ringBuffer.Next();
var testEvent = _ringBuffer[sequence];
testEvent.Sequence = sequence;
testEvent.A = sequence + 13;
testEvent.B = sequence - 7;
_ringBuffer.Publish(sequence);
}
}
catch (Exception)
{
Failed = true;
}
finally
{
_end.Signal();
}
}
}

4 changes: 2 additions & 2 deletions src/Disruptor.Tests/Dsl/DisruptorTests.cs
Original file line number Diff line number Diff line change
@@ -104,8 +104,8 @@ public void ShouldPublishAndHandleEvent_AsyncBatchEventHandler()
var eventCounter = new CountdownEvent(2);
var values = new List<int>();

_disruptor.HandleEventsWith(new TestBatchEventHandler<TestEvent>(e => values.Add(e.Value)))
.Then(new TestBatchEventHandler<TestEvent>(e => eventCounter.Signal()));
_disruptor.HandleEventsWith(new TestAsyncBatchEventHandler<TestEvent>(e => values.Add(e.Value)))
.Then(new TestAsyncBatchEventHandler<TestEvent>(e => eventCounter.Signal()));

_disruptor.Start();

12 changes: 5 additions & 7 deletions src/Disruptor/AsyncEventStream.cs
Original file line number Diff line number Diff line change
@@ -119,12 +119,14 @@ private class Enumerator : IAsyncEnumerator<EventBatch<T>>
private readonly Sequence _sequence;
private readonly CancellationTokenRegistration _cancellationTokenRegistration;
private readonly CancellationTokenSource _linkedTokenSource;
private readonly AsyncWaitState _asyncWaitState;

public Enumerator(AsyncEventStream<T> asyncEventStream, Sequence sequence, CancellationToken streamCancellationToken, CancellationToken enumeratorCancellationToken)
{
_asyncEventStream = asyncEventStream;
_sequence = sequence;
_linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(streamCancellationToken, enumeratorCancellationToken);
_asyncWaitState = new AsyncWaitState(asyncEventStream._dependentSequences, _linkedTokenSource.Token, asyncEventStream._sequencer);

_cancellationTokenRegistration = _linkedTokenSource.Token.Register(x => ((IAsyncWaitStrategy)x!).SignalAllWhenBlocking(), asyncEventStream._waitStrategy);
}
@@ -151,16 +153,12 @@ public async ValueTask<bool> MoveNextAsync()

_linkedTokenSource.Token.ThrowIfCancellationRequested();

var waitResult = await _asyncEventStream._waitStrategy.WaitForAsync(nextSequence, _asyncEventStream._dependentSequences, _linkedTokenSource.Token).ConfigureAwait(false);
var waitResult = await _asyncEventStream._waitStrategy.WaitForAsync(nextSequence, _asyncWaitState).ConfigureAwait(false);
if (waitResult.UnsafeAvailableSequence < nextSequence)
continue;

var availableSequence = _asyncEventStream._sequencer.GetHighestPublishedSequence(nextSequence, waitResult.UnsafeAvailableSequence);
if (availableSequence >= nextSequence)
{
Current = _asyncEventStream._dataProvider.GetBatch(nextSequence, availableSequence);
return true;
}
Current = _asyncEventStream._dataProvider.GetBatch(nextSequence, waitResult.UnsafeAvailableSequence);
return true;
}
}
}
20 changes: 5 additions & 15 deletions src/Disruptor/AsyncSequenceBarrier.cs
Original file line number Diff line number Diff line change
@@ -13,6 +13,7 @@ public sealed class AsyncSequenceBarrier
private readonly IAsyncWaitStrategy _waitStrategy;
private readonly DependentSequenceGroup _dependentSequences;
private CancellationTokenSource _cancellationTokenSource;
private AsyncWaitState _asyncWaitState;

public AsyncSequenceBarrier(ISequencer sequencer, IWaitStrategy waitStrategy, DependentSequenceGroup dependentSequences)
{
@@ -23,6 +24,7 @@ public AsyncSequenceBarrier(ISequencer sequencer, IWaitStrategy waitStrategy, De
_waitStrategy = asyncWaitStrategy;
_dependentSequences = dependentSequences;
_cancellationTokenSource = new CancellationTokenSource();
_asyncWaitState = new AsyncWaitState(dependentSequences, _cancellationTokenSource.Token, _sequencer);
}

public DependentSequenceGroup DependentSequences => _dependentSequences;
@@ -65,26 +67,13 @@ public ValueTask<SequenceWaitResult> WaitForAsync<TSequenceBarrierOptions>(long
return new ValueTask<SequenceWaitResult>(_sequencer.GetHighestPublishedSequence(sequence, availableSequence));
}

if (typeof(TSequenceBarrierOptions) == typeof(ISequenceBarrierOptions.IsDependentSequencePublished))
{
return InvokeWaitStrategy(sequence);
}

return InvokeWaitStrategyAndWaitForPublishedSequence(sequence);
return InvokeWaitStrategy(sequence);
}

[MethodImpl(MethodImplOptions.NoInlining)]
private ValueTask<SequenceWaitResult> InvokeWaitStrategy(long sequence)
{
return _waitStrategy.WaitForAsync(sequence, _dependentSequences, _cancellationTokenSource.Token);
}

[MethodImpl(MethodImplOptions.NoInlining)]
private async ValueTask<SequenceWaitResult> InvokeWaitStrategyAndWaitForPublishedSequence(long sequence)
{
var waitResult = await _waitStrategy.WaitForAsync(sequence, _dependentSequences, _cancellationTokenSource.Token).ConfigureAwait(false);

return waitResult.UnsafeAvailableSequence >= sequence ? _sequencer.GetHighestPublishedSequence(sequence, waitResult.UnsafeAvailableSequence) : waitResult;
return _waitStrategy.WaitForAsync(sequence, _asyncWaitState);
}

public void ResetProcessing()
@@ -93,6 +82,7 @@ public void ResetProcessing()
// has no finalizer and no unmanaged resources to release.

_cancellationTokenSource = new CancellationTokenSource();
_asyncWaitState = new AsyncWaitState(_dependentSequences, _cancellationTokenSource.Token, _sequencer);
}

public void CancelProcessing()
Loading

0 comments on commit abf370d

Please sign in to comment.