Update benchmarks with batch publication. New version.
- Also busy-spin when sending and receiving in the Crossbeam benchmarks to
  lower latency further.
- Update benchmarks with batch publication.
- Improve README.md.
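
The busy-spin change in the Crossbeam benchmarks can be sketched with the standard library's channel as a stand-in (a minimal sketch, assuming Crossbeam's `try_recv` is used the same way; the channel type and constants here are illustrative, not taken from the diff):

```rust
use std::sync::mpsc::{channel, TryRecvError};
use std::thread;

// Receive by busy-spinning on `try_recv` instead of blocking on `recv`:
// an empty channel means "try again immediately", trading CPU time for
// lower wake-up latency.
fn spin_sum() -> u64 {
    let (tx, rx) = channel::<u64>();

    let producer = thread::spawn(move || {
        for i in 0..1_000 {
            tx.send(i).unwrap();
        }
        // `tx` is dropped here, disconnecting the channel.
    });

    let mut sum = 0;
    loop {
        match rx.try_recv() {
            Ok(v) => sum += v,
            Err(TryRecvError::Empty) => continue, // busy spin
            Err(TryRecvError::Disconnected) => break,
        }
    }
    producer.join().unwrap();
    sum
}

fn main() {
    assert_eq!(spin_sum(), 499_500);
}
```

This mirrors the shape of the `try_recv` loop added to `benches/mpsc.rs`; the real benchmark additionally wraps payloads in `black_box` so the compiler cannot optimize them away.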
nicholassm committed Jun 19, 2024
1 parent 75be329 commit f563eed
Showing 5 changed files with 111 additions and 74 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "disruptor"
version = "2.1.0"
version = "2.2.0"
edition = "2021"
description = "Low latency inter-thread communication via a ringbuffer (inspired by the LMAX Disruptor)."
license = "MIT"
61 changes: 35 additions & 26 deletions README.md
@@ -13,11 +13,11 @@ It's heavily inspired by the brilliant

Add the following to your `Cargo.toml` file:

disruptor = "2.1.0"
disruptor = "2.2.0"

To read details of how to use the library, check out the documentation on [docs.rs/disruptor](https://docs.rs/disruptor).

Here's a minimal example:
Here's a minimal example demonstrating both single and batch publication. Note that batch publication should be used whenever possible for the best latency and throughput (see the benchmarks below).

```rust
use disruptor::*;
@@ -41,12 +41,19 @@ fn main() {
.handle_events_with(processor)
.build();

// Publish into the Disruptor via the `Producer` handle.
// Publish single events into the Disruptor via the `Producer` handle.
for i in 0..10 {
producer.publish(|e| {
e.price = i as f64;
});
}

// Publish a batch of events into the Disruptor.
producer.batch_publish(5, |iter| {
for e in iter { // `iter` is guaranteed to yield 5 events.
e.price = 42.0;
}
});
}
// At this point, the `Producer` instance goes out of scope, and once the
// processor is done handling all events, the Disruptor is dropped as well.
@@ -155,6 +162,7 @@ fn main() {
- [x] Multi Producer Single Consumer (MPSC).
- [x] Multi Producer Multi Consumer (MPMC) with consumer interdependencies.
- [x] Busy-spin wait strategies.
- [x] Batch publication of events.
- [x] Batch consumption of events.
- [x] Thread affinity can be set for the event processor thread(s).
- [x] Set thread name of each event processor thread.
@@ -187,57 +195,58 @@ The latencies below are the mean latency per element with 95% confidence interva

| Burst Size | Crossbeam | Disruptor | Improvement |
|------------:|----------:|----------:|------------:|
| 1 | 53 ns | 35 ns | 34% |
| 10 | 71 ns | 29 ns | 59% |
| 100 | 34 ns | 31 ns | 9% |
| 1 | 65 ns | 32 ns | 51% |
| 10 | 68 ns | 9 ns | 87% |
| 100 | 29 ns | 8 ns | 72% |

*Throughput:*

| Burst Size | Crossbeam | Disruptor | Improvement |
|------------:|-----------:|------------:|------------:|
| 1 | 19.0M / s | 28.7M / s | 51% |
| 10 | 14.2M / s | 34.8M / s | 145% |
| 100 | 29.7M / s | 31.9M / s | 7% |
| 1 | 15.2M / s | 31.7M / s | 109% |
| 10 | 14.5M / s | 117.3M / s | 709% |
| 100 | 34.3M / s | 119.7M / s | 249% |

## 1 ms Pause Between Bursts

*Latency:*

| Burst Size | Crossbeam | Disruptor | Improvement |
|------------:|----------:|-----------:|------------:|
| 1 | 52 ns | 35 ns | 32% |
| 10 | 71 ns | 31 ns | 56% |
| 100 | 33 ns | 29 ns | 12% |
| 1 | 63 ns | 33 ns | 48% |
| 10 | 67 ns | 8 ns | 88% |
| 100 | 30 ns | 9 ns | 70% |

*Throughput:*

| Burst Size | Crossbeam | Disruptor | Improvement |
|------------:|-----------:|----------:|------------:|
| 1 | 19.0M / s | 28.5M / s | 50% |
| 10 | 14.1M / s | 32.6M / s | 131% |
| 100 | 30.6M / s | 34.4M / s | 12% |
| Burst Size | Crossbeam | Disruptor | Improvement |
|------------:|-----------:|-----------:|------------:|
| 1 | 15.9M / s | 30.7M / s | 93% |
| 10 | 14.9M / s | 117.7M / s | 690% |
| 100 | 33.8M / s | 105.0M / s | 211% |

## 10 ms Pause Between Bursts

*Latency:*

| Burst Size | Crossbeam | Disruptor | Improvement |
|------------:|----------:|----------:|------------:|
| 1 | 56 ns | 35 ns | 38% |
| 10 | 75 ns | 29 ns | 61% |
| 100 | 35 ns | 31 ns | 11% |
| 1 | 51 ns | 32 ns | 37% |
| 10 | 67 ns | 9 ns | 87% |
| 100 | 30 ns | 10 ns | 67% |

*Throughput:*

| Burst Size | Crossbeam | Disruptor | Improvement |
|------------:|----------:|----------:|------------:|
| 1 | 18.0M / s | 28.8M / s | 60% |
| 10 | 13.3M / s | 35.0M / s | 163% |
| 100 | 28.7M / s | 32.4M / s | 13% |
| Burst Size | Crossbeam | Disruptor | Improvement |
|------------:|----------:|-----------:|------------:|
| 1 | 19.5M / s | 31.6M / s | 62% |
| 10 | 14.9M / s | 114.5M / s | 668% |
| 100 | 33.6M / s | 105.0M / s | 213% |

## Conclusion

There's clearly a difference between the Disruptor and the Crossbeam libs. However, this is not because the Crossbeam library is not a great piece of software. It is. The Disruptor trades CPU and memory resources for lower latency and higher throughput and that is why it's able to achieve these results.
There's clearly a difference between the Disruptor and the Crossbeam libraries. However, this is not because the Crossbeam library is not a great piece of software. It is. The Disruptor trades CPU and memory resources for lower latency and higher throughput, and that is why it's able to achieve these results. The Disruptor also excels if you can publish batches of events, as demonstrated in the benchmarks with bursts of 10 and 100 events.

Both libraries improve greatly as the burst size goes up, but the Disruptor's performance is more resilient to the pauses between bursts, which is one of its design goals.
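
The batch advantage can be seen with a back-of-the-envelope cost model: the fixed cost of claiming and publishing a sequence is paid once per batch, so the per-event cost approaches the raw write cost as batches grow. The constants below are illustrative assumptions, not numbers measured in these benchmarks:

```rust
// Rough model: per-event cost = (fixed publish overhead / batch size)
// plus the cost of writing one event. Constants are assumed, not measured.
fn per_event_cost_ns(claim_overhead_ns: f64, write_cost_ns: f64, batch: u32) -> f64 {
    claim_overhead_ns / batch as f64 + write_cost_ns
}

fn main() {
    let claim = 25.0; // assumed fixed cost per publish call (ns)
    let write = 7.0;  // assumed cost to write one event (ns)
    for batch in [1u32, 10, 100] {
        println!("batch {batch:>3}: {:.1} ns/event", per_event_cost_ns(claim, write, batch));
    }
    // Larger batches amortize the fixed cost toward the pure write cost.
    assert!(per_event_cost_ns(claim, write, 100) < per_event_cost_ns(claim, write, 1));
}
```

Under this model a batch of 100 pays the claim overhead once across all 100 events, which matches the qualitative shape of the burst-size results above.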

47 changes: 24 additions & 23 deletions benches/counters.rs
@@ -4,13 +4,15 @@

use criterion::{criterion_group, criterion_main, Criterion};
use crossbeam::channel::*;
use std::hint::black_box;
use std::sync::atomic::{AtomicI32, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use disruptor::{build_multi_producer, build_single_producer, BusySpin, Producer};

const BUF_SIZE: usize = 32_768;
const BUF_SIZE: usize = 32_768;
const MAX_PRODUCER_EVENTS: usize = 10_000_000;
const BATCH_SIZE: usize = 2_000;

fn crossbeam_spsc() {
let (s, r) = bounded(BUF_SIZE);
@@ -27,16 +29,14 @@ fn crossbeam_spsc() {
// Consumer
let c1: JoinHandle<()> = thread::spawn(move || {
for msg in r {
let tmp = msg;
sink_clone.fetch_add(tmp, Ordering::Release);
sink_clone.fetch_add(msg, Ordering::Release);
}
});

let _ = t1.join();
let _ = c1.join();

sink.load(Ordering::Acquire);
//println!("crossbeam_spsc total_events_count: {}", sink.load(Ordering::Acquire));
}

fn crossbeam_mpsc() {
@@ -46,14 +46,14 @@ fn crossbeam_mpsc() {
// Producer 1
let t1 = thread::spawn(move || {
for _ in 0..MAX_PRODUCER_EVENTS {
s.send(1).unwrap();
s.send(black_box(1)).unwrap();
}
});

// Producer 2
let t2 = thread::spawn(move || {
for _ in 0..MAX_PRODUCER_EVENTS {
s2.send(1).unwrap();
s2.send(black_box(1)).unwrap();
}
});

@@ -62,8 +62,7 @@
// Consumer
let c1: JoinHandle<()> = thread::spawn(move || {
for msg in r {
let tmp = msg;
sink_clone.fetch_add(tmp, Ordering::Release);
sink_clone.fetch_add(msg, Ordering::Release);
}
});

@@ -108,17 +107,17 @@ fn disruptor_spsc() {
};

let mut producer = build_single_producer(BUF_SIZE, factory, BusySpin)
.handle_events_with(
processor
)
.handle_events_with(processor)
.build();

// Publish into the Disruptor.
thread::scope(|s| {
s.spawn(move || {
for _ in 0..MAX_PRODUCER_EVENTS {
producer.publish(|e| {
e.val = 1 as i32;
for _ in 0..MAX_PRODUCER_EVENTS/BATCH_SIZE {
producer.batch_publish(BATCH_SIZE, |iter| {
for e in iter {
e.val = black_box(1);
}
});
}
});
@@ -140,27 +139,29 @@ fn disruptor_mpsc() {
};

let mut producer1 = build_multi_producer(BUF_SIZE, factory, BusySpin)
.handle_events_with(
processor
)
.handle_events_with(processor)
.build();

let mut producer2 = producer1.clone();

// Publish into the Disruptor.
thread::scope(|s| {
s.spawn(move || {
for _ in 0..MAX_PRODUCER_EVENTS {
producer1.publish(|e| {
e.val = 1 as i32;
for _ in 0..MAX_PRODUCER_EVENTS/BATCH_SIZE {
producer1.batch_publish(BATCH_SIZE, |iter| {
for e in iter {
e.val = black_box(1);
}
});
}
});

s.spawn(move || {
for _ in 0..MAX_PRODUCER_EVENTS {
producer2.publish(|e| {
e.val = 1 as i32;
for _ in 0..MAX_PRODUCER_EVENTS/BATCH_SIZE {
producer2.batch_publish(BATCH_SIZE, |iter| {
for e in iter {
e.val = black_box(1);
}
});
}
});
44 changes: 30 additions & 14 deletions benches/mpsc.rs
@@ -4,12 +4,13 @@ use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant};
use criterion::{black_box, criterion_group, criterion_main, Criterion, BenchmarkId, Throughput, BenchmarkGroup};
use criterion::measurement::WallTime;
use crossbeam::channel::bounded;
use crossbeam::channel::TrySendError::Full;
use crossbeam::channel::{bounded, TryRecvError::{Empty, Disconnected}};
use crossbeam_utils::CachePadded;
use disruptor::{BusySpin, Producer};

const PRODUCERS: usize = 3;
const DATA_STRUCTURE_SIZE: usize = 64;
const PRODUCERS: usize = 2;
const DATA_STRUCTURE_SIZE: usize = 256;
const BURST_SIZES: [u64; 3] = [1, 10, 100];
const PAUSES_MS: [u64; 3] = [0, 1, 10];

@@ -42,6 +43,9 @@ pub fn mpsc_benchmark(c: &mut Criterion) {
group.finish();
}

/// Structure for managing all producer threads so they can produce a burst again and again in
/// a benchmark after being released from a barrier. This is to avoid the overhead of creating
/// new threads for each sample.
struct BurstProducer {
start_barrier: Arc<CachePadded<AtomicBool>>,
stop: Arc<CachePadded<AtomicBool>>,
@@ -61,8 +65,8 @@ impl BurstProducer {
let start_barrier = Arc::clone(&start_barrier);
thread::spawn(move || {
while !stop.load(Acquire) {
// Busy spin with a check if we're done.
while start_barrier.compare_exchange(true, false, Acquire, Relaxed).is_err() {
// Busy spin with a check if we're done.
if stop.load(Acquire) { return; }
}
produce_one_burst();
@@ -140,9 +144,15 @@ fn crossbeam(group: &mut BenchmarkGroup<WallTime>, params: (i64, u64), param_des
let receiver = {
let sink = Arc::clone(&sink);
thread::spawn(move || {
while let Ok(event) = r.recv() {
black_box(event.data);
sink.fetch_add(1, Release);
loop {
match r.try_recv() {
Ok(event) => {
black_box(event.data);
sink.fetch_add(1, Release);
},
Err(Empty) => continue,
Err(Disconnected) => break,
}
}
})
};
@@ -155,8 +165,14 @@
let s = s.clone();
BurstProducer::new(move || {
let burst_size = burst_size.load(Acquire);
for data in 1..=burst_size {
s.send(Event { data: black_box(data) }).expect("Should successfully send.");
for data in 0..burst_size {
let mut event = Event { data: black_box(data) };
loop {
match s.try_send(event) {
Err(Full(e)) => event = e,
_ => break,
}
}
}
})
})
@@ -193,11 +209,11 @@ fn disruptor(group: &mut BenchmarkGroup<WallTime>, params: (i64, u64), param_des
let mut producer = producer.clone();
BurstProducer::new(move || {
let burst_size = burst_size.load(Acquire);
for data in 1..=burst_size {
producer.publish(|e| {
e.data = black_box(data);
});
}
producer.batch_publish(burst_size as usize, |iter| {
for (i, e) in iter.enumerate() {
e.data = black_box(i as i64);
}
});
})
})
.collect::<Vec<BurstProducer>>();
