Skip to content

Commit

Permalink
[server] Wait for local commit when forwarding to leader #1385 (#1386)
Browse files Browse the repository at this point in the history
* commit index receiver

* add commit-index header to responses
  • Loading branch information
michaelvlach authored Dec 16, 2024
1 parent a9e3b96 commit c645d5a
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 132 deletions.
66 changes: 42 additions & 24 deletions agdb_server/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use reqwest::StatusCode;
use std::collections::VecDeque;
use std::str::FromStr;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
Expand Down Expand Up @@ -125,19 +126,18 @@ impl ClusterNodeImpl {
url,
)
.headers(parts.headers)
.header("Forwarded-By", local_index)
.header("forwarded-by", local_index)
.body(reqwest::Body::wrap_stream(body.into_data_stream()))
.send()
.await
.map_err(|e| Self::bad_request(&e.to_string()))?;

let mut axum_response = AxumResponse::builder().status(response.status());

if let Some(headers) = axum_response.headers_mut() {
std::mem::swap(headers, response.headers_mut());
std::mem::swap(headers, response.headers_mut())
}

tracing::info!("Responding...");

axum_response
.body(Body::from_stream(response.bytes_stream()))
.map_err(|e| Self::bad_request(&e.to_string()))
Expand Down Expand Up @@ -294,24 +294,55 @@ pub(crate) struct ClusterStorage {
logs: VecDeque<(Log<ClusterAction>, Option<ResultNotifier>)>,
index: u64,
term: u64,
commit: u64,
commit: Arc<AtomicU64>,
db: ServerDb,
db_pool: DbPool,
config: Config,
}

impl ClusterStorage {
async fn new(db: ServerDb, db_pool: DbPool, config: Config) -> ServerResult<Self> {
let (index, term, commit) = db.cluster_log().await?;
Ok(Self {
let (index, term) = db.cluster_log().await?;
let unexecuted_logs = db.logs_unexecuted().await?;

let storage = Self {
logs: VecDeque::new(),
index,
term,
commit,
commit: Arc::new(AtomicU64::new(index)),
db,
db_pool,
config,
})
};

for log in unexecuted_logs {
storage.execute_log(log, None).await?;
}

Ok(storage)
}

async fn execute_log(
&self,
log: Log<ClusterAction>,
notifier: Option<ResultNotifier>,
) -> Result<(), crate::server_error::ServerError> {
let log_id = self.db.commit_log(&log).await?;
let commit = self.commit.clone();
let mut db = self.db.clone();
let mut db_pool = self.db_pool.clone();
let config = self.config.clone();

tokio::spawn(async move {
let result = log.data.exec(&mut db, &mut db_pool, &config).await;
commit.fetch_max(log.index, Ordering::Relaxed);
let _ = db.commit_log_executed(log_id).await;

if let Some(notifier) = notifier {
let _ = notifier.send(result);
}
});
Ok(())
}
}

