Skip to content

Commit

Permalink
chore: remove latest epoch id and add some todo documentation comments (
Browse files Browse the repository at this point in the history
  • Loading branch information
lanlou1554 authored Nov 9, 2024
1 parent 592b283 commit 9c98703
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 32 deletions.
1 change: 1 addition & 0 deletions optd-persistent/src/cost_model/catalog/mock_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ pub struct MockTrigger {
pub function: String,
}

/// TODO: documentation
#[derive(Default)]
pub struct MockCatalog {
pub databases: Vec<MockDatabaseMetadata>,
Expand Down
16 changes: 10 additions & 6 deletions optd-persistent/src/cost_model/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,44 +11,51 @@ use sea_orm_migration::prelude::*;
use serde_json::json;
use std::sync::Arc;

/// Selects the catalog handed to `update_stats_from_catalog` (its `c` argument).
///
/// NOTE(review): the variant meanings below are inferred from their names only —
/// confirm against the implementation of `update_stats_from_catalog`.
pub enum CatalogSource {
/// Presumably an Apache Iceberg catalog; declared as a tuple variant with no fields.
Iceberg(),
/// Presumably the in-memory mock catalog — TODO confirm.
Mock,
}

/// Kinds of attribute data type known to the cost-model storage interface.
///
/// NOTE(review): presumably the data type of a table attribute (column); nothing
/// in this view shows where the enum is consumed — confirm against callers.
pub enum AttrType {
Integer,
Float,
Varchar,
Boolean,
}

/// Kinds of index known to the cost-model storage interface.
///
/// NOTE(review): presumably B-tree vs. hash index structures, judging by the
/// variant names — confirm against callers.
pub enum IndexType {
BTree,
Hash,
}

/// Kinds of table constraint known to the cost-model storage interface.
///
/// NOTE(review): the variant names mirror the usual SQL constraint kinds
/// (primary key, foreign key, unique, check); nothing in this view shows where
/// the enum is consumed — confirm against callers.
pub enum ConstraintType {
PrimaryKey,
ForeignKey,
Unique,
Check,
}

/// Kinds of statistic known to the cost-model storage interface.
///
/// NOTE(review): `Stat::stat_type` is declared as `i32` in this file, not as
/// `StatType`; how these variants map onto that integer is not visible here —
/// confirm before relying on any particular numbering.
pub enum StatType {
Count,
Cardinality,
Min,
Max,
}

/// Tells `update_stats` which epoch a statistic update should be recorded under.
#[derive(PartialEq)]
pub enum EpochOption {
// TODO(lanlou): Could I make i32 -> EpochId?
/// Reuse an existing epoch; `update_stats` uses the payload directly as the epoch id.
Existed(i32),
/// Create a new epoch: `update_stats` builds a new `event` record whose
/// `source_variant` is the first field (`source`). The second field (`data`) is
/// presumably stored as the event's `data`, as `create_new_epoch` does — confirm.
New(String, String),
}

/// TODO: documentation
#[derive(Clone)]
pub struct Stat {
pub stat_type: i32,
Expand All @@ -58,6 +65,7 @@ pub struct Stat {
pub name: String,
}

/// TODO: documentation
#[trait_variant::make(Send)]
pub trait CostModelStorageLayer {
type GroupId;
Expand All @@ -68,11 +76,7 @@ pub trait CostModelStorageLayer {
type StatId;

// TODO: Change EpochId to event::Model::epoch_id
async fn create_new_epoch(
&mut self,
source: String,
data: String,
) -> StorageResult<Self::EpochId>;
async fn create_new_epoch(&self, source: String, data: String) -> StorageResult<Self::EpochId>;

async fn update_stats_from_catalog(
&self,
Expand All @@ -81,7 +85,7 @@ pub trait CostModelStorageLayer {
) -> StorageResult<()>;

async fn update_stats(
&mut self,
&self,
stat: Stat,
epoch_option: EpochOption,
) -> StorageResult<Option<Self::EpochId>>;
Expand Down
33 changes: 11 additions & 22 deletions optd-persistent/src/cost_model/orm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,25 +38,19 @@ impl CostModelStorageLayer for BackendManager {
type EpochId = i32;
type StatId = i32;

async fn create_new_epoch(
&mut self,
source: String,
data: String,
) -> StorageResult<Self::EpochId> {
/// Starts a new epoch by inserting a new `Event` record and returning its id.
///
/// The record stores `source` as `source_variant`, the current UTC time as
/// `timestamp`, and `data` wrapped in a JSON string value; every other column is
/// taken from `Default::default()`.
///
/// # Errors
/// Propagates the error of the insert via `?`.
async fn create_new_epoch(&self, source: String, data: String) -> StorageResult<Self::EpochId> {
let new_event = event::ActiveModel {
source_variant: sea_orm::ActiveValue::Set(source),
timestamp: sea_orm::ActiveValue::Set(Utc::now()),
data: sea_orm::ActiveValue::Set(sea_orm::JsonValue::String(data)),
..Default::default()
};
let insert_res = Event::insert(new_event).exec(&self.db).await?;
// NOTE(review): this view shows diff lines without +/- markers; the store below is
// presumably the side removed by this commit ("remove latest epoch id") — confirm.
self.latest_epoch_id.store(
insert_res.last_insert_id as usize,
std::sync::atomic::Ordering::Relaxed,
);
// The id generated by the insert is the new epoch id.
Ok(insert_res.last_insert_id)
}

/// TODO: documentation
async fn update_stats_from_catalog(
&self,
c: CatalogSource,
Expand Down Expand Up @@ -154,6 +148,7 @@ impl CostModelStorageLayer for BackendManager {
}
}

/// TODO: improve the documentation
/* Update the statistics in the database.
* The statistic can be newly inserted or updated. If the statistic value
* is the same as the latest existing one, the update will be ignored, and
Expand All @@ -166,14 +161,14 @@ impl CostModelStorageLayer for BackendManager {
* If the statistic value is the same as the latest existing one, this function
* won't create a new epoch.
*
* For batch updates, if the caller can directly call this function with
* For batch updates, the caller can directly call this function with
* New epoch option at the first time, and if the epoch_id is returned, the
* caller can use the returned epoch_id for the rest of the updates.
* But if the epoch_id is not returned, the caller should continue using
* the New epoch option for the next statistic update.
*/
async fn update_stats(
&mut self,
&self,
stat: Stat,
epoch_option: EpochOption,
) -> StorageResult<Option<Self::EpochId>> {
Expand Down Expand Up @@ -270,11 +265,9 @@ impl CostModelStorageLayer for BackendManager {
}
};
// 1. Insert into attr_stats and related junction tables.
let mut insert_new_epoch = false;
let epoch_id = match epoch_option {
EpochOption::Existed(e) => e,
EpochOption::New(source, data) => {
insert_new_epoch = true;
let new_event = event::ActiveModel {
source_variant: sea_orm::ActiveValue::Set(source),
timestamp: sea_orm::ActiveValue::Set(Utc::now()),
Expand Down Expand Up @@ -317,19 +310,11 @@ impl CostModelStorageLayer for BackendManager {
.exec(&transaction)
.await?;

// TODO(lanlou): consider the update conflict for latest_epoch_id in multiple threads
// Assume the txn fails to commit, and the epoch_id is updated. But the epoch_id
// is always increasing and won't be overwritten even if the record is deleted, it
// might be fine.
if insert_new_epoch {
self.latest_epoch_id
.store(epoch_id as usize, std::sync::atomic::Ordering::Relaxed);
}

transaction.commit().await?;
Ok(Some(epoch_id))
}

/// TODO: documentation
async fn store_expr_stats_mappings(
&self,
expr_id: Self::ExprId,
Expand All @@ -350,6 +335,7 @@ impl CostModelStorageLayer for BackendManager {
Ok(())
}

/// TODO: documentation
async fn get_stats_for_table(
&self,
table_id: i32,
Expand Down Expand Up @@ -377,6 +363,7 @@ impl CostModelStorageLayer for BackendManager {
}
}

/// TODO: documentation
async fn get_stats_for_attr(
&self,
mut attr_ids: Vec<Self::AttrId>,
Expand Down Expand Up @@ -414,6 +401,7 @@ impl CostModelStorageLayer for BackendManager {
}
}

/// TODO: documentation
async fn get_cost_analysis(
&self,
expr_id: Self::ExprId,
Expand All @@ -440,6 +428,7 @@ impl CostModelStorageLayer for BackendManager {
Ok(cost.map(|c| c.cost))
}

/// TODO: documentation
async fn store_cost(
&self,
physical_expression_id: Self::ExprId,
Expand Down
6 changes: 2 additions & 4 deletions optd-persistent/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#![allow(dead_code)]

use std::{cell::LazyCell, sync::atomic::AtomicUsize};
use std::cell::LazyCell;

use sea_orm::*;
use sea_orm_migration::prelude::*;
Expand Down Expand Up @@ -47,7 +47,7 @@ pub enum CostModelError {
/// Error wrapper with one variant per underlying error type.
///
/// The `From` impls in this file convert a `CostModelError` or a `DbErr` into it.
pub enum BackendError {
/// An error of type `CostModelError`.
CostModel(CostModelError),
/// A database error of type `DbErr`.
Database(DbErr),
// Add other variants as needed for different error types
// TODO: Add other variants as needed for different error types
}

impl From<CostModelError> for BackendError {
Expand All @@ -64,15 +64,13 @@ impl From<DbErr> for BackendError {

/// Backend handle that owns the database connection used by the storage layer.
pub struct BackendManager {
/// Database connection; `create_new_epoch` executes its insert against it.
db: DatabaseConnection,
/// Atomic value initialised to 0 in `new`.
/// NOTE(review): presumably the side removed by this commit
/// ("remove latest epoch id"), shown here without a diff marker — confirm.
latest_epoch_id: AtomicUsize,
}

impl BackendManager {
/// Creates a new `BackendManager`.
///
/// Connects to `database_url`, falling back to `DATABASE_URL` when it is `None`.
///
/// # Errors
/// Returns the connection error, propagated via `?`.
pub async fn new(database_url: Option<&str>) -> StorageResult<Self> {
Ok(Self {
db: Database::connect(database_url.unwrap_or(DATABASE_URL)).await?,
// NOTE(review): presumably the removed side of this commit's diff — confirm.
latest_epoch_id: AtomicUsize::new(0),
})
}
}
Expand Down

0 comments on commit 9c98703

Please sign in to comment.