feat: add catalog sequence tracking to OpenBufferSegment
pauldix committed Feb 2, 2024
1 parent 3cce1fd commit 2e88ad0
Showing 4 changed files with 30 additions and 9 deletions.
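The change threads the catalog's `SequenceNumber` through `Catalog::replace_database` and into `OpenBufferSegment`, so each open segment records the catalog version it started from. The block below is not part of the commit; it is a minimal, self-contained sketch of the compare-and-bump pattern the catalog uses, assuming only the `SequenceNumber::new`/`next` constructors and the `CatalogUpdatedElsewhere` rejection visible in the diff (everything else is illustrative).

```rust
// Illustrative sketch only: `SequenceNumber::new`, `next`, and the
// `CatalogUpdatedElsewhere` error mirror names in the diff; the plain
// `u64` wrapper and the trimmed-down `Catalog` are hypothetical.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct SequenceNumber(u64);

impl SequenceNumber {
    fn new(n: u64) -> Self {
        Self(n)
    }
    fn next(&self) -> Self {
        Self(self.0 + 1)
    }
}

#[derive(Debug, PartialEq)]
enum Error {
    CatalogUpdatedElsewhere,
}

struct Catalog {
    sequence: SequenceNumber,
}

impl Catalog {
    /// Accept an update only if the caller saw the current catalog version,
    /// then bump the sequence so later stale writers are rejected.
    fn replace(&mut self, seen: SequenceNumber) -> Result<(), Error> {
        if self.sequence != seen {
            return Err(Error::CatalogUpdatedElsewhere);
        }
        self.sequence = self.sequence.next();
        Ok(())
    }
}

fn main() {
    let mut catalog = Catalog {
        sequence: SequenceNumber::new(0),
    };
    let seen = catalog.sequence;
    assert_eq!(catalog.replace(seen), Ok(()));
    // A second update based on the same stale sequence number is rejected.
    assert_eq!(catalog.replace(seen), Err(Error::CatalogUpdatedElsewhere));
}
```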
25 changes: 18 additions & 7 deletions influxdb3_write/src/catalog.rs
@@ -1,5 +1,6 @@
//! Implementation of the Catalog that sits entirely in memory.
+ use crate::SequenceNumber;
use data_types::ColumnType;
use observability_deps::tracing::info;
use parking_lot::RwLock;
@@ -39,22 +40,26 @@ impl Catalog {
}
}

- pub(crate) fn replace_database(&self, sequence: u64, db: Arc<DatabaseSchema>) -> Result<()> {
+ pub(crate) fn replace_database(
+ &self,
+ sequence: SequenceNumber,
+ db: Arc<DatabaseSchema>,
+ ) -> Result<()> {
let mut inner = self.inner.write();
if inner.sequence != sequence {
info!("catalog updated elsewhere");
return Err(Error::CatalogUpdatedElsewhere);
}

info!("inserted {}", db.name);
info!("inserted/updated database in catalog: {}", db.name);

- inner.sequence += 1;
+ inner.sequence = inner.sequence.next();
inner.databases.insert(db.name.clone(), db);

Ok(())
}

- pub(crate) fn db_or_create(&self, db_name: &str) -> (u64, Arc<DatabaseSchema>) {
+ pub(crate) fn db_or_create(&self, db_name: &str) -> (SequenceNumber, Arc<DatabaseSchema>) {
let (sequence, db) = {
let inner = self.inner.read();
(inner.sequence, inner.databases.get(db_name).cloned())
@@ -85,20 +90,24 @@ impl Catalog {
pub fn into_inner(self) -> InnerCatalog {
self.inner.into_inner()
}

+ pub fn sequence_number(&self) -> SequenceNumber {
+ self.inner.read().sequence
+ }
}

#[derive(Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct InnerCatalog {
/// The catalog is a map of databases with their table schemas
databases: HashMap<String, Arc<DatabaseSchema>>,
- sequence: u64,
+ sequence: SequenceNumber,
}

impl InnerCatalog {
pub(crate) fn new() -> Self {
Self {
databases: HashMap::new(),
- sequence: 0,
+ sequence: SequenceNumber::new(0),
}
}

@@ -271,7 +280,9 @@ mod tests {
),
);
let database = Arc::new(database);
- catalog.replace_database(0, database).unwrap();
+ catalog
+ .replace_database(SequenceNumber::new(0), database)
+ .unwrap();
let inner = catalog.inner.read();

let serialized = serde_json::to_string(&*inner).unwrap();
10 changes: 9 additions & 1 deletion influxdb3_write/src/write_buffer/buffer_segment.rs
@@ -22,17 +22,24 @@ pub struct OpenBufferSegment {
segment_writer: Box<dyn WalSegmentWriter>,
segment_id: SegmentId,
buffered_data: HashMap<String, DatabaseBuffer>,
+ #[allow(dead_code)]
+ starting_catalog_sequence_number: SequenceNumber,
// TODO: This is temporarily just the number of rows in the segment. When the buffer gets refactored to use
// different structures, we want this to be a representation of approximate memory usage.
segment_size: usize,
}

impl OpenBufferSegment {
- pub fn new(segment_id: SegmentId, segment_writer: Box<dyn WalSegmentWriter>) -> Self {
+ pub fn new(
+ segment_id: SegmentId,
+ starting_catalog_sequence_number: SequenceNumber,
+ segment_writer: Box<dyn WalSegmentWriter>,
+ ) -> Self {
Self {
buffered_data: Default::default(),
segment_writer,
segment_id,
+ starting_catalog_sequence_number,
segment_size: 0,
}
}
@@ -314,6 +321,7 @@ mod tests {
fn buffers_rows() {
let mut open_segment = OpenBufferSegment::new(
SegmentId::new(0),
+ SequenceNumber::new(0),
Box::new(WalSegmentWriterNoopImpl::new(SegmentId::new(0))),
);

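The `segment_size` TODO in the buffer_segment.rs hunk above notes that the field is temporarily a row count and should eventually approximate memory usage. The snippet below is not from the repository; it is a hypothetical sketch of what such an estimate could look like, with made-up `Row` and `FieldValue` types standing in for whatever the refactored buffer ends up storing.

```rust
// Hypothetical row representation; the real buffer types will differ.
enum FieldValue {
    I64(i64),
    Str(String),
}

struct Row {
    fields: Vec<(String, FieldValue)>,
}

/// Rough estimate of the inline plus heap memory a row occupies.
fn approximate_row_size(row: &Row) -> usize {
    let mut size = std::mem::size_of::<Row>();
    for (name, value) in &row.fields {
        size += std::mem::size_of::<(String, FieldValue)>() + name.capacity();
        if let FieldValue::Str(s) = value {
            size += s.capacity();
        }
    }
    size
}

/// Instead of `segment_size += number_of_rows`, the buffer could accumulate
/// an approximate byte count as rows are added.
fn add_rows(segment_size: &mut usize, rows: &[Row]) {
    *segment_size += rows.iter().map(approximate_row_size).sum::<usize>();
}

fn main() {
    let rows = vec![Row {
        fields: vec![
            ("host".to_string(), FieldValue::Str("a".to_string())),
            ("value".to_string(), FieldValue::I64(1)),
        ],
    }];
    let mut segment_size = 0usize;
    add_rows(&mut segment_size, &rows);
    println!("approximate segment size: {segment_size} bytes");
}
```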
1 change: 1 addition & 0 deletions influxdb3_write/src/write_buffer/flusher.rs
@@ -205,6 +205,7 @@ mod tests {
let segment_id = SegmentId::new(3);
let open_segment = OpenBufferSegment::new(
segment_id,
+ SequenceNumber::new(0),
Box::new(WalSegmentWriterNoopImpl::new(segment_id)),
);
let segment_state = Arc::new(RwLock::new(SegmentState::new(open_segment)));
3 changes: 2 additions & 1 deletion influxdb3_write/src/write_buffer/mod.rs
@@ -102,7 +102,8 @@ impl<W: Wal> WriteBufferImpl<W> {
.transpose()?
.unwrap_or_else(|| Box::new(WalSegmentWriterNoopImpl::new(next_segment_id)));

- let open_segment = OpenBufferSegment::new(next_segment_id, segment_writer);
+ let open_segment =
+ OpenBufferSegment::new(next_segment_id, catalog.sequence_number(), segment_writer);
let segment_state = Arc::new(RwLock::new(SegmentState::new(open_segment)));

let write_buffer_flusher = WriteBufferFlusher::new(Arc::clone(&segment_state));
