Skip to content

Commit

Permalink
Merge pull request #52 from fraktalio/feature/event_sourced_orchestra…
Browse files Browse the repository at this point in the history
…ting_aggregate

`EventSourcedOrchestratingAggregate` added
  • Loading branch information
idugalic authored Jan 18, 2025
2 parents 68af48e + 2e5cfe7 commit 33613d7
Show file tree
Hide file tree
Showing 3 changed files with 353 additions and 46 deletions.
159 changes: 145 additions & 14 deletions src/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,15 @@ pub trait EventRepository<C, E, Version, Error> {
/// Saves events.
/// Desugared `async fn save(&self, events: &[E], latest_version: &Option<Version>) -> Result<Vec<(E, Version)>, Error>;` to a normal `fn` that returns `impl Future`, and adds bound `Send`
/// You can freely move between the `async fn` and `-> impl Future` spelling in your traits and impls. This is true even when one form has a Send bound.
fn save(
fn save(&self, events: &[E]) -> impl Future<Output = Result<Vec<(E, Version)>, Error>> + Send;

/// Version provider. It is used to provide the version/sequence of the stream to wich this event belongs to. Optimistic locking is useing this version to check if the event is already saved.
/// Desugared `async fn version_provider(&self, event: &E) -> Result<Option<Version>, Error>;` to a normal `fn` that returns `impl Future`, and adds bound `Send`
/// You can freely move between the `async fn` and `-> impl Future` spelling in your traits and impls. This is true even when one form has a Send bound.
fn version_provider(
&self,
events: &[E],
latest_version: &Option<Version>,
) -> impl Future<Output = Result<Vec<(E, Version)>, Error>> + Send;
event: &E,
) -> impl Future<Output = Result<Option<Version>, Error>> + Send;
}

/// Event Sourced Aggregate.
Expand Down Expand Up @@ -82,12 +86,12 @@ where
self.repository.fetch_events(command).await
}
/// Saves events.
async fn save(
&self,
events: &[E],
latest_version: &Option<Version>,
) -> Result<Vec<(E, Version)>, Error> {
self.repository.save(events, latest_version).await
async fn save(&self, events: &[E]) -> Result<Vec<(E, Version)>, Error> {
self.repository.save(events).await
}
/// Version provider. It is used to provide the version/sequence of the event. Optimistic locking is useing this version to check if the event is already saved.
async fn version_provider(&self, event: &E) -> Result<Option<Version>, Error> {
self.repository.version_provider(event).await
}
}

Expand All @@ -113,14 +117,12 @@ where
/// Handles the command by fetching the events from the repository, computing new events based on the current events and the command, and saving the new events to the repository.
pub async fn handle(&self, command: &C) -> Result<Vec<(E, Version)>, Error> {
let events: Vec<(E, Version)> = self.fetch_events(command).await?;
let mut version: Option<Version> = None;
let mut current_events: Vec<E> = vec![];
for (event, ver) in events {
version = Some(ver);
for (event, _) in events {
current_events.push(event);
}
let new_events = self.compute_new_events(&current_events, command)?;
let saved_events = self.save(&new_events, &version).await?;
let saved_events = self.save(&new_events).await?;
Ok(saved_events)
}
}
Expand Down Expand Up @@ -245,6 +247,135 @@ where
}
}

/// Orchestrating Event Sourced Aggregate.
/// It is using a [Decider] and [Saga] to compute new events based on the current events and the command.
/// If the `decider` is combined out of many deciders via `combine` function, a `saga` could be used to react on new events and send new commands to the `decider` recursively, in single transaction.
/// It is using a [EventRepository] to fetch the current events and to save the new events.
/// Generic parameters:
/// - `C` - Command
/// - `S` - State
/// - `E` - Event
/// - `Repository` - Event repository
/// - `Version` - Version/Offset/Sequence number
/// - `Error` - Error
pub struct EventSourcedOrchestratingAggregate<'a, C, S, E, Repository, Version, Error>
where
Repository: EventRepository<C, E, Version, Error>,
{
repository: Repository,
decider: Decider<'a, C, S, E, Error>,
saga: Saga<'a, E, C>,
_marker: PhantomData<(C, S, E, Version, Error)>,
}

