Skip to content

Commit

Permalink
Add MemoryStore ChangeFeed re #205 (#221)
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed May 21, 2020
1 parent 2d7e2cb commit ea5c42d
Show file tree
Hide file tree
Showing 10 changed files with 94 additions and 32 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ The `Unreleased` section name is replaced by the expected version of next releas
### Added

- Add `eqx dump -b`, enabling overriding of Max Events per Batch
- `MemoryStore`: Add `Committed` event to enable simulating Change Feeds in integration tests re [#205](https://github.com/jet/equinox/issues/205) [#221](https://github.com/jet/equinox/pull/221)

### Changed

Expand Down
4 changes: 2 additions & 2 deletions samples/Store/Backend/Favorites.fs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ type Service(log, resolve, ?maxAttempts) =
member __.Favorite(clientId, skus) =
execute clientId (Command.Favorite(DateTimeOffset.Now, skus))

member __.Unfavorite(clientId, skus) =
execute clientId (Command.Unfavorite skus)
member __.Unfavorite(clientId, sku) =
execute clientId (Command.Unfavorite sku)

member __.List clientId : Async<Events.Favorited []> =
read clientId
8 changes: 2 additions & 6 deletions samples/Store/Integration/CartIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,21 @@

open Equinox
open Equinox.Cosmos.Integration
open Equinox.EventStore
open Equinox.MemoryStore
open Swensen.Unquote

#nowarn "1182" // From hereon in, we may have some 'unused' privates (the tests)

let fold, initial = Domain.Cart.Fold.fold, Domain.Cart.Fold.initial
let snapshot = Domain.Cart.Fold.isOrigin, Domain.Cart.Fold.snapshot

let createMemoryStore () =
// we want to validate that the JSON UTF8 is working happily
VolatileStore<byte[]>()
let createMemoryStore () = MemoryStore.VolatileStore<byte[]>()
let createServiceMemory log store =
Backend.Cart.Service(log, fun (id,opt) -> MemoryStore.Resolver(store, Domain.Cart.Events.codec, fold, initial).Resolve(id,?option=opt))

let codec = Domain.Cart.Events.codec

let resolveGesStreamWithRollingSnapshots gateway =
fun (id,opt) -> EventStore.Resolver(gateway, codec, fold, initial, access = AccessStrategy.RollingSnapshots snapshot).Resolve(id,?option=opt)
fun (id,opt) -> EventStore.Resolver(gateway, codec, fold, initial, access = EventStore.AccessStrategy.RollingSnapshots snapshot).Resolve(id,?option=opt)
let resolveGesStreamWithoutCustomAccessStrategy gateway =
fun (id,opt) -> EventStore.Resolver(gateway, codec, fold, initial).Resolve(id,?option=opt)

Expand Down
11 changes: 5 additions & 6 deletions samples/Store/Integration/ContactPreferencesIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,12 @@
open Equinox
open Equinox.Cosmos.Integration
open Swensen.Unquote
open Xunit

#nowarn "1182" // From hereon in, we may have some 'unused' privates (the tests)

let fold, initial = Domain.ContactPreferences.Fold.fold, Domain.ContactPreferences.Fold.initial

let createMemoryStore () =
new MemoryStore.VolatileStore<_>()
let createMemoryStore () = MemoryStore.VolatileStore<_>()
let createServiceMemory log store =
Backend.ContactPreferences.Service(log, MemoryStore.Resolver(store, FsCodec.Box.Codec.Create(), fold, initial).Resolve)

Expand Down Expand Up @@ -41,7 +39,8 @@ type Tests(testOutputHelper) =

[<AutoData>]
let ``Can roundtrip in Memory, correctly folding the events`` args = Async.RunSynchronously <| async {
let service = let log, store = createLog (), createMemoryStore () in createServiceMemory log store
let log, store = createLog (), createMemoryStore ()
let service = createServiceMemory log store
do! act service args
}

Expand Down Expand Up @@ -74,9 +73,9 @@ type Tests(testOutputHelper) =
let! service = arrange connectToSpecifiedCosmosOrSimulator createCosmosContext resolveStreamCosmosWithLatestKnownEventSemantics
do! act service args
}

