Skip to content

Commit

Permalink
Merge pull request #51 from fraktalio/feature/decider_with_result
Browse files Browse the repository at this point in the history
Decider: decide fn returns Result
  • Loading branch information
idugalic authored Jan 16, 2025
2 parents 40805da + c7d7aa0 commit 68af48e
Show file tree
Hide file tree
Showing 10 changed files with 195 additions and 157 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ construct.
Abstraction and generalization are often used together. Abstracts are generalized through parameterization to provide
more excellent utility.

## `Box<dyn Fn(&C, &S) -> Vec<E>>`
## `Box<dyn Fn(&C, &S) -> Result<Vec<E>, Error>`

`type DecideFunction<'a, C, S, E> = Box<dyn Fn(&C, &S) -> Vec<E> + 'a + Send + Sync>`
`type DecideFunction<'a, C, S, E> = Box<dyn Fn(&C, &S) -> Result<Vec<E>, Error> + 'a + Send + Sync>`

On a higher level of abstraction, any information system is responsible for handling the intent (`Command`) and based on
the current `State`, produce new facts (`Events`):
Expand Down
58 changes: 29 additions & 29 deletions src/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,21 +47,21 @@ pub trait EventRepository<C, E, Version, Error> {
pub struct EventSourcedAggregate<C, S, E, Repository, Decider, Version, Error>
where
Repository: EventRepository<C, E, Version, Error>,
Decider: EventComputation<C, S, E>,
Decider: EventComputation<C, S, E, Error>,
{
repository: Repository,
decider: Decider,
_marker: PhantomData<(C, S, E, Version, Error)>,
}

impl<C, S, E, Repository, Decider, Version, Error> EventComputation<C, S, E>
impl<C, S, E, Repository, Decider, Version, Error> EventComputation<C, S, E, Error>
for EventSourcedAggregate<C, S, E, Repository, Decider, Version, Error>
where
Repository: EventRepository<C, E, Version, Error>,
Decider: EventComputation<C, S, E>,
Decider: EventComputation<C, S, E, Error>,
{
/// Computes new events based on the current events and the command.
fn compute_new_events(&self, current_events: &[E], command: &C) -> Vec<E> {
fn compute_new_events(&self, current_events: &[E], command: &C) -> Result<Vec<E>, Error> {
self.decider.compute_new_events(current_events, command)
}
}
Expand All @@ -70,7 +70,7 @@ impl<C, S, E, Repository, Decider, Version, Error> EventRepository<C, E, Version
for EventSourcedAggregate<C, S, E, Repository, Decider, Version, Error>
where
Repository: EventRepository<C, E, Version, Error> + Sync,
Decider: EventComputation<C, S, E> + Sync,
Decider: EventComputation<C, S, E, Error> + Sync,
C: Sync,
S: Sync,
E: Sync,
Expand All @@ -95,7 +95,7 @@ impl<C, S, E, Repository, Decider, Version, Error>
EventSourcedAggregate<C, S, E, Repository, Decider, Version, Error>
where
Repository: EventRepository<C, E, Version, Error> + Sync,
Decider: EventComputation<C, S, E> + Sync,
Decider: EventComputation<C, S, E, Error> + Sync,
C: Sync,
S: Sync,
E: Sync,
Expand All @@ -119,7 +119,7 @@ where
version = Some(ver);
current_events.push(event);
}
let new_events = self.compute_new_events(&current_events, command);
let new_events = self.compute_new_events(&current_events, command)?;
let saved_events = self.save(&new_events, &version).await?;
Ok(saved_events)
}
Expand Down Expand Up @@ -168,21 +168,21 @@ pub trait StateRepository<C, S, Version, Error> {
pub struct StateStoredAggregate<C, S, E, Repository, Decider, Version, Error>
where
Repository: StateRepository<C, S, Version, Error>,
Decider: StateComputation<C, S, E>,
Decider: StateComputation<C, S, E, Error>,
{
repository: Repository,
decider: Decider,
_marker: PhantomData<(C, S, E, Version, Error)>,
}

impl<C, S, E, Repository, Decider, Version, Error> StateComputation<C, S, E>
impl<C, S, E, Repository, Decider, Version, Error> StateComputation<C, S, E, Error>
for StateStoredAggregate<C, S, E, Repository, Decider, Version, Error>
where
Repository: StateRepository<C, S, Version, Error>,
Decider: StateComputation<C, S, E>,
Decider: StateComputation<C, S, E, Error>,
{
/// Computes new state based on the current state and the command.
fn compute_new_state(&self, current_state: Option<S>, command: &C) -> S {
fn compute_new_state(&self, current_state: Option<S>, command: &C) -> Result<S, Error> {
self.decider.compute_new_state(current_state, command)
}
}
Expand All @@ -191,7 +191,7 @@ impl<C, S, E, Repository, Decider, Version, Error> StateRepository<C, S, Version
for StateStoredAggregate<C, S, E, Repository, Decider, Version, Error>
where
Repository: StateRepository<C, S, Version, Error> + Sync,
Decider: StateComputation<C, S, E> + Sync,
Decider: StateComputation<C, S, E, Error> + Sync,
C: Sync,
S: Sync,
E: Sync,
Expand All @@ -212,7 +212,7 @@ impl<C, S, E, Repository, Decider, Version, Error>
StateStoredAggregate<C, S, E, Repository, Decider, Version, Error>
where
Repository: StateRepository<C, S, Version, Error> + Sync,
Decider: StateComputation<C, S, E> + Sync,
Decider: StateComputation<C, S, E, Error> + Sync,
C: Sync,
S: Sync,
E: Sync,
Expand All @@ -232,12 +232,12 @@ where
let state_version = self.fetch_state(command).await?;
match state_version {
None => {
let new_state = self.compute_new_state(None, command);
let new_state = self.compute_new_state(None, command)?;
let saved_state = self.save(&new_state, &None).await?;
Ok(saved_state)
}
Some((state, version)) => {
let new_state = self.compute_new_state(Some(state), command);
let new_state = self.compute_new_state(Some(state), command)?;
let saved_state = self.save(&new_state, &Some(version)).await?;
Ok(saved_state)
}
Expand All @@ -264,38 +264,38 @@ where
Repository: StateRepository<C, S, Version, Error>,
{
repository: Repository,
decider: Decider<'a, C, S, E>,
decider: Decider<'a, C, S, E, Error>,
saga: Saga<'a, E, C>,
_marker: PhantomData<(C, S, E, Version, Error)>,
}

impl<'a, C, S, E, Repository, Version, Error> StateComputation<C, S, E>
for StateStoredOrchestratingAggregate<'a, C, S, E, Repository, Version, Error>
impl<C, S, E, Repository, Version, Error> StateComputation<C, S, E, Error>
for StateStoredOrchestratingAggregate<'_, C, S, E, Repository, Version, Error>
where
Repository: StateRepository<C, S, Version, Error>,
S: Clone,
{
/// Computes new state based on the current state and the command.
fn compute_new_state(&self, current_state: Option<S>, command: &C) -> S {
fn compute_new_state(&self, current_state: Option<S>, command: &C) -> Result<S, Error> {
let effective_current_state =
current_state.unwrap_or_else(|| (self.decider.initial_state)());
let events = (self.decider.decide)(command, &effective_current_state);
let events = (self.decider.decide)(command, &effective_current_state)?;
let mut new_state = events.iter().fold(effective_current_state, |state, event| {
(self.decider.evolve)(&state, event)
});
let commands = events
.iter()
.flat_map(|event: &E| self.saga.compute_new_actions(event))
.collect::<Vec<C>>();
commands.iter().for_each(|action| {
new_state = self.compute_new_state(Some(new_state.clone()), action);
});
new_state
for action in commands {
new_state = self.compute_new_state(Some(new_state.clone()), &action)?;
}
Ok(new_state)
}
}

impl<'a, C, S, E, Repository, Version, Error> StateRepository<C, S, Version, Error>
for StateStoredOrchestratingAggregate<'a, C, S, E, Repository, Version, Error>
impl<C, S, E, Repository, Version, Error> StateRepository<C, S, Version, Error>
for StateStoredOrchestratingAggregate<'_, C, S, E, Repository, Version, Error>
where
Repository: StateRepository<C, S, Version, Error> + Sync,
C: Sync,
Expand Down Expand Up @@ -327,7 +327,7 @@ where
/// Creates a new instance of [StateStoredAggregate].
pub fn new(
repository: Repository,
decider: Decider<'a, C, S, E>,
decider: Decider<'a, C, S, E, Error>,
saga: Saga<'a, E, C>,
) -> Self {
StateStoredOrchestratingAggregate {
Expand All @@ -342,12 +342,12 @@ where
let state_version = self.fetch_state(command).await?;
match state_version {
None => {
let new_state = self.compute_new_state(None, command);
let new_state = self.compute_new_state(None, command)?;
let saved_state = self.save(&new_state, &None).await?;
Ok(saved_state)
}
Some((state, version)) => {
let new_state = self.compute_new_state(Some(state), command);
let new_state = self.compute_new_state(Some(state), command)?;
let saved_state = self.save(&new_state, &Some(version)).await?;
Ok(saved_state)
}
Expand Down
Loading

0 comments on commit 68af48e

Please sign in to comment.