From 2eb5cdb64cf99af88cb17e4a3a63c87124ab61f1 Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Thu, 3 Dec 2020 10:19:53 +0000 Subject: [PATCH] MemoryStore: Serialize Committed events re #265 (#268) --- CHANGELOG.md | 1 + src/Equinox.MemoryStore/MemoryStore.fs | 46 +++++++++++++++++--------- 2 files changed, 31 insertions(+), 16 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5202c9bfb..72c0c00fa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ The `Unreleased` section name is replaced by the expected version of next releas ### Removed ### Fixed +- `MemoryStore`: Serialize `Committed` events to guarantee consumption in event `Index` order re [#265](https://github.com/jet/equinox/issues/265) [#269](https://github.com/jet/equinox/pull/269) - `Cosmos`: Fix defaulting for `compressUnfolds` in C# [#261](https://github.com/jet/equinox/pull/261) diff --git a/src/Equinox.MemoryStore/MemoryStore.fs b/src/Equinox.MemoryStore/MemoryStore.fs index 56cb5d10a..76d32a6b7 100644 --- a/src/Equinox.MemoryStore/MemoryStore.fs +++ b/src/Equinox.MemoryStore/MemoryStore.fs @@ -7,7 +7,7 @@ open Equinox open Equinox.Core open System.Runtime.InteropServices -/// Equivalent to GetEventStore's in purpose; signals a conflict has been detected and reprocessing of the decision will be necessary +/// Equivalent to EventStoreDB's in purpose; signals a conflict has been detected and reprocessing of the decision will be necessary exception private WrongVersionException of streamName: string * expected: int * value: obj /// Internal result used to reflect the outcome of syncing with the entry in the inner ConcurrentDictionary @@ -20,10 +20,21 @@ type ConcurrentArraySyncResult<'t> = Written of 't | Conflict of 't /// Maintains a dictionary of ITimelineEvent<'Format>[] per stream-name, allowing one to vary the encoding used to match that of a given concrete store, or optimize test run performance type VolatileStore<'Format>() = - let streams = System.Collections.Concurrent.ConcurrentDictionary[]>() + + let streams = System.Collections.Concurrent.ConcurrentDictionary[]>() + + // Where TrySync attempts overlap on the same stream, there's a race to raise the Committed event for each 'commit' resulting from a successful Sync + // If we don't serialize the publishing of the events, its possible for handlers to observe the Events out of order let committed = Event<_>() + // Here we neuter that effect - the BatchingGate can end up with commits submitted out of order, but we serialize the raising of the events per stream + let publishBatches (commits : (FsCodec.StreamName * FsCodec.ITimelineEvent<'Format>[])[]) = async { + for streamName, events in commits |> Seq.groupBy fst do + committed.Trigger(streamName, events |> Seq.collect snd |> Seq.sortBy (fun x -> x.Index) |> Seq.toArray) } + let publishCommit = AsyncBatchingGate(publishBatches, System.TimeSpan.FromMilliseconds 2.) [] + /// Notifies of a batch of events being committed to a given Stream. Guarantees no out of order and/or overlapping raising of the event
+ /// NOTE in some cases, two or more overlapping commits can be coalesced into a single Committed event member __.Committed : IEvent[]> = committed.Publish /// Loads state from a given stream @@ -33,20 +44,22 @@ type VolatileStore<'Format>() = member __.TrySync ( streamName, trySyncValue : FsCodec.ITimelineEvent<'Format>[] -> ConcurrentDictionarySyncResult[]>, events: FsCodec.ITimelineEvent<'Format>[]) - : ConcurrentArraySyncResult[]> = + : Async[]>> = async { let seedStream _streamName = events let updateValue streamName (currentValue : FsCodec.ITimelineEvent<'Format>[]) = match trySyncValue currentValue with | ConcurrentDictionarySyncResult.Conflict expectedVersion -> raise <| WrongVersionException (streamName, expectedVersion, box currentValue) | ConcurrentDictionarySyncResult.Written value -> value - try let res = streams.AddOrUpdate(streamName, seedStream, updateValue) |> Written - committed.Trigger((FsCodec.StreamName.parse streamName, events)) // raise here, once, as updateValue can conceptually be invoked multiple times - res - with WrongVersionException(_, _, conflictingValue) -> unbox conflictingValue |> Conflict + try let res = streams.AddOrUpdate(streamName, seedStream, updateValue) + // we publish the event here, once, as `updateValue` can be invoked multiple times + do! publishCommit.Execute((FsCodec.StreamName.parse streamName, events)) + return Written res + with WrongVersionException(_, _, conflictingValue) -> + return Conflict (unbox conflictingValue) } type Token = { streamVersion: int; streamName: string } -/// Internal implementation detail of MemoryStreamStore +/// Internal implementation detail of MemoryStore module private Token = let private streamTokenOfIndex streamName (streamVersion : int) : StreamToken = @@ -64,36 +77,37 @@ module private Token = /// Represents the state of a set of streams in a style consistent withe the concrete Store types - no constraints on memory consumption (but also no persistence!). type Category<'event, 'state, 'context, 'Format>(store : VolatileStore<'Format>, codec : FsCodec.IEventCodec<'event,'Format,'context>, fold, initial) = - let (|Decode|) = Array.choose codec.TryDecode interface ICategory<'event, 'state, string, 'context> with member __.Load(_log, streamName, _opt) = async { match store.TryLoad streamName with | None -> return Token.ofEmpty streamName initial - | Some (Decode events) -> return Token.ofEventArray streamName fold initial events } + | Some events -> return Token.ofEventArray streamName fold initial (events |> Array.choose codec.TryDecode) } member __.TrySync(_log, Token.Unpack token, state, events : 'event list, context : 'context option) = async { let inline map i (e : FsCodec.IEventData<'Format>) = FsCodec.Core.TimelineEvent.Create(int64 i, e.EventType, e.Data, e.Meta, e.EventId, e.CorrelationId, e.CausationId, e.Timestamp) - let encoded : FsCodec.ITimelineEvent<_>[] = events |> Seq.mapi (fun i e -> map (token.streamVersion+i+1) (codec.Encode(context,e))) |> Array.ofSeq + let encoded = events |> Seq.mapi (fun i e -> map (token.streamVersion + i + 1) (codec.Encode(context, e))) |> Array.ofSeq let trySyncValue currentValue = if Array.length currentValue <> token.streamVersion + 1 then ConcurrentDictionarySyncResult.Conflict (token.streamVersion) else ConcurrentDictionarySyncResult.Written (Seq.append currentValue encoded |> Array.ofSeq) - match store.TrySync(token.streamName, trySyncValue, encoded) with + match! store.TrySync(token.streamName, trySyncValue, encoded) with + | ConcurrentArraySyncResult.Written _ -> + return SyncResult.Written <| Token.ofEventArrayAndKnownState token.streamName fold state events | ConcurrentArraySyncResult.Conflict conflictingEvents -> let resync = async { let version = Token.tokenOfArray token.streamName conflictingEvents let successorEvents = conflictingEvents |> Seq.skip (token.streamVersion + 1) |> List.ofSeq return version, fold state (successorEvents |> Seq.choose codec.TryDecode) } - return SyncResult.Conflict resync - | ConcurrentArraySyncResult.Written _ -> return SyncResult.Written <| Token.ofEventArrayAndKnownState token.streamName fold state events } + return SyncResult.Conflict resync } type Resolver<'event, 'state, 'Format, 'context>(store : VolatileStore<'Format>, codec : FsCodec.IEventCodec<'event,'Format,'context>, fold, initial) = let category = Category<'event, 'state, 'context, 'Format>(store, codec, fold, initial) let resolveStream streamName context = Stream.create category streamName None context + member __.Resolve(streamName : FsCodec.StreamName, [] ?option, [] ?context : 'context) = match FsCodec.StreamName.toString streamName, option with - | sn, (None|Some AllowStale) -> resolveStream sn context + | sn, (None | Some AllowStale) -> resolveStream sn context | sn, Some AssumeEmpty -> Stream.ofMemento (Token.ofEmpty sn initial) (resolveStream sn context) /// Resolve from a Memento being used in a Continuation [based on position and state typically from Stream.CreateMemento] member __.FromMemento(Token.Unpack stream as streamToken, state, ?context) = - Stream.ofMemento (streamToken,state) (resolveStream stream.streamName context) + Stream.ofMemento (streamToken, state) (resolveStream stream.streamName context)