Skip to content

Commit

Permalink
Add H2O.ai Database-like Ops benchmark to dfbench
Browse files Browse the repository at this point in the history
  • Loading branch information
zhuqi-lucas committed Jan 3, 2025
1 parent 63265fd commit ee9c3ae
Show file tree
Hide file tree
Showing 7 changed files with 318 additions and 136 deletions.
118 changes: 118 additions & 0 deletions benchmarks/bench.sh
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ clickbench_1: ClickBench queries against a single parquet file
clickbench_partitioned: ClickBench queries against a partitioned (100 files) parquet
clickbench_extended: ClickBench \"inspired\" queries against a single parquet (DataFusion specific)
external_aggr: External aggregation benchmark
h2o_small: h2oai with small dataset (1e7 rows), file format is parquet
h2o_medium: h2oai with medium dataset (1e8 rows), file format is parquet
h2o_large: h2oai with large dataset (1e9 rows), file format is parquet
h2o_small_csv: h2oai with small dataset (1e7 rows), file format is csv
**********
* Supported Configuration (Environment Variables)
Expand Down Expand Up @@ -142,6 +145,9 @@ main() {
all)
data_tpch "1"
data_tpch "10"
data_h2o "SMALL"
data_h2o "MEDIUM"
data_h2o "LARGE"
data_clickbench_1
data_clickbench_partitioned
data_imdb
Expand Down Expand Up @@ -172,6 +178,18 @@ main() {
imdb)
data_imdb
;;
h2o_small)
data_h2o "SMALL"
;;
h2o_medium)
data_h2o "MEDIUM"
;;
h2o_large)
data_h2o "LARGE"
;;
h2o_small_csv)
data_h2o "SMALL" "CSV"
;;
external_aggr)
# same data as for tpch
data_tpch "1"
Expand Down Expand Up @@ -221,6 +239,9 @@ main() {
run_clickbench_1
run_clickbench_partitioned
run_clickbench_extended
run_h2o "SMALL" "PARQUET" "groupby"
run_h2o "MEDIUM" "PARQUET" "groupby"
run_h2o "LARGE" "PARQUET" "groupby"
run_imdb
run_external_aggr
;;
Expand Down Expand Up @@ -254,6 +275,18 @@ main() {
imdb)
run_imdb
;;
h2o_small)
run_h2o "SMALL" "PARQUET" "groupby"
;;
h2o_medium)
run_h2o "MEDIUM" "PARQUET" "groupby"
;;
h2o_large)
run_h2o "LARGE" "PARQUET" "groupby"
;;
h2o_small_csv)
run_h2o "SMALL" "CSV" "groupby"
;;
external_aggr)
run_external_aggr
;;
Expand Down Expand Up @@ -541,6 +574,91 @@ run_imdb() {
$CARGO_COMMAND --bin imdb -- benchmark datafusion --iterations 5 --path "${IMDB_DIR}" --prefer_hash_join "${PREFER_HASH_JOIN}" --format parquet -o "${RESULTS_FILE}"
}

data_h2o() {
    # Generate the H2O.ai db-benchmark "groupby" dataset with the `falsa`
    # data generator, installed into a local Python virtual environment.
    #
    # Arguments:
    #   $1 - dataset size: SMALL (1e7 rows), MEDIUM (1e8) or LARGE (1e9); default SMALL
    #   $2 - output format passed to falsa (e.g. PARQUET, CSV); default PARQUET
    SIZE=${1:-"SMALL"}
    DATA_FORMAT=${2:-"PARQUET"}

    # falsa requires Python >= 3.10. Accept any suitable interpreter instead
    # of insisting on the literal `python3.10` binary name (the original check
    # rejected machines that only have python3.11+ installed, even though the
    # error message said "3.10 or higher").
    REQUIRED_PYTHON=""
    for candidate in python3.13 python3.12 python3.11 python3.10; do
        if command -v "$candidate" &> /dev/null; then
            REQUIRED_PYTHON="$candidate"
            break
        fi
    done
    if [ -z "$REQUIRED_PYTHON" ]; then
        echo "Python 3.10 or higher could not be found. Please install Python 3.10 or higher."
        return 1
    fi

    # Install falsa and other dependencies
    echo "Installing falsa..."

    # Set virtual environment directory
    VIRTUAL_ENV="${PWD}/.venv"

    # Create the virtual environment only if it does not already exist
    if [ ! -d "$VIRTUAL_ENV" ]; then
        $REQUIRED_PYTHON -m venv "$VIRTUAL_ENV"
    fi

    # Activate the virtual environment and install dependencies
    source "$VIRTUAL_ENV/bin/activate"

    # Ensure 'falsa' is installed (avoid unnecessary reinstall)
    pip install --quiet --upgrade falsa

    # Create directory if it doesn't exist
    H2O_DIR="${DATA_DIR}/h2o"
    mkdir -p "${H2O_DIR}"

    # Generate h2o test data
    echo "Generating h2o test data in ${H2O_DIR} with size=${SIZE} and format=${DATA_FORMAT}"
    falsa groupby --path-prefix="${H2O_DIR}" --size "${SIZE}" --data-format "${DATA_FORMAT}"

    # Deactivate virtual environment after completion
    deactivate
}

