diff --git a/src/cmd/stats.rs b/src/cmd/stats.rs index b95aed857..a5ae3bfcd 100644 --- a/src/cmd/stats.rs +++ b/src/cmd/stats.rs @@ -20,7 +20,7 @@ The following additional "non-streaming" statistics require loading the entire f cardinality, mode/antimode, median, MAD, quartiles and its related measures (IQR, lower/upper fences & skewness). -When computing “non-streaming” statistics, an Out-Of-Memory (OOM) heuristic check is done. +When computing "non-streaming" statistics, an Out-Of-Memory (OOM) heuristic check is done. If the file is larger than the available memory minus a headroom buffer of 20% (which can be adjusted using the QSV_FREEMEMORY_HEADROOM_PCT environment variable), processing will be preemptively prevented. @@ -242,7 +242,9 @@ with ~500 tests. use std::{ default::Default, - fmt, fs, io, + fmt, fs, + hash::BuildHasher, + io, io::Write, iter::repeat, path::{Path, PathBuf}, @@ -769,6 +771,10 @@ pub fn run(argv: &[&str]) -> CliResult<()> { }?; let stats_sr_vec = args.stats_to_records(stats); + let mut work_br; + + // vec we use to compute dataset-level fingerprint hash + let mut stats_br_vec: Vec = Vec::with_capacity(stats_sr_vec.len()); let stats_headers_sr = args.stat_headers(); wtr.write_record(&stats_headers_sr)?; @@ -780,10 +786,85 @@ pub fn run(argv: &[&str]) -> CliResult<()> { header.to_vec() }; let stat = stat.iter().map(str::as_bytes); - wtr.write_record(vec![&*header].into_iter().chain(stat))?; + work_br = vec![&*header] + .into_iter() + .chain(stat) + .collect::(); + wtr.write_record(&work_br)?; + stats_br_vec.push(work_br); } - // update the stats args json metadata + // Add dataset-level stats as additional rows ==================== + let num_stats_fields = stats_headers_sr.len(); + let mut dataset_stats_br = csv::ByteRecord::with_capacity(128, num_stats_fields); + + // Helper closure to write a dataset stat row + let mut write_dataset_stat = |name: &[u8], value: &[u8]| -> CliResult<()> { + dataset_stats_br.clear(); + dataset_stats_br.push_field(name); + // Fill middle columns with empty strings + for _ in 2..num_stats_fields { + dataset_stats_br.push_field(b""); + } + // write qsv__value as last column + dataset_stats_br.push_field(value); + wtr.write_byte_record(&dataset_stats_br) + .map_err(std::convert::Into::into) + }; + + // Write qsv__rowcount + let ds_record_count = itoa::Buffer::new() + .format(*record_count) + .as_bytes() + .to_vec(); + write_dataset_stat(b"qsv__rowcount", &ds_record_count)?; + + // Write qsv__columncount + let ds_column_count = itoa::Buffer::new() + .format(headers.len()) + .as_bytes() + .to_vec(); + write_dataset_stat(b"qsv__columncount", &ds_column_count)?; + + // Write qsv__filesize_bytes + let ds_filesize_bytes = itoa::Buffer::new() + .format(fs::metadata(&path)?.len()) + .as_bytes() + .to_vec(); + write_dataset_stat(b"qsv__filesize_bytes", &ds_filesize_bytes)?; + + // Compute hash of stats for data fingerprinting + let stats_hash = { + #[allow(deprecated)] + // we use "deprecated" SipHasher explicitly instead of DefaultHasher, + // even though, it is the current DefaultHasher since Rust 1.7.0 + // as we want the hash to be deterministic and stable across Rust versions + // DefaultHasher may change in future Rust versions + let mut hasher = + std::hash::BuildHasherDefault::::default().build_hasher(); + + // Hash the first 20 columns of each stats record + // we only do the first 20 stats columns to compute the hash as those + // columns are always the same, even if other stats --options are used + for record in &stats_br_vec { + for field in record.iter().take(20) { + std::hash::Hash::hash(field, &mut hasher); + } + } + + // Include dataset-level stats in hash + for stat in [&ds_record_count, &ds_column_count, &ds_filesize_bytes] { + std::hash::Hash::hash(stat, &mut hasher); + } + + std::hash::Hasher::finish(&hasher) + }; + + // Write qsv__fingerprint_hash dataset + let hash_bytes = itoa::Buffer::new().format(stats_hash).as_bytes().to_vec(); + write_dataset_stat(b"qsv__fingerprint_hash", &hash_bytes)?; + + // update the stats args json metadata =============== current_stats_args.compute_duration_ms = start_time.elapsed().as_millis() as u64; if create_cache @@ -891,7 +972,7 @@ pub fn run(argv: &[&str]) -> CliResult<()> { // save the stats data to ".stats.csv.data.jsonl" if write_stats_jsonl { stats_pathbuf.set_extension("data.jsonl"); - util::csv_to_jsonl(&currstats_filename, &get_stats_data_types(), stats_pathbuf)?; + util::csv_to_jsonl(&currstats_filename, &get_stats_data_types(), &stats_pathbuf)?; } } } @@ -1144,6 +1225,10 @@ impl Args { "antimode_occurrences", ]); } + + // we add the qsv__value field at the end for dataset-level stats + fields.push("qsv__value"); + csv::StringRecord::from(fields) } } @@ -1791,6 +1876,9 @@ impl Stats { // append it here to preserve legacy ordering of columns pieces.extend_from_slice(&mc_pieces); + // add an empty field for qsv__value + pieces.push(empty()); + csv::StringRecord::from(pieces) } } diff --git a/src/util.rs b/src/util.rs index 35bf70d8f..eb3b8fe2c 100644 --- a/src/util.rs +++ b/src/util.rs @@ -1921,6 +1921,8 @@ pub fn get_stats_records( args: &SchemaArgs, mode: StatsMode, ) -> CliResult<(ByteRecord, Vec)> { + const DATASET_STATS_PREFIX: &str = r#"{"field":"qsv__"#; + if mode == StatsMode::None || args.arg_input.is_none() || args.arg_input.as_ref() == Some(&"-".to_string()) @@ -1930,13 +1932,13 @@ pub fn get_stats_records( return Ok((ByteRecord::new(), Vec::new())); }; - let canonical_input_path = Path::new(&args.arg_input.clone().unwrap()).canonicalize()?; + let canonical_input_path = Path::new(args.arg_input.as_ref().unwrap()).canonicalize()?; let statsdata_path = canonical_input_path.with_extension("stats.csv.data.jsonl"); let stats_data_current = if statsdata_path.exists() { let statsdata_metadata = std::fs::metadata(&statsdata_path)?; - let input_metadata = std::fs::metadata(args.arg_input.clone().unwrap())?; + let input_metadata = std::fs::metadata(args.arg_input.as_ref().unwrap())?; let statsdata_mtime = FileTime::from_last_modification_time(&statsdata_metadata); let input_mtime = FileTime::from_last_modification_time(&input_metadata); @@ -1958,28 +1960,39 @@ pub fn get_stats_records( return Ok((ByteRecord::new(), Vec::new())); } + // get the headers from the input file + let mut rdr = csv::Reader::from_path(args.arg_input.as_ref().ok_or("No input provided")?)?; + let csv_fields = rdr.byte_headers()?.clone(); + drop(rdr); + let mut stats_data_loaded = false; - let mut csv_stats: Vec = Vec::new(); + let mut csv_stats: Vec = Vec::with_capacity(csv_fields.len()); // if stats_data file exists and is current, use it if stats_data_current && !args.flag_force { - let statsdata_file = std::fs::File::open(&statsdata_path)?; - let statsdata_reader = std::io::BufReader::new(statsdata_file); - let statsdata_lines = statsdata_reader.lines(); - - let mut line: String; - for curr_line in statsdata_lines { - line = curr_line?; - let stats_record: StatsData = serde_json::from_str(&line)?; - csv_stats.push(stats_record); + let statsdatajson_rdr = + BufReader::with_capacity(DEFAULT_RDR_BUFFER_CAPACITY, File::open(statsdata_path)?); + + let mut curr_line: String; + let mut s_slice: Vec; + for line in statsdatajson_rdr.lines() { + curr_line = line?; + if curr_line.starts_with(DATASET_STATS_PREFIX) { + break; + } + s_slice = curr_line.as_bytes().to_vec(); + match simd_json::serde::from_slice(&mut **&mut s_slice) { + Ok(stats) => csv_stats.push(stats), + Err(_) => continue, + } } - stats_data_loaded = true; + stats_data_loaded = !csv_stats.is_empty(); } // otherwise, run stats command to generate stats.csv.data.jsonl file if !stats_data_loaded { let stats_args = crate::cmd::stats::Args { - arg_input: args.arg_input.clone(), + arg_input: args.arg_input.as_ref().map(String::from), flag_select: crate::select::SelectColumns::parse("").unwrap(), flag_everything: false, flag_typesonly: false, @@ -2010,13 +2023,10 @@ pub fn get_stats_records( .unwrap(); let tempfile_path = tempfile.path().to_str().unwrap().to_string(); - let statsdatajson_path = canonical_input_path.with_extension("stats.csv.data.jsonl"); + let statsdatajson_path = &canonical_input_path.with_extension("stats.csv.data.jsonl"); + + let input = stats_args.arg_input.unwrap_or_else(|| "-".to_string()); - let input = if let Some(arg_input) = stats_args.arg_input { - arg_input - } else { - "-".to_string() - }; // we do rustfmt::skip here as it was breaking the stats cmdline along strange // boundaries, causing CI errors. // This is because we're using tab characters (/t) to separate args to fix #2294, @@ -2041,8 +2051,7 @@ pub fn get_stats_records( // StatsMode::FrequencyForceStats // we're doing frequency, so we need cardinality from a --forced stats run format!( - "stats\t{input}\t--cardinality\t--stats-jsonl\t--force\ - \t--output\t{tempfile_path}" + "stats\t{input}\t--cardinality\t--stats-jsonl\t--force\t--output\t{tempfile_path}" ) }, #[cfg(feature = "polars")] @@ -2103,31 +2112,26 @@ pub fn get_stats_records( } // create a statsdatajon from the output of the stats command - csv_to_jsonl( - &tempfile_path, - &get_stats_data_types(), - statsdatajson_path.clone(), - )?; - - let statsdatajson_rdr = BufReader::with_capacity( - DEFAULT_RDR_BUFFER_CAPACITY * 2, - File::open(statsdatajson_path)?, - ); + csv_to_jsonl(&tempfile_path, &get_stats_data_types(), &statsdatajson_path)?; + + let statsdatajson_rdr = + BufReader::with_capacity(DEFAULT_RDR_BUFFER_CAPACITY, File::open(statsdatajson_path)?); - let mut statsrecord: StatsData; let mut curr_line: String; + let mut s_slice: Vec; for line in statsdatajson_rdr.lines() { curr_line = line?; - statsrecord = serde_json::from_str(&curr_line)?; - csv_stats.push(statsrecord); + if curr_line.starts_with(DATASET_STATS_PREFIX) { + break; + } + s_slice = curr_line.as_bytes().to_vec(); + match simd_json::serde::from_slice(&mut **&mut s_slice) { + Ok(stats) => csv_stats.push(stats), + Err(_) => continue, + } } }; - // get the headers from the input file - let mut rdr = csv::Reader::from_path(args.arg_input.clone().unwrap()).unwrap(); - let csv_fields = rdr.byte_headers()?.clone(); - drop(rdr); - Ok((csv_fields, csv_stats)) } @@ -2136,7 +2140,7 @@ pub fn get_stats_records( pub fn csv_to_jsonl( input_csv: &str, csv_types: &[JsonTypes], - output_jsonl: PathBuf, + output_jsonl: &PathBuf, ) -> CliResult<()> { let file = File::open(input_csv)?; let mut rdr = csv::ReaderBuilder::new() diff --git a/tests/test_frequency.rs b/tests/test_frequency.rs index 84048f506..97e3ac6bb 100644 --- a/tests/test_frequency.rs +++ b/tests/test_frequency.rs @@ -535,6 +535,8 @@ fn frequency_all_unique_force_stats_cache() { .args(["--stats-mode", "force"]) .arg(testdata); + wrk.assert_success(&mut cmd); + let got: Vec> = wrk.read_stdout(&mut cmd); let expected = vec![ svec!["field", "value", "count", "percentage"], diff --git a/tests/test_index.rs b/tests/test_index.rs index ab70f35eb..a36a621ea 100644 --- a/tests/test_index.rs +++ b/tests/test_index.rs @@ -84,7 +84,8 @@ fn index_outdated_stats() { "cv", "nullcount", "max_precision", - "sparsity" + "sparsity", + "qsv__value", ], svec![ "letter", @@ -106,7 +107,8 @@ fn index_outdated_stats() { "", "0", "", - "0" + "0", + "", ], svec![ "number", @@ -128,7 +130,100 @@ fn index_outdated_stats() { "40.8248", "0", "", - "0" + "0", + "", + ], + svec![ + "qsv__rowcount", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "3" + ], + svec![ + "qsv__columncount", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "2" + ], + svec![ + "qsv__filesize_bytes", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "26" + ], + svec![ + "qsv__fingerprint_hash", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "7405440055837468253" ], ]; diff --git a/tests/test_json.rs b/tests/test_json.rs index 5f6f6bb8f..b7125e6c3 100644 --- a/tests/test_json.rs +++ b/tests/test_json.rs @@ -192,7 +192,7 @@ fn json_fruits_stats_slice_json() { // qsv stats fruits.csv let mut stats_cmd = wrk.command("stats"); - stats_cmd.arg(test_file); + stats_cmd.arg(test_file).arg("--force"); wrk.assert_success(&mut stats_cmd); diff --git a/tests/test_tojsonl.rs b/tests/test_tojsonl.rs index de3e79807..8afd6788a 100644 --- a/tests/test_tojsonl.rs +++ b/tests/test_tojsonl.rs @@ -20,6 +20,8 @@ fn tojsonl_simple() { let mut cmd = wrk.command("tojsonl"); cmd.arg("in.csv"); + wrk.assert_success(&mut cmd); + let got: String = wrk.stdout(&mut cmd); let expected = r#"{"id":1,"father":"Mark","mother":"Charlotte","oldest_child":"Tom","boy":true,"weight":150.2} {"id":2,"father":"John","mother":"Ann","oldest_child":"Jessika","boy":false,"weight":175.5}