Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[server] Persist log on append #1387 #1390

Merged
merged 4 commits into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 4 additions & 14 deletions agdb_server/src/action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,27 +22,17 @@ pub(crate) enum ClusterAction {
UserRemove(UserRemove),
}

#[derive(Clone, Serialize, Deserialize)]
pub(crate) enum ClusterResponse {
None,
}

pub(crate) trait Action: Sized {
async fn exec(
self,
db: &mut ServerDb,
db_pool: &mut DbPool,
config: &Config,
) -> ServerResult<ClusterResponse>;
async fn exec(self, db: &mut ServerDb, db_pool: &mut DbPool, config: &Config) -> ServerResult;
}

impl Action for ClusterAction {
async fn exec(
impl ClusterAction {
pub(crate) async fn exec(
self,
db: &mut ServerDb,
db_pool: &mut DbPool,
config: &Config,
) -> ServerResult<ClusterResponse> {
) -> ServerResult<()> {
match self {
ClusterAction::UserAdd(action) => action.exec(db, db_pool, config).await,
ClusterAction::ClusterLogin(action) => action.exec(db, db_pool, config).await,
Expand Down
5 changes: 2 additions & 3 deletions agdb_server/src/action/change_password.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use super::DbPool;
use super::ServerDb;
use crate::action::Action;
use crate::action::ClusterResponse;
use crate::action::Config;
use crate::server_error::ServerResult;
use agdb::UserValue;
Expand All @@ -21,12 +20,12 @@ impl Action for ChangePassword {
db: &mut ServerDb,
_db_pool: &mut DbPool,
_config: &Config,
) -> ServerResult<ClusterResponse> {
) -> ServerResult {
let mut user = db.user(&self.user).await?;
user.password = self.new_password;
user.salt = self.new_salt;
db.save_user(user).await?;

Ok(ClusterResponse::None)
Ok(())
}
}
5 changes: 2 additions & 3 deletions agdb_server/src/action/cluster_login.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use super::DbPool;
use super::ServerDb;
use crate::action::Action;
use crate::action::ClusterResponse;
use crate::action::Config;
use crate::server_error::ServerResult;
use agdb::UserValue;
Expand All @@ -20,10 +19,10 @@ impl Action for ClusterLogin {
db: &mut ServerDb,
_db_pool: &mut DbPool,
_config: &Config,
) -> ServerResult<ClusterResponse> {
) -> ServerResult {
let user_id = db.user_id(&self.user).await?;
db.save_token(user_id, &self.new_token).await?;

Ok(ClusterResponse::None)
Ok(())
}
}
5 changes: 2 additions & 3 deletions agdb_server/src/action/user_add.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use super::DbPool;
use super::ServerDb;
use crate::action::Action;
use crate::action::ClusterResponse;
use crate::action::Config;
use crate::server_db::ServerUser;
use crate::server_error::ServerResult;
Expand All @@ -22,7 +21,7 @@ impl Action for UserAdd {
db: &mut ServerDb,
_db_pool: &mut DbPool,
_config: &Config,
) -> ServerResult<ClusterResponse> {
) -> ServerResult {
db.insert_user(ServerUser {
db_id: None,
username: self.user,
Expand All @@ -32,6 +31,6 @@ impl Action for UserAdd {
})
.await?;

Ok(ClusterResponse::None)
Ok(())
}
}
10 changes: 2 additions & 8 deletions agdb_server/src/action/user_remove.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use super::DbPool;
use super::ServerDb;
use crate::action::Action;
use crate::action::ClusterResponse;
use crate::config::Config;
use crate::server_error::ServerResult;
use agdb::UserValue;
Expand All @@ -14,15 +13,10 @@ pub(crate) struct UserRemove {
}

impl Action for UserRemove {
async fn exec(
self,
db: &mut ServerDb,
db_pool: &mut DbPool,
config: &Config,
) -> ServerResult<ClusterResponse> {
async fn exec(self, db: &mut ServerDb, db_pool: &mut DbPool, config: &Config) -> ServerResult {
let dbs = db.remove_user(&self.user).await?;
db_pool.remove_user_dbs(&self.user, &dbs, config).await?;

Ok(ClusterResponse::None)
Ok(())
}
}
109 changes: 52 additions & 57 deletions agdb_server/src/cluster.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use crate::action::Action;
use crate::action::ClusterAction;
use crate::action::ClusterResponse;
use crate::config::Config;
use crate::db_pool::DbPool;
use crate::raft;
Expand All @@ -10,14 +9,15 @@ use crate::raft::Response;
use crate::raft::Storage;
use crate::server_db::ServerDb;
use crate::server_error::ServerResult;
use agdb::DbId;
use agdb::StableHash;
use agdb_api::HttpClient;
use agdb_api::ReqwestClient;
use axum::body::Body;
use axum::extract::Request as AxumRequest;
use axum::response::Response as AxumResponse;
use reqwest::StatusCode;
use std::collections::VecDeque;
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicU64;
Expand All @@ -33,7 +33,7 @@ use tokio::sync::RwLock;
pub(crate) type Cluster = Arc<ClusterImpl>;

type ClusterNode = Arc<ClusterNodeImpl>;
type ResultNotifier = tokio::sync::oneshot::Sender<ServerResult<ClusterResponse>>;
type ResultNotifier = tokio::sync::oneshot::Sender<ServerResult<u64>>;
type ClusterResponseReceiver = UnboundedReceiver<(Request<ClusterAction>, Response)>;

pub(crate) struct ClusterNodeImpl {
Expand All @@ -57,8 +57,8 @@ impl ClusterImpl {
pub(crate) async fn append<T: Action + Into<ClusterAction>>(
&self,
action: T,
) -> ServerResult<ClusterResponse> {
let (sender, receiver) = tokio::sync::oneshot::channel::<ServerResult<ClusterResponse>>();
) -> ServerResult<u64> {
let (sender, receiver) = tokio::sync::oneshot::channel::<ServerResult<u64>>();
let requests = self
.raft
.write()
Expand Down Expand Up @@ -291,91 +291,99 @@ pub(crate) async fn start_with_shutdown(
}

pub(crate) struct ClusterStorage {
logs: VecDeque<(Log<ClusterAction>, Option<ResultNotifier>)>,
result_notifiers: HashMap<DbId, ResultNotifier>,
index: u64,
term: u64,
commit: Arc<AtomicU64>,
commit: u64,
executed: Arc<AtomicU64>,
db: ServerDb,
db_pool: DbPool,
config: Config,
}

impl ClusterStorage {
async fn new(db: ServerDb, db_pool: DbPool, config: Config) -> ServerResult<Self> {
let (index, term) = db.cluster_log().await?;
let unexecuted_logs = db.logs_unexecuted().await?;
let (index, term, commit) = db.cluster_log().await?;
let logs = db.logs_unexecuted_until(commit).await?;

let storage = Self {
logs: VecDeque::new(),
let mut storage = Self {
result_notifiers: HashMap::new(),
index,
term,
commit: Arc::new(AtomicU64::new(index)),
commit,
executed: Arc::new(AtomicU64::new(index)),
db,
db_pool,
config,
};

for log in unexecuted_logs {
storage.execute_log(log, None).await?;
for log in logs {
storage.execute_log(log).await?;
}

Ok(storage)
}

async fn execute_log(
&self,
log: Log<ClusterAction>,
notifier: Option<ResultNotifier>,
) -> Result<(), crate::server_error::ServerError> {
let log_id = self.db.commit_log(&log).await?;
let commit = self.commit.clone();
async fn execute_log(&mut self, log: Log<ClusterAction>) -> ServerResult<()> {
let log_id = log.db_id.unwrap_or_default();
let executed = self.executed.clone();
let mut db = self.db.clone();
let mut db_pool = self.db_pool.clone();
let config = self.config.clone();
let notifier = self.result_notifiers.remove(&log_id);

tokio::spawn(async move {
let result = log.data.exec(&mut db, &mut db_pool, &config).await;
commit.fetch_max(log.index, Ordering::Relaxed);
let _ = db.commit_log_executed(log_id).await;
executed.fetch_max(log.index, Ordering::Relaxed);
let _ = db.log_executed(log_id).await;

if let Some(notifier) = notifier {
let _ = notifier.send(result);
let _ = notifier.send(result.map(|_| log.index));
}
});

Ok(())
}
}

impl Storage<ClusterAction, ResultNotifier> for ClusterStorage {
async fn append(&mut self, log: Log<ClusterAction>, notifier: Option<ResultNotifier>) {
if let Some(index) = self
.logs
.iter()
.rev()
.position(|(l, _)| l.index == log.index && l.term == log.term)
{
self.logs.truncate(index);
}

async fn append(
&mut self,
log: Log<ClusterAction>,
notifier: Option<ResultNotifier>,
) -> ServerResult<()> {
self.db.remove_unexecuted_logs_since(log.index).await?;
let log_id = self.db.append_log(&log).await?;
self.index = log.index;
self.term = log.term;
self.logs.push_back((log, notifier));

if let Some(notifier) = notifier {
self.result_notifiers.insert(log_id, notifier);
}

Ok(())
}

async fn commit(&mut self, index: u64) -> ServerResult<()> {
while let Some((log, _notifier)) = self.logs.front() {
if log.index <= index {
if let Some((log, notifier)) = self.logs.pop_front() {
self.execute_log(log, notifier).await?;
}
} else {
break;
}
for log in self.db.logs_unexecuted_until(index).await? {
self.commit = index;
self.db
.log_committed(log.db_id.expect("log should have db_id"))
.await?;
self.execute_log(log).await?;
}

Ok(())
}

fn log_commit(&self) -> u64 {
self.commit
}

fn log_executed(&self) -> u64 {
self.executed.load(Ordering::Relaxed)
}

fn log_index(&self) -> u64 {
self.index
}
Expand All @@ -384,20 +392,7 @@ impl Storage<ClusterAction, ResultNotifier> for ClusterStorage {
self.term
}

fn log_commit(&self) -> u64 {
self.commit.load(Ordering::Relaxed)
}

async fn logs(&self, since_index: u64) -> ServerResult<Vec<Log<ClusterAction>>> {
let mut logs = self.db.logs(since_index).await?;
logs.extend_from_slice(
&self
.logs
.iter()
.map(|(log, _)| log)
.cloned()
.collect::<Vec<Log<ClusterAction>>>(),
);
Ok(logs)
self.db.logs_since(since_index).await
}
}
19 changes: 5 additions & 14 deletions agdb_server/src/forward.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ use axum::middleware::Next;
use axum::response::Response;
use reqwest::StatusCode;

const REDIRECT_PATHS: [&str; 13] = [
const REDIRECT_PATHS: [&str; 15] = [
"/add",
"/backup",
"/change_password",
"/clear",
"/cluster/login",
"/cluster/logout",
"/cluster/admin/logout",
"/convert",
"/copy",
"/delete",
Expand All @@ -27,8 +29,6 @@ pub(crate) async fn forward_to_leader(
request: Request,
next: Next,
) -> Response {
let forwarded = request.headers().get("forwarded-by").is_some();

if REDIRECT_PATHS
.iter()
.any(|pattern| request.uri().path().ends_with(pattern))
Expand All @@ -48,7 +48,7 @@ pub(crate) async fn forward_to_leader(
if let Some(commit_index) = response.headers_mut().remove("commit-index") {
if let Ok(commit_index) = commit_index.to_str() {
if let Ok(commit_index) = commit_index.parse::<u64>() {
while state.cluster.raft.read().await.storage.log_commit()
while state.cluster.raft.read().await.storage.log_executed()
< commit_index
{
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
Expand All @@ -68,14 +68,5 @@ pub(crate) async fn forward_to_leader(
}
}

let mut response = next.run(request).await;

if forwarded && response.status().is_success() {
response.headers_mut().insert(
"commit-index",
state.cluster.raft.read().await.storage.log_commit().into(),
);
}

response
next.run(request).await
}
Loading
Loading