Skip to content

Commit

Permalink
Transactions
Browse files Browse the repository at this point in the history
  • Loading branch information
Almaju committed Feb 10, 2024
1 parent e1d3f25 commit 8bf3b34
Show file tree
Hide file tree
Showing 21 changed files with 186 additions and 156 deletions.
14 changes: 2 additions & 12 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions auth/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ reqwest = { version = "0.11", features = ["json"], optional = true }
serde = "1.0.196"
serde_urlencoded = { version = "0.7.1", optional = true }
sha2 = "0.10.8"
tokio-retry = { version = "0.3", optional = true }
urlencoding = { version = "2.1.3", optional = true }
uuid = { version = "1.7.0", features = ["serde", "v4"] }
wasm-bindgen = { version = "0.2.90", optional = true }
Expand All @@ -32,7 +31,7 @@ tokio = { version = "1.0", features = ["full"] }

[features]
default = ["jwt", "linkedin"]
axum = ["dep:axum", "dep:tokio-retry"]
axum = ["dep:axum"]
jwt = ["dep:hmac", "dep:jwt"]
mongodb = ["dep:mongodb"]
linkedin = ["dep:reqwest", "dep:serde_urlencoded"]
Expand Down
15 changes: 5 additions & 10 deletions auth/src/application/command.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
// use super::event_bus::EventBus;
use crate::application::port::*;
use super::port::*;
use crate::domain;
use crate::domain::scalar::*;
use framework::*;
Expand All @@ -17,7 +16,7 @@ where
{
type Error = CommandError;

async fn execute(&self, runtime: &R) -> Result<(), CommandError> {
async fn execute(&self, runtime: &R) -> Result<TransactionId, Self::Error> {
let user_id = EventStore::identify_by_email(runtime, &self.email).await?;

let existing_events = match user_id {
Expand All @@ -31,9 +30,7 @@ where
})?;

EventStore::push(runtime, &new_events).await?;
EventBus::publish(runtime, new_events);

Ok(())
Ok(EventBus::publish(runtime, new_events))
}
}

Expand All @@ -50,7 +47,7 @@ where
{
type Error = CommandError;

async fn execute(&self, runtime: &R) -> Result<(), CommandError> {
async fn execute(&self, runtime: &R) -> Result<TransactionId, Self::Error> {
let user_id = EventStore::identify_by_email(runtime, &self.email).await?;

let existing_events = match user_id {
Expand All @@ -64,9 +61,7 @@ where
})?;

EventStore::push(runtime, &new_events).await?;
EventBus::publish(runtime, new_events);

Ok(())
Ok(EventBus::publish(runtime, new_events))
}
}

Expand Down
49 changes: 29 additions & 20 deletions auth/src/application/listener.rs
Original file line number Diff line number Diff line change
@@ -1,32 +1,28 @@
use self::event::Event;

use super::port::*;
use super::projection::User;
use super::transaction::Transaction;
use crate::domain::scalar::*;
use crate::*;
use crate::domain::Event;
use framework::*;
use futures::stream::StreamExt;
use std::collections::HashMap;

pub struct RecomputeUserProjections(pub Vec<Event>);

#[async_trait]
impl<R> Execute<R> for RecomputeUserProjections
where
R: EventStore + UserRepository,
R: Send + Sync,
{
type Error = UnexpectedError;

async fn execute(&self, runtime: &R) -> Result<(), UnexpectedError> {
impl RecomputeUserProjections {
async fn reflect<R>(&self, runtime: &R) -> Result<Vec<UserId>, UnexpectedError>
where
R: EventStore + TransactionBus + UserRepository,
R: Send + Sync,
{
// Caching projections
let mut users = HashMap::<UserId, User>::new();

// Applying events
for event in self.0.iter() {
let user_id = match event {
domain::Event::Registered { user_id, .. } => user_id.clone(),
domain::Event::LoggedIn { user_id, .. } => user_id.clone(),
Event::Registered { user_id, .. } => user_id.clone(),
Event::LoggedIn { user_id, .. } => user_id.clone(),
_ => continue,
};

Expand All @@ -46,28 +42,41 @@ where
},
};

user.apply(event);
user.apply(&event);
}

// Save projections
for user in users.values() {
UserRepository::save(runtime, user).await?;
}

Ok(())
Ok(users.keys().cloned().collect())
}
}

