From 7ed7c71045642179916ddac16c49982c1ef77f21 Mon Sep 17 00:00:00 2001 From: Connor Tsui Date: Sun, 18 Feb 2024 13:32:38 -0500 Subject: [PATCH] import sql_dfpp --- sql_dfphysicalplan/Cargo.toml | 12 +++++++ sql_dfphysicalplan/queries/TPC-H_Q1.sql | 21 +++++++++++ sql_dfphysicalplan/queries/test_query.sql | 13 +++++++ sql_dfphysicalplan/src/main.rs | 43 +++++++++++++++++++++++ 4 files changed, 89 insertions(+) create mode 100644 sql_dfphysicalplan/Cargo.toml create mode 100644 sql_dfphysicalplan/queries/TPC-H_Q1.sql create mode 100644 sql_dfphysicalplan/queries/test_query.sql create mode 100644 sql_dfphysicalplan/src/main.rs diff --git a/sql_dfphysicalplan/Cargo.toml b/sql_dfphysicalplan/Cargo.toml new file mode 100644 index 0000000..76d77bf --- /dev/null +++ b/sql_dfphysicalplan/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "sql_dfphysicalplan" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +arrow = { version = "50", features = ["prettyprint"] } +datafusion = "35" +datafusion-common = "35" +tokio = { version = "1", features = ["full"] } diff --git a/sql_dfphysicalplan/queries/TPC-H_Q1.sql b/sql_dfphysicalplan/queries/TPC-H_Q1.sql new file mode 100644 index 0000000..cda51ab --- /dev/null +++ b/sql_dfphysicalplan/queries/TPC-H_Q1.sql @@ -0,0 +1,21 @@ +SELECT + L_RETURNFLAG, + L_LINESTATUS, + SUM(L_QUANTITY) AS SUM_QTY, + SUM(L_EXTENDEDPRICE) AS SUM_BASE_PRICE, + SUM(L_EXTENDEDPRICE * (1 - L_DISCOUNT)) AS SUM_DISC_PRICE, + SUM(L_EXTENDEDPRICE * (1 - L_DISCOUNT) * (1 + L_TAX)) AS SUM_CHARGE, + AVG(L_QUANTITY) AS AVG_QTY, + AVG(L_EXTENDEDPRICE) AS AVG_PRICE, + AVG(L_DISCOUNT) AS AVG_DISC, + COUNT(*) AS COUNT_ORDER +FROM + LINEITEM +WHERE + L_SHIPDATE <= DATE '1998-12-01' - INTERVAL '93 DAY' +GROUP BY + L_RETURNFLAG, + L_LINESTATUS +ORDER BY + L_RETURNFLAG, + L_LINESTATUS; \ No newline at end of file diff --git a/sql_dfphysicalplan/queries/test_query.sql 
b/sql_dfphysicalplan/queries/test_query.sql
new file mode 100644
index 0000000..2f8b2b6
--- /dev/null
+++ b/sql_dfphysicalplan/queries/test_query.sql
@@ -0,0 +1,13 @@
+SELECT
+    L_RETURNFLAG,
+    L_LINESTATUS,
+    SUM(L_QUANTITY) AS SUM_QTY,
+    AVG(L_QUANTITY) AS AVG_QTY,
+    COUNT(*) AS COUNT_ORDER
+FROM
+    LINEITEM
+-- WHERE
+--     L_SHIPDATE <= DATE '1998-12-01' - INTERVAL '93 DAY'
+GROUP BY
+    L_RETURNFLAG,
+    L_LINESTATUS
diff --git a/sql_dfphysicalplan/src/main.rs b/sql_dfphysicalplan/src/main.rs
new file mode 100644
index 0000000..d08f9bb
--- /dev/null
+++ b/sql_dfphysicalplan/src/main.rs
@@ -0,0 +1,47 @@
+//! Reads a SQL query from standard input, runs it with DataFusion against
+//! `|`-delimited CSV tables under `data/`, and prints the unoptimized logical
+//! plan followed by the formatted query results.
+
+use datafusion::execution::context::SessionContext;
+use datafusion::prelude::*;
+use datafusion_common::Result;
+use std::io;
+
+/// Registers the CSV tables, plans and runs the SQL text read from stdin, and
+/// prints the unoptimized logical plan followed by the result batches.
+///
+/// # Errors
+///
+/// Returns an error if a table cannot be registered, stdin cannot be read, the
+/// query cannot be planned or executed, or the results cannot be formatted.
+#[tokio::main]
+async fn main() -> Result<()> {
+    let ctx = SessionContext::new();
+
+    let tables = [
+        "customer", "lineitem", "nation", "orders", "part", "partsupp", "region", "supplier",
+    ];
+    for table_name in tables {
+        // Each table is read from `data/<name>.csv`, parsed with `|` as the delimiter.
+        let path = format!("data/{table_name}.csv");
+        ctx.register_csv(table_name, &path, CsvReadOptions::new().delimiter(b'|'))
+            .await?;
+    }
+
+    let query = io::read_to_string(io::stdin())?;
+
+    // A query that fails to plan is ordinary user error: report it through the
+    // returned `Result` instead of panicking.
+    let df = ctx.sql(&query).await?;
+
+    // Borrow the plan; cloning the whole `DataFrame` just to print it is unnecessary.
+    println!("Logical Plan:\n{:#?}", df.logical_plan());
+
+    // let physical_plan = df.clone().create_physical_plan().await?;
+    // println!("Physical Plan:\n{:#?}", physical_plan);
+
+    let results = df.collect().await?;
+    println!("{}", arrow::util::pretty::pretty_format_batches(&results)?);
+
+    Ok(())
+}