diff --git a/optd-persistent/src/bin/migrate_test.rs b/optd-persistent/src/bin/migrate_test.rs new file mode 100644 index 0000000..f4c9001 --- /dev/null +++ b/optd-persistent/src/bin/migrate_test.rs @@ -0,0 +1,23 @@ +use optd_persistent::{migrate, TEST_DATABASE_URL}; +use sea_orm::*; +use sea_orm_migration::prelude::*; + +#[tokio::main] +async fn main() { + let _ = std::fs::remove_file(TEST_DATABASE_URL); + + let db = Database::connect(TEST_DATABASE_URL) + .await + .expect("Unable to connect to the database"); + + migrate(&db) + .await + .expect("Something went wrong during migration"); + + db.execute(sea_orm::Statement::from_string( + sea_orm::DatabaseBackend::Sqlite, + "PRAGMA foreign_keys = ON;".to_owned(), + )) + .await + .expect("Unable to enable foreign keys"); +} diff --git a/optd-persistent/src/cost_model_orm_manager_impl.rs b/optd-persistent/src/cost_model_orm_manager_impl.rs new file mode 100644 index 0000000..c181467 --- /dev/null +++ b/optd-persistent/src/cost_model_orm_manager_impl.rs @@ -0,0 +1,82 @@ +#![allow(dead_code, unused_imports, unused_variables)] + +use crate::cost_model_storage_layer::CostModelStorageLayer; +use crate::orm_manager::ORMManager; + +impl CostModelStorageLayer for ORMManager { + async fn create_new_epoch( + &mut self, + source: String, + data: String, + ) -> crate::StorageResult { + todo!() + } + + async fn update_stats_from_catalog( + &self, + c: crate::cost_model_storage_layer::CatalogSource, + epoch_id: crate::EpochId, + ) -> crate::StorageResult<()> { + todo!() + } + + async fn update_stats(&self, stats: i32, epoch_id: crate::EpochId) -> crate::StorageResult<()> { + todo!() + } + + async fn store_cost( + &self, + expr_id: crate::ExprId, + cost: i32, + epoch_id: crate::EpochId, + ) -> crate::StorageResult<()> { + todo!() + } + + async fn store_expr_stats_mappings( + &self, + expr_id: crate::ExprId, + stat_ids: Vec, + ) -> crate::StorageResult<()> { + todo!() + } + + async fn get_stats_for_table( + &self, + table_id: i32, + stat_type: i32, + epoch_id: Option, + ) -> crate::StorageResult> { + todo!() + } + + async fn get_stats_for_attr( + &self, + attr_id: i32, + stat_type: i32, + epoch_id: Option, + ) -> crate::StorageResult> { + todo!() + } + + async fn get_stats_for_attrs( + &self, + attr_ids: Vec, + stat_type: i32, + epoch_id: Option, + ) -> crate::StorageResult> { + todo!() + } + + async fn get_cost_analysis( + &self, + expr_id: crate::ExprId, + epoch_id: crate::EpochId, + ) -> crate::StorageResult> { + todo!() + } + + async fn get_cost(&self, expr_id: crate::ExprId) -> crate::StorageResult> { + todo!() + } +} diff --git a/optd-persistent/src/cost_model_storage_layer.rs b/optd-persistent/src/cost_model_storage_layer.rs new file mode 100644 index 0000000..0dc5ae2 --- /dev/null +++ b/optd-persistent/src/cost_model_storage_layer.rs @@ -0,0 +1,75 @@ +#![allow(dead_code, unused_imports)] + +use crate::entities::cascades_group; +use crate::entities::event::Model as event_model; +use crate::entities::logical_expression; +use crate::entities::physical_expression; +use crate::{EpochId, ExprId, StatId, StorageResult}; +use sea_orm::*; +use sea_orm_migration::prelude::*; +use serde_json::json; +use std::sync::Arc; + +pub enum CatalogSource { + Iceberg(), +} + +pub trait CostModelStorageLayer { + // TODO: Change EpochId to event::Model::epoch_id + async fn create_new_epoch(&mut self, source: String, data: String) -> StorageResult; + + async fn update_stats_from_catalog( + &self, + c: CatalogSource, + epoch_id: EpochId, + ) -> StorageResult<()>; + + // i32 in `stats:i32` is a placeholder for the stats type + async fn update_stats(&self, stats: i32, epoch_id: EpochId) -> StorageResult<()>; + + async fn store_cost(&self, expr_id: ExprId, cost: i32, epoch_id: EpochId) -> StorageResult<()>; + + async fn store_expr_stats_mappings( + &self, + expr_id: ExprId, + stat_ids: Vec, + ) -> StorageResult<()>; + + /// Get the statistics for a given table. + /// + /// If `epoch_id` is None, it will return the latest statistics. + async fn get_stats_for_table( + &self, + table_id: i32, + stat_type: i32, + epoch_id: Option, + ) -> StorageResult>; + + /// Get the statistics for a given attribute. + /// + /// If `epoch_id` is None, it will return the latest statistics. + async fn get_stats_for_attr( + &self, + attr_id: i32, + stat_type: i32, + epoch_id: Option, + ) -> StorageResult>; + + /// Get the joint statistics for a list of attributes. + /// + /// If `epoch_id` is None, it will return the latest statistics. + async fn get_stats_for_attrs( + &self, + attr_ids: Vec, + stat_type: i32, + epoch_id: Option, + ) -> StorageResult>; + + async fn get_cost_analysis( + &self, + expr_id: ExprId, + epoch_id: EpochId, + ) -> StorageResult>; + + async fn get_cost(&self, expr_id: ExprId) -> StorageResult>; +} diff --git a/optd-persistent/src/entities/prelude.rs b/optd-persistent/src/entities/prelude.rs index 0d36cf0..5c62836 100644 --- a/optd-persistent/src/entities/prelude.rs +++ b/optd-persistent/src/entities/prelude.rs @@ -1,4 +1,5 @@ //! `SeaORM` Entity, @generated by sea-orm-codegen 1.1.0 +#![allow(dead_code, unused_imports, unused_variables)] pub use super::attribute::Entity as Attribute; pub use super::attribute_constraint_junction::Entity as AttributeConstraintJunction; diff --git a/optd-persistent/src/lib.rs b/optd-persistent/src/lib.rs index c11bafa..e63da11 100644 --- a/optd-persistent/src/lib.rs +++ b/optd-persistent/src/lib.rs @@ -1,11 +1,26 @@ use sea_orm::*; use sea_orm_migration::prelude::*; +mod cost_model_orm_manager_impl; +mod cost_model_storage_layer; +mod entities; +mod memo_orm_manager_impl; +mod memo_storage_layer; mod migrator; +mod orm_manager; + use migrator::Migrator; +pub type GroupId = i32; +pub type ExprId = i32; +pub type EpochId = i32; +pub type StatId = i32; + +pub type StorageResult = Result; + pub const DATABASE_URL: &str = "sqlite:./sqlite.db?mode=rwc"; pub const DATABASE_FILE: &str = "./sqlite.db"; +pub const TEST_DATABASE_URL: &str = "sqlite:./test.db?mode=rwc"; pub async fn migrate(db: &DatabaseConnection) -> Result<(), DbErr> { Migrator::refresh(db).await diff --git a/optd-persistent/src/memo_orm_manager_impl.rs b/optd-persistent/src/memo_orm_manager_impl.rs new file mode 100644 index 0000000..ef8edce --- /dev/null +++ b/optd-persistent/src/memo_orm_manager_impl.rs @@ -0,0 +1,86 @@ +#![allow(dead_code, unused_imports, unused_variables)] + +use crate::memo_storage_layer::MemoStorageLayer; +use crate::orm_manager::ORMManager; + +impl MemoStorageLayer for ORMManager { + async fn get_group_winner_from_group_id( + &self, + group_id: i32, + ) -> crate::StorageResult> { + todo!() + } + + async fn add_new_expr( + &mut self, + expr: crate::memo_storage_layer::Expression, + ) -> crate::StorageResult<(crate::GroupId, crate::ExprId)> { + todo!() + } + + async fn add_expr_to_group( + &mut self, + expr: crate::memo_storage_layer::Expression, + group_id: crate::GroupId, + ) -> crate::StorageResult> { + todo!() + } + + async fn get_group_id(&self, expr_id: crate::ExprId) -> crate::StorageResult { + todo!() + } + + async fn get_expr_memoed( + &self, + expr_id: crate::ExprId, + ) -> crate::StorageResult { + todo!() + } + + async fn get_all_group_ids(&self) -> crate::StorageResult> { + todo!() + } + + async fn get_group( + &self, + group_id: crate::GroupId, + ) -> crate::StorageResult { + todo!() + } + + async fn update_group_winner( + &mut self, + group_id: crate::GroupId, + latest_winner: Option, + ) -> crate::StorageResult<()> { + todo!() + } + + async fn get_all_exprs_in_group( + &self, + group_id: crate::GroupId, + ) -> crate::StorageResult> { + todo!() + } + + async fn get_group_info( + &self, + group_id: crate::GroupId, + ) -> crate::StorageResult<&Option> { + todo!() + } + + async fn get_predicate_binding( + &self, + group_id: crate::GroupId, + ) -> crate::StorageResult> { + todo!() + } + + async fn try_get_predicate_binding( + &self, + group_id: crate::GroupId, + ) -> crate::StorageResult> { + todo!() + } +} diff --git a/optd-persistent/src/memo_storage_layer.rs b/optd-persistent/src/memo_storage_layer.rs new file mode 100644 index 0000000..b5280f7 --- /dev/null +++ b/optd-persistent/src/memo_storage_layer.rs @@ -0,0 +1,115 @@ +#![allow(dead_code, unused_imports)] + +use crate::entities::cascades_group; +use crate::entities::event::Model as event_model; +use crate::entities::logical_expression; +use crate::entities::physical_expression; +use crate::{ExprId, GroupId, StorageResult}; +use sea_orm::*; +use sea_orm_migration::prelude::*; +use serde_json::json; +use std::sync::Arc; + +pub enum Expression { + LogicalExpression(logical_expression::Model), + PhysicalExpression(physical_expression::Model), +} + +// TODO +// A dummy WinnerInfo struct +// pub struct WinnerInfo { +// pub expr_id: ExprId, +// pub total_weighted_cost: f64, +// pub operation_weighted_cost: f64, +// pub total_cost: Cost, +// pub operation_cost: Cost, +// pub statistics: Arc, +// } +// The optd WinnerInfo struct makes everything too coupled. +pub struct WinnerInfo {} + +pub trait MemoStorageLayer { + async fn get_group_winner_from_group_id( + &self, + group_id: i32, + ) -> StorageResult>; + + /// Add an expression to the memo table. If the expression already exists, it will return the existing group id and + /// expr id. Otherwise, a new group and expr will be created. + async fn add_new_expr(&mut self, expr: Expression) -> StorageResult<(GroupId, ExprId)>; + + /// Add a new expression to an existing group. If the expression is a group, it will merge the two groups. Otherwise, + /// it will add the expression to the group. Returns the expr id if the expression is not a group. + async fn add_expr_to_group( + &mut self, + expr: Expression, + group_id: GroupId, + ) -> StorageResult>; + + /// Get the group id of an expression. + /// The group id is volatile, depending on whether the groups are merged. + async fn get_group_id(&self, expr_id: ExprId) -> StorageResult; + + /// Get the memoized representation of a node. + async fn get_expr_memoed(&self, expr_id: ExprId) -> StorageResult; + + /// Get all groups IDs in the memo table. + async fn get_all_group_ids(&self) -> StorageResult>; + + /// Get a group by ID + async fn get_group(&self, group_id: GroupId) -> StorageResult; + + /// Update the group winner. + async fn update_group_winner( + &mut self, + group_id: GroupId, + latest_winner: Option, + ) -> StorageResult<()>; + + // The below functions can be overwritten by the memo table implementation if there + // are more efficient way to retrieve the information. + + /// Get all expressions in the group. + async fn get_all_exprs_in_group(&self, group_id: GroupId) -> StorageResult>; + + /// Get winner info for a group id + async fn get_group_info(&self, group_id: GroupId) -> StorageResult<&Option>; + + // TODO: + /// Get the best group binding based on the cost + // fn get_best_group_binding( + // &self, + // group_id: GroupId, + // mut post_process: impl FnMut(Arc, GroupId, &WinnerInfo), + // ) -> Result; + // { + // // let info: &GroupInfo = this.get_group_info(group_id); + // // if let Winner::Full(info @ WinnerInfo { expr_id, .. }) = &info.winner { + // // let expr = this.get_expr_memoed(*expr_id); + // // let mut children = Vec::with_capacity(expr.children.len()); + // // for child in &expr.children { + // // children.push( + // // get_best_group_binding_inner(this, *child, post_process) + // // .with_context(|| format!("when processing expr {}", expr_id))?, + // // ); + // // } + // // let node = Arc::new(RelNode { + // // typ: expr.typ.clone(), + // // children, + // // data: expr.data.clone(), + // // }); + // // post_process(node.clone(), group_id, info); + // // return Ok(node); + // // } + // // bail!("no best group binding for group {}", group_id) + // }; + + /// Get all bindings of a predicate group. Will panic if the group contains more than one bindings. + async fn get_predicate_binding(&self, group_id: GroupId) -> StorageResult>; + + /// Get all bindings of a predicate group. Returns None if the group contains zero or more than one bindings. + async fn try_get_predicate_binding( + &self, + group_id: GroupId, + ) -> StorageResult>; +} diff --git a/optd-persistent/src/orm_manager.rs b/optd-persistent/src/orm_manager.rs new file mode 100644 index 0000000..11d0c9a --- /dev/null +++ b/optd-persistent/src/orm_manager.rs @@ -0,0 +1,24 @@ +#![allow(dead_code, unused_imports, unused_variables)] + +use sea_orm::{Database, DatabaseConnection}; + +use crate::{EpochId, DATABASE_URL}; + +pub struct ORMManager { + db_conn: DatabaseConnection, + // TODO: Change EpochId to event::Model::epoch_id + latest_epoch_id: EpochId, +} + +impl ORMManager { + pub async fn new(database_url: Option<&str>) -> Self { + let latest_epoch_id = -1; + let db_conn = Database::connect(database_url.unwrap_or(DATABASE_URL)) + .await + .unwrap(); + Self { + db_conn, + latest_epoch_id, + } + } +}