Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
XiangpengHao committed Nov 22, 2024
1 parent f66eb55 commit 534f07b
Show file tree
Hide file tree
Showing 10 changed files with 327 additions and 131 deletions.
164 changes: 36 additions & 128 deletions benchmarks/src/bin/cache_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,26 @@ use arrow_flight::flight_descriptor::DescriptorType;
use arrow_flight::flight_service_server::{FlightService, FlightServiceServer};
use arrow_flight::sql::server::{FlightSqlService, PeekableFlightDataStream};
use arrow_flight::sql::{
ActionClosePreparedStatementRequest, ActionCreatePreparedStatementRequest,
ActionCreatePreparedStatementResult, Any, CommandGetDbSchemas, CommandGetTables,
CommandPreparedStatementQuery, CommandPreparedStatementUpdate, CommandStatementQuery,
ProstMessageExt, SqlInfo,
ActionClosePreparedStatementRequest, Any, CommandGetDbSchemas,
CommandPreparedStatementUpdate, CommandStatementQuery, ProstMessageExt, SqlInfo,
};
use arrow_flight::{
Action, FlightDescriptor, FlightEndpoint, FlightInfo, HandshakeRequest,
HandshakeResponse, IpcMessage, SchemaAsIpc, Ticket,
HandshakeResponse, Ticket,
};
use dashmap::DashMap;
use datafusion::execution::object_store::ObjectStoreUrl;
use datafusion::logical_expr::LogicalPlan;
use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties};
use datafusion::prelude::{ParquetReadOptions, SessionConfig, SessionContext};
use datafusion_flight_table::GcStream;
use futures::{Stream, TryStreamExt};
use log::{debug, info};
use mimalloc::MiMalloc;
use parquet::arrow::arrow_cache::ArrowArrayCache;
use prost::Message;
use std::path::PathBuf;
use std::pin::Pin;
use std::sync::atomic::AtomicU32;
use std::sync::Arc;
use structopt::StructOpt;
use tonic::metadata::MetadataValue;
Expand Down Expand Up @@ -114,15 +114,15 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
ctx.runtime_env().object_store(url).unwrap()
};
let mut parquet_options: ParquetReadOptions<'_> = Default::default();
use datafusion::datasource::physical_plan::parquet::Parquet7FileReaderFactory;
parquet_options.reader = Some(Arc::new(Parquet7FileReaderFactory::new(object_store)));
use datafusion::datasource::physical_plan::parquet::Parquet8FileReaderFactory;
parquet_options.reader = Some(Arc::new(Parquet8FileReaderFactory::new(object_store)));

// register parquet file with the execution context
ctx.register_parquet(&file_name, &options.path.to_string_lossy(), parquet_options)
.await
.map_err(|e| status!("Error registering table", e))?;

let service = FlightSqlServiceImpl::new(file_name, options.path, ctx);
let service = FlightSqlServiceImpl::new(file_name, ctx);
info!("Listening on {addr:?}");
let svc = FlightServiceServer::new(service);

Expand All @@ -132,77 +132,36 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}

pub struct FlightSqlServiceImpl {
contexts: Arc<DashMap<String, Arc<SessionContext>>>,
statements: Arc<DashMap<String, LogicalPlan>>,
results: Arc<DashMap<String, Arc<dyn ExecutionPlan>>>,
execution_plans: Arc<DashMap<String, Arc<dyn ExecutionPlan>>>,
default_ctx: Arc<SessionContext>,
table_name: String,
table_path: String,
iteration: AtomicU32,
}

