consolidate dataframe_subquery.rs into dataframe.rs (#13950)
zjregee authored Dec 31, 2024
1 parent 9b5995f commit 2b15ad1
Showing 3 changed files with 92 additions and 119 deletions.
2 changes: 1 addition & 1 deletion datafusion-examples/README.md
@@ -57,7 +57,7 @@ cargo run --example dataframe
- [`custom_datasource.rs`](examples/custom_datasource.rs): Run queries against a custom datasource (TableProvider)
- [`custom_file_format.rs`](examples/custom_file_format.rs): Write data to a custom file format
- [`dataframe-to-s3.rs`](examples/external_dependency/dataframe-to-s3.rs): Run a query using a DataFrame against a parquet file from s3 and writing back to s3
- [`dataframe.rs`](examples/dataframe.rs): Run a query using a DataFrame API against parquet files, csv files, and in-memory data. Also demonstrates the various methods to write out a DataFrame to a table, parquet file, csv file, and json file.
- [`dataframe.rs`](examples/dataframe.rs): Run a query using a DataFrame API against parquet files, csv files, and in-memory data, including multiple subqueries. Also demonstrates the various methods to write out a DataFrame to a table, parquet file, csv file, and json file.
- [`deserialize_to_struct.rs`](examples/deserialize_to_struct.rs): Convert query results into rust structs using serde
- [`expr_api.rs`](examples/expr_api.rs): Create, execute, simplify, analyze and coerce `Expr`s
- [`file_stream_provider.rs`](examples/file_stream_provider.rs): Run a query on `FileStreamProvider` which implements `StreamProvider` for reading and writing to arbitrary stream sources / sinks.
91 changes: 91 additions & 0 deletions datafusion-examples/examples/dataframe.rs
@@ -19,10 +19,13 @@ use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::error::Result;
use datafusion::functions_aggregate::average::avg;
use datafusion::functions_aggregate::min_max::max;
use datafusion::prelude::*;
use datafusion_common::config::CsvOptions;
use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::DataFusionError;
use datafusion_common::ScalarValue;
use std::fs::File;
use std::io::Write;
use std::sync::Arc;
@@ -44,7 +47,14 @@ use tempfile::tempdir;
///
/// * [write_out]: write out a DataFrame to a table, parquet file, csv file, or json file
///
/// # Executing subqueries
///
/// * [where_scalar_subquery]: execute a scalar subquery
/// * [where_in_subquery]: execute a subquery with an IN clause
/// * [where_exist_subquery]: execute a subquery with an EXISTS clause
///
/// # Querying data
///
/// * [query_to_date]: execute queries against parquet files
#[tokio::main]
async fn main() -> Result<()> {
@@ -55,6 +65,11 @@ async fn main() -> Result<()> {
read_memory(&ctx).await?;
write_out(&ctx).await?;
query_to_date().await?;
register_aggregate_test_data("t1", &ctx).await?;
register_aggregate_test_data("t2", &ctx).await?;
where_scalar_subquery(&ctx).await?;
where_in_subquery(&ctx).await?;
where_exist_subquery(&ctx).await?;
Ok(())
}

@@ -250,3 +265,79 @@ async fn query_to_date() -> Result<()> {

Ok(())
}

/// Use the DataFrame API to execute the following subquery:
/// select c1, c2 from t1 where (select avg(t2.c2) from t2 where t1.c1 = t2.c1) > 0 limit 3;
async fn where_scalar_subquery(ctx: &SessionContext) -> Result<()> {
ctx.table("t1")
.await?
.filter(
scalar_subquery(Arc::new(
ctx.table("t2")
.await?
.filter(out_ref_col(DataType::Utf8, "t1.c1").eq(col("t2.c1")))?
.aggregate(vec![], vec![avg(col("t2.c2"))])?
.select(vec![avg(col("t2.c2"))])?
.into_unoptimized_plan(),
))
.gt(lit(0u8)),
)?
.select(vec![col("t1.c1"), col("t1.c2")])?
.limit(0, Some(3))?
.show()
.await?;
Ok(())
}
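
For comparison, the same query can also be run through the SQL front end instead of the DataFrame API. A minimal sketch, not part of this commit, assuming `t1` and `t2` are registered as in `register_aggregate_test_data` below:

/// Sketch: SQL form of `where_scalar_subquery`; both should return the
/// same three rows.
async fn where_scalar_subquery_sql(ctx: &SessionContext) -> Result<()> {
    ctx.sql(
        "select c1, c2 from t1 \
         where (select avg(t2.c2) from t2 where t1.c1 = t2.c1) > 0 limit 3",
    )
    .await?
    .show()
    .await?;
    Ok(())
}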

/// Use the DataFrame API to execute the following subquery:
/// select t1.c1, t1.c2 from t1 where t1.c2 in (select max(t2.c2) from t2 where t2.c1 > 0) limit 3;
async fn where_in_subquery(ctx: &SessionContext) -> Result<()> {
ctx.table("t1")
.await?
.filter(in_subquery(
col("t1.c2"),
Arc::new(
ctx.table("t2")
.await?
.filter(col("t2.c1").gt(lit(ScalarValue::UInt8(Some(0)))))?
.aggregate(vec![], vec![max(col("t2.c2"))])?
.select(vec![max(col("t2.c2"))])?
.into_unoptimized_plan(),
),
))?
.select(vec![col("t1.c1"), col("t1.c2")])?
.limit(0, Some(3))?
.show()
.await?;
Ok(())
}
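
The negated form follows the same shape; a short sketch, not part of this commit, assuming `not_in_subquery` is available from `datafusion::prelude` alongside `in_subquery`:

/// Sketch: keep only t1 rows whose c2 does NOT appear in the subquery
/// result, i.e. the NOT IN form of `where_in_subquery`.
async fn where_not_in_subquery(ctx: &SessionContext) -> Result<()> {
    ctx.table("t1")
        .await?
        .filter(not_in_subquery(
            col("t1.c2"),
            Arc::new(
                ctx.table("t2")
                    .await?
                    .filter(col("t2.c1").gt(lit(ScalarValue::UInt8(Some(0)))))?
                    .aggregate(vec![], vec![max(col("t2.c2"))])?
                    .select(vec![max(col("t2.c2"))])?
                    .into_unoptimized_plan(),
            ),
        ))?
        .select(vec![col("t1.c1"), col("t1.c2")])?
        .limit(0, Some(3))?
        .show()
        .await?;
    Ok(())
}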

/// Use the DataFrame API to execute the following subquery:
/// select t1.c1, t1.c2 from t1 where exists (select t2.c2 from t2 where t1.c1 = t2.c1) limit 3;
async fn where_exist_subquery(ctx: &SessionContext) -> Result<()> {
ctx.table("t1")
.await?
.filter(exists(Arc::new(
ctx.table("t2")
.await?
.filter(out_ref_col(DataType::Utf8, "t1.c1").eq(col("t2.c1")))?
.select(vec![col("t2.c2")])?
.into_unoptimized_plan(),
)))?
.select(vec![col("t1.c1"), col("t1.c2")])?
.limit(0, Some(3))?
.show()
.await?;
Ok(())
}
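
To inspect what the optimizer does with the correlated EXISTS predicate, the plan can be printed with `DataFrame::explain` instead of executing the query; a minimal sketch, not part of this commit (the correlated subquery is typically decorrelated into a semi join):

/// Sketch: print the logical and physical plans for the EXISTS query
/// above rather than its results.
async fn explain_exist_subquery(ctx: &SessionContext) -> Result<()> {
    ctx.table("t1")
        .await?
        .filter(exists(Arc::new(
            ctx.table("t2")
                .await?
                .filter(out_ref_col(DataType::Utf8, "t1.c1").eq(col("t2.c1")))?
                .select(vec![col("t2.c2")])?
                .into_unoptimized_plan(),
        )))?
        .explain(false, false)?
        .show()
        .await?;
    Ok(())
}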

async fn register_aggregate_test_data(name: &str, ctx: &SessionContext) -> Result<()> {
let testdata = datafusion::test_util::arrow_test_data();
ctx.register_csv(
name,
&format!("{testdata}/csv/aggregate_test_100.csv"),
CsvReadOptions::default(),
)
.await?;
Ok(())
}
118 changes: 0 additions & 118 deletions datafusion-examples/examples/dataframe_subquery.rs

This file was deleted.