run_h2o() {
    # Run the H2O.ai benchmark queries through dfbench.
    #
    # Arguments:
    #   $1 - dataset size: SMALL, MEDIUM or LARGE; default SMALL
    #   $2 - file format the data was generated in (PARQUET or CSV); default PARQUET
    #   $3 - query set to run (groupby or join); default groupby
    SIZE=${1:-"SMALL"}
    DATA_FORMAT=${2:-"PARQUET"}
    # falsa writes lower-case file extensions (.parquet / .csv)
    DATA_FORMAT=$(echo "$DATA_FORMAT" | tr '[:upper:]' '[:lower:]')
    RUN_TYPE=${3:-"groupby"}

    # Data directory and results file path
    H2O_DIR="${DATA_DIR}/h2o"
    RESULTS_FILE="${RESULTS_DIR}/h2o.json"

    echo "RESULTS_FILE: ${RESULTS_FILE}"
    echo "Running h2o benchmark..."

    # Map the size to the file name emitted by the falsa generator.
    # BUG FIX: the `all` and `h2o_large` paths invoke `run_h2o "LARGE" ...`,
    # but the original case statement only matched "BIG", so the large
    # benchmark always fell through to the error branch. Accept both
    # spellings for backward compatibility.
    case "$SIZE" in
        "SMALL")
            FILE_NAME="G1_1e7_1e7_100_0.${DATA_FORMAT}" # For small dataset (1e7 rows)
            ;;
        "MEDIUM")
            FILE_NAME="G1_1e8_1e8_100_0.${DATA_FORMAT}" # For medium dataset (1e8 rows)
            ;;
        "LARGE"|"BIG")
            FILE_NAME="G1_1e9_1e9_100_0.${DATA_FORMAT}" # For large dataset (1e9 rows)
            ;;
        *)
            echo "Invalid size. Valid options are SMALL, MEDIUM, or LARGE."
            return 1
            ;;
    esac

    # Set the query file name based on the query set being run
    QUERY_FILE="${SCRIPT_DIR}/queries/h2o/${RUN_TYPE}.sql"

    # Run the benchmark using the dynamically constructed file path and query file
    $CARGO_COMMAND --bin dfbench -- h2o \
        --iterations 5 \
        --path "${H2O_DIR}/${FILE_NAME}" \
        --queries-path "${QUERY_FILE}" \
        -o "${RESULTS_FILE}"
}