#[async_trait]
impl<R> Listen<R> for RecomputeUserProjections
where
R: EventBus + EventStore + UserRepository,
R: EventBus + EventStore + TransactionBus + UserRepository,
R: Send + Sync,
{
async fn listen(runtime: &R) {
while let Some(events) = EventBus::subscribe(runtime).next().await {
if let Err(err) = RecomputeUserProjections(events).execute(runtime).await {
error!("Failed to recompute user projections: {}", err);
while let Some((id, events)) = EventBus::subscribe(runtime).next().await {
match RecomputeUserProjections(events).reflect(runtime).await {
Ok(user_ids) => {
for user_id in user_ids {
TransactionBus::publish(
runtime,
Transaction::UserProjected {
id: id.clone(),
user_id,
},
);
}
}
Err(err) => {
error!("Failed to recompute user projections: {}", err);
}
}
}
}
Expand Down
1 change: 1 addition & 0 deletions auth/src/application/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ pub mod listener;
pub mod port;
pub mod projection;
pub mod query;
pub mod transaction;

pub use crate::domain::event;
pub use crate::domain::scalar;
Expand Down
26 changes: 16 additions & 10 deletions auth/src/application/port.rs
Original file line number Diff line number Diff line change
@@ -1,32 +1,38 @@
use std::pin::Pin;

use super::projection::User;
use super::transaction::Transaction;
use crate::domain;
use crate::domain::scalar::*;
use crate::domain::Event;
use framework::*;
use futures::stream::Stream;

pub type EventStream = Pin<Box<dyn Stream<Item = Vec<Event>> + Send>>;

#[async_trait]
#[delegate]
pub trait EventBus {
fn publish(&self, events: Vec<Event>);
fn subscribe(&self) -> EventStream;
fn publish(&self, events: Vec<Event>) -> TransactionId;
fn subscribe(&self) -> EventStream<Event>;
}

#[async_trait]
#[delegate]
pub trait TransactionBus {
fn publish(&self, transaction: Transaction);
fn subscribe(&self) -> TransactionStream<Transaction>;
}

#[async_trait]
#[delegate]
pub trait EventStore {
async fn identify_by_email(&self, email: &Email) -> Result<Option<UserId>, UnexpectedError>;
async fn pull_by_user_id(&self, user_id: &UserId) -> Result<Vec<Event>, UnexpectedError>;
async fn push(&self, events: &[Event]) -> Result<(), UnexpectedError>;
async fn pull_by_user_id(
&self,
user_id: &UserId,
) -> Result<Vec<domain::Event>, UnexpectedError>;
async fn push(&self, events: &[domain::Event]) -> Result<(), UnexpectedError>;
}

#[async_trait]
#[delegate]
pub trait UserRepository {
async fn find_by_email(&self, email: &Email) -> Result<Option<User>, UnexpectedError>;
async fn find_by_user_id(&self, user_id: &UserId) -> Result<Option<User>, UnexpectedError>;
async fn save(&self, projection: &User) -> Result<(), UnexpectedError>;
}
Expand Down
8 changes: 4 additions & 4 deletions auth/src/application/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ use super::port::*;
use crate::domain::scalar::*;
use framework::*;

pub struct GetJwtByEmail {
pub email: Email,
pub struct GetJwtByUserId {
pub user_id: UserId,
}

#[async_trait]
impl<R> Fetch<R> for GetJwtByEmail
impl<R> Fetch<R> for GetJwtByUserId
where
R: JwtPort + UserRepository,
R: Send + Sync,
Expand All @@ -16,7 +16,7 @@ where
type Error = GetJwtByEmailError;

async fn fetch(&self, runtime: &R) -> Result<Jwt, GetJwtByEmailError> {
let user = UserRepository::find_by_email(runtime, &self.email)
let user = UserRepository::find_by_user_id(runtime, &self.user_id)
.await?
.ok_or(GetJwtByEmailError::UserNotFound)?;
let jwt = JwtPort::sign(runtime, &user)?;
Expand Down
9 changes: 0 additions & 9 deletions auth/src/application/test_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,6 @@ impl EventStore for TestRuntime {

#[async_trait]
impl UserRepository for TestRuntime {
async fn find_by_email(&self, email: &Email) -> Result<Option<User>, UnexpectedError> {
Ok(Some(User {
email: email.to_string(),
user_id: UserId::default().to_string(),
}))
}

async fn find_by_user_id(&self, user_id: &UserId) -> Result<Option<User>, UnexpectedError> {
Ok(Some(User {
email: "[email protected]".to_string(),
Expand All @@ -54,5 +47,3 @@ impl JwtPort for TestRuntime {
})
}
}

impl Framework for TestRuntime {}
7 changes: 7 additions & 0 deletions auth/src/application/transaction.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
use crate::domain::scalar::UserId;
use framework::TransactionId;

#[derive(Clone, PartialEq)]
pub enum Transaction {
UserProjected { id: TransactionId, user_id: UserId },
}
38 changes: 26 additions & 12 deletions auth/src/clients/axum/api/login.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use crate::application::{
command::Login,
port::*,
query::GetJwtByEmail,
scalar::{Email, Password},
use crate::{
application::{
command::Login,
port::*,
query::GetJwtByUserId,
scalar::{Email, Password},
},
transaction::Transaction,
};
use axum::{
extract::State,
Expand All @@ -11,6 +14,7 @@ use axum::{
Json,
};
use framework::*;
use futures::StreamExt;
use serde::Deserialize;
use std::sync::Arc;

Expand All @@ -19,19 +23,29 @@ pub async fn handler<R>(
Json(payload): Json<Payload>,
) -> Result<String, Response>
where
R: Framework,
R: EventBus + EventStore + JwtPort + UserRepository,
R: EventBus + EventStore + JwtPort + UserRepository + TransactionBus,
R: Send + Sync,
{
let command = Login::try_from(payload)?;
let email = command.email.clone();

runtime
.execute(command)
let transaction_id = command
.execute(runtime.as_ref())
.await
.map_err(|err| (StatusCode::UNAUTHORIZED, format!("{}", err)).into_response())?;

let jwt = runtime
.fetch(GetJwtByEmail { email })
let user_id = loop {
if let Some(transaction) = TransactionBus::subscribe(runtime.as_ref()).next().await {
let Transaction::UserProjected { id, user_id } = transaction;
if id == transaction_id {
break user_id;
}
}
};

let query = GetJwtByUserId { user_id };

let jwt = query
.fetch(runtime.as_ref())
.await
.map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, format!("{}", err)).into_response())?;

Expand Down
Loading

0 comments on commit 8bf3b34

Please sign in to comment.