Skip to content

Commit

Permalink
MemoryStore: Serialize Committed events re #265 (#268)
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink authored Dec 3, 2020
1 parent a7d5115 commit 2eb5cdb
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 16 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ The `Unreleased` section name is replaced by the expected version of next releas
### Removed
### Fixed

- `MemoryStore`: Serialize `Committed` events to guarantee consumption in event `Index` order re [#265](https://github.com/jet/equinox/issues/265) [#269](https://github.com/jet/equinox/pull/269)
- `Cosmos`: Fix defaulting for `compressUnfolds` in C# [#261](https://github.com/jet/equinox/pull/261)

<a name="2.3.0"></a>
Expand Down
46 changes: 30 additions & 16 deletions src/Equinox.MemoryStore/MemoryStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ open Equinox
open Equinox.Core
open System.Runtime.InteropServices

/// Equivalent to GetEventStore's in purpose; signals a conflict has been detected and reprocessing of the decision will be necessary
/// Equivalent to EventStoreDB's in purpose; signals a conflict has been detected and reprocessing of the decision will be necessary
exception private WrongVersionException of streamName: string * expected: int * value: obj

/// Internal result used to reflect the outcome of syncing with the entry in the inner ConcurrentDictionary
Expand All @@ -20,10 +20,21 @@ type ConcurrentArraySyncResult<'t> = Written of 't | Conflict of 't

/// Maintains a dictionary of ITimelineEvent<'Format>[] per stream-name, allowing one to vary the encoding used to match that of a given concrete store, or optimize test run performance
type VolatileStore<'Format>() =
let streams = System.Collections.Concurrent.ConcurrentDictionary<string,FsCodec.ITimelineEvent<'Format>[]>()

let streams = System.Collections.Concurrent.ConcurrentDictionary<string, FsCodec.ITimelineEvent<'Format>[]>()

// Where TrySync attempts overlap on the same stream, there's a race to raise the Committed event for each 'commit' resulting from a successful Sync
// If we don't serialize the publishing of the events, its possible for handlers to observe the Events out of order
let committed = Event<_>()
// Here we neuter that effect - the BatchingGate can end up with commits submitted out of order, but we serialize the raising of the events per stream
let publishBatches (commits : (FsCodec.StreamName * FsCodec.ITimelineEvent<'Format>[])[]) = async {
for streamName, events in commits |> Seq.groupBy fst do
committed.Trigger(streamName, events |> Seq.collect snd |> Seq.sortBy (fun x -> x.Index) |> Seq.toArray) }
let publishCommit = AsyncBatchingGate(publishBatches, System.TimeSpan.FromMilliseconds 2.)

[<CLIEvent>]
/// Notifies of a batch of events being committed to a given Stream. Guarantees no out of order and/or overlapping raising of the event<br/>
/// NOTE in some cases, two or more overlapping commits can be coalesced into a single <c>Committed</c> event
member __.Committed : IEvent<FsCodec.StreamName * FsCodec.ITimelineEvent<'Format>[]> = committed.Publish

/// Loads state from a given stream
Expand All @@ -33,20 +44,22 @@ type VolatileStore<'Format>() =
member __.TrySync
( streamName, trySyncValue : FsCodec.ITimelineEvent<'Format>[] -> ConcurrentDictionarySyncResult<FsCodec.ITimelineEvent<'Format>[]>,
events: FsCodec.ITimelineEvent<'Format>[])
: ConcurrentArraySyncResult<FsCodec.ITimelineEvent<'Format>[]> =
: Async<ConcurrentArraySyncResult<FsCodec.ITimelineEvent<'Format>[]>> = async {
let seedStream _streamName = events
let updateValue streamName (currentValue : FsCodec.ITimelineEvent<'Format>[]) =
match trySyncValue currentValue with
| ConcurrentDictionarySyncResult.Conflict expectedVersion -> raise <| WrongVersionException (streamName, expectedVersion, box currentValue)
| ConcurrentDictionarySyncResult.Written value -> value
try let res = streams.AddOrUpdate(streamName, seedStream, updateValue) |> Written
committed.Trigger((FsCodec.StreamName.parse streamName, events)) // raise here, once, as updateValue can conceptually be invoked multiple times
res
with WrongVersionException(_, _, conflictingValue) -> unbox conflictingValue |> Conflict
try let res = streams.AddOrUpdate(streamName, seedStream, updateValue)
// we publish the event here, once, as `updateValue` can be invoked multiple times
do! publishCommit.Execute((FsCodec.StreamName.parse streamName, events))
return Written res
with WrongVersionException(_, _, conflictingValue) ->
return Conflict (unbox conflictingValue) }

type Token = { streamVersion: int; streamName: string }

/// Internal implementation detail of MemoryStreamStore
/// Internal implementation detail of MemoryStore
module private Token =

let private streamTokenOfIndex streamName (streamVersion : int) : StreamToken =
Expand All @@ -64,36 +77,37 @@ module private Token =

/// Represents the state of a set of streams in a style consistent withe the concrete Store types - no constraints on memory consumption (but also no persistence!).
type Category<'event, 'state, 'context, 'Format>(store : VolatileStore<'Format>, codec : FsCodec.IEventCodec<'event,'Format,'context>, fold, initial) =
let (|Decode|) = Array.choose codec.TryDecode
interface ICategory<'event, 'state, string, 'context> with
member __.Load(_log, streamName, _opt) = async {
match store.TryLoad streamName with
| None -> return Token.ofEmpty streamName initial
| Some (Decode events) -> return Token.ofEventArray streamName fold initial events }
| Some events -> return Token.ofEventArray streamName fold initial (events |> Array.choose codec.TryDecode) }
member __.TrySync(_log, Token.Unpack token, state, events : 'event list, context : 'context option) = async {
let inline map i (e : FsCodec.IEventData<'Format>) =
FsCodec.Core.TimelineEvent.Create(int64 i, e.EventType, e.Data, e.Meta, e.EventId, e.CorrelationId, e.CausationId, e.Timestamp)
let encoded : FsCodec.ITimelineEvent<_>[] = events |> Seq.mapi (fun i e -> map (token.streamVersion+i+1) (codec.Encode(context,e))) |> Array.ofSeq
let encoded = events |> Seq.mapi (fun i e -> map (token.streamVersion + i + 1) (codec.Encode(context, e))) |> Array.ofSeq
let trySyncValue currentValue =
if Array.length currentValue <> token.streamVersion + 1 then ConcurrentDictionarySyncResult.Conflict (token.streamVersion)
else ConcurrentDictionarySyncResult.Written (Seq.append currentValue encoded |> Array.ofSeq)
match store.TrySync(token.streamName, trySyncValue, encoded) with
match! store.TrySync(token.streamName, trySyncValue, encoded) with
| ConcurrentArraySyncResult.Written _ ->
return SyncResult.Written <| Token.ofEventArrayAndKnownState token.streamName fold state events
| ConcurrentArraySyncResult.Conflict conflictingEvents ->
let resync = async {
let version = Token.tokenOfArray token.streamName conflictingEvents
let successorEvents = conflictingEvents |> Seq.skip (token.streamVersion + 1) |> List.ofSeq
return version, fold state (successorEvents |> Seq.choose codec.TryDecode) }
return SyncResult.Conflict resync
| ConcurrentArraySyncResult.Written _ -> return SyncResult.Written <| Token.ofEventArrayAndKnownState token.streamName fold state events }
return SyncResult.Conflict resync }

type Resolver<'event, 'state, 'Format, 'context>(store : VolatileStore<'Format>, codec : FsCodec.IEventCodec<'event,'Format,'context>, fold, initial) =
let category = Category<'event, 'state, 'context, 'Format>(store, codec, fold, initial)
let resolveStream streamName context = Stream.create category streamName None context

member __.Resolve(streamName : FsCodec.StreamName, [<Optional; DefaultParameterValue null>] ?option, [<Optional; DefaultParameterValue null>] ?context : 'context) =
match FsCodec.StreamName.toString streamName, option with
| sn, (None|Some AllowStale) -> resolveStream sn context
| sn, (None | Some AllowStale) -> resolveStream sn context
| sn, Some AssumeEmpty -> Stream.ofMemento (Token.ofEmpty sn initial) (resolveStream sn context)

/// Resolve from a Memento being used in a Continuation [based on position and state typically from Stream.CreateMemento]
member __.FromMemento(Token.Unpack stream as streamToken, state, ?context) =
Stream.ofMemento (streamToken,state) (resolveStream stream.streamName context)
Stream.ofMemento (streamToken, state) (resolveStream stream.streamName context)

0 comments on commit 2eb5cdb

Please sign in to comment.