impl<C, S, E, Repository, Version, Error> EventRepository<C, E, Version, Error>
for EventSourcedOrchestratingAggregate<'_, C, S, E, Repository, Version, Error>
where
Repository: EventRepository<C, E, Version, Error> + Sync,
C: Sync,
S: Sync,
E: Sync,
Version: Sync,
Error: Sync,
{
/// Fetches current events, based on the command.
async fn fetch_events(&self, command: &C) -> Result<Vec<(E, Version)>, Error> {
self.repository.fetch_events(command).await
}
/// Saves events.
async fn save(&self, events: &[E]) -> Result<Vec<(E, Version)>, Error> {
self.repository.save(events).await
}
/// Version provider. It is used to provide the version/sequence of the event. Optimistic locking is useing this version to check if the event is already saved.
async fn version_provider(&self, event: &E) -> Result<Option<Version>, Error> {
self.repository.version_provider(event).await
}
}

impl<'a, C, S, E, Repository, Version, Error>
EventSourcedOrchestratingAggregate<'a, C, S, E, Repository, Version, Error>
where
Repository: EventRepository<C, E, Version, Error> + Sync,
C: Sync,
S: Sync,
E: Sync + Clone,
Version: Sync,
Error: Sync,
{
/// Creates a new instance of [EventSourcedAggregate].
pub fn new(
repository: Repository,
decider: Decider<'a, C, S, E, Error>,
saga: Saga<'a, E, C>,
) -> Self {
EventSourcedOrchestratingAggregate {
repository,
decider,
saga,
_marker: PhantomData,
}
}
/// Handles the command by fetching the events from the repository, computing new events based on the current events and the command, and saving the new events to the repository.
pub async fn handle(&self, command: &C) -> Result<Vec<(E, Version)>, Error> {
let events: Vec<(E, Version)> = self.fetch_events(command).await?;
let mut current_events: Vec<E> = vec![];
for (event, _) in events {
current_events.push(event);
}
let new_events = self
.compute_new_events_dynamically(&current_events, command)
.await?;
let saved_events = self.save(&new_events).await?;
Ok(saved_events)
}
/// Computes new events based on the current events and the command.
/// It is using a [Decider] and [Saga] to compute new events based on the current events and the command.
/// If the `decider` is combined out of many deciders via `combine` function, a `saga` could be used to react on new events and send new commands to the `decider` recursively, in single transaction.
/// It is using a [EventRepository] to fetch the current events for the command that is computed by the `saga`.
async fn compute_new_events_dynamically(
&self,
current_events: &[E],
command: &C,
) -> Result<Vec<E>, Error> {
let current_state: S = current_events
.iter()
.fold((self.decider.initial_state)(), |state, event| {
(self.decider.evolve)(&state, event)
});

let initial_events = (self.decider.decide)(command, &current_state)?;

let commands: Vec<C> = initial_events
.iter()
.flat_map(|event: &E| self.saga.compute_new_actions(event))
.collect();

// Collect all events including recursively computed new events.
let mut all_events = initial_events.clone();

for command in commands.iter() {
let previous_events = [
self.repository
.fetch_events(command)
.await?
.iter()
.map(|(e, _)| e.clone())
.collect::<Vec<E>>(),
initial_events.clone(),
]
.concat();

// Recursively compute new events and extend the accumulated events list.
// By wrapping the recursive call in a Box, we ensure that the future type is not self-referential.
let new_events =
Box::pin(self.compute_new_events_dynamically(&previous_events, command)).await?;
all_events.extend(new_events);
}

Ok(all_events)
}
}

/// Orchestrating State Stored Aggregate.
///
/// It is using a [Decider] and [Saga] to compute new state based on the current state and the command.
Expand Down
Loading

0 comments on commit 33613d7

Please sign in to comment.