diff --git a/arrow-datafusion b/arrow-datafusion
index dc1e3f9..2bebc24 160000
--- a/arrow-datafusion
+++ b/arrow-datafusion
@@ -1 +1 @@
-Subproject commit dc1e3f945a6b64b5156cbc0dbe2bcbceaf686adf
+Subproject commit 2bebc24e9a74c6c4f9a1faba2fd2fa58c53f95fb
diff --git a/scheduler/Cargo.toml b/scheduler/Cargo.toml
index 9ae2a23..3121ef2 100644
--- a/scheduler/Cargo.toml
+++ b/scheduler/Cargo.toml
@@ -6,19 +6,19 @@ edition = "2021"
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [lib]
-name = "lib"
+name = "chronos"
 path = "src/lib.rs"
 
 [[bin]]
-name = "scheduler-api-server"
-path = "src/proto_server.rs"
+name = "scheduler-service"
+path = "src/scheduler_service.rs"
 
 [[bin]]
-name = "executor"
+name = "executor-service"
 path = "src/executor.rs"
 
 [[bin]]
-name = "cli"
+name = "scheduler-cli"
 path = "src/cli.rs"
 
 [[bin]]
@@ -41,6 +41,10 @@ futures = "0.3.30"
 anyhow = "1.0.82"
 clap = "4.5.4"
 bytes = "0.4.12"
+ahash = "0.8.11"
+serde = { version = "1.0", features = ["derive"] }
+serde_json = "1.0"
+
 
 [build-dependencies]
-tonic-build = "0.11"
+tonic-build = "0.11"
\ No newline at end of file
diff --git a/scheduler/proto/executor.proto b/scheduler/proto/executor.proto
index f164d6d..00fec58 100644
--- a/scheduler/proto/executor.proto
+++ b/scheduler/proto/executor.proto
@@ -2,6 +2,7 @@ syntax = "proto3";
 
 package executor_interface;
 
+// Arguments for the ExecuteQuery RPC.
 message ExecuteQueryArgs {
   // The query fragment identifier.
   int32 fragment_id = 1;
diff --git a/scheduler/proto/scheduler.proto b/scheduler/proto/scheduler.proto
index 87aef21..86a937d 100644
--- a/scheduler/proto/scheduler.proto
+++ b/scheduler/proto/scheduler.proto
@@ -47,15 +47,15 @@ message ScheduleQueryRet {
   // The identifier to refer to this query execution.
   int32 query_id = 1;
 
-  // The path info for intermediate files
   bytes file_scan_config = 2;
 
   // The status of query execution.
   //QueryStatus status = 3;
 
-  // TODO
-  // enqueue_time
+  // The time the query was scheduled for execution in milliseconds since
+  // the Unix epoch.
+  uint64 enqueue_time = 3;
 
   // TODO
   // finish_time
@@ -83,13 +83,14 @@ message QueryExecutionDoneArgs {
   // The path info for intermediate files
   bytes file_scan_config = 3;
-
+  // Whether the file is root
   bool root = 4;
 
   // Query id of the whole plan
   int32 query_id = 5;
 
+  bool generated_hash_table = 6;
 }
 
 // Information returned from the QueryExecutionDone RPC.
@@ -100,16 +101,13 @@ message QueryExecutionDoneRet {
 }
 
-message RegisterExecutorArgs {
-  int32 port = 1;
-}
-
-message RegisterExecutorRet {
+message GetQueryArgs {
 }
 
-message GetQueryArgs {
-
+message HashBuildDataInfo {
+  bytes path_from_parent = 1;
+  int32 build_fragment_id = 2;
 }
 
 message GetQueryRet {
@@ -121,10 +119,11 @@
 
   bool root = 4;
 
+  repeated HashBuildDataInfo hash_build_data_info = 5;
 }
 
 // The scheduler interface.
-service SchedulerService {
+service Scheduler {
   // Used by the optimizer to schedule a new query.
   rpc ScheduleQuery(ScheduleQueryArgs) returns (ScheduleQueryRet);
 
@@ -132,8 +131,6 @@
   // Used by the optimizer to query the status of a job.
rpc QueryJobStatus(QueryJobStatusArgs) returns (QueryJobStatusRet); - rpc RegisterExecutor(RegisterExecutorArgs) returns (RegisterExecutorRet); - rpc GetQuery(GetQueryArgs) returns (GetQueryRet); // Used by the execution engine to notify the scheduler that the execution diff --git a/scheduler/src/bench.rs b/scheduler/src/bench.rs index a007cf0..6455b61 100644 --- a/scheduler/src/bench.rs +++ b/scheduler/src/bench.rs @@ -1,16 +1,16 @@ +use chronos::integration::scan_from_parquet; +use chronos::scheduler_interface::scheduler_client::SchedulerClient; use datafusion_proto::protobuf::FileScanExecConf; -use lib::integration::scan_from_parquet; -use lib::scheduler_interface::scheduler_service_client::SchedulerServiceClient; use bytes::IntoBuf; +use chronos::debug_println; +use chronos::scheduler_interface::{QueryInfo, ScheduleQueryArgs}; use datafusion::physical_plan; use datafusion::prelude::*; use datafusion_proto::bytes::physical_plan_to_bytes; use datafusion_proto::physical_plan::from_proto; use futures::stream::TryStreamExt; -use lib::debug_println; -use lib::scheduler_interface::{QueryInfo, ScheduleQueryArgs}; use prost::Message; use std::time::Instant; @@ -52,8 +52,8 @@ async fn run_and_time_bench(query_num: u32, num_runs: u32) -> f64 { panic!("Scheduler port environment variable not set"); }); let uri = format!("http://[::1]:{scheduler_service_port}"); - let mut client: SchedulerServiceClient = - SchedulerServiceClient::connect(uri.clone()) + let mut client: SchedulerClient = + SchedulerClient::connect(uri.clone()) .await .unwrap_or_else(|error| { panic!("Unable to connect to the scheduler instance: {:?}", error); diff --git a/scheduler/src/cli.rs b/scheduler/src/cli.rs index 62e08d0..a9a55f9 100644 --- a/scheduler/src/cli.rs +++ b/scheduler/src/cli.rs @@ -1,16 +1,16 @@ +use chronos::integration::scan_from_parquet; +use chronos::scheduler_interface::scheduler_client::SchedulerClient; use datafusion_proto::protobuf::FileScanExecConf; -use lib::integration::scan_from_parquet; -use lib::scheduler_interface::scheduler_service_client::SchedulerServiceClient; use bytes::IntoBuf; +use chronos::debug_println; +use chronos::scheduler_interface::{QueryInfo, ScheduleQueryArgs}; use datafusion::arrow::{array::RecordBatch, util::pretty}; use datafusion::physical_plan; use datafusion::prelude::*; use datafusion_proto::bytes::physical_plan_to_bytes; use datafusion_proto::physical_plan::from_proto; use futures::stream::TryStreamExt; -use lib::debug_println; -use lib::scheduler_interface::{QueryInfo, ScheduleQueryArgs}; use prost::Message; use std::io::{self, Write}; @@ -47,7 +47,7 @@ async fn main() -> Result<(), Box> { panic!("Scheduler port environment variable not set"); }); let uri = format!("http://[::1]:{scheduler_service_port}"); - let mut client = SchedulerServiceClient::connect(uri.clone()) + let mut client = SchedulerClient::connect(uri.clone()) .await .unwrap_or_else(|error| { panic!("Unable to connect to the scheduler instance: {:?}", error); diff --git a/scheduler/src/executor.rs b/scheduler/src/executor.rs index 332cd0c..84b0f73 100644 --- a/scheduler/src/executor.rs +++ b/scheduler/src/executor.rs @@ -1,30 +1,57 @@ +use async_recursion::async_recursion; +use chronos::executor_interface::executor_service_server::ExecutorService; +use chronos::executor_interface::{ExecuteQueryArgs, ExecuteQueryRet}; +use chronos::scheduler_interface::scheduler_client::SchedulerClient; use datafusion::datasource::physical_plan::FileScanConfig; +use datafusion::execution::memory_pool::MemoryConsumer; +use 
datafusion::execution::TaskContext; +use datafusion::physical_expr::PhysicalExprRef; +use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; +use datafusion::physical_plan::joins::utils::JoinHashMap; +use datafusion::physical_plan::joins::JoinLeftData; +use datafusion::physical_plan::joins::{update_hash, HashBuildExec}; +use datafusion_common::arrow::compute::concat_batches; + +use datafusion_common::DataFusionError; use datafusion_proto::bytes::physical_plan_from_bytes; use datafusion_proto::protobuf::FileScanExecConf; -use lib::executor_interface::executor_service_server::ExecutorService; -use lib::executor_interface::{ExecuteQueryArgs, ExecuteQueryRet}; -use lib::scheduler_interface::scheduler_service_client::SchedulerServiceClient; -use lib::scheduler_interface::{GetQueryArgs, GetQueryRet, QueryExecutionDoneArgs}; -use tokio::runtime::Handle; +use chronos::scheduler_interface::{ + GetQueryArgs, GetQueryRet, QueryExecutionDoneArgs, QueryStatus, +}; +use futures::TryStreamExt; +use tokio::sync::RwLock; use tonic::{Code, Request, Response, Status}; +use ahash::RandomState; +use chronos::integration::{local_file_config, spill_records_to_disk}; use core::time; +use datafusion::physical_plan::ExecutionPlanProperties; use datafusion::prelude::*; -use lib::integration::{local_file_config, spill_records_to_disk}; use prost::Message; -use std::env; +use std::collections::HashMap; +use std::sync::Arc; use std::thread::sleep; -use tokio::fs; -use lib::debug_println; +use std::env; + +use chronos::debug_println; + +enum QueryResult { + Config(FileScanConfig), + HashTable, +} +/// An entity that executes query plans. #[derive(Debug, Default)] -pub struct MyExecutor {} -use datafusion::physical_plan; +pub struct Executor { + random_state: RandomState, + generated_hash_tables: Arc>>, +} +use datafusion::physical_plan::{self, ExecutionPlan}; #[tonic::async_trait] -impl ExecutorService for MyExecutor { +impl ExecutorService for Executor { async fn execute_query( &self, request: Request, @@ -36,113 +63,274 @@ impl ExecutorService for MyExecutor { } } -async fn process_fragment(get_query_response: GetQueryRet, ctx: &SessionContext) -> FileScanConfig { - let wd = env::current_dir().unwrap(); - let wd_str = wd.to_str().unwrap(); - - let query_id = get_query_response.query_id; - let fragment_id = get_query_response.fragment_id; - let process_plan = physical_plan_from_bytes(&get_query_response.physical_plan, ctx).unwrap(); - - let output_schema = process_plan.schema(); - let context = ctx.state().task_ctx(); - let output_stream = physical_plan::execute_stream(process_plan.clone(), context).unwrap(); +impl Executor { + async fn process_fragment( + &self, + get_query_response: GetQueryRet, + ctx: &SessionContext, + ) -> QueryResult { + let wd = env::current_dir().unwrap(); + let wd_str = wd.to_str().unwrap(); + + let query_id = get_query_response.query_id; + let fragment_id = get_query_response.fragment_id; + let process_plan = + physical_plan_from_bytes(&get_query_response.physical_plan, ctx).unwrap(); + let output_schema = process_plan.schema(); + let context = ctx.state().task_ctx(); + + // If this plan requires us to build a hash table. 
+ if let Some(node) = process_plan.as_any().downcast_ref::() { + let input = node.input().clone(); + let on = node.on.clone(); + let join_data = self + .build_hash_table(None, input, context.clone(), on) + .await + .expect("Failed to build a hash table"); + self.generated_hash_tables + .write() + .await + .insert(fragment_id, join_data); + } + + // If we need to add a precomputed hash table to a hash probe exec node + for hash_build_info in get_query_response.hash_build_data_info { + let path_from_parent = hash_build_info.path_from_parent; + let path_from_parent_vec: Vec = serde_json::from_slice(&path_from_parent).unwrap(); + let build_fragment_id = hash_build_info.build_fragment_id; + let join_data = self + .generated_hash_tables + .read() + .await + .get(&build_fragment_id) + .expect("Unable to find the built hash table") + .clone(); + let _modified_plan = self + .add_hash_table_to_hash_probe( + join_data, + process_plan.clone(), + &path_from_parent_vec, + ) + .await; + } + + let output_stream = physical_plan::execute_stream(process_plan, context).unwrap(); + + let intermediate_output = format!( + "{wd_str}/scheduler/src/example_data/query_{query_id}_fragment_{fragment_id}.parquet" + ); + + spill_records_to_disk( + &intermediate_output, + output_stream, + output_schema.clone(), + get_query_response.root, + ) + .await + .unwrap(); + QueryResult::Config(local_file_config( + output_schema, + intermediate_output.as_str(), + )) + } - let intermediate_output = format!( - "/{wd_str}/scheduler/src/example_data/query_{query_id}_fragment_{fragment_id}.parquet" - ); + async fn build_hash_table( + &self, + partition: Option, + node: Arc, + context: Arc, + on: Vec, + ) -> Result { + let schema = node.schema(); + + let (node_input, node_input_partition) = if let Some(partition) = partition { + (node, partition) + } else if node.output_partitioning().partition_count() != 1 { + (Arc::new(CoalescePartitionsExec::new(node)) as _, 0) + } else { + (node, 0) + }; - spill_records_to_disk( - &intermediate_output, - output_stream, - output_schema.clone(), - get_query_response.root, - ) - .await - .unwrap(); + // Depending on partition argument load single partition or whole left side in memory + let stream = node_input.execute(node_input_partition, context.clone())?; + + // This operation performs 2 steps at once: + // 1. creates a [JoinHashMap] of all batches from the stream + // 2. stores the batches in a vector. 
+ let initial = (Vec::new(), 0); + + let (batches, num_rows) = stream + .try_fold(initial, |mut acc, batch| async { + // Update rowcount + acc.1 += batch.num_rows(); + // Push batch to output + acc.0.push(batch); + Ok(acc) + }) + .await + .unwrap(); + + let mut hashmap = JoinHashMap::with_capacity(num_rows); + let mut hashes_buffer = Vec::new(); + let mut offset = 0; + + // Updating hashmap starting from the last batch + let batches_iter = batches.iter().rev(); + for batch in batches_iter.clone() { + hashes_buffer.clear(); + hashes_buffer.resize(batch.num_rows(), 0); + let _ = update_hash( + &on, + batch, + &mut hashmap, + offset, + &self.random_state, + &mut hashes_buffer, + 0, + true, + ); + offset += batch.num_rows(); + } + let single_batch = concat_batches(&schema, batches_iter)?; + let _reservation = + MemoryConsumer::new("HashJoinProbe".to_string()).register(context.memory_pool()); + let data = JoinLeftData::new( + hashmap, + single_batch, + node_input.output_partitioning().clone(), + ); + + Ok(data) + } - local_file_config(output_schema, intermediate_output.as_str()) -} + #[async_recursion] + async fn add_hash_table_to_hash_probe( + &self, + join_data: JoinLeftData, + plan: Arc, + path_from_parent: &[u32], + ) -> Arc { + if path_from_parent.is_empty() { + return plan; + } + + let mut new_children = Vec::new(); + let children = plan.children(); + + for (child_num, child) in (0_u32..).zip(children.into_iter()) { + if child_num != path_from_parent[0] { + new_children.push(child); + continue; + } + new_children.push( + self.add_hash_table_to_hash_probe( + join_data.clone(), + plan.clone(), + &path_from_parent[1..], + ) + .await, + ); + } + plan.with_new_children(new_children).unwrap() + } -async fn initialize(port: i32, delete_intermediate: bool) { - let scheduler_service_port = env::var("SCHEDULER_PORT").unwrap_or_else(|_error| { - panic!("Scheduler port environment variable not set"); - }); - let uri = format!("http://[::1]:{scheduler_service_port}"); - let mut client = SchedulerServiceClient::connect(uri.clone()) - .await - .unwrap_or_else(|error| { - panic!("Unable to connect to the scheduler instance: {:?}", error); + async fn initialize(&self, port: i32, delete_intermediate: bool) { + let scheduler_service_port = env::var("SCHEDULER_PORT").unwrap_or_else(|_error| { + panic!("Scheduler port environment variable not set"); }); - - debug_println!( - "executor at port {port} connected to the scheduler at {}", - &uri - ); - let ctx = SessionContext::new(); - - loop { - let get_request = tonic::Request::new(GetQueryArgs {}); - match client.get_query(get_request).await { - Ok(response) => { - let response = response.into_inner(); - if response.query_id < 0 { - sleep(time::Duration::from_millis(500)); - continue; - } - - let interm_file = process_fragment(response.clone(), &ctx).await; - let interm_proto = FileScanExecConf::try_from(&interm_file).unwrap(); - - let finished_request = tonic::Request::new(QueryExecutionDoneArgs { - fragment_id: response.fragment_id, - status: 0, - file_scan_config: interm_proto.encode_to_vec(), - root: response.root, - query_id: response.query_id, - }); - - match client.query_execution_done(finished_request).await { - Err(e) => { - debug_println!("Finished reply unsuccessful: {:?}", e); - //client.kill_query_execution(); TODO + let uri = format!("http://[::1]:{scheduler_service_port}"); + let mut client = SchedulerClient::connect(uri.clone()) + .await + .unwrap_or_else(|error| { + panic!("Unable to connect to the scheduler instance: {:?}", error); + }); + + 
debug_println!( + "executor at port {port} connected to the scheduler at {}", + &uri + ); + let ctx = SessionContext::new(); + + loop { + let get_request = tonic::Request::new(GetQueryArgs {}); + match client.get_query(get_request).await { + Ok(response) => { + let response = response.into_inner(); + if response.query_id < 0 { + sleep(time::Duration::from_millis(500)); + continue; } - Ok(finished_response) => { - debug_println!("reply for finishing query frag received"); - debug_println!("response : {:?}", finished_response); - if delete_intermediate { - let mut response = finished_response.into_inner(); - - for file in &mut response.intermediate_files { - file.insert(0, '/'); - } - let handles = response - .intermediate_files - .into_iter() - .map(tokio::fs::remove_file) - .map(tokio::spawn) - .collect::>(); + let result = self.process_fragment(response.clone(), &ctx).await; + + let finished_request: tonic::Request; + + match result { + QueryResult::Config(config) => { + let interm_proto = FileScanExecConf::try_from(&config).unwrap(); + finished_request = tonic::Request::new(QueryExecutionDoneArgs { + fragment_id: response.fragment_id, + status: QueryStatus::Done.into(), + file_scan_config: interm_proto.encode_to_vec(), + root: response.root, + query_id: response.query_id, + generated_hash_table: false, + }); + } + QueryResult::HashTable => { + finished_request = tonic::Request::new(QueryExecutionDoneArgs { + fragment_id: response.fragment_id, + status: QueryStatus::Done.into(), + file_scan_config: vec![], + root: response.root, + query_id: response.query_id, + generated_hash_table: true, + }); + } + }; - let _results = futures::future::join_all(handles).await; + match client.query_execution_done(finished_request).await { + Err(e) => { + debug_println!("Finished reply unsuccessful: {:?}", e); + //client.kill_query_execution(); TODO + } + Ok(finished_response) => { + debug_println!("reply for finishing query frag received"); + debug_println!("response : {:?}", finished_response); + if delete_intermediate { + let mut response = finished_response.into_inner(); + + for file in &mut response.intermediate_files { + file.insert(0, '/'); + } + + let handles = response + .intermediate_files + .into_iter() + .map(tokio::fs::remove_file) + .map(tokio::spawn) + .collect::>(); + + let _results = futures::future::join_all(handles).await; + } } } } - } - Err(e) => match e.code() { - Code::Unavailable => { - debug_println!("get_query rpc unsuccessful: {:?}", e); - debug_println!("executor on port {port} is exiting"); - break; - } - _ => { - debug_println!("unhandled status {:?}", e); - debug_println!("go implement handler, sleeping for 500ms..."); - sleep(time::Duration::from_millis(500)); - } - }, - }; + Err(e) => match e.code() { + Code::Unavailable => { + debug_println!("get_query rpc unsuccessful: {:?}", e); + debug_println!("executor on port {port} is exiting"); + break; + } + _ => { + debug_println!("unhandled status {:?}", e); + debug_println!("go implement handler, sleeping for 500ms..."); + sleep(time::Duration::from_millis(500)); + } + }, + } + } } } @@ -158,11 +346,18 @@ async fn main() -> Result<(), Box> { let mut handles = Vec::new(); let base_port = 5555; - let _handle = Handle::current; + let generated_hash_tables = Arc::new(RwLock::new(HashMap::new())); for i in 0..num_workers { + let generated_hash_tables = generated_hash_tables.clone(); handles.push(tokio::spawn(async move { - initialize(base_port + i, delete_intermediate).await; + let executor = Executor { + random_state: 
RandomState::with_seeds(0, 0, 0, 0), + generated_hash_tables, + }; + executor + .initialize(base_port + i, delete_intermediate) + .await; })); } diff --git a/scheduler/src/integration.rs b/scheduler/src/integration.rs index 059bd72..f347de2 100644 --- a/scheduler/src/integration.rs +++ b/scheduler/src/integration.rs @@ -1,4 +1,4 @@ -use crate::parser::PhysicalPlanFragment; +use crate::parser::QueryFragment; use crate::scheduler; use crate::scheduler::SCHEDULER_INSTANCE; @@ -79,10 +79,10 @@ pub async fn process_sql_request( item_id: u64, ) -> Result<(), Box> { let sql = format!( - "SELECT a.*, b.price, a.quantity * b.price as total - FROM orders a inner join prices b - ON a.item_id = b.item_id - and a.item_id = {} + "SELECT a.*, b.price, a.quantity * b.price as total + FROM orders a inner join prices b + ON a.item_id = b.item_id + and a.item_id = {} ORDER by a.order_id", item_id ); @@ -96,7 +96,7 @@ pub async fn process_sql_request( } pub async fn process_physical_fragment( - fragment: PhysicalPlanFragment, + fragment: QueryFragment, ctx: &SessionContext, abs_path_str: &str, id: u64, @@ -256,10 +256,10 @@ mod tests { .await?; // create a plan to run a SQL query - let sql = "SELECT a.*, b.price, a.quantity * b.price as total - FROM orders a inner join prices b - ON a.item_id = b.item_id - and a.item_id = 6 + let sql = "SELECT a.*, b.price, a.quantity * b.price as total + FROM orders a inner join prices b + ON a.item_id = b.item_id + and a.item_id = 6 ORDER by a.order_id"; let logical_plan = ctx.state().create_logical_plan(sql).await?; let physical_plan = ctx.state().create_physical_plan(&logical_plan).await?; diff --git a/scheduler/src/lib.rs b/scheduler/src/lib.rs index 84c25d0..f9311dd 100644 --- a/scheduler/src/lib.rs +++ b/scheduler/src/lib.rs @@ -1,6 +1,11 @@ +//! A scheduler for databases. +//! +//! Provides inter-query and intra-query parallelism by splitting up queries +//! into fragments. + pub mod integration; mod parser; -pub mod queue; +mod queue; pub mod scheduler; pub mod scheduler_interface { @@ -14,6 +19,3 @@ pub mod executor_interface { macro_rules! debug_println { ($($arg:tt)*) => (if ::std::cfg!(debug_assertions) { ::std::println!($($arg)*); }) } - -// mod mock_executor; -// mod proto_server; diff --git a/scheduler/src/parser.rs b/scheduler/src/parser.rs index ea521b4..163e9b6 100644 --- a/scheduler/src/parser.rs +++ b/scheduler/src/parser.rs @@ -20,17 +20,17 @@ pub type QueryFragmentId = u64; // Metadata struct for now #[derive(Debug, Clone)] -pub struct PhysicalPlanFragment { - // The id assigned with this [`PhysicalPlanFragment`] +pub struct QueryFragment { + // The id assigned with this [`QueryFragment`] pub fragment_id: QueryFragmentId, // The id of the query which this plan fragment belongs to pub query_id: QueryId, - // the entry into this [`PhysicalPlanFragment`] + // the entry into this [`QueryFragment`] pub root: Option>, - // convenience pointers into the parent [`PhysicalPlanFragment`] + // convenience pointers into the parent [`QueryFragment`] pub parent_path_from_root: Vec>, // vector of dependant fragment ids @@ -52,10 +52,11 @@ pub struct PhysicalPlanFragment { } // Function to populate the cost of running a fragment. +// // Currently it goes through all the execution plan nodes in the fragment // and sums up the number of rows based on provided statistics. 
-// It can later used for sophisticated costs provided by the optimizer -async fn populate_fragment_cost(fragment: &mut PhysicalPlanFragment) { +// It can later used for sophisticated costs provided by the optimizer. +async fn populate_fragment_cost(fragment: &mut QueryFragment) { let mut cur_cost = 0; let root = fragment.root.clone().unwrap(); @@ -83,18 +84,18 @@ async fn populate_fragment_cost(fragment: &mut PhysicalPlanFragment) { } } -// Wrapper function for parsing into fragments +/// Parse `root` into query fragments. pub async fn parse_into_fragments_wrapper( root: Arc, query_id: u64, priority: i64, pipelined: bool, -) -> HashMap { +) -> HashMap { let fragment_id = FRAGMENT_ID_GENERATOR.fetch_add(1, Ordering::SeqCst); - let mut output = HashMap::::new(); + let mut output = HashMap::::new(); - let root_fragment = PhysicalPlanFragment { + let root_fragment = QueryFragment { query_id, fragment_id, root: None, @@ -108,41 +109,54 @@ pub async fn parse_into_fragments_wrapper( }; let path = Vec::::new(); output.insert(root_fragment.fragment_id, root_fragment); - let new_root = if pipelined { - parse_into_fragments(root, fragment_id, &mut output, query_id, path, priority).await - } else { - parse_into_fragments_naive(root, fragment_id, &mut output, query_id, path, priority).await - }; + let new_root = parse_into_fragments( + root, + fragment_id, + &mut output, + query_id, + path, + priority, + pipelined, + ) + .await; output.get_mut(&fragment_id).unwrap().root = Some(new_root); populate_fragment_cost(output.get_mut(&fragment_id).unwrap()).await; output } -// Turn this into DAG traversal with book keeping -// Recursively return child node to the parent node -// A child node is either a: -// dummy scan node (to be modified data is executed and returned) or -// a valid execution node -// When a node has siblings, it save itself into the output vector (since it is the start of a -// fragment) and returns a dummy scan node +/// Parse the given query fragment rooted at `root` with id `fragment_id` +/// into individual fragments. +/// +/// Generated fragments are added to `output`. 'path' is the current path from +/// the root of the physical plan to the current execution node. 'query_id` +/// is the id of the query that this fragment is a part of. +/// +/// TODO: Turn this into DAG traversal with book keeping +/// Recursively return child node to the parent node +/// A child node is either a: +/// dummy scan node (to be modified data is executed and returned) or +/// a valid execution node +/// When a node has siblings, it save itself into the output vector (since it is the start of a +/// fragment) and returns a dummy scan node #[async_recursion] pub async fn parse_into_fragments( root: Arc, fragment_id: QueryFragmentId, - output: &mut HashMap, + output: &mut HashMap, query_id: u64, mut path: Vec, priority: i64, + pipelined: bool, ) -> Arc { let children = root.children(); - // Trivial case of no children + // Trivial case of no children. if children.is_empty() { return root; } - // Single child just go down + // Single child just go down. 
if children.len() == 1 { path.push(0); let new_child = parse_into_fragments( @@ -152,161 +166,59 @@ pub async fn parse_into_fragments( query_id, path, priority, + pipelined, ) .await; return root.with_new_children(vec![new_child]).unwrap(); } - let mut new_children = Vec::>::new(); - - if let Some(node) = root.as_any().downcast_ref::() { - let build_side = node.left.clone(); - - let build_fragment_id = FRAGMENT_ID_GENERATOR.fetch_add(1, Ordering::SeqCst); - let build_fragment = PhysicalPlanFragment { - query_id, - fragment_id: build_fragment_id, - root: None, - parent_path_from_root: Vec::new(), - child_fragments: Vec::new(), - parent_fragments: vec![fragment_id], - enqueued_time: None, - fragment_cost: None, - query_priority: 0, - intermediate_files: HashSet::::new(), - }; - output.insert(build_fragment_id, build_fragment); - - let parsed_build_side = parse_into_fragments( - build_side, - build_fragment_id, - output, - query_id, - Vec::new(), - priority, - ) - .await; - - let build_side_new = HashBuildExec::try_new( - parsed_build_side.clone(), - node.on.iter().map(|on| on.0.clone()).collect(), - None, - &node.join_type, - None, - node.mode, - node.null_equals_null, - ) - .unwrap(); - - let build_fragment_ref = output.get_mut(&build_fragment_id).unwrap(); - let mut new_path = path.clone(); - new_path.push(0); - build_fragment_ref.parent_path_from_root.push(new_path); - build_fragment_ref.root = Some(Arc::new(build_side_new)); - - let probe_side = node.right.clone(); - let probe_fragment_id = FRAGMENT_ID_GENERATOR.fetch_add(1, Ordering::SeqCst); - let parsed_probe_side = parse_into_fragments( - probe_side, - probe_fragment_id, + // If we encounter a hash build execution node we should execute the build + // side as a separate fragment. + if pipelined && root.as_any().downcast_ref::().is_some() { + return create_build_fragment( + root, + fragment_id, output, query_id, - Vec::new(), + path, priority, + pipelined, ) .await; - - let new_root = HashProbeExec::try_new( - parsed_build_side, - parsed_probe_side, - node.on.clone(), - node.filter.clone(), - &node.join_type, - None, - node.mode, - node.null_equals_null, - ); - return Arc::new(new_root.unwrap()); } - for (child_num, child) in (0_u32..).zip(children.into_iter()) { - let child_fragment_id = FRAGMENT_ID_GENERATOR.fetch_add(1, Ordering::SeqCst); - let child_query_fragment = PhysicalPlanFragment { - query_id, - fragment_id: child_fragment_id, - root: None, - parent_path_from_root: vec![], - child_fragments: vec![], - parent_fragments: vec![fragment_id], - query_priority: priority, - enqueued_time: None, - fragment_cost: None, - intermediate_files: HashSet::::new(), - }; - output.insert(child_fragment_id, child_query_fragment); - - let new_child = - parse_into_fragments(child, child_fragment_id, output, query_id, vec![], priority) - .await; - - let dummy_scan_node = create_dummy_scans(&new_child).await.unwrap(); - new_children.push(dummy_scan_node); - - // Get a reference to the newly created child fragment - let child_fragment_ref = output.get_mut(&child_fragment_id).unwrap(); - let mut new_path = path.clone(); - new_path.push(child_num); - child_fragment_ref.parent_path_from_root.push(new_path); - child_fragment_ref.root = Some(new_child); - populate_fragment_cost(output.get_mut(&child_fragment_id).unwrap()).await; - - output - .get_mut(&fragment_id) - .unwrap() - .child_fragments - .push(child_fragment_id); - } + // Otherwise we should create the child fragments. 
+ let new_children = create_child_fragments( + children, + fragment_id, + output, + query_id, + path, + priority, + pipelined, + ) + .await; let new_root = root.with_new_children(new_children); new_root.unwrap() } -#[async_recursion] -pub async fn parse_into_fragments_naive( - root: Arc, +/// Parse `children` into query fragments. +async fn create_child_fragments( + children: Vec>, fragment_id: QueryFragmentId, - output: &mut HashMap, + output: &mut HashMap, query_id: u64, - mut path: Vec, + path: Vec, priority: i64, -) -> Arc { - let children = root.children(); - - // Trivial case of no children - if children.is_empty() { - return root; - } - - // Single child just go down - if children.len() == 1 { - path.push(0); - let new_child = parse_into_fragments_naive( - children[0].clone(), - fragment_id, - output, - query_id, - path, - priority, - ) - .await; - return root.with_new_children(vec![new_child]).unwrap(); - } - + pipelined: bool, +) -> Vec> { let mut new_children = Vec::>::new(); + // Iterate through all the children for (child_num, child) in (0_u32..).zip(children.into_iter()) { let child_fragment_id = FRAGMENT_ID_GENERATOR.fetch_add(1, Ordering::SeqCst); - let child_query_fragment = PhysicalPlanFragment { + let child_query_fragment = QueryFragment { query_id, fragment_id: child_fragment_id, root: None, @@ -320,20 +232,23 @@ pub async fn parse_into_fragments_naive( }; output.insert(child_fragment_id, child_query_fragment); - let new_child = parse_into_fragments_naive( + let new_child = parse_into_fragments( child, child_fragment_id, output, query_id, vec![], priority, + pipelined, ) .await; + // Add a dummy scan node as a placeholder for the output of this + // just-created query fragment. let dummy_scan_node = create_dummy_scans(&new_child).await.unwrap(); new_children.push(dummy_scan_node); - // Get a reference to the newly created child fragment + // Get a reference to the newly-created child fragment. let child_fragment_ref = output.get_mut(&child_fragment_id).unwrap(); let mut new_path = path.clone(); new_path.push(child_num); @@ -347,13 +262,108 @@ pub async fn parse_into_fragments_naive( .child_fragments .push(child_fragment_id); } + new_children +} - let new_root = root.with_new_children(new_children); - new_root.unwrap() +/// Split the hash join execution `node` into build and probe fragments. +/// Returns a new [`ExecutionPlan`] that represents the results of executing +/// this hash join. +async fn create_build_fragment( + arc_node: Arc, + fragment_id: QueryFragmentId, + output: &mut HashMap, + query_id: u64, + path: Vec, + priority: i64, + pipelined: bool, +) -> Arc { + let node = arc_node.as_any().downcast_ref::().unwrap(); + let build_side = node.left.clone(); + + // Create the build fragment with default parameters and add it to + // `output`. + let build_fragment_id = FRAGMENT_ID_GENERATOR.fetch_add(1, Ordering::SeqCst); + let build_fragment = QueryFragment { + query_id, + fragment_id: build_fragment_id, + root: None, + parent_path_from_root: Vec::new(), + child_fragments: Vec::new(), + parent_fragments: vec![fragment_id], + enqueued_time: None, + fragment_cost: None, + query_priority: 0, + intermediate_files: HashSet::::new(), + }; + output.insert(build_fragment_id, build_fragment); + + // Parse the build side and create a [`HashBuildExec`] execution node + // for it. 
+ let parsed_build_side = parse_into_fragments( + build_side, + build_fragment_id, + output, + query_id, + Vec::new(), + priority, + pipelined, + ) + .await; + + let build_side_new = HashBuildExec::try_new( + parsed_build_side.clone(), + node.on.iter().map(|on| on.0.clone()).collect(), + None, + &node.join_type, + None, + node.mode, + node.null_equals_null, + ) + .unwrap(); + + let build_fragment_ref = output.get_mut(&build_fragment_id).unwrap(); + let mut new_path = path.clone(); + new_path.push(0); + build_fragment_ref.parent_path_from_root.push(new_path); + build_fragment_ref.root = Some(Arc::new(build_side_new)); + + // TODO(Aditya): the probe side should not be a separate fragment, + // it should extent the current fragment to form a long pipeline. + let probe_side = node.right.clone(); + let probe_fragment_id = FRAGMENT_ID_GENERATOR.fetch_add(1, Ordering::SeqCst); + let parsed_probe_side = parse_into_fragments( + probe_side, + probe_fragment_id, + output, + query_id, + Vec::new(), + priority, + pipelined, + ) + .await; + + let stats = node.statistics().unwrap(); + + Arc::new( + HashProbeExec::try_new( + parsed_probe_side, + node.on.clone(), + node.filter.clone(), + &node.join_type, + node.projection.clone(), + node.mode, + node.null_equals_null, + node.schema(), + node.column_indices.clone(), + node.properties().clone(), + stats, + ) + .unwrap(), + ) } -// Dummy Scan nodes will created using [`plan`], attached to its parents. -// Update these dummy nodes as results are produced by the execution team. +/// Dummy Scan nodes will created using [`plan`], attached to its parents. +/// Update these dummy nodes as results are produced by the execution team. async fn create_dummy_scans(plan: &Arc) -> Result> { let empty_table = Arc::new(EmptyTable::new(plan.schema())); let table_source = Arc::new(DefaultTableSource::new(empty_table)); @@ -538,7 +548,7 @@ mod tests { assert_eq!(fragments.len(), 3); let mut root_fragment = None; - let mut child_fragment_vec = Vec::::new(); + let mut child_fragment_vec = Vec::::new(); for (_, fragment) in fragments { assert!(fragment.root.is_some()); if fragment.root.as_ref().unwrap().children().len() == 2 { diff --git a/scheduler/src/queue.rs b/scheduler/src/queue.rs index 6fae852..30a3b38 100644 --- a/scheduler/src/queue.rs +++ b/scheduler/src/queue.rs @@ -1,8 +1,9 @@ use crate::{ - parser::{PhysicalPlanFragment, QueryFragmentId}, + parser::{QueryFragment, QueryFragmentId}, scheduler::{QueryResult, SCHEDULER_INSTANCE}, }; -use datafusion::{config::TableParquetOptions, datasource::listing::PartitionedFile}; +use datafusion::datasource::listing::PartitionedFile; +use datafusion::{config::TableParquetOptions, physical_plan::joins::HashJoinExec}; use datafusion::{ datasource::physical_plan::{ArrowExec, FileScanConfig, ParquetExec}, physical_plan::joins::HashBuildExec, @@ -11,10 +12,10 @@ use datafusion::{ use super::debug_println; use datafusion::physical_plan::ExecutionPlan; -use std::{collections::HashMap, collections::HashSet, sync::Arc, time::SystemTime}; +use std::{collections::HashMap, sync::Arc, time::SystemTime}; /// Once a Execution plan has been parsed push all the fragments that can be scheduled onto the queue. 
-pub async fn add_fragments_to_scheduler(mut map: HashMap) { +pub async fn add_fragments_to_scheduler(mut map: HashMap) { let mut pending_fragments = SCHEDULER_INSTANCE.pending_fragments.write().await; let mut all_fragments = SCHEDULER_INSTANCE.all_fragments.write().await; for (&id, fragment) in map.iter_mut() { @@ -36,7 +37,7 @@ pub async fn add_fragments_to_scheduler(mut map: HashMap i128 { +pub fn get_priority_from_fragment(fragment: &QueryFragment) -> i128 { let time = fragment .enqueued_time .unwrap() @@ -53,8 +54,8 @@ pub fn get_priority_from_fragment(fragment: &PhysicalPlanFragment) -> i128 { priority + cost_offset * 20 } -/// Get the plan with the highest priorty from the queue -pub async fn get_plan_from_queue() -> Option { +/// Get the plan with the highest priorty from the queue. +pub async fn get_plan_from_queue() -> Option { let mut pending_fragments = SCHEDULER_INSTANCE.pending_fragments.write().await; let all_fragments = SCHEDULER_INSTANCE.all_fragments.read().await; @@ -71,7 +72,7 @@ pub async fn get_plan_from_queue() -> Option { if ref_id.is_none() { return None; } else { - let frag = all_fragments.get(&ref_id.unwrap()).unwrap(); + let _frag = all_fragments.get(&ref_id.unwrap()).unwrap(); pending_fragments.retain(|x| *x != ref_id.unwrap()); } @@ -106,6 +107,24 @@ pub fn update_plan_parent( if i != path[0] { new_children.push(child); } else { + if path.len() == 2 { + if let Some(node) = child.as_any().downcast_ref::() { + let _probe_side = node.right().clone(); + // return Arc::new( + // HashProbeExec::try_new( + // probe_side, + // node.on, + // node.filter, + // &node.join_type, + // node.projection, + // node.mode, + // node.null_equals_null, + // //node.join_data, + // ) + // .unwrap(), + // ); + } + } new_children.push(update_plan_parent(child, &path[1..], query_result)); } i += 1; @@ -113,8 +132,10 @@ pub fn update_plan_parent( root.with_new_children(new_children).unwrap() } +/// Marks the completion of the execution of the query fragment with +/// `fragment_id` with result `fragment_result`. 
pub async fn finish_fragment( - child_fragment_id: QueryFragmentId, + fragment_id: QueryFragmentId, fragment_result: QueryResult, intermediate_files: Vec>, ) -> Vec { @@ -122,20 +143,20 @@ pub async fn finish_fragment( let mut all_fragments = SCHEDULER_INSTANCE.all_fragments.write().await; let mut intermediate_file_pin = SCHEDULER_INSTANCE.intermediate_files.write().await; let parent_fragment_ids = all_fragments - .get(&child_fragment_id) + .get(&fragment_id) .unwrap() .parent_fragments .clone(); let parent_fragment_paths = all_fragments - .get(&child_fragment_id) + .get(&fragment_id) .unwrap() .parent_path_from_root .clone(); // these intermediate files belongs to the child_fragment which has been processed and should be deleted let child_fragment_intermediate_files = all_fragments - .get(&child_fragment_id) + .get(&fragment_id) .unwrap() .intermediate_files .clone() @@ -185,13 +206,13 @@ pub async fn finish_fragment( parent_fragment .child_fragments - .retain(|x| *x != child_fragment_id); + .retain(|x| *x != fragment_id); if parent_fragment.child_fragments.is_empty() { parent_fragment.enqueued_time = Some(SystemTime::now()); new_ids_to_push.push(id); } } - all_fragments.remove(&child_fragment_id); + all_fragments.remove(&fragment_id); pending_fragments.extend(new_ids_to_push); debug_println!( "Updated finished fragments, {} left available to be executed in pending queue", @@ -237,6 +258,7 @@ mod tests { use more_asserts as ma; use std::collections::HashMap; + use std::collections::HashSet; use std::sync::Arc; async fn create_physical_plan(logical_plan: LogicalPlan) -> Result> { @@ -306,7 +328,7 @@ mod tests { validate_toy_physical_plan_structure(&physical_plan); // Returns a hash map from query fragment ID to physical plan fragment structs - let fragment = PhysicalPlanFragment { + let fragment = QueryFragment { fragment_id: 0, query_id: 0, root: Some(physical_plan), @@ -318,7 +340,7 @@ mod tests { fragment_cost: None, intermediate_files: HashSet::::new(), }; - let mut map: HashMap = HashMap::new(); + let mut map: HashMap = HashMap::new(); map.insert(0, fragment); add_fragments_to_scheduler(map).await; let queued_fragment = get_plan_from_queue().await.unwrap(); @@ -397,7 +419,7 @@ mod tests { add_fragments_to_scheduler(fragment_map).await; assert_eq!(SCHEDULER_INSTANCE.pending_fragments.read().await.len(), 2); - let mut child_fragment_vec = Vec::::new(); + let mut child_fragment_vec = Vec::::new(); let mut queued_fragment = get_plan_from_queue().await.unwrap(); assert!(queued_fragment.root.is_some()); diff --git a/scheduler/src/scheduler.rs b/scheduler/src/scheduler.rs index 964ff37..8d7a79a 100644 --- a/scheduler/src/scheduler.rs +++ b/scheduler/src/scheduler.rs @@ -1,4 +1,4 @@ -use crate::parser::{parse_into_fragments_wrapper, PhysicalPlanFragment, QueryFragmentId}; +use crate::parser::{parse_into_fragments_wrapper, QueryFragment, QueryFragmentId}; use crate::queue::{add_fragments_to_scheduler, finish_fragment}; use crate::scheduler_interface::*; use datafusion::datasource::listing::PartitionedFile; @@ -11,27 +11,28 @@ use lazy_static::lazy_static; use tokio::sync::RwLock; use std::collections::HashMap; -use std::path::Path; + use std::sync::atomic::{AtomicU64, Ordering}; use std::time::SystemTime; static QUERY_ID_GENERATOR: AtomicU64 = AtomicU64::new(0); -use super::debug_println; use std::sync::Arc; use tokio::sync::mpsc::Sender; -#[derive(Debug)] -struct ExecutorHandle { - port: i32, -} - +/// The scheduler instance. +/// +/// Stores the metadata needed for query scheduling. 
#[derive(Debug)] pub struct Scheduler { - pub all_fragments: RwLock>, + /// Map from query fragment id to fragment. + pub all_fragments: RwLock>, + + /// Query fragments pending execution. pub pending_fragments: RwLock>, + pub job_status: RwLock>>>, + pub intermediate_files: RwLock>, - executors: RwLock>, } pub enum PipelineBreakers { @@ -86,6 +87,8 @@ impl Scheduler { } } + /// Marks the completion of the execution of the query fragment with + /// `fragment_id` with result `fragment_result`. pub async fn finish_fragment( &self, child_fragment_id: QueryFragmentId, @@ -105,12 +108,7 @@ impl Scheduler { pub fn parse_physical_plan(&self, _physical_plan: &dyn ExecutionPlan) {} - pub async fn register_executor(&self, port: i32) { - self.executors.write().await.push(ExecutorHandle { port }); - debug_println!("Executor registered; port={port}"); - } - - pub async fn get_plan_from_queue(&self) -> Option { + pub async fn get_plan_from_queue(&self) -> Option { crate::queue::get_plan_from_queue().await } } @@ -120,7 +118,6 @@ lazy_static! { all_fragments: RwLock::new(HashMap::new()), pending_fragments: RwLock::new(vec![]), job_status: RwLock::new(HashMap::>>::new()), - executors: RwLock::new(vec![]), intermediate_files: RwLock::new(HashMap::::new()), }; } diff --git a/scheduler/src/proto_server.rs b/scheduler/src/scheduler_service.rs similarity index 83% rename from scheduler/src/proto_server.rs rename to scheduler/src/scheduler_service.rs index bd0d4ec..16de41e 100644 --- a/scheduler/src/proto_server.rs +++ b/scheduler/src/scheduler_service.rs @@ -10,24 +10,26 @@ use datafusion::execution::context::SessionContext; use datafusion_proto::physical_plan::from_proto; use datafusion_proto::protobuf::FileScanExecConf; -use lib::scheduler::SCHEDULER_INSTANCE; -use lib::scheduler_interface::scheduler_service_server::SchedulerService; -use lib::scheduler_interface::scheduler_service_server::SchedulerServiceServer; -use lib::scheduler_interface::*; +use chronos::scheduler::SCHEDULER_INSTANCE; +use chronos::scheduler_interface::scheduler_server::Scheduler; +use chronos::scheduler_interface::scheduler_server::SchedulerServer; +use chronos::scheduler_interface::*; use bytes::IntoBuf; use tokio::sync::mpsc; +use std::time::UNIX_EPOCH; + #[derive(Debug, Default)] pub struct MyScheduler {} #[tonic::async_trait] -impl SchedulerService for MyScheduler { +impl Scheduler for MyScheduler { async fn get_query( &self, _request: Request, ) -> Result, Status> { - let plan = lib::scheduler::SCHEDULER_INSTANCE + let plan = chronos::scheduler::SCHEDULER_INSTANCE .get_plan_from_queue() .await; @@ -42,17 +44,19 @@ impl SchedulerService for MyScheduler { fragment_id: i32::try_from(p.fragment_id).unwrap(), physical_plan: p_bytes.to_vec(), root: p.parent_fragments.is_empty(), + hash_build_data_info: vec![], }; Ok(Response::new(reply)) } Err(e) => { println!("{:?}", e); - // lib::queue::kill_query(p.query_id); TODO + // chronos::queue::kill_query(p.query_id); TODO let reply = GetQueryRet { query_id: -1, fragment_id: -1, physical_plan: vec![], root: true, // setting this to true frees the CLI on the tokio channel, will do for now + hash_build_data_info: vec![], }; Ok(Response::new(reply)) } @@ -64,6 +68,7 @@ impl SchedulerService for MyScheduler { fragment_id: -1, physical_plan: vec![], root: false, + hash_build_data_info: vec![], }; Ok(Response::new(reply)) } @@ -88,12 +93,10 @@ impl SchedulerService for MyScheduler { return Err(status); } - let sched_info = lib::scheduler::SCHEDULER_INSTANCE + let sched_info = 
chronos::scheduler::SCHEDULER_INSTANCE .schedule_query(physical_plan, metadata.unwrap(), false) .await; - let _finish_time = std::time::SystemTime::now(); // TODO: send this back over rpc as well, figure out proto type - let (tx, mut rx) = mpsc::channel::>(1); { @@ -107,6 +110,13 @@ impl SchedulerService for MyScheduler { let reply = ScheduleQueryRet { query_id: sched_info.query_id, file_scan_config: rx.recv().await.unwrap_or_default(), + enqueue_time: sched_info + .enqueue_time + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis() + .try_into() + .unwrap(), }; Ok(Response::new(reply)) } @@ -143,10 +153,10 @@ impl SchedulerService for MyScheduler { return Err(status); } - let to_delete = lib::scheduler::SCHEDULER_INSTANCE + let to_delete = chronos::scheduler::SCHEDULER_INSTANCE .finish_fragment( fragment_id.try_into().unwrap(), - lib::scheduler::QueryResult::ParquetExec(file_scan_conf.clone()), + chronos::scheduler::QueryResult::ParquetExec(file_scan_conf.clone()), file_scan_conf.file_groups, ) .await; @@ -168,19 +178,6 @@ impl SchedulerService for MyScheduler { }; Ok(Response::new(reply)) } - - async fn register_executor( - &self, - request: Request, - ) -> Result, Status> { - let request_content = request.into_inner(); - let port = request_content.port; - - SCHEDULER_INSTANCE.register_executor(port).await; - - let reply = RegisterExecutorRet {}; - Ok(Response::new(reply)) - } } async fn server() -> Result<(), Box> { @@ -190,7 +187,7 @@ async fn server() -> Result<(), Box> { println!("Scheduler server listening on {addr}"); Server::builder() - .add_service(SchedulerServiceServer::new(scheduler)) + .add_service(SchedulerServer::new(scheduler)) .serve(addr) .await?;
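A note on the new enqueue_time field: the scheduler reports it as milliseconds since the Unix epoch (see the duration_since(UNIX_EPOCH) ... as_millis() conversion in scheduler_service.rs above). A caller that wants a SystemTime back can invert that conversion with the standard library alone; the sketch below is illustrative and its function names are not part of the patch.

use std::time::{Duration, SystemTime, UNIX_EPOCH};

// Rebuild a SystemTime from the u64 millisecond timestamp carried in ScheduleQueryRet.
fn enqueue_time_as_system_time(enqueue_time_ms: u64) -> SystemTime {
    UNIX_EPOCH + Duration::from_millis(enqueue_time_ms)
}

// How long the query has been sitting in the scheduler, saturating to zero
// if the client clock is behind the scheduler clock.
fn queue_latency(enqueue_time_ms: u64) -> Duration {
    SystemTime::now()
        .duration_since(enqueue_time_as_system_time(enqueue_time_ms))
        .unwrap_or(Duration::ZERO)
}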
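The executor decodes HashBuildDataInfo.path_from_parent with serde_json::from_slice into a vector of child indices (the parser tracks paths as Vec of u32, so u32 is assumed here). The scheduler-side producer of hash_build_data_info is not part of this patch; whichever component fills it in has to serialize the path the same way. A minimal sketch of the assumed round trip:

// Assumed wire format for HashBuildDataInfo.path_from_parent: a JSON array of
// child indices leading from the fragment root down to the hash probe node.
fn encode_path_from_parent(path: &[u32]) -> Vec<u8> {
    serde_json::to_vec(path).expect("a slice of u32 always serializes")
}

fn decode_path_from_parent(bytes: &[u8]) -> Vec<u32> {
    serde_json::from_slice(bytes).expect("path_from_parent must be a JSON array of u32")
}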
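The executor builds its RandomState with RandomState::with_seeds(0, 0, 0, 0) rather than random per-instance seeds, presumably so that every Executor instance hashes join keys identically and a table built while running one fragment can be probed while running another. A small self-contained check of that property (the names here are illustrative, not from the patch):

use ahash::RandomState;
use std::hash::BuildHasher;

fn main() {
    // Two Executor instances created independently agree on every hash,
    // so a build-side hash table is valid for whichever worker later probes it.
    let worker_a = RandomState::with_seeds(0, 0, 0, 0);
    let worker_b = RandomState::with_seeds(0, 0, 0, 0);
    assert_eq!(worker_a.hash_one(42_u64), worker_b.hash_one(42_u64));
}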
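The manifest keeps tonic-build as a build dependency, and renaming the service from SchedulerService to Scheduler is what changes the generated module names used throughout (scheduler_client::SchedulerClient, scheduler_server::{Scheduler, SchedulerServer}). The build script itself is not shown in this patch; a minimal build.rs consistent with those imports would look roughly like the sketch below, with the proto paths assumed from the repository layout.

// build.rs (hypothetical sketch, not part of this patch)
fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Regenerate the gRPC stubs whenever the proto definitions change.
    tonic_build::compile_protos("proto/scheduler.proto")?;
    tonic_build::compile_protos("proto/executor.proto")?;
    Ok(())
}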