Skip to content

Commit

Permalink
Merge branch 'move-enforce-sorting-to-new-crate' of https://github.com/buraksenn/datafusion into moves_crates_test

Browse files Browse the repository at this point in the history
  • Loading branch information
logan-keede committed Jan 21, 2025
2 parents 5093e61 + 9c40e49 commit 3d78ef2
Show file tree
Hide file tree
Showing 67 changed files with 1,794 additions and 513 deletions.
39 changes: 38 additions & 1 deletion .github/workflows/extended.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,42 @@ on:
- main

jobs:
# Check crate compiles and base cargo check passes
linux-build-lib:
name: linux build test
runs-on: ubuntu-latest
container:
image: amd64/rust
steps:
- uses: actions/checkout@v4
- name: Setup Rust toolchain
uses: ./.github/actions/setup-builder
with:
rust-version: stable
- name: Prepare cargo build
run: cargo check --profile ci --all-targets

# Run extended tests (with feature 'extended_tests')
linux-test-extended:
name: cargo test 'extended_tests' (amd64)
needs: linux-build-lib
runs-on: ubuntu-latest
container:
image: amd64/rust
steps:
- uses: actions/checkout@v4
with:
submodules: true
fetch-depth: 1
- name: Setup Rust toolchain
uses: ./.github/actions/setup-builder
with:
rust-version: stable
- name: Run tests (excluding doctests)
run: cargo test --profile ci --exclude datafusion-examples --exclude datafusion-benchmarks --workspace --lib --tests --bins --features avro,json,backtrace,extended_tests
- name: Verify Working Directory Clean
run: git diff --exit-code

# Check answers are correct when hash values collide
hash-collisions:
name: cargo test hash collisions (amd64)
Expand All @@ -51,7 +87,8 @@ jobs:
- name: Run tests
run: |
cd datafusion
cargo test --profile ci --exclude datafusion-examples --exclude datafusion-benchmarks --exclude datafusion-sqllogictest --workspace --lib --tests --features=force_hash_collisions,avro
cargo test --profile ci --exclude datafusion-examples --exclude datafusion-benchmarks --exclude datafusion-sqllogictest --workspace --lib --tests --features=force_hash_collisions,avro,extended_tests
sqllogictest-sqlite:
name: "Run sqllogictests with the sqlite test suite"
runs-on: ubuntu-latest
Expand Down
1 change: 1 addition & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ unicode_expressions = [
"datafusion-sql/unicode_expressions",
"datafusion-functions/unicode_expressions",
]
extended_tests = []

[dependencies]
apache-avro = { version = "0.17", optional = true }
Expand Down Expand Up @@ -150,6 +151,7 @@ rand_distr = "0.4.3"
regex = { workspace = true }
rstest = { workspace = true }
serde_json = { workspace = true }
sysinfo = "0.33.1"
test-utils = { path = "../../test-utils" }
tokio = { workspace = true, features = ["rt-multi-thread", "parking_lot", "fs"] }

Expand Down
84 changes: 84 additions & 0 deletions datafusion/core/benches/sql_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@ mod data_utils;

use crate::criterion::Criterion;
use arrow::datatypes::{DataType, Field, Fields, Schema};
use arrow_array::{ArrayRef, RecordBatch};
use criterion::Bencher;
use datafusion::datasource::MemTable;
use datafusion::execution::context::SessionContext;
use datafusion_common::ScalarValue;
use datafusion_expr::col;
use itertools::Itertools;
use std::fs::File;
use std::io::{BufRead, BufReader};
Expand Down Expand Up @@ -147,6 +149,77 @@ fn benchmark_with_param_values_many_columns(ctx: &SessionContext, b: &mut Benche
});
}

/// Registers an in-memory table named "t" with `num_columns` UInt64 columns
/// (c0, c1, ...) and `num_rows` rows, where the cell at (row j, column i)
/// holds `j * 100 + i`:
///
/// c0,  c1,  c2,  ...
/// 0,   1,   2,   ...
/// 100, 101, 102, ...
/// 200, 201, 202, ...
///
/// The table is registered with a declared sort order on all columns.
fn register_union_order_table(ctx: &SessionContext, num_columns: usize, num_rows: usize) {
    // Build one UInt64 array per column:
    // ("c0", [0, 100, 200, ...])
    // ("c1", [1, 101, 201, ...])
    // etc
    let iter = (0..num_columns).map(|i| i as u64).map(|i| {
        let array: ArrayRef = Arc::new(arrow::array::UInt64Array::from_iter_values(
            (0..num_rows)
                .map(|j| j as u64 * 100 + i)
                .collect::<Vec<_>>(),
        ));
        (format!("c{}", i), array)
    });
    let batch = RecordBatch::try_from_iter(iter).unwrap();
    let schema = batch.schema();
    let partitions = vec![vec![batch]];

    // tell DataFusion that the table is sorted by all columns
    // (ascending, nulls-first) so the planner can exploit the ordering
    let sort_order = (0..num_columns)
        .map(|i| col(format!("c{}", i)).sort(true, true))
        .collect::<Vec<_>>();

    // create the table and attach the declared sort order
    let table = MemTable::try_new(schema, partitions)
        .unwrap()
        .with_sort_order(vec![sort_order]);

    ctx.register_table("t", Arc::new(table)).unwrap();
}

