-
Notifications
You must be signed in to change notification settings - Fork 227
Getting Started
The .NET Disruptor is available as a NuGet package.
PM> Install-Package Disruptor
To get started with the Disruptor we are going to consider very simple and contrived example, one that will pass a single long value from a producer to a consumer, where the consumer will simply print out the value. Firstly we will define the Event that will carry the data.
public class LongEvent
{
public long Value { get; set; }
}
Once we have the event defined we need to create a consumer that will handle these events. In our case all we want to do is print the value out the the console.
public class LongEventHandler : IEventHandler<LongEvent>
{
public void OnEvent(LongEvent data, long sequence, bool endOfBatch)
{
Console.WriteLine($"Event: {data.Value}");
}
}
We will need a source for these events, for the sake of an example I am going to assume that the data is coming from some sort of I/O device, e.g. network or file.
public class LongEventProducer
{
private readonly RingBuffer<LongEvent> _ringBuffer;
public LongEventProducer(RingBuffer<LongEvent> ringBuffer)
{
_ringBuffer = ringBuffer;
}
public void OnData(ReadOnlyMemory<byte> input)
{
// Grab the next sequence
var sequence = _ringBuffer.Next();
try
{
// Get the entry in the Disruptor for the sequence
var data = _ringBuffer[sequence];
// Fill with data
data.Value = MemoryMarshal.Read<long>(input.Span);
}
finally
{
_ringBuffer.Publish(sequence);
}
}
}
What becomes immediately obvious is that event publication becomes more involved than using a simple queue. This is due to the desire for event preallocation. It requires (at the lowest level) a 2-phase approach to message publication, i.e. claim the slot in the ring buffer then publish the available data. It is also necessary to wrap publication in a try/finally block. If we claim a slot in the Ring Buffer (calling RingBuffer.next()) then we must publish this sequence. Failing to do so can result in corruption of the state of the Disruptor. Specifically, in the multi-producer case this will result in the consumers stalling and being unable to recover without a restart.
The final step is to wire the whole thing together. It is possible to wire all of the components manually, however it can be a little bit complicated so a DSL is provided to simplify construction. Some of the more complicated options are not available via the DSL, however it is suitable for most circumstances.
public static class Program
{
public static void Main()
{
// Specify the size of the ring buffer, must be power of 2.
const int bufferSize = 1024;
// Construct the Disruptor
var disruptor = new Dsl.Disruptor<LongEvent>(() => new LongEvent(), bufferSize);
// Connect the handler
disruptor.HandleEventsWith(new LongEventHandler());
// Start the Disruptor, starts all threads running
disruptor.Start();
// Get the ring buffer from the Disruptor to be used for publishing.
var ringBuffer = disruptor.RingBuffer;
var producer = new LongEventProducer(ringBuffer);
var memory = new Memory<byte>(new byte[8]);
for (var l = 0L; ; l++)
{
MemoryMarshal.Write(memory.Span, ref l);
producer.OnData(memory);
Thread.Sleep(1000);
}
}