From 6541713803bbc3c5e7b54f301b10615130c5a1e2 Mon Sep 17 00:00:00 2001 From: Paul Dix Date: Fri, 9 Feb 2024 16:04:01 -0500 Subject: [PATCH] feat: Add segment persist of closed buffer --- Cargo.lock | 1 + influxdb3_write/Cargo.toml | 1 + influxdb3_write/src/catalog.rs | 29 +- influxdb3_write/src/lib.rs | 20 +- influxdb3_write/src/paths.rs | 15 + influxdb3_write/src/persister.rs | 15 +- influxdb3_write/src/wal.rs | 18 +- .../src/write_buffer/buffer_segment.rs | 307 +++++++++++++++++- influxdb3_write/src/write_buffer/mod.rs | 4 +- 9 files changed, 391 insertions(+), 19 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d710499c9ee..85a1533bdf4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2690,6 +2690,7 @@ dependencies = [ "crossbeam-channel", "data_types", "datafusion", + "datafusion_util", "futures-util", "influxdb-line-protocol", "iox_query", diff --git a/influxdb3_write/Cargo.toml b/influxdb3_write/Cargo.toml index edf1fa1a74d..d5d6d9e6622 100644 --- a/influxdb3_write/Cargo.toml +++ b/influxdb3_write/Cargo.toml @@ -22,6 +22,7 @@ chrono = "0.4" crc32fast = "1.2.0" crossbeam-channel = "0.5.11" datafusion = { workspace = true } +datafusion_util = { path = "../datafusion_util" } parking_lot = "0.11.1" parquet = { workspace = true } thiserror = "1.0" diff --git a/influxdb3_write/src/catalog.rs b/influxdb3_write/src/catalog.rs index 762b805277f..b6ab37abf98 100644 --- a/influxdb3_write/src/catalog.rs +++ b/influxdb3_write/src/catalog.rs @@ -40,6 +40,12 @@ impl Catalog { } } + pub fn from_inner(inner: InnerCatalog) -> Self { + Self { + inner: RwLock::new(inner), + } + } + pub(crate) fn replace_database( &self, sequence: SequenceNumber, @@ -94,9 +100,13 @@ impl Catalog { pub fn sequence_number(&self) -> SequenceNumber { self.inner.read().sequence } + + pub fn clone_inner(&self) -> InnerCatalog { + self.inner.read().clone() + } } -#[derive(Debug, Serialize, Deserialize, Eq, PartialEq)] +#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)] pub struct InnerCatalog { /// The catalog is a map of databases with their table schemas databases: HashMap>, @@ -135,7 +145,11 @@ impl DatabaseSchema { pub fn get_table_schema(&self, table_name: &str) -> Option { self.tables .get(table_name) - .and_then(|table| table.schema.clone()) + .map(|table| table.schema.clone()) + } + + pub fn get_table(&self, table_name: &str) -> Option<&TableDefinition> { + self.tables.get(table_name) } pub fn table_names(&self) -> Vec { @@ -151,7 +165,7 @@ impl DatabaseSchema { pub struct TableDefinition { pub name: String, #[serde(skip_serializing, skip_deserializing)] - pub schema: Option, + pub schema: Schema, columns: BTreeMap, } @@ -218,7 +232,7 @@ impl TableDefinition { Self { name: name.into(), - schema: Some(schema), + schema, columns, } } @@ -241,7 +255,12 @@ impl TableDefinition { for (name, column_type) in columns.into_iter() { self.columns.insert(name, column_type); } - self.schema = Some(schema); + self.schema = schema; + } + + #[allow(dead_code)] + pub(crate) fn schema(&self) -> &Schema { + &self.schema } pub(crate) fn columns(&self) -> &BTreeMap { diff --git a/influxdb3_write/src/lib.rs b/influxdb3_write/src/lib.rs index 1e7ed8bd89a..34581a17509 100644 --- a/influxdb3_write/src/lib.rs +++ b/influxdb3_write/src/lib.rs @@ -24,6 +24,7 @@ use datafusion::prelude::Expr; use iox_query::QueryChunk; use parquet::format::FileMetaData; use serde::{Deserialize, Serialize}; +use std::any::Any; use std::collections::HashMap; use std::fmt::Debug; use std::path::PathBuf; @@ -187,16 +188,18 @@ pub trait Persister: Debug + 
Send + Sync + 'static { /// for this segment. async fn persist_segment(&self, persisted_segment: PersistedSegment) -> Result<()>; - // Writes a SendableRecorgBatchStream to the Parquet format and perists it - // to Object Store at the given path + // Writes a SendableRecorgBatchStream to the Parquet format and persists it + // to Object Store at the given path. Returns the number of bytes written and the file metadata. async fn persist_parquet_file( &self, path: ParquetFilePath, record_batch: SendableRecordBatchStream, - ) -> crate::Result; + ) -> crate::Result<(u64, FileMetaData)>; /// Returns the configured `ObjectStore` that data is loaded from and persisted to. fn object_store(&self) -> Arc; + + fn as_any(&self) -> &dyn Any; } pub trait Wal: Debug + Send + Sync + 'static { @@ -224,6 +227,8 @@ pub struct SegmentFile { pub trait WalSegmentWriter: Debug + Send + Sync + 'static { fn id(&self) -> SegmentId; + fn bytes_written(&self) -> u64; + fn write_batch(&mut self, ops: Vec) -> wal::Result; fn last_sequence_number(&self) -> SequenceNumber; @@ -309,7 +314,12 @@ pub struct PersistedSegment { pub segment_max_time: i64, /// The collection of databases that had tables persisted in this segment. The tables will then have their /// name and the parquet files. - pub databases: HashMap, + pub databases: HashMap, +} + +#[derive(Debug, Serialize, Deserialize, Default)] +pub struct DatabaseTables { + pub tables: HashMap, } /// A collection of parquet files persisted in a segment for a specific table. @@ -328,7 +338,7 @@ pub struct TableParquetFiles { pub struct ParquetFile { pub path: String, pub size_bytes: u64, - pub row_count: u32, + pub row_count: u64, pub min_time: i64, pub max_time: i64, } diff --git a/influxdb3_write/src/paths.rs b/influxdb3_write/src/paths.rs index 44081cb6a00..0d56b0ae4a7 100644 --- a/influxdb3_write/src/paths.rs +++ b/influxdb3_write/src/paths.rs @@ -67,6 +67,21 @@ impl ParquetFilePath { )); Self(path) } + + pub fn new_with_parititon_key( + db_name: &str, + table_name: &str, + partition_key: &str, + file_number: u32, + ) -> Self { + let path = ObjPath::from(format!( + "dbs/{db_name}/{table_name}/{}/{:010}.{}", + partition_key, + object_store_file_stem(file_number), + PARQUET_FILE_EXTENSION + )); + Self(path) + } } impl Deref for ParquetFilePath { diff --git a/influxdb3_write/src/persister.rs b/influxdb3_write/src/persister.rs index a70e8123f03..2d29b9161c6 100644 --- a/influxdb3_write/src/persister.rs +++ b/influxdb3_write/src/persister.rs @@ -27,6 +27,7 @@ use parquet::arrow::ArrowWriter; use parquet::basic::Compression; use parquet::file::properties::WriterProperties; use parquet::format::FileMetaData; +use std::any::Any; use std::io::Write; use std::sync::Arc; @@ -214,16 +215,21 @@ impl Persister for PersisterImpl { &self, path: ParquetFilePath, record_batch: SendableRecordBatchStream, - ) -> Result { + ) -> Result<(u64, FileMetaData)> { let parquet = self.serialize_to_parquet(record_batch).await?; + let bytes_written = parquet.bytes.len() as u64; self.object_store.put(path.as_ref(), parquet.bytes).await?; - Ok(parquet.meta_data) + Ok((bytes_written, parquet.meta_data)) } fn object_store(&self) -> Arc { self.object_store.clone() } + + fn as_any(&self) -> &dyn Any { + self as &dyn Any + } } pub struct ParquetBytes { @@ -482,7 +488,7 @@ async fn persist_and_load_parquet_bytes() { stream_builder.tx().send(Ok(batch2)).await.unwrap(); let path = ParquetFilePath::new("db_one", "table_one", Utc::now(), 1); - let meta = persister + let (bytes_written, meta) = persister 
.persist_parquet_file(path.clone(), stream_builder.build()) .await .unwrap(); @@ -493,5 +499,6 @@ async fn persist_and_load_parquet_bytes() { let bytes = persister.load_parquet_file(path).await.unwrap(); // Assert that we have a file of bytes > 0 - assert!(!bytes.is_empty()) + assert!(!bytes.is_empty()); + assert_eq!(bytes.len() as u64, bytes_written); } diff --git a/influxdb3_write/src/wal.rs b/influxdb3_write/src/wal.rs index 21286575a89..af4f51dc7ba 100644 --- a/influxdb3_write/src/wal.rs +++ b/influxdb3_write/src/wal.rs @@ -318,6 +318,10 @@ impl WalSegmentWriter for WalSegmentWriterImpl { self.segment_id } + fn bytes_written(&self) -> u64 { + self.bytes_written as u64 + } + fn write_batch(&mut self, ops: Vec) -> Result { self.write_batch(ops) } @@ -331,6 +335,7 @@ impl WalSegmentWriter for WalSegmentWriterImpl { pub struct WalSegmentWriterNoopImpl { segment_id: SegmentId, sequence_number: SequenceNumber, + wal_ops_written: usize, } impl WalSegmentWriterNoopImpl { @@ -338,6 +343,7 @@ impl WalSegmentWriterNoopImpl { Self { segment_id, sequence_number: SequenceNumber::new(0), + wal_ops_written: 0, } } } @@ -347,9 +353,19 @@ impl WalSegmentWriter for WalSegmentWriterNoopImpl { self.segment_id } - fn write_batch(&mut self, _ops: Vec) -> Result { + fn bytes_written(&self) -> u64 { + println!("getting bytes_written"); + self.wal_ops_written as u64 + } + + fn write_batch(&mut self, ops: Vec) -> Result { let sequence_number = self.sequence_number.next(); self.sequence_number = sequence_number; + self.wal_ops_written += ops.len(); + println!( + "write_batch called: wal_ops_written: {}", + self.wal_ops_written + ); Ok(sequence_number) } diff --git a/influxdb3_write/src/write_buffer/buffer_segment.rs b/influxdb3_write/src/write_buffer/buffer_segment.rs index dfefb34db04..6d5b31d624b 100644 --- a/influxdb3_write/src/write_buffer/buffer_segment.rs +++ b/influxdb3_write/src/write_buffer/buffer_segment.rs @@ -2,9 +2,14 @@ //! single WAL segment. Only one segment should be open for writes in the write buffer at any //! given time. 
+use crate::catalog::Catalog; +use crate::paths::ParquetFilePath; use crate::write_buffer::flusher::BufferedWriteResult; use crate::write_buffer::{FieldData, Row, TableBatch}; -use crate::{wal, write_buffer::Result, SegmentId, SequenceNumber, WalOp, WalSegmentWriter}; +use crate::{ + wal, write_buffer::Result, DatabaseTables, ParquetFile, PersistedSegment, Persister, SegmentId, + SequenceNumber, TableParquetFiles, WalOp, WalSegmentWriter, +}; use arrow::array::{ ArrayRef, BooleanBuilder, Float64Builder, Int64Builder, StringBuilder, StringDictionaryBuilder, TimestampNanosecondBuilder, UInt64Builder, @@ -12,6 +17,7 @@ use arrow::array::{ use arrow::datatypes::Int32Type; use arrow::record_batch::RecordBatch; use data_types::{ColumnType, NamespaceName, TimestampMinMax}; +use datafusion_util::stream_from_batch; use schema::Schema; use std::collections::{BTreeMap, HashMap, HashSet}; use std::sync::Arc; @@ -82,6 +88,18 @@ impl OpenBufferSegment { Ok(self.segment_size) } + + #[allow(dead_code)] + pub fn into_closed_segment(self, catalog: Arc) -> ClosedBufferSegment { + ClosedBufferSegment::new( + self.segment_id, + self.starting_catalog_sequence_number, + catalog.sequence_number(), + self.segment_writer, + self.buffered_data, + catalog, + ) + } } #[derive(Debug, Default)] @@ -280,6 +298,8 @@ impl PartitionBuffer { cols.push(columns.remove(f.name()).unwrap().into_arrow()); } + println!("row count: {}", row_count); + RecordBatch::try_new(schema, cols).unwrap() } } @@ -308,14 +328,141 @@ impl Builder { } } +#[allow(dead_code)] #[derive(Debug)] -pub struct ClosedBufferSegment {} +pub struct ClosedBufferSegment { + segment_id: SegmentId, + catalog_start_sequence_number: SequenceNumber, + catalog_end_sequence_number: SequenceNumber, + segment_writer: Box, + buffered_data: HashMap, + catalog: Arc, +} + +impl ClosedBufferSegment { + #[allow(dead_code)] + fn new( + segment_id: SegmentId, + catalog_start_sequence_number: SequenceNumber, + catalog_end_sequence_number: SequenceNumber, + segment_writer: Box, + buffered_data: HashMap, + catalog: Arc, + ) -> Self { + Self { + segment_id, + catalog_start_sequence_number, + catalog_end_sequence_number, + segment_writer, + buffered_data, + catalog, + } + } + + #[allow(dead_code)] + pub async fn persist(&self, persister: Arc) -> crate::Result<()> { + if self.catalog_start_sequence_number != self.catalog_end_sequence_number { + let inner_catalog = self.catalog.clone_inner(); + + persister + .persist_catalog(self.segment_id, Catalog::from_inner(inner_catalog)) + .await?; + } + + let mut persisted_database_files = HashMap::new(); + let mut segment_parquet_size_bytes = 0; + let mut segment_row_count = 0; + let mut segment_min_time = i64::MAX; + let mut segment_max_time = i64::MIN; + + // persist every partition buffer + for (db_name, db_buffer) in &self.buffered_data { + let mut database_tables = DatabaseTables::default(); + + if let Some(db_schema) = self.catalog.db_schema(db_name) { + for (table_name, table_buffer) in &db_buffer.table_buffers { + if let Some(table) = db_schema.get_table(table_name) { + let mut table_parquet_files = TableParquetFiles { + table_name: table_name.to_string(), + parquet_files: vec![], + sort_key: vec![], + }; + + // persist every partition buffer + for (partition_key, partition_buffer) in &table_buffer.partition_buffers { + let data = partition_buffer + .rows_to_record_batch(table.schema(), table.columns()); + let row_count = data.num_rows(); + let batch_stream = stream_from_batch(table.schema().as_arrow(), data); + let parquet_file_path = 
ParquetFilePath::new_with_parititon_key( + db_name, + &table.name, + partition_key, + self.segment_id.0, + ); + let path = parquet_file_path.to_string(); + let (size_bytes, meta) = persister + .persist_parquet_file(parquet_file_path, batch_stream) + .await?; + + let parquet_file = ParquetFile { + path, + size_bytes, + row_count: row_count as u64, + min_time: partition_buffer.timestamp_min, + max_time: partition_buffer.timestamp_max, + }; + table_parquet_files.parquet_files.push(parquet_file); + + segment_parquet_size_bytes += size_bytes; + segment_row_count += meta.num_rows as u64; + segment_max_time = segment_max_time.max(partition_buffer.timestamp_max); + segment_min_time = segment_min_time.min(partition_buffer.timestamp_min); + } + + if !table_parquet_files.parquet_files.is_empty() { + database_tables + .tables + .insert(table_name.to_string(), table_parquet_files); + } + } + } + } + + if !database_tables.tables.is_empty() { + persisted_database_files.insert(db_name.to_string(), database_tables); + } + } + + let persisted_segment = PersistedSegment { + segment_id: self.segment_id, + segment_wal_size_bytes: self.segment_writer.bytes_written(), + segment_parquet_size_bytes, + segment_row_count, + segment_min_time, + segment_max_time, + databases: persisted_database_files, + }; + + persister.persist_segment(persisted_segment).await?; + + Ok(()) + } +} #[cfg(test)] mod tests { use super::*; use crate::wal::WalSegmentWriterNoopImpl; use crate::write_buffer::tests::lp_to_table_batches; + use crate::write_buffer::{parse_validate_and_update_schema, Partitioner}; + use crate::{LpWriteOp, PersistedCatalog}; + use bytes::Bytes; + use datafusion::execution::SendableRecordBatchStream; + use object_store::ObjectStore; + use parking_lot::Mutex; + use parquet::format::FileMetaData; + use std::any::Any; #[test] fn buffers_rows() { @@ -350,4 +497,160 @@ mod tests { assert_eq!(mem_partition.timestamp_min, 20); assert_eq!(mem_partition.timestamp_max, 20); } + + #[tokio::test] + async fn persist_closed_buffer() { + let segment_id = SegmentId::new(4); + let segment_writer = Box::new(WalSegmentWriterNoopImpl::new(segment_id)); + let mut open_segment = + OpenBufferSegment::new(segment_id, SequenceNumber::new(0), segment_writer); + + let catalog = Catalog::new(); + + let lp = "cpu,tag1=cupcakes bar=1 10\nmem,tag2=turtles bar=3 15\nmem,tag2=snakes bar=2 20"; + + let wal_op = WalOp::LpWrite(LpWriteOp { + db_name: "db1".to_string(), + lp: lp.to_string(), + default_time: 0, + }); + + let write_batch = lp_to_write_batch(&catalog, "db1", lp); + + open_segment.write_batch(vec![wal_op]).unwrap(); + open_segment.buffer_writes(write_batch).unwrap(); + + let catalog = Arc::new(catalog); + let closed_buffer_segment = open_segment.into_closed_segment(Arc::clone(&catalog)); + + let persister: Arc = Arc::new(TestPersister::default()); + closed_buffer_segment + .persist(Arc::clone(&persister)) + .await + .unwrap(); + + let persisted_state = persister + .as_any() + .downcast_ref::() + .unwrap() + .state + .lock(); + assert_eq!( + persisted_state.catalog.as_ref().unwrap().clone_inner(), + catalog.clone_inner() + ); + let segment_info = persisted_state.segment.as_ref().unwrap(); + + assert_eq!(segment_info.segment_id, segment_id); + assert_eq!(segment_info.segment_min_time, 10); + assert_eq!(segment_info.segment_max_time, 20); + assert_eq!(segment_info.segment_row_count, 2); + // in the mock each parquet file is 1 byte, so should have 2 + assert_eq!(segment_info.segment_parquet_size_bytes, 2); + // should have been one write into 
the wal + assert_eq!(segment_info.segment_wal_size_bytes, 1); + // one file for cpu, one for mem + assert_eq!(persisted_state.parquet_files.len(), 2); + + println!("segment_info:\n{:#?}", segment_info); + let db = segment_info.databases.get("db1").unwrap(); + let cpu = db.tables.get("cpu").unwrap(); + let cpu_parqet = &cpu.parquet_files[0]; + + // file number of the path should match the segment id + assert_eq!( + cpu_parqet.path, + ParquetFilePath::new_with_parititon_key("db1", "cpu", "1970-01-01", 4).to_string() + ); + assert_eq!(cpu_parqet.row_count, 1); + assert_eq!(cpu_parqet.min_time, 10); + assert_eq!(cpu_parqet.max_time, 10); + + let mem = db.tables.get("mem").unwrap(); + let mem_parqet = &mem.parquet_files[0]; + + // file number of the path should match the segment id + assert_eq!( + mem_parqet.path, + ParquetFilePath::new_with_parititon_key("db1", "mem", "1970-01-01", 4).to_string() + ); + assert_eq!(mem_parqet.row_count, 2); + assert_eq!(mem_parqet.min_time, 15); + assert_eq!(mem_parqet.max_time, 20); + } + + #[derive(Debug, Default)] + struct TestPersister { + state: Mutex, + } + + #[derive(Debug, Default)] + struct PersistedState { + catalog: Option, + segment: Option, + parquet_files: Vec, + } + + #[async_trait::async_trait] + impl Persister for TestPersister { + async fn persist_catalog( + &self, + _segment_id: SegmentId, + catalog: Catalog, + ) -> crate::Result<()> { + self.state.lock().catalog = Some(catalog); + Ok(()) + } + + async fn persist_parquet_file( + &self, + path: ParquetFilePath, + _data: SendableRecordBatchStream, + ) -> crate::Result<(u64, FileMetaData)> { + self.state.lock().parquet_files.push(path); + let meta = FileMetaData::new(1, vec![], 1, vec![], None, None, None, None, None); + Ok((1, meta)) + } + + async fn persist_segment(&self, segment: PersistedSegment) -> crate::Result<()> { + self.state.lock().segment = Some(segment); + Ok(()) + } + + fn as_any(&self) -> &dyn Any { + self as &dyn Any + } + + async fn load_catalog(&self) -> crate::Result> { + todo!() + } + + async fn load_segments( + &self, + _most_recent_n: usize, + ) -> crate::Result> { + todo!() + } + + async fn load_parquet_file(&self, _path: ParquetFilePath) -> crate::Result { + todo!() + } + + fn object_store(&self) -> Arc { + todo!() + } + } + + fn lp_to_write_batch(catalog: &Catalog, db_name: &'static str, lp: &str) -> WriteBatch { + let mut write_batch = WriteBatch::default(); + let (seq, db) = catalog.db_or_create(db_name); + let partitioner = Partitioner::new_per_day_partitioner(); + let result = parse_validate_and_update_schema(lp, &db, &partitioner, 0).unwrap(); + if let Some(db) = result.schema { + catalog.replace_database(seq, Arc::new(db)).unwrap(); + } + let db_name = NamespaceName::new(db_name).unwrap(); + write_batch.add_db_write(db_name, result.table_batches); + write_batch + } } diff --git a/influxdb3_write/src/write_buffer/mod.rs b/influxdb3_write/src/write_buffer/mod.rs index 6c3c0c51fc4..712b628f321 100644 --- a/influxdb3_write/src/write_buffer/mod.rs +++ b/influxdb3_write/src/write_buffer/mod.rs @@ -182,7 +182,7 @@ impl WriteBufferImpl { ) -> Result>, DataFusionError> { let db_schema = self.catalog.db_schema(database_name).unwrap(); let table = db_schema.tables.get(table_name).unwrap(); - let schema = table.schema.as_ref().cloned().unwrap(); + let schema = table.schema.clone(); let table_buffer = self.clone_table_buffer(database_name, table_name).unwrap(); @@ -223,7 +223,7 @@ impl WriteBufferImpl { fn get_table_record_batches(&self, datbase_name: &str, table_name: &str) -> 
Vec { let db_schema = self.catalog.db_schema(datbase_name).unwrap(); let table = db_schema.tables.get(table_name).unwrap(); - let schema = table.schema.as_ref().cloned().unwrap(); + let schema = table.schema.clone(); let table_buffer = self.clone_table_buffer(datbase_name, table_name).unwrap();
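Note (illustrative, not part of the patch): the sketch below shows the nested structure that ClosedBufferSegment::persist builds for the new persist_closed_buffer test, using only types and values that appear in the diff above; the concrete numbers simply restate the test's assertions, and the "mem" table entry is elided. It is written as crate-internal Rust, the same context as the buffer_segment test module, and is meant as a reading aid rather than additional code for the patch.

    // Crate-internal sketch; assumes the same context as the buffer_segment test module.
    use std::collections::HashMap;

    use crate::paths::ParquetFilePath;
    use crate::{DatabaseTables, ParquetFile, PersistedSegment, SegmentId, TableParquetFiles};

    fn example_persisted_segment() -> PersistedSegment {
        // One parquet file per partition buffer; the path layout comes from
        // ParquetFilePath::new_with_parititon_key:
        //   dbs/<db_name>/<table_name>/<partition_key>/<file_number>.parquet
        let cpu_file = ParquetFile {
            path: ParquetFilePath::new_with_parititon_key("db1", "cpu", "1970-01-01", 4)
                .to_string(),
            size_bytes: 1, // the test's TestPersister reports 1 byte per file
            row_count: 1,
            min_time: 10,
            max_time: 10,
        };

        PersistedSegment {
            segment_id: SegmentId::new(4),
            // WalSegmentWriterNoopImpl counts WAL ops, so one LP write => 1
            segment_wal_size_bytes: 1,
            // two 1-byte mock parquet files (cpu and mem)
            segment_parquet_size_bytes: 2,
            // summed from the FileMetaData returned by persist_parquet_file,
            // not from the in-memory buffers, hence 2 with the mock persister
            segment_row_count: 2,
            segment_min_time: 10,
            segment_max_time: 20,
            databases: HashMap::from([(
                "db1".to_string(),
                DatabaseTables {
                    tables: HashMap::from([(
                        "cpu".to_string(),
                        TableParquetFiles {
                            table_name: "cpu".to_string(),
                            parquet_files: vec![cpu_file],
                            sort_key: vec![],
                        },
                        // the "mem" table entry is analogous and omitted here
                    )]),
                },
            )]),
        }
    }

The main subtlety this highlights is that segment_row_count and segment_parquet_size_bytes are accumulated from what the Persister reports back (the FileMetaData and bytes written), so with the mock persister they differ from the three rows actually buffered in the test.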