Skip to content
This repository has been archived by the owner on Jun 6, 2024. It is now read-only.

Commit

Permalink
removed projected node
Browse files Browse the repository at this point in the history
  • Loading branch information
connortsui20 committed Feb 18, 2024
1 parent 7ed7c71 commit 01e81c5
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 8 deletions.
1 change: 1 addition & 0 deletions sql_dfphysicalplan/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@ edition = "2021"
arrow = { version = "50", features = ["prettyprint"] }
datafusion = "35"
datafusion-common = "35"
futures = "0.3"
tokio = { version = "1", features = ["full"] }
54 changes: 46 additions & 8 deletions sql_dfphysicalplan/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,26 @@
use arrow::util::pretty;
use datafusion::execution::context::SessionContext;
use datafusion::physical_plan::projection::ProjectionExec;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::Partitioning;
use datafusion::prelude::*;
use datafusion_common::Result;
use futures::stream::StreamExt;
use std::io;
use std::sync::Arc;

async fn remove_projection(physical_plan: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
let _ = physical_plan
.clone()
.as_any()
.downcast_ref::<ProjectionExec>()
.expect("Unable to downcast_ref to ProjectionExec");

let children = physical_plan.children();
assert_eq!(children.len(), 1);

children[0].clone()
}

#[tokio::main]
async fn main() -> Result<()> {
Expand All @@ -21,23 +40,42 @@ async fn main() -> Result<()> {

let stdin = io::read_to_string(io::stdin())?;

// let results: Vec<_> = df.collect().await?;

// let pretty_results = pretty::pretty_format_batches(&results)?.to_string();

// println!("{}", pretty_results);

let df = ctx
.sql(&stdin)
.await
.unwrap_or_else(|err| panic!("{:#?}", err));

let logical_plan = df.clone().into_unoptimized_plan();
println!("Logical Plan:\n{:#?}", logical_plan);
let physical_plan = df.clone().create_physical_plan().await?;

// let physical_plan = df.clone().create_physical_plan().await?;
// println!("Physical Plan:\n{:#?}", physical_plan);
let physical_plan = remove_projection(physical_plan).await;

let results: Vec<_> = df.collect().await?;
let partitioning = physical_plan.output_partitioning();
let partitions = match partitioning {
Partitioning::RoundRobinBatch(c) => c,
Partitioning::Hash(_, h) => h,
Partitioning::UnknownPartitioning(p) => p,
};

// format the results
let pretty_results = arrow::util::pretty::pretty_format_batches(&results)?.to_string();
for i in 0..partitions {
let batch_stream = physical_plan.execute(i, Default::default())?;

println!("{}", pretty_results);
let results = batch_stream.collect::<Vec<_>>().await;
for batch in results {
let batch = batch.unwrap();
if batch.num_rows() == 0 {
continue;
}

let pretty_results = pretty::pretty_format_batches(&[batch])?.to_string();
println!("{}", pretty_results);
}
}

Ok(())
}

0 comments on commit 01e81c5

Please sign in to comment.