
Commit

update

XiangpengHao committed Sep 27, 2024
1 parent a016870 commit a364eee
Showing 5 changed files with 46 additions and 4 deletions.
9 changes: 8 additions & 1 deletion benchmarks/src/clickbench.rs
@@ -30,6 +30,7 @@ use datafusion::{
use datafusion_common::exec_datafusion_err;
use datafusion_common::instant::Instant;
use object_store::aws::AmazonS3Builder;
use parquet::arrow::builder::ArrowArrayCache;
use structopt::StructOpt;
use url::Url;

@@ -166,16 +167,22 @@ impl RunOpt {
pretty::print_batches(&result)?;
}
}

let elapsed = start.elapsed();
let ms = elapsed.as_secs_f64() * 1000.0;
let row_count: usize = result.iter().map(|b| b.num_rows()).sum();
println!(
"Query {query_id} iteration {i} took {ms:.1} ms and returned {row_count} rows"
);
if self.common.print_result {
pretty::print_batches(&result)?;
}

// ArrowArrayCache::get().reset();

benchmark_run.write_iter(elapsed, row_count);
}
}
benchmark_run.set_cache_stats(ArrowArrayCache::get().stats());
benchmark_run.maybe_write_json(self.output_path.as_ref())?;
Ok(())
}
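To make the intent of the cache hooks in this hunk easier to follow, here is a minimal sketch of the per-iteration flow, using only the calls visible in the diff (ArrowArrayCache::get(), reset(), stats()). ArrowArrayCache comes from the patched parquet crate on this branch, so treat the exact API as an assumption.

// Sketch only: `ArrowArrayCache` lives in the patched `parquet` crate used on
// this branch; only the calls shown in the diff are assumed to exist.
use parquet::arrow::builder::ArrowArrayCache;

fn run_iterations(iterations: usize) {
    for _i in 0..iterations {
        // ... execute the query and record elapsed time / row count ...

        // Optionally drop cached arrays between iterations so each run starts
        // cold (the commit keeps this line commented out, i.e. the cache is
        // shared across iterations).
        // ArrowArrayCache::get().reset();
    }

    // After all iterations, snapshot the accumulated cache statistics so they
    // can be attached to the benchmark output.
    let _stats = ArrowArrayCache::get().stats();
    // benchmark_run.set_cache_stats(_stats);
}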
4 changes: 4 additions & 0 deletions benchmarks/src/util/options.rs
@@ -38,6 +38,10 @@ pub struct CommonOpt {
#[structopt(short, long)]
pub debug: bool,

/// If true, will print the result record batch
#[structopt(long)]
pub print_result: bool,

/// If true, will use StringView/BinaryViewArray instead of String/BinaryArray
/// when reading ParquetFiles
#[structopt(long)]
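The new flag itself is plain structopt. A cut-down, self-contained sketch of how a boolean --print-result flag parses (the struct below is a stand-in for illustration, not the real CommonOpt):

use structopt::StructOpt;

/// Stand-in for `CommonOpt`, reduced to the flag added in this commit.
#[derive(Debug, StructOpt)]
struct Opt {
    /// If true, will print the result record batch
    #[structopt(long)]
    print_result: bool,
}

fn main() {
    // Passing `--print-result` on the command line sets the field to true;
    // omitting the flag leaves it false.
    let opt = Opt::from_args();
    println!("print_result = {}", opt.print_result);
}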
34 changes: 32 additions & 2 deletions benchmarks/src/util/run.rs
@@ -16,10 +16,12 @@
// under the License.

use datafusion::{error::Result, DATAFUSION_VERSION};
use parquet::arrow::builder::CacheStatistics;
use serde::{Serialize, Serializer};
use serde_json::Value;
use std::{
collections::HashMap,
fs::File,
path::Path,
time::{Duration, SystemTime},
};
@@ -96,6 +98,7 @@ pub struct BenchmarkRun {
context: RunContext,
queries: Vec<BenchQuery>,
current_case: Option<usize>,
cache_stats: Option<CacheStatistics>,
}

impl Default for BenchmarkRun {
@@ -111,6 +114,7 @@ impl BenchmarkRun {
context: RunContext::new(),
queries: vec![],
current_case: None,
cache_stats: None,
}
}
/// begin a new case. iterations added after this will be included in the new case
@@ -137,6 +141,10 @@
}
}

pub fn set_cache_stats(&mut self, cache_stats: CacheStatistics) {
self.cache_stats = Some(cache_stats);
}

/// Stringify data into formatted json
pub fn to_json(&self) -> String {
let mut output = HashMap::<&str, Value>::new();
@@ -146,10 +154,32 @@
}

/// Write data as json into output path if it exists.
pub fn maybe_write_json(&self, maybe_path: Option<impl AsRef<Path>>) -> Result<()> {
if let Some(path) = maybe_path {
pub fn maybe_write_json(
&mut self,
maybe_path: Option<impl AsRef<Path>>,
) -> Result<()> {
if let Some(ref path) = maybe_path {
std::fs::write(path, self.to_json())?;
};
// The cache stats are too large to embed in the JSON output, so write them to a separate Parquet file instead
if let Some(path) = maybe_path {
// Reuse the output file name, but with a .parquet extension
let stats = self.cache_stats.take();
if let Some(stats) = stats {
let path = path.as_ref().with_extension("parquet");
let mut writer = File::create(path)?;

let stats_batch = stats.into_record_batch();
let schema = stats_batch.schema();
let mut writer = parquet::arrow::arrow_writer::ArrowWriter::try_new(
&mut writer,
schema,
None,
)?;
writer.write(&stats_batch)?;
writer.close()?;
}
}
Ok(())
}
}
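The Parquet-writing path added to maybe_write_json follows the usual ArrowWriter pattern from the parquet crate: create a file, wrap it in an ArrowWriter built from the batch's schema, write the batch, and close to flush the footer. A self-contained sketch with a hypothetical stand-in batch (the real code writes CacheStatistics::into_record_batch()):

use std::{fs::File, sync::Arc};

use arrow::array::{ArrayRef, Int64Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use parquet::arrow::arrow_writer::ArrowWriter;

fn write_stats_parquet(path: &str) -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical stand-in for the cache-statistics batch.
    let schema = Arc::new(Schema::new(vec![Field::new(
        "hits",
        DataType::Int64,
        false,
    )]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int64Array::from(vec![1_i64, 2, 3])) as ArrayRef],
    )?;

    // Same pattern as the diff: File -> ArrowWriter -> write -> close.
    let file = File::create(path)?;
    let mut writer = ArrowWriter::try_new(file, schema, None)?;
    writer.write(&batch)?;
    writer.close()?;
    Ok(())
}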
@@ -970,7 +970,7 @@ fn resolve_u8(v: &Value) -> AvroResult<u8> {
other => Err(AvroError::GetU8(other.into())),
}?;
if let Value::Int(n) = int {
if n >= 0 && n <= std::convert::From::from(u8::MAX) {
if n >= 0 && n <= u8::MAX as i32 {
return Ok(n as u8);
}
}
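The rewritten bound check in resolve_u8 simply asks whether the i32 fits in a u8. For illustration only (not what the commit does), the same check can be expressed with std's TryFrom:

/// `u8::try_from` succeeds exactly when `0 <= n <= u8::MAX` for an `i32`,
/// which is the condition the rewritten `if` tests.
fn fits_in_u8(n: i32) -> Option<u8> {
    u8::try_from(n).ok()
}

fn main() {
    assert_eq!(fits_in_u8(255), Some(255));
    assert_eq!(fits_in_u8(256), None);
    assert_eq!(fits_in_u8(-1), None);
}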
1 change: 1 addition & 0 deletions dev/bench_script.sh
@@ -19,4 +19,5 @@ do
else
echo "Query $i failed. Check $log_file for details." | tee -a $log_file
fi
rm -rf target/arrow-cache
done
