diff --git a/Sharpino.Lib/CommandHandler.fs b/Sharpino.Lib/CommandHandler.fs index 180aa88..ac01318 100644 --- a/Sharpino.Lib/CommandHandler.fs +++ b/Sharpino.Lib/CommandHandler.fs @@ -439,7 +439,151 @@ module CommandHandler = | _ -> log.Warn "locktype is pessimistic, but we are not using it in runNAggregateCommands" delayedCommand() + + let inline runTwoNAggregateCommands<'A1, 'E1, 'A2, 'E2 + when 'A1 :> Aggregate + and 'A1 :> Entity + and 'E1 :> Event<'A1> + and 'E1 : (member Serialize: ISerializer -> string) + and 'E1 : (static member Deserialize: ISerializer -> Json -> Result<'E1, string>) + and 'A1 : (static member Deserialize: ISerializer -> Json -> Result<'A1, string>) + and 'A1 : (static member SnapshotsInterval: int) + and 'A1 : (static member StorageName: string) + and 'A1 : (static member Version: string) + and 'A2 :> Aggregate + and 'A2 :> Entity + and 'E2 :> Event<'A2> + and 'E2 : (member Serialize: ISerializer -> string) + and 'E2 : (static member Deserialize: ISerializer -> Json -> Result<'E2, string>) + and 'A2 : (static member Deserialize: ISerializer -> Json -> Result<'A2, string>) + and 'A2 : (static member SnapshotsInterval: int) + and 'A2 : (static member StorageName: string) + and 'A2 : (static member Version: string) + > + (aggregateIds1: List) + (aggregateIds2: List) + (storage: IEventStore) + (eventBroker: IEventBroker) + (stateViewer1: AggregateViewer<'A1>) + (stateViewer2: AggregateViewer<'A2>) + (command1: List>) + (command2: List>) + = + log.Debug "runTwoNAggregateCommands" + let delayedCommand = fun () -> + async { + return + result { + let! states1 = + aggregateIds1 + |> List.traverseResultM stateViewer1 + + let! states2 = + aggregateIds2 + |> List.traverseResultM stateViewer2 + + let states1' = + states1 + |>> fun (_, state, _, _) -> state + + let states2' = + states2 + |>> fun (_, state, _, _) -> state + + let statesAndCommands1 = + List.zip states1' command1 + + let statesAndCommands2 = + List.zip states2' command2 + + let! events1 = + statesAndCommands1 + |>> fun (state, command) -> command.Execute state + |> List.traverseResultM id + + let! events2 = + statesAndCommands2 + |>> fun (state, command) -> command.Execute state + |> List.traverseResultM id + + let serializedEvents1 = + events1 + |>> fun x -> x |>> fun (z: 'E1) -> z.Serialize serializer + + let serializedEvents2 = + events2 + |>> fun x -> x |>> fun (z: 'E2) -> z.Serialize serializer + + let aggregateIdsWithStateIds1 = + List.zip aggregateIds1 states1' + |>> fun (id, state ) -> (id, state.StateId) + let aggregateIdsWithStateIds2 = + List.zip aggregateIds2 states2' + |>> fun (id, state ) -> (id, state.StateId) + + let packParametersForDb1 = + List.zip serializedEvents1 aggregateIdsWithStateIds1 + |>> fun (events, (id, stateId)) -> (events, 'A1.Version, 'A1.StorageName, id, stateId) + + let packParametersForDb2 = + List.zip serializedEvents2 aggregateIdsWithStateIds2 + |>> fun (events, (id, stateId)) -> (events, 'A2.Version, 'A2.StorageName, id, stateId) + + let allPacked = packParametersForDb1 @ packParametersForDb2 + + let! eventIds = + allPacked + |> storage.MultiAddAggregateEvents + + let eventIds1 = eventIds |> List.take aggregateIds1.Length + let eventIds2 = eventIds |> List.skip aggregateIds1.Length + + let aggregateIdsWithEventIds1 = + List.zip aggregateIds1 eventIds1 + + let aggregateIdsWithEventIds2 = + List.zip aggregateIds2 eventIds2 + + + let kafkaParmeters1 = + List.map2 (fun idList serializedEvents -> (idList, serializedEvents)) aggregateIdsWithEventIds1 serializedEvents1 + |>> fun (((aggId: Guid), idList), serializedEvents) -> (aggId, List.zip idList serializedEvents) + + let kafkaParameters2 = + (List.map2 (fun idList serializedEvents -> (idList, serializedEvents)) aggregateIdsWithEventIds2 serializedEvents2 + |>> fun (((aggId: Guid), idList), serializedEvents) -> (aggId, List.zip idList serializedEvents)) + + if (eventBroker.notifyAggregate.IsSome) then + kafkaParmeters1 + |>> fun (id, x) -> postToProcessor (fun () -> tryPublishAggregateEvent eventBroker id 'A1.Version 'A1.StorageName x |> ignore) + |> ignore + + if (eventBroker.notifyAggregate.IsSome) then + kafkaParameters2 + |>> fun (id, x) -> postToProcessor (fun () -> tryPublishAggregateEvent eventBroker id 'A2.Version 'A2.StorageName x |> ignore) + |> ignore + + let _ = + aggregateIds1 + |>> mkAggregateSnapshotIfIntervalPassed<'A1, 'E1> storage + let _ = + aggregateIds2 + |>> mkAggregateSnapshotIfIntervalPassed<'A2, 'E2> storage + + return eventIds + } + } + |> Async.RunSynchronously + + match config.LockType with + | Optimistic -> + delayedCommand() + | _ -> + log.Warn "locktype is pessimistic, but we are not using it in runTwoNAggregateCommands" + delayedCommand() + + let inline runTwoCommands<'A1, 'A2, 'E1, 'E2 when 'A1: (static member Zero: 'A1) and 'A1: (static member StorageName: string) diff --git a/Sharpino.Lib/Sharpino.Lib.fsproj b/Sharpino.Lib/Sharpino.Lib.fsproj index 50c3e16..0ccf86f 100644 --- a/Sharpino.Lib/Sharpino.Lib.fsproj +++ b/Sharpino.Lib/Sharpino.Lib.fsproj @@ -6,7 +6,7 @@ true true Sharpino - 1.6.4 + 1.6.5 Antonio Lucca MIT LICENSE diff --git a/Sharpino.Lib/Sharpino.nuspec b/Sharpino.Lib/Sharpino.nuspec index 3548366..5f11203 100644 --- a/Sharpino.Lib/Sharpino.nuspec +++ b/Sharpino.Lib/Sharpino.nuspec @@ -2,7 +2,7 @@ Sharpino - 1.6.4 + 1.6.5 Sharpino Antonio Lucca Simple F# Event-Sourcing Library