Skip to content

Commit

Permalink
Add section on correctness and TLA+ models.
Browse files Browse the repository at this point in the history
  • Loading branch information
nicholassm committed Aug 1, 2024
1 parent 9882c73 commit 94a963f
Show file tree
Hide file tree
Showing 5 changed files with 429 additions and 0 deletions.
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ It's heavily inspired by the brilliant
- [Getting Started](#getting-started)
- [Features](#features)
- [Design Choices](#design-choices)
- [Correctness](#correctness)
- [Performance](#performance)
- [Related Work](#related-work)
- [Contributions](#contributions)
Expand Down Expand Up @@ -188,6 +189,16 @@ Hence, there's synchronization happening in the allocator.

There's also no use of dynamic dispatch - everything is monomorphed.

# Correctness

This library needs to use Unsafe to achieve low latency.
Although the absence of bugs cannot be guaranteed, these approaches have been used to eliminate bugs:

- Minimal usage of Unsafe blocks.
- High test coverage.
- All tests are run on Miri in CI/CD.
- Verification in TLA+ (see the `verification/` folder).

# Performance

The SPSC and MPSC Disruptor variants have been benchmarked and compared to Crossbeam. See the code in the `benches/spsc.rs` and `benches/mpsc.rs` files.
Expand Down
152 changes: 152 additions & 0 deletions verification/Disruptor.tla
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
----------------------------- MODULE Disruptor -----------------------------
(***************************************************************************)
(* Models a Single Producer, Multi Consumer Disruptor (SPMC). *)
(* *)
(* The model verifies that no data races occur between the publisher *)
(* and consumers and that all consumers eventually read all published *)
(* values. *)
(* *)
(* To see a data race, try and run the model with two publishers. *)
(***************************************************************************)

EXTENDS Naturals, Integers, FiniteSets, Sequences

CONSTANTS
Writers, (* Writer/publisher thread ids. *)
Readers, (* Reader/consumer thread ids. *)
MaxPublished, (* Max number of published events. *)
Size, (* Ringbuffer size. *)
NULL

VARIABLES
ringbuffer,
published, (* Publisher Cursor. *)
read, (* Read Cursors. One per consumer. *)
consumed, (* Sequence of all read events by the Readers. *)
pc (* Program Counter of each Writer/Reader. *)

vars == <<
ringbuffer,
published,
read,
consumed,
pc
>>

(***************************************************************************)
(* Each publisher/consumer can be in one of two states: *)
(* 1. Accessing a slot in the Disruptor or *)
(* 2. Advancing to the next slot. *)
(***************************************************************************)
Access == "Access"
Advance == "Advance"

Transition(t, from, to) ==
/\ pc[t] = from
/\ pc' = [ pc EXCEPT ![t] = to ]

Buffer == INSTANCE RingBuffer

TypeInvariant ==
/\ Buffer!TypeInvariant
/\ published \in Int
/\ read \in [ Readers -> Int ]
/\ consumed \in [ Readers -> Seq(Nat) ]
/\ pc \in [ Writers \union Readers -> { Access, Advance } ]

Init ==
/\ Buffer!Init
/\ published = -1
/\ read = [ r \in Readers |-> -1 ]
/\ consumed = [ r \in Readers |-> << >> ]
/\ pc = [ a \in Writers \union Readers |-> Access ]

Range(f) ==
{ f[x] : x \in DOMAIN(f) }

MinReadSequence ==
CHOOSE min \in Range(read) : \A r \in Readers : min <= read[r]

(***************************************************************************)
(* Publisher Actions: *)
(***************************************************************************)

BeginWrite ==
LET
next == published + 1
index == Buffer!IndexOf(next)
min_read == MinReadSequence
IN
\* Are we clear of all consumers? (Potentially a full cycle behind).
/\ min_read >= next - Size
/\ next < MaxPublished
/\ \E w \in Writers :
/\ Transition(w, Access, Advance)
/\ Buffer!Write(index, w, next)
/\ UNCHANGED << consumed, published, read >>

EndWrite ==
LET
next == published + 1
index == Buffer!IndexOf(next)
IN
/\ \E w \in Writers :
/\ Transition(w, Advance, Access)
/\ Buffer!EndWrite(index, w)
/\ published' = next
/\ UNCHANGED << consumed, read >>

(***************************************************************************)
(* Consumer Actions: *)
(***************************************************************************)

BeginRead ==
/\ \E r \in Readers :
LET
next == read[r] + 1
index == Buffer!IndexOf(next)
IN
/\ published >= next
/\ Transition(r, Access, Advance)
/\ Buffer!BeginRead(index, r)
\* Track what we read from the ringbuffer.
/\ consumed' = [ consumed EXCEPT ![r] = Append(@, Buffer!Read(index)) ]
/\ UNCHANGED << published, read >>

EndRead ==
/\ \E r \in Readers :
LET
next == read[r] + 1
index == Buffer!IndexOf(next)
IN
/\ Transition(r, Advance, Access)
/\ Buffer!EndRead(index, r)
/\ read' = [ read EXCEPT ![r] = next ]
/\ UNCHANGED << consumed, published >>

(***************************************************************************)
(* Spec: *)
(***************************************************************************)

Next ==
\/ BeginWrite
\/ EndWrite
\/ BeginRead
\/ EndRead

Spec ==
/\ Init
/\ [][Next]_vars
/\ WF_vars(BeginWrite)
/\ WF_vars(EndWrite)
/\ WF_vars(BeginRead)
/\ WF_vars(EndRead)

-----------------------------------------------------------------------------

NoDataRaces == Buffer!NoDataRaces

Liveliness ==
<>[] (\A r \in Readers : consumed[r] = [i \in 1..MaxPublished |-> i - 1])

=============================================================================
167 changes: 167 additions & 0 deletions verification/MPMC.tla
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
-------------------------------- MODULE MPMC --------------------------------
(***************************************************************************)
(* Models a Multi Producer, Multi Consumer Disruptor (MPMC). *)
(* The model verifies that no data races occur between the publishers *)
(* and consumers and that all consumers eventually read all published *)
(* values. *)
(***************************************************************************)

EXTENDS Naturals, Integers, FiniteSets, Sequences

CONSTANTS
Writers, (* Writer/publisher thread ids. *)
Readers, (* Reader/consumer thread ids. *)
MaxPublished, (* Max number of published events. *)
Size, (* Ringbuffer size. *)
NULL

VARIABLES
ringbuffer,
next_sequence, (* Shared counter for claiming a sequence for a Writer. *)
claimed_sequence, (* Claimed sequence by each Writer. *)
published, (* Encodes whether each slot is published. *)
read, (* Read Cursors. One per Reader. *)
consumed, (* Sequence of all read events by the Reader. *)
pc (* Program Counter of each Writer/Reader. *)

vars == <<
ringbuffer,
next_sequence,
claimed_sequence,
published,
read,
consumed,
pc
>>

(***************************************************************************)
(* Each publisher/consumer can be in one of two states: *)
(* 1. Accessing a slot in the Disruptor or *)
(* 2. Advancing to the next slot. *)
(***************************************************************************)
Access == "Access"
Advance == "Advance"

Transition(t, from, to) ==
/\ pc[t] = from
/\ pc' = [ pc EXCEPT ![t] = to ]

Buffer == INSTANCE RingBuffer

TypeInvariant ==
/\ Buffer!TypeInvariant
/\ next_sequence \in Nat
/\ claimed_sequence \in [ Writers -> Int ]
/\ published \in [ 0..Size -> Int ]
/\ read \in [ Readers -> Int ]
/\ consumed \in [ Readers -> Seq(Nat) ]
/\ pc \in [ Writers \union Readers -> { Access, Advance } ]

Init ==
/\ Buffer!Init
/\ next_sequence = 0
/\ claimed_sequence = [ w \in Writers |-> -1 ]
/\ published = [ i \in 0..Size |-> -1 ]
/\ read = [ r \in Readers |-> -1 ]
/\ consumed = [ r \in Readers |-> << >> ]
/\ pc = [ a \in Writers \union Readers |-> Access ]

Range(f) ==
{ f[x] : x \in DOMAIN(f) }

MinReadSequence ==
CHOOSE min \in Range(read) : \A r \in Readers : min <= read[r]

IsPublished(sequence) ==
LET index == Buffer!IndexOf(sequence)
IN published[index] = sequence

Publish(sequence) ==
LET index == Buffer!IndexOf(sequence)
IN published' = [ published EXCEPT ![index] = sequence ]

(***************************************************************************)
(* Publisher Actions: *)
(***************************************************************************)

BeginWrite ==
LET
seq == next_sequence
index == Buffer!IndexOf(seq)
min_read == MinReadSequence
IN
\* Are we clear of all consumers? (Potentially a full cycle behind).
/\ min_read >= seq - Size
/\ seq < MaxPublished
/\ \E w \in Writers :
/\ claimed_sequence' = [ claimed_sequence EXCEPT ![w] = seq ]
/\ next_sequence' = seq + 1
/\ Transition(w, Access, Advance)
/\ Buffer!Write(index, w, seq)
/\ UNCHANGED << consumed, published, read >>

EndWrite ==
/\ \E w \in Writers :
LET
seq == claimed_sequence[w]
index == Buffer!IndexOf(seq)
IN
/\ Transition(w, Advance, Access)
/\ Buffer!EndWrite(index, w)
/\ Publish(seq)
/\ UNCHANGED << claimed_sequence, next_sequence, consumed, read >>

(***************************************************************************)
(* Consumer Actions: *)
(***************************************************************************)

BeginRead ==
/\ \E r \in Readers :
LET
next == read[r] + 1
index == Buffer!IndexOf(next)
IN
/\ IsPublished(next)
/\ Transition(r, Access, Advance)
/\ Buffer!BeginRead(index, r)
\* Track what we read from the ringbuffer.
/\ consumed' = [ consumed EXCEPT ![r] = Append(@, Buffer!Read(index)) ]
/\ UNCHANGED << claimed_sequence, next_sequence, published, read >>

EndRead ==
/\ \E r \in Readers :
LET
next == read[r] + 1
index == Buffer!IndexOf(next)
IN
/\ Transition(r, Advance, Access)
/\ Buffer!EndRead(index, r)
/\ read' = [ read EXCEPT ![r] = next ]
/\ UNCHANGED << claimed_sequence, next_sequence, consumed, published >>

(***************************************************************************)
(* Spec: *)
(***************************************************************************)

Next ==
\/ BeginWrite
\/ EndWrite
\/ BeginRead
\/ EndRead

Spec ==
/\ Init
/\ [][Next]_vars
/\ WF_vars(BeginWrite)
/\ WF_vars(EndWrite)
/\ WF_vars(BeginRead)
/\ WF_vars(EndRead)

-----------------------------------------------------------------------------

NoDataRaces == Buffer!NoDataRaces

Liveliness ==
<>[] (\A r \in Readers : consumed[r] = [i \in 1..MaxPublished |-> i - 1])

=============================================================================
21 changes: 21 additions & 0 deletions verification/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Verification in TLA+

This folder contains two models for verifying the SPMC and MPMC scenarios.

To run the verifications in less than a minute, these model configurations can be used:

**SPMC:**

- MaxPublished <- 10
- Size <- 8
- Writers <- { "w" }
- Readers <- { "r1", "r2" }
- NULL <- [ model value ]

**MPMC:**

- MaxPublished <- 10
- Size <- 8
- Writers <- { "w1", "w2" }
- Readers <- { "r1", "r2" }
- NULL <- [ model value ]
Loading

0 comments on commit 94a963f

Please sign in to comment.