Skip to content

Commit

Permalink
Merge branch 'main' of github.com:apache/datafusion into ret-ty
Browse files Browse the repository at this point in the history
  • Loading branch information
jayzhan211 committed Jan 18, 2025
2 parents 03bd527 + 0e22172 commit fd2f35d
Show file tree
Hide file tree
Showing 152 changed files with 6,388 additions and 3,511 deletions.
22 changes: 19 additions & 3 deletions .github/workflows/extended.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# specific language governing permissions and limitations
# under the License.

name: Rust Hash Collisions
name: Datafusion extended tests

concurrency:
group: ${{ github.repository }}-${{ github.head_ref || github.sha }}-${{ github.workflow }}
Expand All @@ -24,8 +24,8 @@ concurrency:
# https://docs.github.com/en/actions/writing-workflows/choosing-when-your-workflow-runs/events-that-trigger-workflows#running-your-pull_request-workflow-when-a-pull-request-merges
#
# this job is intended to only run only on the main branch as it is time consuming
# and very rarely fails. However, it is important coverage to ensure correctness
# in the (very rare) event of a hash failure.
# and should not fail often. However, it is important coverage to ensure correctness
# in the (very rare) event of a hash failure or sqlite query failure.
on:
# Run on all commits to main
push:
Expand All @@ -52,3 +52,19 @@ jobs:
run: |
cd datafusion
cargo test --profile ci --exclude datafusion-examples --exclude datafusion-benchmarks --exclude datafusion-sqllogictest --workspace --lib --tests --features=force_hash_collisions,avro
sqllogictest-sqlite:
name: "Run sqllogictests with the sqlite test suite"
runs-on: ubuntu-latest
container:
image: amd64/rust
steps:
- uses: actions/checkout@v4
with:
submodules: true
fetch-depth: 1
- name: Setup Rust toolchain
uses: ./.github/actions/setup-builder
with:
rust-version: stable
- name: Run sqllogictest
run: cargo test --profile release-nonlto --test sqllogictests -- --include-sqlite
16 changes: 8 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,21 +77,21 @@ version = "44.0.0"
ahash = { version = "0.8", default-features = false, features = [
"runtime-rng",
] }
arrow = { version = "53.3.0", features = [
arrow = { version = "54.0.0", features = [
"prettyprint",
] }
arrow-array = { version = "53.3.0", default-features = false, features = [
arrow-array = { version = "54.0.0", default-features = false, features = [
"chrono-tz",
] }
arrow-buffer = { version = "53.3.0", default-features = false }
arrow-flight = { version = "53.3.0", features = [
arrow-buffer = { version = "54.0.0", default-features = false }
arrow-flight = { version = "54.0.0", features = [
"flight-sql-experimental",
] }
arrow-ipc = { version = "53.3.0", default-features = false, features = [
arrow-ipc = { version = "54.0.0", default-features = false, features = [
"lz4",
] }
arrow-ord = { version = "53.3.0", default-features = false }
arrow-schema = { version = "53.3.0", default-features = false }
arrow-ord = { version = "54.0.0", default-features = false }
arrow-schema = { version = "54.0.0", default-features = false }
async-trait = "0.1.73"
bigdecimal = "0.4.7"
bytes = "1.4"
Expand Down Expand Up @@ -133,7 +133,7 @@ itertools = "0.14"
log = "^0.4"
object_store = { version = "0.11.0", default-features = false }
parking_lot = "0.12"
parquet = { version = "53.3.0", default-features = false, features = [
parquet = { version = "54.0.0", default-features = false, features = [
"arrow",
"async",
"object_store",
Expand Down
24 changes: 24 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -146,3 +146,27 @@ stable API, we also improve the API over time. As a result, we typically
deprecate methods before removing them, according to the [deprecation guidelines].

[deprecation guidelines]: https://datafusion.apache.org/library-user-guide/api-health.html

## Dependencies and a `Cargo.lock`

`datafusion` is intended for use as a library and thus purposely does not have a
`Cargo.lock` file checked in. You can read more about the distinction in the
[Cargo book].

CI tests always run against the latest compatible versions of all dependencies
(the equivalent of doing `cargo update`), as suggested in the [Cargo CI guide]
and we rely on Dependabot for other upgrades. This strategy has two problems
that occasionally arise:

1. CI failures when downstream libraries upgrade in some non compatible way
2. Local development builds that fail when DataFusion inadvertently relies on
a feature in a newer version of a dependency than declared in `Cargo.toml`
(e.g. a new method is added to a trait that we use).

However, we think the current strategy is the best tradeoff between maintenance
overhead and user experience and ensures DataFusion always works with the latest
compatible versions of all dependencies. If you encounter either of these
problems, please open an issue or PR.

[cargo book]: https://doc.rust-lang.org/cargo/guide/cargo-toml-vs-cargo-lock.html
[cargo ci guide]: https://doc.rust-lang.org/cargo/guide/continuous-integration.html#verifying-latest-dependencies
55 changes: 37 additions & 18 deletions benchmarks/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ DataFusion is included in the benchmark setups for several popular
benchmarks that compare performance with other engines. For example:

* [ClickBench] scripts are in the [ClickBench repo](https://github.com/ClickHouse/ClickBench/tree/main/datafusion)
* [H2o.ai `db-benchmark`] scripts are in [db-benchmark](db-benchmark) directory
* [H2o.ai `db-benchmark`] scripts are in [db-benchmark](https://github.com/apache/datafusion/tree/main/benchmarks/src/h2o.rs)

[ClickBench]: https://github.com/ClickHouse/ClickBench/tree/main
[H2o.ai `db-benchmark`]: https://github.com/h2oai/db-benchmark
Expand Down Expand Up @@ -405,31 +405,50 @@ cargo run --release --bin external_aggr -- benchmark -n 4 --iterations 3 -p '...
```


# Older Benchmarks
## h2o benchmarks for groupby

## h2o benchmarks
### Generate data for h2o benchmarks
There are three options for generating data for h2o benchmarks: `small`, `medium`, and `big`. The data is generated in the `data` directory.

1. Generate small data (1e7 rows)
```bash
cargo run --release --bin h2o group-by --query 1 --path /mnt/bigdata/h2oai/N_1e7_K_1e2_single.csv --mem-table --debug
./bench.sh data h2o_small
```

Example run:

2. Generate medium data (1e8 rows)
```bash
./bench.sh data h2o_medium
```


3. Generate large data (1e9 rows)
```bash
./bench.sh data h2o_big
```

### Run h2o benchmarks
There are three options for running h2o benchmarks: `small`, `medium`, and `big`.
1. Run small data benchmark
```bash
./bench.sh run h2o_small
```
Running benchmarks with the following options: GroupBy(GroupBy { query: 1, path: "/mnt/bigdata/h2oai/N_1e7_K_1e2_single.csv", debug: false })
Executing select id1, sum(v1) as v1 from x group by id1
+-------+--------+
| id1 | v1 |
+-------+--------+
| id063 | 199420 |
| id094 | 200127 |
| id044 | 198886 |
...
| id093 | 200132 |
| id003 | 199047 |
+-------+--------+

h2o groupby query 1 took 1669 ms
2. Run medium data benchmark
```bash
./bench.sh run h2o_medium
```

3. Run large data benchmark
```bash
./bench.sh run h2o_big
```

4. Run a specific query with a specific data path

For example, to run query 1 with the small data generated above:
```bash
cargo run --release --bin dfbench -- h2o --path ./benchmarks/data/h2o/G1_1e7_1e7_100_0.csv --query 1
```

[1]: http://www.tpc.org/tpch/
Expand Down
146 changes: 146 additions & 0 deletions benchmarks/bench.sh
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ clickbench_1: ClickBench queries against a single parquet file
clickbench_partitioned: ClickBench queries against a partitioned (100 files) parquet
clickbench_extended: ClickBench \"inspired\" queries against a single parquet (DataFusion specific)
external_aggr: External aggregation benchmark
h2o_small: h2oai benchmark with small dataset (1e7 rows), default file format is csv
h2o_medium: h2oai benchmark with medium dataset (1e8 rows), default file format is csv
h2o_big: h2oai benchmark with large dataset (1e9 rows), default file format is csv
**********
* Supported Configuration (Environment Variables)
Expand Down Expand Up @@ -142,6 +145,9 @@ main() {
all)
data_tpch "1"
data_tpch "10"
data_h2o "SMALL"
data_h2o "MEDIUM"
data_h2o "BIG"
data_clickbench_1
data_clickbench_partitioned
data_imdb
Expand Down Expand Up @@ -172,6 +178,15 @@ main() {
imdb)
data_imdb
;;
h2o_small)
data_h2o "SMALL" "CSV"
;;
h2o_medium)
data_h2o "MEDIUM" "CSV"
;;
h2o_big)
data_h2o "BIG" "CSV"
;;
external_aggr)
# same data as for tpch
data_tpch "1"
Expand Down Expand Up @@ -221,6 +236,9 @@ main() {
run_clickbench_1
run_clickbench_partitioned
run_clickbench_extended
run_h2o "SMALL" "PARQUET" "groupby"
run_h2o "MEDIUM" "PARQUET" "groupby"
run_h2o "BIG" "PARQUET" "groupby"
run_imdb
run_external_aggr
;;
Expand Down Expand Up @@ -254,6 +272,15 @@ main() {
imdb)
run_imdb
;;
h2o_small)
run_h2o "SMALL" "CSV" "groupby"
;;
h2o_medium)
run_h2o "MEDIUM" "CSV" "groupby"
;;
h2o_big)
run_h2o "BIG" "CSV" "groupby"
;;
external_aggr)
run_external_aggr
;;
Expand Down Expand Up @@ -541,6 +568,125 @@ run_imdb() {
$CARGO_COMMAND --bin imdb -- benchmark datafusion --iterations 5 --path "${IMDB_DIR}" --prefer_hash_join "${PREFER_HASH_JOIN}" --format parquet -o "${RESULTS_FILE}"
}

data_h2o() {
# Default values for size and data format
SIZE=${1:-"SMALL"}
DATA_FORMAT=${2:-"CSV"}

# Function to compare Python versions
version_ge() {
[ "$(printf '%s\n' "$1" "$2" | sort -V | head -n1)" = "$2" ]
}

export PYO3_USE_ABI3_FORWARD_COMPATIBILITY=1

# Find the highest available Python version (3.10 or higher)
REQUIRED_VERSION="3.10"
PYTHON_CMD=$(command -v python3 || true)

if [ -n "$PYTHON_CMD" ]; then
PYTHON_VERSION=$($PYTHON_CMD -c "import sys; print(f'{sys.version_info.major}.{sys.version_info.minor}')")
if version_ge "$PYTHON_VERSION" "$REQUIRED_VERSION"; then
echo "Found Python version $PYTHON_VERSION, which is suitable."
else
echo "Python version $PYTHON_VERSION found, but version $REQUIRED_VERSION or higher is required."
PYTHON_CMD=""
fi
fi

# Search for suitable Python versions if the default is unsuitable
if [ -z "$PYTHON_CMD" ]; then
# Loop through all available Python3 commands on the system
for CMD in $(compgen -c | grep -E '^python3(\.[0-9]+)?$'); do
if command -v "$CMD" &> /dev/null; then
PYTHON_VERSION=$($CMD -c "import sys; print(f'{sys.version_info.major}.{sys.version_info.minor}')")
if version_ge "$PYTHON_VERSION" "$REQUIRED_VERSION"; then
PYTHON_CMD="$CMD"
echo "Found suitable Python version: $PYTHON_VERSION ($CMD)"
break
fi
fi
done
fi

# If no suitable Python version found, exit with an error
if [ -z "$PYTHON_CMD" ]; then
echo "Python 3.10 or higher is required. Please install it."
return 1
fi

echo "Using Python command: $PYTHON_CMD"

# Install falsa and other dependencies
echo "Installing falsa..."

# Set virtual environment directory
VIRTUAL_ENV="${PWD}/venv"

# Create a virtual environment using the detected Python command
$PYTHON_CMD -m venv "$VIRTUAL_ENV"

# Activate the virtual environment and install dependencies
source "$VIRTUAL_ENV/bin/activate"

# Ensure 'falsa' is installed (avoid unnecessary reinstall)
pip install --quiet --upgrade falsa

# Create directory if it doesn't exist
H2O_DIR="${DATA_DIR}/h2o"
mkdir -p "${H2O_DIR}"

# Generate h2o test data
echo "Generating h2o test data in ${H2O_DIR} with size=${SIZE} and format=${DATA_FORMAT}"
falsa groupby --path-prefix="${H2O_DIR}" --size "${SIZE}" --data-format "${DATA_FORMAT}"

# Deactivate virtual environment after completion
deactivate
}

## todo now only support groupby, after https://github.com/mrpowers-io/falsa/issues/21 done, we can add support for join
run_h2o() {
# Default values for size and data format
SIZE=${1:-"SMALL"}
DATA_FORMAT=${2:-"CSV"}
DATA_FORMAT=$(echo "$DATA_FORMAT" | tr '[:upper:]' '[:lower:]')
RUN_Type=${3:-"groupby"}

# Data directory and results file path
H2O_DIR="${DATA_DIR}/h2o"
RESULTS_FILE="${RESULTS_DIR}/h2o.json"

echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running h2o benchmark..."

# Set the file name based on the size
case "$SIZE" in
"SMALL")
FILE_NAME="G1_1e7_1e7_100_0.${DATA_FORMAT}" # For small dataset
;;
"MEDIUM")
FILE_NAME="G1_1e8_1e8_100_0.${DATA_FORMAT}" # For medium dataset
;;
"BIG")
FILE_NAME="G1_1e9_1e9_100_0.${DATA_FORMAT}" # For big dataset
;;
*)
echo "Invalid size. Valid options are SMALL, MEDIUM, or BIG."
return 1
;;
esac

# Set the query file name based on the RUN_Type
QUERY_FILE="${SCRIPT_DIR}/queries/h2o/${RUN_Type}.sql"

# Run the benchmark using the dynamically constructed file path and query file
$CARGO_COMMAND --bin dfbench -- h2o \
--iterations 3 \
--path "${H2O_DIR}/${FILE_NAME}" \
--queries-path "${QUERY_FILE}" \
-o "${RESULTS_FILE}"
}

# Runs the external aggregation benchmark
run_external_aggr() {
# Use TPC-H SF1 dataset
Expand Down
10 changes: 10 additions & 0 deletions benchmarks/queries/h2o/groupby.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
SELECT id1, SUM(v1) AS v1 FROM x GROUP BY id1;
SELECT id1, id2, SUM(v1) AS v1 FROM x GROUP BY id1, id2;
SELECT id3, SUM(v1) AS v1, AVG(v3) AS v3 FROM x GROUP BY id3;
SELECT id4, AVG(v1) AS v1, AVG(v2) AS v2, AVG(v3) AS v3 FROM x GROUP BY id4;
SELECT id6, SUM(v1) AS v1, SUM(v2) AS v2, SUM(v3) AS v3 FROM x GROUP BY id6;
SELECT id4, id5, MEDIAN(v3) AS median_v3, STDDEV(v3) AS sd_v3 FROM x GROUP BY id4, id5;
SELECT id3, MAX(v1) - MIN(v2) AS range_v1_v2 FROM x GROUP BY id3;
SELECT id6, largest2_v3 FROM (SELECT id6, v3 AS largest2_v3, ROW_NUMBER() OVER (PARTITION BY id6 ORDER BY v3 DESC) AS order_v3 FROM x WHERE v3 IS NOT NULL) sub_query WHERE order_v3 <= 2;
SELECT id2, id4, POWER(CORR(v1, v2), 2) AS r2 FROM x GROUP BY id2, id4;
SELECT id1, id2, id3, id4, id5, id6, SUM(v3) AS v3, COUNT(*) AS count FROM x GROUP BY id1, id2, id3, id4, id5, id6;
5 changes: 5 additions & 0 deletions benchmarks/queries/h2o/join.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
SELECT x.id1, x.id2, x.id3, x.id4 as xid4, small.id4 as smallid4, x.id5, x.id6, x.v1, small.v2 FROM x INNER JOIN small ON x.id1 = small.id1;
SELECT x.id1 as xid1, medium.id1 as mediumid1, x.id2, x.id3, x.id4 as xid4, medium.id4 as mediumid4, x.id5 as xid5, medium.id5 as mediumid5, x.id6, x.v1, medium.v2 FROM x INNER JOIN medium ON x.id2 = medium.id2;
SELECT x.id1 as xid1, medium.id1 as mediumid1, x.id2, x.id3, x.id4 as xid4, medium.id4 as mediumid4, x.id5 as xid5, medium.id5 as mediumid5, x.id6, x.v1, medium.v2 FROM x LEFT JOIN medium ON x.id2 = medium.id2;
SELECT x.id1 as xid1, medium.id1 as mediumid1, x.id2, x.id3, x.id4 as xid4, medium.id4 as mediumid4, x.id5 as xid5, medium.id5 as mediumid5, x.id6, x.v1, medium.v2 FROM x JOIN medium ON x.id5 = medium.id5;
SELECT x.id1 as xid1, large.id1 as largeid1, x.id2 as xid2, large.id2 as largeid2, x.id3, x.id4 as xid4, large.id4 as largeid4, x.id5 as xid5, large.id5 as largeid5, x.id6 as xid6, large.id6 as largeid6, x.v1, large.v2 FROM x JOIN large ON x.id3 = large.id3;
Loading

0 comments on commit fd2f35d

Please sign in to comment.