[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_COSMOS")>]
let ``Can roundtrip against Cosmos, correctly folding the events with RollingUnfold semantics`` args = Async.RunSynchronously <| async {
let! service = arrange connectToSpecifiedCosmosOrSimulator createCosmosContext resolveStreamCosmosRollingUnfolds
do! act service args
}
}
11 changes: 5 additions & 6 deletions samples/Store/Integration/FavoritesIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ open Swensen.Unquote
let fold, initial = Domain.Favorites.Fold.fold, Domain.Favorites.Fold.initial
let snapshot = Domain.Favorites.Fold.isOrigin, Domain.Favorites.Fold.snapshot

let createMemoryStore () =
new MemoryStore.VolatileStore<_>()
let createMemoryStore () = MemoryStore.VolatileStore<_>()
let createServiceMemory log store =
Backend.Favorites.Service(log, MemoryStore.Resolver(store, FsCodec.Box.Codec.Create(), fold, initial).Resolve)

Expand Down Expand Up @@ -44,8 +43,8 @@ type Tests(testOutputHelper) =

[<AutoData>]
let ``Can roundtrip in Memory, correctly folding the events`` args = Async.RunSynchronously <| async {
let store = createMemoryStore ()
let service = let log = createLog () in createServiceMemory log store
let log, store = createLog (), createMemoryStore ()
let service = createServiceMemory log store
do! act service args
}

Expand All @@ -66,12 +65,12 @@ type Tests(testOutputHelper) =
let service = createServiceCosmos gateway log
do! act service args
}

[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_COSMOS")>]
let ``Can roundtrip against Cosmos, correctly folding the events with rolling unfolds`` args = Async.RunSynchronously <| async {
let log = createLog ()
let! conn = connectToSpecifiedCosmosOrSimulator log
let gateway = createCosmosContext conn defaultBatchSize
let service = createServiceCosmosRollingState gateway log
do! act service args
}
}
22 changes: 19 additions & 3 deletions samples/Tutorial/Counter.fsx
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,9 @@ type Command =
| Clear of int

(* Decide consumes a command and the current state to decide what events actually happened.
This particular counter allows numbers from 0 to 100.*)
let decide command (State state) =
This particular counter allows numbers from 0 to 100. *)

let decide command (State state) =
match command with
| Increment ->
if state > 100 then [] else [Incremented]
Expand All @@ -73,9 +74,24 @@ type Service(log, resolve, ?maxAttempts) =
execute instanceId (Clear value)

member __.Read instanceId : Async<int> =
read instanceId
let stream = resolve instanceId
stream.Query(fun (State value) -> value)

(* Out of the box, logging is via Serilog (can be wired to anything imaginable).
We wire up logging for demo purposes using MemoryStore.VolatileStore's Committed event
MemoryStore itself, by design, has no intrinsic logging
(other store bindings have rich relevant logging about roundtrips to physical stores etc) *)

open Serilog
let log = LoggerConfiguration().WriteTo.Console().CreateLogger()
let logEvents stream (events : FsCodec.ITimelineEvent<_>[]) =
log.Information("Committed to {stream}, events: {@events}", stream, seq { for x in events -> x.EventType })

(* We can integration test using an in-memory store
See other examples such as Cosmos.fsx to see how we integrate with CosmosDB and/or other concrete stores *)

let store = Equinox.MemoryStore.VolatileStore()
let _ = store.Committed.Subscribe(fun (s, xs) -> logEvents s xs)
let codec = FsCodec.Box.Codec.Create()
let resolve = Equinox.MemoryStore.Resolver(store, codec, fold, initial).Resolve
open Serilog
Expand Down
5 changes: 5 additions & 0 deletions samples/Tutorial/Favorites.fsx
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@ let clientAFavoritesStreamId = FsCodec.StreamName.create categoryId "ClientA"

