diff --git a/Cargo.toml b/Cargo.toml index d9c10f1..6870a34 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "disruptor" -version = "2.1.0" +version = "2.2.0" edition = "2021" description = "Low latency inter-thread communication via a ringbuffer (inspired by the LMAX Disruptor)." license = "MIT" diff --git a/README.md b/README.md index 78cf626..b003cb1 100644 --- a/README.md +++ b/README.md @@ -13,11 +13,11 @@ It's heavily inspired by the brilliant Add the following to your `Cargo.toml` file: - disruptor = "2.1.0" + disruptor = "2.2.0" To read details of how to use the library, check out the documentation on [docs.rs/disruptor](https://docs.rs/disruptor). -Here's a minimal example: +Here's a minimal example demonstrating both single and batch publication. Note, batch publication should be used whenever possible for best latency and throughput (see benchmarks below). ```rust use disruptor::*; @@ -41,12 +41,19 @@ fn main() { .handle_events_with(processor) .build(); - // Publish into the Disruptor via the `Producer` handle. + // Publish single events into the Disruptor via the `Producer` handle. for i in 0..10 { producer.publish(|e| { e.price = i as f64; }); } + + // Publish a batch of events into the Disruptor. + producer.batch_publish(5, |iter| { + for e in iter { // `iter` is guaranteed to yield 5 events. + e.price = 42.0; + } + }); }// At this point, the Producer instance goes out of scope and when the // processor is done handling all events then the Disruptor is dropped // as well. @@ -155,6 +162,7 @@ fn main() { - [x] Multi Producer Single Consumer (MPSC). - [x] Multi Producer Multi Consumer (MPMC) with consumer interdependencies. - [x] Busy-spin wait strategies. +- [x] Batch publication of events. - [x] Batch consumption of events. - [x] Thread affinity can be set for the event processor thread(s). - [x] Set thread name of each event processor thread. 
@@ -187,17 +195,17 @@ The latencies below are the mean latency per element with 95% confidence interva | Burst Size | Crossbeam | Disruptor | Improvement | |------------:|----------:|----------:|------------:| -| 1 | 53 ns | 35 ns | 34% | -| 10 | 71 ns | 29 ns | 59% | -| 100 | 34 ns | 31 ns | 9% | +| 1 | 65 ns | 32 ns | 51% | +| 10 | 68 ns | 9 ns | 87% | +| 100 | 29 ns | 8 ns | 72% | *Throughput:* | Burst Size | Crossbeam | Disruptor | Improvement | |------------:|-----------:|------------:|------------:| -| 1 | 19.0M / s | 28.7M / s | 51% | -| 10 | 14.2M / s | 34.8M / s | 145% | -| 100 | 29.7M / s | 31.9M / s | 7% | +| 1 | 15.2M / s | 31.7M / s | 109% | +| 10 | 14.5M / s | 117.3M / s | 709% | +| 100 | 34.3M / s | 119.7M / s | 249% | ## 1 ms Pause Between Bursts @@ -205,17 +213,17 @@ The latencies below are the mean latency per element with 95% confidence interva | Burst Size | Crossbeam | Disruptor | Improvement | |------------:|----------:|-----------:|------------:| -| 1 | 52 ns | 35 ns | 32% | -| 10 | 71 ns | 31 ns | 56% | -| 100 | 33 ns | 29 ns | 12% | +| 1 | 63 ns | 33 ns | 48% | +| 10 | 67 ns | 8 ns | 88% | +| 100 | 30 ns | 9 ns | 70% | *Throughput:* -| Burst Size | Crossbeam | Disruptor | Improvement | -|------------:|-----------:|----------:|------------:| -| 1 | 19.0M / s | 28.5M / s | 50% | -| 10 | 14.1M / s | 32.6M / s | 131% | -| 100 | 30.6M / s | 34.4M / s | 12% | +| Burst Size | Crossbeam | Disruptor | Improvement | +|------------:|-----------:|-----------:|------------:| +| 1 | 15.9M / s | 30.7M / s | 93% | +| 10 | 14.9M / s | 117.7M / s | 690% | +| 100 | 33.8M / s | 105.0M / s | 211% | ## 10 ms Pause Between Bursts @@ -223,21 +231,22 @@ The latencies below are the mean latency per element with 95% confidence interva | Burst Size | Crossbeam | Disruptor | Improvement | |------------:|----------:|----------:|------------:| -| 1 | 56 ns | 35 ns | 38% | -| 10 | 75 ns | 29 ns | 61% | -| 100 | 35 ns | 31 ns | 11% | +| 1 | 51 ns | 32 ns | 37% | +| 10 | 67 
ns | 9 ns | 87% | +| 100 | 30 ns | 10 ns | 67% | *Throughput:* -| Burst Size | Crossbeam | Disruptor | Improvement | -|------------:|----------:|----------:|------------:| -| 1 | 18.0M / s | 28.8M / s | 60% | -| 10 | 13.3M / s | 35.0M / s | 163% | -| 100 | 28.7M / s | 32.4M / s | 13% | +| Burst Size | Crossbeam | Disruptor | Improvement | +|------------:|----------:|-----------:|------------:| +| 1 | 19.5M / s | 31.6M / s | 62% | +| 10 | 14.9M / s | 114.5M / s | 668% | +| 100 | 33.6M / s | 105.0M / s | 213% | ## Conclusion -There's clearly a difference between the Disruptor and the Crossbeam libs. However, this is not because the Crossbeam library is not a great piece of software. It is. The Disruptor trades CPU and memory resources for lower latency and higher throughput and that is why it's able to achieve these results. +There's clearly a difference between the Disruptor and the Crossbeam libs. However, this is not because the Crossbeam library is not a great piece of software. It is. The Disruptor trades CPU and memory resources for lower latency and higher throughput and that is why it's able to achieve these results. The Disruptor also excels if you can publish batches of events as +demonstrated in the benchmarks with bursts of 10 and 100 events. Both libraries greatly improve as the burst size goes up but the Disruptor's performance is more resilient to the pauses between bursts which is one of the design goals. 
diff --git a/benches/counters.rs b/benches/counters.rs index 9b9bddb..49ebf14 100644 --- a/benches/counters.rs +++ b/benches/counters.rs @@ -4,13 +4,15 @@ use criterion::{criterion_group, criterion_main, Criterion}; use crossbeam::channel::*; +use std::hint::black_box; use std::sync::atomic::{AtomicI32, Ordering}; use std::sync::Arc; use std::thread::{self, JoinHandle}; use disruptor::{build_multi_producer, build_single_producer, BusySpin, Producer}; -const BUF_SIZE: usize = 32_768; +const BUF_SIZE: usize = 32_768; const MAX_PRODUCER_EVENTS: usize = 10_000_000; +const BATCH_SIZE: usize = 2_000; fn crossbeam_spsc() { let (s, r) = bounded(BUF_SIZE); @@ -27,8 +29,7 @@ fn crossbeam_spsc() { // Consumer let c1: JoinHandle<()> = thread::spawn(move || { for msg in r { - let tmp = msg; - sink_clone.fetch_add(tmp, Ordering::Release); + sink_clone.fetch_add(msg, Ordering::Release); } }); @@ -36,7 +37,6 @@ fn crossbeam_spsc() { let _ = c1.join(); sink.load(Ordering::Acquire); - //println!("crossbeam_spsc total_events_count: {}", sink.load(Ordering::Acquire)); } fn crossbeam_mpsc() { @@ -46,14 +46,14 @@ fn crossbeam_mpsc() { // Producer 1 let t1 = thread::spawn(move || { for _ in 0..MAX_PRODUCER_EVENTS { - s.send(1).unwrap(); + s.send(black_box(1)).unwrap(); } }); // Producer 2 let t2 = thread::spawn(move || { for _ in 0..MAX_PRODUCER_EVENTS { - s2.send(1).unwrap(); + s2.send(black_box(1)).unwrap(); } }); @@ -62,8 +62,7 @@ fn crossbeam_mpsc() { // Consumer let c1: JoinHandle<()> = thread::spawn(move || { for msg in r { - let tmp = msg; - sink_clone.fetch_add(tmp, Ordering::Release); + sink_clone.fetch_add(msg, Ordering::Release); } }); @@ -108,17 +107,17 @@ fn disruptor_spsc() { }; let mut producer = build_single_producer(BUF_SIZE, factory, BusySpin) - .handle_events_with( - processor - ) + .handle_events_with(processor) .build(); // Publish into the Disruptor. 
thread::scope(|s| { s.spawn(move || { - for _ in 0..MAX_PRODUCER_EVENTS { - producer.publish(|e| { - e.val = 1 as i32; + for _ in 0..MAX_PRODUCER_EVENTS/BATCH_SIZE { + producer.batch_publish(BATCH_SIZE, |iter| { + for e in iter { + e.val = black_box(1); + } }); } }); @@ -140,9 +139,7 @@ fn disruptor_mpsc() { }; let mut producer1 = build_multi_producer(BUF_SIZE, factory, BusySpin) - .handle_events_with( - processor - ) + .handle_events_with(processor) .build(); let mut producer2 = producer1.clone(); @@ -150,17 +147,21 @@ fn disruptor_mpsc() { // Publish into the Disruptor. thread::scope(|s| { s.spawn(move || { - for _ in 0..MAX_PRODUCER_EVENTS { - producer1.publish(|e| { - e.val = 1 as i32; + for _ in 0..MAX_PRODUCER_EVENTS/BATCH_SIZE { + producer1.batch_publish(BATCH_SIZE, |iter| { + for e in iter { + e.val = black_box(1); + } }); } }); s.spawn(move || { - for _ in 0..MAX_PRODUCER_EVENTS { - producer2.publish(|e| { - e.val = 1 as i32; + for _ in 0..MAX_PRODUCER_EVENTS/BATCH_SIZE { + producer2.batch_publish(BATCH_SIZE, |iter| { + for e in iter { + e.val = 1; + } }); } }); diff --git a/benches/mpsc.rs b/benches/mpsc.rs index 170314e..84d940d 100644 --- a/benches/mpsc.rs +++ b/benches/mpsc.rs @@ -4,12 +4,13 @@ use std::thread::{self, JoinHandle}; use std::time::{Duration, Instant}; use criterion::{black_box, criterion_group, criterion_main, Criterion, BenchmarkId, Throughput, BenchmarkGroup}; use criterion::measurement::WallTime; -use crossbeam::channel::bounded; +use crossbeam::channel::TrySendError::Full; +use crossbeam::channel::{bounded, TryRecvError::{Empty, Disconnected}}; use crossbeam_utils::CachePadded; use disruptor::{BusySpin, Producer}; -const PRODUCERS: usize = 3; -const DATA_STRUCTURE_SIZE: usize = 64; +const PRODUCERS: usize = 2; +const DATA_STRUCTURE_SIZE: usize = 256; const BURST_SIZES: [u64; 3] = [1, 10, 100]; const PAUSES_MS: [u64; 3] = [0, 1, 10]; @@ -42,6 +43,9 @@ pub fn mpsc_benchmark(c: &mut Criterion) { group.finish(); } +/// Structure for 
managing all producer threads so they can produce a burst again and again in +/// a benchmark after being released from a barrier. This is to avoid the overhead of creating +/// new threads for each sample. struct BurstProducer { start_barrier: Arc>, stop: Arc>, @@ -61,8 +65,8 @@ impl BurstProducer { let start_barrier = Arc::clone(&start_barrier); thread::spawn(move || { while !stop.load(Acquire) { + // Busy spin with a check if we're done. while start_barrier.compare_exchange(true, false, Acquire, Relaxed).is_err() { - // Busy spin with a check if we're done. if stop.load(Acquire) { return; } } produce_one_burst(); @@ -140,9 +144,15 @@ fn crossbeam(group: &mut BenchmarkGroup, params: (i64, u64), param_des let receiver = { let sink = Arc::clone(&sink); thread::spawn(move || { - while let Ok(event) = r.recv() { - black_box(event.data); - sink.fetch_add(1, Release); + loop { + match r.try_recv() { + Ok(event) => { + black_box(event.data); + sink.fetch_add(1, Release); + }, + Err(Empty) => continue, + Err(Disconnected) => break, + } } }) }; @@ -155,8 +165,14 @@ fn crossbeam(group: &mut BenchmarkGroup, params: (i64, u64), param_des let s = s.clone(); BurstProducer::new(move || { let burst_size = burst_size.load(Acquire); - for data in 1..=burst_size { - s.send(Event { data: black_box(data) }).expect("Should successfully send."); + for data in 0..burst_size { + let mut event = Event { data: black_box(data) }; + loop { + match s.try_send(event) { + Err(Full(e)) => event = e, + _ => break, + } + } } }) }) @@ -193,11 +209,11 @@ fn disruptor(group: &mut BenchmarkGroup, params: (i64, u64), param_des let mut producer = producer.clone(); BurstProducer::new(move || { let burst_size = burst_size.load(Acquire); - for data in 1..=burst_size { - producer.publish(|e| { - e.data = black_box(data); - }); - } + producer.batch_publish(burst_size as usize, |iter| { + for (i, e) in iter.enumerate() { + e.data = black_box(i as i64); + } + }); }) }) .collect::>(); diff --git 
a/benches/spsc.rs b/benches/spsc.rs index cb0ff62..566fc70 100644 --- a/benches/spsc.rs +++ b/benches/spsc.rs @@ -4,10 +4,11 @@ use std::thread; use std::time::{Duration, Instant}; use criterion::{black_box, criterion_group, criterion_main, Criterion, BenchmarkId, Throughput, BenchmarkGroup}; use criterion::measurement::WallTime; -use crossbeam::channel::bounded; +use crossbeam::channel::TrySendError::Full; +use crossbeam::channel::{bounded, TryRecvError::{Empty, Disconnected}}; use disruptor::{BusySpin, Producer}; -const DATA_STRUCTURE_SIZE: usize = 64; +const DATA_STRUCTURE_SIZE: usize = 128; const BURST_SIZES: [u64; 3] = [1, 10, 100]; const PAUSES_MS: [u64; 3] = [0, 1, 10]; @@ -65,8 +66,12 @@ fn crossbeam(group: &mut BenchmarkGroup, inputs: (i64, u64), param: &s let receiver = { let sink = Arc::clone(&sink); thread::spawn(move || { - while let Ok(event) = r.recv() { - sink.store(event.data, Ordering::Release); + loop { + match r.try_recv() { + Ok(event) => sink.store(event.data, Ordering::Release), + Err(Empty) => continue, + Err(Disconnected) => break, + } } }) }; @@ -76,7 +81,13 @@ fn crossbeam(group: &mut BenchmarkGroup, inputs: (i64, u64), param: &s let start = Instant::now(); for _ in 0..iters { for data in 1..=*size { - s.send(Event { data: black_box(data) }).expect("Should successfully send."); + let mut event = Event { data: black_box(data) }; + loop { + match s.try_send(event) { + Err(Full(e)) => event = e, + _ => break, + } + } } // Wait for the last data element to be received in the receiver thread. 
let last_data = black_box(*size); @@ -105,11 +116,11 @@ fn disruptor(group: &mut BenchmarkGroup, inputs: (i64, u64), param: &s pause(*pause_ms); let start = Instant::now(); for _ in 0..iters { - for data in 1..=*size { - producer.publish(|e| { - e.data = black_box(data); - }); - } + producer.batch_publish(*size as usize, |iter| { + for (i, e) in iter.enumerate() { + e.data = black_box(i as i64 + 1); + } + }); // Wait for the last data element to be received inside processor. let last_data = black_box(*size); while sink.load(Ordering::Acquire) != last_data {}