refactor: improve dataset-level stats calculation, use a helper closure write_dataset_stat
jqnatividad committed Nov 18, 2024
1 parent 68f3830 commit 3bb256d
Showing 1 changed file with 57 additions and 42 deletions.
99 changes: 57 additions & 42 deletions src/cmd/stats.rs
@@ -20,7 +20,7 @@ The following additional "non-streaming" statistics require loading the entire f
cardinality, mode/antimode, median, MAD, quartiles and its related measures (IQR,
lower/upper fences & skewness).
-When computing non-streaming statistics, an Out-Of-Memory (OOM) heuristic check is done.
+When computing "non-streaming" statistics, an Out-Of-Memory (OOM) heuristic check is done.
If the file is larger than the available memory minus a headroom buffer of 20% (which can be
adjusted using the QSV_FREEMEMORY_HEADROOM_PCT environment variable), processing will be
preemptively prevented.
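As an aside, the OOM guard described above amounts to comparing the input's size against available memory less the configurable headroom. A minimal sketch of that check in Rust (the function and variable names are illustrative, not qsv's actual implementation):

use std::env;

// Illustrative OOM guard: refuse non-streaming stats on a file that
// won't fit in available memory once a headroom buffer is reserved.
fn oom_check(file_size: u64, avail_mem: u64) -> Result<(), String> {
    // Headroom defaults to 20% and can be overridden via env var.
    let headroom_pct: u64 = env::var("QSV_FREEMEMORY_HEADROOM_PCT")
        .ok()
        .and_then(|s| s.parse().ok())
        .unwrap_or(20);
    let usable = avail_mem.saturating_sub(avail_mem * headroom_pct / 100);
    if file_size > usable {
        return Err(format!(
            "file ({file_size} bytes) exceeds usable memory ({usable} bytes)"
        ));
    }
    Ok(())
}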
@@ -794,46 +794,64 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
stats_br_vec.push(work_br);
}

-// add the dataset-level stats
+// Add dataset-level stats as additional rows ====================
let num_stats_fields = stats_headers_sr.len();
let mut dataset_stats_br = csv::ByteRecord::with_capacity(128, num_stats_fields);
dataset_stats_br.push_field(b"_qsv_rowcount");
for _ in 2..num_stats_fields {
dataset_stats_br.push_field(b"");
}

+// Helper closure to write a dataset stat row
+let write_dataset_stat = |name: &[u8],
+value: &[u8],
+br: &mut csv::ByteRecord,
+wtr: &mut csv::Writer<_>|
+-> CliResult<()> {
+br.clear();
+br.push_field(name);
+// Fill middle columns with empty strings
+for _ in 2..num_stats_fields {
+br.push_field(b"");
+}
+// write _qsv_value as last column
+br.push_field(value);
+wtr.write_byte_record(br).map_err(|e| e.into())
+};

+// Write _qsv_rowcount
let ds_record_count = itoa::Buffer::new()
.format(*record_count)
.as_bytes()
.to_owned();
dataset_stats_br.push_field(&ds_record_count);
wtr.write_record(&dataset_stats_br)?;
write_dataset_stat(
b"_qsv_rowcount",
&ds_record_count,
&mut dataset_stats_br,
&mut wtr,
)?;

-dataset_stats_br.clear();
-dataset_stats_br.push_field(b"_qsv_columncount");
-for _ in 2..num_stats_fields {
-dataset_stats_br.push_field(b"");
-}
+// Write _qsv_columncount
let ds_column_count = itoa::Buffer::new()
.format(headers.len())
.as_bytes()
.to_owned();
dataset_stats_br.push_field(&ds_column_count);
wtr.write_record(&dataset_stats_br)?;
write_dataset_stat(
b"_qsv_columncount",
&ds_column_count,
&mut dataset_stats_br,
&mut wtr,
)?;

-dataset_stats_br.clear();
-dataset_stats_br.push_field(b"_qsv_filesize_bytes");
-for _ in 2..num_stats_fields {
-dataset_stats_br.push_field(b"");
-}
+// Write _qsv_filesize_bytes
let ds_filesize_bytes = itoa::Buffer::new()
.format(fs::metadata(&path)?.len())
.as_bytes()
.to_owned();
dataset_stats_br.push_field(&ds_filesize_bytes);
wtr.write_record(&dataset_stats_br)?;
write_dataset_stat(
b"_qsv_filesize_bytes",
&ds_filesize_bytes,
&mut dataset_stats_br,
&mut wtr,
)?;

-// compute the hash using stats, instead of scanning the entire file
-// so the performance is constant regardless of file size
+// Compute hash of stats for data fingerprinting
let stats_hash = {
#[allow(deprecated)]
// we use "deprecated" SipHasher explicitly instead of DefaultHasher,
@@ -842,32 +860,29 @@
// DefaultHasher may change in future Rust versions
let mut hasher =
std::hash::BuildHasherDefault::<std::hash::SipHasher>::default().build_hasher();

+// Hash the first 20 columns of each stats record
+// we only do the first 20 stats columns to compute the hash as those
+// columns are always the same, even if other stats --options are used
for record in &stats_br_vec {
-for (i, field) in record.iter().enumerate() {
-// we only do the first 20 stats columns to compute the hash as those
-// columns are always the same, even if other stats --options are used
-if i >= 20 {
-break;
-}
+for field in record.iter().take(20) {
std::hash::Hash::hash(field, &mut hasher);
}
}
-// we also add the dataset level stats to the hash
-std::hash::Hash::hash(&ds_record_count, &mut hasher);
-std::hash::Hash::hash(&ds_column_count, &mut hasher);
-std::hash::Hash::hash(&ds_filesize_bytes, &mut hasher);

+// Include dataset-level stats in hash
+for stat in [&ds_record_count, &ds_column_count, &ds_filesize_bytes] {
+std::hash::Hash::hash(stat, &mut hasher);
+}

std::hash::Hasher::finish(&hasher)
};

-dataset_stats_br.clear();
-dataset_stats_br.push_field(b"_qsv_hash");
-for _ in 2..num_stats_fields {
-dataset_stats_br.push_field(b"");
-}
-dataset_stats_br.push_field(itoa::Buffer::new().format(stats_hash).as_bytes());
-wtr.write_record(&dataset_stats_br)?;
+// Write _qsv_hash
+let hash_bytes = itoa::Buffer::new().format(stats_hash).as_bytes().to_owned();
+write_dataset_stat(b"_qsv_hash", &hash_bytes, &mut dataset_stats_br, &mut wtr)?;

-// update the stats args json metadata
+// update the stats args json metadata ===============
current_stats_args.compute_duration_ms = start_time.elapsed().as_millis() as u64;

if create_cache
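For readers skimming the diff: each write_dataset_stat call appends one row whose first column holds the stat name and whose last column holds the value, with every middle column left empty so the row stays aligned with the per-column stats header. A standalone sketch of the same pattern (the column count and values are made up for illustration; only the csv crate calls mirror the diff):

use std::io;

fn main() -> Result<(), csv::Error> {
    // Illustrative: qsv derives this from its stats header record.
    let num_stats_fields = 5;
    let mut wtr = csv::Writer::from_writer(io::stdout());
    let mut br = csv::ByteRecord::with_capacity(128, num_stats_fields);

    // Mirrors the diff's helper closure: name first, empty middle
    // columns, value last.
    let mut write_dataset_stat =
        |name: &[u8], value: &[u8]| -> Result<(), csv::Error> {
            br.clear();
            br.push_field(name);
            for _ in 2..num_stats_fields {
                br.push_field(b"");
            }
            br.push_field(value);
            wtr.write_byte_record(&br)
        };

    write_dataset_stat(b"_qsv_rowcount", b"1000")?;
    write_dataset_stat(b"_qsv_columncount", b"12")?;
    // Prints:
    //   _qsv_rowcount,,,,1000
    //   _qsv_columncount,,,,12
    Ok(())
}

Capturing br and wtr in one FnMut closure, as the commit does, keeps the four call sites to a single line each instead of repeating the clear/push/fill/write sequence.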
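The hashing block is also worth a note: the diff keeps the deprecated SipHasher precisely because DefaultHasher's algorithm is unspecified and may change between Rust releases, which would silently invalidate every cached fingerprint. A minimal sketch of that stable-fingerprint pattern (the function name and row data are illustrative):

use std::hash::{BuildHasher, BuildHasherDefault, Hash, Hasher};

#[allow(deprecated)] // SipHasher is deprecated but stable across releases
fn fingerprint(rows: &[csv::ByteRecord]) -> u64 {
    let mut hasher =
        BuildHasherDefault::<std::hash::SipHasher>::default().build_hasher();
    for record in rows {
        // Only the first 20 columns, per the diff's rationale: those
        // columns are present regardless of which stats --options were used.
        for field in record.iter().take(20) {
            field.hash(&mut hasher);
        }
    }
    hasher.finish()
}

Because the hash is computed over the already-collected stats records rather than the input file, its cost is bounded by the number of columns, not the file size.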