Expand All @@ -335,20 +366,7 @@ impl Storage<ClusterAction, ResultNotifier> for ClusterStorage {
while let Some((log, _notifier)) = self.logs.front() {
if log.index <= index {
if let Some((log, notifier)) = self.logs.pop_front() {
self.commit = log.index;
self.db.commit_log(&log).await?;

let mut db = self.db.clone();
let mut db_pool = self.db_pool.clone();
let config = self.config.clone();

tokio::spawn(async move {
let result = log.data.exec(&mut db, &mut db_pool, &config).await;

if let Some(notifier) = notifier {
let _ = notifier.send(result);
}
});
self.execute_log(log, notifier).await?;
}
} else {
break;
Expand All @@ -367,7 +385,7 @@ impl Storage<ClusterAction, ResultNotifier> for ClusterStorage {
}

fn log_commit(&self) -> u64 {
self.commit
self.commit.load(Ordering::Relaxed)
}

async fn logs(&self, since_index: u64) -> ServerResult<Vec<Log<ClusterAction>>> {
Expand Down
32 changes: 30 additions & 2 deletions agdb_server/src/forward.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::raft::Storage;
use crate::server_state::ServerState;
use axum::extract::Request;
use axum::extract::State;
Expand Down Expand Up @@ -26,20 +27,38 @@ pub(crate) async fn forward_to_leader(
request: Request,
next: Next,
) -> Response {
let forwarded = request.headers().get("forwarded-by").is_some();

if REDIRECT_PATHS
.iter()
.any(|pattern| request.uri().path().ends_with(pattern))
{
let leader = state.cluster.raft.read().await.leader();
if let Some(leader) = leader {
if state.cluster.index != leader as usize {
return match state.cluster.nodes[leader as usize]
let mut response = match state.cluster.nodes[leader as usize]
.forward(request, state.cluster.index)
.await
{
Ok(r) => r,
Err(r) => r,
};

if response.status().is_success() {
if let Some(commit_index) = response.headers_mut().remove("commit-index") {
if let Ok(commit_index) = commit_index.to_str() {
if let Ok(commit_index) = commit_index.parse::<u64>() {
while state.cluster.raft.read().await.storage.log_commit()
< commit_index
{
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
}
}
}
}
}

return response;
}
} else {
return Response::builder()
Expand All @@ -49,5 +68,14 @@ pub(crate) async fn forward_to_leader(
}
}

next.run(request).await
let mut response = next.run(request).await;

if forwarded && response.status().is_success() {
response.headers_mut().insert(
"commit-index",
state.cluster.raft.read().await.storage.log_commit().into(),
);
}

response
}
77 changes: 49 additions & 28 deletions agdb_server/src/server_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ pub(crate) struct ServerDb(pub(crate) Arc<RwLock<Db>>);
const ADMIN: &str = "admin";
const CLUSTER_LOG: &str = "cluster_log";
const DBS: &str = "dbs";
const EXECUTED: &str = "executed";
const NAME: &str = "name";
const ROLE: &str = "role";
const TOKEN: &str = "token";
Expand Down Expand Up @@ -103,6 +104,10 @@ impl ServerDb {
t.exec_mut(QueryBuilder::insert().index(TOKEN).query())?;
}

if !indexes.iter().any(|i| i == EXECUTED) {
t.exec_mut(QueryBuilder::insert().index(EXECUTED).query())?;
}

if t.exec(QueryBuilder::select().ids(USERS).query()).is_err() {
t.exec_mut(QueryBuilder::insert().nodes().aliases(USERS).query())?;
}
Expand All @@ -123,20 +128,8 @@ impl ServerDb {
Ok(Self(Arc::new(RwLock::new(db))))
}

pub(crate) async fn cluster_log(&self) -> ServerResult<(u64, u64, u64)> {
pub(crate) async fn cluster_log(&self) -> ServerResult<(u64, u64)> {
self.0.write().await.transaction_mut(|t| {
let commit = t
.exec(
QueryBuilder::select()
.edge_count_from()
.ids(CLUSTER_LOG)
.query(),
)?
.elements[0]
.values[0]
.value
.to_u64()
.unwrap_or_default();
if let Some(e) = t
.exec(
QueryBuilder::select()
Expand All @@ -152,38 +145,42 @@ impl ServerDb {
.elements
.first()
{
Ok((
e.values[0].value.to_u64()?,
e.values[1].value.to_u64()?,
commit,
))
Ok((e.values[0].value.to_u64()?, e.values[1].value.to_u64()?))
} else {
Ok((0, 0, 0))
Ok((0, 0))
}
})
}

pub(crate) async fn commit_log(&self, log: &Log<ClusterAction>) -> ServerResult<()> {
pub(crate) async fn commit_log(&self, log: &Log<ClusterAction>) -> ServerResult<DbId> {
self.0.write().await.transaction_mut(
|t: &mut agdb::TransactionMut<'_, agdb::FileStorageMemoryMapped>| -> ServerResult<()> {
let log_id = t.exec_mut(
QueryBuilder::insert()
.nodes()
.values([log_db_values(log)])
.query(),
)?;
|t: &mut agdb::TransactionMut<'_, agdb::FileStorageMemoryMapped>| {
let mut values = log_db_values(log);
values.push((EXECUTED, false).into());
let log_id = t
.exec_mut(QueryBuilder::insert().nodes().values([values]).query())?
.elements[0]
.id;
t.exec_mut(
QueryBuilder::insert()
.edges()
.from(CLUSTER_LOG)
.to(log_id)
.query(),
)?;
Ok(())
Ok(log_id)
},
)
}

pub(crate) async fn commit_log_executed(&self, log_id: DbId) -> ServerResult<()> {
self.0
.write()
.await
.exec_mut(QueryBuilder::remove().values(EXECUTED).ids(log_id).query())?;
Ok(())
}

pub(crate) async fn db_count(&self) -> ServerResult<u64> {
Ok(self
.0
Expand Down Expand Up @@ -372,6 +369,30 @@ impl ServerDb {
== 1)
}

pub(crate) async fn logs_unexecuted(&self) -> ServerResult<Vec<Log<ClusterAction>>> {
if let Some(index) = self
.0
.read()
.await
.exec(
QueryBuilder::select()
.values("index")
.search()
.index(EXECUTED)
.value(false)
.query(),
)?
.elements
.into_iter()
.map(|e| e.values[0].value.to_u64().unwrap_or_default())
.min()
{
self.logs(index).await
} else {
Ok(Vec::new())
}
}

pub(crate) async fn logs(&self, since_index: u64) -> ServerResult<Vec<Log<ClusterAction>>> {
self.0.read().await.transaction(|t| {
let log_count = t
Expand Down
Loading

0 comments on commit c645d5a

Please sign in to comment.