From 536c70ec7e870e90d1852fe8f3c09c97c295f697 Mon Sep 17 00:00:00 2001 From: Praveen Kumar Date: Tue, 18 Feb 2025 12:38:35 +0000 Subject: [PATCH 1/8] feat: break down persist jobs closes: https://github.com/influxdata/influxdb/issues/25991 --- influxdb3/src/commands/serve.rs | 3 +- .../src/write_buffer/queryable_buffer.rs | 134 +++++++++++++++--- 2 files changed, 113 insertions(+), 24 deletions(-) diff --git a/influxdb3/src/commands/serve.rs b/influxdb3/src/commands/serve.rs index 5107e0b47b6..194671ec020 100644 --- a/influxdb3/src/commands/serve.rs +++ b/influxdb3/src/commands/serve.rs @@ -575,7 +575,8 @@ pub async fn command(config: Config) -> Result<()> { info!("setting up background mem check for query buffer"); background_buffer_checker( - config.force_snapshot_mem_threshold.as_num_bytes(), + // config.force_snapshot_mem_threshold.bytes(), + 734003200, &write_buffer_impl, ) .await; diff --git a/influxdb3_write/src/write_buffer/queryable_buffer.rs b/influxdb3_write/src/write_buffer/queryable_buffer.rs index ba7dbb1718b..153774fa35c 100644 --- a/influxdb3_write/src/write_buffer/queryable_buffer.rs +++ b/influxdb3_write/src/write_buffer/queryable_buffer.rs @@ -5,7 +5,12 @@ use crate::write_buffer::persisted_files::PersistedFiles; use crate::write_buffer::table_buffer::TableBuffer; use crate::{ChunkFilter, ParquetFile, ParquetFileId, PersistedSnapshot}; use anyhow::Context; -use arrow::record_batch::RecordBatch; +use arrow::{ + array::AsArray, + datatypes::TimestampNanosecondType, + record_batch::RecordBatch, + row::{RowConverter, SortField}, +}; use async_trait::async_trait; use data_types::{ ChunkId, ChunkOrder, PartitionHashId, PartitionId, PartitionKey, TimestampMinMax, @@ -25,7 +30,7 @@ use iox_query::chunk_statistics::{NoColumnRanges, create_chunk_statistics}; use iox_query::exec::Executor; use iox_query::frontend::reorg::ReorgPlanner; use object_store::path::Path; -use observability_deps::tracing::{error, info}; +use observability_deps::tracing::{debug, error, info}; use parking_lot::Mutex; use parking_lot::RwLock; use parquet::format::FileMetaData; @@ -197,27 +202,81 @@ impl QueryableBuffer { for chunk in snapshot_chunks { let table_name = db_schema.table_id_to_name(table_id).expect("table exists"); - let persist_job = PersistJob { - database_id: *database_id, - table_id: *table_id, - table_name: Arc::clone(&table_name), - chunk_time: chunk.chunk_time, - path: ParquetFilePath::new( - self.persister.node_identifier_prefix(), - db_schema.name.as_ref(), - database_id.as_u32(), - table_name.as_ref(), - table_id.as_u32(), - chunk.chunk_time, - snapshot_details.last_wal_sequence_number, - ), - batch: chunk.record_batch, - schema: chunk.schema, - timestamp_min_max: chunk.timestamp_min_max, - sort_key: table_buffer.sort_key.clone(), - }; - - persisting_chunks.push(persist_job); + // mapping between time to main record batch array's index + let mut smaller_chunks: HashMap)> = + HashMap::new(); + let smaller_duration = Duration::from_secs(10).as_nanos() as i64; + let all_times = chunk + .record_batch + .column_by_name("time") + .expect("time col to be present") + .as_primitive::() + .values(); + for (idx, time) in all_times.iter().enumerate() { + let smaller_chunk_time = time - (time % smaller_duration); + let (min_max, vec_indices) = + smaller_chunks.entry(smaller_chunk_time).or_insert_with(|| { + (MinMax::new(i64::MAX, i64::MIN), Vec::new()) + }); + + min_max.update(*time); + vec_indices.push(idx); + } + + // at this point we have a bucket for each 10 sec block, we can create + // smaller 
record batches here but maybe wasteful if we ever needed one + // batch (let's see how this works first and then decide what can happen) + let batch_schema = chunk.record_batch.schema(); + let parent_cols = chunk.record_batch.columns(); + let fields = batch_schema + .fields() + .iter() + .map(|field| SortField::new(field.data_type().clone())) + .collect(); + debug!(?fields, ">>> schema fields"); + + let converter = + RowConverter::new(fields).expect("row converter created from fields"); + let rows = converter + .convert_columns(parent_cols) + .expect("convert cols to rows to succeed"); + + for (smaller_chunk_time, (min_max, all_indexes)) in smaller_chunks.iter() { + // create a record batch using just all_indexes from parent recordbatch + let all_rows = all_indexes + .iter() + .map(|idx| rows.row(*idx)) + .collect::>(); + + let child_cols = converter + .convert_rows(all_rows) + .expect("should convert rows back to cols"); + + let smaller_rec_batch = + RecordBatch::try_new(Arc::clone(&batch_schema), child_cols) + .expect("create smaller record batch"); + let persist_job = PersistJob { + database_id: *database_id, + table_id: *table_id, + table_name: Arc::clone(&table_name), + chunk_time: *smaller_chunk_time, + path: ParquetFilePath::new( + self.persister.node_identifier_prefix(), + db_schema.name.as_ref(), + database_id.as_u32(), + table_name.as_ref(), + table_id.as_u32(), + *smaller_chunk_time, + snapshot_details.last_wal_sequence_number, + ), + batch: smaller_rec_batch, + // this schema.clone() can be avoided? + schema: chunk.schema.clone(), + timestamp_min_max: min_max.to_ts_min_max(), + sort_key: table_buffer.sort_key.clone(), + }; + persisting_chunks.push(persist_job); + } } } } @@ -435,6 +494,34 @@ impl QueryableBuffer { } } +struct MinMax { + min: i64, + max: i64, +} + +impl MinMax { + fn new(min: i64, max: i64) -> Self { + // this doesn't check if min < max, a lot of the times + // it's good to start with i64::MAX for min and i64::MIN + // for max in loops so this type unlike TimestampMinMax + // doesn't check this pre-condition + Self { + min, + max + } + } + + fn update(&mut self, other: i64) { + self.min = other.min(self.min); + self.max = other.max(self.max); + } + + fn to_ts_min_max(&self) -> TimestampMinMax { + // at this point min < max + TimestampMinMax::new(self.min, self.max) + } +} + #[async_trait] impl WalFileNotifier for QueryableBuffer { async fn notify(&self, write: Arc) { @@ -653,6 +740,7 @@ async fn sort_dedupe_persist( persist_job.path.to_string() ); + // TODO: this is a good place to use multiple batches let chunk_stats = create_chunk_statistics( Some(row_count), &persist_job.schema, From 4b15108707ea65b0c9049b29677ad8786f75a487 Mon Sep 17 00:00:00 2001 From: Praveen Kumar Date: Wed, 19 Feb 2025 08:06:05 +0000 Subject: [PATCH 2/8] chore: more debug logs --- influxdb3_write/src/write_buffer/mod.rs | 2 +- .../src/write_buffer/queryable_buffer.rs | 135 +++++++++++++----- 2 files changed, 97 insertions(+), 40 deletions(-) diff --git a/influxdb3_write/src/write_buffer/mod.rs b/influxdb3_write/src/write_buffer/mod.rs index eca5244a527..9ddea969ebe 100644 --- a/influxdb3_write/src/write_buffer/mod.rs +++ b/influxdb3_write/src/write_buffer/mod.rs @@ -2051,7 +2051,7 @@ mod tests { ); } - #[tokio::test] + #[test_log::test(tokio::test)] async fn notifies_watchers_of_snapshot() { let obj_store: Arc = Arc::new(InMemory::new()); let (wbuf, _, _) = setup( diff --git a/influxdb3_write/src/write_buffer/queryable_buffer.rs b/influxdb3_write/src/write_buffer/queryable_buffer.rs 
index 153774fa35c..a268574035a 100644 --- a/influxdb3_write/src/write_buffer/queryable_buffer.rs +++ b/influxdb3_write/src/write_buffer/queryable_buffer.rs @@ -6,10 +6,10 @@ use crate::write_buffer::table_buffer::TableBuffer; use crate::{ChunkFilter, ParquetFile, ParquetFileId, PersistedSnapshot}; use anyhow::Context; use arrow::{ - array::AsArray, + array::{AsArray, UInt64Array}, + compute::take, datatypes::TimestampNanosecondType, record_batch::RecordBatch, - row::{RowConverter, SortField}, }; use async_trait::async_trait; use data_types::{ @@ -203,8 +203,7 @@ impl QueryableBuffer { let table_name = db_schema.table_id_to_name(table_id).expect("table exists"); // mapping between time to main record batch array's index - let mut smaller_chunks: HashMap)> = - HashMap::new(); + let mut smaller_chunks: HashMap)> = HashMap::new(); let smaller_duration = Duration::from_secs(10).as_nanos() as i64; let all_times = chunk .record_batch @@ -214,59 +213,62 @@ impl QueryableBuffer { .values(); for (idx, time) in all_times.iter().enumerate() { let smaller_chunk_time = time - (time % smaller_duration); - let (min_max, vec_indices) = - smaller_chunks.entry(smaller_chunk_time).or_insert_with(|| { - (MinMax::new(i64::MAX, i64::MIN), Vec::new()) - }); + let (min_max, vec_indices) = smaller_chunks + .entry(smaller_chunk_time) + .or_insert_with(|| (MinMax::new(i64::MAX, i64::MIN), Vec::new())); min_max.update(*time); - vec_indices.push(idx); + vec_indices.push(idx as u64); + } + + let total_row_count = chunk.record_batch.column(0).len(); + + for (smaller_chunk_time, (min_max, all_indexes)) in smaller_chunks.iter() { + debug!( + ?smaller_chunk_time, + ?min_max, + num_indexes = ?all_indexes.len(), + ?total_row_count, + ">>> number of small chunks"); } // at this point we have a bucket for each 10 sec block, we can create // smaller record batches here but maybe wasteful if we ever needed one // batch (let's see how this works first and then decide what can happen) let batch_schema = chunk.record_batch.schema(); + debug!(schema = ?chunk.schema, ">>> influx schema"); + debug!(arrow_schema = ?batch_schema, ">>> batch schema"); let parent_cols = chunk.record_batch.columns(); - let fields = batch_schema - .fields() - .iter() - .map(|field| SortField::new(field.data_type().clone())) - .collect(); - debug!(?fields, ">>> schema fields"); - - let converter = - RowConverter::new(fields).expect("row converter created from fields"); - let rows = converter - .convert_columns(parent_cols) - .expect("convert cols to rows to succeed"); - - for (smaller_chunk_time, (min_max, all_indexes)) in smaller_chunks.iter() { - // create a record batch using just all_indexes from parent recordbatch - let all_rows = all_indexes - .iter() - .map(|idx| rows.row(*idx)) - .collect::>(); - - let child_cols = converter - .convert_rows(all_rows) - .expect("should convert rows back to cols"); + for (smaller_chunk_time, (min_max, all_indexes)) in + smaller_chunks.into_iter() + { + let mut smaller_chunk_cols = vec![]; + let indices = UInt64Array::from_iter(all_indexes); + for arr in parent_cols { + let filtered = + take(&arr, &indices, None) + .expect("index should be accessible in parent cols"); + + debug!(smaller_chunk_len = ?filtered.len(), ">>> filtered size"); + smaller_chunk_cols.push(filtered); + } + debug!(smaller_chunk_len = ?smaller_chunk_cols.len(), ">>> smaller chunks size"); let smaller_rec_batch = - RecordBatch::try_new(Arc::clone(&batch_schema), child_cols) + RecordBatch::try_new(Arc::clone(&batch_schema), smaller_chunk_cols) 
.expect("create smaller record batch"); let persist_job = PersistJob { database_id: *database_id, table_id: *table_id, table_name: Arc::clone(&table_name), - chunk_time: *smaller_chunk_time, + chunk_time: smaller_chunk_time, path: ParquetFilePath::new( self.persister.node_identifier_prefix(), db_schema.name.as_ref(), database_id.as_u32(), table_name.as_ref(), table_id.as_u32(), - *smaller_chunk_time, + smaller_chunk_time, snapshot_details.last_wal_sequence_number, ), batch: smaller_rec_batch, @@ -277,6 +279,63 @@ impl QueryableBuffer { }; persisting_chunks.push(persist_job); } + // let fields = batch_schema + // .fields() + // .iter() + // .map(|field| SortField::new(field.data_type().clone())) + // .collect(); + // debug!(?fields, ">>> schema fields"); + // + // let converter = + // RowConverter::new(fields).expect("row converter created from fields"); + // debug!(?converter, ">>> converter"); + // + // let rows = converter + // .convert_columns(parent_cols) + // .expect("convert cols to rows to succeed"); + // debug!(?rows, ">>> all rows"); + // + // for (smaller_chunk_time, (min_max, all_indexes)) in smaller_chunks.iter() { + // + // // create a record batch using just all_indexes from parent recordbatch + // let all_rows = all_indexes + // .iter() + // .map(|idx| rows.row(*idx)) + // .collect::>(); + // debug!(?rows, ">>> all filtered child rows"); + // + // // hmmm this conversion turns Dictionary types to StringArray, not sure + // // why + // let child_cols = converter + // .convert_rows(all_rows) + // .expect("should convert rows back to cols"); + // debug!(?child_cols, ">>> all child cols"); + // + // let smaller_rec_batch = + // RecordBatch::try_new(Arc::clone(&batch_schema), child_cols) + // .expect("create smaller record batch"); + // let persist_job = PersistJob { + // database_id: *database_id, + // table_id: *table_id, + // table_name: Arc::clone(&table_name), + // chunk_time: *smaller_chunk_time, + // path: ParquetFilePath::new( + // self.persister.node_identifier_prefix(), + // db_schema.name.as_ref(), + // database_id.as_u32(), + // table_name.as_ref(), + // table_id.as_u32(), + // *smaller_chunk_time, + // snapshot_details.last_wal_sequence_number, + // ), + // batch: smaller_rec_batch, + // // this schema.clone() can be avoided? 
+ // schema: chunk.schema.clone(), + // timestamp_min_max: min_max.to_ts_min_max(), + // sort_key: table_buffer.sort_key.clone(), + // }; + // persisting_chunks.push(persist_job); + // } } } } @@ -494,6 +553,7 @@ impl QueryableBuffer { } } +#[derive(Debug)] struct MinMax { min: i64, max: i64, @@ -505,10 +565,7 @@ impl MinMax { // it's good to start with i64::MAX for min and i64::MIN // for max in loops so this type unlike TimestampMinMax // doesn't check this pre-condition - Self { - min, - max - } + Self { min, max } } fn update(&mut self, other: i64) { From 2eaffc2e25c5b423d7b2244ec9c84810a34a4adc Mon Sep 17 00:00:00 2001 From: Praveen Kumar Date: Thu, 20 Feb 2025 08:47:01 +0000 Subject: [PATCH 3/8] feat: produce snapshot chunks lazily --- influxdb3/src/commands/serve.rs | 1 + influxdb3_write/src/paths.rs | 27 +- influxdb3_write/src/persister.rs | 1 + .../src/write_buffer/queryable_buffer.rs | 255 +++++++++--------- .../src/write_buffer/table_buffer.rs | 163 +++++++++-- 5 files changed, 284 insertions(+), 163 deletions(-) diff --git a/influxdb3/src/commands/serve.rs b/influxdb3/src/commands/serve.rs index 194671ec020..235f6d1234f 100644 --- a/influxdb3/src/commands/serve.rs +++ b/influxdb3/src/commands/serve.rs @@ -577,6 +577,7 @@ pub async fn command(config: Config) -> Result<()> { background_buffer_checker( // config.force_snapshot_mem_threshold.bytes(), 734003200, + // 536870912, &write_buffer_impl, ) .await; diff --git a/influxdb3_write/src/paths.rs b/influxdb3_write/src/paths.rs index b6c88d2a404..eba53ba040b 100644 --- a/influxdb3_write/src/paths.rs +++ b/influxdb3_write/src/paths.rs @@ -55,6 +55,7 @@ pub struct ParquetFilePath(ObjPath); impl ParquetFilePath { /// Generate a parquet file path using the given arguments. This will convert the provided /// `chunk_time` into a date time string with format `'YYYY-MM-DD/HH-MM'` + #[allow(clippy::too_many_arguments)] pub fn new( host_prefix: &str, db_name: &str, @@ -63,14 +64,26 @@ impl ParquetFilePath { table_id: u32, chunk_time: i64, wal_file_sequence_number: WalFileSequenceNumber, + sub_chunk_index: Option, ) -> Self { let date_time = DateTime::::from_timestamp_nanos(chunk_time); - let path = ObjPath::from(format!( - "{host_prefix}/dbs/{db_name}-{db_id}/{table_name}-{table_id}/{date_string}/{wal_seq:010}.{ext}", - date_string = date_time.format("%Y-%m-%d/%H-%M"), - wal_seq = wal_file_sequence_number.as_u64(), - ext = PARQUET_FILE_EXTENSION - )); + let path = if sub_chunk_index.is_some() { + ObjPath::from(format!( + "{host_prefix}/dbs/{db_name}-{db_id}/{table_name}-{table_id}/{date_string}/{wal_seq:010}-{chunk_idx}.{ext}", + date_string = date_time.format("%Y-%m-%d/%H-%M"), + wal_seq = wal_file_sequence_number.as_u64(), + chunk_idx = sub_chunk_index.unwrap(), + ext = PARQUET_FILE_EXTENSION + )) + + } else { + ObjPath::from(format!( + "{host_prefix}/dbs/{db_name}-{db_id}/{table_name}-{table_id}/{date_string}/{wal_seq:010}.{ext}", + date_string = date_time.format("%Y-%m-%d/%H-%M"), + wal_seq = wal_file_sequence_number.as_u64(), + ext = PARQUET_FILE_EXTENSION + )) + }; Self(path) } } @@ -143,6 +156,7 @@ fn parquet_file_path_new() { .timestamp_nanos_opt() .unwrap(), WalFileSequenceNumber::new(1337), + None, ), ObjPath::from("my_host/dbs/my_db-0/my_table-0/2038-01-19/03-14/0000001337.parquet") ); @@ -162,6 +176,7 @@ fn parquet_file_percent_encoded() { .timestamp_nanos_opt() .unwrap(), WalFileSequenceNumber::new(100), + None, ) .as_ref() .as_ref(), diff --git a/influxdb3_write/src/persister.rs b/influxdb3_write/src/persister.rs index 
79e78c672d4..f0924b83095 100644 --- a/influxdb3_write/src/persister.rs +++ b/influxdb3_write/src/persister.rs @@ -968,6 +968,7 @@ mod tests { 0, Utc::now().timestamp_nanos_opt().unwrap(), WalFileSequenceNumber::new(1), + None, ); let (bytes_written, meta, _) = persister .persist_parquet_file(path.clone(), stream_builder.build()) diff --git a/influxdb3_write/src/write_buffer/queryable_buffer.rs b/influxdb3_write/src/write_buffer/queryable_buffer.rs index a268574035a..652c62891cf 100644 --- a/influxdb3_write/src/write_buffer/queryable_buffer.rs +++ b/influxdb3_write/src/write_buffer/queryable_buffer.rs @@ -1,4 +1,4 @@ -use crate::chunk::BufferChunk; +use crate::{chunk::BufferChunk, write_buffer::table_buffer::SnaphotChunkIter}; use crate::paths::ParquetFilePath; use crate::persister::Persister; use crate::write_buffer::persisted_files::PersistedFiles; @@ -193,149 +193,137 @@ impl QueryableBuffer { for (database_id, table_map) in buffer.db_to_table.iter_mut() { let db_schema = catalog.db_schema_by_id(database_id).expect("db exists"); for (table_id, table_buffer) in table_map.iter_mut() { + info!(db_name = ?db_schema.name, ?table_id, ">>> working on db, table"); let table_def = db_schema .table_definition_by_id(table_id) .expect("table exists"); - let snapshot_chunks = - table_buffer.snapshot(table_def, snapshot_details.end_time_marker); + let sort_key = table_buffer.sort_key.clone(); + let all_keys_to_remove = table_buffer.get_keys_to_remove(snapshot_details.end_time_marker); + info!(num_keys_to_remove = ?all_keys_to_remove.len(), ">>> num keys to remove"); + + let chunk_time_to_chunk = &mut table_buffer.chunk_time_to_chunks; + let snapshot_chunks = &mut table_buffer.snapshotting_chunks; + let snapshot_chunks_iter = SnaphotChunkIter { + keys_to_remove: all_keys_to_remove.iter(), + map: chunk_time_to_chunk, + table_def, + }; - for chunk in snapshot_chunks { + for chunk in snapshot_chunks_iter { + debug!(">>> starting with new chunk"); let table_name = db_schema.table_id_to_name(table_id).expect("table exists"); - // mapping between time to main record batch array's index - let mut smaller_chunks: HashMap)> = HashMap::new(); - let smaller_duration = Duration::from_secs(10).as_nanos() as i64; - let all_times = chunk - .record_batch - .column_by_name("time") - .expect("time col to be present") - .as_primitive::() - .values(); - for (idx, time) in all_times.iter().enumerate() { - let smaller_chunk_time = time - (time % smaller_duration); - let (min_max, vec_indices) = smaller_chunks - .entry(smaller_chunk_time) - .or_insert_with(|| (MinMax::new(i64::MAX, i64::MIN), Vec::new())); - - min_max.update(*time); - vec_indices.push(idx as u64); - } - let total_row_count = chunk.record_batch.column(0).len(); + // TODO: just for experimentation we want to force all snapshots to go + // through without breaking down the record batches + if !snapshot_details.forced { + // when forced, we're already under memory pressure so create smaller + // chunks (by time) and they need to be non-overlapping. + // 1. 
Create smaller groups (using smaller duration), 10 secs here + let mut smaller_chunks: BTreeMap)> = BTreeMap::new(); + let smaller_duration = Duration::from_secs(10).as_nanos() as i64; + let all_times = chunk + .record_batch + .column_by_name("time") + .expect("time col to be present") + .as_primitive::() + .values(); + + for (idx, time) in all_times.iter().enumerate() { + let smaller_chunk_time = time - (time % smaller_duration); + let (min_max, vec_indices) = smaller_chunks + .entry(smaller_chunk_time) + .or_insert_with(|| (MinMax::new(i64::MAX, i64::MIN), Vec::new())); + + min_max.update(*time); + vec_indices.push(idx as u64); + } + + let total_row_count = chunk.record_batch.column(0).len(); - for (smaller_chunk_time, (min_max, all_indexes)) in smaller_chunks.iter() { - debug!( - ?smaller_chunk_time, - ?min_max, - num_indexes = ?all_indexes.len(), - ?total_row_count, - ">>> number of small chunks"); - } + for (smaller_chunk_time, (min_max, all_indexes)) in smaller_chunks.iter() { + debug!( + ?smaller_chunk_time, + ?min_max, + num_indexes = ?all_indexes.len(), + ?total_row_count, + ">>> number of small chunks"); + } - // at this point we have a bucket for each 10 sec block, we can create - // smaller record batches here but maybe wasteful if we ever needed one - // batch (let's see how this works first and then decide what can happen) - let batch_schema = chunk.record_batch.schema(); - debug!(schema = ?chunk.schema, ">>> influx schema"); - debug!(arrow_schema = ?batch_schema, ">>> batch schema"); - let parent_cols = chunk.record_batch.columns(); - - for (smaller_chunk_time, (min_max, all_indexes)) in - smaller_chunks.into_iter() - { - let mut smaller_chunk_cols = vec![]; - let indices = UInt64Array::from_iter(all_indexes); - for arr in parent_cols { - let filtered = - take(&arr, &indices, None) - .expect("index should be accessible in parent cols"); - - debug!(smaller_chunk_len = ?filtered.len(), ">>> filtered size"); - smaller_chunk_cols.push(filtered); + // 2. At this point we have a bucket for each 10 sec block with related + // indexes from main chunk. Use those indexes to "cheaply" create + // smaller record batches. + let batch_schema = chunk.record_batch.schema(); + let parent_cols = chunk.record_batch.columns(); + + for (loop_idx, (smaller_chunk_time, (min_max, all_indexes))) in + smaller_chunks.into_iter().enumerate() + { + let mut smaller_chunk_cols = vec![]; + let indices = UInt64Array::from_iter(all_indexes); + for arr in parent_cols { + // `take` here minimises allocations but is not completely free, + // it still needs to allocate for smaller batches. 
The + // allocations are in `ScalarBuffer::from_iter` under the hood + let filtered = + take(&arr, &indices, None) + .expect("index should be accessible in parent cols"); + + smaller_chunk_cols.push(filtered); + } + let smaller_rec_batch = + RecordBatch::try_new(Arc::clone(&batch_schema), smaller_chunk_cols) + .expect("create smaller record batch"); + let persist_job = PersistJob { + database_id: *database_id, + table_id: *table_id, + table_name: Arc::clone(&table_name), + chunk_time: smaller_chunk_time, + path: ParquetFilePath::new( + self.persister.node_identifier_prefix(), + db_schema.name.as_ref(), + database_id.as_u32(), + table_name.as_ref(), + table_id.as_u32(), + smaller_chunk_time, + snapshot_details.last_wal_sequence_number, + Some(loop_idx as u64), + ), + batch: smaller_rec_batch, + schema: chunk.schema.clone(), + timestamp_min_max: min_max.to_ts_min_max(), + sort_key: sort_key.clone(), + }; + persisting_chunks.push(persist_job); } - debug!(smaller_chunk_len = ?smaller_chunk_cols.len(), ">>> smaller chunks size"); - let smaller_rec_batch = - RecordBatch::try_new(Arc::clone(&batch_schema), smaller_chunk_cols) - .expect("create smaller record batch"); + + } else { let persist_job = PersistJob { database_id: *database_id, table_id: *table_id, table_name: Arc::clone(&table_name), - chunk_time: smaller_chunk_time, + chunk_time: chunk.chunk_time, path: ParquetFilePath::new( self.persister.node_identifier_prefix(), db_schema.name.as_ref(), database_id.as_u32(), table_name.as_ref(), table_id.as_u32(), - smaller_chunk_time, + chunk.chunk_time, snapshot_details.last_wal_sequence_number, + None, ), - batch: smaller_rec_batch, - // this schema.clone() can be avoided? + // these clones are cheap and done one at a time + batch: chunk.record_batch.clone(), schema: chunk.schema.clone(), - timestamp_min_max: min_max.to_ts_min_max(), - sort_key: table_buffer.sort_key.clone(), + timestamp_min_max: chunk.timestamp_min_max, + sort_key: sort_key.clone(), }; persisting_chunks.push(persist_job); } - // let fields = batch_schema - // .fields() - // .iter() - // .map(|field| SortField::new(field.data_type().clone())) - // .collect(); - // debug!(?fields, ">>> schema fields"); - // - // let converter = - // RowConverter::new(fields).expect("row converter created from fields"); - // debug!(?converter, ">>> converter"); - // - // let rows = converter - // .convert_columns(parent_cols) - // .expect("convert cols to rows to succeed"); - // debug!(?rows, ">>> all rows"); - // - // for (smaller_chunk_time, (min_max, all_indexes)) in smaller_chunks.iter() { - // - // // create a record batch using just all_indexes from parent recordbatch - // let all_rows = all_indexes - // .iter() - // .map(|idx| rows.row(*idx)) - // .collect::>(); - // debug!(?rows, ">>> all filtered child rows"); - // - // // hmmm this conversion turns Dictionary types to StringArray, not sure - // // why - // let child_cols = converter - // .convert_rows(all_rows) - // .expect("should convert rows back to cols"); - // debug!(?child_cols, ">>> all child cols"); - // - // let smaller_rec_batch = - // RecordBatch::try_new(Arc::clone(&batch_schema), child_cols) - // .expect("create smaller record batch"); - // let persist_job = PersistJob { - // database_id: *database_id, - // table_id: *table_id, - // table_name: Arc::clone(&table_name), - // chunk_time: *smaller_chunk_time, - // path: ParquetFilePath::new( - // self.persister.node_identifier_prefix(), - // db_schema.name.as_ref(), - // database_id.as_u32(), - // table_name.as_ref(), - // 
table_id.as_u32(), - // *smaller_chunk_time, - // snapshot_details.last_wal_sequence_number, - // ), - // batch: smaller_rec_batch, - // // this schema.clone() can be avoided? - // schema: chunk.schema.clone(), - // timestamp_min_max: min_max.to_ts_min_max(), - // sort_key: table_buffer.sort_key.clone(), - // }; - // persisting_chunks.push(persist_job); - // } + snapshot_chunks.push_back(chunk); + // snapshot_chunks.add_one(chunk); + debug!(">>> finished with chunk"); } } } @@ -388,24 +376,31 @@ impl QueryableBuffer { wal_file_number.as_u64(), ); // persist the individual files, building the snapshot as we go - let persisted_snapshot = Arc::new(Mutex::new(PersistedSnapshot::new( + // let persisted_snapshot = Arc::new(Mutex::new(PersistedSnapshot::new( + // persister.node_identifier_prefix().to_string(), + // snapshot_details.snapshot_sequence_number, + // snapshot_details.last_wal_sequence_number, + // catalog.sequence_number(), + // ))); + // + let mut persisted_snapshot = PersistedSnapshot::new( persister.node_identifier_prefix().to_string(), snapshot_details.snapshot_sequence_number, snapshot_details.last_wal_sequence_number, catalog.sequence_number(), - ))); + ); let persist_jobs_empty = persist_jobs.is_empty(); - let mut set = JoinSet::new(); + // let mut set = JoinSet::new(); for persist_job in persist_jobs { let persister = Arc::clone(&persister); let executor = Arc::clone(&executor); - let persisted_snapshot = Arc::clone(&persisted_snapshot); + // let persisted_snapshot = Arc::clone(&persisted_snapshot); let parquet_cache = parquet_cache.clone(); let buffer = Arc::clone(&buffer); let persisted_files = Arc::clone(&persisted_files); - set.spawn(async move { + // set.spawn(async move { let path = persist_job.path.to_string(); let database_id = persist_job.database_id; let table_id = persist_job.table_id; @@ -459,12 +454,12 @@ impl QueryableBuffer { } persisted_snapshot - .lock() + // .lock() .add_parquet_file(database_id, table_id, parquet_file) - }); + // }); } - set.join_all().await; + // set.join_all().await; // persist the snapshot file - only if persist jobs are present // if persist_jobs is empty, then parquet file wouldn't have been @@ -499,9 +494,11 @@ impl QueryableBuffer { // force_snapshot) snapshot runs, snapshot_tracker will check if // wal_periods are empty so it won't trigger a snapshot in the first // place. 
- let persisted_snapshot = Arc::into_inner(persisted_snapshot) - .expect("Should only have one strong reference") - .into_inner(); + + // let persisted_snapshot = Arc::into_inner(persisted_snapshot) + // .expect("Should only have one strong reference") + // .into_inner(); + if !persist_jobs_empty { loop { match persister.persist_snapshot(&persisted_snapshot).await { diff --git a/influxdb3_write/src/write_buffer/table_buffer.rs b/influxdb3_write/src/write_buffer/table_buffer.rs index ad361890fe9..75afaf3a75d 100644 --- a/influxdb3_write/src/write_buffer/table_buffer.rs +++ b/influxdb3_write/src/write_buffer/table_buffer.rs @@ -14,10 +14,14 @@ use influxdb3_wal::{FieldData, Row}; use observability_deps::tracing::error; use schema::sort::SortKey; use schema::{InfluxColumnType, InfluxFieldType, Schema, SchemaBuilder}; -use std::collections::BTreeMap; use std::collections::btree_map::Entry; +use std::collections::{BTreeMap, LinkedList}; use std::mem::size_of; use std::sync::Arc; +use std::{ + collections::btree_map::Entry, + slice::Iter, +}; use thiserror::Error; use crate::ChunkFilter; @@ -34,8 +38,10 @@ pub enum Error { pub(crate) type Result = std::result::Result; pub struct TableBuffer { - chunk_time_to_chunks: BTreeMap, - snapshotting_chunks: Vec, + pub(crate) chunk_time_to_chunks: BTreeMap, + // pub(crate) snapshotting_chunks: SnapshotChunksContainer, + // pub(crate) snapshotting_chunks: Vec, + pub(crate) snapshotting_chunks: LinkedList, pub(crate) sort_key: SortKey, } @@ -43,7 +49,8 @@ impl TableBuffer { pub fn new(sort_key: SortKey) -> Self { Self { chunk_time_to_chunks: BTreeMap::default(), - snapshotting_chunks: vec![], + // snapshotting_chunks: SnapshotChunksContainer::new(), + snapshotting_chunks: LinkedList::new(), sort_key, } } @@ -77,6 +84,7 @@ impl TableBuffer { ) -> Result)>> { let mut batches = HashMap::new(); let schema = table_def.schema.as_arrow(); + // for sc in self.snapshotting_chunks.as_filtered_vec(filter) { for sc in self.snapshotting_chunks.iter().filter(|sc| { filter.test_time_stamp_min_max(sc.timestamp_min_max.min, sc.timestamp_min_max.max) }) { @@ -125,9 +133,10 @@ impl TableBuffer { (a_min.min(b_min), a_max.max(b_max)) }) }; + // self.snapshotting_chunks.find_min_max(min, max) let mut timestamp_min_max = TimestampMinMax::new(min, max); - for sc in &self.snapshotting_chunks { + for sc in self.snapshotting_chunks.iter() { timestamp_min_max = timestamp_min_max.union(&sc.timestamp_min_max); } @@ -148,41 +157,125 @@ impl TableBuffer { size } - pub fn snapshot( + pub fn get_keys_to_remove(&self, older_than_chunk_time: i64) -> Vec { + self.chunk_time_to_chunks + .keys() + .filter(|k| **k < older_than_chunk_time) + .copied() + .collect::>() + } + + pub fn snapshot_lazy( &mut self, table_def: Arc, older_than_chunk_time: i64, - ) -> Vec { - let keys_to_remove = self - .chunk_time_to_chunks + ) -> impl Iterator + use<'_> { + let keys_to_remove = self.chunk_time_to_chunks .keys() .filter(|k| **k < older_than_chunk_time) .copied() .collect::>(); - self.snapshotting_chunks = keys_to_remove - .into_iter() - .map(|chunk_time| { - let chunk = self.chunk_time_to_chunks.remove(&chunk_time).unwrap(); - let timestamp_min_max = chunk.timestamp_min_max(); - let (schema, record_batch) = chunk.into_schema_record_batch(Arc::clone(&table_def)); - - SnapshotChunk { - chunk_time, - timestamp_min_max, - record_batch, - schema, - } - }) - .collect::>(); - self.snapshotting_chunks.clone() + keys_to_remove.into_iter().map(move |chunk_time| { + let chunk = 
self.chunk_time_to_chunks.remove(&chunk_time).unwrap(); + let timestamp_min_max = chunk.timestamp_min_max(); + let (schema, record_batch) = chunk.into_schema_record_batch(Arc::clone(&table_def)); + + SnapshotChunk { + chunk_time, + timestamp_min_max, + record_batch, + schema, + } + }) } + // pub fn add_snapshot_chunks(&self, snapshot_chunks: Vec) { + // self.snapshotting_chunks.add(snapshot_chunks); + // } + // + // pub fn clear_snapshots(&self) { + // self.snapshotting_chunks.clear_all(); + // } pub fn clear_snapshots(&mut self) { self.snapshotting_chunks.clear(); } } +pub(crate) struct SnaphotChunkIter<'a> { + pub keys_to_remove: Iter<'a, i64>, + pub map: &'a mut BTreeMap, + pub table_def: Arc, +} + +impl Iterator for SnaphotChunkIter<'_> { + type Item = SnapshotChunk; + + fn next(&mut self) -> Option { + if let Some(chunk_time) = self.keys_to_remove.next() { + let chunk = self.map.remove(chunk_time).unwrap(); + let timestamp_min_max = chunk.timestamp_min_max(); + let (schema, record_batch) = + chunk.into_schema_record_batch(Arc::clone(&self.table_def)); + + return Some(SnapshotChunk { + chunk_time: *chunk_time, + timestamp_min_max, + record_batch, + schema, + }); + } + None + } +} + +pub(crate) struct SnapshotChunksContainer { + chunks: parking_lot::Mutex>, +} + +impl SnapshotChunksContainer { + fn new() -> Self { + Self { + chunks: parking_lot::Mutex::new(vec![]), + } + } + + pub(crate) fn add_one(&self, chunk: SnapshotChunk) { + self.chunks.lock().push(chunk); + } + + pub(crate) fn add(&self, chunks: Vec) { + let mut all_chunks = self.chunks.lock(); + *all_chunks = chunks; + } + + fn clear_all(&self) { + self.chunks.lock().clear(); + } + + fn find_min_max(&self, current_min: i64, current_max: i64) -> TimestampMinMax { + let mut timestamp_min_max = TimestampMinMax::new(current_min, current_max); + + for sc in self.chunks.lock().iter() { + timestamp_min_max = timestamp_min_max.union(&sc.timestamp_min_max); + } + + timestamp_min_max + } + + fn as_filtered_vec(&self, filter: &ChunkFilter<'_>) -> Vec { + // TODO: find an alternate impl for this + self.chunks + .lock() + .iter() + .filter(|sc| { + filter.test_time_stamp_min_max(sc.timestamp_min_max.min, sc.timestamp_min_max.max) + }) + .cloned() + .collect() + } +} + #[derive(Debug, Clone)] pub struct SnapshotChunk { pub(crate) chunk_time: i64, @@ -213,7 +306,7 @@ impl std::fmt::Debug for TableBuffer { } } -struct MutableTableChunk { +pub(crate) struct MutableTableChunk { timestamp_min: i64, timestamp_max: i64, data: BTreeMap, @@ -597,8 +690,8 @@ mod tests { use super::*; use arrow_util::assert_batches_sorted_eq; use data_types::NamespaceName; - use datafusion::prelude::{Expr, col, lit_timestamp_nano}; - use influxdb3_catalog::catalog::{Catalog, DatabaseSchema}; + use influxdb_line_protocol::v3::SeriesKey; + use iox_query::test; use iox_time::Time; struct TestWriter { @@ -810,4 +903,18 @@ mod tests { assert_batches_sorted_eq!(t.expected_output, &batches); } } + + #[test] + fn test_drain_filter() { + // let map: BTreeMap = BTreeMap::new(); + // let keys_to_remove = vec![1, 2, 3]; + // let columns = vec![(ColumnId::new(), Arc::from("region"), InfluxColumnType::Tag)]; + // let series_key = SeriesKey::new(); + // let table_def = TableDefinition::new(TableId::new(0), Arc::from("foo"), columns, series_key); + // let drain_filter = DrainFilter { + // keys_to_remove: keys_to_remove.iter(), + // map: &mut map, + // table_def: , + // }; + } } From 2c752b2f62563d031e40ccc4c40466b6c3d5b5c0 Mon Sep 17 00:00:00 2001 From: Praveen Kumar Date: Fri, 21 Feb 
2025 13:46:15 +0000 Subject: [PATCH 4/8] chore: all tests passing with perf improvs --- .../src/write_buffer/queryable_buffer.rs | 409 +++++++++--------- .../src/write_buffer/table_buffer.rs | 7 +- 2 files changed, 216 insertions(+), 200 deletions(-) diff --git a/influxdb3_write/src/write_buffer/queryable_buffer.rs b/influxdb3_write/src/write_buffer/queryable_buffer.rs index 652c62891cf..d155f58842b 100644 --- a/influxdb3_write/src/write_buffer/queryable_buffer.rs +++ b/influxdb3_write/src/write_buffer/queryable_buffer.rs @@ -1,9 +1,9 @@ -use crate::{chunk::BufferChunk, write_buffer::table_buffer::SnaphotChunkIter}; use crate::paths::ParquetFilePath; use crate::persister::Persister; use crate::write_buffer::persisted_files::PersistedFiles; use crate::write_buffer::table_buffer::TableBuffer; use crate::{ChunkFilter, ParquetFile, ParquetFileId, PersistedSnapshot}; +use crate::{chunk::BufferChunk, write_buffer::table_buffer::SnaphotChunkIter}; use anyhow::Context; use arrow::{ array::{AsArray, UInt64Array}, @@ -36,9 +36,9 @@ use parking_lot::RwLock; use parquet::format::FileMetaData; use schema::Schema; use schema::sort::SortKey; -use std::any::Any; use std::sync::Arc; use std::time::Duration; +use std::{any::Any, collections::BTreeMap}; use tokio::sync::oneshot::{self, Receiver}; use tokio::task::JoinSet; @@ -198,7 +198,8 @@ impl QueryableBuffer { .table_definition_by_id(table_id) .expect("table exists"); let sort_key = table_buffer.sort_key.clone(); - let all_keys_to_remove = table_buffer.get_keys_to_remove(snapshot_details.end_time_marker); + let all_keys_to_remove = + table_buffer.get_keys_to_remove(snapshot_details.end_time_marker); info!(num_keys_to_remove = ?all_keys_to_remove.len(), ">>> num keys to remove"); let chunk_time_to_chunk = &mut table_buffer.chunk_time_to_chunks; @@ -214,120 +215,34 @@ impl QueryableBuffer { let table_name = db_schema.table_id_to_name(table_id).expect("table exists"); - // TODO: just for experimentation we want to force all snapshots to go - // through without breaking down the record batches - if !snapshot_details.forced { - // when forced, we're already under memory pressure so create smaller - // chunks (by time) and they need to be non-overlapping. - // 1. Create smaller groups (using smaller duration), 10 secs here - let mut smaller_chunks: BTreeMap)> = BTreeMap::new(); - let smaller_duration = Duration::from_secs(10).as_nanos() as i64; - let all_times = chunk - .record_batch - .column_by_name("time") - .expect("time col to be present") - .as_primitive::() - .values(); - - for (idx, time) in all_times.iter().enumerate() { - let smaller_chunk_time = time - (time % smaller_duration); - let (min_max, vec_indices) = smaller_chunks - .entry(smaller_chunk_time) - .or_insert_with(|| (MinMax::new(i64::MAX, i64::MIN), Vec::new())); - - min_max.update(*time); - vec_indices.push(idx as u64); - } - - let total_row_count = chunk.record_batch.column(0).len(); - - for (smaller_chunk_time, (min_max, all_indexes)) in smaller_chunks.iter() { - debug!( - ?smaller_chunk_time, - ?min_max, - num_indexes = ?all_indexes.len(), - ?total_row_count, - ">>> number of small chunks"); - } - - // 2. At this point we have a bucket for each 10 sec block with related - // indexes from main chunk. Use those indexes to "cheaply" create - // smaller record batches. 
- let batch_schema = chunk.record_batch.schema(); - let parent_cols = chunk.record_batch.columns(); - - for (loop_idx, (smaller_chunk_time, (min_max, all_indexes))) in - smaller_chunks.into_iter().enumerate() - { - let mut smaller_chunk_cols = vec![]; - let indices = UInt64Array::from_iter(all_indexes); - for arr in parent_cols { - // `take` here minimises allocations but is not completely free, - // it still needs to allocate for smaller batches. The - // allocations are in `ScalarBuffer::from_iter` under the hood - let filtered = - take(&arr, &indices, None) - .expect("index should be accessible in parent cols"); - - smaller_chunk_cols.push(filtered); - } - let smaller_rec_batch = - RecordBatch::try_new(Arc::clone(&batch_schema), smaller_chunk_cols) - .expect("create smaller record batch"); - let persist_job = PersistJob { - database_id: *database_id, - table_id: *table_id, - table_name: Arc::clone(&table_name), - chunk_time: smaller_chunk_time, - path: ParquetFilePath::new( - self.persister.node_identifier_prefix(), - db_schema.name.as_ref(), - database_id.as_u32(), - table_name.as_ref(), - table_id.as_u32(), - smaller_chunk_time, - snapshot_details.last_wal_sequence_number, - Some(loop_idx as u64), - ), - batch: smaller_rec_batch, - schema: chunk.schema.clone(), - timestamp_min_max: min_max.to_ts_min_max(), - sort_key: sort_key.clone(), - }; - persisting_chunks.push(persist_job); - } - - } else { - let persist_job = PersistJob { - database_id: *database_id, - table_id: *table_id, - table_name: Arc::clone(&table_name), - chunk_time: chunk.chunk_time, - path: ParquetFilePath::new( - self.persister.node_identifier_prefix(), - db_schema.name.as_ref(), - database_id.as_u32(), - table_name.as_ref(), - table_id.as_u32(), - chunk.chunk_time, - snapshot_details.last_wal_sequence_number, - None, - ), - // these clones are cheap and done one at a time - batch: chunk.record_batch.clone(), - schema: chunk.schema.clone(), - timestamp_min_max: chunk.timestamp_min_max, - sort_key: sort_key.clone(), - }; - persisting_chunks.push(persist_job); - } + let persist_job = PersistJob { + database_id: *database_id, + table_id: *table_id, + table_name: Arc::clone(&table_name), + chunk_time: chunk.chunk_time, + path: ParquetFilePath::new( + self.persister.node_identifier_prefix(), + db_schema.name.as_ref(), + database_id.as_u32(), + table_name.as_ref(), + table_id.as_u32(), + chunk.chunk_time, + snapshot_details.last_wal_sequence_number, + None, + ), + // these clones are cheap and done one at a time + batch: chunk.record_batch.clone(), + schema: chunk.schema.clone(), + timestamp_min_max: chunk.timestamp_min_max, + sort_key: sort_key.clone(), + }; + persisting_chunks.push(persist_job); snapshot_chunks.push_back(chunk); // snapshot_chunks.add_one(chunk); debug!(">>> finished with chunk"); } } } - persisting_chunks }; @@ -375,91 +290,52 @@ impl QueryableBuffer { persist_jobs.len(), wal_file_number.as_u64(), ); - // persist the individual files, building the snapshot as we go - // let persisted_snapshot = Arc::new(Mutex::new(PersistedSnapshot::new( - // persister.node_identifier_prefix().to_string(), - // snapshot_details.snapshot_sequence_number, - // snapshot_details.last_wal_sequence_number, - // catalog.sequence_number(), - // ))); - // - let mut persisted_snapshot = PersistedSnapshot::new( - persister.node_identifier_prefix().to_string(), - snapshot_details.snapshot_sequence_number, - snapshot_details.last_wal_sequence_number, - catalog.sequence_number(), - ); let persist_jobs_empty = persist_jobs.is_empty(); 
- // let mut set = JoinSet::new(); - for persist_job in persist_jobs { - let persister = Arc::clone(&persister); - let executor = Arc::clone(&executor); - // let persisted_snapshot = Arc::clone(&persisted_snapshot); - let parquet_cache = parquet_cache.clone(); - let buffer = Arc::clone(&buffer); - let persisted_files = Arc::clone(&persisted_files); - - // set.spawn(async move { - let path = persist_job.path.to_string(); - let database_id = persist_job.database_id; - let table_id = persist_job.table_id; - let chunk_time = persist_job.chunk_time; - let min_time = persist_job.timestamp_min_max.min; - let max_time = persist_job.timestamp_min_max.max; - - let SortDedupePersistSummary { - file_size_bytes, - file_meta_data, - } = sort_dedupe_persist( - persist_job, - persister, - executor, - parquet_cache - ) - .await - .inspect_err(|error| { - error!( - %error, - debug = ?error, - "error during sort, deduplicate, and persist of buffer data as parquet" - ); - }) - // for now, we are still panicking in this case, see: - // https://github.com/influxdata/influxdb/issues/25676 - // https://github.com/influxdata/influxdb/issues/25677 - .expect("sort, deduplicate, and persist buffer data as parquet"); - let parquet_file = ParquetFile { - id: ParquetFileId::new(), - path, - size_bytes: file_size_bytes, - row_count: file_meta_data.num_rows as u64, - chunk_time, - min_time, - max_time, - }; - - { - // we can clear the buffer as we move on - let mut buffer = buffer.write(); - - // add file first - persisted_files.add_persisted_file(&database_id, &table_id, &parquet_file); - // then clear the buffer - if let Some(db) = buffer.db_to_table.get_mut(&database_id) { - if let Some(table) = db.get_mut(&table_id) { - table.clear_snapshots(); - } - } - } - persisted_snapshot - // .lock() - .add_parquet_file(database_id, table_id, parquet_file) - // }); - } + let persisted_snapshot = if snapshot_details.forced { + let mut persisted_snapshot = PersistedSnapshot::new( + persister.node_identifier_prefix().to_string(), + snapshot_details.snapshot_sequence_number, + snapshot_details.last_wal_sequence_number, + catalog.sequence_number(), + ); - // set.join_all().await; + sort_dedupe_serial( + persist_jobs, + &persister, + executor, + parquet_cache, + buffer, + persisted_files, + &mut persisted_snapshot, + ) + .await; + persisted_snapshot + } else { + // persist the individual files, building the snapshot as we go + let persisted_snapshot = Arc::new(Mutex::new(PersistedSnapshot::new( + persister.node_identifier_prefix().to_string(), + snapshot_details.snapshot_sequence_number, + snapshot_details.last_wal_sequence_number, + catalog.sequence_number(), + ))); + + sort_dedupe_parallel( + persist_jobs, + &persister, + executor, + parquet_cache, + buffer, + persisted_files, + Arc::clone(&persisted_snapshot), + ) + .await; + + Arc::into_inner(persisted_snapshot) + .expect("Should only have one strong reference") + .into_inner() + }; // persist the snapshot file - only if persist jobs are present // if persist_jobs is empty, then parquet file wouldn't have been @@ -494,11 +370,6 @@ impl QueryableBuffer { // force_snapshot) snapshot runs, snapshot_tracker will check if // wal_periods are empty so it won't trigger a snapshot in the first // place. 
- - // let persisted_snapshot = Arc::into_inner(persisted_snapshot) - // .expect("Should only have one strong reference") - // .into_inner(); - if !persist_jobs_empty { loop { match persister.persist_snapshot(&persisted_snapshot).await { @@ -550,6 +421,150 @@ impl QueryableBuffer { } } +async fn sort_dedupe_parallel( + persist_jobs: Vec, + persister: &Arc, + executor: Arc, + parquet_cache: Option>, + buffer: Arc>, + persisted_files: Arc, + persisted_snapshot: Arc>, +) { + // if gen1 duration is 1m we should combine upto 10 of them + // to create a single parquet file + let mut set = JoinSet::new(); + for persist_job in persist_jobs { + let persister = Arc::clone(persister); + let executor = Arc::clone(&executor); + let persisted_snapshot = Arc::clone(&persisted_snapshot); + let parquet_cache = parquet_cache.clone(); + let buffer = Arc::clone(&buffer); + let persisted_files = Arc::clone(&persisted_files); + + set.spawn(async move { + let path = persist_job.path.to_string(); + let database_id = persist_job.database_id; + let table_id = persist_job.table_id; + let chunk_time = persist_job.chunk_time; + let min_time = persist_job.timestamp_min_max.min; + let max_time = persist_job.timestamp_min_max.max; + + let SortDedupePersistSummary { + file_size_bytes, + file_meta_data, + } = sort_dedupe_persist(persist_job, persister, executor, parquet_cache) + .await + .inspect_err(|error| { + error!( + %error, + debug = ?error, + "error during sort, deduplicate, and persist of buffer data as parquet" + ); + }) + // for now, we are still panicking in this case, see: + // https://github.com/influxdata/influxdb/issues/25676 + // https://github.com/influxdata/influxdb/issues/25677 + .expect("sort, deduplicate, and persist buffer data as parquet"); + let parquet_file = ParquetFile { + id: ParquetFileId::new(), + path, + size_bytes: file_size_bytes, + row_count: file_meta_data.num_rows as u64, + chunk_time, + min_time, + max_time, + }; + + { + // we can clear the buffer as we move on + let mut buffer = buffer.write(); + + // add file first + persisted_files.add_persisted_file(&database_id, &table_id, &parquet_file); + // then clear the buffer + if let Some(db) = buffer.db_to_table.get_mut(&database_id) { + if let Some(table) = db.get_mut(&table_id) { + table.clear_snapshots(); + } + } + } + + persisted_snapshot + .lock() + .add_parquet_file(database_id, table_id, parquet_file) + }); + } + set.join_all().await; +} + +async fn sort_dedupe_serial( + persist_jobs: Vec, + persister: &Arc, + executor: Arc, + parquet_cache: Option>, + buffer: Arc>, + persisted_files: Arc, + persisted_snapshot: &mut PersistedSnapshot, +) { + for persist_job in persist_jobs { + let persister = Arc::clone(persister); + let executor = Arc::clone(&executor); + let parquet_cache = parquet_cache.clone(); + let buffer = Arc::clone(&buffer); + let persisted_files = Arc::clone(&persisted_files); + + let path = persist_job.path.to_string(); + let database_id = persist_job.database_id; + let table_id = persist_job.table_id; + let chunk_time = persist_job.chunk_time; + let min_time = persist_job.timestamp_min_max.min; + let max_time = persist_job.timestamp_min_max.max; + + let SortDedupePersistSummary { + file_size_bytes, + file_meta_data, + } = sort_dedupe_persist(persist_job, persister, executor, parquet_cache) + .await + .inspect_err(|error| { + error!( + %error, + debug = ?error, + "error during sort, deduplicate, and persist of buffer data as parquet" + ); + }) + // for now, we are still panicking in this case, see: + // 
https://github.com/influxdata/influxdb/issues/25676 + // https://github.com/influxdata/influxdb/issues/25677 + .expect("sort, deduplicate, and persist buffer data as parquet"); + let parquet_file = ParquetFile { + id: ParquetFileId::new(), + path, + size_bytes: file_size_bytes, + row_count: file_meta_data.num_rows as u64, + chunk_time, + min_time, + max_time, + }; + + { + // we can clear the buffer as we move on + let mut buffer = buffer.write(); + + // add file first + persisted_files.add_persisted_file(&database_id, &table_id, &parquet_file); + // then clear the buffer + if let Some(db) = buffer.db_to_table.get_mut(&database_id) { + if let Some(table) = db.get_mut(&table_id) { + table.clear_snapshots(); + } + } + } + + persisted_snapshot + .add_parquet_file(database_id, table_id, parquet_file) + } +} + #[derive(Debug)] struct MinMax { min: i64, diff --git a/influxdb3_write/src/write_buffer/table_buffer.rs b/influxdb3_write/src/write_buffer/table_buffer.rs index 75afaf3a75d..8424fcb6544 100644 --- a/influxdb3_write/src/write_buffer/table_buffer.rs +++ b/influxdb3_write/src/write_buffer/table_buffer.rs @@ -14,7 +14,6 @@ use influxdb3_wal::{FieldData, Row}; use observability_deps::tracing::error; use schema::sort::SortKey; use schema::{InfluxColumnType, InfluxFieldType, Schema, SchemaBuilder}; -use std::collections::btree_map::Entry; use std::collections::{BTreeMap, LinkedList}; use std::mem::size_of; use std::sync::Arc; @@ -690,8 +689,10 @@ mod tests { use super::*; use arrow_util::assert_batches_sorted_eq; use data_types::NamespaceName; - use influxdb_line_protocol::v3::SeriesKey; - use iox_query::test; + use datafusion::prelude::{col, lit_timestamp_nano, Expr}; + use influxdb3_catalog::catalog::{Catalog, DatabaseSchema}; + // use influxdb_line_protocol::v3::SeriesKey; + // use iox_query::test; use iox_time::Time; struct TestWriter { From 3d0f7e54a44a56bf1d9e15212a78b635e828a184 Mon Sep 17 00:00:00 2001 From: Praveen Kumar Date: Sun, 23 Feb 2025 22:40:17 +0000 Subject: [PATCH 5/8] feat: schema mismatch alignment This PR addresses the OOM issue (or reduces the chances of running into OOM when snapshotting) by doing following main changes - defaults gen 1 duration to 1m (instead of 10m) - snapshot chunks are built lazily and - sort/dedupe step itself is done serially (i.e 1 at a time) As an optimisation when _not_ forcing a snapshot it aggregates up to 10m worth of chunks and writes them in parallel assumption is given it's a normal snapshot, there is enough memory to run it. 
closes: https://github.com/influxdata/influxdb/issues/25991 --- Cargo.lock | 1 + influxdb3/src/commands/serve.rs | 17 +- influxdb3_processing_engine/src/lib.rs | 1 + influxdb3_server/src/lib.rs | 1 + influxdb3_server/src/query_executor/mod.rs | 1 + influxdb3_wal/src/lib.rs | 12 +- influxdb3_write/Cargo.toml | 1 + influxdb3_write/src/paths.rs | 27 +- influxdb3_write/src/persister.rs | 1 - influxdb3_write/src/write_buffer/mod.rs | 20 +- .../src/write_buffer/queryable_buffer.rs | 987 +++++++++++++++--- .../src/write_buffer/table_buffer.rs | 124 +-- 12 files changed, 908 insertions(+), 285 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 776d4b09a62..9d29048b4ce 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3299,6 +3299,7 @@ dependencies = [ "serde_with", "sha2", "snap", + "sysinfo 0.30.13", "test-log", "test_helpers", "thiserror 1.0.69", diff --git a/influxdb3/src/commands/serve.rs b/influxdb3/src/commands/serve.rs index 235f6d1234f..dc41a493573 100644 --- a/influxdb3/src/commands/serve.rs +++ b/influxdb3/src/commands/serve.rs @@ -179,7 +179,7 @@ pub struct Config { #[clap( long = "gen1-duration", env = "INFLUXDB3_GEN1_DURATION", - default_value = "10m", + default_value = "1m", action )] pub gen1_duration: Gen1Duration, @@ -361,6 +361,16 @@ pub struct Config { /// smaller time ranges if possible in a query. #[clap(long = "query-file-limit", env = "INFLUXDB3_QUERY_FILE_LIMIT", action)] pub query_file_limit: Option, + + /// Threshold for internal buffer, can be either percentage or absolute value in MB. + /// eg: 70% or 1000 MB + #[clap( + long = "max-memory-for-snapshot", + env = "INFLUXDB3_MAX_MEMORY_FOR_SNAPSHOT", + default_value = "100", + action + )] + pub max_memory_for_snapshot: MemorySizeMb, } /// Specified size of the Parquet cache in megabytes (MB) @@ -569,15 +579,14 @@ pub async fn command(config: Config) -> Result<()> { metric_registry: Arc::clone(&metrics), snapshotted_wal_files_to_keep: config.snapshotted_wal_files_to_keep, query_file_limit: config.query_file_limit, + max_memory_for_snapshot_bytes: config.max_memory_for_snapshot.as_num_bytes() as u64, }) .await .map_err(|e| Error::WriteBufferInit(e.into()))?; info!("setting up background mem check for query buffer"); background_buffer_checker( - // config.force_snapshot_mem_threshold.bytes(), - 734003200, - // 536870912, + config.force_snapshot_mem_threshold.as_num_bytes(), &write_buffer_impl, ) .await; diff --git a/influxdb3_processing_engine/src/lib.rs b/influxdb3_processing_engine/src/lib.rs index e6a84410c2d..6de512a2598 100644 --- a/influxdb3_processing_engine/src/lib.rs +++ b/influxdb3_processing_engine/src/lib.rs @@ -1055,6 +1055,7 @@ mod tests { metric_registry: Arc::clone(&metric_registry), snapshotted_wal_files_to_keep: 10, query_file_limit: None, + max_memory_for_snapshot_bytes: 100_000_000, }) .await .unwrap(); diff --git a/influxdb3_server/src/lib.rs b/influxdb3_server/src/lib.rs index 2500d35ad21..2625591970a 100644 --- a/influxdb3_server/src/lib.rs +++ b/influxdb3_server/src/lib.rs @@ -785,6 +785,7 @@ mod tests { metric_registry: Arc::clone(&metrics), snapshotted_wal_files_to_keep: 100, query_file_limit: None, + max_memory_for_snapshot_bytes: 100_000_000, }, ) .await diff --git a/influxdb3_server/src/query_executor/mod.rs b/influxdb3_server/src/query_executor/mod.rs index 0e5b56a7d75..5be9b8b52b4 100644 --- a/influxdb3_server/src/query_executor/mod.rs +++ b/influxdb3_server/src/query_executor/mod.rs @@ -849,6 +849,7 @@ mod tests { metric_registry: Default::default(), snapshotted_wal_files_to_keep: 1, 
query_file_limit, + max_memory_for_snapshot_bytes: 100_000_000, }) .await .unwrap(); diff --git a/influxdb3_wal/src/lib.rs b/influxdb3_wal/src/lib.rs index fb96f81a3fd..5c4268a22de 100644 --- a/influxdb3_wal/src/lib.rs +++ b/influxdb3_wal/src/lib.rs @@ -209,6 +209,16 @@ impl Gen1Duration { self.0.as_nanos() as i64 } + pub fn as_10m(&self) -> u64 { + let duration_secs = self.0.as_secs(); + let ten_min_secs = 600; + if duration_secs >= ten_min_secs { + 1 + } else { + ten_min_secs / duration_secs + } + } + pub fn new_1m() -> Self { Self(Duration::from_secs(60)) } @@ -239,7 +249,7 @@ impl Default for Gen1Duration { #[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)] pub struct NoopDetails { - timestamp_ns: i64, + pub timestamp_ns: i64, } #[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] diff --git a/influxdb3_write/Cargo.toml b/influxdb3_write/Cargo.toml index 170d035bc5c..ac869224b01 100644 --- a/influxdb3_write/Cargo.toml +++ b/influxdb3_write/Cargo.toml @@ -63,6 +63,7 @@ serde_json.workspace = true serde_with.workspace = true sha2.workspace = true snap.workspace = true +sysinfo.workspace = true thiserror.workspace = true tokio.workspace = true url.workspace = true diff --git a/influxdb3_write/src/paths.rs b/influxdb3_write/src/paths.rs index eba53ba040b..b6c88d2a404 100644 --- a/influxdb3_write/src/paths.rs +++ b/influxdb3_write/src/paths.rs @@ -55,7 +55,6 @@ pub struct ParquetFilePath(ObjPath); impl ParquetFilePath { /// Generate a parquet file path using the given arguments. This will convert the provided /// `chunk_time` into a date time string with format `'YYYY-MM-DD/HH-MM'` - #[allow(clippy::too_many_arguments)] pub fn new( host_prefix: &str, db_name: &str, @@ -64,26 +63,14 @@ impl ParquetFilePath { table_id: u32, chunk_time: i64, wal_file_sequence_number: WalFileSequenceNumber, - sub_chunk_index: Option, ) -> Self { let date_time = DateTime::::from_timestamp_nanos(chunk_time); - let path = if sub_chunk_index.is_some() { - ObjPath::from(format!( - "{host_prefix}/dbs/{db_name}-{db_id}/{table_name}-{table_id}/{date_string}/{wal_seq:010}-{chunk_idx}.{ext}", - date_string = date_time.format("%Y-%m-%d/%H-%M"), - wal_seq = wal_file_sequence_number.as_u64(), - chunk_idx = sub_chunk_index.unwrap(), - ext = PARQUET_FILE_EXTENSION - )) - - } else { - ObjPath::from(format!( - "{host_prefix}/dbs/{db_name}-{db_id}/{table_name}-{table_id}/{date_string}/{wal_seq:010}.{ext}", - date_string = date_time.format("%Y-%m-%d/%H-%M"), - wal_seq = wal_file_sequence_number.as_u64(), - ext = PARQUET_FILE_EXTENSION - )) - }; + let path = ObjPath::from(format!( + "{host_prefix}/dbs/{db_name}-{db_id}/{table_name}-{table_id}/{date_string}/{wal_seq:010}.{ext}", + date_string = date_time.format("%Y-%m-%d/%H-%M"), + wal_seq = wal_file_sequence_number.as_u64(), + ext = PARQUET_FILE_EXTENSION + )); Self(path) } } @@ -156,7 +143,6 @@ fn parquet_file_path_new() { .timestamp_nanos_opt() .unwrap(), WalFileSequenceNumber::new(1337), - None, ), ObjPath::from("my_host/dbs/my_db-0/my_table-0/2038-01-19/03-14/0000001337.parquet") ); @@ -176,7 +162,6 @@ fn parquet_file_percent_encoded() { .timestamp_nanos_opt() .unwrap(), WalFileSequenceNumber::new(100), - None, ) .as_ref() .as_ref(), diff --git a/influxdb3_write/src/persister.rs b/influxdb3_write/src/persister.rs index f0924b83095..79e78c672d4 100644 --- a/influxdb3_write/src/persister.rs +++ b/influxdb3_write/src/persister.rs @@ -968,7 +968,6 @@ mod tests { 0, Utc::now().timestamp_nanos_opt().unwrap(), WalFileSequenceNumber::new(1), - None, ); let 
(bytes_written, meta, _) = persister .persist_parquet_file(path.clone(), stream_builder.build()) diff --git a/influxdb3_write/src/write_buffer/mod.rs b/influxdb3_write/src/write_buffer/mod.rs index 9ddea969ebe..5467a165253 100644 --- a/influxdb3_write/src/write_buffer/mod.rs +++ b/influxdb3_write/src/write_buffer/mod.rs @@ -174,6 +174,7 @@ pub struct WriteBufferImplArgs { pub metric_registry: Arc, pub snapshotted_wal_files_to_keep: u64, pub query_file_limit: Option, + pub max_memory_for_snapshot_bytes: u64, } impl WriteBufferImpl { @@ -190,6 +191,7 @@ impl WriteBufferImpl { metric_registry, snapshotted_wal_files_to_keep, query_file_limit, + max_memory_for_snapshot_bytes, }: WriteBufferImplArgs, ) -> Result> { // load snapshots and replay the wal into the in memory buffer @@ -221,6 +223,8 @@ impl WriteBufferImpl { distinct_cache_provider: Arc::clone(&distinct_cache), persisted_files: Arc::clone(&persisted_files), parquet_cache: parquet_cache.clone(), + gen1_duration: wal_config.gen1_duration, + max_size_per_parquet_file_bytes: max_memory_for_snapshot_bytes, })); // create the wal instance, which will replay into the queryable buffer and start @@ -1039,6 +1043,7 @@ mod tests { metric_registry: Default::default(), snapshotted_wal_files_to_keep: 10, query_file_limit: None, + max_memory_for_snapshot_bytes: 100_000_000, }) .await .unwrap(); @@ -1134,6 +1139,7 @@ mod tests { metric_registry: Default::default(), snapshotted_wal_files_to_keep: 10, query_file_limit: None, + max_memory_for_snapshot_bytes: 100_000_000, }) .await .unwrap(); @@ -1207,6 +1213,7 @@ mod tests { metric_registry: Default::default(), snapshotted_wal_files_to_keep: 10, query_file_limit: None, + max_memory_for_snapshot_bytes: 100_000_000, }) .await .unwrap() @@ -1454,6 +1461,7 @@ mod tests { metric_registry: Default::default(), snapshotted_wal_files_to_keep: 10, query_file_limit: None, + max_memory_for_snapshot_bytes: 100_000_000, }) .await .unwrap(); @@ -2721,10 +2729,7 @@ mod tests { #[test_log::test(tokio::test)] async fn test_out_of_order_data() { let tmp_dir = test_helpers::tmp_dir().unwrap(); - debug!( - ?tmp_dir, - ">>> using tmp dir for test_check_mem_and_force_snapshot" - ); + debug!(?tmp_dir, ">>> using tmp dir"); let obj_store: Arc = Arc::new(LocalFileSystem::new_with_prefix(tmp_dir).unwrap()); let (write_buffer, _, _) = setup( @@ -2795,6 +2800,9 @@ mod tests { "| a | us | 1970-01-01T00:00:28Z | 10.0 |", "| a | us | 1970-01-01T00:00:29Z | 10.0 |", "| a | us | 1970-01-01T00:00:30Z | 10.0 |", + "| a | us | 1970-01-01T00:00:20Z | 10.0 |", + "| a | us | 1970-01-01T00:00:21Z | 10.0 |", + "| a | us | 1970-01-01T00:00:22Z | 10.0 |", "| a | us | 1970-01-01T00:01:40Z | 10.0 |", "| a | us | 1970-01-01T00:01:41Z | 10.0 |", "| a | us | 1970-01-01T00:01:42Z | 10.0 |", @@ -2807,9 +2815,6 @@ mod tests { "| a | us | 1970-01-01T00:01:49Z | 10.0 |", "| a | us | 1970-01-01T00:01:50Z | 10.0 |", "| a | us | 1970-01-01T00:01:51Z | 10.0 |", - "| a | us | 1970-01-01T00:00:20Z | 10.0 |", - "| a | us | 1970-01-01T00:00:21Z | 10.0 |", - "| a | us | 1970-01-01T00:00:22Z | 10.0 |", "+------+--------+----------------------+-------+", ], &actual @@ -3306,6 +3311,7 @@ mod tests { metric_registry: Arc::clone(&metric_registry), snapshotted_wal_files_to_keep: 10, query_file_limit: None, + max_memory_for_snapshot_bytes: 100_000_000, }) .await .unwrap(); diff --git a/influxdb3_write/src/write_buffer/queryable_buffer.rs b/influxdb3_write/src/write_buffer/queryable_buffer.rs index d155f58842b..4109e8f7206 100644 --- 
a/influxdb3_write/src/write_buffer/queryable_buffer.rs +++ b/influxdb3_write/src/write_buffer/queryable_buffer.rs @@ -1,16 +1,11 @@ -use crate::paths::ParquetFilePath; use crate::persister::Persister; use crate::write_buffer::persisted_files::PersistedFiles; use crate::write_buffer::table_buffer::TableBuffer; use crate::{ChunkFilter, ParquetFile, ParquetFileId, PersistedSnapshot}; use crate::{chunk::BufferChunk, write_buffer::table_buffer::SnaphotChunkIter}; +use crate::{paths::ParquetFilePath, write_buffer::table_buffer::array_ref_nulls_for_type}; use anyhow::Context; -use arrow::{ - array::{AsArray, UInt64Array}, - compute::take, - datatypes::TimestampNanosecondType, - record_batch::RecordBatch, -}; +use arrow::record_batch::RecordBatch; use async_trait::async_trait; use data_types::{ ChunkId, ChunkOrder, PartitionHashId, PartitionId, PartitionKey, TimestampMinMax, @@ -24,7 +19,10 @@ use influxdb3_cache::parquet_cache::{CacheRequest, ParquetCacheOracle}; use influxdb3_cache::{distinct_cache::DistinctCacheProvider, last_cache::LastCacheProvider}; use influxdb3_catalog::catalog::{Catalog, DatabaseSchema, TableDefinition}; use influxdb3_id::{DbId, TableId}; -use influxdb3_wal::{CatalogOp, SnapshotDetails, WalContents, WalFileNotifier, WalOp, WriteBatch}; +use influxdb3_wal::{ + CatalogOp, Gen1Duration, SnapshotDetails, WalContents, WalFileNotifier, WalFileSequenceNumber, + WalOp, WriteBatch, +}; use iox_query::QueryChunk; use iox_query::chunk_statistics::{NoColumnRanges, create_chunk_statistics}; use iox_query::exec::Executor; @@ -34,12 +32,16 @@ use observability_deps::tracing::{debug, error, info}; use parking_lot::Mutex; use parking_lot::RwLock; use parquet::format::FileMetaData; -use schema::Schema; +use schema::Schema as IoxSchema; use schema::sort::SortKey; -use std::sync::Arc; +use std::any::Any; use std::time::Duration; -use std::{any::Any, collections::BTreeMap}; -use tokio::sync::oneshot::{self, Receiver}; +use std::{iter::Peekable, slice::Iter, sync::Arc}; +use sysinfo::{MemoryRefreshKind, RefreshKind}; +use tokio::sync::{ + Semaphore, + oneshot::{self, Receiver}, +}; use tokio::task::JoinSet; #[derive(Debug)] @@ -55,6 +57,8 @@ pub struct QueryableBuffer { /// Sends a notification to this watch channel whenever a snapshot info is persisted persisted_snapshot_notify_rx: tokio::sync::watch::Receiver>, persisted_snapshot_notify_tx: tokio::sync::watch::Sender>, + gen1_duration: Gen1Duration, + max_size_per_parquet_file_bytes: u64, } #[derive(Debug)] @@ -66,6 +70,8 @@ pub struct QueryableBufferArgs { pub distinct_cache_provider: Arc, pub persisted_files: Arc, pub parquet_cache: Option>, + pub gen1_duration: Gen1Duration, + pub max_size_per_parquet_file_bytes: u64, } impl QueryableBuffer { @@ -78,6 +84,8 @@ impl QueryableBuffer { distinct_cache_provider, persisted_files, parquet_cache, + gen1_duration, + max_size_per_parquet_file_bytes, }: QueryableBufferArgs, ) -> Self { let buffer = Arc::new(RwLock::new(BufferState::new(Arc::clone(&catalog)))); @@ -94,6 +102,8 @@ impl QueryableBuffer { parquet_cache, persisted_snapshot_notify_rx, persisted_snapshot_notify_tx, + gen1_duration, + max_size_per_parquet_file_bytes, } } @@ -217,6 +227,7 @@ impl QueryableBuffer { let persist_job = PersistJob { database_id: *database_id, + database_name: Arc::clone(&db_schema.name), table_id: *table_id, table_name: Arc::clone(&table_name), chunk_time: chunk.chunk_time, @@ -228,17 +239,15 @@ impl QueryableBuffer { table_id.as_u32(), chunk.chunk_time, snapshot_details.last_wal_sequence_number, - None, ), // 
these clones are cheap and done one at a time - batch: chunk.record_batch.clone(), - schema: chunk.schema.clone(), + batches: vec![chunk.record_batch.clone()], + iox_schema: chunk.schema.clone(), timestamp_min_max: chunk.timestamp_min_max, sort_key: sort_key.clone(), }; persisting_chunks.push(persist_job); - snapshot_chunks.push_back(chunk); - // snapshot_chunks.add_one(chunk); + snapshot_chunks.push(chunk); debug!(">>> finished with chunk"); } } @@ -256,6 +265,8 @@ impl QueryableBuffer { let catalog = Arc::clone(&self.catalog); let notify_snapshot_tx = self.persisted_snapshot_notify_tx.clone(); let parquet_cache = self.parquet_cache.clone(); + let gen1_duration = self.gen1_duration; + let max_size_per_parquet_file = self.max_size_per_parquet_file_bytes; tokio::spawn(async move { // persist the catalog if it has been updated @@ -301,8 +312,17 @@ impl QueryableBuffer { catalog.sequence_number(), ); + let iterator = PersistJobGroupedIterator::new( + &persist_jobs, + Arc::from(persister.node_identifier_prefix()), + wal_file_number, + Arc::clone(&catalog), + gen1_duration.as_10m() as usize, + Some(max_size_per_parquet_file), + ); + sort_dedupe_serial( - persist_jobs, + iterator, &persister, executor, parquet_cache, @@ -321,8 +341,17 @@ impl QueryableBuffer { catalog.sequence_number(), ))); + let iterator = PersistJobGroupedIterator::new( + &persist_jobs, + Arc::from(persister.node_identifier_prefix()), + wal_file_number, + Arc::clone(&catalog), + gen1_duration.as_10m() as usize, + Some(max_size_per_parquet_file), + ); + sort_dedupe_parallel( - persist_jobs, + iterator, &persister, executor, parquet_cache, @@ -421,8 +450,9 @@ impl QueryableBuffer { } } -async fn sort_dedupe_parallel( - persist_jobs: Vec, +#[allow(clippy::too_many_arguments)] +async fn sort_dedupe_parallel>( + iterator: I, persister: &Arc, executor: Arc, parquet_cache: Option>, @@ -430,75 +460,52 @@ async fn sort_dedupe_parallel( persisted_files: Arc, persisted_snapshot: Arc>, ) { - // if gen1 duration is 1m we should combine upto 10 of them - // to create a single parquet file + info!(">>> running sort/dedupe in parallel"); + let mut set = JoinSet::new(); - for persist_job in persist_jobs { + // TODO: may be this concurrency level needs to be externalised + let sempahore = Arc::new(Semaphore::new(5)); + for persist_job in iterator { let persister = Arc::clone(persister); let executor = Arc::clone(&executor); let persisted_snapshot = Arc::clone(&persisted_snapshot); let parquet_cache = parquet_cache.clone(); let buffer = Arc::clone(&buffer); let persisted_files = Arc::clone(&persisted_files); + let semaphore = Arc::clone(&sempahore); set.spawn(async move { - let path = persist_job.path.to_string(); - let database_id = persist_job.database_id; - let table_id = persist_job.table_id; - let chunk_time = persist_job.chunk_time; - let min_time = persist_job.timestamp_min_max.min; - let max_time = persist_job.timestamp_min_max.max; - - let SortDedupePersistSummary { - file_size_bytes, - file_meta_data, - } = sort_dedupe_persist(persist_job, persister, executor, parquet_cache) + let permit = semaphore + .acquire_owned() .await - .inspect_err(|error| { - error!( - %error, - debug = ?error, - "error during sort, deduplicate, and persist of buffer data as parquet" - ); - }) - // for now, we are still panicking in this case, see: - // https://github.com/influxdata/influxdb/issues/25676 - // https://github.com/influxdata/influxdb/issues/25677 - .expect("sort, deduplicate, and persist buffer data as parquet"); - let parquet_file = ParquetFile 
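The rewritten sort_dedupe_parallel above bounds its fan-out with an Arc'd tokio Semaphore whose owned permit is acquired inside each JoinSet task. A minimal sketch of that pattern in isolation, assuming a placeholder job type and work function rather than the real persist path:

use std::sync::Arc;
use tokio::{sync::Semaphore, task::JoinSet};

// One task per job, but at most `max_in_flight` run concurrently.
async fn run_bounded<T: Send + 'static>(jobs: Vec<T>, max_in_flight: usize) {
    let semaphore = Arc::new(Semaphore::new(max_in_flight));
    let mut set = JoinSet::new();
    for job in jobs {
        let semaphore = Arc::clone(&semaphore);
        set.spawn(async move {
            // acquire_owned ties the permit to the task, so the slot frees
            // only when the task finishes (or the permit is dropped).
            let _permit = semaphore
                .acquire_owned()
                .await
                .expect("semaphore is never closed here");
            do_work(job).await
        });
    }
    // Draining join_next (instead of join_all) lets a failed task surface as
    // a JoinError to be logged while the remaining tasks keep running.
    while let Some(res) = set.join_next().await {
        if let Err(e) = res {
            eprintln!("persist task failed: {e}");
        }
    }
}

async fn do_work<T>(_job: T) {
    // stand-in for sort/dedupe/persist
}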
{ - id: ParquetFileId::new(), - path, - size_bytes: file_size_bytes, - row_count: file_meta_data.num_rows as u64, - chunk_time, - min_time, - max_time, - }; - - { - // we can clear the buffer as we move on - let mut buffer = buffer.write(); - - // add file first - persisted_files.add_persisted_file(&database_id, &table_id, &parquet_file); - // then clear the buffer - if let Some(db) = buffer.db_to_table.get_mut(&database_id) { - if let Some(table) = db.get_mut(&table_id) { - table.clear_snapshots(); - } - } - } + .expect("to get permit to run sort/dedupe in parallel"); + let (database_id, table_id, parquet_file) = process_single_persist_job( + persist_job, + persister, + executor, + parquet_cache, + buffer, + persisted_files, + ) + .await; persisted_snapshot .lock() - .add_parquet_file(database_id, table_id, parquet_file) + .add_parquet_file(database_id, table_id, parquet_file); + drop(permit); }); } - set.join_all().await; + + while let Some(res) = set.join_next().await { + if let Err(e) = res { + error!(?e, "error when running sort/dedupe in parallel"); + } + } } -async fn sort_dedupe_serial( - persist_jobs: Vec, +#[allow(clippy::too_many_arguments)] +async fn sort_dedupe_serial>( + iterator: I, persister: &Arc, executor: Arc, parquet_cache: Option>, @@ -506,89 +513,84 @@ async fn sort_dedupe_serial( persisted_files: Arc, persisted_snapshot: &mut PersistedSnapshot, ) { - for persist_job in persist_jobs { + info!(">>> running sort/dedupe serially"); + + for persist_job in iterator { let persister = Arc::clone(persister); let executor = Arc::clone(&executor); let parquet_cache = parquet_cache.clone(); let buffer = Arc::clone(&buffer); let persisted_files = Arc::clone(&persisted_files); - let path = persist_job.path.to_string(); - let database_id = persist_job.database_id; - let table_id = persist_job.table_id; - let chunk_time = persist_job.chunk_time; - let min_time = persist_job.timestamp_min_max.min; - let max_time = persist_job.timestamp_min_max.max; - - let SortDedupePersistSummary { - file_size_bytes, - file_meta_data, - } = sort_dedupe_persist(persist_job, persister, executor, parquet_cache) - .await - .inspect_err(|error| { - error!( - %error, - debug = ?error, - "error during sort, deduplicate, and persist of buffer data as parquet" - ); - }) - // for now, we are still panicking in this case, see: - // https://github.com/influxdata/influxdb/issues/25676 - // https://github.com/influxdata/influxdb/issues/25677 - .expect("sort, deduplicate, and persist buffer data as parquet"); - let parquet_file = ParquetFile { - id: ParquetFileId::new(), - path, - size_bytes: file_size_bytes, - row_count: file_meta_data.num_rows as u64, - chunk_time, - min_time, - max_time, - }; - - { - // we can clear the buffer as we move on - let mut buffer = buffer.write(); - - // add file first - persisted_files.add_persisted_file(&database_id, &table_id, &parquet_file); - // then clear the buffer - if let Some(db) = buffer.db_to_table.get_mut(&database_id) { - if let Some(table) = db.get_mut(&table_id) { - table.clear_snapshots(); - } - } - } + let (database_id, table_id, parquet_file) = process_single_persist_job( + persist_job, + persister, + executor, + parquet_cache, + buffer, + persisted_files, + ) + .await; - persisted_snapshot - .add_parquet_file(database_id, table_id, parquet_file) + persisted_snapshot.add_parquet_file(database_id, table_id, parquet_file) } } -#[derive(Debug)] -struct MinMax { - min: i64, - max: i64, -} - -impl MinMax { - fn new(min: i64, max: i64) -> Self { - // this doesn't check 
if min < max, a lot of the times - // it's good to start with i64::MAX for min and i64::MIN - // for max in loops so this type unlike TimestampMinMax - // doesn't check this pre-condition - Self { min, max } - } - - fn update(&mut self, other: i64) { - self.min = other.min(self.min); - self.max = other.max(self.max); - } - - fn to_ts_min_max(&self) -> TimestampMinMax { - // at this point min < max - TimestampMinMax::new(self.min, self.max) +async fn process_single_persist_job( + persist_job: PersistJob, + persister: Arc, + executor: Arc, + parquet_cache: Option>, + buffer: Arc>, + persisted_files: Arc, +) -> (DbId, TableId, ParquetFile) { + let path = persist_job.path.to_string(); + let database_id = persist_job.database_id; + let table_id = persist_job.table_id; + let chunk_time = persist_job.chunk_time; + let min_time = persist_job.timestamp_min_max.min; + let max_time = persist_job.timestamp_min_max.max; + + let SortDedupePersistSummary { + file_size_bytes, + file_meta_data, + } = sort_dedupe_persist(persist_job, persister, executor, parquet_cache) + .await + .inspect_err(|error| { + error!( + %error, + debug = ?error, + "error during sort, deduplicate, and persist of buffer data as parquet" + ); + }) + // for now, we are still panicking in this case, see: + // https://github.com/influxdata/influxdb/issues/25676 + // https://github.com/influxdata/influxdb/issues/25677 + .expect("sort, deduplicate, and persist buffer data as parquet"); + let parquet_file = ParquetFile { + id: ParquetFileId::new(), + path, + size_bytes: file_size_bytes, + row_count: file_meta_data.num_rows as u64, + chunk_time, + min_time, + max_time, + }; + + { + // we can clear the buffer as we move on + let mut buffer = buffer.write(); + + // add file first + persisted_files.add_persisted_file(&database_id, &table_id, &parquet_file); + // then clear the buffer + if let Some(db) = buffer.db_to_table.get_mut(&database_id) { + if let Some(table) = db.get_mut(&table_id) { + table.clear_snapshots(); + } + } } + (database_id, table_id, parquet_file) } #[async_trait] @@ -768,16 +770,237 @@ impl BufferState { #[derive(Debug)] struct PersistJob { database_id: DbId, + database_name: Arc, table_id: TableId, table_name: Arc, chunk_time: i64, path: ParquetFilePath, - batch: RecordBatch, - schema: Schema, + // when creating job per chunk, this will be just + // a single RecordBatch, however when grouped this be + // multiple RecordBatch'es that can be passed on to + // ReorgPlanner (as vec of batches in BufferChunk) + batches: Vec, + iox_schema: IoxSchema, timestamp_min_max: TimestampMinMax, sort_key: SortKey, } +impl PersistJob { + fn total_batch_size(&self) -> u64 { + self.batches + .iter() + .map(|batch| batch.get_array_memory_size()) + .sum::() as u64 + } +} + +struct PersistJobGroupedIterator<'a> { + iter: Peekable>, + host_prefix: Arc, + wal_file_number: WalFileSequenceNumber, + catalog: Arc, + chunk_size: usize, + max_size_bytes: u64, + system: sysinfo::System, +} + +impl<'a> PersistJobGroupedIterator<'a> { + fn new( + data: &'a [PersistJob], + host_prefix: Arc, + wal_file_number: WalFileSequenceNumber, + catalog: Arc, + chunk_size: usize, + max_size_bytes: Option, + ) -> Self { + PersistJobGroupedIterator { + iter: data.iter().peekable(), + host_prefix: Arc::clone(&host_prefix), + wal_file_number, + catalog, + chunk_size, + max_size_bytes: max_size_bytes.unwrap_or(u64::MAX), + system: sysinfo::System::new_with_specifics( + RefreshKind::new().with_memory(MemoryRefreshKind::new().with_ram()), + ), + } + } + + fn 
check_and_align_schema_mismatches_between_batches( + &mut self, + current_data: &PersistJob, + all_batches: &mut [RecordBatch], + all_schemas: Vec, + ) -> Option<()> { + let table_defn = self + .catalog + .db_schema_by_id(¤t_data.database_id)? + .table_definition_by_id(¤t_data.table_id)?; + let expected_schema = table_defn.schema.clone(); + let batches_with_schema_mismatch = + find_batches_with_mismatching_schemas(all_batches, &all_schemas, &expected_schema); + + if !batches_with_schema_mismatch.is_empty() { + // we need to add the missing fields - as schema changes are additive, when there is + // a mismatch it means new column has been added to table but the batches are missing + // them. + for (idx, batch) in &batches_with_schema_mismatch { + let mut cols = vec![]; + // pick it's current iox schema, to add the columns (making null for missing) + let outdated_batch_schema = &all_schemas[*idx]; + debug!( + ?outdated_batch_schema, + ">>> outdated batch schema when aligning mismatched schema" + ); + for col_idx_with_field_details in expected_schema.iter().enumerate() { + let (col_idx, (influx_col_type, field)) = col_idx_with_field_details; + let batch_field = outdated_batch_schema.field_by_name(field.name()); + let len = batch.columns()[0].len(); + if batch_field.is_some() { + let col = Arc::clone(&batch.columns()[col_idx]); + cols.push(col); + } else { + let null_array_col = array_ref_nulls_for_type(influx_col_type, len); + cols.push(null_array_col); + } + } + + let new_arrow_schema = expected_schema.as_arrow(); + debug!( + ?new_arrow_schema, + ">>> new arrow schema for batch when aligning mismatched schema" + ); + let new_rec_batch = RecordBatch::try_new(new_arrow_schema, cols).expect( + "record batch to be created with new schema after fixing schema mismatch", + ); + + let _ = std::mem::replace(&mut all_batches[*idx], new_rec_batch); + } + }; + Some(()) + } +} + +impl Iterator for PersistJobGroupedIterator<'_> { + // This is a grouped persist job, since it includes exactly + // same fields with only difference being each job has a vec + // of batches, it's been reused for now. For clarity it might + // be better to have different types to represent this state + type Item = PersistJob; + + fn next(&mut self) -> Option { + let current_data = self.iter.next()?; + let current_table_id = ¤t_data.table_id; + + let mut ts_min_max = current_data.timestamp_min_max; + + let mut all_batches = Vec::with_capacity(self.chunk_size); + let mut all_iox_schemas = Vec::with_capacity(self.chunk_size); + all_batches.extend_from_slice(¤t_data.batches); + all_iox_schemas.push(current_data.iox_schema.clone()); + + let mut min_chunk_time = current_data.chunk_time; + let mut current_size_bytes = current_data.total_batch_size(); + debug!(?current_size_bytes, table_name = ?current_data.table_name, ">>> current_size_bytes for table"); + self.system.refresh_memory(); + // This is a very naive approach to keep mem bounded, if the first chunk + // is >50M we allow it to be processed but this may run into OOM. To + // address that, one approach is to break it further down but once a + // bigger record batch is built, breaking it down with mem bounds gets + // very tricky and fairly inconsistent as building smaller record batches + // require allocation. The point when mem was sampled to point when they + // are broken down into smaller batches the incoming traffic fills the + // buffer to a point the original mem sample isn't right. 
But mostly + // with even higher throughputs (having 1m gen 1 duration as default) + // this approach has been faring well, however once you hit very high + // throughputs whereby in 10s (force snapshot interval) + // it fills in enough mem to go past the allocated query buffer % + // (50% default) it will run into OOMs. + let system_mem_bytes = self.system.free_memory() - 100_000_000; + let max_size_bytes = self.max_size_bytes.min(system_mem_bytes); + debug!( + max_size_bytes, + system_mem_bytes, ">>> max size bytes/system mem bytes" + ); + + while all_batches.len() < self.chunk_size && current_size_bytes < max_size_bytes { + debug!(?current_size_bytes, ">>> current_size_bytes"); + if let Some(next_data) = self.iter.peek() { + if next_data.table_id == *current_table_id { + let next = self.iter.next().unwrap(); + ts_min_max = ts_min_max.union(&next.timestamp_min_max); + min_chunk_time = min_chunk_time.min(next.chunk_time); + current_size_bytes += next.total_batch_size(); + all_batches.extend_from_slice(&next.batches); + all_iox_schemas.push(next.iox_schema.clone()); + } else { + break; + } + } else { + break; + } + } + + self.check_and_align_schema_mismatches_between_batches( + current_data, + &mut all_batches, + all_iox_schemas, + )?; + + Some(PersistJob { + database_id: current_data.database_id, + database_name: Arc::clone(¤t_data.database_name), + table_id: current_data.table_id, + path: ParquetFilePath::new( + &self.host_prefix, + ¤t_data.database_name, + current_data.database_id.as_u32(), + ¤t_data.table_name, + current_data.table_id.as_u32(), + min_chunk_time, + self.wal_file_number, + ), + table_name: Arc::clone(¤t_data.table_name), + chunk_time: min_chunk_time, + batches: all_batches, + iox_schema: current_data.iox_schema.clone(), + timestamp_min_max: ts_min_max, + sort_key: current_data.sort_key.clone(), + }) + } +} + +fn find_batches_with_mismatching_schemas( + all_batches: &mut [RecordBatch], + all_schemas: &[IoxSchema], + expected_schema: &IoxSchema, +) -> Vec<(usize, RecordBatch)> { + let batches_with_schema_mismatch: Vec<(usize, RecordBatch)> = all_batches + .iter() + .cloned() + .enumerate() + .filter(|(idx, _)| { + let schema = &all_schemas[*idx]; + for field_1 in expected_schema.iter() { + let mut found_field = false; + for field_2 in schema.iter() { + if field_1.1.name() == field_2.1.name() { + found_field = true; + break; + } + } + + if !found_field { + return true; + } + } + false + }) + .collect(); + + batches_with_schema_mismatch +} + pub(crate) struct SortDedupePersistSummary { pub file_size_bytes: u64, pub file_meta_data: FileMetaData, @@ -799,7 +1022,11 @@ async fn sort_dedupe_persist( ) -> Result { // Dedupe and sort using the COMPACT query built into // iox_query - let row_count = persist_job.batch.num_rows(); + let row_count = persist_job + .batches + .iter() + .map(|batch| batch.num_rows()) + .sum(); info!( "Persisting {} rows for db id {} and table id {} and chunk {} to file {}", row_count, @@ -809,17 +1036,16 @@ async fn sort_dedupe_persist( persist_job.path.to_string() ); - // TODO: this is a good place to use multiple batches let chunk_stats = create_chunk_statistics( Some(row_count), - &persist_job.schema, + &persist_job.iox_schema, Some(persist_job.timestamp_min_max), &NoColumnRanges, ); let chunks: Vec> = vec![Arc::new(BufferChunk { - batches: vec![persist_job.batch], - schema: persist_job.schema.clone(), + batches: persist_job.batches, + schema: persist_job.iox_schema.clone(), stats: Arc::new(chunk_stats), partition_id: 
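PersistJobGroupedIterator::next above greedily folds consecutive jobs for the same table into one persist job, stopping at the gen1-derived chunk budget or the byte budget, whichever is hit first (the real iterator additionally clamps the byte budget to the free memory reported by sysinfo, which is left out here). A small self-contained sketch of that grouping rule with a stand-in Job type and made-up sizes; the 3/3/3/1 split mirrors the shape of the varying-throughput test added later in this series:

// Stand-in job carrying only the fields the grouping rule needs.
struct Job {
    table_id: u32,
    size_bytes: u64,
}

fn group_jobs(jobs: &[Job], max_chunks: usize, max_bytes: u64) -> Vec<Vec<&Job>> {
    let mut groups = Vec::new();
    let mut iter = jobs.iter().peekable();
    while let Some(first) = iter.next() {
        let mut group = vec![first];
        let mut bytes = first.size_bytes;
        // keep absorbing the next job while it is for the same table and
        // both the chunk-count and byte budgets still hold
        while group.len() < max_chunks && bytes < max_bytes {
            match iter.peek() {
                Some(next) if next.table_id == first.table_id => {
                    let next = iter.next().unwrap();
                    bytes += next.size_bytes;
                    group.push(next);
                }
                _ => break,
            }
        }
        groups.push(group);
    }
    groups
}

fn main() {
    // Ten same-table chunks, a budget of 10 chunks but a tight byte cap:
    // the cap splits them into several smaller parquet files.
    let jobs: Vec<Job> = (0..10)
        .map(|_| Job { table_id: 1, size_bytes: 40_000 })
        .collect();
    let groups = group_jobs(&jobs, 10, 100_000);
    assert_eq!(groups.iter().map(|g| g.len()).collect::<Vec<_>>(), vec![3, 3, 3, 1]);
}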
TransitionPartitionId::from_parts( PartitionId::new(0), @@ -839,7 +1065,7 @@ async fn sort_dedupe_persist( .compact_plan( data_types::TableId::new(0), persist_job.table_name, - &persist_job.schema, + &persist_job.iox_schema, chunks, persist_job.sort_key, ) @@ -861,7 +1087,7 @@ async fn sort_dedupe_persist( // keep attempting to persist forever. If we can't reach the object store, we'll stop accepting // writes elsewhere in the system, so we need to keep trying to persist. loop { - let batch_stream = stream_from_batches(persist_job.schema.as_arrow(), data.clone()); + let batch_stream = stream_from_batches(persist_job.iox_schema.as_arrow(), data.clone()); match persister .persist_parquet_file(persist_job.path.clone(), batch_stream) @@ -896,15 +1122,16 @@ mod tests { use crate::write_buffer::validator::WriteValidator; use datafusion_util::config::register_iox_object_store; use executor::{DedicatedExecutor, register_current_runtime_for_io}; - use influxdb3_wal::{Gen1Duration, SnapshotSequenceNumber, WalFileSequenceNumber}; + use influxdb3_wal::{Gen1Duration, NoopDetails, SnapshotSequenceNumber, WalFileSequenceNumber}; use iox_query::exec::ExecutorConfig; use iox_time::{MockProvider, Time, TimeProvider}; use object_store::ObjectStore; use object_store::memory::InMemory; use parquet_file::storage::{ParquetStorage, StorageId}; + use pretty_assertions::assert_eq; use std::num::NonZeroUsize; - #[tokio::test] + #[test_log::test(tokio::test)] async fn snapshot_works_with_not_all_columns_in_buffer() { let object_store: Arc = Arc::new(InMemory::new()); let metrics = Arc::new(metric::Registry::default()); @@ -950,6 +1177,8 @@ mod tests { .unwrap(), persisted_files: Arc::new(PersistedFiles::new()), parquet_cache: None, + gen1_duration: Gen1Duration::new_1m(), + max_size_per_parquet_file_bytes: 4_000, }; let queryable_buffer = QueryableBuffer::new(queryable_buffer_args); @@ -1060,4 +1289,450 @@ mod tests { .get_files(db.id, table.table_id); assert_eq!(files.len(), 2); } + + #[test_log::test(tokio::test)] + async fn test_when_snapshot_in_parallel_group_multiple_gen_1_durations() { + let object_store: Arc = Arc::new(InMemory::new()); + let metrics = Arc::new(metric::Registry::default()); + + let parquet_store = + ParquetStorage::new(Arc::clone(&object_store), StorageId::from("influxdb3")); + let exec = Arc::new(Executor::new_with_config_and_executor( + ExecutorConfig { + target_query_partitions: NonZeroUsize::new(1).unwrap(), + object_stores: [&parquet_store] + .into_iter() + .map(|store| (store.id(), Arc::clone(store.object_store()))) + .collect(), + metric_registry: Arc::clone(&metrics), + // Default to 1gb + mem_pool_size: 1024 * 1024 * 1024, // 1024 (b/kb) * 1024 (kb/mb) * 1024 (mb/gb) + }, + DedicatedExecutor::new_testing(), + )); + let runtime_env = exec.new_context().inner().runtime_env(); + register_iox_object_store(runtime_env, parquet_store.id(), Arc::clone(&object_store)); + register_current_runtime_for_io(); + + let catalog = Arc::new(Catalog::new("hosta".into(), "foo".into())); + let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0))); + let persister = Arc::new(Persister::new( + Arc::clone(&object_store), + "hosta", + time_provider, + )); + let time_provider: Arc = + Arc::new(MockProvider::new(Time::from_timestamp_nanos(0))); + + let queryable_buffer_args = QueryableBufferArgs { + executor: Arc::clone(&exec), + catalog: Arc::clone(&catalog), + persister: Arc::clone(&persister), + last_cache_provider: LastCacheProvider::new_from_catalog(Arc::clone(&catalog)).unwrap(), + 
distinct_cache_provider: DistinctCacheProvider::new_from_catalog( + Arc::clone(&time_provider), + Arc::clone(&catalog), + ) + .unwrap(), + persisted_files: Arc::new(PersistedFiles::new()), + parquet_cache: None, + gen1_duration: Gen1Duration::new_1m(), + max_size_per_parquet_file_bytes: 50_000, + }; + let queryable_buffer = QueryableBuffer::new(queryable_buffer_args); + + let db = data_types::NamespaceName::new("testdb").unwrap(); + + // create the initial write with one tag + let val = WriteValidator::initialize(db.clone(), Arc::clone(&catalog), 0).unwrap(); + let lp = format!( + "foo,t1=a,t2=b f1=1i {}", + time_provider.now().timestamp_nanos() + ); + + let lines = val + .v1_parse_lines_and_update_schema( + &lp, + false, + time_provider.now(), + Precision::Nanosecond, + ) + .unwrap() + .convert_lines_to_buffer(Gen1Duration::new_1m()); + let batch: WriteBatch = lines.into(); + let wal_contents = WalContents { + persist_timestamp_ms: 0, + min_timestamp_ns: batch.min_time_ns, + max_timestamp_ns: batch.max_time_ns, + wal_file_number: WalFileSequenceNumber::new(1), + ops: vec![WalOp::Write(batch)], + snapshot: None, + }; + + // write the lp into the buffer + queryable_buffer.notify(Arc::new(wal_contents)).await; + + // create another write, this time with two tags, in a different gen1 block + let lp = "foo,t1=a,t2=b f1=1i,f2=2 61000000000"; + let val = WriteValidator::initialize(db, Arc::clone(&catalog), 0).unwrap(); + + let lines = val + .v1_parse_lines_and_update_schema(lp, false, time_provider.now(), Precision::Nanosecond) + .unwrap() + .convert_lines_to_buffer(Gen1Duration::new_1m()); + let batch: WriteBatch = lines.into(); + let wal_contents = WalContents { + persist_timestamp_ms: 0, + min_timestamp_ns: batch.min_time_ns, + max_timestamp_ns: batch.max_time_ns, + wal_file_number: WalFileSequenceNumber::new(2), + ops: vec![WalOp::Write(batch)], + snapshot: None, + }; + let end_time = + wal_contents.max_timestamp_ns + Gen1Duration::new_1m().as_duration().as_nanos() as i64; + + let snapshot_sequence_number = SnapshotSequenceNumber::new(1); + let snapshot_details = SnapshotDetails { + snapshot_sequence_number, + end_time_marker: end_time, + first_wal_sequence_number: WalFileSequenceNumber::new(1), + last_wal_sequence_number: WalFileSequenceNumber::new(2), + forced: false, + }; + + let details = queryable_buffer + .notify_and_snapshot(Arc::new(wal_contents), snapshot_details) + .await; + let _details = details.await.unwrap(); + + // validate we have a single persisted file + let db = catalog.db_schema("testdb").unwrap(); + let table = db.table_definition("foo").unwrap(); + let files = queryable_buffer + .persisted_files + .get_files(db.id, table.table_id); + debug!(?files, ">>> test: queryable buffer persisted files"); + // although there were 2 writes for different gen 1 durations, they'd be written + // together + assert_eq!(files.len(), 1); + + let first_file = get_file(&files, ParquetFileId::from(0)).unwrap(); + assert_eq!(first_file.chunk_time, 0); + assert_eq!(first_file.timestamp_min_max().min, 0); + assert_eq!(first_file.timestamp_min_max().max, 61_000_000_000); + assert_eq!(first_file.row_count, 2); + } + + #[test_log::test(tokio::test)] + async fn test_when_snapshot_serially_separate_gen_1_durations_are_written() { + let object_store: Arc = Arc::new(InMemory::new()); + let metrics = Arc::new(metric::Registry::default()); + + let parquet_store = + ParquetStorage::new(Arc::clone(&object_store), StorageId::from("influxdb3")); + let exec = Arc::new(Executor::new_with_config_and_executor( 
+ ExecutorConfig { + target_query_partitions: NonZeroUsize::new(1).unwrap(), + object_stores: [&parquet_store] + .into_iter() + .map(|store| (store.id(), Arc::clone(store.object_store()))) + .collect(), + metric_registry: Arc::clone(&metrics), + // Default to 1gb + mem_pool_size: 1024 * 1024 * 1024, // 1024 (b/kb) * 1024 (kb/mb) * 1024 (mb/gb) + }, + DedicatedExecutor::new_testing(), + )); + let runtime_env = exec.new_context().inner().runtime_env(); + register_iox_object_store(runtime_env, parquet_store.id(), Arc::clone(&object_store)); + register_current_runtime_for_io(); + + let catalog = Arc::new(Catalog::new("hosta".into(), "foo".into())); + let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0))); + let persister = Arc::new(Persister::new( + Arc::clone(&object_store), + "hosta", + time_provider, + )); + let time_provider: Arc = + Arc::new(MockProvider::new(Time::from_timestamp_nanos(0))); + + let queryable_buffer_args = QueryableBufferArgs { + executor: Arc::clone(&exec), + catalog: Arc::clone(&catalog), + persister: Arc::clone(&persister), + last_cache_provider: LastCacheProvider::new_from_catalog(Arc::clone(&catalog)).unwrap(), + distinct_cache_provider: DistinctCacheProvider::new_from_catalog( + Arc::clone(&time_provider), + Arc::clone(&catalog), + ) + .unwrap(), + persisted_files: Arc::new(PersistedFiles::new()), + parquet_cache: None, + gen1_duration: Gen1Duration::new_1m(), + max_size_per_parquet_file_bytes: 2_000, + }; + let queryable_buffer = QueryableBuffer::new(queryable_buffer_args); + + let db = data_types::NamespaceName::new("testdb").unwrap(); + + // create the initial write with one tag + let val = WriteValidator::initialize(db.clone(), Arc::clone(&catalog), 0).unwrap(); + let lp = format!( + "foo,t1=a,t2=b f1=1i {}", + time_provider.now().timestamp_nanos() + ); + + let lines = val + .v1_parse_lines_and_update_schema( + &lp, + false, + time_provider.now(), + Precision::Nanosecond, + ) + .unwrap() + .convert_lines_to_buffer(Gen1Duration::new_1m()); + let batch: WriteBatch = lines.into(); + let wal_contents = WalContents { + persist_timestamp_ms: 0, + min_timestamp_ns: batch.min_time_ns, + max_timestamp_ns: batch.max_time_ns, + wal_file_number: WalFileSequenceNumber::new(1), + ops: vec![WalOp::Write(batch)], + snapshot: None, + }; + + // write the lp into the buffer + queryable_buffer.notify(Arc::new(wal_contents)).await; + + // create another write, this time with two tags, in a different gen1 block + let lp = "foo,t1=a,t2=b f1=1i,f2=2 61000000000"; + let val = WriteValidator::initialize(db, Arc::clone(&catalog), 0).unwrap(); + + let lines = val + .v1_parse_lines_and_update_schema(lp, false, time_provider.now(), Precision::Nanosecond) + .unwrap() + .convert_lines_to_buffer(Gen1Duration::new_1m()); + let batch: WriteBatch = lines.into(); + let wal_contents = WalContents { + persist_timestamp_ms: 0, + min_timestamp_ns: batch.min_time_ns, + max_timestamp_ns: batch.max_time_ns, + wal_file_number: WalFileSequenceNumber::new(2), + ops: vec![WalOp::Write(batch)], + snapshot: None, + }; + let end_time = + wal_contents.max_timestamp_ns + Gen1Duration::new_1m().as_duration().as_nanos() as i64; + + let snapshot_sequence_number = SnapshotSequenceNumber::new(1); + let snapshot_details = SnapshotDetails { + snapshot_sequence_number, + end_time_marker: end_time, + first_wal_sequence_number: WalFileSequenceNumber::new(1), + last_wal_sequence_number: WalFileSequenceNumber::new(2), + forced: true, + }; + + let details = queryable_buffer + 
.notify_and_snapshot(Arc::new(wal_contents), snapshot_details) + .await; + let _details = details.await.unwrap(); + + // validate we have a single persisted file + let db = catalog.db_schema("testdb").unwrap(); + let table = db.table_definition("foo").unwrap(); + + let files = queryable_buffer + .persisted_files + .get_files(db.id, table.table_id); + debug!(?files, ">>> test: queryable buffer persisted files"); + assert_eq!(files.len(), 2); + + let first_file = get_file(&files, ParquetFileId::from(0)).unwrap(); + let second_file = get_file(&files, ParquetFileId::from(1)).unwrap(); + assert_eq!(first_file.chunk_time, 0); + assert_eq!(first_file.timestamp_min_max().min, 0); + assert_eq!(first_file.timestamp_min_max().max, 0); + assert_eq!(first_file.row_count, 1); + + assert_eq!(second_file.chunk_time, 60_000_000_000); + assert_eq!(second_file.timestamp_min_max().min, 61_000_000_000); + assert_eq!(second_file.timestamp_min_max().max, 61_000_000_000); + assert_eq!(first_file.row_count, 1); + } + + #[test_log::test(tokio::test)] + async fn test_snapshot_serially_two_tables_with_varying_throughput() { + let object_store: Arc = Arc::new(InMemory::new()); + let metrics = Arc::new(metric::Registry::default()); + + let parquet_store = + ParquetStorage::new(Arc::clone(&object_store), StorageId::from("influxdb3")); + let exec = Arc::new(Executor::new_with_config_and_executor( + ExecutorConfig { + target_query_partitions: NonZeroUsize::new(1).unwrap(), + object_stores: [&parquet_store] + .into_iter() + .map(|store| (store.id(), Arc::clone(store.object_store()))) + .collect(), + metric_registry: Arc::clone(&metrics), + // Default to 1gb + mem_pool_size: 1024 * 1024 * 1024, // 1024 (b/kb) * 1024 (kb/mb) * 1024 (mb/gb) + }, + DedicatedExecutor::new_testing(), + )); + let runtime_env = exec.new_context().inner().runtime_env(); + register_iox_object_store(runtime_env, parquet_store.id(), Arc::clone(&object_store)); + register_current_runtime_for_io(); + + let catalog = Arc::new(Catalog::new("hosta".into(), "foo".into())); + let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0))); + let persister = Arc::new(Persister::new( + Arc::clone(&object_store), + "hosta", + time_provider, + )); + let time_provider: Arc = + Arc::new(MockProvider::new(Time::from_timestamp_nanos(0))); + + let queryable_buffer_args = QueryableBufferArgs { + executor: Arc::clone(&exec), + catalog: Arc::clone(&catalog), + persister: Arc::clone(&persister), + last_cache_provider: LastCacheProvider::new_from_catalog(Arc::clone(&catalog)).unwrap(), + distinct_cache_provider: DistinctCacheProvider::new_from_catalog( + Arc::clone(&time_provider), + Arc::clone(&catalog), + ) + .unwrap(), + persisted_files: Arc::new(PersistedFiles::new()), + parquet_cache: None, + gen1_duration: Gen1Duration::new_1m(), + max_size_per_parquet_file_bytes: 100_000, + }; + let queryable_buffer = QueryableBuffer::new(queryable_buffer_args); + + let db = data_types::NamespaceName::new("testdb").unwrap(); + + for i in 0..2 { + // create another write, this time with two tags, in a different gen1 block + let ts = Gen1Duration::new_1m().as_duration().as_nanos() as i64 + (i * 240_000_000_000); + let lp = format!("foo,t1=a,t2=b f1=3i,f2=3 {}", ts); + debug!(?lp, ">>> writing line"); + let val = WriteValidator::initialize(db.clone(), Arc::clone(&catalog), 0).unwrap(); + + let lines = val + .v1_parse_lines_and_update_schema( + lp.as_str(), + false, + time_provider.now(), + Precision::Nanosecond, + ) + .unwrap() + 
.convert_lines_to_buffer(Gen1Duration::new_1m()); + let batch: WriteBatch = lines.into(); + let wal_contents = WalContents { + persist_timestamp_ms: 0, + min_timestamp_ns: batch.min_time_ns, + max_timestamp_ns: batch.max_time_ns, + wal_file_number: WalFileSequenceNumber::new((i + 10) as u64), + ops: vec![WalOp::Write(batch)], + snapshot: None, + }; + queryable_buffer.notify(Arc::new(wal_contents)).await; + } + + let mut max_timestamp_ns = None; + for i in 0..10 { + // create another write, this time with two tags, in a different gen1 block + let ts = Gen1Duration::new_1m().as_duration().as_nanos() as i64 + (i * 240_000_000_000); + // let line = format!("bar,t1=a,t2=b f1=3i,f2=3 {}", ts); + let lp = format!( + "bar,t1=a,t2=b f1=3i,f2=3 {}\nbar,t1=a,t2=c f1=4i,f2=3 {}\nbar,t1=ab,t2=b f1=5i,f2=3 {}", + ts, ts, ts + ); + debug!(?lp, ">>> writing line"); + let val = WriteValidator::initialize(db.clone(), Arc::clone(&catalog), 0).unwrap(); + + let lines = val + .v1_parse_lines_and_update_schema( + lp.as_str(), + false, + time_provider.now(), + Precision::Nanosecond, + ) + .unwrap() + .convert_lines_to_buffer(Gen1Duration::new_1m()); + let batch: WriteBatch = lines.into(); + max_timestamp_ns = Some(batch.max_time_ns); + let wal_contents = WalContents { + persist_timestamp_ms: 0, + min_timestamp_ns: batch.min_time_ns, + max_timestamp_ns: batch.max_time_ns, + wal_file_number: WalFileSequenceNumber::new((i + 10) as u64), + ops: vec![WalOp::Write(batch)], + snapshot: None, + }; + queryable_buffer.notify(Arc::new(wal_contents)).await; + } + + let end_time = + max_timestamp_ns.unwrap() + Gen1Duration::new_1m().as_duration().as_nanos() as i64; + + let snapshot_sequence_number = SnapshotSequenceNumber::new(1); + let snapshot_details = SnapshotDetails { + snapshot_sequence_number, + end_time_marker: end_time, + first_wal_sequence_number: WalFileSequenceNumber::new(0), + last_wal_sequence_number: WalFileSequenceNumber::new(9), + forced: true, + }; + + let wal_contents = WalContents { + persist_timestamp_ms: 0, + min_timestamp_ns: 0, + max_timestamp_ns: max_timestamp_ns.unwrap(), + wal_file_number: WalFileSequenceNumber::new(11), + snapshot: None, + ops: vec![WalOp::Noop(NoopDetails { + timestamp_ns: end_time, + })], + }; + let details = queryable_buffer + .notify_and_snapshot(Arc::new(wal_contents), snapshot_details) + .await; + let _details = details.await.unwrap(); + + let db = catalog.db_schema("testdb").unwrap(); + let table = db.table_definition("foo").unwrap(); + + // foo had 2 writes - should write single file when force snapshotted, with both rows in + // them even though they are in two separate chunks + let files = queryable_buffer + .persisted_files + .get_files(db.id, table.table_id); + debug!(?files, ">>> test: queryable buffer persisted files"); + assert_eq!(1, files.len()); + for foo_file in files { + assert_eq!(2, foo_file.row_count); + } + + // bar had 10 writes with 3 lines, should write 4 files each with 9 rows or 3 row in them + let table = db.table_definition("bar").unwrap(); + let files = queryable_buffer + .persisted_files + .get_files(db.id, table.table_id); + debug!(?files, ">>> test: queryable buffer persisted files"); + assert_eq!(4, files.len()); + for bar_file in files { + debug!(?bar_file, ">>> test: bar_file"); + assert!(bar_file.row_count == 3 || bar_file.row_count == 9); + } + } + + fn get_file(files: &[ParquetFile], parquet_file_id: ParquetFileId) -> Option<&ParquetFile> { + files.iter().find(|file| file.id == parquet_file_id) + } } diff --git 
a/influxdb3_write/src/write_buffer/table_buffer.rs b/influxdb3_write/src/write_buffer/table_buffer.rs index 8424fcb6544..006198d7c97 100644 --- a/influxdb3_write/src/write_buffer/table_buffer.rs +++ b/influxdb3_write/src/write_buffer/table_buffer.rs @@ -14,13 +14,13 @@ use influxdb3_wal::{FieldData, Row}; use observability_deps::tracing::error; use schema::sort::SortKey; use schema::{InfluxColumnType, InfluxFieldType, Schema, SchemaBuilder}; -use std::collections::{BTreeMap, LinkedList}; use std::mem::size_of; use std::sync::Arc; use std::{ - collections::btree_map::Entry, - slice::Iter, + collections::BTreeMap, + mem::{self}, }; +use std::{collections::btree_map::Entry, slice::Iter}; use thiserror::Error; use crate::ChunkFilter; @@ -38,9 +38,7 @@ pub(crate) type Result = std::result::Result; pub struct TableBuffer { pub(crate) chunk_time_to_chunks: BTreeMap, - // pub(crate) snapshotting_chunks: SnapshotChunksContainer, - // pub(crate) snapshotting_chunks: Vec, - pub(crate) snapshotting_chunks: LinkedList, + pub(crate) snapshotting_chunks: Vec, pub(crate) sort_key: SortKey, } @@ -48,8 +46,7 @@ impl TableBuffer { pub fn new(sort_key: SortKey) -> Self { Self { chunk_time_to_chunks: BTreeMap::default(), - // snapshotting_chunks: SnapshotChunksContainer::new(), - snapshotting_chunks: LinkedList::new(), + snapshotting_chunks: Vec::new(), sort_key, } } @@ -164,40 +161,40 @@ impl TableBuffer { .collect::>() } - pub fn snapshot_lazy( + pub fn snapshot( &mut self, table_def: Arc, older_than_chunk_time: i64, - ) -> impl Iterator + use<'_> { - let keys_to_remove = self.chunk_time_to_chunks + ) -> Vec { + let keys_to_remove = self + .chunk_time_to_chunks .keys() .filter(|k| **k < older_than_chunk_time) .copied() .collect::>(); - keys_to_remove.into_iter().map(move |chunk_time| { - let chunk = self.chunk_time_to_chunks.remove(&chunk_time).unwrap(); - let timestamp_min_max = chunk.timestamp_min_max(); - let (schema, record_batch) = chunk.into_schema_record_batch(Arc::clone(&table_def)); + self.snapshotting_chunks = keys_to_remove + .into_iter() + .map(|chunk_time| { + let chunk = self.chunk_time_to_chunks.remove(&chunk_time).unwrap(); + let timestamp_min_max = chunk.timestamp_min_max(); + let (schema, record_batch) = chunk.into_schema_record_batch(Arc::clone(&table_def)); + + SnapshotChunk { + chunk_time, + timestamp_min_max, + record_batch, + schema, + } + }) + .collect::>(); - SnapshotChunk { - chunk_time, - timestamp_min_max, - record_batch, - schema, - } - }) + self.snapshotting_chunks.clone() } - // pub fn add_snapshot_chunks(&self, snapshot_chunks: Vec) { - // self.snapshotting_chunks.add(snapshot_chunks); - // } - // - // pub fn clear_snapshots(&self) { - // self.snapshotting_chunks.clear_all(); - // } pub fn clear_snapshots(&mut self) { - self.snapshotting_chunks.clear(); + // vec clear still holds the mem (capacity), so use take + let _ = mem::take(&mut self.snapshotting_chunks); } } @@ -228,53 +225,6 @@ impl Iterator for SnaphotChunkIter<'_> { } } -pub(crate) struct SnapshotChunksContainer { - chunks: parking_lot::Mutex>, -} - -impl SnapshotChunksContainer { - fn new() -> Self { - Self { - chunks: parking_lot::Mutex::new(vec![]), - } - } - - pub(crate) fn add_one(&self, chunk: SnapshotChunk) { - self.chunks.lock().push(chunk); - } - - pub(crate) fn add(&self, chunks: Vec) { - let mut all_chunks = self.chunks.lock(); - *all_chunks = chunks; - } - - fn clear_all(&self) { - self.chunks.lock().clear(); - } - - fn find_min_max(&self, current_min: i64, current_max: i64) -> TimestampMinMax { - let 
mut timestamp_min_max = TimestampMinMax::new(current_min, current_max); - - for sc in self.chunks.lock().iter() { - timestamp_min_max = timestamp_min_max.union(&sc.timestamp_min_max); - } - - timestamp_min_max - } - - fn as_filtered_vec(&self, filter: &ChunkFilter<'_>) -> Vec { - // TODO: find an alternate impl for this - self.chunks - .lock() - .iter() - .filter(|sc| { - filter.test_time_stamp_min_max(sc.timestamp_min_max.min, sc.timestamp_min_max.max) - }) - .cloned() - .collect() - } -} - #[derive(Debug, Clone)] pub struct SnapshotChunk { pub(crate) chunk_time: i64, @@ -545,7 +495,7 @@ impl MutableTableChunk { } } -fn array_ref_nulls_for_type(data_type: InfluxColumnType, len: usize) -> ArrayRef { +pub(crate) fn array_ref_nulls_for_type(data_type: InfluxColumnType, len: usize) -> ArrayRef { match data_type { InfluxColumnType::Field(InfluxFieldType::Boolean) => { let mut builder = BooleanBuilder::new(); @@ -689,10 +639,8 @@ mod tests { use super::*; use arrow_util::assert_batches_sorted_eq; use data_types::NamespaceName; - use datafusion::prelude::{col, lit_timestamp_nano, Expr}; + use datafusion::prelude::{Expr, col, lit_timestamp_nano}; use influxdb3_catalog::catalog::{Catalog, DatabaseSchema}; - // use influxdb_line_protocol::v3::SeriesKey; - // use iox_query::test; use iox_time::Time; struct TestWriter { @@ -904,18 +852,4 @@ mod tests { assert_batches_sorted_eq!(t.expected_output, &batches); } } - - #[test] - fn test_drain_filter() { - // let map: BTreeMap = BTreeMap::new(); - // let keys_to_remove = vec![1, 2, 3]; - // let columns = vec![(ColumnId::new(), Arc::from("region"), InfluxColumnType::Tag)]; - // let series_key = SeriesKey::new(); - // let table_def = TableDefinition::new(TableId::new(0), Arc::from("foo"), columns, series_key); - // let drain_filter = DrainFilter { - // keys_to_remove: keys_to_remove.iter(), - // map: &mut map, - // table_def: , - // }; - } } From 8d268aa15fbc47d9804c7b7472e87f61f728d87f Mon Sep 17 00:00:00 2001 From: Praveen Kumar Date: Fri, 28 Feb 2025 10:17:34 +0000 Subject: [PATCH 6/8] feat: uses ensure_schema as discussed - extra debug logs added - test fixes --- influxdb3_catalog/src/catalog.rs | 10 +- influxdb3_server/src/query_executor/mod.rs | 32 ++--- influxdb3_write/Cargo.toml | 1 + .../src/write_buffer/queryable_buffer.rs | 129 ++++-------------- 4 files changed, 51 insertions(+), 121 deletions(-) diff --git a/influxdb3_catalog/src/catalog.rs b/influxdb3_catalog/src/catalog.rs index 97478bbb504..f45e70b2854 100644 --- a/influxdb3_catalog/src/catalog.rs +++ b/influxdb3_catalog/src/catalog.rs @@ -18,7 +18,7 @@ use influxdb3_wal::{ use iox_time::Time; use observability_deps::tracing::{debug, info, warn}; use parking_lot::RwLock; -use schema::{Schema, SchemaBuilder}; +use schema::{Schema, SchemaBuilder, sort::SortKey}; use serde::{Deserialize, Serialize}; use std::borrow::Cow; use std::cmp::Ordering; @@ -1137,6 +1137,14 @@ impl TableDefinition { pub fn series_key_names(&self) -> &[Arc] { &self.series_key_names } + + pub fn sort_key(&self) -> SortKey { + let cols = self + .series_key + .iter() + .map(|c| Arc::clone(&self.column_id_to_name_unchecked(c))); + SortKey::from_columns(cols) + } } trait TableUpdate { diff --git a/influxdb3_server/src/query_executor/mod.rs b/influxdb3_server/src/query_executor/mod.rs index 5be9b8b52b4..321fc249e87 100644 --- a/influxdb3_server/src/query_executor/mod.rs +++ b/influxdb3_server/src/query_executor/mod.rs @@ -927,9 +927,9 @@ mod tests { "+------------+------------+-----------+----------+----------+", "| table_name 
| size_bytes | row_count | min_time | max_time |", "+------------+------------+-----------+----------+----------+", - "| cpu | 1961 | 3 | 0 | 20 |", - "| cpu | 1961 | 3 | 30 | 50 |", - "| cpu | 1961 | 3 | 60 | 80 |", + "| cpu | 2105 | 3 | 0 | 20 |", + "| cpu | 2105 | 3 | 30 | 50 |", + "| cpu | 2105 | 3 | 60 | 80 |", "+------------+------------+-----------+----------+----------+", ], }, @@ -942,9 +942,9 @@ mod tests { "+------------+------------+-----------+----------+----------+", "| table_name | size_bytes | row_count | min_time | max_time |", "+------------+------------+-----------+----------+----------+", - "| mem | 1961 | 3 | 0 | 20 |", - "| mem | 1961 | 3 | 30 | 50 |", - "| mem | 1961 | 3 | 60 | 80 |", + "| mem | 2105 | 3 | 0 | 20 |", + "| mem | 2105 | 3 | 30 | 50 |", + "| mem | 2105 | 3 | 60 | 80 |", "+------------+------------+-----------+----------+----------+", ], }, @@ -956,12 +956,12 @@ mod tests { "+------------+------------+-----------+----------+----------+", "| table_name | size_bytes | row_count | min_time | max_time |", "+------------+------------+-----------+----------+----------+", - "| cpu | 1961 | 3 | 0 | 20 |", - "| cpu | 1961 | 3 | 30 | 50 |", - "| cpu | 1961 | 3 | 60 | 80 |", - "| mem | 1961 | 3 | 0 | 20 |", - "| mem | 1961 | 3 | 30 | 50 |", - "| mem | 1961 | 3 | 60 | 80 |", + "| cpu | 2105 | 3 | 0 | 20 |", + "| cpu | 2105 | 3 | 30 | 50 |", + "| cpu | 2105 | 3 | 60 | 80 |", + "| mem | 2105 | 3 | 0 | 20 |", + "| mem | 2105 | 3 | 30 | 50 |", + "| mem | 2105 | 3 | 60 | 80 |", "+------------+------------+-----------+----------+----------+", ], }, @@ -974,10 +974,10 @@ mod tests { "+------------+------------+-----------+----------+----------+", "| table_name | size_bytes | row_count | min_time | max_time |", "+------------+------------+-----------+----------+----------+", - "| cpu | 1961 | 3 | 0 | 20 |", - "| cpu | 1961 | 3 | 30 | 50 |", - "| cpu | 1961 | 3 | 60 | 80 |", - "| mem | 1961 | 3 | 60 | 80 |", + "| cpu | 2105 | 3 | 0 | 20 |", + "| cpu | 2105 | 3 | 30 | 50 |", + "| cpu | 2105 | 3 | 60 | 80 |", + "| mem | 2105 | 3 | 60 | 80 |", "+------------+------------+-----------+----------+----------+", ], }, diff --git a/influxdb3_write/Cargo.toml b/influxdb3_write/Cargo.toml index ac869224b01..76feb9aa536 100644 --- a/influxdb3_write/Cargo.toml +++ b/influxdb3_write/Cargo.toml @@ -13,6 +13,7 @@ workspace = true [dependencies] # Core Crates +arrow_util.workspace = true data_types.workspace = true datafusion_util.workspace = true executor.workspace = true diff --git a/influxdb3_write/src/write_buffer/queryable_buffer.rs b/influxdb3_write/src/write_buffer/queryable_buffer.rs index 4109e8f7206..b8b26ab1e04 100644 --- a/influxdb3_write/src/write_buffer/queryable_buffer.rs +++ b/influxdb3_write/src/write_buffer/queryable_buffer.rs @@ -1,11 +1,12 @@ +use crate::paths::ParquetFilePath; use crate::persister::Persister; use crate::write_buffer::persisted_files::PersistedFiles; use crate::write_buffer::table_buffer::TableBuffer; use crate::{ChunkFilter, ParquetFile, ParquetFileId, PersistedSnapshot}; use crate::{chunk::BufferChunk, write_buffer::table_buffer::SnaphotChunkIter}; -use crate::{paths::ParquetFilePath, write_buffer::table_buffer::array_ref_nulls_for_type}; use anyhow::Context; use arrow::record_batch::RecordBatch; +use arrow_util::util::ensure_schema; use async_trait::async_trait; use data_types::{ ChunkId, ChunkOrder, PartitionHashId, PartitionId, PartitionKey, TimestampMinMax, @@ -312,6 +313,7 @@ impl QueryableBuffer { catalog.sequence_number(), ); + debug!(num = 
?persist_jobs.len(), ">>> number of persist jobs before grouping forced"); let iterator = PersistJobGroupedIterator::new( &persist_jobs, Arc::from(persister.node_identifier_prefix()), @@ -334,6 +336,7 @@ impl QueryableBuffer { persisted_snapshot } else { // persist the individual files, building the snapshot as we go + debug!(num = ?persist_jobs.len(), ">>> number of persist jobs before grouping not forced"); let persisted_snapshot = Arc::new(Mutex::new(PersistedSnapshot::new( persister.node_identifier_prefix().to_string(), snapshot_details.snapshot_sequence_number, @@ -743,12 +746,7 @@ impl BufferState { let table_def = db_schema .table_definition_by_id(table_id) .expect("table should exist"); - let sort_key = table_def - .series_key - .iter() - .map(|c| Arc::clone(&table_def.column_id_to_name_unchecked(c))); - - TableBuffer::new(SortKey::from_columns(sort_key)) + TableBuffer::new(table_def.sort_key()) }); for (chunk_time, chunk) in &table_chunks.chunk_time_to_chunk { table_buffer.buffer_chunk(*chunk_time, &chunk.rows); @@ -825,60 +823,6 @@ impl<'a> PersistJobGroupedIterator<'a> { ), } } - - fn check_and_align_schema_mismatches_between_batches( - &mut self, - current_data: &PersistJob, - all_batches: &mut [RecordBatch], - all_schemas: Vec, - ) -> Option<()> { - let table_defn = self - .catalog - .db_schema_by_id(¤t_data.database_id)? - .table_definition_by_id(¤t_data.table_id)?; - let expected_schema = table_defn.schema.clone(); - let batches_with_schema_mismatch = - find_batches_with_mismatching_schemas(all_batches, &all_schemas, &expected_schema); - - if !batches_with_schema_mismatch.is_empty() { - // we need to add the missing fields - as schema changes are additive, when there is - // a mismatch it means new column has been added to table but the batches are missing - // them. 
- for (idx, batch) in &batches_with_schema_mismatch { - let mut cols = vec![]; - // pick it's current iox schema, to add the columns (making null for missing) - let outdated_batch_schema = &all_schemas[*idx]; - debug!( - ?outdated_batch_schema, - ">>> outdated batch schema when aligning mismatched schema" - ); - for col_idx_with_field_details in expected_schema.iter().enumerate() { - let (col_idx, (influx_col_type, field)) = col_idx_with_field_details; - let batch_field = outdated_batch_schema.field_by_name(field.name()); - let len = batch.columns()[0].len(); - if batch_field.is_some() { - let col = Arc::clone(&batch.columns()[col_idx]); - cols.push(col); - } else { - let null_array_col = array_ref_nulls_for_type(influx_col_type, len); - cols.push(null_array_col); - } - } - - let new_arrow_schema = expected_schema.as_arrow(); - debug!( - ?new_arrow_schema, - ">>> new arrow schema for batch when aligning mismatched schema" - ); - let new_rec_batch = RecordBatch::try_new(new_arrow_schema, cols).expect( - "record batch to be created with new schema after fixing schema mismatch", - ); - - let _ = std::mem::replace(&mut all_batches[*idx], new_rec_batch); - } - }; - Some(()) - } } impl Iterator for PersistJobGroupedIterator<'_> { @@ -895,9 +839,7 @@ impl Iterator for PersistJobGroupedIterator<'_> { let mut ts_min_max = current_data.timestamp_min_max; let mut all_batches = Vec::with_capacity(self.chunk_size); - let mut all_iox_schemas = Vec::with_capacity(self.chunk_size); all_batches.extend_from_slice(¤t_data.batches); - all_iox_schemas.push(current_data.iox_schema.clone()); let mut min_chunk_time = current_data.chunk_time; let mut current_size_bytes = current_data.total_batch_size(); @@ -932,7 +874,6 @@ impl Iterator for PersistJobGroupedIterator<'_> { min_chunk_time = min_chunk_time.min(next.chunk_time); current_size_bytes += next.total_batch_size(); all_batches.extend_from_slice(&next.batches); - all_iox_schemas.push(next.iox_schema.clone()); } else { break; } @@ -941,11 +882,20 @@ impl Iterator for PersistJobGroupedIterator<'_> { } } - self.check_and_align_schema_mismatches_between_batches( - current_data, - &mut all_batches, - all_iox_schemas, - )?; + let table_defn = self + .catalog + .db_schema_by_id(¤t_data.database_id)? + .table_definition_by_id(¤t_data.table_id)?; + + let all_schema_aligned_batches: Vec = all_batches + .iter() + .map(|batch| { + ensure_schema(&table_defn.schema.as_arrow(), batch) + // TODO: are there chances this could result in error - what does it mean if it + // did? 
+ .expect("batches should have same schema") + }) + .collect(); Some(PersistJob { database_id: current_data.database_id, @@ -962,45 +912,14 @@ impl Iterator for PersistJobGroupedIterator<'_> { ), table_name: Arc::clone(¤t_data.table_name), chunk_time: min_chunk_time, - batches: all_batches, - iox_schema: current_data.iox_schema.clone(), + batches: all_schema_aligned_batches, + iox_schema: table_defn.schema.clone(), timestamp_min_max: ts_min_max, - sort_key: current_data.sort_key.clone(), + sort_key: table_defn.sort_key(), }) } } -fn find_batches_with_mismatching_schemas( - all_batches: &mut [RecordBatch], - all_schemas: &[IoxSchema], - expected_schema: &IoxSchema, -) -> Vec<(usize, RecordBatch)> { - let batches_with_schema_mismatch: Vec<(usize, RecordBatch)> = all_batches - .iter() - .cloned() - .enumerate() - .filter(|(idx, _)| { - let schema = &all_schemas[*idx]; - for field_1 in expected_schema.iter() { - let mut found_field = false; - for field_2 in schema.iter() { - if field_1.1.name() == field_2.1.name() { - found_field = true; - break; - } - } - - if !found_field { - return true; - } - } - false - }) - .collect(); - - batches_with_schema_mismatch -} - pub(crate) struct SortDedupePersistSummary { pub file_size_bytes: u64, pub file_meta_data: FileMetaData, @@ -1028,11 +947,13 @@ async fn sort_dedupe_persist( .map(|batch| batch.num_rows()) .sum(); info!( - "Persisting {} rows for db id {} and table id {} and chunk {} to file {}", + "Persisting {} rows for db id {} and table id {} and chunk {} and ts min {} max {} to file {}", row_count, persist_job.database_id, persist_job.table_id, persist_job.chunk_time, + persist_job.timestamp_min_max.min, + persist_job.timestamp_min_max.max, persist_job.path.to_string() ); From 19c29abf8dfb2f0d0da0a9dfa98fb5cd8fa83235 Mon Sep 17 00:00:00 2001 From: Praveen Kumar Date: Mon, 3 Mar 2025 12:25:37 +0000 Subject: [PATCH 7/8] refactor: constrain only the parallel runs --- influxdb3_write/src/write_buffer/queryable_buffer.rs | 6 +++--- influxdb3_write/src/write_buffer/table_buffer.rs | 4 +--- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/influxdb3_write/src/write_buffer/queryable_buffer.rs b/influxdb3_write/src/write_buffer/queryable_buffer.rs index b8b26ab1e04..66a234b8f68 100644 --- a/influxdb3_write/src/write_buffer/queryable_buffer.rs +++ b/influxdb3_write/src/write_buffer/queryable_buffer.rs @@ -204,14 +204,14 @@ impl QueryableBuffer { for (database_id, table_map) in buffer.db_to_table.iter_mut() { let db_schema = catalog.db_schema_by_id(database_id).expect("db exists"); for (table_id, table_buffer) in table_map.iter_mut() { - info!(db_name = ?db_schema.name, ?table_id, ">>> working on db, table"); + debug!(db_name = ?db_schema.name, ?table_id, ">>> working on db, table"); let table_def = db_schema .table_definition_by_id(table_id) .expect("table exists"); let sort_key = table_buffer.sort_key.clone(); let all_keys_to_remove = table_buffer.get_keys_to_remove(snapshot_details.end_time_marker); - info!(num_keys_to_remove = ?all_keys_to_remove.len(), ">>> num keys to remove"); + debug!(num_keys_to_remove = ?all_keys_to_remove.len(), ">>> num keys to remove"); let chunk_time_to_chunk = &mut table_buffer.chunk_time_to_chunks; let snapshot_chunks = &mut table_buffer.snapshotting_chunks; @@ -350,7 +350,7 @@ impl QueryableBuffer { wal_file_number, Arc::clone(&catalog), gen1_duration.as_10m() as usize, - Some(max_size_per_parquet_file), + None, ); sort_dedupe_parallel( diff --git a/influxdb3_write/src/write_buffer/table_buffer.rs 
b/influxdb3_write/src/write_buffer/table_buffer.rs index 006198d7c97..4380863a20f 100644 --- a/influxdb3_write/src/write_buffer/table_buffer.rs +++ b/influxdb3_write/src/write_buffer/table_buffer.rs @@ -80,7 +80,6 @@ impl TableBuffer { ) -> Result)>> { let mut batches = HashMap::new(); let schema = table_def.schema.as_arrow(); - // for sc in self.snapshotting_chunks.as_filtered_vec(filter) { for sc in self.snapshotting_chunks.iter().filter(|sc| { filter.test_time_stamp_min_max(sc.timestamp_min_max.min, sc.timestamp_min_max.max) }) { @@ -129,10 +128,9 @@ impl TableBuffer { (a_min.min(b_min), a_max.max(b_max)) }) }; - // self.snapshotting_chunks.find_min_max(min, max) let mut timestamp_min_max = TimestampMinMax::new(min, max); - for sc in self.snapshotting_chunks.iter() { + for sc in &self.snapshotting_chunks { timestamp_min_max = timestamp_min_max.union(&sc.timestamp_min_max); } From f4d3d737083e5a58b854b25edcb80dd44fac522b Mon Sep 17 00:00:00 2001 From: Praveen Kumar Date: Thu, 6 Mar 2025 18:03:18 +0000 Subject: [PATCH 8/8] refactor: minor tidy ups --- .../src/write_buffer/queryable_buffer.rs | 51 +++++++++---------- 1 file changed, 24 insertions(+), 27 deletions(-) diff --git a/influxdb3_write/src/write_buffer/queryable_buffer.rs b/influxdb3_write/src/write_buffer/queryable_buffer.rs index 66a234b8f68..90f3843f6c0 100644 --- a/influxdb3_write/src/write_buffer/queryable_buffer.rs +++ b/influxdb3_write/src/write_buffer/queryable_buffer.rs @@ -29,7 +29,7 @@ use iox_query::chunk_statistics::{NoColumnRanges, create_chunk_statistics}; use iox_query::exec::Executor; use iox_query::frontend::reorg::ReorgPlanner; use object_store::path::Path; -use observability_deps::tracing::{debug, error, info}; +use observability_deps::tracing::{debug, error, info, trace}; use parking_lot::Mutex; use parking_lot::RwLock; use parquet::format::FileMetaData; @@ -204,14 +204,14 @@ impl QueryableBuffer { for (database_id, table_map) in buffer.db_to_table.iter_mut() { let db_schema = catalog.db_schema_by_id(database_id).expect("db exists"); for (table_id, table_buffer) in table_map.iter_mut() { - debug!(db_name = ?db_schema.name, ?table_id, ">>> working on db, table"); + trace!(db_name = ?db_schema.name, ?table_id, ">>> working on db, table"); let table_def = db_schema .table_definition_by_id(table_id) .expect("table exists"); let sort_key = table_buffer.sort_key.clone(); let all_keys_to_remove = table_buffer.get_keys_to_remove(snapshot_details.end_time_marker); - debug!(num_keys_to_remove = ?all_keys_to_remove.len(), ">>> num keys to remove"); + trace!(num_keys_to_remove = ?all_keys_to_remove.len(), ">>> num keys to remove"); let chunk_time_to_chunk = &mut table_buffer.chunk_time_to_chunks; let snapshot_chunks = &mut table_buffer.snapshotting_chunks; @@ -222,7 +222,7 @@ impl QueryableBuffer { }; for chunk in snapshot_chunks_iter { - debug!(">>> starting with new chunk"); + trace!(">>> starting with new chunk"); let table_name = db_schema.table_id_to_name(table_id).expect("table exists"); @@ -249,7 +249,7 @@ impl QueryableBuffer { }; persisting_chunks.push(persist_job); snapshot_chunks.push(chunk); - debug!(">>> finished with chunk"); + trace!(">>> finished with chunk"); } } } @@ -463,10 +463,9 @@ async fn sort_dedupe_parallel>( persisted_files: Arc, persisted_snapshot: Arc>, ) { - info!(">>> running sort/dedupe in parallel"); + info!("running sort/dedupe in parallel"); let mut set = JoinSet::new(); - // TODO: may be this concurrency level needs to be externalised let sempahore = Arc::new(Semaphore::new(5)); for 
persist_job in iterator { let persister = Arc::clone(persister); @@ -516,7 +515,7 @@ async fn sort_dedupe_serial>( persisted_files: Arc, persisted_snapshot: &mut PersistedSnapshot, ) { - info!(">>> running sort/dedupe serially"); + info!("running sort/dedupe serially"); for persist_job in iterator { let persister = Arc::clone(persister); @@ -792,6 +791,23 @@ impl PersistJob { } } +/// This iterator groups persist jobs together to create a single persist job out of it with the +/// combined record batches from all of them. By default it'll try to pick as many as 10 persist +/// jobs (gen1 defaults to 1m so groups 10 of them to get to 10m) whilst maintaining memory +/// bound. There are 2 places where it's called from, +/// - when forcing snapshot due to memory pressure +/// - normal snapshot (i.e based on snapshot tracker) +/// +/// In the force snapshot case, explicit memory bound is passed in (defaults to 100M), however a +/// single record batch may well exceed 100M (for 1m duration), if that happens then it will +/// naively try to do a sort/dedupe with a bigger chunk and this could very well OOM. There is no +/// dynamic behaviour to break down a bigger record batch and since this requires allocation to +/// create smaller record batches, there is a period of time where the bigger batch and the smaller +/// batches need to be in memory which has so far proven tricky. +/// +/// In the normal snapshot case, unlimited memory bound is set, but it will still only put together +/// 10 persist jobs, so in this case the assumption is there is still plenty of room for the +/// memory as at the point of invocation there is no memory pressure. struct PersistJobGroupedIterator<'a> { iter: Peekable>, host_prefix: Arc, @@ -826,10 +842,6 @@ impl<'a> PersistJobGroupedIterator<'a> { } impl Iterator for PersistJobGroupedIterator<'_> { - // This is a grouped persist job, since it includes exactly - // same fields with only difference being each job has a vec - // of batches, it's been reused for now. For clarity it might - // be better to have different types to represent this state type Item = PersistJob; fn next(&mut self) -> Option { @@ -845,19 +857,6 @@ impl Iterator for PersistJobGroupedIterator<'_> { let mut current_size_bytes = current_data.total_batch_size(); debug!(?current_size_bytes, table_name = ?current_data.table_name, ">>> current_size_bytes for table"); self.system.refresh_memory(); - // This is a very naive approach to keep mem bounded, if the first chunk - // is >50M we allow it to be processed but this may run into OOM. To - // address that, one approach is to break it further down but once a - // bigger record batch is built, breaking it down with mem bounds gets - // very tricky and fairly inconsistent as building smaller record batches - // require allocation. The point when mem was sampled to point when they - // are broken down into smaller batches the incoming traffic fills the - // buffer to a point the original mem sample isn't right. But mostly - // with even higher throughputs (having 1m gen 1 duration as default) - // this approach has been faring well, however once you hit very high - // throughputs whereby in 10s (force snapshot interval) - // it fills in enough mem to go past the allocated query buffer % - // (50% default) it will run into OOMs. 
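In code, the grouping described in the doc comment above comes down to a peekable-iterator accumulation: keep taking jobs for the same table while the group stays under both the length cap (10 by default) and a byte budget, where the budget itself is clamped by free memory minus a fixed headroom (shown just below). A reduced sketch of that control flow, with an illustrative Job struct standing in for PersistJob and the byte budget passed in directly:

use std::iter::Peekable;

// Illustrative stand-in for PersistJob, keeping only the fields the grouping
// logic needs.
struct Job {
    table_id: u32,
    size_bytes: usize,
}

// Pull up to max_group_len consecutive jobs for the same table, stopping early
// once the accumulated size would exceed the byte budget.
fn next_group<I: Iterator<Item = Job>>(
    iter: &mut Peekable<I>,
    max_group_len: usize,
    max_group_bytes: usize,
) -> Option<Vec<Job>> {
    let first = iter.next()?;
    let table_id = first.table_id;
    let mut total = first.size_bytes;
    let mut group = vec![first];

    while group.len() < max_group_len {
        let take = iter.peek().map_or(false, |next| {
            next.table_id == table_id && total + next.size_bytes <= max_group_bytes
        });
        if !take {
            break;
        }
        let next = iter.next().expect("peeked job is present");
        total += next.size_bytes;
        group.push(next);
    }
    Some(group)
}

As the removed comment notes, if the very first job already exceeds the budget it is still emitted as a group of one, which is where the OOM risk under heavy write load comes from.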
let system_mem_bytes = self.system.free_memory() - 100_000_000; let max_size_bytes = self.max_size_bytes.min(system_mem_bytes); debug!( @@ -891,8 +890,6 @@ impl Iterator for PersistJobGroupedIterator<'_> { .iter() .map(|batch| { ensure_schema(&table_defn.schema.as_arrow(), batch) - // TODO: are there chances this could result in error - what does it mean if it - // did? .expect("batches should have same schema") }) .collect();
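On the parallel path, sort_dedupe_parallel spawns one task per grouped persist job into a JoinSet and gates them behind a Semaphore (five permits in this patch) so only a handful of sort/dedupe/persist operations run concurrently. The shape of that pattern, reduced to a generic sketch with a hypothetical process callback in place of the real persist logic and without the shared PersistedSnapshot bookkeeping:

use std::future::Future;
use std::sync::Arc;

use tokio::sync::Semaphore;
use tokio::task::JoinSet;

// Run `process` over every job, allowing at most `limit` of them to execute at
// the same time; the JoinSet lets the caller wait for all of them to finish.
async fn run_bounded<J, F, Fut>(jobs: Vec<J>, limit: usize, process: F)
where
    J: Send + 'static,
    F: Fn(J) -> Fut + Clone + Send + 'static,
    Fut: Future<Output = ()> + Send + 'static,
{
    let semaphore = Arc::new(Semaphore::new(limit));
    let mut set = JoinSet::new();

    for job in jobs {
        let semaphore = Arc::clone(&semaphore);
        let process = process.clone();
        set.spawn(async move {
            // the permit is held for the whole task, which bounds concurrency
            let _permit = semaphore
                .acquire_owned()
                .await
                .expect("semaphore is never closed");
            process(job).await;
        });
    }

    // surface panics or cancellations instead of dropping them silently
    while let Some(res) = set.join_next().await {
        if let Err(err) = res {
            eprintln!("persist task failed: {err}");
        }
    }
}

The serial fallback, sort_dedupe_serial, instead processes each job inline in its loop rather than spawning tasks.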