Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: avoid oom snapshot #26043

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 12 additions & 1 deletion influxdb3/src/commands/serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ pub struct Config {
#[clap(
long = "gen1-duration",
env = "INFLUXDB3_GEN1_DURATION",
default_value = "10m",
default_value = "1m",
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Defaulting to 1m means there are more query chunks in QueryableBuffer (10 times more), but this hasn't been an issue so far.

action
)]
pub gen1_duration: Gen1Duration,
Expand Down Expand Up @@ -361,6 +361,16 @@ pub struct Config {
/// smaller time ranges if possible in a query.
#[clap(long = "query-file-limit", env = "INFLUXDB3_QUERY_FILE_LIMIT", action)]
pub query_file_limit: Option<usize>,

/// Threshold for the internal buffer; it can be either a percentage or an
/// absolute value in MB, e.g. "70%" or "1000 MB".
#[clap(
long = "max-memory-for-snapshot",
env = "INFLUXDB3_MAX_MEMORY_FOR_SNAPSHOT",
default_value = "100",
action
)]
pub max_memory_for_snapshot: MemorySizeMb,
}

/// Specified size of the Parquet cache in megabytes (MB)
Expand Down Expand Up @@ -569,6 +579,7 @@ pub async fn command(config: Config) -> Result<()> {
metric_registry: Arc::clone(&metrics),
snapshotted_wal_files_to_keep: config.snapshotted_wal_files_to_keep,
query_file_limit: config.query_file_limit,
max_memory_for_snapshot_bytes: config.max_memory_for_snapshot.as_num_bytes() as u64,
})
.await
.map_err(|e| Error::WriteBufferInit(e.into()))?;
Expand Down
10 changes: 9 additions & 1 deletion influxdb3_catalog/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use influxdb3_wal::{
use iox_time::Time;
use observability_deps::tracing::{debug, info, warn};
use parking_lot::RwLock;
use schema::{Schema, SchemaBuilder};
use schema::{Schema, SchemaBuilder, sort::SortKey};
use serde::{Deserialize, Serialize};
use std::borrow::Cow;
use std::cmp::Ordering;
Expand Down Expand Up @@ -1137,6 +1137,14 @@ impl TableDefinition {
pub fn series_key_names(&self) -> &[Arc<str>] {
&self.series_key_names
}

/// Builds a [`SortKey`] from this table's series key columns, in
/// series-key order.
pub fn sort_key(&self) -> SortKey {
    // `column_id_to_name_unchecked` returns an owned `Arc<str>` (the
    // original `Arc::clone(&...)` on the temporary was redundant work),
    // so the values can be passed straight through to `from_columns`.
    let cols = self
        .series_key
        .iter()
        .map(|id| self.column_id_to_name_unchecked(id));
    SortKey::from_columns(cols)
}
}

trait TableUpdate {
Expand Down
1 change: 1 addition & 0 deletions influxdb3_processing_engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1055,6 +1055,7 @@ mod tests {
metric_registry: Arc::clone(&metric_registry),
snapshotted_wal_files_to_keep: 10,
query_file_limit: None,
max_memory_for_snapshot_bytes: 100_000_000,
})
.await
.unwrap();
Expand Down
1 change: 1 addition & 0 deletions influxdb3_server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -785,6 +785,7 @@ mod tests {
metric_registry: Arc::clone(&metrics),
snapshotted_wal_files_to_keep: 100,
query_file_limit: None,
max_memory_for_snapshot_bytes: 100_000_000,
},
)
.await
Expand Down
33 changes: 17 additions & 16 deletions influxdb3_server/src/query_executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -849,6 +849,7 @@ mod tests {
metric_registry: Default::default(),
snapshotted_wal_files_to_keep: 1,
query_file_limit,
max_memory_for_snapshot_bytes: 100_000_000,
})
.await
.unwrap();
Expand Down Expand Up @@ -926,9 +927,9 @@ mod tests {
"+------------+------------+-----------+----------+----------+",
"| table_name | size_bytes | row_count | min_time | max_time |",
"+------------+------------+-----------+----------+----------+",
"| cpu | 1961 | 3 | 0 | 20 |",
"| cpu | 1961 | 3 | 30 | 50 |",
"| cpu | 1961 | 3 | 60 | 80 |",
"| cpu | 2105 | 3 | 0 | 20 |",
"| cpu | 2105 | 3 | 30 | 50 |",
"| cpu | 2105 | 3 | 60 | 80 |",
"+------------+------------+-----------+----------+----------+",
],
},
Expand All @@ -941,9 +942,9 @@ mod tests {
"+------------+------------+-----------+----------+----------+",
"| table_name | size_bytes | row_count | min_time | max_time |",
"+------------+------------+-----------+----------+----------+",
"| mem | 1961 | 3 | 0 | 20 |",
"| mem | 1961 | 3 | 30 | 50 |",
"| mem | 1961 | 3 | 60 | 80 |",
"| mem | 2105 | 3 | 0 | 20 |",
"| mem | 2105 | 3 | 30 | 50 |",
"| mem | 2105 | 3 | 60 | 80 |",
"+------------+------------+-----------+----------+----------+",
],
},
Expand All @@ -955,12 +956,12 @@ mod tests {
"+------------+------------+-----------+----------+----------+",
"| table_name | size_bytes | row_count | min_time | max_time |",
"+------------+------------+-----------+----------+----------+",
"| cpu | 1961 | 3 | 0 | 20 |",
"| cpu | 1961 | 3 | 30 | 50 |",
"| cpu | 1961 | 3 | 60 | 80 |",
"| mem | 1961 | 3 | 0 | 20 |",
"| mem | 1961 | 3 | 30 | 50 |",
"| mem | 1961 | 3 | 60 | 80 |",
"| cpu | 2105 | 3 | 0 | 20 |",
"| cpu | 2105 | 3 | 30 | 50 |",
"| cpu | 2105 | 3 | 60 | 80 |",
"| mem | 2105 | 3 | 0 | 20 |",
"| mem | 2105 | 3 | 30 | 50 |",
"| mem | 2105 | 3 | 60 | 80 |",
"+------------+------------+-----------+----------+----------+",
],
},
Expand All @@ -973,10 +974,10 @@ mod tests {
"+------------+------------+-----------+----------+----------+",
"| table_name | size_bytes | row_count | min_time | max_time |",
"+------------+------------+-----------+----------+----------+",
"| cpu | 1961 | 3 | 0 | 20 |",
"| cpu | 1961 | 3 | 30 | 50 |",
"| cpu | 1961 | 3 | 60 | 80 |",
"| mem | 1961 | 3 | 60 | 80 |",
"| cpu | 2105 | 3 | 0 | 20 |",
"| cpu | 2105 | 3 | 30 | 50 |",
"| cpu | 2105 | 3 | 60 | 80 |",
"| mem | 2105 | 3 | 60 | 80 |",
"+------------+------------+-----------+----------+----------+",
],
},
Expand Down
12 changes: 11 additions & 1 deletion influxdb3_wal/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,16 @@ impl Gen1Duration {
self.0.as_nanos() as i64
}

