diff --git a/CHANGELOG.md b/CHANGELOG.md index e3a1f1919..29e266d97 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ The `Unreleased` section name is replaced by the expected version of next releas ### Added - Add `eqx dump -b`, enabling overriding of Max Events per Batch +- `MemoryStore`: Add `Committed` event to enable simulating Change Feeds in integration tests re [#205](https://github.com/jet/equinox/issues/205) [#221](https://github.com/jet/equinox/pull/221) ### Changed diff --git a/samples/Store/Backend/Favorites.fs b/samples/Store/Backend/Favorites.fs index 209bc0f61..e20c1e6ee 100644 --- a/samples/Store/Backend/Favorites.fs +++ b/samples/Store/Backend/Favorites.fs @@ -20,8 +20,8 @@ type Service(log, resolve, ?maxAttempts) = member __.Favorite(clientId, skus) = execute clientId (Command.Favorite(DateTimeOffset.Now, skus)) - member __.Unfavorite(clientId, skus) = - execute clientId (Command.Unfavorite skus) + member __.Unfavorite(clientId, sku) = + execute clientId (Command.Unfavorite sku) member __.List clientId : Async = read clientId \ No newline at end of file diff --git a/samples/Store/Integration/CartIntegration.fs b/samples/Store/Integration/CartIntegration.fs index cbd377a58..d82276ab5 100644 --- a/samples/Store/Integration/CartIntegration.fs +++ b/samples/Store/Integration/CartIntegration.fs @@ -2,8 +2,6 @@ open Equinox open Equinox.Cosmos.Integration -open Equinox.EventStore -open Equinox.MemoryStore open Swensen.Unquote #nowarn "1182" // From hereon in, we may have some 'unused' privates (the tests) @@ -11,16 +9,14 @@ open Swensen.Unquote let fold, initial = Domain.Cart.Fold.fold, Domain.Cart.Fold.initial let snapshot = Domain.Cart.Fold.isOrigin, Domain.Cart.Fold.snapshot -let createMemoryStore () = - // we want to validate that the JSON UTF8 is working happily - VolatileStore() +let createMemoryStore () = MemoryStore.VolatileStore() let createServiceMemory log store = Backend.Cart.Service(log, fun (id,opt) -> MemoryStore.Resolver(store, Domain.Cart.Events.codec, fold, initial).Resolve(id,?option=opt)) let codec = Domain.Cart.Events.codec let resolveGesStreamWithRollingSnapshots gateway = - fun (id,opt) -> EventStore.Resolver(gateway, codec, fold, initial, access = AccessStrategy.RollingSnapshots snapshot).Resolve(id,?option=opt) + fun (id,opt) -> EventStore.Resolver(gateway, codec, fold, initial, access = EventStore.AccessStrategy.RollingSnapshots snapshot).Resolve(id,?option=opt) let resolveGesStreamWithoutCustomAccessStrategy gateway = fun (id,opt) -> EventStore.Resolver(gateway, codec, fold, initial).Resolve(id,?option=opt) diff --git a/samples/Store/Integration/ContactPreferencesIntegration.fs b/samples/Store/Integration/ContactPreferencesIntegration.fs index b592a9d90..69e6f1eb1 100644 --- a/samples/Store/Integration/ContactPreferencesIntegration.fs +++ b/samples/Store/Integration/ContactPreferencesIntegration.fs @@ -3,14 +3,12 @@ open Equinox open Equinox.Cosmos.Integration open Swensen.Unquote -open Xunit #nowarn "1182" // From hereon in, we may have some 'unused' privates (the tests) let fold, initial = Domain.ContactPreferences.Fold.fold, Domain.ContactPreferences.Fold.initial -let createMemoryStore () = - new MemoryStore.VolatileStore<_>() +let createMemoryStore () = MemoryStore.VolatileStore<_>() let createServiceMemory log store = Backend.ContactPreferences.Service(log, MemoryStore.Resolver(store, FsCodec.Box.Codec.Create(), fold, initial).Resolve) @@ -41,7 +39,8 @@ type Tests(testOutputHelper) = [] let ``Can roundtrip in Memory, correctly folding the events`` args = Async.RunSynchronously <| async { - let service = let log, store = createLog (), createMemoryStore () in createServiceMemory log store + let log, store = createLog (), createMemoryStore () + let service = createServiceMemory log store do! act service args } @@ -74,9 +73,9 @@ type Tests(testOutputHelper) = let! service = arrange connectToSpecifiedCosmosOrSimulator createCosmosContext resolveStreamCosmosWithLatestKnownEventSemantics do! act service args } - + [] let ``Can roundtrip against Cosmos, correctly folding the events with RollingUnfold semantics`` args = Async.RunSynchronously <| async { let! service = arrange connectToSpecifiedCosmosOrSimulator createCosmosContext resolveStreamCosmosRollingUnfolds do! act service args - } \ No newline at end of file + } diff --git a/samples/Store/Integration/FavoritesIntegration.fs b/samples/Store/Integration/FavoritesIntegration.fs index 424e4c42d..cddb596dd 100644 --- a/samples/Store/Integration/FavoritesIntegration.fs +++ b/samples/Store/Integration/FavoritesIntegration.fs @@ -9,8 +9,7 @@ open Swensen.Unquote let fold, initial = Domain.Favorites.Fold.fold, Domain.Favorites.Fold.initial let snapshot = Domain.Favorites.Fold.isOrigin, Domain.Favorites.Fold.snapshot -let createMemoryStore () = - new MemoryStore.VolatileStore<_>() +let createMemoryStore () = MemoryStore.VolatileStore<_>() let createServiceMemory log store = Backend.Favorites.Service(log, MemoryStore.Resolver(store, FsCodec.Box.Codec.Create(), fold, initial).Resolve) @@ -44,8 +43,8 @@ type Tests(testOutputHelper) = [] let ``Can roundtrip in Memory, correctly folding the events`` args = Async.RunSynchronously <| async { - let store = createMemoryStore () - let service = let log = createLog () in createServiceMemory log store + let log, store = createLog (), createMemoryStore () + let service = createServiceMemory log store do! act service args } @@ -66,7 +65,7 @@ type Tests(testOutputHelper) = let service = createServiceCosmos gateway log do! act service args } - + [] let ``Can roundtrip against Cosmos, correctly folding the events with rolling unfolds`` args = Async.RunSynchronously <| async { let log = createLog () @@ -74,4 +73,4 @@ type Tests(testOutputHelper) = let gateway = createCosmosContext conn defaultBatchSize let service = createServiceCosmosRollingState gateway log do! act service args - } \ No newline at end of file + } diff --git a/samples/Tutorial/Counter.fsx b/samples/Tutorial/Counter.fsx index 6490871b0..9cd5128cf 100644 --- a/samples/Tutorial/Counter.fsx +++ b/samples/Tutorial/Counter.fsx @@ -46,8 +46,9 @@ type Command = | Clear of int (* Decide consumes a command and the current state to decide what events actually happened. - This particular counter allows numbers from 0 to 100.*) -let decide command (State state) = + This particular counter allows numbers from 0 to 100. *) + +let decide command (State state) = match command with | Increment -> if state > 100 then [] else [Incremented] @@ -73,9 +74,24 @@ type Service(log, resolve, ?maxAttempts) = execute instanceId (Clear value) member __.Read instanceId : Async = - read instanceId + let stream = resolve instanceId + stream.Query(fun (State value) -> value) + +(* Out of the box, logging is via Serilog (can be wired to anything imaginable). + We wire up logging for demo purposes using MemoryStore.VolatileStore's Committed event + MemoryStore itself, by design, has no intrinsic logging + (other store bindings have rich relevant logging about roundtrips to physical stores etc) *) + +open Serilog +let log = LoggerConfiguration().WriteTo.Console().CreateLogger() +let logEvents stream (events : FsCodec.ITimelineEvent<_>[]) = + log.Information("Committed to {stream}, events: {@events}", stream, seq { for x in events -> x.EventType }) + +(* We can integration test using an in-memory store + See other examples such as Cosmos.fsx to see how we integrate with CosmosDB and/or other concrete stores *) let store = Equinox.MemoryStore.VolatileStore() +let _ = store.Committed.Subscribe(fun (s, xs) -> logEvents s xs) let codec = FsCodec.Box.Codec.Create() let resolve = Equinox.MemoryStore.Resolver(store, codec, fold, initial).Resolve open Serilog diff --git a/samples/Tutorial/Favorites.fsx b/samples/Tutorial/Favorites.fsx index 790024b31..9d1bfce03 100644 --- a/samples/Tutorial/Favorites.fsx +++ b/samples/Tutorial/Favorites.fsx @@ -92,6 +92,11 @@ let clientAFavoritesStreamId = FsCodec.StreamName.create categoryId "ClientA" // For test purposes, we use the in-memory store let store = Equinox.MemoryStore.VolatileStore() +// MemoryStore (as with most Event Stores) provides a way to observe events that have been persisted to a stream +// For demo purposes we emit those to the log (which emits to the console) +let logEvents stream (events : FsCodec.ITimelineEvent<_>[]) = + log.Information("Committed to {stream}, events: {@events}", stream, seq { for x in events -> x.EventType }) +let _ = store.Committed.Subscribe(fun (s, xs) -> logEvents s xs) let codec = // For this example, we hand-code; normally one uses one of the FsCodec auto codecs, which codegen something similar diff --git a/src/Equinox.MemoryStore/MemoryStore.fs b/src/Equinox.MemoryStore/MemoryStore.fs index 74e6fc994..56cb5d10a 100644 --- a/src/Equinox.MemoryStore/MemoryStore.fs +++ b/src/Equinox.MemoryStore/MemoryStore.fs @@ -21,6 +21,10 @@ type ConcurrentArraySyncResult<'t> = Written of 't | Conflict of 't /// Maintains a dictionary of ITimelineEvent<'Format>[] per stream-name, allowing one to vary the encoding used to match that of a given concrete store, or optimize test run performance type VolatileStore<'Format>() = let streams = System.Collections.Concurrent.ConcurrentDictionary[]>() + let committed = Event<_>() + + [] + member __.Committed : IEvent[]> = committed.Publish /// Loads state from a given stream member __.TryLoad streamName = match streams.TryGetValue streamName with false, _ -> None | true, packed -> Some packed @@ -35,7 +39,9 @@ type VolatileStore<'Format>() = match trySyncValue currentValue with | ConcurrentDictionarySyncResult.Conflict expectedVersion -> raise <| WrongVersionException (streamName, expectedVersion, box currentValue) | ConcurrentDictionarySyncResult.Written value -> value - try streams.AddOrUpdate(streamName, seedStream, updateValue) |> Written + try let res = streams.AddOrUpdate(streamName, seedStream, updateValue) |> Written + committed.Trigger((FsCodec.StreamName.parse streamName, events)) // raise here, once, as updateValue can conceptually be invoked multiple times + res with WrongVersionException(_, _, conflictingValue) -> unbox conflictingValue |> Conflict type Token = { streamVersion: int; streamName: string } @@ -67,7 +73,7 @@ type Category<'event, 'state, 'context, 'Format>(store : VolatileStore<'Format>, member __.TrySync(_log, Token.Unpack token, state, events : 'event list, context : 'context option) = async { let inline map i (e : FsCodec.IEventData<'Format>) = FsCodec.Core.TimelineEvent.Create(int64 i, e.EventType, e.Data, e.Meta, e.EventId, e.CorrelationId, e.CausationId, e.Timestamp) - let encoded : FsCodec.ITimelineEvent<_>[] = events |> Seq.mapi (fun i e -> map (token.streamVersion+i) (codec.Encode(context,e))) |> Array.ofSeq + let encoded : FsCodec.ITimelineEvent<_>[] = events |> Seq.mapi (fun i e -> map (token.streamVersion+i+1) (codec.Encode(context,e))) |> Array.ofSeq let trySyncValue currentValue = if Array.length currentValue <> token.streamVersion + 1 then ConcurrentDictionarySyncResult.Conflict (token.streamVersion) else ConcurrentDictionarySyncResult.Written (Seq.append currentValue encoded |> Array.ofSeq) diff --git a/tests/Equinox.MemoryStore.Integration/MemoryStoreIntegration.fs b/tests/Equinox.MemoryStore.Integration/MemoryStoreIntegration.fs index 6cc73bfc1..d5976fec6 100644 --- a/tests/Equinox.MemoryStore.Integration/MemoryStoreIntegration.fs +++ b/tests/Equinox.MemoryStore.Integration/MemoryStoreIntegration.fs @@ -1,11 +1,9 @@ module Equinox.MemoryStore.Integration.MemoryStoreIntegration -open Swensen.Unquote open Equinox.MemoryStore +open Swensen.Unquote -let createMemoryStore () = - new VolatileStore<_>() - +let createMemoryStore () = VolatileStore<_>() let createServiceMemory log store = let resolve (id,opt) = Resolver(store, FsCodec.Box.Codec.Create(), Domain.Cart.Fold.fold, Domain.Cart.Fold.initial).Resolve(id,?option=opt) Backend.Cart.Service(log, resolve) @@ -16,6 +14,10 @@ type Tests(testOutputHelper) = let testOutput = TestOutputAdapter testOutputHelper let createLog () = createLogger testOutput + let (|NonZero|) = function + | None -> Some 1 + | Some c -> Some (max 1 c) + [] let ``Basic tracer bullet, sending a command and verifying the folded result directly and via a reload`` cartId1 cartId2 ((_,skuId,quantity) as args) = Async.RunSynchronously <| async { @@ -44,4 +46,42 @@ type Tests(testOutputHelper) = | x -> x |> failwithf "Expected to find item, got %A" verifyFoldedStateReflectsCommand expected verifyFoldedStateReflectsCommand actual - } \ No newline at end of file + } + +let createFavoritesServiceMemory log store = + let resolver = Resolver(store, FsCodec.Box.Codec.Create(), Domain.Favorites.Fold.fold, Domain.Favorites.Fold.initial) + Backend.Favorites.Service(log, resolver.Resolve) + +type ChangeFeed(testOutputHelper) = + let testOutput = TestOutputAdapter testOutputHelper + let createLog () = createLogger testOutput + + [] + let ``Commits get reported`` (clientId, sku) = Async.RunSynchronously <| async { + let log, store = createLog (), createMemoryStore () + let events = ResizeArray() + let takeCaptured () = + let xs = events.ToArray() + events.Clear() + List.ofArray xs + use _ = store.Committed.Subscribe(fun (s, xs) -> events.Add((s, List.ofArray xs))) + let service = createFavoritesServiceMemory log store + let (Domain.Favorites.Events.ForClientId expectedStream) = clientId + + do! service.Favorite(clientId, [sku]) + let written = takeCaptured () + test <@ let stream, xs = written |> List.exactlyOne + let env = xs |> List.exactlyOne + stream = expectedStream + && env.Index = 0L + && env.EventType = "Favorited" + && env.Data |> unbox |> fun x -> x.skuId = sku @> + do! service.Unfavorite(clientId, sku) + let written = takeCaptured () + test <@ let stream, xs = written |> List.exactlyOne + let env = xs |> List.exactlyOne + stream = expectedStream + && env.Index = 1L + && env.EventType = "Unfavorited" + && env.Data |> unbox |> fun x -> x.skuId = sku @> +} diff --git a/tools/Equinox.Tool/Program.fs b/tools/Equinox.Tool/Program.fs index 018fe4a96..9c50dcbc4 100644 --- a/tools/Equinox.Tool/Program.fs +++ b/tools/Equinox.Tool/Program.fs @@ -359,7 +359,7 @@ module CosmosStats = log.Debug("Running query: {sql}", sql) let res = container.QueryValue(sql, Microsoft.Azure.Documents.Client.FeedOptions(EnableCrossPartitionQuery=true)) log.Information("{stat}: {result:N0}", name, res)}) - |> if inParallel then Async.Parallel else Async.Sequential + |> if inParallel then Async.Parallel else Async.ParallelThrottled 1 |> Async.Ignore |> Async.RunSynchronously | _ -> failwith "please specify a `cosmos` endpoint" } @@ -438,4 +438,4 @@ let main argv = with e -> log.Debug(e, "Fatal error; exiting"); reraise () with :? Argu.ArguParseException as e -> eprintfn "%s" e.Message; 1 | Storage.MissingArg msg -> eprintfn "%s" msg; 1 - | e -> eprintfn "%s" e.Message; 1 \ No newline at end of file + | e -> eprintfn "%s" e.Message; 1