Skip to content

Commit

Permalink
refactor: use crossbeam in flusher and pr cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
pauldix committed Feb 2, 2024
1 parent 865be23 commit 3cce1fd
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 11 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions influxdb3_write/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ async-trait = "0.1"
byteorder = "1.3.4"
chrono = "0.4"
crc32fast = "1.2.0"
crossbeam-channel = "0.5.11"
datafusion = { workspace = true }
parking_lot = "0.11.1"
parquet = { workspace = true }
Expand Down
18 changes: 12 additions & 6 deletions influxdb3_write/src/write_buffer/flusher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use crate::write_buffer::buffer_segment::{BufferedWrite, DatabaseWrite, WriteBatch, WriteSummary};
use crate::write_buffer::{Error, SegmentState, TableBatch};
use crate::{wal, SequenceNumber, WalOp};
use crossbeam_channel::{bounded, Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
use data_types::NamespaceName;
use observability_deps::tracing::debug;
use parking_lot::{Mutex, RwLock};
Expand All @@ -11,6 +12,7 @@ use std::sync::Arc;
use std::time::Duration;
use tokio::select;
use tokio::sync::{mpsc, oneshot, watch};
use tokio::time::MissedTickBehavior;

// Duration to buffer writes before flushing them to the wal
const BUFFER_FLUSH_INTERVAL: Duration = Duration::from_millis(10);
Expand All @@ -25,6 +27,9 @@ pub enum BufferedWriteResult {
Error(String),
}

/// The WriteBufferFlusher buffers writes and flushes them to the configured wal. The wal IO is done in a native
/// thread rather than a tokio task to avoid blocking the tokio runtime. As referenced in this post, continuous
/// long-running IO threads should be off the tokio runtime: `<https://ryhl.io/blog/async-what-is-blocking/>`.
#[derive(Debug)]
pub struct WriteBufferFlusher {
join_handle: Mutex<Option<tokio::task::JoinHandle<()>>>,
Expand All @@ -38,8 +43,8 @@ impl WriteBufferFlusher {
pub fn new(segment_state: Arc<RwLock<SegmentState>>) -> Self {
let (shutdown_tx, shutdown_rx) = watch::channel(());
let (buffer_tx, buffer_rx) = mpsc::channel(BUFFER_CHANNEL_LIMIT);
let (io_flush_tx, io_flush_rx) = std::sync::mpsc::channel();
let (io_flush_notify_tx, io_flush_notify_rx) = std::sync::mpsc::channel();
let (io_flush_tx, io_flush_rx) = bounded(1);
let (io_flush_notify_tx, io_flush_notify_rx) = bounded(1);

let flusher = Self {
join_handle: Default::default(),
Expand Down Expand Up @@ -102,14 +107,15 @@ impl WriteBufferFlusher {
async fn run_wal_op_buffer(
segment_state: Arc<RwLock<SegmentState>>,
mut buffer_rx: mpsc::Receiver<BufferedWrite>,
io_flush_tx: std::sync::mpsc::Sender<Vec<WalOp>>,
io_flush_notify_rx: std::sync::mpsc::Receiver<wal::Result<SequenceNumber>>,
io_flush_tx: CrossbeamSender<Vec<WalOp>>,
io_flush_notify_rx: CrossbeamReceiver<wal::Result<SequenceNumber>>,
mut shutdown: watch::Receiver<()>,
) {
let mut ops = Vec::new();
let mut write_batch = crate::write_buffer::buffer_segment::WriteBatch::default();
let mut notifies = Vec::new();
let mut interval = tokio::time::interval(BUFFER_FLUSH_INTERVAL);
interval.set_missed_tick_behavior(MissedTickBehavior::Skip);

loop {
// select on either buffering an op, ticking the flush interval, or shutting down
Expand Down Expand Up @@ -166,8 +172,8 @@ async fn run_wal_op_buffer(

fn run_io_flush(
segment_state: Arc<RwLock<SegmentState>>,
buffer_rx: std::sync::mpsc::Receiver<Vec<WalOp>>,
buffer_notify: std::sync::mpsc::Sender<wal::Result<SequenceNumber>>,
buffer_rx: CrossbeamReceiver<Vec<WalOp>>,
buffer_notify: CrossbeamSender<wal::Result<SequenceNumber>>,
) {
loop {
let batch = match buffer_rx.recv() {
Expand Down
7 changes: 2 additions & 5 deletions influxdb3_write/src/write_buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,14 +96,11 @@ impl<W: Wal> WriteBufferImpl<W> {
wal: Option<Arc<W>>,
next_segment_id: SegmentId,
) -> Result<Self> {
let segment_writer = match wal
let segment_writer = wal
.as_ref()
.map(|w| w.open_segment_writer(next_segment_id))
.transpose()?
{
Some(writer) => writer,
None => Box::new(WalSegmentWriterNoopImpl::new(next_segment_id)),
};
.unwrap_or_else(|| Box::new(WalSegmentWriterNoopImpl::new(next_segment_id)));

let open_segment = OpenBufferSegment::new(next_segment_id, segment_writer);
let segment_state = Arc::new(RwLock::new(SegmentState::new(open_segment)));
Expand Down

0 comments on commit 3cce1fd

Please sign in to comment.