/// Returns how many periods of this generation-1 duration fit into a
/// ten-minute window, with a minimum of 1.
///
/// NOTE(review): presumably used to scale per-period limits that were
/// tuned against the previous 10m default — confirm with callers.
/// The result uses integer division, so durations that do not evenly
/// divide 600s are truncated (e.g. a 7m duration yields 1).
pub fn as_10m(&self) -> u64 {
    const TEN_MIN_SECS: u64 = 600;
    let duration_secs = self.0.as_secs();
    if duration_secs >= TEN_MIN_SECS {
        1
    } else {
        // Guard against a zero-second duration, which would otherwise
        // panic with a divide-by-zero.
        TEN_MIN_SECS / duration_secs.max(1)
    }
}

/// Creates a [`Gen1Duration`] spanning one minute.
pub fn new_1m() -> Self {
Self(Duration::from_secs(60))
}
Expand Down Expand Up @@ -239,7 +249,7 @@ impl Default for Gen1Duration {

#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
pub struct NoopDetails {
timestamp_ns: i64,
pub timestamp_ns: i64,
}

#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
Expand Down
2 changes: 2 additions & 0 deletions influxdb3_write/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ workspace = true

[dependencies]
# Core Crates
arrow_util.workspace = true
data_types.workspace = true
datafusion_util.workspace = true
executor.workspace = true
Expand Down Expand Up @@ -63,6 +64,7 @@ serde_json.workspace = true
serde_with.workspace = true
sha2.workspace = true
snap.workspace = true
sysinfo.workspace = true
thiserror.workspace = true
tokio.workspace = true
url.workspace = true
Expand Down
22 changes: 14 additions & 8 deletions influxdb3_write/src/write_buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ pub struct WriteBufferImplArgs {
pub metric_registry: Arc<Registry>,
pub snapshotted_wal_files_to_keep: u64,
pub query_file_limit: Option<usize>,
pub max_memory_for_snapshot_bytes: u64,
}

impl WriteBufferImpl {
Expand All @@ -190,6 +191,7 @@ impl WriteBufferImpl {
metric_registry,
snapshotted_wal_files_to_keep,
query_file_limit,
max_memory_for_snapshot_bytes,
}: WriteBufferImplArgs,
) -> Result<Arc<Self>> {
// load snapshots and replay the wal into the in memory buffer
Expand Down Expand Up @@ -221,6 +223,8 @@ impl WriteBufferImpl {
distinct_cache_provider: Arc::clone(&distinct_cache),
persisted_files: Arc::clone(&persisted_files),
parquet_cache: parquet_cache.clone(),
gen1_duration: wal_config.gen1_duration,
max_size_per_parquet_file_bytes: max_memory_for_snapshot_bytes,
}));

