From 01e81c59e20bb51cea4c7105dbe4ef3e05af40f9 Mon Sep 17 00:00:00 2001
From: Connor Tsui
Date: Sun, 18 Feb 2024 14:27:44 -0500
Subject: [PATCH] removed projected node

---
 sql_dfphysicalplan/Cargo.toml  |  1 +
 sql_dfphysicalplan/src/main.rs | 54 +++++++++++++++++++++++++++++-----
 2 files changed, 47 insertions(+), 8 deletions(-)

diff --git a/sql_dfphysicalplan/Cargo.toml b/sql_dfphysicalplan/Cargo.toml
index 76d77bf..14e5052 100644
--- a/sql_dfphysicalplan/Cargo.toml
+++ b/sql_dfphysicalplan/Cargo.toml
@@ -9,4 +9,5 @@ edition = "2021"
 arrow = { version = "50", features = ["prettyprint"] }
 datafusion = "35"
 datafusion-common = "35"
+futures = "0.3"
 tokio = { version = "1", features = ["full"] }
diff --git a/sql_dfphysicalplan/src/main.rs b/sql_dfphysicalplan/src/main.rs
index d08f9bb..a0b20a3 100644
--- a/sql_dfphysicalplan/src/main.rs
+++ b/sql_dfphysicalplan/src/main.rs
@@ -1,7 +1,26 @@
+use arrow::util::pretty;
 use datafusion::execution::context::SessionContext;
+use datafusion::physical_plan::projection::ProjectionExec;
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion::physical_plan::Partitioning;
 use datafusion::prelude::*;
 use datafusion_common::Result;
+use futures::stream::StreamExt;
 use std::io;
+use std::sync::Arc;
+
+async fn remove_projection(physical_plan: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
+    let _ = physical_plan
+        .clone()
+        .as_any()
+        .downcast_ref::<ProjectionExec>()
+        .expect("Unable to downcast_ref to ProjectionExec");
+
+    let children = physical_plan.children();
+    assert_eq!(children.len(), 1);
+
+    children[0].clone()
+}
 
 #[tokio::main]
 async fn main() -> Result<()> {
@@ -21,23 +40,42 @@ async fn main() -> Result<()> {
 
     let stdin = io::read_to_string(io::stdin())?;
 
+    // let results: Vec<_> = df.collect().await?;
+
+    // let pretty_results = pretty::pretty_format_batches(&results)?.to_string();
+
+    // println!("{}", pretty_results);
+
     let df = ctx
         .sql(&stdin)
         .await
         .unwrap_or_else(|err| panic!("{:#?}", err));
 
-    let logical_plan = df.clone().into_unoptimized_plan();
-    println!("Logical Plan:\n{:#?}", logical_plan);
+    let physical_plan = df.clone().create_physical_plan().await?;
 
-    // let physical_plan = df.clone().create_physical_plan().await?;
-    // println!("Physical Plan:\n{:#?}", physical_plan);
+    let physical_plan = remove_projection(physical_plan).await;
 
-    let results: Vec<_> = df.collect().await?;
+    let partitioning = physical_plan.output_partitioning();
+    let partitions = match partitioning {
+        Partitioning::RoundRobinBatch(c) => c,
+        Partitioning::Hash(_, h) => h,
+        Partitioning::UnknownPartitioning(p) => p,
+    };
 
-    // format the results
-    let pretty_results = arrow::util::pretty::pretty_format_batches(&results)?.to_string();
+    for i in 0..partitions {
+        let batch_stream = physical_plan.execute(i, Default::default())?;
 
-    println!("{}", pretty_results);
+        let results = batch_stream.collect::<Vec<_>>().await;
+        for batch in results {
+            let batch = batch.unwrap();
+            if batch.num_rows() == 0 {
+                continue;
+            }
+
+            let pretty_results = pretty::pretty_format_batches(&[batch])?.to_string();
+            println!("{}", pretty_results);
+        }
+    }
 
     Ok(())
 }