This repository has been archived by the owner on Jun 6, 2024. It is now read-only.

Remove global lock on queue #2

Merged 4 commits on May 2, 2024
Changes from all commits
src/bin/mock.rs (2 changes: 0 additions & 2 deletions)
@@ -122,7 +122,6 @@ async fn main() {
 
 const CONFIG_PATH: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/executors.toml");
 const CATALOG_PATH: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/test_data");
-const LOG_PATH: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/executor_logs");
 const POLL_INTERVAL: Duration = Duration::from_millis(100);
 
 // creates server, executors, and the frontend
@@ -165,7 +164,6 @@ pub async fn run_single_query(
         drop(frontend_lock);
         return Ok(());
     }
-    unreachable!();
 }
 
 async fn interactive_mode() {
src/executor_client.rs (3 changes: 1 addition & 2 deletions)
@@ -32,14 +32,13 @@
 
 use crate::composable_database::scheduler_api_client::SchedulerApiClient;
 use crate::composable_database::QueryStatus::InProgress;
-use crate::composable_database::{NotifyTaskStateArgs, NotifyTaskStateRet, QueryStatus, TaskId};
+use crate::composable_database::{NotifyTaskStateArgs, NotifyTaskStateRet, QueryStatus};
 use crate::frontend::JobInfo;
 use crate::intermediate_results::{insert_results, rewrite_query, TaskKey};
 use crate::mock_catalog::load_catalog;
 use crate::mock_executor::MockExecutor;
 use chrono::Utc;
 use datafusion::execution::context::SessionContext;
-use datafusion::physical_plan::ExecutionPlan;
 use datafusion_proto::bytes::physical_plan_from_bytes;
 use std::path::Path;
 use std::path::PathBuf;
src/frontend.rs (4 changes: 2 additions & 2 deletions)
@@ -230,7 +230,7 @@ impl MockFrontend {
         let existing_value = self.jobs.insert(
             query_id,
             JobInfo {
-                query_id: query_id,
+                query_id,
                 submitted_at: Utc::now(),
                 sql_string: sql_string.to_string(),
                 result: None,
@@ -280,7 +280,7 @@ impl MockFrontend {
         // eprintln!("Polling!");
         assert!(self.scheduler_api_client.is_some());
 
-        let mut client = self.scheduler_api_client.as_mut().unwrap();
+        let client = self.scheduler_api_client.as_mut().unwrap();
 
         let keys: Vec<u64> = self.jobs.keys().cloned().collect();
         for query_id in keys {
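
Both frontend.rs changes are pure idiomatic cleanup. A minimal, self-contained sketch of the two patterns involved (the struct and values here are illustrative, not taken from the codebase):

struct JobInfo {
    query_id: u64,
}

fn main() {
    let query_id = 42u64;

    // Field-init shorthand: when a local variable has the same name as a
    // struct field, `JobInfo { query_id }` means `JobInfo { query_id: query_id }`.
    let job = JobInfo { query_id };

    // A binding that holds a mutable reference does not itself need `mut`:
    // mutation goes through the reference, and `client` is never reassigned.
    let mut slot: Option<String> = Some(String::from("client"));
    let client = slot.as_mut().unwrap();
    client.push_str("-0");

    println!("{} {}", job.query_id, client);
}

Both longer forms compile; they only trigger lints (clippy's redundant_field_names and rustc's unused_mut), which is presumably what motivated the cleanup.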
src/integration_test.rs (12 changes: 5 additions & 7 deletions)
@@ -8,21 +8,18 @@ use crate::server::SchedulerService;
 use datafusion::arrow::array::RecordBatch;
 use datafusion::error::DataFusionError;
 use datafusion::logical_expr::{col, Expr};
-use datafusion::prelude::{concat, SessionContext};
-use serde::{Deserialize, Serialize};
+use datafusion::prelude::SessionContext;
 use std::sync::Arc;
 use tokio::sync::Mutex;
 use tonic::transport::Server;
 
 pub struct IntegrationTest {
     catalog_path: String,
-    config_path: String,
     ctx: Arc<SessionContext>,
     config: Config,
     pub frontend: Arc<Mutex<MockFrontend>>,
 }
 
-const CONFIG_PATH: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/executors.toml");
 pub const CATALOG_PATH: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/test_data");
 const LOG_PATH: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/executor_logs");
 
@@ -37,7 +34,6 @@ impl IntegrationTest {
             ctx,
             config,
             catalog_path,
-            config_path,
             frontend: Arc::new(Mutex::new(frontend)),
         }
     }
@@ -195,20 +191,22 @@ impl IntegrationTest {
 mod tests {
     use crate::integration_test::IntegrationTest;
     use crate::parser::ExecutionPlanParser;
+    // use crate::CATALOG_PATH;
+    use super::*;
     use datafusion::arrow::array::{Int32Array, RecordBatch};
     use datafusion::arrow::datatypes::{DataType, Field, Schema};
     use std::path::PathBuf;
     use std::sync::Arc;
     use tokio::fs;
 
+    const CONFIG_PATH: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/executors.toml");
 
     async fn initialize_integration_test() -> IntegrationTest {
         let catalog_path = concat!(env!("CARGO_MANIFEST_DIR"), "/test_data");
-        let config_path = concat!(env!("CARGO_MANIFEST_DIR"), "/executors.toml");
+        let config_path = CONFIG_PATH;
        IntegrationTest::new(catalog_path.to_string(), config_path.to_string()).await
     }
 
+    #[allow(dead_code)]
     pub async fn get_all_tpch_queries_test() -> Vec<String> {
         let parser = ExecutionPlanParser::new(CATALOG_PATH).await;
         let mut res = Vec::new();
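
The CONFIG_PATH constant that moved into the tests module relies on a common Rust pattern for compile-time paths. A short sketch of how it behaves, using the file name from the diff (the surrounding main is illustrative):

// Both macros expand at compile time: env!("CARGO_MANIFEST_DIR") reads the
// crate root from Cargo's build environment, and concat! joins the string
// literals, so CONFIG_PATH is an absolute path baked into the binary.
const CONFIG_PATH: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/executors.toml");

fn main() {
    // Prints something like /home/user/project/executors.toml
    println!("{CONFIG_PATH}");
}

Moving the constant inside mod tests scopes it to its only remaining user, which is what lets the file-level duplicate be deleted.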
src/mock_optimizer.rs (2 changes: 1 addition & 1 deletion)
@@ -16,7 +16,7 @@
 
 use crate::mock_catalog::load_catalog;
 use datafusion::error::DataFusionError;
-use datafusion::execution::context::{SessionContext, SessionState};
+use datafusion::execution::context::SessionContext;
 use datafusion::logical_expr::LogicalPlan;
 use datafusion::physical_plan::ExecutionPlan;
 use std::sync::Arc;
src/parser.rs (4 changes: 1 addition & 3 deletions)
@@ -26,16 +26,14 @@
 use crate::mock_catalog::load_catalog;
 use datafusion::{
     arrow::{
-        array::{RecordBatch, RecordBatchReader},
+        array::RecordBatch,
         ipc::{reader::FileReader, writer::FileWriter},
     },
     error::{DataFusionError, Result},
     execution::context::SessionContext,
     physical_plan::ExecutionPlan,
-    physical_planner::PhysicalPlanner,
 };
 use datafusion_proto::bytes::{physical_plan_from_bytes, physical_plan_to_bytes};
-use futures::TryFutureExt;
 use sqlparser::{dialect::GenericDialect, parser::Parser};
 use std::{fmt, io::Cursor, sync::Arc};
 use tokio::{fs::File, io::AsyncReadExt};
src/query_graph.rs (27 changes: 17 additions & 10 deletions)
@@ -1,20 +1,20 @@
 #![allow(dead_code)]
-use crate::composable_database::{QueryStatus, TaskId};
-use crate::task::{Task, TaskStatus};
+use crate::composable_database::QueryStatus;
+use crate::task::Task;
 use crate::task_queue::TaskQueue;
 use datafusion::arrow::datatypes::Schema;
 use datafusion::common::JoinSide;
-use datafusion::physical_plan::aggregates::AggregateExec;
-use datafusion::physical_plan::joins::{
-    CrossJoinExec, HashJoinExec, NestedLoopJoinExec, SortMergeJoinExec, SymmetricHashJoinExec,
-};
-use datafusion::physical_plan::limit::GlobalLimitExec;
+// use datafusion::physical_plan::joins::{
+// use datafusion::physical_plan::aggregates::AggregateExec;
+//     CrossJoinExec, HashJoinExec, NestedLoopJoinExec, SortMergeJoinExec, SymmetricHashJoinExec,
+// };
+// use datafusion::physical_plan::limit::GlobalLimitExec;
+// use datafusion::physical_plan::sorts::sort::SortExec;
 use datafusion::physical_plan::placeholder_row::PlaceholderRowExec;
-use datafusion::physical_plan::sorts::sort::SortExec;
 use datafusion::physical_plan::{with_new_children_if_necessary, ExecutionPlan};
 use std::collections::HashMap;
 use std::sync::atomic::{AtomicU64, Ordering};
 use std::sync::Arc;
+use std::time::Duration;
 
 // TODO Change to Waiting, Ready, Running(vec[taskid]), Finished(vec[locations?])
 #[derive(Clone, Debug, Default)]
@@ -48,10 +48,11 @@ pub struct QueryGraph {
     tid_counter: AtomicU64, // TODO: add mutex to stages and make elements pointers to avoid copying
     pub stages: Vec<QueryStage>, // Can be a vec since all stages in a query are enumerated from 0.
     task_queue: TaskQueue, // Ready tasks in this graph
+    pub time: Duration,
 }
 
 impl QueryGraph {
-    pub async fn new(query_id: u64, plan: Arc<dyn ExecutionPlan>) -> Self {
+    pub fn new(query_id: u64, plan: Arc<dyn ExecutionPlan>) -> Self {
         // Build stages.
         let mut builder = GraphBuilder::new();
         let stages = builder.build(plan.clone());
@@ -63,6 +64,7 @@ impl QueryGraph {
             tid_counter: AtomicU64::new(0),
             stages,
             task_queue: TaskQueue::new(),
+            time: Duration::new(0, 0),
         };
 
         // Build tasks for leaf stages.
@@ -121,6 +123,7 @@ impl QueryGraph {
         let outputs = stage.outputs.clone();
 
         if outputs.is_empty() {
+            println!("QueryGraph::update_stage_status: Query {} is done.", self.query_id);
             self.status = QueryStatus::Done;
             return Ok(());
         }
@@ -161,6 +164,10 @@
     }
 
     // fn build_tasks(&mut self)
+    pub fn get_plan(&self, stage_id: u64) -> Arc<dyn ExecutionPlan> {
+        let plan = self.stages[stage_id as usize].plan.clone();
+        plan
+    }
 }
 
 #[derive(Clone, Debug)]
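
The two API changes in query_graph.rs are visible from the signatures alone: new is no longer async, and get_plan hands out a cloned Arc. A hedged sketch of a call site; the function name and module path are assumptions, not code from this PR:

use crate::query_graph::QueryGraph; // assumed path within the same crate
use datafusion::physical_plan::ExecutionPlan;
use std::sync::Arc;

// Hypothetical caller, e.g. scheduler code admitting a new query.
fn admit_query(query_id: u64, plan: Arc<dyn ExecutionPlan>) {
    // With `async` removed, the constructor can run inside synchronous code
    // (for instance while holding a std::sync::Mutex) without a .await point.
    let graph = QueryGraph::new(query_id, plan);

    // get_plan clones the Arc, so the caller gets shared ownership of the
    // stage's plan rather than a borrow tied to `graph`.
    let stage_plan: Arc<dyn ExecutionPlan> = graph.get_plan(0);
    let _ = stage_plan;
}

As a style note, get_plan could return self.stages[stage_id as usize].plan.clone() directly; the temporary plan binding (and the clippy let_and_return lint it trips) is harmless but unnecessary.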