diff --git a/README.md b/README.md index cff5261..e146db2 100644 --- a/README.md +++ b/README.md @@ -15,6 +15,7 @@ It's heavily inspired by the brilliant - [Getting Started](#getting-started) - [Features](#features) - [Design Choices](#design-choices) +- [Correctness](#correctness) - [Performance](#performance) - [Related Work](#related-work) - [Contributions](#contributions) @@ -188,6 +189,16 @@ Hence, there's synchronization happening in the allocator. There's also no use of dynamic dispatch - everything is monomorphed. +# Correctness + +This library needs to use Unsafe to achieve low latency. +Although the absence of bugs cannot be guaranteed, these approaches have been used to eliminate bugs: + +- Minimal usage of Unsafe blocks. +- High test coverage. +- All tests are run on Miri in CI/CD. +- Verification in TLA+ (see the `verification/` folder). + # Performance The SPSC and MPSC Disruptor variants have been benchmarked and compared to Crossbeam. See the code in the `benches/spsc.rs` and `benches/mpsc.rs` files. diff --git a/verification/Disruptor.tla b/verification/Disruptor.tla new file mode 100644 index 0000000..1d71fe1 --- /dev/null +++ b/verification/Disruptor.tla @@ -0,0 +1,152 @@ +----------------------------- MODULE Disruptor ----------------------------- +(***************************************************************************) +(* Models a Single Producer, Multi Consumer Disruptor (SPMC). *) +(* *) +(* The model verifies that no data races occur between the publisher *) +(* and consumers and that all consumers eventually read all published *) +(* values. *) +(* *) +(* To see a data race, try and run the model with two publishers. *) +(***************************************************************************) + +EXTENDS Naturals, Integers, FiniteSets, Sequences + +CONSTANTS + Writers, (* Writer/publisher thread ids. *) + Readers, (* Reader/consumer thread ids. *) + MaxPublished, (* Max number of published events. *) + Size, (* Ringbuffer size. *) + NULL + +VARIABLES + ringbuffer, + published, (* Publisher Cursor. *) + read, (* Read Cursors. One per consumer. *) + consumed, (* Sequence of all read events by the Readers. *) + pc (* Program Counter of each Writer/Reader. *) + +vars == << + ringbuffer, + published, + read, + consumed, + pc +>> + +(***************************************************************************) +(* Each publisher/consumer can be in one of two states: *) +(* 1. Accessing a slot in the Disruptor or *) +(* 2. Advancing to the next slot. *) +(***************************************************************************) +Access == "Access" +Advance == "Advance" + +Transition(t, from, to) == + /\ pc[t] = from + /\ pc' = [ pc EXCEPT ![t] = to ] + +Buffer == INSTANCE RingBuffer + +TypeInvariant == + /\ Buffer!TypeInvariant + /\ published \in Int + /\ read \in [ Readers -> Int ] + /\ consumed \in [ Readers -> Seq(Nat) ] + /\ pc \in [ Writers \union Readers -> { Access, Advance } ] + +Init == + /\ Buffer!Init + /\ published = -1 + /\ read = [ r \in Readers |-> -1 ] + /\ consumed = [ r \in Readers |-> << >> ] + /\ pc = [ a \in Writers \union Readers |-> Access ] + +Range(f) == + { f[x] : x \in DOMAIN(f) } + +MinReadSequence == + CHOOSE min \in Range(read) : \A r \in Readers : min <= read[r] + +(***************************************************************************) +(* Publisher Actions: *) +(***************************************************************************) + +BeginWrite == + LET + next == published + 1 + index == Buffer!IndexOf(next) + min_read == MinReadSequence + IN + \* Are we clear of all consumers? (Potentially a full cycle behind). + /\ min_read >= next - Size + /\ next < MaxPublished + /\ \E w \in Writers : + /\ Transition(w, Access, Advance) + /\ Buffer!Write(index, w, next) + /\ UNCHANGED << consumed, published, read >> + +EndWrite == + LET + next == published + 1 + index == Buffer!IndexOf(next) + IN + /\ \E w \in Writers : + /\ Transition(w, Advance, Access) + /\ Buffer!EndWrite(index, w) + /\ published' = next + /\ UNCHANGED << consumed, read >> + +(***************************************************************************) +(* Consumer Actions: *) +(***************************************************************************) + +BeginRead == + /\ \E r \in Readers : + LET + next == read[r] + 1 + index == Buffer!IndexOf(next) + IN + /\ published >= next + /\ Transition(r, Access, Advance) + /\ Buffer!BeginRead(index, r) + \* Track what we read from the ringbuffer. + /\ consumed' = [ consumed EXCEPT ![r] = Append(@, Buffer!Read(index)) ] + /\ UNCHANGED << published, read >> + +EndRead == + /\ \E r \in Readers : + LET + next == read[r] + 1 + index == Buffer!IndexOf(next) + IN + /\ Transition(r, Advance, Access) + /\ Buffer!EndRead(index, r) + /\ read' = [ read EXCEPT ![r] = next ] + /\ UNCHANGED << consumed, published >> + +(***************************************************************************) +(* Spec: *) +(***************************************************************************) + +Next == + \/ BeginWrite + \/ EndWrite + \/ BeginRead + \/ EndRead + +Spec == + /\ Init + /\ [][Next]_vars + /\ WF_vars(BeginWrite) + /\ WF_vars(EndWrite) + /\ WF_vars(BeginRead) + /\ WF_vars(EndRead) + +----------------------------------------------------------------------------- + +NoDataRaces == Buffer!NoDataRaces + +Liveliness == + <>[] (\A r \in Readers : consumed[r] = [i \in 1..MaxPublished |-> i - 1]) + +============================================================================= diff --git a/verification/MPMC.tla b/verification/MPMC.tla new file mode 100644 index 0000000..0043dc2 --- /dev/null +++ b/verification/MPMC.tla @@ -0,0 +1,167 @@ +-------------------------------- MODULE MPMC -------------------------------- +(***************************************************************************) +(* Models a Multi Producer, Multi Consumer Disruptor (MPMC). *) +(* The model verifies that no data races occur between the publishers *) +(* and consumers and that all consumers eventually read all published *) +(* values. *) +(***************************************************************************) + +EXTENDS Naturals, Integers, FiniteSets, Sequences + +CONSTANTS + Writers, (* Writer/publisher thread ids. *) + Readers, (* Reader/consumer thread ids. *) + MaxPublished, (* Max number of published events. *) + Size, (* Ringbuffer size. *) + NULL + +VARIABLES + ringbuffer, + next_sequence, (* Shared counter for claiming a sequence for a Writer. *) + claimed_sequence, (* Claimed sequence by each Writer. *) + published, (* Encodes whether each slot is published. *) + read, (* Read Cursors. One per Reader. *) + consumed, (* Sequence of all read events by the Reader. *) + pc (* Program Counter of each Writer/Reader. *) + +vars == << + ringbuffer, + next_sequence, + claimed_sequence, + published, + read, + consumed, + pc +>> + +(***************************************************************************) +(* Each publisher/consumer can be in one of two states: *) +(* 1. Accessing a slot in the Disruptor or *) +(* 2. Advancing to the next slot. *) +(***************************************************************************) +Access == "Access" +Advance == "Advance" + +Transition(t, from, to) == + /\ pc[t] = from + /\ pc' = [ pc EXCEPT ![t] = to ] + +Buffer == INSTANCE RingBuffer + +TypeInvariant == + /\ Buffer!TypeInvariant + /\ next_sequence \in Nat + /\ claimed_sequence \in [ Writers -> Int ] + /\ published \in [ 0..Size -> Int ] + /\ read \in [ Readers -> Int ] + /\ consumed \in [ Readers -> Seq(Nat) ] + /\ pc \in [ Writers \union Readers -> { Access, Advance } ] + +Init == + /\ Buffer!Init + /\ next_sequence = 0 + /\ claimed_sequence = [ w \in Writers |-> -1 ] + /\ published = [ i \in 0..Size |-> -1 ] + /\ read = [ r \in Readers |-> -1 ] + /\ consumed = [ r \in Readers |-> << >> ] + /\ pc = [ a \in Writers \union Readers |-> Access ] + +Range(f) == + { f[x] : x \in DOMAIN(f) } + +MinReadSequence == + CHOOSE min \in Range(read) : \A r \in Readers : min <= read[r] + +IsPublished(sequence) == + LET index == Buffer!IndexOf(sequence) + IN published[index] = sequence + +Publish(sequence) == + LET index == Buffer!IndexOf(sequence) + IN published' = [ published EXCEPT ![index] = sequence ] + +(***************************************************************************) +(* Publisher Actions: *) +(***************************************************************************) + +BeginWrite == + LET + seq == next_sequence + index == Buffer!IndexOf(seq) + min_read == MinReadSequence + IN + \* Are we clear of all consumers? (Potentially a full cycle behind). + /\ min_read >= seq - Size + /\ seq < MaxPublished + /\ \E w \in Writers : + /\ claimed_sequence' = [ claimed_sequence EXCEPT ![w] = seq ] + /\ next_sequence' = seq + 1 + /\ Transition(w, Access, Advance) + /\ Buffer!Write(index, w, seq) + /\ UNCHANGED << consumed, published, read >> + +EndWrite == + /\ \E w \in Writers : + LET + seq == claimed_sequence[w] + index == Buffer!IndexOf(seq) + IN + /\ Transition(w, Advance, Access) + /\ Buffer!EndWrite(index, w) + /\ Publish(seq) + /\ UNCHANGED << claimed_sequence, next_sequence, consumed, read >> + +(***************************************************************************) +(* Consumer Actions: *) +(***************************************************************************) + +BeginRead == + /\ \E r \in Readers : + LET + next == read[r] + 1 + index == Buffer!IndexOf(next) + IN + /\ IsPublished(next) + /\ Transition(r, Access, Advance) + /\ Buffer!BeginRead(index, r) + \* Track what we read from the ringbuffer. + /\ consumed' = [ consumed EXCEPT ![r] = Append(@, Buffer!Read(index)) ] + /\ UNCHANGED << claimed_sequence, next_sequence, published, read >> + +EndRead == + /\ \E r \in Readers : + LET + next == read[r] + 1 + index == Buffer!IndexOf(next) + IN + /\ Transition(r, Advance, Access) + /\ Buffer!EndRead(index, r) + /\ read' = [ read EXCEPT ![r] = next ] + /\ UNCHANGED << claimed_sequence, next_sequence, consumed, published >> + +(***************************************************************************) +(* Spec: *) +(***************************************************************************) + +Next == + \/ BeginWrite + \/ EndWrite + \/ BeginRead + \/ EndRead + +Spec == + /\ Init + /\ [][Next]_vars + /\ WF_vars(BeginWrite) + /\ WF_vars(EndWrite) + /\ WF_vars(BeginRead) + /\ WF_vars(EndRead) + +----------------------------------------------------------------------------- + +NoDataRaces == Buffer!NoDataRaces + +Liveliness == + <>[] (\A r \in Readers : consumed[r] = [i \in 1..MaxPublished |-> i - 1]) + +============================================================================= diff --git a/verification/README.md b/verification/README.md new file mode 100644 index 0000000..c912ee3 --- /dev/null +++ b/verification/README.md @@ -0,0 +1,21 @@ +# Verification in TLA+ + +This folder contains two models for verifying the SPMC and MPMC scenarios. + +To run the verifications in less than a minute, these model configurations can be used: + +**SPMC:** + +- MaxPublished <- 10 +- Size <- 8 +- Writers <- { "w" } +- Readers <- { "r1", "r2" } +- NULL <- [ model value ] + +**MPMC:** + +- MaxPublished <- 10 +- Size <- 8 +- Writers <- { "w1", "w2" } +- Readers <- { "r1", "r2" } +- NULL <- [ model value ] diff --git a/verification/RingBuffer.tla b/verification/RingBuffer.tla new file mode 100644 index 0000000..1d1bc7e --- /dev/null +++ b/verification/RingBuffer.tla @@ -0,0 +1,78 @@ +----------------------------- MODULE RingBuffer ----------------------------- +(***************************************************************************) +(* Models a Ring Buffer where each slot can contain an integer. *) +(* Initially all slots contains NULL. *) +(* *) +(* Read and write accesses to each slot are tracked to detect data races. *) +(* This entails that each write and read of a slot has multiple steps. *) +(* *) +(* All models using the RingBuffer should assert the NoDataRaces invariant *) +(* that checks against data races between multiple producer threads and *) +(* consumer threads. *) +(***************************************************************************) + +LOCAL INSTANCE Naturals +LOCAL INSTANCE Integers +LOCAL INSTANCE FiniteSets + +CONSTANTS + Size, + Readers, + Writers, + NULL + +VARIABLE ringbuffer + +LastIndex == Size - 1 + +TypeInvariant == + ringbuffer \in [ + slots: UNION { [0 .. LastIndex -> Int \union { NULL }] }, + readers: UNION { [0 .. LastIndex -> SUBSET(Readers) ] }, + writers: UNION { [0 .. LastIndex -> SUBSET(Writers) ] } + ] + +Init == + ringbuffer = [ + slots |-> [i \in 0 .. LastIndex |-> NULL ], + readers |-> [i \in 0 .. LastIndex |-> {} ], + writers |-> [i \in 0 .. LastIndex |-> {} ] + ] + +(* The index into the Ring Buffer for a sequence number.*) +IndexOf(sequence) == + sequence % Size + +(***************************************************************************) +(* Write operations. *) +(***************************************************************************) +Write(index, writer, value) == + ringbuffer' = [ + ringbuffer EXCEPT + !.writers[index] = @ \union { writer }, + !.slots[index] = value + ] + +EndWrite(index, writer) == + ringbuffer' = [ ringbuffer EXCEPT !.writers[index] = @ \ { writer } ] + +(***************************************************************************) +(* Read operations. *) +(***************************************************************************) +BeginRead(index, reader) == + ringbuffer' = [ ringbuffer EXCEPT !.readers[index] = @ \union { reader } ] + +Read(index) == + ringbuffer.slots[index] + +EndRead(index, reader) == + ringbuffer' = [ ringbuffer EXCEPT !.readers[index] = @ \ { reader } ] + +----------------------------------------------------------------------------- + +NoDataRaces == + \A i \in 0 .. LastIndex : + /\ ringbuffer.readers[i] = {} \/ ringbuffer.writers[i] = {} + /\ Cardinality(ringbuffer.writers[i]) <= 1 + +=============================================================================