feat: add dataset-level stats #2297

Merged 11 commits on Nov 19, 2024

98 changes: 93 additions & 5 deletions src/cmd/stats.rs
@@ -20,7 +20,7 @@ The following additional "non-streaming" statistics require loading the entire file into memory:
cardinality, mode/antimode, median, MAD, quartiles and their related measures (IQR,
lower/upper fences & skewness).

When computing non-streaming statistics, an Out-Of-Memory (OOM) heuristic check is done.
When computing "non-streaming" statistics, an Out-Of-Memory (OOM) heuristic check is done.
If the file is larger than the available memory minus a headroom buffer of 20% (which can be
adjusted using the QSV_FREEMEMORY_HEADROOM_PCT environment variable), processing will be
preemptively prevented.
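
A minimal sketch of that heuristic, assuming the sysinfo crate for the
available-memory query (the crate choice and helper name are illustrative
assumptions, not qsv's actual implementation):

use std::{fs, path::Path};

fn would_oom(path: &Path) -> std::io::Result<bool> {
    // headroom defaults to 20% and can be overridden with the
    // QSV_FREEMEMORY_HEADROOM_PCT environment variable
    let headroom_pct: u64 = std::env::var("QSV_FREEMEMORY_HEADROOM_PCT")
        .ok()
        .and_then(|v| v.parse().ok())
        .unwrap_or(20);

    // assumption: sysinfo's available_memory(), which reports bytes
    // in recent releases
    let mut sys = sysinfo::System::new();
    sys.refresh_memory();
    let avail = sys.available_memory();

    // preemptively prevent processing if the file is larger than
    // available memory minus the headroom buffer
    let file_len = fs::metadata(path)?.len();
    Ok(file_len > avail.saturating_sub(avail * headroom_pct / 100))
}
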
@@ -242,7 +242,9 @@ with ~500 tests.

use std::{
default::Default,
fmt, fs, io,
fmt, fs,
hash::BuildHasher,
io,
io::Write,
iter::repeat,
path::{Path, PathBuf},
@@ -769,6 +771,10 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
}?;

let stats_sr_vec = args.stats_to_records(stats);
let mut work_br;

// vec we use to compute dataset-level fingerprint hash
let mut stats_br_vec: Vec<csv::ByteRecord> = Vec::with_capacity(stats_sr_vec.len());

let stats_headers_sr = args.stat_headers();
wtr.write_record(&stats_headers_sr)?;
@@ -780,10 +786,85 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
header.to_vec()
};
let stat = stat.iter().map(str::as_bytes);
wtr.write_record(vec![&*header].into_iter().chain(stat))?;
work_br = vec![&*header]
.into_iter()
.chain(stat)
.collect::<csv::ByteRecord>();
wtr.write_record(&work_br)?;
stats_br_vec.push(work_br);
}

// update the stats args json metadata
// Add dataset-level stats as additional rows ====================
let num_stats_fields = stats_headers_sr.len();
let mut dataset_stats_br = csv::ByteRecord::with_capacity(128, num_stats_fields);

// Helper closure to write a dataset stat row
let mut write_dataset_stat = |name: &[u8], value: &[u8]| -> CliResult<()> {
dataset_stats_br.clear();
dataset_stats_br.push_field(name);
// Fill middle columns with empty strings
for _ in 2..num_stats_fields {
dataset_stats_br.push_field(b"");
}
// write qsv__value as last column
dataset_stats_br.push_field(value);
wtr.write_byte_record(&dataset_stats_br)
.map_err(std::convert::Into::into)
};
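
For reference, each row emitted by this closure puts the stat name in the
first (field) column, leaves the per-column stat columns empty, and carries
the value in the trailing qsv__value column. The appended rows would look
roughly like this in the stats CSV (values hypothetical; the number of empty
columns depends on the stats options used):

field,type,...,qsv__value
qsv__rowcount,,...,100000
qsv__columncount,,...,12
qsv__filesize_bytes,,...,5242880
qsv__fingerprint_hash,,...,13917714613402836368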

// Write qsv__rowcount
let ds_record_count = itoa::Buffer::new()
.format(*record_count)
.as_bytes()
.to_vec();
write_dataset_stat(b"qsv__rowcount", &ds_record_count)?;

// Write qsv__columncount
let ds_column_count = itoa::Buffer::new()
.format(headers.len())
.as_bytes()
.to_vec();
write_dataset_stat(b"qsv__columncount", &ds_column_count)?;

// Write qsv__filesize_bytes
let ds_filesize_bytes = itoa::Buffer::new()
.format(fs::metadata(&path)?.len())
.as_bytes()
.to_vec();
write_dataset_stat(b"qsv__filesize_bytes", &ds_filesize_bytes)?;

// Compute hash of stats for data fingerprinting
let stats_hash = {
#[allow(deprecated)]
// we use "deprecated" SipHasher explicitly instead of DefaultHasher,
// even though it is the current DefaultHasher since Rust 1.7.0
// as we want the hash to be deterministic and stable across Rust versions
// DefaultHasher may change in future Rust versions
let mut hasher =
std::hash::BuildHasherDefault::<std::hash::SipHasher>::default().build_hasher();

// Hash the first 20 columns of each stats record
// we only do the first 20 stats columns to compute the hash as those
// columns are always the same, even if other stats --options are used
for record in &stats_br_vec {
for field in record.iter().take(20) {
std::hash::Hash::hash(field, &mut hasher);
}
}

// Include dataset-level stats in hash
for stat in [&ds_record_count, &ds_column_count, &ds_filesize_bytes] {
std::hash::Hash::hash(stat, &mut hasher);
}

std::hash::Hasher::finish(&hasher)
};

// Write the qsv__fingerprint_hash dataset stat
let hash_bytes = itoa::Buffer::new().format(stats_hash).as_bytes().to_vec();
write_dataset_stat(b"qsv__fingerprint_hash", &hash_bytes)?;

// update the stats args json metadata ===============
current_stats_args.compute_duration_ms = start_time.elapsed().as_millis() as u64;

if create_cache
@@ -891,7 +972,7 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
// save the stats data to "<FILESTEM>.stats.csv.data.jsonl"
if write_stats_jsonl {
stats_pathbuf.set_extension("data.jsonl");
util::csv_to_jsonl(&currstats_filename, &get_stats_data_types(), stats_pathbuf)?;
util::csv_to_jsonl(&currstats_filename, &get_stats_data_types(), &stats_pathbuf)?;
}
}
}
@@ -1144,6 +1225,10 @@ impl Args {
"antimode_occurrences",
]);
}

// we add the qsv__value field at the end for dataset-level stats
fields.push("qsv__value");

csv::StringRecord::from(fields)
}
}
@@ -1791,6 +1876,9 @@ impl Stats {
// append it here to preserve legacy ordering of columns
pieces.extend_from_slice(&mc_pieces);

// add an empty field for qsv__value
pieces.push(empty());

csv::StringRecord::from(pieces)
}
}
86 changes: 45 additions & 41 deletions src/util.rs
@@ -1921,6 +1921,8 @@ pub fn get_stats_records(
args: &SchemaArgs,
mode: StatsMode,
) -> CliResult<(ByteRecord, Vec<StatsData>)> {
const DATASET_STATS_PREFIX: &str = r#"{"field":"qsv__"#;

if mode == StatsMode::None
|| args.arg_input.is_none()
|| args.arg_input.as_ref() == Some(&"-".to_string())
@@ -1930,13 +1932,13 @@
return Ok((ByteRecord::new(), Vec::new()));
};

let canonical_input_path = Path::new(&args.arg_input.clone().unwrap()).canonicalize()?;
let canonical_input_path = Path::new(args.arg_input.as_ref().unwrap()).canonicalize()?;
let statsdata_path = canonical_input_path.with_extension("stats.csv.data.jsonl");

let stats_data_current = if statsdata_path.exists() {
let statsdata_metadata = std::fs::metadata(&statsdata_path)?;

let input_metadata = std::fs::metadata(args.arg_input.clone().unwrap())?;
let input_metadata = std::fs::metadata(args.arg_input.as_ref().unwrap())?;

let statsdata_mtime = FileTime::from_last_modification_time(&statsdata_metadata);
let input_mtime = FileTime::from_last_modification_time(&input_metadata);
@@ -1958,28 +1960,39 @@
return Ok((ByteRecord::new(), Vec::new()));
}

// get the headers from the input file
let mut rdr = csv::Reader::from_path(args.arg_input.as_ref().ok_or("No input provided")?)?;
let csv_fields = rdr.byte_headers()?.clone();
drop(rdr);

let mut stats_data_loaded = false;
let mut csv_stats: Vec<StatsData> = Vec::new();
let mut csv_stats: Vec<StatsData> = Vec::with_capacity(csv_fields.len());

// if stats_data file exists and is current, use it
if stats_data_current && !args.flag_force {
let statsdata_file = std::fs::File::open(&statsdata_path)?;
let statsdata_reader = std::io::BufReader::new(statsdata_file);
let statsdata_lines = statsdata_reader.lines();

let mut line: String;
for curr_line in statsdata_lines {
line = curr_line?;
let stats_record: StatsData = serde_json::from_str(&line)?;
csv_stats.push(stats_record);
let statsdatajson_rdr =
BufReader::with_capacity(DEFAULT_RDR_BUFFER_CAPACITY, File::open(statsdata_path)?);

let mut curr_line: String;
let mut s_slice: Vec<u8>;
for line in statsdatajson_rdr.lines() {
curr_line = line?;
if curr_line.starts_with(DATASET_STATS_PREFIX) {
break;
}
s_slice = curr_line.as_bytes().to_vec();
match simd_json::serde::from_slice(&mut **&mut s_slice) {
Ok(stats) => csv_stats.push(stats),
Err(_) => continue,
}
}
stats_data_loaded = true;
stats_data_loaded = !csv_stats.is_empty();
}
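
To see why the prefix check works: each line of the .stats.csv.data.jsonl
cache is one serialized StatsData record whose first key is the field name,
and the dataset-level rows this PR appends are written after all per-column
rows and use the reserved qsv__ prefix, so hitting the prefix means the
per-column records are exhausted. Illustrative lines (field names and values
hypothetical):

{"field":"price","type":"Float","min":"0.5", ...}   <- per-column record, parsed
{"field":"qsv__rowcount", ...}                      <- matches DATASET_STATS_PREFIX; loop breaks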

// otherwise, run stats command to generate stats.csv.data.jsonl file
if !stats_data_loaded {
let stats_args = crate::cmd::stats::Args {
arg_input: args.arg_input.clone(),
arg_input: args.arg_input.as_ref().map(String::from),
flag_select: crate::select::SelectColumns::parse("").unwrap(),
flag_everything: false,
flag_typesonly: false,
@@ -2010,13 +2023,10 @@
.unwrap();
let tempfile_path = tempfile.path().to_str().unwrap().to_string();

let statsdatajson_path = canonical_input_path.with_extension("stats.csv.data.jsonl");
let statsdatajson_path = &canonical_input_path.with_extension("stats.csv.data.jsonl");

let input = stats_args.arg_input.unwrap_or_else(|| "-".to_string());

let input = if let Some(arg_input) = stats_args.arg_input {
arg_input
} else {
"-".to_string()
};
// we do rustfmt::skip here as it was breaking the stats cmdline along strange
// boundaries, causing CI errors.
// This is because we're using tab characters (\t) to separate args to fix #2294,
@@ -2041,8 +2051,7 @@
// StatsMode::FrequencyForceStats
// we're doing frequency, so we need cardinality from a --forced stats run
format!(
"stats\t{input}\t--cardinality\t--stats-jsonl\t--force\
\t--output\t{tempfile_path}"
"stats\t{input}\t--cardinality\t--stats-jsonl\t--force\t--output\t{tempfile_path}"
)
},
#[cfg(feature = "polars")]
@@ -2103,31 +2112,26 @@
}

// create a statsdatajson from the output of the stats command
csv_to_jsonl(
&tempfile_path,
&get_stats_data_types(),
statsdatajson_path.clone(),
)?;

let statsdatajson_rdr = BufReader::with_capacity(
DEFAULT_RDR_BUFFER_CAPACITY * 2,
File::open(statsdatajson_path)?,
);
csv_to_jsonl(&tempfile_path, &get_stats_data_types(), &statsdatajson_path)?;

let statsdatajson_rdr =
BufReader::with_capacity(DEFAULT_RDR_BUFFER_CAPACITY, File::open(statsdatajson_path)?);

let mut statsrecord: StatsData;
let mut curr_line: String;
let mut s_slice: Vec<u8>;
for line in statsdatajson_rdr.lines() {
curr_line = line?;
statsrecord = serde_json::from_str(&curr_line)?;
csv_stats.push(statsrecord);
if curr_line.starts_with(DATASET_STATS_PREFIX) {
break;
}
s_slice = curr_line.as_bytes().to_vec();
match simd_json::serde::from_slice(&mut **&mut s_slice) {
Ok(stats) => csv_stats.push(stats),
Err(_) => continue,
}
}
};

// get the headers from the input file
let mut rdr = csv::Reader::from_path(args.arg_input.clone().unwrap()).unwrap();
let csv_fields = rdr.byte_headers()?.clone();
drop(rdr);

Ok((csv_fields, csv_stats))
}

@@ -2136,7 +2140,7 @@ pub fn get_stats_records(
pub fn csv_to_jsonl(
input_csv: &str,
csv_types: &[JsonTypes],
output_jsonl: PathBuf,
output_jsonl: &PathBuf,
) -> CliResult<()> {
let file = File::open(input_csv)?;
let mut rdr = csv::ReaderBuilder::new()
2 changes: 2 additions & 0 deletions tests/test_frequency.rs
@@ -535,6 +535,8 @@ fn frequency_all_unique_force_stats_cache() {
.args(["--stats-mode", "force"])
.arg(testdata);

wrk.assert_success(&mut cmd);

let got: Vec<Vec<String>> = wrk.read_stdout(&mut cmd);
let expected = vec![
svec!["field", "value", "count", "percentage"],