// For test purposes, we use the in-memory store
let store = Equinox.MemoryStore.VolatileStore()
// MemoryStore (as with most Event Stores) provides a way to observe events that have been persisted to a stream
// For demo purposes we emit those to the log (which emits to the console)
let logEvents stream (events : FsCodec.ITimelineEvent<_>[]) =
log.Information("Committed to {stream}, events: {@events}", stream, seq { for x in events -> x.EventType })
let _ = store.Committed.Subscribe(fun (s, xs) -> logEvents s xs)

let codec =
// For this example, we hand-code; normally one uses one of the FsCodec auto codecs, which codegen something similar
Expand Down
10 changes: 8 additions & 2 deletions src/Equinox.MemoryStore/MemoryStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ type ConcurrentArraySyncResult<'t> = Written of 't | Conflict of 't
/// Maintains a dictionary of ITimelineEvent<'Format>[] per stream-name, allowing one to vary the encoding used to match that of a given concrete store, or optimize test run performance
type VolatileStore<'Format>() =
let streams = System.Collections.Concurrent.ConcurrentDictionary<string,FsCodec.ITimelineEvent<'Format>[]>()
let committed = Event<_>()

[<CLIEvent>]
member __.Committed : IEvent<FsCodec.StreamName * FsCodec.ITimelineEvent<'Format>[]> = committed.Publish

/// Loads state from a given stream
member __.TryLoad streamName = match streams.TryGetValue streamName with false, _ -> None | true, packed -> Some packed
Expand All @@ -35,7 +39,9 @@ type VolatileStore<'Format>() =
match trySyncValue currentValue with
| ConcurrentDictionarySyncResult.Conflict expectedVersion -> raise <| WrongVersionException (streamName, expectedVersion, box currentValue)
| ConcurrentDictionarySyncResult.Written value -> value
try streams.AddOrUpdate(streamName, seedStream, updateValue) |> Written
try let res = streams.AddOrUpdate(streamName, seedStream, updateValue) |> Written
committed.Trigger((FsCodec.StreamName.parse streamName, events)) // raise here, once, as updateValue can conceptually be invoked multiple times
res
with WrongVersionException(_, _, conflictingValue) -> unbox conflictingValue |> Conflict