/// return a query like
/// ```sql
/// select c1, null as c2, ... null as cn from t ORDER BY c1
/// UNION ALL
/// select null as c1, c2, ... null as cn from t ORDER BY c2
/// ...
/// select null as c1, null as c2, ... cn from t ORDER BY cn
/// ORDER BY c1, c2 ... CN
/// ```
fn union_orderby_query(n: usize) -> String {
    // One UNION branch per column: branch `i` selects column `i` and
    // projects NULL for every other column, ordered by column `i`.
    let branches = (0..n)
        .map(|i| {
            let projection = (0..n)
                .map(|j| {
                    if j == i {
                        format!("c{j}")
                    } else {
                        format!("null as c{j}")
                    }
                })
                .collect::<Vec<_>>()
                .join(", ");
            format!("(SELECT {projection} FROM t ORDER BY c{i})")
        })
        .collect::<Vec<_>>()
        .join("\n UNION ALL \n");

    // Final ORDER BY over every column of the union.
    let order_by = (0..n)
        .map(|i| format!("c{i}"))
        .collect::<Vec<_>>()
        .join(", ");

    format!("{branches}\nORDER BY {order_by}")
}

fn criterion_benchmark(c: &mut Criterion) {
// verify that we can load the clickbench data prior to running the benchmark
if !PathBuf::from(format!("{BENCHMARKS_PATH_1}{CLICKBENCH_DATA_PATH}")).exists()
Expand Down Expand Up @@ -289,6 +362,17 @@ fn criterion_benchmark(c: &mut Criterion) {
});
});

// -- Sorted Queries --
register_union_order_table(&ctx, 100, 1000);

// this query has many expressions in its sort order so stresses
// order equivalence validation
c.bench_function("physical_sorted_union_orderby", |b| {
// SELECT ... UNION ALL ...
let query = union_orderby_query(20);
b.iter(|| physical_plan(&ctx, &query))
});

// --- TPC-H ---

let tpch_ctx = register_defs(SessionContext::new(), tpch_schemas());
Expand Down
16 changes: 8 additions & 8 deletions datafusion/core/src/physical_optimizer/projection_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1255,7 +1255,7 @@ fn update_join_filter(
side: col_idx.side,
})
.collect(),
join_filter.schema().clone(),
Arc::clone(join_filter.schema()),
)
})
}
Expand Down Expand Up @@ -2246,11 +2246,11 @@ mod tests {
side: JoinSide::Left,
},
],
Schema::new(vec![
Arc::new(Schema::new(vec![
Field::new("b_left_inter", DataType::Int32, true),
Field::new("a_right_inter", DataType::Int32, true),
Field::new("c_left_inter", DataType::Int32, true),
]),
])),
)),
&JoinType::Inner,
true,
Expand Down Expand Up @@ -2360,11 +2360,11 @@ mod tests {
side: JoinSide::Left,
},
],
Schema::new(vec![
Arc::new(Schema::new(vec![
Field::new("b_left_inter", DataType::Int32, true),
Field::new("a_right_inter", DataType::Int32, true),
Field::new("c_left_inter", DataType::Int32, true),
]),
])),
)),
&JoinType::Inner,
true,
Expand Down Expand Up @@ -2462,7 +2462,7 @@ mod tests {
Some(JoinFilter::new(
filter_expr,
filter_column_indices,
filter_schema,
Arc::new(filter_schema),
)),
&JoinType::Inner,
None,
Expand Down Expand Up @@ -2536,11 +2536,11 @@ mod tests {
side: JoinSide::Left,
},
],
Schema::new(vec![
Arc::new(Schema::new(vec![
Field::new("b_left_inter", DataType::Int32, true),
Field::new("a_right_inter", DataType::Int32, true),
Field::new("c_left_inter", DataType::Int32, true),
]),
])),
)),
&JoinType::Inner,
None,
Expand Down
Loading

0 comments on commit 3d78ef2

Please sign in to comment.