impl FlightSqlServiceImpl {
fn new(table_name: String, path: PathBuf, default_ctx: Arc<SessionContext>) -> Self {
fn new(table_name: String, default_ctx: Arc<SessionContext>) -> Self {
Self {
contexts: Default::default(),
statements: Default::default(),
results: Default::default(),
execution_plans: Default::default(),
table_name,
table_path: path.to_string_lossy().to_string(),
default_ctx,
iteration: AtomicU32::new(0),
}
}
}

impl FlightSqlServiceImpl {
async fn create_ctx(&self) -> Result<String, Status> {
let uuid = Uuid::new_v4().hyphenated().to_string();

let session_config = SessionConfig::from_env()
.map_err(|e| Status::internal(format!("Error building plan: {e}")))?
.with_information_schema(true);
let ctx = Arc::new(SessionContext::new_with_config(session_config));

// register parquet file with the execution context
ctx.register_parquet(
&self.table_name,
&self.table_path,
ParquetReadOptions::default(),
)
.await
.map_err(|e| status!("Error registering table", e))?;

self.contexts.insert(uuid.clone(), ctx);
fn create_ctx(&self) -> Result<Uuid, Status> {
let uuid = Uuid::new_v4();
debug!("Created context with uuid: {uuid}");
Ok(uuid)
}

fn get_ctx<T>(&self, req: &Request<T>) -> Result<Arc<SessionContext>, Status> {
// get the token from the authorization header on Request
if let Some(auth) = req.metadata().get("authorization") {
let str = auth
.to_str()
.map_err(|e| Status::internal(format!("Error parsing header: {e}")))?;
let authorization = str.to_string();
let bearer = "Bearer ";
if !authorization.starts_with(bearer) {
Err(Status::internal("Invalid auth header!"))?;
}
let auth = authorization[bearer.len()..].to_string();

if let Some(context) = self.contexts.get(&auth) {
Ok(context.clone())
} else {
Err(Status::internal(format!(
"Context handle not found: {auth}"
)))?
}
} else {
Ok(self.default_ctx.clone())
}
fn get_ctx<T>(&self, _req: &Request<T>) -> Result<Arc<SessionContext>, Status> {
Ok(self.default_ctx.clone())
}

fn get_result(&self, handle: &str) -> Result<Arc<dyn ExecutionPlan>, Status> {
if let Some(result) = self.results.get(handle) {
if let Some(result) = self.execution_plans.get(handle) {
Ok(result.clone())
} else {
Err(Status::internal(format!(
Expand All @@ -211,13 +170,8 @@ impl FlightSqlServiceImpl {
}
}

fn remove_plan(&self, handle: &str) -> Result<(), Status> {
self.statements.remove(&handle.to_string());
Ok(())
}

fn remove_result(&self, handle: &str) -> Result<(), Status> {
self.results.remove(&handle.to_string());
self.execution_plans.remove(&handle.to_string());
Ok(())
}
}
Expand All @@ -238,7 +192,7 @@ impl FlightSqlService for FlightSqlServiceImpl {
// see Ballista implementation for example of basic auth
// in this case, we simply accept the connection and create a new SessionContext
// the SessionContext will be re-used within this same connection/session
let token = self.create_ctx().await?;
let token = self.create_ctx()?;

let result = HandshakeResponse {
protocol_version: 0,
Expand Down Expand Up @@ -307,11 +261,10 @@ impl FlightSqlService for FlightSqlServiceImpl {
debug!("execution plan schema: {:?}", schema);
let stream = execution_plan
.execute(fetch_results.partition as usize, ctx.task_ctx())
.unwrap()
// .map_err(|e| status!("Error executing plan", e))?
.map_err(|e| {
panic!("Error executing plan: {:?}", e);
});
.unwrap();
let stream = GcStream::new(stream).map_err(|e| {
panic!("Error executing plan: {:?}", e);
});

let ipc_options = IpcWriteOptions::default().with_preserve_dict_id(false);
let stream = FlightDataEncoderBuilder::new()
Expand All @@ -330,6 +283,16 @@ impl FlightSqlService for FlightSqlServiceImpl {
let user_query = cmd.query.as_str();
info!("running query: {user_query}");

let iteration = self
.iteration
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if iteration == 1 {
// A dirty hack to show memory usage.
let current_cache_memory_usage =
ArrowArrayCache::get().stats().memory_usage();
info!("cache memory usage: {}", current_cache_memory_usage);
}

let ctx = self.get_ctx(&request)?;

let plan = ctx.sql(user_query).await.expect("Error generating plan");
Expand All @@ -345,7 +308,7 @@ impl FlightSqlService for FlightSqlServiceImpl {
let schema = physical_plan.schema();

let handle = Uuid::new_v4().hyphenated().to_string();
self.results.insert(handle.clone(), physical_plan);
self.execution_plans.insert(handle.clone(), physical_plan);

let flight_desc = FlightDescriptor {
r#type: DescriptorType::Cmd.into(),
Expand Down Expand Up @@ -373,24 +336,6 @@ impl FlightSqlService for FlightSqlServiceImpl {
Ok(resp)
}

async fn get_flight_info_prepared_statement(
&self,
_cmd: CommandPreparedStatementQuery,
_request: Request<FlightDescriptor>,
) -> Result<Response<FlightInfo>, Status> {
info!("get_flight_info_prepared_statement");
panic!("not implemented");
}

async fn get_flight_info_tables(
&self,
_query: CommandGetTables,
_request: Request<FlightDescriptor>,
) -> Result<Response<FlightInfo>, Status> {
info!("get_flight_info_tables");
panic!("not implemented");
}

async fn do_put_prepared_statement_update(
&self,
_handle: CommandPreparedStatementUpdate,
Expand All @@ -402,42 +347,6 @@ impl FlightSqlService for FlightSqlServiceImpl {
Ok(-1)
}

async fn do_action_create_prepared_statement(
&self,
query: ActionCreatePreparedStatementRequest,
request: Request<Action>,
) -> Result<ActionCreatePreparedStatementResult, Status> {
let user_query = query.query.as_str();
info!("do_action_create_prepared_statement: {user_query}");

let ctx = self.get_ctx(&request)?;

let plan = ctx
.sql(user_query)
.await
.and_then(|df| df.into_optimized_plan())
.map_err(|e| Status::internal(format!("Error building plan: {e}")))?;

// store a copy of the plan, it will be used for execution
let plan_uuid = Uuid::new_v4().hyphenated().to_string();
self.statements.insert(plan_uuid.clone(), plan.clone());

let plan_schema = plan.schema();

let arrow_schema = (&**plan_schema).into();
let message = SchemaAsIpc::new(&arrow_schema, &IpcWriteOptions::default())
.try_into()
.map_err(|e| status!("Unable to serialize schema", e))?;
let IpcMessage(schema_bytes) = message;

let res = ActionCreatePreparedStatementResult {
prepared_statement_handle: plan_uuid.into(),
dataset_schema: schema_bytes,
parameter_schema: Default::default(),
};
Ok(res)
}

async fn do_action_close_prepared_statement(
&self,
handle: ActionClosePreparedStatementRequest,
Expand All @@ -446,7 +355,6 @@ impl FlightSqlService for FlightSqlServiceImpl {
let handle = std::str::from_utf8(&handle.prepared_statement_handle);
if let Ok(handle) = handle {
info!("do_action_close_prepared_statement: removing plan and results for {handle}");
let _ = self.remove_plan(handle);
let _ = self.remove_result(handle);
}
Ok(())
Expand Down
50 changes: 49 additions & 1 deletion benchmarks/src/clickbench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ use datafusion::execution::cache::cache_unit::Cache37;
use datafusion::execution::object_store::ObjectStoreUrl;
use datafusion::physical_plan::collect;
use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::ExecutionPlanVisitor;
use datafusion::prelude::ParquetReadOptions;
use datafusion::{
error::{DataFusionError, Result},
Expand All @@ -38,6 +40,7 @@ use datafusion_common::exec_datafusion_err;
use datafusion_common::instant::Instant;
use datafusion_flight_table::sql::FlightSqlDriver;
use datafusion_flight_table::sql::USERNAME;
use datafusion_flight_table::FlightExec;
use datafusion_flight_table::FlightTableFactory;
use object_store::aws::AmazonS3Builder;
use object_store::ObjectStore;
Expand Down Expand Up @@ -229,6 +232,7 @@ impl RunOpt {
pretty::print_batches(&result)?;
}
}

let elapsed = start.elapsed();
let ms = elapsed.as_secs_f64() * 1000.0;
let row_count: usize = result.iter().map(|b| b.num_rows()).sum();
Expand Down Expand Up @@ -297,10 +301,17 @@ impl RunOpt {
}
}

benchmark_run.write_iter(
let mut metrics_visitor = ExecutionPlanMetricCollector::new();
datafusion::physical_plan::visit_execution_plan(
physical_plan.as_ref(),
&mut metrics_visitor,
)?;

benchmark_run.write_iter_with_metrics(
elapsed,
row_count,
Cache37::consume_bytes_read(),
metrics_visitor.into_hashmap(),
);
}
}
Expand Down Expand Up @@ -375,3 +386,40 @@ impl RunOpt {
}
}
}

/// Accumulates execution metrics gathered while walking a physical plan.
///
/// Values are keyed by metric name. Once the walk is finished, the collected
/// map is handed over with [`ExecutionPlanMetricCollector::into_hashmap`].
#[derive(Debug, Default)]
struct ExecutionPlanMetricCollector {
    /// Metric name mapped to its recorded value.
    metrics: HashMap<String, usize>,
}

impl ExecutionPlanMetricCollector {
    /// Creates a collector with no metrics recorded yet.
    fn new() -> Self {
        Self::default()
    }

    /// Consumes the collector and returns the metrics gathered so far.
    fn into_hashmap(self) -> HashMap<String, usize> {
        self.metrics
    }
}

impl ExecutionPlanVisitor for ExecutionPlanMetricCollector {
    type Error = DataFusionError;

    /// Records the metrics of `plan` when it is a `FlightExec` node.
    ///
    /// The node's metrics are aggregated by name with timestamps removed, and
    /// each value is added to the running total stored under that name. Nodes
    /// of any other type, and nodes that expose no metrics, are ignored.
    ///
    /// Always returns `Ok(true)`.
    fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool> {
        if plan.as_any().is::<FlightExec>() {
            if let Some(metrics) = plan.metrics() {
                let aggregated = metrics.aggregate_by_name().timestamps_removed();
                for metric in aggregated.iter() {
                    let value = metric.value();
                    // Accumulate rather than overwrite: a plan may contain
                    // several `FlightExec` nodes reporting the same metric
                    // name, and a plain `insert` would keep only the last one.
                    let total = self
                        .metrics
                        .entry(value.name().to_string())
                        .or_insert(0);
                    *total = total.saturating_add(value.as_usize());
                }
            }
        }
        Ok(true)
    }

    /// No work is needed after a node's children have been visited.
    ///
    /// Always returns `Ok(true)`.
    fn post_visit(&mut self, _plan: &dyn ExecutionPlan) -> Result<bool> {
        Ok(true)
    }
}
Loading

0 comments on commit 534f07b

Please sign in to comment.