Skip to content
This repository has been archived by the owner on Jun 6, 2024. It is now read-only.

Add infra for hash join build and probe phase #25

Merged
merged 15 commits into from
Apr 30, 2024
Merged
2 changes: 1 addition & 1 deletion arrow-datafusion
16 changes: 10 additions & 6 deletions scheduler/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,19 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[lib]
name = "lib"
name = "chronos"
path = "src/lib.rs"

[[bin]]
name = "scheduler-api-server"
path = "src/proto_server.rs"
name = "scheduler-service"
path = "src/scheduler_service.rs"

[[bin]]
name = "executor"
name = "executor-service"
path = "src/executor.rs"

[[bin]]
name = "cli"
name = "scheduler-cli"
path = "src/cli.rs"

[[bin]]
Expand All @@ -41,6 +41,10 @@ futures = "0.3.30"
anyhow = "1.0.82"
clap = "4.5.4"
bytes = "0.4.12"
ahash = "0.8.11"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"


[build-dependencies]
tonic-build = "0.11"
tonic-build = "0.11"
1 change: 1 addition & 0 deletions scheduler/proto/executor.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ syntax = "proto3";

package executor_interface;

// Arguments for the ExecuteQuery RPC.
message ExecuteQueryArgs {
// The query fragment identifier.
int32 fragment_id = 1;
Expand Down
25 changes: 11 additions & 14 deletions scheduler/proto/scheduler.proto
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,15 @@ message ScheduleQueryRet {
// The identifier to refer to this query execution.
int32 query_id = 1;


// The path info for intermediate files
bytes file_scan_config = 2;

// The status of query execution.
//QueryStatus status = 3;

// TODO
// enqueue_time
// The time the query was scheduled for execution in milliseconds since
// the Unix epoch.
uint64 enqueue_time = 3;

// TODO
// finish_time
Expand Down Expand Up @@ -83,13 +83,14 @@ message QueryExecutionDoneArgs {

// The path info for intermediate files
bytes file_scan_config = 3;

// Whether the file is root
bool root = 4;

// Query id of the whole plan
int32 query_id = 5;

bool generated_hash_table = 6;
}

// Information returned from the QueryExecutionDone RPC.
Expand All @@ -100,16 +101,13 @@ message QueryExecutionDoneRet {

}

message RegisterExecutorArgs {
int32 port = 1;
}

message RegisterExecutorRet {
message GetQueryArgs {

}

message GetQueryArgs {

message HashBuildDataInfo {
bytes path_from_parent = 1;
int32 build_fragment_id = 2;
}

message GetQueryRet {
Expand All @@ -121,19 +119,18 @@ message GetQueryRet {

bool root = 4;

repeated HashBuildDataInfo hash_build_data_info = 5;
}

// The scheduler interface.
service SchedulerService {
service Scheduler {

// Used by the optimizer to schedule a new query.
rpc ScheduleQuery(ScheduleQueryArgs) returns (ScheduleQueryRet);

// Used by the optimizer to query the status of a job.
rpc QueryJobStatus(QueryJobStatusArgs) returns (QueryJobStatusRet);

rpc RegisterExecutor(RegisterExecutorArgs) returns (RegisterExecutorRet);

rpc GetQuery(GetQueryArgs) returns (GetQueryRet);

// Used by the execution engine to notify the scheduler that the execution
Expand Down
12 changes: 6 additions & 6 deletions scheduler/src/bench.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
use chronos::integration::scan_from_parquet;
use chronos::scheduler_interface::scheduler_client::SchedulerClient;
use datafusion_proto::protobuf::FileScanExecConf;
use lib::integration::scan_from_parquet;
use lib::scheduler_interface::scheduler_service_client::SchedulerServiceClient;

use bytes::IntoBuf;

use chronos::debug_println;
use chronos::scheduler_interface::{QueryInfo, ScheduleQueryArgs};
use datafusion::physical_plan;
use datafusion::prelude::*;
use datafusion_proto::bytes::physical_plan_to_bytes;
use datafusion_proto::physical_plan::from_proto;
use futures::stream::TryStreamExt;
use lib::debug_println;
use lib::scheduler_interface::{QueryInfo, ScheduleQueryArgs};
use prost::Message;

use std::time::Instant;
Expand Down Expand Up @@ -52,8 +52,8 @@ async fn run_and_time_bench(query_num: u32, num_runs: u32) -> f64 {
panic!("Scheduler port environment variable not set");
});
let uri = format!("http://[::1]:{scheduler_service_port}");
let mut client: SchedulerServiceClient<tonic::transport::Channel> =
SchedulerServiceClient::connect(uri.clone())
let mut client: SchedulerClient<tonic::transport::Channel> =
SchedulerClient::connect(uri.clone())
.await
.unwrap_or_else(|error| {
panic!("Unable to connect to the scheduler instance: {:?}", error);
Expand Down
10 changes: 5 additions & 5 deletions scheduler/src/cli.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
use chronos::integration::scan_from_parquet;
use chronos::scheduler_interface::scheduler_client::SchedulerClient;
use datafusion_proto::protobuf::FileScanExecConf;
use lib::integration::scan_from_parquet;
use lib::scheduler_interface::scheduler_service_client::SchedulerServiceClient;

use bytes::IntoBuf;
use chronos::debug_println;
use chronos::scheduler_interface::{QueryInfo, ScheduleQueryArgs};
use datafusion::arrow::{array::RecordBatch, util::pretty};
use datafusion::physical_plan;
use datafusion::prelude::*;
use datafusion_proto::bytes::physical_plan_to_bytes;
use datafusion_proto::physical_plan::from_proto;
use futures::stream::TryStreamExt;
use lib::debug_println;
use lib::scheduler_interface::{QueryInfo, ScheduleQueryArgs};
use prost::Message;
use std::io::{self, Write};

Expand Down Expand Up @@ -47,7 +47,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
panic!("Scheduler port environment variable not set");
});
let uri = format!("http://[::1]:{scheduler_service_port}");
let mut client = SchedulerServiceClient::connect(uri.clone())
let mut client = SchedulerClient::connect(uri.clone())
.await
.unwrap_or_else(|error| {
panic!("Unable to connect to the scheduler instance: {:?}", error);
Expand Down
Loading
Loading