Skip to content

Commit

Permalink
[server] Persist log on append #1387 (#1390)
Browse files Browse the repository at this point in the history
* separate commit & execution

* Update cluster.rs

* refactor
  • Loading branch information
michaelvlach authored Dec 19, 2024
1 parent c179f7f commit add4052
Show file tree
Hide file tree
Showing 13 changed files with 345 additions and 242 deletions.
18 changes: 4 additions & 14 deletions agdb_server/src/action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,27 +22,17 @@ pub(crate) enum ClusterAction {
UserRemove(UserRemove),
}

#[derive(Clone, Serialize, Deserialize)]
pub(crate) enum ClusterResponse {
None,
}

pub(crate) trait Action: Sized {
async fn exec(
self,
db: &mut ServerDb,
db_pool: &mut DbPool,
config: &Config,
) -> ServerResult<ClusterResponse>;
async fn exec(self, db: &mut ServerDb, db_pool: &mut DbPool, config: &Config) -> ServerResult;
}

impl Action for ClusterAction {
async fn exec(
impl ClusterAction {
pub(crate) async fn exec(
self,
db: &mut ServerDb,
db_pool: &mut DbPool,
config: &Config,
) -> ServerResult<ClusterResponse> {
) -> ServerResult<()> {
match self {
ClusterAction::UserAdd(action) => action.exec(db, db_pool, config).await,
ClusterAction::ClusterLogin(action) => action.exec(db, db_pool, config).await,
Expand Down
5 changes: 2 additions & 3 deletions agdb_server/src/action/change_password.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use super::DbPool;
use super::ServerDb;
use crate::action::Action;
use crate::action::ClusterResponse;
use crate::action::Config;
use crate::server_error::ServerResult;
use agdb::UserValue;
Expand All @@ -21,12 +20,12 @@ impl Action for ChangePassword {
db: &mut ServerDb,
_db_pool: &mut DbPool,
_config: &Config,
) -> ServerResult<ClusterResponse> {
) -> ServerResult {
let mut user = db.user(&self.user).await?;
user.password = self.new_password;
user.salt = self.new_salt;
db.save_user(user).await?;

Ok(ClusterResponse::None)
Ok(())
}
}
5 changes: 2 additions & 3 deletions agdb_server/src/action/cluster_login.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use super::DbPool;
use super::ServerDb;
use crate::action::Action;
use crate::action::ClusterResponse;
use crate::action::Config;
use crate::server_error::ServerResult;
use agdb::UserValue;
Expand All @@ -20,10 +19,10 @@ impl Action for ClusterLogin {
db: &mut ServerDb,
_db_pool: &mut DbPool,
_config: &Config,
) -> ServerResult<ClusterResponse> {
) -> ServerResult {
let user_id = db.user_id(&self.user).await?;
db.save_token(user_id, &self.new_token).await?;

Ok(ClusterResponse::None)
Ok(())
}
}
5 changes: 2 additions & 3 deletions agdb_server/src/action/user_add.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use super::DbPool;
use super::ServerDb;
use crate::action::Action;
use crate::action::ClusterResponse;
use crate::action::Config;
use crate::server_db::ServerUser;
use crate::server_error::ServerResult;
Expand All @@ -22,7 +21,7 @@ impl Action for UserAdd {
db: &mut ServerDb,
_db_pool: &mut DbPool,
_config: &Config,
) -> ServerResult<ClusterResponse> {
) -> ServerResult {
db.insert_user(ServerUser {
db_id: None,
username: self.user,
Expand All @@ -32,6 +31,6 @@ impl Action for UserAdd {
})
.await?;

Ok(ClusterResponse::None)
Ok(())
}
}
10 changes: 2 additions & 8 deletions agdb_server/src/action/user_remove.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use super::DbPool;
use super::ServerDb;
use crate::action::Action;
use crate::action::ClusterResponse;
use crate::config::Config;
use crate::server_error::ServerResult;
use agdb::UserValue;
Expand All @@ -14,15 +13,10 @@ pub(crate) struct UserRemove {
}

impl Action for UserRemove {
async fn exec(
self,
db: &mut ServerDb,
db_pool: &mut DbPool,
config: &Config,
) -> ServerResult<ClusterResponse> {
async fn exec(self, db: &mut ServerDb, db_pool: &mut DbPool, config: &Config) -> ServerResult {
let dbs = db.remove_user(&self.user).await?;
db_pool.remove_user_dbs(&self.user, &dbs, config).await?;

Ok(ClusterResponse::None)
Ok(())
}
}
109 changes: 52 additions & 57 deletions agdb_server/src/cluster.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use crate::action::Action;
use crate::action::ClusterAction;
use crate::action::ClusterResponse;
use crate::config::Config;
use crate::db_pool::DbPool;
use crate::raft;
Expand All @@ -10,14 +9,15 @@ use crate::raft::Response;
use crate::raft::Storage;
use crate::server_db::ServerDb;
use crate::server_error::ServerResult;
use agdb::DbId;
use agdb::StableHash;
use agdb_api::HttpClient;
use agdb_api::ReqwestClient;
use axum::body::Body;
use axum::extract::Request as AxumRequest;
use axum::response::Response as AxumResponse;
use reqwest::StatusCode;
use std::collections::VecDeque;
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicU64;
Expand All @@ -33,7 +33,7 @@ use tokio::sync::RwLock;
pub(crate) type Cluster = Arc<ClusterImpl>;

type ClusterNode = Arc<ClusterNodeImpl>;
type ResultNotifier = tokio::sync::oneshot::Sender<ServerResult<ClusterResponse>>;
type ResultNotifier = tokio::sync::oneshot::Sender<ServerResult<u64>>;
type ClusterResponseReceiver = UnboundedReceiver<(Request<ClusterAction>, Response)>;

pub(crate) struct ClusterNodeImpl {
Expand All @@ -57,8 +57,8 @@ impl ClusterImpl {
pub(crate) async fn append<T: Action + Into<ClusterAction>>(
&self,
action: T,
) -> ServerResult<ClusterResponse> {
let (sender, receiver) = tokio::sync::oneshot::channel::<ServerResult<ClusterResponse>>();
) -> ServerResult<u64> {
let (sender, receiver) = tokio::sync::oneshot::channel::<ServerResult<u64>>();
let requests = self
.raft
.write()
Expand Down Expand Up @@ -291,91 +291,99 @@ pub(crate) async fn start_with_shutdown(
}

pub(crate) struct ClusterStorage {
logs: VecDeque<(Log<ClusterAction>, Option<ResultNotifier>)>,
result_notifiers: HashMap<DbId, ResultNotifier>,
index: u64,
term: u64,
commit: Arc<AtomicU64>,
commit: u64,
executed: Arc<AtomicU64>,
db: ServerDb,
db_pool: DbPool,
config: Config,
}

impl ClusterStorage {
async fn new(db: ServerDb, db_pool: DbPool, config: Config) -> ServerResult<Self> {
let (index, term) = db.cluster_log().await?;
let unexecuted_logs = db.logs_unexecuted().await?;
let (index, term, commit) = db.cluster_log().await?;
let logs = db.logs_unexecuted_until(commit).await?;

let storage = Self {
logs: VecDeque::new(),
let mut storage = Self {
result_notifiers: HashMap::new(),
index,
term,
commit: Arc::new(AtomicU64::new(index)),
commit,
executed: Arc::new(AtomicU64::new(index)),
db,
db_pool,
config,
};

for log in unexecuted_logs {
storage.execute_log(log, None).await?;
for log in logs {
storage.execute_log(log).await?;
}

Ok(storage)
}

async fn execute_log(
&self,
log: Log<ClusterAction>,
notifier: Option<ResultNotifier>,
) -> Result<(), crate::server_error::ServerError> {
let log_id = self.db.commit_log(&log).await?;
let commit = self.commit.clone();
async fn execute_log(&mut self, log: Log<ClusterAction>) -> ServerResult<()> {
let log_id = log.db_id.unwrap_or_default();
let executed = self.executed.clone();
let mut db = self.db.clone();
let mut db_pool = self.db_pool.clone();
let config = self.config.clone();
let notifier = self.result_notifiers.remove(&log_id);

tokio::spawn(async move {
let result = log.data.exec(&mut db, &mut db_pool, &config).await;
commit.fetch_max(log.index, Ordering::Relaxed);
let _ = db.commit_log_executed(log_id).await;
executed.fetch_max(log.index, Ordering::Relaxed);
let _ = db.log_executed(log_id).await;

if let Some(notifier) = notifier {
let _ = notifier.send(result);
let _ = notifier.send(result.map(|_| log.index));
}
});

Ok(())
}
}

impl Storage<ClusterAction, ResultNotifier> for ClusterStorage {
async fn append(&mut self, log: Log<ClusterAction>, notifier: Option<ResultNotifier>) {
if let Some(index) = self
.logs
.iter()
.rev()
.position(|(l, _)| l.index == log.index && l.term == log.term)
{
self.logs.truncate(index);
}

async fn append(
&mut self,
log: Log<ClusterAction>,
notifier: Option<ResultNotifier>,
) -> ServerResult<()> {
self.db.remove_unexecuted_logs_since(log.index).await?;
let log_id = self.db.append_log(&log).await?;
self.index = log.index;
self.term = log.term;
self.logs.push_back((log, notifier));

if let Some(notifier) = notifier {
self.result_notifiers.insert(log_id, notifier);
}

Ok(())
}

async fn commit(&mut self, index: u64) -> ServerResult<()> {
while let Some((log, _notifier)) = self.logs.front() {
if log.index <= index {
if let Some((log, notifier)) = self.logs.pop_front() {
self.execute_log(log, notifier).await?;
}
} else {
break;
}
for log in self.db.logs_unexecuted_until(index).await? {
self.commit = index;
self.db
.log_committed(log.db_id.expect("log should have db_id"))
.await?;
self.execute_log(log).await?;
}

Ok(())
}

fn log_commit(&self) -> u64 {
self.commit
}

fn log_executed(&self) -> u64 {
self.executed.load(Ordering::Relaxed)
}

fn log_index(&self) -> u64 {
self.index
}
Expand All @@ -384,20 +392,7 @@ impl Storage<ClusterAction, ResultNotifier> for ClusterStorage {
self.term
}

fn log_commit(&self) -> u64 {
self.commit.load(Ordering::Relaxed)
}

async fn logs(&self, since_index: u64) -> ServerResult<Vec<Log<ClusterAction>>> {
let mut logs = self.db.logs(since_index).await?;
logs.extend_from_slice(
&self
.logs
.iter()
.map(|(log, _)| log)
.cloned()
.collect::<Vec<Log<ClusterAction>>>(),
);
Ok(logs)
self.db.logs_since(since_index).await
}
}
19 changes: 5 additions & 14 deletions agdb_server/src/forward.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ use axum::middleware::Next;
use axum::response::Response;
use reqwest::StatusCode;

const REDIRECT_PATHS: [&str; 13] = [
const REDIRECT_PATHS: [&str; 15] = [
"/add",
"/backup",
"/change_password",
"/clear",
"/cluster/login",
"/cluster/logout",
"/cluster/admin/logout",
"/convert",
"/copy",
"/delete",
Expand All @@ -27,8 +29,6 @@ pub(crate) async fn forward_to_leader(
request: Request,
next: Next,
) -> Response {
let forwarded = request.headers().get("forwarded-by").is_some();

if REDIRECT_PATHS
.iter()
.any(|pattern| request.uri().path().ends_with(pattern))
Expand All @@ -48,7 +48,7 @@ pub(crate) async fn forward_to_leader(
if let Some(commit_index) = response.headers_mut().remove("commit-index") {
if let Ok(commit_index) = commit_index.to_str() {
if let Ok(commit_index) = commit_index.parse::<u64>() {
while state.cluster.raft.read().await.storage.log_commit()
while state.cluster.raft.read().await.storage.log_executed()
< commit_index
{
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
Expand All @@ -68,14 +68,5 @@ pub(crate) async fn forward_to_leader(
}
}

let mut response = next.run(request).await;

if forwarded && response.status().is_success() {
response.headers_mut().insert(
"commit-index",
state.cluster.raft.read().await.storage.log_commit().into(),
);
}

response
next.run(request).await
}
Loading

0 comments on commit add4052

Please sign in to comment.