// create the wal instance, which will replay into the queryable buffer and start
Expand Down Expand Up @@ -1039,6 +1043,7 @@ mod tests {
metric_registry: Default::default(),
snapshotted_wal_files_to_keep: 10,
query_file_limit: None,
max_memory_for_snapshot_bytes: 100_000_000,
})
.await
.unwrap();
Expand Down Expand Up @@ -1134,6 +1139,7 @@ mod tests {
metric_registry: Default::default(),
snapshotted_wal_files_to_keep: 10,
query_file_limit: None,
max_memory_for_snapshot_bytes: 100_000_000,
})
.await
.unwrap();
Expand Down Expand Up @@ -1207,6 +1213,7 @@ mod tests {
metric_registry: Default::default(),
snapshotted_wal_files_to_keep: 10,
query_file_limit: None,
max_memory_for_snapshot_bytes: 100_000_000,
})
.await
.unwrap()
Expand Down Expand Up @@ -1454,6 +1461,7 @@ mod tests {
metric_registry: Default::default(),
snapshotted_wal_files_to_keep: 10,
query_file_limit: None,
max_memory_for_snapshot_bytes: 100_000_000,
})
.await
.unwrap();
Expand Down Expand Up @@ -2051,7 +2059,7 @@ mod tests {
);
}

#[tokio::test]
#[test_log::test(tokio::test)]
async fn notifies_watchers_of_snapshot() {
let obj_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let (wbuf, _, _) = setup(
Expand Down Expand Up @@ -2721,10 +2729,7 @@ mod tests {
#[test_log::test(tokio::test)]
async fn test_out_of_order_data() {
let tmp_dir = test_helpers::tmp_dir().unwrap();
debug!(
?tmp_dir,
">>> using tmp dir for test_check_mem_and_force_snapshot"
);
debug!(?tmp_dir, ">>> using tmp dir");
let obj_store: Arc<dyn ObjectStore> =
Arc::new(LocalFileSystem::new_with_prefix(tmp_dir).unwrap());
let (write_buffer, _, _) = setup(
Expand Down Expand Up @@ -2795,6 +2800,9 @@ mod tests {
"| a | us | 1970-01-01T00:00:28Z | 10.0 |",
"| a | us | 1970-01-01T00:00:29Z | 10.0 |",
"| a | us | 1970-01-01T00:00:30Z | 10.0 |",
"| a | us | 1970-01-01T00:00:20Z | 10.0 |",
"| a | us | 1970-01-01T00:00:21Z | 10.0 |",
"| a | us | 1970-01-01T00:00:22Z | 10.0 |",
"| a | us | 1970-01-01T00:01:40Z | 10.0 |",
"| a | us | 1970-01-01T00:01:41Z | 10.0 |",
"| a | us | 1970-01-01T00:01:42Z | 10.0 |",
Expand All @@ -2807,9 +2815,6 @@ mod tests {
"| a | us | 1970-01-01T00:01:49Z | 10.0 |",
"| a | us | 1970-01-01T00:01:50Z | 10.0 |",
"| a | us | 1970-01-01T00:01:51Z | 10.0 |",
"| a | us | 1970-01-01T00:00:20Z | 10.0 |",
"| a | us | 1970-01-01T00:00:21Z | 10.0 |",
"| a | us | 1970-01-01T00:00:22Z | 10.0 |",
"+------+--------+----------------------+-------+",
],
&actual
Expand Down Expand Up @@ -3306,6 +3311,7 @@ mod tests {
metric_registry: Arc::clone(&metric_registry),
snapshotted_wal_files_to_keep: 10,
query_file_limit: None,
max_memory_for_snapshot_bytes: 100_000_000,
})
.await
.unwrap();
Expand Down
Loading