Skip to content

Commit

Permalink
add flight sql
Browse files Browse the repository at this point in the history
  • Loading branch information
XiangpengHao committed Nov 2, 2024
1 parent f7d3a4d commit 973bb25
Show file tree
Hide file tree
Showing 9 changed files with 1,296 additions and 1 deletion.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ members = [
"datafusion-examples",
"test-utils",
"benchmarks",
"datafusion/flight-table",
]
resolver = "2"

Expand Down Expand Up @@ -99,6 +100,7 @@ datafusion-common-runtime = { path = "datafusion/common-runtime", version = "42.
datafusion-execution = { path = "datafusion/execution", version = "42.0.0" }
datafusion-expr = { path = "datafusion/expr", version = "42.0.0" }
datafusion-expr-common = { path = "datafusion/expr-common", version = "42.0.0" }
datafusion-flight-table = { path = "datafusion/flight-table", version = "42.0.0" }
datafusion-functions = { path = "datafusion/functions", version = "42.0.0" }
datafusion-functions-aggregate = { path = "datafusion/functions-aggregate", version = "42.0.0" }
datafusion-functions-aggregate-common = { path = "datafusion/functions-aggregate-common", version = "42.0.0" }
Expand Down Expand Up @@ -170,4 +172,3 @@ large_futures = "warn"
[workspace.lints.rust]
unexpected_cfgs = { level = "warn", check-cfg = ["cfg(tarpaulin)"] }
unused_imports = "deny"

1 change: 1 addition & 0 deletions benchmarks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ arrow-schema = { workspace = true }
dashmap = { workspace = true }
datafusion = { workspace = true, default-features = true }
datafusion-common = { workspace = true, default-features = true }
datafusion-flight-table = { workspace = true }
env_logger = { workspace = true }
futures = { workspace = true }
log = { workspace = true }
Expand Down
68 changes: 68 additions & 0 deletions benchmarks/src/bin/cache_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow::util::pretty;
use datafusion::physical_plan::collect;
use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::prelude::SessionContext;
use datafusion_flight_table::sql::{FlightSqlDriver, USERNAME};
use datafusion_flight_table::FlightTableFactory;
use std::collections::HashMap;
use std::sync::Arc;

#[tokio::main]
async fn main() -> datafusion::common::Result<()> {
let ctx = SessionContext::new();
let mut state = ctx.state();
state
.config_mut()
.options_mut()
.execution
.parquet
.pushdown_filters = true;

let flight_sql = FlightTableFactory::new(Arc::new(FlightSqlDriver::default()));
let table = flight_sql
.open_table(
"http://localhost:50051",
HashMap::from([(USERNAME.into(), "whatever".into())]),
"hits",
)
.await?;
ctx.register_table("hits", Arc::new(table))?;

// let sql = r#"SELECT COUNT(*) FROM hits WHERE "AdvEngineID" <> 0"#;
let sql = r#"SELECT "AdvEngineID", COUNT(*) FROM hits WHERE "AdvEngineID" <> 0 GROUP BY "AdvEngineID" ORDER BY COUNT(*) DESC"#;

let plan = ctx.sql(sql).await?;
let (state, plan) = plan.into_parts();
let plan = state.optimize(&plan)?;

println!("logical plan: {}", plan);
let physical_plan = state.create_physical_plan(&plan).await?;
let result = collect(physical_plan.clone(), state.task_ctx()).await?;
println!(
"=== Physical plan with metrics ===\n{}\n",
DisplayableExecutionPlan::with_metrics(physical_plan.as_ref()).indent(true)
);
if !result.is_empty() {
// do not call print_batches if there are no batches as the result is confusing
// and makes it look like there is a batch with no columns
pretty::print_batches(&result)?;
}
Ok(())
}
Loading

0 comments on commit 973bb25

Please sign in to comment.