type Token = { streamVersion: int; streamName: string }
Expand Down Expand Up @@ -67,7 +73,7 @@ type Category<'event, 'state, 'context, 'Format>(store : VolatileStore<'Format>,
member __.TrySync(_log, Token.Unpack token, state, events : 'event list, context : 'context option) = async {
let inline map i (e : FsCodec.IEventData<'Format>) =
FsCodec.Core.TimelineEvent.Create(int64 i, e.EventType, e.Data, e.Meta, e.EventId, e.CorrelationId, e.CausationId, e.Timestamp)
let encoded : FsCodec.ITimelineEvent<_>[] = events |> Seq.mapi (fun i e -> map (token.streamVersion+i) (codec.Encode(context,e))) |> Array.ofSeq
let encoded : FsCodec.ITimelineEvent<_>[] = events |> Seq.mapi (fun i e -> map (token.streamVersion+i+1) (codec.Encode(context,e))) |> Array.ofSeq
let trySyncValue currentValue =
if Array.length currentValue <> token.streamVersion + 1 then ConcurrentDictionarySyncResult.Conflict (token.streamVersion)
else ConcurrentDictionarySyncResult.Written (Seq.append currentValue encoded |> Array.ofSeq)
Expand Down
50 changes: 45 additions & 5 deletions tests/Equinox.MemoryStore.Integration/MemoryStoreIntegration.fs
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
module Equinox.MemoryStore.Integration.MemoryStoreIntegration

open Swensen.Unquote
open Equinox.MemoryStore
open Swensen.Unquote

let createMemoryStore () =
new VolatileStore<_>()

let createMemoryStore () = VolatileStore<_>()
let createServiceMemory log store =
let resolve (id,opt) = Resolver(store, FsCodec.Box.Codec.Create(), Domain.Cart.Fold.fold, Domain.Cart.Fold.initial).Resolve(id,?option=opt)
Backend.Cart.Service(log, resolve)
Expand All @@ -16,6 +14,10 @@ type Tests(testOutputHelper) =
let testOutput = TestOutputAdapter testOutputHelper
let createLog () = createLogger testOutput

let (|NonZero|) = function
| None -> Some 1
| Some c -> Some (max 1 c)

[<AutoData>]
let ``Basic tracer bullet, sending a command and verifying the folded result directly and via a reload``
cartId1 cartId2 ((_,skuId,quantity) as args) = Async.RunSynchronously <| async {
Expand Down Expand Up @@ -44,4 +46,42 @@ type Tests(testOutputHelper) =
| x -> x |> failwithf "Expected to find item, got %A"
verifyFoldedStateReflectsCommand expected
verifyFoldedStateReflectsCommand actual
}
}

let createFavoritesServiceMemory log store =
let resolver = Resolver(store, FsCodec.Box.Codec.Create(), Domain.Favorites.Fold.fold, Domain.Favorites.Fold.initial)
Backend.Favorites.Service(log, resolver.Resolve)

type ChangeFeed(testOutputHelper) =
let testOutput = TestOutputAdapter testOutputHelper
let createLog () = createLogger testOutput

[<AutoData>]
let ``Commits get reported`` (clientId, sku) = Async.RunSynchronously <| async {
let log, store = createLog (), createMemoryStore ()
let events = ResizeArray()
let takeCaptured () =
let xs = events.ToArray()
events.Clear()
List.ofArray xs
use _ = store.Committed.Subscribe(fun (s, xs) -> events.Add((s, List.ofArray xs)))
let service = createFavoritesServiceMemory log store
let (Domain.Favorites.Events.ForClientId expectedStream) = clientId

do! service.Favorite(clientId, [sku])
let written = takeCaptured ()
test <@ let stream, xs = written |> List.exactlyOne
let env = xs |> List.exactlyOne
stream = expectedStream
&& env.Index = 0L
&& env.EventType = "Favorited"
&& env.Data |> unbox<Domain.Favorites.Events.Favorited> |> fun x -> x.skuId = sku @>
do! service.Unfavorite(clientId, sku)
let written = takeCaptured ()
test <@ let stream, xs = written |> List.exactlyOne
let env = xs |> List.exactlyOne
stream = expectedStream
&& env.Index = 1L
&& env.EventType = "Unfavorited"
&& env.Data |> unbox<Domain.Favorites.Events.Unfavorited> |> fun x -> x.skuId = sku @>
}
4 changes: 2 additions & 2 deletions tools/Equinox.Tool/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ module CosmosStats =
log.Debug("Running query: {sql}", sql)
let res = container.QueryValue<int>(sql, Microsoft.Azure.Documents.Client.FeedOptions(EnableCrossPartitionQuery=true))
log.Information("{stat}: {result:N0}", name, res)})
|> if inParallel then Async.Parallel else Async.Sequential
|> if inParallel then Async.Parallel else Async.ParallelThrottled 1
|> Async.Ignore
|> Async.RunSynchronously
| _ -> failwith "please specify a `cosmos` endpoint" }
Expand Down Expand Up @@ -438,4 +438,4 @@ let main argv =
with e -> log.Debug(e, "Fatal error; exiting"); reraise ()
with :? Argu.ArguParseException as e -> eprintfn "%s" e.Message; 1
| Storage.MissingArg msg -> eprintfn "%s" msg; 1
| e -> eprintfn "%s" e.Message; 1
| e -> eprintfn "%s" e.Message; 1

0 comments on commit ea5c42d

Please sign in to comment.