Skip to content

Commit

Permalink
twomultiaggregate command
Browse files Browse the repository at this point in the history
  • Loading branch information
tonyx committed Apr 28, 2024
1 parent 14dbabd commit 4a6aa8b
Show file tree
Hide file tree
Showing 3 changed files with 146 additions and 2 deletions.
144 changes: 144 additions & 0 deletions Sharpino.Lib/CommandHandler.fs
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,151 @@ module CommandHandler =
| _ ->
log.Warn "locktype is pessimistic, but we are not using it in runNAggregateCommands"
delayedCommand()

let inline runTwoNAggregateCommands<'A1, 'E1, 'A2, 'E2
when 'A1 :> Aggregate
and 'A1 :> Entity
and 'E1 :> Event<'A1>
and 'E1 : (member Serialize: ISerializer -> string)
and 'E1 : (static member Deserialize: ISerializer -> Json -> Result<'E1, string>)
and 'A1 : (static member Deserialize: ISerializer -> Json -> Result<'A1, string>)
and 'A1 : (static member SnapshotsInterval: int)
and 'A1 : (static member StorageName: string)
and 'A1 : (static member Version: string)
and 'A2 :> Aggregate
and 'A2 :> Entity
and 'E2 :> Event<'A2>
and 'E2 : (member Serialize: ISerializer -> string)
and 'E2 : (static member Deserialize: ISerializer -> Json -> Result<'E2, string>)
and 'A2 : (static member Deserialize: ISerializer -> Json -> Result<'A2, string>)
and 'A2 : (static member SnapshotsInterval: int)
and 'A2 : (static member StorageName: string)
and 'A2 : (static member Version: string)
>
(aggregateIds1: List<Guid>)
(aggregateIds2: List<Guid>)
(storage: IEventStore)
(eventBroker: IEventBroker)
(stateViewer1: AggregateViewer<'A1>)
(stateViewer2: AggregateViewer<'A2>)
(command1: List<Command<'A1, 'E1>>)
(command2: List<Command<'A2, 'E2>>)
=
log.Debug "runTwoNAggregateCommands"
let delayedCommand = fun () ->
async {
return
result {
let! states1 =
aggregateIds1
|> List.traverseResultM stateViewer1

let! states2 =
aggregateIds2
|> List.traverseResultM stateViewer2

let states1' =
states1
|>> fun (_, state, _, _) -> state

let states2' =
states2
|>> fun (_, state, _, _) -> state

let statesAndCommands1 =
List.zip states1' command1

let statesAndCommands2 =
List.zip states2' command2

let! events1 =
statesAndCommands1
|>> fun (state, command) -> command.Execute state
|> List.traverseResultM id

let! events2 =
statesAndCommands2
|>> fun (state, command) -> command.Execute state
|> List.traverseResultM id

let serializedEvents1 =
events1
|>> fun x -> x |>> fun (z: 'E1) -> z.Serialize serializer

let serializedEvents2 =
events2
|>> fun x -> x |>> fun (z: 'E2) -> z.Serialize serializer

let aggregateIdsWithStateIds1 =
List.zip aggregateIds1 states1'
|>> fun (id, state ) -> (id, state.StateId)

let aggregateIdsWithStateIds2 =
List.zip aggregateIds2 states2'
|>> fun (id, state ) -> (id, state.StateId)

let packParametersForDb1 =
List.zip serializedEvents1 aggregateIdsWithStateIds1
|>> fun (events, (id, stateId)) -> (events, 'A1.Version, 'A1.StorageName, id, stateId)

let packParametersForDb2 =
List.zip serializedEvents2 aggregateIdsWithStateIds2
|>> fun (events, (id, stateId)) -> (events, 'A2.Version, 'A2.StorageName, id, stateId)

let allPacked = packParametersForDb1 @ packParametersForDb2

let! eventIds =
allPacked
|> storage.MultiAddAggregateEvents

let eventIds1 = eventIds |> List.take aggregateIds1.Length
let eventIds2 = eventIds |> List.skip aggregateIds1.Length

let aggregateIdsWithEventIds1 =
List.zip aggregateIds1 eventIds1

let aggregateIdsWithEventIds2 =
List.zip aggregateIds2 eventIds2


let kafkaParmeters1 =
List.map2 (fun idList serializedEvents -> (idList, serializedEvents)) aggregateIdsWithEventIds1 serializedEvents1
|>> fun (((aggId: Guid), idList), serializedEvents) -> (aggId, List.zip idList serializedEvents)

let kafkaParameters2 =
(List.map2 (fun idList serializedEvents -> (idList, serializedEvents)) aggregateIdsWithEventIds2 serializedEvents2
|>> fun (((aggId: Guid), idList), serializedEvents) -> (aggId, List.zip idList serializedEvents))

if (eventBroker.notifyAggregate.IsSome) then
kafkaParmeters1
|>> fun (id, x) -> postToProcessor (fun () -> tryPublishAggregateEvent eventBroker id 'A1.Version 'A1.StorageName x |> ignore)
|> ignore

if (eventBroker.notifyAggregate.IsSome) then
kafkaParameters2
|>> fun (id, x) -> postToProcessor (fun () -> tryPublishAggregateEvent eventBroker id 'A2.Version 'A2.StorageName x |> ignore)
|> ignore

let _ =
aggregateIds1
|>> mkAggregateSnapshotIfIntervalPassed<'A1, 'E1> storage
let _ =
aggregateIds2
|>> mkAggregateSnapshotIfIntervalPassed<'A2, 'E2> storage

return eventIds
}
}
|> Async.RunSynchronously

match config.LockType with
| Optimistic ->
delayedCommand()
| _ ->
log.Warn "locktype is pessimistic, but we are not using it in runTwoNAggregateCommands"
delayedCommand()


let inline runTwoCommands<'A1, 'A2, 'E1, 'E2
when 'A1: (static member Zero: 'A1)
and 'A1: (static member StorageName: string)
Expand Down
2 changes: 1 addition & 1 deletion Sharpino.Lib/Sharpino.Lib.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<PackageId>Sharpino</PackageId>
<Version>1.6.4</Version>
<Version>1.6.5</Version>
<Authors>Antonio Lucca</Authors>
<PackageLicenseExpression>MIT</PackageLicenseExpression>
<PackageLicenseFile>LICENSE</PackageLicenseFile>
Expand Down
2 changes: 1 addition & 1 deletion Sharpino.Lib/Sharpino.nuspec
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<package xmlns="http://schemas.microsoft.com/packaging/2012/06/nuspec.xsd">
<metadata>
<id>Sharpino</id>
<version>1.6.4</version>
<version>1.6.5</version>
<title>Sharpino</title>
<authors>Antonio Lucca</authors>
<description>Simple F# Event-Sourcing Library</description>
Expand Down

0 comments on commit 4a6aa8b

Please sign in to comment.