# Runs the external aggregation benchmark
run_external_aggr() {
# Use TPC-H SF1 dataset
Expand Down
10 changes: 10 additions & 0 deletions benchmarks/queries/h2o/groupby.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
-- H2O.ai db-benchmark "groupby" queries (q1-q10) against a single table `x`.
-- Queries are separated by semicolons; each line below is one benchmark query.
SELECT id1, SUM(v1) AS v1 FROM x GROUP BY id1;
SELECT id1, id2, SUM(v1) AS v1 FROM x GROUP BY id1, id2;
SELECT id3, SUM(v1) AS v1, AVG(v3) AS v3 FROM x GROUP BY id3;
SELECT id4, AVG(v1) AS v1, AVG(v2) AS v2, AVG(v3) AS v3 FROM x GROUP BY id4;
SELECT id6, SUM(v1) AS v1, SUM(v2) AS v2, SUM(v3) AS v3 FROM x GROUP BY id6;
SELECT id4, id5, MEDIAN(v3) AS median_v3, STDDEV(v3) AS sd_v3 FROM x GROUP BY id4, id5;
SELECT id3, MAX(v1) - MIN(v2) AS range_v1_v2 FROM x GROUP BY id3;
SELECT id6, largest2_v3 FROM (SELECT id6, v3 AS largest2_v3, ROW_NUMBER() OVER (PARTITION BY id6 ORDER BY v3 DESC) AS order_v3 FROM x WHERE v3 IS NOT NULL) sub_query WHERE order_v3 <= 2;
SELECT id2, id4, POWER(CORR(v1, v2), 2) AS r2 FROM x GROUP BY id2, id4;
SELECT id1, id2, id3, id4, id5, id6, SUM(v3) AS v3, COUNT(*) AS count FROM x GROUP BY id1, id2, id3, id4, id5, id6;
5 changes: 5 additions & 0 deletions benchmarks/queries/h2o/join.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- H2O.ai db-benchmark "join" queries (q1-q5): table `x` joined against the
-- `small`, `medium`, and `large` lookup tables on integer and string keys.
SELECT x.id1, x.id2, x.id3, x.id4 as xid4, small.id4 as smallid4, x.id5, x.id6, x.v1, small.v2 FROM x INNER JOIN small ON x.id1 = small.id1;
SELECT x.id1 as xid1, medium.id1 as mediumid1, x.id2, x.id3, x.id4 as xid4, medium.id4 as mediumid4, x.id5 as xid5, medium.id5 as mediumid5, x.id6, x.v1, medium.v2 FROM x INNER JOIN medium ON x.id2 = medium.id2;
SELECT x.id1 as xid1, medium.id1 as mediumid1, x.id2, x.id3, x.id4 as xid4, medium.id4 as mediumid4, x.id5 as xid5, medium.id5 as mediumid5, x.id6, x.v1, medium.v2 FROM x LEFT JOIN medium ON x.id2 = medium.id2;
SELECT x.id1 as xid1, medium.id1 as mediumid1, x.id2, x.id3, x.id4 as xid4, medium.id4 as mediumid4, x.id5 as xid5, medium.id5 as mediumid5, x.id6, x.v1, medium.v2 FROM x JOIN medium ON x.id5 = medium.id5;
SELECT x.id1 as xid1, large.id1 as largeid1, x.id2 as xid2, large.id2 as largeid2, x.id3, x.id4 as xid4, large.id4 as largeid4, x.id5 as xid5, large.id5 as largeid5, x.id6 as xid6, large.id6 as largeid6, x.v1, large.v2 FROM x JOIN large ON x.id3 = large.id3;
4 changes: 3 additions & 1 deletion benchmarks/src/bin/dfbench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;
#[global_allocator]
static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc;

use datafusion_benchmarks::{clickbench, imdb, parquet_filter, sort, sort_tpch, tpch};
use datafusion_benchmarks::{clickbench, imdb, parquet_filter, sort, sort_tpch, tpch, h2o};

#[derive(Debug, StructOpt)]
#[structopt(about = "benchmark command")]
Expand All @@ -45,6 +45,7 @@ enum Options {
Sort(sort::RunOpt),
SortTpch(sort_tpch::RunOpt),
Imdb(imdb::RunOpt),
H2o(h2o::RunOpt),
}

// Main benchmark runner entrypoint
Expand All @@ -60,5 +61,6 @@ pub async fn main() -> Result<()> {
Options::Sort(opt) => opt.run().await,
Options::SortTpch(opt) => opt.run().await,
Options::Imdb(opt) => opt.run().await,
Options::H2o(opt) => opt.run().await,
}
}
135 changes: 0 additions & 135 deletions benchmarks/src/bin/h2o.rs

This file was deleted.

Loading

0 comments on commit ee9c3ae

Please sign in to comment.