From 3bb256d5eb4e7ae4d553cd3e3a0e290bb5045880 Mon Sep 17 00:00:00 2001
From: Joel Natividad <1980690+jqnatividad@users.noreply.github.com>
Date: Mon, 18 Nov 2024 16:18:49 -0500
Subject: [PATCH] refactor: improve dataset-level stats calculation, use a
 helper closure write_dataset_stat

---
 src/cmd/stats.rs | 99 ++++++++++++++++++++++++++++--------------------
 1 file changed, 57 insertions(+), 42 deletions(-)

diff --git a/src/cmd/stats.rs b/src/cmd/stats.rs
index 7d8dc53d3..ae098be99 100644
--- a/src/cmd/stats.rs
+++ b/src/cmd/stats.rs
@@ -20,7 +20,7 @@ The following additional "non-streaming" statistics require loading the entire f
 cardinality, mode/antimode, median, MAD, quartiles and its related measures (IQR,
 lower/upper fences & skewness).
 
-When computing “non-streaming” statistics, an Out-Of-Memory (OOM) heuristic check is done.
+When computing "non-streaming" statistics, an Out-Of-Memory (OOM) heuristic check is done.
 If the file is larger than the available memory minus a headroom buffer of 20% (which can be
 adjusted using the QSV_FREEMEMORY_HEADROOM_PCT environment variable), processing will be
 preemptively prevented.
@@ -794,46 +794,64 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
         stats_br_vec.push(work_br);
     }
 
-    // add the dataset-level stats
+    // Add dataset-level stats as additional rows ====================
     let num_stats_fields = stats_headers_sr.len();
     let mut dataset_stats_br = csv::ByteRecord::with_capacity(128, num_stats_fields);
-    dataset_stats_br.push_field(b"_qsv_rowcount");
-    for _ in 2..num_stats_fields {
-        dataset_stats_br.push_field(b"");
-    }
+
+    // Helper closure to write a dataset stat row
+    let write_dataset_stat = |name: &[u8],
+                              value: &[u8],
+                              br: &mut csv::ByteRecord,
+                              wtr: &mut csv::Writer<_>|
+     -> CliResult<()> {
+        br.clear();
+        br.push_field(name);
+        // Fill middle columns with empty strings
+        for _ in 2..num_stats_fields {
+            br.push_field(b"");
+        }
+        // write _qsv_value as last column
+        br.push_field(value);
+        wtr.write_byte_record(br).map_err(|e| e.into())
+    };
+
+    // Write _qsv_rowcount
     let ds_record_count = itoa::Buffer::new()
         .format(*record_count)
         .as_bytes()
         .to_owned();
-    dataset_stats_br.push_field(&ds_record_count);
-    wtr.write_record(&dataset_stats_br)?;
+    write_dataset_stat(
+        b"_qsv_rowcount",
+        &ds_record_count,
+        &mut dataset_stats_br,
+        &mut wtr,
+    )?;
 
-    dataset_stats_br.clear();
-    dataset_stats_br.push_field(b"_qsv_columncount");
-    for _ in 2..num_stats_fields {
-        dataset_stats_br.push_field(b"");
-    }
+    // Write _qsv_columncount
     let ds_column_count = itoa::Buffer::new()
         .format(headers.len())
         .as_bytes()
         .to_owned();
-    dataset_stats_br.push_field(&ds_column_count);
-    wtr.write_record(&dataset_stats_br)?;
+    write_dataset_stat(
+        b"_qsv_columncount",
+        &ds_column_count,
+        &mut dataset_stats_br,
+        &mut wtr,
+    )?;
 
-    dataset_stats_br.clear();
-    dataset_stats_br.push_field(b"_qsv_filesize_bytes");
-    for _ in 2..num_stats_fields {
-        dataset_stats_br.push_field(b"");
-    }
+    // Write _qsv_filesize_bytes
     let ds_filesize_bytes = itoa::Buffer::new()
         .format(fs::metadata(&path)?.len())
         .as_bytes()
         .to_owned();
-    dataset_stats_br.push_field(&ds_filesize_bytes);
-    wtr.write_record(&dataset_stats_br)?;
+    write_dataset_stat(
+        b"_qsv_filesize_bytes",
+        &ds_filesize_bytes,
+        &mut dataset_stats_br,
+        &mut wtr,
+    )?;
 
-    // compute the hash using stats, instead of scanning the entire file
-    // so the performance is constant regardless of file size
+    // Compute hash of stats for data fingerprinting
     let stats_hash = {
         #[allow(deprecated)]
         // we use "deprecated" SipHasher explicitly instead of DefaultHasher,
@@ -842,32 +860,29 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
         // DefaultHasher may change in future Rust versions
         let mut hasher =
             std::hash::BuildHasherDefault::<std::hash::SipHasher>::default().build_hasher();
+
+        // Hash the first 20 columns of each stats record
+        // we only do the first 20 stats columns to compute the hash as those
+        // columns are always the same, even if other stats --options are used
         for record in &stats_br_vec {
-            for (i, field) in record.iter().enumerate() {
-                // we only do the first 20 stats columns to compute the hash as those
-                // columns are always the same, even if other stats --options are used
-                if i >= 20 {
-                    break;
-                }
+            for field in record.iter().take(20) {
                 std::hash::Hash::hash(field, &mut hasher);
             }
         }
-        // we also add the dataset level stats to the hash
-        std::hash::Hash::hash(&ds_record_count, &mut hasher);
-        std::hash::Hash::hash(&ds_column_count, &mut hasher);
-        std::hash::Hash::hash(&ds_filesize_bytes, &mut hasher);
+
+        // Include dataset-level stats in hash
+        for stat in [&ds_record_count, &ds_column_count, &ds_filesize_bytes] {
+            std::hash::Hash::hash(stat, &mut hasher);
+        }
+
        std::hash::Hasher::finish(&hasher)
     };
 
-    dataset_stats_br.clear();
-    dataset_stats_br.push_field(b"_qsv_hash");
-    for _ in 2..num_stats_fields {
-        dataset_stats_br.push_field(b"");
-    }
-    dataset_stats_br.push_field(itoa::Buffer::new().format(stats_hash).as_bytes());
-    wtr.write_record(&dataset_stats_br)?;
+    // Write _qsv_hash
+    let hash_bytes = itoa::Buffer::new().format(stats_hash).as_bytes().to_owned();
+    write_dataset_stat(b"_qsv_hash", &hash_bytes, &mut dataset_stats_br, &mut wtr)?;
 
-    // update the stats args json metadata
+    // update the stats args json metadata ===============
     current_stats_args.compute_duration_ms = start_time.elapsed().as_millis() as u64;
 
     if create_cache
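
Below is a minimal, standalone sketch (not part of the patch above) of the row layout the write_dataset_stat helper produces: the stat name goes in the first column, the middle stats columns are left empty, and the value lands in the last column. It assumes the csv and itoa crates; the column count and the example values are made up for illustration, and the error type is simplified to Box<dyn Error> instead of the patch's CliResult.

    use std::error::Error;

    fn main() -> Result<(), Box<dyn Error>> {
        // Hypothetical number of stats columns; the real value comes from stats_headers_sr.len().
        let num_stats_fields = 5;
        let mut wtr = csv::Writer::from_writer(vec![]);
        let mut br = csv::ByteRecord::with_capacity(128, num_stats_fields);

        // Same shape as the patch's closure: name first, empty fillers for the middle columns,
        // value in the last column.
        let mut write_dataset_stat = |name: &[u8], value: &[u8]| -> Result<(), Box<dyn Error>> {
            br.clear();
            br.push_field(name);
            for _ in 2..num_stats_fields {
                br.push_field(b"");
            }
            br.push_field(value);
            wtr.write_byte_record(&br)?;
            Ok(())
        };

        // Example dataset-level rows with made-up values, formatted with itoa as in the patch.
        write_dataset_stat(b"_qsv_rowcount", itoa::Buffer::new().format(1000u64).as_bytes())?;
        write_dataset_stat(b"_qsv_columncount", itoa::Buffer::new().format(12u64).as_bytes())?;

        // Flush the in-memory writer and show the resulting CSV rows.
        let bytes = wtr.into_inner().expect("flush csv writer");
        print!("{}", String::from_utf8(bytes)?);
        Ok(())
    }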