diff --git a/agdb_server/src/action.rs b/agdb_server/src/action.rs index a1f33e0a..80c996d5 100644 --- a/agdb_server/src/action.rs +++ b/agdb_server/src/action.rs @@ -22,27 +22,17 @@ pub(crate) enum ClusterAction { UserRemove(UserRemove), } -#[derive(Clone, Serialize, Deserialize)] -pub(crate) enum ClusterResponse { - None, -} - pub(crate) trait Action: Sized { - async fn exec( - self, - db: &mut ServerDb, - db_pool: &mut DbPool, - config: &Config, - ) -> ServerResult; + async fn exec(self, db: &mut ServerDb, db_pool: &mut DbPool, config: &Config) -> ServerResult; } -impl Action for ClusterAction { - async fn exec( +impl ClusterAction { + pub(crate) async fn exec( self, db: &mut ServerDb, db_pool: &mut DbPool, config: &Config, - ) -> ServerResult { + ) -> ServerResult<()> { match self { ClusterAction::UserAdd(action) => action.exec(db, db_pool, config).await, ClusterAction::ClusterLogin(action) => action.exec(db, db_pool, config).await, diff --git a/agdb_server/src/action/change_password.rs b/agdb_server/src/action/change_password.rs index cec422e3..7301fbb2 100644 --- a/agdb_server/src/action/change_password.rs +++ b/agdb_server/src/action/change_password.rs @@ -1,7 +1,6 @@ use super::DbPool; use super::ServerDb; use crate::action::Action; -use crate::action::ClusterResponse; use crate::action::Config; use crate::server_error::ServerResult; use agdb::UserValue; @@ -21,12 +20,12 @@ impl Action for ChangePassword { db: &mut ServerDb, _db_pool: &mut DbPool, _config: &Config, - ) -> ServerResult { + ) -> ServerResult { let mut user = db.user(&self.user).await?; user.password = self.new_password; user.salt = self.new_salt; db.save_user(user).await?; - Ok(ClusterResponse::None) + Ok(()) } } diff --git a/agdb_server/src/action/cluster_login.rs b/agdb_server/src/action/cluster_login.rs index d383bed8..21f3a437 100644 --- a/agdb_server/src/action/cluster_login.rs +++ b/agdb_server/src/action/cluster_login.rs @@ -1,7 +1,6 @@ use super::DbPool; use super::ServerDb; use crate::action::Action; -use crate::action::ClusterResponse; use crate::action::Config; use crate::server_error::ServerResult; use agdb::UserValue; @@ -20,10 +19,10 @@ impl Action for ClusterLogin { db: &mut ServerDb, _db_pool: &mut DbPool, _config: &Config, - ) -> ServerResult { + ) -> ServerResult { let user_id = db.user_id(&self.user).await?; db.save_token(user_id, &self.new_token).await?; - Ok(ClusterResponse::None) + Ok(()) } } diff --git a/agdb_server/src/action/user_add.rs b/agdb_server/src/action/user_add.rs index 3e21e8cf..3cba0d69 100644 --- a/agdb_server/src/action/user_add.rs +++ b/agdb_server/src/action/user_add.rs @@ -1,7 +1,6 @@ use super::DbPool; use super::ServerDb; use crate::action::Action; -use crate::action::ClusterResponse; use crate::action::Config; use crate::server_db::ServerUser; use crate::server_error::ServerResult; @@ -22,7 +21,7 @@ impl Action for UserAdd { db: &mut ServerDb, _db_pool: &mut DbPool, _config: &Config, - ) -> ServerResult { + ) -> ServerResult { db.insert_user(ServerUser { db_id: None, username: self.user, @@ -32,6 +31,6 @@ impl Action for UserAdd { }) .await?; - Ok(ClusterResponse::None) + Ok(()) } } diff --git a/agdb_server/src/action/user_remove.rs b/agdb_server/src/action/user_remove.rs index d9e49b2b..e6dc76ae 100644 --- a/agdb_server/src/action/user_remove.rs +++ b/agdb_server/src/action/user_remove.rs @@ -1,7 +1,6 @@ use super::DbPool; use super::ServerDb; use crate::action::Action; -use crate::action::ClusterResponse; use crate::config::Config; use crate::server_error::ServerResult; use agdb::UserValue; @@ -14,15 +13,10 @@ pub(crate) struct UserRemove { } impl Action for UserRemove { - async fn exec( - self, - db: &mut ServerDb, - db_pool: &mut DbPool, - config: &Config, - ) -> ServerResult { + async fn exec(self, db: &mut ServerDb, db_pool: &mut DbPool, config: &Config) -> ServerResult { let dbs = db.remove_user(&self.user).await?; db_pool.remove_user_dbs(&self.user, &dbs, config).await?; - Ok(ClusterResponse::None) + Ok(()) } } diff --git a/agdb_server/src/cluster.rs b/agdb_server/src/cluster.rs index f0b67a90..6434ba10 100644 --- a/agdb_server/src/cluster.rs +++ b/agdb_server/src/cluster.rs @@ -1,6 +1,5 @@ use crate::action::Action; use crate::action::ClusterAction; -use crate::action::ClusterResponse; use crate::config::Config; use crate::db_pool::DbPool; use crate::raft; @@ -10,6 +9,7 @@ use crate::raft::Response; use crate::raft::Storage; use crate::server_db::ServerDb; use crate::server_error::ServerResult; +use agdb::DbId; use agdb::StableHash; use agdb_api::HttpClient; use agdb_api::ReqwestClient; @@ -17,7 +17,7 @@ use axum::body::Body; use axum::extract::Request as AxumRequest; use axum::response::Response as AxumResponse; use reqwest::StatusCode; -use std::collections::VecDeque; +use std::collections::HashMap; use std::str::FromStr; use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicU64; @@ -33,7 +33,7 @@ use tokio::sync::RwLock; pub(crate) type Cluster = Arc; type ClusterNode = Arc; -type ResultNotifier = tokio::sync::oneshot::Sender>; +type ResultNotifier = tokio::sync::oneshot::Sender>; type ClusterResponseReceiver = UnboundedReceiver<(Request, Response)>; pub(crate) struct ClusterNodeImpl { @@ -57,8 +57,8 @@ impl ClusterImpl { pub(crate) async fn append>( &self, action: T, - ) -> ServerResult { - let (sender, receiver) = tokio::sync::oneshot::channel::>(); + ) -> ServerResult { + let (sender, receiver) = tokio::sync::oneshot::channel::>(); let requests = self .raft .write() @@ -291,10 +291,11 @@ pub(crate) async fn start_with_shutdown( } pub(crate) struct ClusterStorage { - logs: VecDeque<(Log, Option)>, + result_notifiers: HashMap, index: u64, term: u64, - commit: Arc, + commit: u64, + executed: Arc, db: ServerDb, db_pool: DbPool, config: Config, @@ -302,80 +303,87 @@ pub(crate) struct ClusterStorage { impl ClusterStorage { async fn new(db: ServerDb, db_pool: DbPool, config: Config) -> ServerResult { - let (index, term) = db.cluster_log().await?; - let unexecuted_logs = db.logs_unexecuted().await?; + let (index, term, commit) = db.cluster_log().await?; + let logs = db.logs_unexecuted_until(commit).await?; - let storage = Self { - logs: VecDeque::new(), + let mut storage = Self { + result_notifiers: HashMap::new(), index, term, - commit: Arc::new(AtomicU64::new(index)), + commit, + executed: Arc::new(AtomicU64::new(index)), db, db_pool, config, }; - for log in unexecuted_logs { - storage.execute_log(log, None).await?; + for log in logs { + storage.execute_log(log).await?; } Ok(storage) } - async fn execute_log( - &self, - log: Log, - notifier: Option, - ) -> Result<(), crate::server_error::ServerError> { - let log_id = self.db.commit_log(&log).await?; - let commit = self.commit.clone(); + async fn execute_log(&mut self, log: Log) -> ServerResult<()> { + let log_id = log.db_id.unwrap_or_default(); + let executed = self.executed.clone(); let mut db = self.db.clone(); let mut db_pool = self.db_pool.clone(); let config = self.config.clone(); + let notifier = self.result_notifiers.remove(&log_id); tokio::spawn(async move { let result = log.data.exec(&mut db, &mut db_pool, &config).await; - commit.fetch_max(log.index, Ordering::Relaxed); - let _ = db.commit_log_executed(log_id).await; + executed.fetch_max(log.index, Ordering::Relaxed); + let _ = db.log_executed(log_id).await; if let Some(notifier) = notifier { - let _ = notifier.send(result); + let _ = notifier.send(result.map(|_| log.index)); } }); + Ok(()) } } impl Storage for ClusterStorage { - async fn append(&mut self, log: Log, notifier: Option) { - if let Some(index) = self - .logs - .iter() - .rev() - .position(|(l, _)| l.index == log.index && l.term == log.term) - { - self.logs.truncate(index); - } - + async fn append( + &mut self, + log: Log, + notifier: Option, + ) -> ServerResult<()> { + self.db.remove_unexecuted_logs_since(log.index).await?; + let log_id = self.db.append_log(&log).await?; self.index = log.index; self.term = log.term; - self.logs.push_back((log, notifier)); + + if let Some(notifier) = notifier { + self.result_notifiers.insert(log_id, notifier); + } + + Ok(()) } async fn commit(&mut self, index: u64) -> ServerResult<()> { - while let Some((log, _notifier)) = self.logs.front() { - if log.index <= index { - if let Some((log, notifier)) = self.logs.pop_front() { - self.execute_log(log, notifier).await?; - } - } else { - break; - } + for log in self.db.logs_unexecuted_until(index).await? { + self.commit = index; + self.db + .log_committed(log.db_id.expect("log should have db_id")) + .await?; + self.execute_log(log).await?; } Ok(()) } + fn log_commit(&self) -> u64 { + self.commit + } + + fn log_executed(&self) -> u64 { + self.executed.load(Ordering::Relaxed) + } + fn log_index(&self) -> u64 { self.index } @@ -384,20 +392,7 @@ impl Storage for ClusterStorage { self.term } - fn log_commit(&self) -> u64 { - self.commit.load(Ordering::Relaxed) - } - async fn logs(&self, since_index: u64) -> ServerResult>> { - let mut logs = self.db.logs(since_index).await?; - logs.extend_from_slice( - &self - .logs - .iter() - .map(|(log, _)| log) - .cloned() - .collect::>>(), - ); - Ok(logs) + self.db.logs_since(since_index).await } } diff --git a/agdb_server/src/forward.rs b/agdb_server/src/forward.rs index a76571e2..495c28a2 100644 --- a/agdb_server/src/forward.rs +++ b/agdb_server/src/forward.rs @@ -6,12 +6,14 @@ use axum::middleware::Next; use axum::response::Response; use reqwest::StatusCode; -const REDIRECT_PATHS: [&str; 13] = [ +const REDIRECT_PATHS: [&str; 15] = [ "/add", "/backup", "/change_password", "/clear", "/cluster/login", + "/cluster/logout", + "/cluster/admin/logout", "/convert", "/copy", "/delete", @@ -27,8 +29,6 @@ pub(crate) async fn forward_to_leader( request: Request, next: Next, ) -> Response { - let forwarded = request.headers().get("forwarded-by").is_some(); - if REDIRECT_PATHS .iter() .any(|pattern| request.uri().path().ends_with(pattern)) @@ -48,7 +48,7 @@ pub(crate) async fn forward_to_leader( if let Some(commit_index) = response.headers_mut().remove("commit-index") { if let Ok(commit_index) = commit_index.to_str() { if let Ok(commit_index) = commit_index.parse::() { - while state.cluster.raft.read().await.storage.log_commit() + while state.cluster.raft.read().await.storage.log_executed() < commit_index { tokio::time::sleep(std::time::Duration::from_millis(10)).await; @@ -68,14 +68,5 @@ pub(crate) async fn forward_to_leader( } } - let mut response = next.run(request).await; - - if forwarded && response.status().is_success() { - response.headers_mut().insert( - "commit-index", - state.cluster.raft.read().await.storage.log_commit().into(), - ); - } - - response + next.run(request).await } diff --git a/agdb_server/src/raft.rs b/agdb_server/src/raft.rs index a929b346..97d61ae1 100644 --- a/agdb_server/src/raft.rs +++ b/agdb_server/src/raft.rs @@ -1,4 +1,5 @@ use crate::server_error::ServerResult; +use agdb::DbId; use serde::Deserialize; use serde::Serialize; use std::marker::PhantomData; @@ -7,6 +8,7 @@ use std::time::Instant; #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub(crate) struct Log { + pub(crate) db_id: Option, pub(crate) index: u64, pub(crate) term: u64, pub(crate) data: T, @@ -80,11 +82,12 @@ struct Node { } pub(crate) trait Storage { - async fn append(&mut self, log: Log, notifier: Option); + async fn append(&mut self, log: Log, notifier: Option) -> ServerResult<()>; async fn commit(&mut self, index: u64) -> ServerResult<()>; fn log_index(&self) -> u64; fn log_term(&self) -> u64; fn log_commit(&self) -> u64; + fn log_executed(&self) -> u64; async fn logs(&self, since_index: u64) -> ServerResult>>; } @@ -159,6 +162,7 @@ impl> Cluster { notifier: Option, ) -> ServerResult>> { let log = Log { + db_id: None, index: self.local().log_index, term: self.term, data, @@ -180,7 +184,7 @@ impl> Cluster { data: RequestType::Append(vec![log.clone()]), }) .collect(); - self.storage.append(log, notifier).await; + self.storage.append(log, notifier).await?; if self.size == 1 { self.commit_storage(self.local().log_index).await?; @@ -319,7 +323,9 @@ impl> Cluster { for log in logs { self.validate_log_commit(request, log)?; - self.append_storage(log).await; + self.append_storage(log) + .await + .map_err(|e| self.commit_error(request, e.description))?; } if self.local().log_commit < request.log_commit { @@ -332,10 +338,11 @@ impl> Cluster { Self::ok(request) } - async fn append_storage(&mut self, log: &Log) { - self.storage.append(log.clone(), None).await; + async fn append_storage(&mut self, log: &Log) -> ServerResult<()> { + self.storage.append(log.clone(), None).await?; self.local_mut().log_index = log.index + 1; self.local_mut().log_term = log.term; + Ok(()) } async fn commit_storage(&mut self, index: u64) -> ServerResult<()> { @@ -672,9 +679,10 @@ mod test { } impl Storage for TestStorage { - async fn append(&mut self, log: Log, _notifier: Option<()>) { + async fn append(&mut self, log: Log, _notifier: Option<()>) -> ServerResult<()> { self.logs.truncate(log.index as usize); self.logs.push(log); + Ok(()) } async fn commit(&mut self, index: u64) -> ServerResult<()> { @@ -694,6 +702,10 @@ mod test { self.commit } + fn log_executed(&self) -> u64 { + self.commit + } + async fn logs(&self, index: u64) -> ServerResult>> { Ok(self.logs[index as usize..].to_vec()) } diff --git a/agdb_server/src/routes/admin/user.rs b/agdb_server/src/routes/admin/user.rs index 2c54dc27..9757128b 100644 --- a/agdb_server/src/routes/admin/user.rs +++ b/agdb_server/src/routes/admin/user.rs @@ -13,6 +13,7 @@ use agdb_api::UserStatus; use axum::extract::Path; use axum::extract::State; use axum::http::StatusCode; +use axum::response::IntoResponse; use axum::Json; #[utoipa::path(post, @@ -38,7 +39,7 @@ pub(crate) async fn add( State(cluster): State, Path(username): Path, Json(request): Json, -) -> ServerResponse { +) -> ServerResponse { password::validate_username(&username)?; password::validate_password(&request.password)?; @@ -48,7 +49,7 @@ pub(crate) async fn add( let pswd = Password::create(&username, &request.password); - cluster + let commit_index = cluster .append(UserAdd { user: username, password: pswd.password.to_vec(), @@ -56,7 +57,10 @@ pub(crate) async fn add( }) .await?; - Ok(StatusCode::CREATED) + Ok(( + StatusCode::CREATED, + [("commit-index", commit_index.to_string())], + )) } #[utoipa::path(put, @@ -81,12 +85,12 @@ pub(crate) async fn change_password( State(cluster): State, Path(username): Path, Json(request): Json, -) -> ServerResponse { +) -> ServerResponse { let _user = server_db.user_id(&username).await?; password::validate_password(&request.password)?; let pswd = Password::create(&username, &request.password); - cluster + let commit_index = cluster .append(ChangePasswordAction { user: username.to_string(), new_password: pswd.password.to_vec(), @@ -94,7 +98,10 @@ pub(crate) async fn change_password( }) .await?; - Ok(StatusCode::CREATED) + Ok(( + StatusCode::CREATED, + [("commit-index", commit_index.to_string())], + )) } #[utoipa::path(get, @@ -158,14 +165,17 @@ pub(crate) async fn remove( State(server_db): State, State(cluster): State, Path(username): Path, -) -> ServerResponse { +) -> ServerResponse { server_db.user_id(&username).await?; - cluster + let commit_index = cluster .append(UserRemove { user: username.to_string(), }) .await?; - Ok(StatusCode::NO_CONTENT) + Ok(( + StatusCode::NO_CONTENT, + [("commit-index", commit_index.to_string())], + )) } diff --git a/agdb_server/src/routes/cluster.rs b/agdb_server/src/routes/cluster.rs index 8cddab6b..f266868d 100644 --- a/agdb_server/src/routes/cluster.rs +++ b/agdb_server/src/routes/cluster.rs @@ -16,6 +16,7 @@ use agdb_api::UserLogin; use axum::extract::Path; use axum::extract::State; use axum::http::StatusCode; +use axum::response::IntoResponse; use axum::Json; pub(crate) async fn cluster( @@ -46,17 +47,20 @@ pub(crate) async fn admin_logout( State(server_db): State, State(cluster): State, Path(username): Path, -) -> ServerResponse { +) -> ServerResponse { let _user_id = server_db.user_id(&username).await?; - cluster + let commit_index = cluster .append(ClusterLogin { user: username, new_token: String::new(), }) .await?; - Ok(StatusCode::CREATED) + Ok(( + StatusCode::CREATED, + [("commit-index", commit_index.to_string())], + )) } #[utoipa::path(post, @@ -73,11 +77,12 @@ pub(crate) async fn login( State(server_db): State, State(cluster): State, Json(request): Json, -) -> ServerResponse<(StatusCode, Json)> { +) -> ServerResponse { let (token, user_id) = do_login(&server_db, &request.username, &request.password).await?; + let mut commit_index = 0; if user_id.is_some() { - cluster + commit_index = cluster .append(ClusterLogin { user: request.username, new_token: token.clone(), @@ -85,7 +90,11 @@ pub(crate) async fn login( .await?; } - Ok((StatusCode::OK, Json(token))) + Ok(( + StatusCode::OK, + [("commit-index", commit_index.to_string())], + Json(token), + )) } #[utoipa::path(post, @@ -102,11 +111,12 @@ pub(crate) async fn logout( user: UserId, State(server_db): State, State(cluster): State, -) -> ServerResponse { +) -> ServerResponse { let token = server_db.user_token(user.0).await?; + let mut commit_index = 0; if !token.is_empty() { - cluster + commit_index = cluster .append(ClusterLogin { user: server_db.user_name(user.0).await?, new_token: String::new(), @@ -114,7 +124,10 @@ pub(crate) async fn logout( .await?; } - Ok(StatusCode::CREATED) + Ok(( + StatusCode::CREATED, + [("commit-index", commit_index.to_string())], + )) } #[utoipa::path(get, diff --git a/agdb_server/src/routes/user.rs b/agdb_server/src/routes/user.rs index 2154e0e4..ee293944 100644 --- a/agdb_server/src/routes/user.rs +++ b/agdb_server/src/routes/user.rs @@ -15,6 +15,7 @@ use agdb_api::UserLogin; use agdb_api::UserStatus; use axum::extract::State; use axum::http::StatusCode; +use axum::response::IntoResponse; use axum::Json; use uuid::Uuid; @@ -100,7 +101,7 @@ pub(crate) async fn change_password( State(server_db): State, State(cluster): State, Json(request): Json, -) -> ServerResponse { +) -> ServerResponse { let user = server_db.user_by_id(user.0).await?; let old_pswd = Password::new(&user.username, &user.password, &user.salt)?; @@ -111,7 +112,7 @@ pub(crate) async fn change_password( password::validate_password(&request.new_password)?; let pswd = Password::create(&user.username, &request.new_password); - cluster + let commit_index = cluster .append(ChangePasswordAction { user: user.username, new_password: pswd.password.to_vec(), @@ -119,7 +120,10 @@ pub(crate) async fn change_password( }) .await?; - Ok(StatusCode::CREATED) + Ok(( + StatusCode::CREATED, + [("commit-index", commit_index.to_string())], + )) } #[utoipa::path(get, diff --git a/agdb_server/src/server_db.rs b/agdb_server/src/server_db.rs index f1ddab09..ffa19bd9 100644 --- a/agdb_server/src/server_db.rs +++ b/agdb_server/src/server_db.rs @@ -18,6 +18,8 @@ use agdb::QueryBuilder; use agdb::QueryId; use agdb::QueryResult; use agdb::SearchQuery; +use agdb::StorageData; +use agdb::Transaction; use agdb::UserValue; use agdb_api::DbType; use agdb_api::DbUser; @@ -49,6 +51,7 @@ pub(crate) struct ServerDb(pub(crate) Arc>); const ADMIN: &str = "admin"; const CLUSTER_LOG: &str = "cluster_log"; +const COMMITTED: &str = "committed"; const DBS: &str = "dbs"; const EXECUTED: &str = "executed"; const NAME: &str = "name"; @@ -128,7 +131,7 @@ impl ServerDb { Ok(Self(Arc::new(RwLock::new(db)))) } - pub(crate) async fn cluster_log(&self) -> ServerResult<(u64, u64)> { + pub(crate) async fn cluster_log(&self) -> ServerResult<(u64, u64, u64)> { self.0.write().await.transaction_mut(|t| { if let Some(e) = t .exec( @@ -145,35 +148,49 @@ impl ServerDb { .elements .first() { - Ok((e.values[0].value.to_u64()?, e.values[1].value.to_u64()?)) - } else { - Ok((0, 0)) + let commit = if let Some(c) = t + .exec( + QueryBuilder::select() + .values("index") + .search() + .depth_first() + .from(CLUSTER_LOG) + .limit(1) + .where_() + .distance(CountComparison::Equal(2)) + .and() + .not() + .keys(COMMITTED) + .query(), + )? + .elements + .first() + { + c.values[0].value.to_u64()? + } else { + 0 + }; + + return Ok(( + e.values[0].value.to_u64()?, + e.values[1].value.to_u64()?, + commit, + )); } + + Ok((0, 0, 0)) }) } - pub(crate) async fn commit_log(&self, log: &Log) -> ServerResult { - self.0.write().await.transaction_mut( - |t: &mut agdb::TransactionMut<'_, agdb::FileStorageMemoryMapped>| { - let mut values = log_db_values(log); - values.push((EXECUTED, false).into()); - let log_id = t - .exec_mut(QueryBuilder::insert().nodes().values([values]).query())? - .elements[0] - .id; - t.exec_mut( - QueryBuilder::insert() - .edges() - .from(CLUSTER_LOG) - .to(log_id) - .query(), - )?; - Ok(log_id) - }, - ) + pub(crate) async fn log_committed(&self, log_id: DbId) -> ServerResult<()> { + self.0 + .write() + .await + .exec_mut(QueryBuilder::remove().values(COMMITTED).ids(log_id).query())?; + Ok(()) } - pub(crate) async fn commit_log_executed(&self, log_id: DbId) -> ServerResult<()> { + pub(crate) async fn log_executed(&self, log_id: DbId) -> ServerResult<()> { self.0 .write() .await @@ -369,31 +386,94 @@ impl ServerDb { == 1) } - pub(crate) async fn logs_unexecuted(&self) -> ServerResult>> { - if let Some(index) = self - .0 - .read() - .await - .exec( - QueryBuilder::select() - .values("index") - .search() - .index(EXECUTED) - .value(false) - .query(), - )? - .elements - .into_iter() - .map(|e| e.values[0].value.to_u64().unwrap_or_default()) - .min() - { - self.logs(index).await - } else { - Ok(Vec::new()) - } + pub(crate) async fn append_log(&self, log: &Log) -> ServerResult { + self.0.write().await.transaction_mut( + |t: &mut agdb::TransactionMut<'_, agdb::FileStorageMemoryMapped>| { + let mut values = log_db_values(log); + values.push((COMMITTED, false).into()); + values.push((EXECUTED, false).into()); + + let log_id = t + .exec_mut(QueryBuilder::insert().nodes().values([values]).query())? + .elements[0] + .id; + t.exec_mut( + QueryBuilder::insert() + .edges() + .from(CLUSTER_LOG) + .to(log_id) + .query(), + )?; + Ok(log_id) + }, + ) } - pub(crate) async fn logs(&self, since_index: u64) -> ServerResult>> { + pub(crate) async fn logs_unexecuted_until( + &self, + index: u64, + ) -> ServerResult>> { + self.0.read().await.transaction(|t| { + let mut log_ids: Vec<(u64, DbId)> = t + .exec( + QueryBuilder::select() + .values("index") + .search() + .index(EXECUTED) + .value(false) + .query(), + )? + .elements + .into_iter() + .filter_map(|e| { + let log_index = e.values[0].value.to_u64().unwrap_or_default(); + + if log_index <= index { + Some((log_index, e.id)) + } else { + None + } + }) + .collect(); + log_ids.sort_by_key(|l| l.0); + Self::logs(t, log_ids.into_iter().map(|l| l.1).collect()) + }) + } + + pub(crate) async fn remove_unexecuted_logs_since(&self, since_index: u64) -> ServerResult<()> { + self.0.write().await.transaction_mut(|t| { + let logs: Vec = t + .exec( + QueryBuilder::select() + .values(["index", "term"]) + .search() + .index(EXECUTED) + .value(false) + .query(), + )? + .elements + .into_iter() + .filter_map(|e| { + let index = e.values[0].value.to_u64().unwrap_or_default(); + + if index >= since_index { + Some(e.id) + } else { + None + } + }) + .collect(); + + t.exec_mut(QueryBuilder::remove().ids(logs).query()) + })?; + + Ok(()) + } + + pub(crate) async fn logs_since( + &self, + since_index: u64, + ) -> ServerResult>> { self.0.read().await.transaction(|t| { let log_count = t .exec( @@ -406,77 +486,94 @@ impl ServerDb { .values[0] .value .to_u64()?; + let mut log_ids = t + .exec( + QueryBuilder::search() + .depth_first() + .from(CLUSTER_LOG) + .limit(log_count - since_index) + .where_() + .distance(CountComparison::Equal(2)) + .query(), + )? + .ids(); - let mut actions = Vec::new(); + log_ids.reverse(); + Self::logs(t, log_ids) + }) + } - if log_count < since_index { - for element in t - .exec( + fn logs( + t: &Transaction, + log_ids: Vec, + ) -> ServerResult>> { + let mut actions = Vec::new(); + + for element in t + .exec( + QueryBuilder::select() + .values(["index", "term", "action"]) + .ids(log_ids) + .query(), + )? + .elements + { + let index = element.values[0].value.to_u64()?; + let term = element.values[1].value.to_u64()?; + let action = element.values[2].value.string()?; + + let data = match action.as_str() { + "UserAdd" => Ok(ClusterAction::UserAdd( + t.exec( QueryBuilder::select() - .values(["index", "term", "action"]) - .search() - .depth_first() - .from(CLUSTER_LOG) - .limit(log_count - since_index) - .where_() - .distance(CountComparison::Equal(2)) + .elements::() + .ids(element.id) .query(), )? - .elements - { - let index = element.values[0].value.to_u64()?; - let term = element.values[1].value.to_u64()?; - let action = element.values[2].value.string()?; - - let data = match action.as_str() { - "UserAdd" => Ok(ClusterAction::UserAdd( - t.exec( - QueryBuilder::select() - .elements::() - .ids(element.id) - .query(), - )? - .try_into()?, - )), - "ClusterLogin" => Ok(ClusterAction::ClusterLogin( - t.exec( - QueryBuilder::select() - .elements::() - .ids(element.id) - .query(), - )? - .try_into()?, - )), - "ChangePassword" => Ok(ClusterAction::ChangePassword( - t.exec( - QueryBuilder::select() - .elements::() - .ids(element.id) - .query(), - )? - .try_into()?, - )), - "UserRemove" => Ok(ClusterAction::UserRemove( - t.exec( - QueryBuilder::select() - .elements::() - .ids(element.id) - .query(), - )? - .try_into()?, - )), - _ => Err(ServerError::new( - StatusCode::INTERNAL_SERVER_ERROR, - &format!("unknown action: {action}"), - )), - }?; - - actions.push(Log { index, term, data }); - } - } + .try_into()?, + )), + "ClusterLogin" => Ok(ClusterAction::ClusterLogin( + t.exec( + QueryBuilder::select() + .elements::() + .ids(element.id) + .query(), + )? + .try_into()?, + )), + "ChangePassword" => Ok(ClusterAction::ChangePassword( + t.exec( + QueryBuilder::select() + .elements::() + .ids(element.id) + .query(), + )? + .try_into()?, + )), + "UserRemove" => Ok(ClusterAction::UserRemove( + t.exec( + QueryBuilder::select() + .elements::() + .ids(element.id) + .query(), + )? + .try_into()?, + )), + _ => Err(ServerError::new( + StatusCode::INTERNAL_SERVER_ERROR, + &format!("unknown action: {action}"), + )), + }?; + + actions.push(Log { + db_id: Some(element.id), + index, + term, + data, + }); + } - Ok(actions) - }) + Ok(actions) } pub(crate) async fn remove_db(&self, user: DbId, db: &str) -> ServerResult<()> { diff --git a/agdb_server/tests/routes/cluster_test.rs b/agdb_server/tests/routes/cluster_test.rs index 8aad45ce..acfdcdb4 100644 --- a/agdb_server/tests/routes/cluster_test.rs +++ b/agdb_server/tests/routes/cluster_test.rs @@ -137,7 +137,7 @@ async fn user() -> anyhow::Result<()> { .await?; let mut leader = leader.client.write().await; - leader.cluster_login(ADMIN, ADMIN).await?; + leader.user_login(ADMIN, ADMIN).await?; leader.admin_cluster_logout("user1").await?; leader.admin_user_remove("user1").await?; client.write().await.user_login(ADMIN, ADMIN).await?;