From 278a7b07707ed0fddabd24bb9c7b8ae4d4ecccd4 Mon Sep 17 00:00:00 2001 From: David Cumps Date: Mon, 3 Feb 2020 22:05:23 +0100 Subject: [PATCH] Style/consistency changes; add .editorconfig (#191) --- .editorconfig | 9 ++ DOCUMENTATION.md | 2 +- src/Equinox.Core/AsyncCacheCell.fs | 4 +- src/Equinox.Core/Cache.fs | 11 +- src/Equinox.Core/Infrastructure.fs | 20 +-- src/Equinox.Core/Retry.fs | 4 +- src/Equinox.Core/Stream.fs | 4 +- src/Equinox.Core/Types.fs | 13 +- src/Equinox.EventStore/EventStore.fs | 224 +++++++++++++++++---------- src/Equinox/Equinox.fs | 35 +++-- src/Equinox/Flow.fs | 23 +-- 11 files changed, 221 insertions(+), 128 deletions(-) create mode 100644 .editorconfig mode change 100644 => 100755 DOCUMENTATION.md mode change 100644 => 100755 src/Equinox.Core/AsyncCacheCell.fs mode change 100644 => 100755 src/Equinox.Core/Cache.fs mode change 100644 => 100755 src/Equinox.Core/Infrastructure.fs mode change 100644 => 100755 src/Equinox.Core/Retry.fs mode change 100644 => 100755 src/Equinox.Core/Stream.fs mode change 100644 => 100755 src/Equinox.Core/Types.fs mode change 100644 => 100755 src/Equinox.EventStore/EventStore.fs mode change 100644 => 100755 src/Equinox/Equinox.fs mode change 100644 => 100755 src/Equinox/Flow.fs diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 000000000..eaac9500c --- /dev/null +++ b/.editorconfig @@ -0,0 +1,9 @@ +root = true + + +[*.fs] +indent_style = space +indent_size = 4 +trim_trailing_whitespace = true +insert_final_newline = true +end_of_line = lf diff --git a/DOCUMENTATION.md b/DOCUMENTATION.md old mode 100644 new mode 100755 index 7fcc363a2..bff57604a --- a/DOCUMENTATION.md +++ b/DOCUMENTATION.md @@ -884,7 +884,7 @@ This covers what the most complete possible implementation of the JS Stored Proc The `sync` stored procedure takes as input, a document that is almost identical to the format of the _`Tip`_ batch (in fact, if the stream is found to be empty, it is pretty much the template for the first document created in the stream). The request includes the following elements: - `expectedVersion`: the position the requester has based their [proposed] events on (no, [providing an `etag` to save on Request Charges is not possible in the Stored Proc](https://stackoverflow.com/questions/53355886/azure-cosmosdb-stored-procedure-ifmatch-predicate)) -- `e`: array of Events (see Event, above) to append iff the expectedVersion check is fulfilled +- `e`: array of Events (see Event, above) to append if, and only if, the expectedVersion check is fulfilled - `u`: array of `unfold`ed events (aka snapshots) that supersede items with equivalent `c`ase values - `maxEvents`: the maximum number of events in an individual batch prior to starting a new one. For example: diff --git a/src/Equinox.Core/AsyncCacheCell.fs b/src/Equinox.Core/AsyncCacheCell.fs old mode 100644 new mode 100755 index d0004a9c9..4333ab9f8 --- a/src/Equinox.Core/AsyncCacheCell.fs +++ b/src/Equinox.Core/AsyncCacheCell.fs @@ -11,7 +11,7 @@ type AsyncLazy<'T>(workflow : Async<'T>) = type AsyncCacheCell<'T>(workflow : Async<'T>, ?isExpired : 'T -> bool) = let mutable currentCell = AsyncLazy workflow - let initializationFailed (value: System.Threading.Tasks.Task<_>) = + let initializationFailed (value : System.Threading.Tasks.Task<_>) = // for TMI on this, see https://stackoverflow.com/a/33946166/11635 value.IsCompleted && value.Status <> System.Threading.Tasks.TaskStatus.RanToCompletion @@ -44,4 +44,4 @@ type AsyncCacheCell<'T>(workflow : Async<'T>, ?isExpired : 'T -> bool) = match isExpired with | Some f when f current -> return! update cell | _ -> return current - } \ No newline at end of file + } diff --git a/src/Equinox.Core/Cache.fs b/src/Equinox.Core/Cache.fs old mode 100644 new mode 100755 index 5b4309dc3..ca0595124 --- a/src/Equinox.Core/Cache.fs +++ b/src/Equinox.Core/Cache.fs @@ -9,19 +9,20 @@ type CacheItemOptions = [] type CacheEntry<'state>(initialToken: StreamToken, initialState: 'state, supersedes: StreamToken -> StreamToken -> bool) = let mutable currentToken, currentState = initialToken, initialState - member __.UpdateIfNewer(other: CacheEntry<'state>) = + member __.UpdateIfNewer(other : CacheEntry<'state>) = lock __ <| fun () -> let otherToken, otherState = other.Value if otherToken |> supersedes currentToken then currentToken <- otherToken currentState <- otherState + member __.Value : StreamToken * 'state = lock __ <| fun () -> currentToken, currentState type ICache = abstract member UpdateIfNewer : key: string * options: CacheItemOptions * entry: CacheEntry<'state> -> Async - abstract member TryGet: key: string -> Async<(StreamToken * 'state) option> + abstract member TryGet : key: string -> Async<(StreamToken * 'state) option> namespace Equinox @@ -40,16 +41,16 @@ type Cache(name, sizeMb : int) = | RelativeExpiration relative -> new CacheItemPolicy(SlidingExpiration = relative) interface ICache with - member this.UpdateIfNewer(key, options, entry) = async { + member __.UpdateIfNewer(key, options, entry) = async { let policy = toPolicy options match cache.AddOrGetExisting(key, box entry, policy) with | null -> () | :? CacheEntry<'state> as existingEntry -> existingEntry.UpdateIfNewer entry | x -> failwithf "UpdateIfNewer Incompatible cache entry %A" x } - member this.TryGet key = async { + member __.TryGet key = async { return match cache.Get key with | null -> None | :? CacheEntry<'state> as existingEntry -> Some existingEntry.Value - | x -> failwithf "TryGet Incompatible cache entry %A" x } \ No newline at end of file + | x -> failwithf "TryGet Incompatible cache entry %A" x } diff --git a/src/Equinox.Core/Infrastructure.fs b/src/Equinox.Core/Infrastructure.fs old mode 100644 new mode 100755 index e201a8a0a..b8003e109 --- a/src/Equinox.Core/Infrastructure.fs +++ b/src/Equinox.Core/Infrastructure.fs @@ -5,6 +5,7 @@ module internal Equinox.Core.Infrastructure open FSharp.Control open System open System.Diagnostics +open System.Threading.Tasks type OAttribute = System.Runtime.InteropServices.OptionalAttribute type DAttribute = System.Runtime.InteropServices.DefaultParameterValueAttribute @@ -42,26 +43,27 @@ type Async with /// /// Task to be awaited. [] - static member AwaitTaskCorrect(task : System.Threading.Tasks.Task<'T>) : Async<'T> = - Async.FromContinuations(fun (sc,ec,_) -> - task.ContinueWith(fun (t : System.Threading.Tasks.Task<'T>) -> + static member AwaitTaskCorrect(task : Task<'T>) : Async<'T> = + Async.FromContinuations(fun (sc, ec, _) -> + task.ContinueWith(fun (t : Task<'T>) -> if t.IsFaulted then let e = t.Exception if e.InnerExceptions.Count = 1 then ec e.InnerExceptions.[0] else ec e - elif t.IsCanceled then ec(new System.Threading.Tasks.TaskCanceledException()) + elif t.IsCanceled then ec(new TaskCanceledException()) else sc t.Result) |> ignore) + [] - static member AwaitTaskCorrect(task : System.Threading.Tasks.Task) : Async = - Async.FromContinuations(fun (sc,ec,_) -> - task.ContinueWith(fun (task : System.Threading.Tasks.Task) -> + static member AwaitTaskCorrect(task : Task) : Async = + Async.FromContinuations(fun (sc, ec, _) -> + task.ContinueWith(fun (task : Task) -> if task.IsFaulted then let e = task.Exception if e.InnerExceptions.Count = 1 then ec e.InnerExceptions.[0] else ec e elif task.IsCanceled then - ec(System.Threading.Tasks.TaskCanceledException()) + ec(TaskCanceledException()) else sc ()) |> ignore) @@ -98,4 +100,4 @@ module Regex = /// Active pattern for branching on successful regex matches let (|Match|_|) (pattern : string) (input : string) = let m = (mkRegex pattern).Match input - if m.Success then Some m else None \ No newline at end of file + if m.Success then Some m else None diff --git a/src/Equinox.Core/Retry.fs b/src/Equinox.Core/Retry.fs old mode 100644 new mode 100755 index a4a4b3bfc..d46614d25 --- a/src/Equinox.Core/Retry.fs +++ b/src/Equinox.Core/Retry.fs @@ -10,6 +10,7 @@ module Retry = /// (until `attempts` exhausted) on an exception matching the `filter`, waiting for the timespan chosen by `backoff` before retrying let withBackoff (maxAttempts : int) (backoff : int -> System.TimeSpan option) (f : int -> Async<'a>) : Async<'a> = if maxAttempts < 1 then raise (invalidArg "maxAttempts" "Should be >= 1") + let rec go attempt = async { try let! res = f attempt @@ -21,4 +22,5 @@ module Retry = | Some timespan -> do! Async.Sleep (int timespan.TotalMilliseconds) | None -> () return! go (attempt + 1) } - go 1 \ No newline at end of file + + go 1 diff --git a/src/Equinox.Core/Stream.fs b/src/Equinox.Core/Stream.fs old mode 100644 new mode 100755 index c8bccd17f..a69a9b079 --- a/src/Equinox.Core/Stream.fs +++ b/src/Equinox.Core/Stream.fs @@ -6,6 +6,7 @@ type private Stream<'event, 'state, 'streamId, 'context>(category : ICategory<'e interface IStream<'event, 'state> with member __.Load log = category.Load(log, streamId, opt) + member __.TrySync(log: Serilog.ILogger, token: StreamToken, originState: 'state, events: 'event list) = category.TrySync(log, token, originState, events, context) @@ -19,7 +20,8 @@ type private InitializedStream<'event, 'state>(inner : IStream<'event, 'state>, match preloadedTokenAndState with | Some value -> async { preloadedTokenAndState <- None; return value } | None -> inner.Load log + member __.TrySync(log: Serilog.ILogger, token: StreamToken, originState: 'state, events: 'event list) = inner.TrySync(log, token, originState, events) -let ofMemento (memento : StreamToken * 'state) (inner : IStream<'event,'state>) : IStream<'event, 'state> = InitializedStream(inner, memento) :> _ \ No newline at end of file +let ofMemento (memento : StreamToken * 'state) (inner : IStream<'event,'state>) : IStream<'event, 'state> = InitializedStream(inner, memento) :> _ diff --git a/src/Equinox.Core/Types.fs b/src/Equinox.Core/Types.fs old mode 100644 new mode 100755 index 1e415faae..37f3c470a --- a/src/Equinox.Core/Types.fs +++ b/src/Equinox.Core/Types.fs @@ -9,6 +9,7 @@ open System.Diagnostics type ICategory<'event, 'state, 'streamId, 'context> = /// Obtain the state from the target stream abstract Load : log: ILogger * 'streamId * ResolveOption option -> Async + /// Given the supplied `token`, attempt to sync to the proposed updated `state'` by appending the supplied `events` to the underlying stream, yielding: /// - Written: signifies synchronization has succeeded, implying the included StreamState should now be assumed to be the state of the stream /// - Conflict: signifies the sync failed, and the proposed decision hence needs to be reconsidered in light of the supplied conflicting Stream State @@ -23,7 +24,7 @@ type StopwatchInterval (startTicks : int64, endTicks : int64) = // Converts a tick count as measured by stopwatch into a TimeSpan value let timeSpanFromStopwatchTicks (ticks : int64) = - let ticksPerSecond = double System.Diagnostics.Stopwatch.Frequency + let ticksPerSecond = double Stopwatch.Frequency let totalSeconds = double ticks / ticksPerSecond TimeSpan.FromSeconds totalSeconds @@ -39,9 +40,9 @@ type Stopwatch = /// Function to execute & time. [] static member Time(f : unit -> 'T) : StopwatchInterval * 'T = - let startTicks = System.Diagnostics.Stopwatch.GetTimestamp() + let startTicks = Stopwatch.GetTimestamp() let result = f () - let endTicks = System.Diagnostics.Stopwatch.GetTimestamp() + let endTicks = Stopwatch.GetTimestamp() let tr = StopwatchInterval(startTicks, endTicks) tr, result @@ -51,9 +52,9 @@ type Stopwatch = /// Function to execute & time. [] static member Time(f : Async<'T>) : Async = async { - let startTicks = System.Diagnostics.Stopwatch.GetTimestamp() + let startTicks = Stopwatch.GetTimestamp() let! result = f - let endTicks = System.Diagnostics.Stopwatch.GetTimestamp() + let endTicks = Stopwatch.GetTimestamp() let tr = StopwatchInterval(startTicks, endTicks) return tr, result - } \ No newline at end of file + } diff --git a/src/Equinox.EventStore/EventStore.fs b/src/Equinox.EventStore/EventStore.fs old mode 100644 new mode 100755 index deec92de3..53825d59d --- a/src/Equinox.EventStore/EventStore.fs +++ b/src/Equinox.EventStore/EventStore.fs @@ -13,21 +13,26 @@ type Direction = Forward | Backward with module Log = [] type Measurement = { stream: string; interval: StopwatchInterval; bytes: int; count: int } + [] type Event = | WriteSuccess of Measurement | WriteConflict of Measurement | Slice of Direction * Measurement | Batch of Direction * slices: int * Measurement + let prop name value (log : ILogger) = log.ForContext(name, value) + let propEvents name (kvps : System.Collections.Generic.KeyValuePair seq) (log : ILogger) = let items = seq { for kv in kvps do yield sprintf "{\"%s\": %s}" kv.Key kv.Value } log.ForContext(name, sprintf "[%s]" (String.concat ",\n\r" items)) + let propEventData name (events : EventData[]) (log : ILogger) = log |> propEvents name (seq { for x in events do if x.IsJson then yield System.Collections.Generic.KeyValuePair<_,_>(x.Type, System.Text.Encoding.UTF8.GetString x.Data) }) + let propResolvedEvents name (events : ResolvedEvent[]) (log : ILogger) = log |> propEvents name (seq { for x in events do @@ -36,12 +41,14 @@ module Log = yield System.Collections.Generic.KeyValuePair<_,_>(e.EventType, System.Text.Encoding.UTF8.GetString e.Data) }) open Serilog.Events + /// Attach a property to the log context to hold the metrics // Sidestep Log.ForContext converting to a string; see https://github.com/serilog/serilog/issues/1124 let event (value : Event) (log : ILogger) = let enrich (e : LogEvent) = e.AddPropertyIfAbsent(LogEventProperty("esEvt", ScalarValue(value))) log.ForContext({ new Serilog.Core.ILogEventEnricher with member __.Enrich(evt,_) = enrich evt }) - let withLoggedRetries<'t> retryPolicy (contextLabel : string) (f : ILogger -> Async<'t>) log: Async<'t> = + + let withLoggedRetries<'t> retryPolicy (contextLabel : string) (f : ILogger -> Async<'t>) log : Async<'t> = match retryPolicy with | None -> f log | Some retryPolicy -> @@ -49,6 +56,7 @@ module Log = let log = if count = 1 then log else log |> prop contextLabel count f log retryPolicy withLoggingContextWrapping + let (|BlobLen|) = function null -> 0 | (x : byte[]) -> x.Length /// NB Caveat emptor; this is subject to unlimited change without the major version changing - while the `dotnet-templates` repo will be kept in step, and @@ -56,27 +64,31 @@ module Log = module InternalMetrics = module Stats = - let inline (|Stats|) ({ interval = i }: Measurement) = let e = i.Elapsed in int64 e.TotalMilliseconds + let inline (|Stats|) ({ interval = i } : Measurement) = let e = i.Elapsed in int64 e.TotalMilliseconds let (|Read|Write|Resync|Rollup|) = function - | Slice (_,(Stats s)) -> Read s + | Slice (_, (Stats s)) -> Read s | WriteSuccess (Stats s) -> Write s | WriteConflict (Stats s) -> Resync s // slices are rolled up into batches so be sure not to double-count - | Batch (_,_,(Stats s)) -> Rollup s + | Batch (_, _, (Stats s)) -> Rollup s + let (|SerilogScalar|_|) : LogEventPropertyValue -> obj option = function | (:? ScalarValue as x) -> Some x.Value | _ -> None + let (|EsMetric|_|) (logEvent : LogEvent) : Event option = match logEvent.Properties.TryGetValue("esEvt") with | true, SerilogScalar (:? Event as e) -> Some e | _ -> None + type Counter = - { mutable count: int64; mutable ms: int64 } + { mutable count : int64; mutable ms : int64 } static member Create() = { count = 0L; ms = 0L } member __.Ingest(ms) = System.Threading.Interlocked.Increment(&__.count) |> ignore System.Threading.Interlocked.Add(&__.ms, ms) |> ignore + type LogSink() = static let epoch = System.Diagnostics.Stopwatch.StartNew() static member val Read = Counter.Create() with get, set @@ -99,7 +111,7 @@ module Log = /// Relies on feeding of metrics from Log through to Stats.LogSink /// Use Stats.LogSink.Restart() to reset the start point (and stats) where relevant - let dump (log: Serilog.ILogger) = + let dump (log : Serilog.ILogger) = let stats = [ "Read", Stats.LogSink.Read "Write", Stats.LogSink.Write @@ -135,9 +147,11 @@ module private Write = log.Information(ex, "Ges TrySync WrongExpectedVersionException writing {EventTypes}, actual {ActualVersion}", [| for x in events -> x.Type |], ex.ActualVersion) return EsSyncResult.Conflict (let v = ex.ActualVersion in v.Value) } + let eventDataBytes events = let eventDataLen (x : EventData) = match x.Data, x.Metadata with Log.BlobLen bytes, Log.BlobLen metaBytes -> bytes + metaBytes events |> Array.sumBy eventDataLen + let private writeEventsLogged (conn : IEventStoreConnection) (streamName : string) (version : int64) (events : EventData[]) (log : ILogger) : Async = async { let log = if (not << log.IsEnabled) Events.LogEventLevel.Debug then log else log |> Log.propEventData "Json" events @@ -155,6 +169,7 @@ module private Write = (resultLog |> Log.event evt).Information("Ges{action:l} count={count} conflict={conflict}", "Write", events.Length, match evt with Log.WriteConflict _ -> true | _ -> false) return result } + let writeEvents (log : ILogger) retryPolicy (conn : IEventStoreConnection) (streamName : string) (version : int64) (events : EventData[]) : Async = let call = writeEventsLogged conn streamName version events @@ -162,23 +177,27 @@ module private Write = module private Read = open FSharp.Control + let private readSliceAsync (conn : IEventStoreConnection) (streamName : string) (direction : Direction) (batchSize : int) (startPos : int64) : Async = async { let call = match direction with - | Direction.Forward -> conn.ReadStreamEventsForwardAsync(streamName, startPos, batchSize, resolveLinkTos = false) + | Direction.Forward -> conn.ReadStreamEventsForwardAsync(streamName, startPos, batchSize, resolveLinkTos = false) | Direction.Backward -> conn.ReadStreamEventsBackwardAsync(streamName, startPos, batchSize, resolveLinkTos = false) return! call |> Async.AwaitTaskCorrect } + let (|ResolvedEventLen|) (x : ResolvedEvent) = match x.Event.Data, x.Event.Metadata with Log.BlobLen bytes, Log.BlobLen metaBytes -> bytes + metaBytes + let private loggedReadSlice conn streamName direction batchSize startPos (log : ILogger) : Async = async { let! t, slice = readSliceAsync conn streamName direction batchSize startPos |> Stopwatch.Time let bytes, count = slice.Events |> Array.sumBy (|ResolvedEventLen|), slice.Events.Length - let reqMetric : Log.Measurement ={ stream = streamName; interval = t; bytes = bytes; count = count} + let reqMetric : Log.Measurement = { stream = streamName; interval = t; bytes = bytes; count = count} let evt = Log.Slice (direction, reqMetric) let log = if (not << log.IsEnabled) Events.LogEventLevel.Debug then log else log |> Log.propResolvedEvents "Json" slice.Events (log |> Log.prop "startPos" startPos |> Log.prop "bytes" bytes |> Log.event evt).Information("Ges{action:l} count={count} version={version}", "Read", count, slice.LastEventNumber) return slice } + let private readBatches (log : ILogger) (readSlice : int64 -> ILogger -> Async) (maxPermittedBatchReads : int option) (startPosition : int64) : AsyncSeq = @@ -199,16 +218,19 @@ module private Read = yield! loop (batchCount + 1) slice.NextEventNumber | x -> raise <| System.ArgumentOutOfRangeException("SliceReadStatus", x, "Unknown result value") } loop 0 startPosition + let resolvedEventBytes events = events |> Array.sumBy (|ResolvedEventLen|) + let logBatchRead direction streamName t events batchSize version (log : ILogger) = let bytes, count = resolvedEventBytes events, events.Length let reqMetric : Log.Measurement = { stream = streamName; interval = t; bytes = bytes; count = count} - let batches = (events.Length - 1)/batchSize + 1 + let batches = (events.Length - 1) / batchSize + 1 let action = match direction with Direction.Forward -> "LoadF" | Direction.Backward -> "LoadB" let evt = Log.Event.Batch (direction, batches, reqMetric) (log |> Log.prop "bytes" bytes |> Log.event evt).Information( "Ges{action:l} stream={stream} count={count}/{batches} version={version}", action, streamName, count, batches, version) + let loadForwardsFrom (log : ILogger) retryPolicy conn batchSize maxPermittedBatchReads streamName startPosition : Async = async { let mergeBatches (batches: AsyncSeq) = async { @@ -220,6 +242,7 @@ module private Read = |> AsyncSeq.toArrayAsync let version = match versionFromStream with Some version -> version | None -> invalidOp "no version encountered in event batch stream" return version, events } + let call pos = loggedReadSlice conn streamName Direction.Forward batchSize pos let retryingLoggingReadSlice pos = Log.withLoggedRetries retryPolicy "readAttempt" (call pos) let direction = Direction.Forward @@ -228,13 +251,15 @@ module private Read = let! t, (version, events) = mergeBatches batches |> Stopwatch.Time log |> logBatchRead direction streamName t events batchSize version return version, events } + let partitionPayloadFrom firstUsedEventNumber : ResolvedEvent[] -> int * int = - let acc (tu,tr) ((ResolvedEventLen bytes) as y) = if y.Event.EventNumber < firstUsedEventNumber then tu, tr + bytes else tu + bytes, tr - Array.fold acc (0,0) - let loadBackwardsUntilCompactionOrStart (log : ILogger) retryPolicy conn batchSize maxPermittedBatchReads streamName (tryDecode,isOrigin) + let acc (tu, tr) ((ResolvedEventLen bytes) as y) = if y.Event.EventNumber < firstUsedEventNumber then tu, tr + bytes else tu + bytes, tr + Array.fold acc (0, 0) + + let loadBackwardsUntilCompactionOrStart (log : ILogger) retryPolicy conn batchSize maxPermittedBatchReads streamName (tryDecode, isOrigin) : Async = async { let mergeFromCompactionPointOrStartFromBackwardsStream (log : ILogger) (batchesBackward : AsyncSeq) - : Async = async { + : Async = async { let versionFromStream, lastBatch = ref None, ref None let! tempBackward = batchesBackward @@ -257,6 +282,7 @@ module private Read = let eventsForward = Array.Reverse(tempBackward); tempBackward // sic - relatively cheap, in-place reverse of something we own let version = match !versionFromStream with Some version -> version | None -> invalidOp "no version encountered in event batch stream" return version, eventsForward } + let call pos = loggedReadSlice conn streamName Direction.Backward batchSize pos let retryingLoggingReadSlice pos = Log.withLoggedRetries retryPolicy "readAttempt" (call pos) let log = log |> Log.prop "batchSize" batchSize |> Log.prop "stream" streamName @@ -275,15 +301,16 @@ module UnionEncoderAdapters = let ts = DateTimeOffset.FromUnixTimeMilliseconds(e.CreatedEpoch) // TOCONSIDER wire e.Metadata.["$correlationId"] and .["$causationId"] into correlationId and causationId // https://eventstore.org/docs/server/metadata-and-reserved-names/index.html#event-metadata - FsCodec.Core.TimelineEvent.Create(e.EventNumber, e.EventType, e.Data, e.Metadata, null, null, ts) + FsCodec.Core.TimelineEvent.Create(e.EventNumber, e.EventType, e.Data, e.Metadata, correlationId = null, causationId = null, timestamp = ts) + let eventDataOfEncodedEvent (x : FsCodec.IEventData) = // TOCONSIDER wire x.CorrelationId, x.CausationId into x.Meta.["$correlationId"] and .["$causationId"] // https://eventstore.org/docs/server/metadata-and-reserved-names/index.html#event-metadata - EventData(Guid.NewGuid(), x.EventType, (*isJson*) true, x.Data, x.Meta) + EventData(Guid.NewGuid(), x.EventType, isJson = true, data = x.Data, metadata = x.Meta) -type Stream = { name: string } -type Position = { streamVersion: int64; compactionEventNumber: int64 option; batchCapacityLimit: int option } -type Token = { stream: Stream; pos: Position } +type Stream = { name : string } +type Position = { streamVersion : int64; compactionEventNumber : int64 option; batchCapacityLimit : int option } +type Token = { stream : Stream; pos : Position } module Token = let private create compactionEventNumber batchCapacityLimit streamName streamVersion : StreamToken = @@ -291,43 +318,53 @@ module Token = stream = { name = streamName} pos = { streamVersion = streamVersion; compactionEventNumber = compactionEventNumber; batchCapacityLimit = batchCapacityLimit } } version = streamVersion } + /// No batching / compaction; we only need to retain the StreamVersion let ofNonCompacting streamName streamVersion : StreamToken = create None None streamName streamVersion + // headroom before compaction is necessary given the stated knowledge of the last (if known) `compactionEventNumberOption` let private batchCapacityLimit compactedEventNumberOption unstoredEventsPending (batchSize : int) (streamVersion : int64) : int = match compactedEventNumberOption with | Some (compactionEventNumber : int64) -> (batchSize - unstoredEventsPending) - int (streamVersion - compactionEventNumber + 1L) |> max 0 | None -> (batchSize - unstoredEventsPending) - (int streamVersion + 1) - 1 |> max 0 + let (*private*) ofCompactionEventNumber compactedEventNumberOption unstoredEventsPending batchSize streamName streamVersion : StreamToken = let batchCapacityLimit = batchCapacityLimit compactedEventNumberOption unstoredEventsPending batchSize streamVersion create compactedEventNumberOption (Some batchCapacityLimit) streamName streamVersion + /// Assume we have not seen any compaction events; use the batchSize and version to infer headroom let ofUncompactedVersion batchSize streamName streamVersion : StreamToken = ofCompactionEventNumber None 0 batchSize streamName streamVersion + let (|Unpack|) (x : StreamToken) : Token = unbox x.value + /// Use previousToken plus the data we are adding and the position we are adding it to infer a headroom let ofPreviousTokenAndEventsLength (Unpack previousToken) eventsLength batchSize streamVersion : StreamToken = let compactedEventNumber = previousToken.pos.compactionEventNumber ofCompactionEventNumber compactedEventNumber eventsLength batchSize previousToken.stream.name streamVersion + /// Use an event just read from the stream to infer headroom let ofCompactionResolvedEventAndVersion (compactionEvent: ResolvedEvent) batchSize streamName streamVersion : StreamToken = ofCompactionEventNumber (Some compactionEvent.Event.EventNumber) 0 batchSize streamName streamVersion + /// Use an event we are about to write to the stream to infer headroom let ofPreviousStreamVersionAndCompactionEventDataIndex (Unpack token) compactionEventDataIndex eventsLength batchSize streamVersion' : StreamToken = ofCompactionEventNumber (Some (token.pos.streamVersion + 1L + int64 compactionEventDataIndex)) eventsLength batchSize token.stream.name streamVersion' + let (|StreamPos|) (Unpack token) : Stream * Position = token.stream, token.pos + let supersedes (Unpack current) (Unpack x) = let currentVersion, newVersion = current.pos.streamVersion, x.pos.streamVersion newVersion > currentVersion -type Connection(readConnection, []?writeConnection, []?readRetryPolicy, []?writeRetryPolicy) = +type Connection(readConnection, [] ?writeConnection, [] ?readRetryPolicy, [] ?writeRetryPolicy) = member __.ReadConnection = readConnection member __.ReadRetryPolicy = readRetryPolicy member __.WriteConnection = defaultArg writeConnection readConnection member __.WriteRetryPolicy = writeRetryPolicy -type BatchingPolicy(getMaxBatchSize : unit -> int, []?batchCountLimit) = +type BatchingPolicy(getMaxBatchSize : unit -> int, [] ?batchCountLimit) = new (maxBatchSize) = BatchingPolicy(fun () -> maxBatchSize) member __.BatchSize = getMaxBatchSize() member __.MaxBatches = batchCountLimit @@ -336,10 +373,12 @@ type BatchingPolicy(getMaxBatchSize : unit -> int, []?batchCountLimi type GatewaySyncResult = Written of StreamToken | ConflictUnknown of StreamToken type Context(conn : Connection, batching : BatchingPolicy) = - let isResolvedEventEventType (tryDecode,predicate) (x:ResolvedEvent) = predicate (tryDecode (x.Event.Data)) + let isResolvedEventEventType (tryDecode, predicate) (x : ResolvedEvent) = predicate (tryDecode (x.Event.Data)) let tryIsResolvedEventEventType predicateOption = predicateOption |> Option.map isResolvedEventEventType + member internal __.LoadEmpty streamName = Token.ofUncompactedVersion batching.BatchSize streamName -1L - member __.LoadBatched streamName log (tryDecode,isCompactionEventType): Async = async { + + member __.LoadBatched streamName log (tryDecode, isCompactionEventType) : Async = async { let! version, events = Read.loadForwardsFrom log conn.ReadRetryPolicy conn.ReadConnection batching.BatchSize batching.MaxBatches streamName 0L match tryIsResolvedEventEventType isCompactionEventType with | None -> return Token.ofNonCompacting streamName version, Array.choose tryDecode events @@ -347,13 +386,15 @@ type Context(conn : Connection, batching : BatchingPolicy) = match events |> Array.tryFindBack isCompactionEvent with | None -> return Token.ofUncompactedVersion batching.BatchSize streamName version, Array.choose tryDecode events | Some resolvedEvent -> return Token.ofCompactionResolvedEventAndVersion resolvedEvent batching.BatchSize streamName version, Array.choose tryDecode events } - member __.LoadBackwardsStoppingAtCompactionEvent streamName log (tryDecode,isOrigin): Async = async { + + member __.LoadBackwardsStoppingAtCompactionEvent streamName log (tryDecode, isOrigin) : Async = async { let! version, events = - Read.loadBackwardsUntilCompactionOrStart log conn.ReadRetryPolicy conn.ReadConnection batching.BatchSize batching.MaxBatches streamName (tryDecode,isOrigin) + Read.loadBackwardsUntilCompactionOrStart log conn.ReadRetryPolicy conn.ReadConnection batching.BatchSize batching.MaxBatches streamName (tryDecode, isOrigin) match Array.tryHead events |> Option.filter (function _, Some e -> isOrigin e | _ -> false) with | None -> return Token.ofUncompactedVersion batching.BatchSize streamName version, Array.choose snd events - | Some (resolvedEvent,_) -> return Token.ofCompactionResolvedEventAndVersion resolvedEvent batching.BatchSize streamName version, Array.choose snd events } - member __.LoadFromToken useWriteConn streamName log (Token.Unpack token as streamToken) (tryDecode,isCompactionEventType) + | Some (resolvedEvent, _) -> return Token.ofCompactionResolvedEventAndVersion resolvedEvent batching.BatchSize streamName version, Array.choose snd events } + + member __.LoadFromToken useWriteConn streamName log (Token.Unpack token as streamToken) (tryDecode, isCompactionEventType) : Async = async { let streamPosition = token.pos.streamVersion + 1L let connToUse = if useWriteConn then conn.WriteConnection else conn.ReadConnection @@ -364,6 +405,7 @@ type Context(conn : Connection, batching : BatchingPolicy) = match events |> Array.tryFindBack (fun re -> match tryDecode re with Some e -> isCompactionEvent e | _ -> false) with | None -> return Token.ofPreviousTokenAndEventsLength streamToken events.Length batching.BatchSize version, Array.choose tryDecode events | Some resolvedEvent -> return Token.ofCompactionResolvedEventAndVersion resolvedEvent batching.BatchSize streamName version, Array.choose tryDecode events } + member __.TrySync log (Token.Unpack token as streamToken) (events, encodedEvents: EventData array) (isCompactionEventType) : Async = async { let streamVersion = token.pos.streamVersion let! wr = Write.writeEvents log conn.WriteRetryPolicy conn.WriteConnection token.stream.name streamVersion encodedEvents @@ -371,17 +413,17 @@ type Context(conn : Connection, batching : BatchingPolicy) = | EsSyncResult.Conflict actualVersion -> return GatewaySyncResult.ConflictUnknown (Token.ofNonCompacting token.stream.name actualVersion) | EsSyncResult.Written wr -> + let version' = wr.NextExpectedVersion + let token = + match isCompactionEventType with + | None -> Token.ofNonCompacting token.stream.name version' + | Some isCompactionEvent -> + match events |> Array.ofList |> Array.tryFindIndexBack isCompactionEvent with + | None -> Token.ofPreviousTokenAndEventsLength streamToken encodedEvents.Length batching.BatchSize version' + | Some compactionEventIndex -> + Token.ofPreviousStreamVersionAndCompactionEventDataIndex streamToken compactionEventIndex encodedEvents.Length batching.BatchSize version' + return GatewaySyncResult.Written token } - let version' = wr.NextExpectedVersion - let token = - match isCompactionEventType with - | None -> Token.ofNonCompacting token.stream.name version' - | Some isCompactionEvent -> - match events |> Array.ofList |> Array.tryFindIndexBack isCompactionEvent with - | None -> Token.ofPreviousTokenAndEventsLength streamToken encodedEvents.Length batching.BatchSize version' - | Some compactionEventIndex -> - Token.ofPreviousStreamVersionAndCompactionEventDataIndex streamToken compactionEventIndex encodedEvents.Length batching.BatchSize version' - return GatewaySyncResult.Written token } member __.Sync(log, streamName, streamVersion, events: FsCodec.IEventData[]) : Async = async { let encodedEvents : EventData[] = events |> Array.map UnionEncoderAdapters.eventDataOfEncodedEvent let! wr = Write.writeEvents log conn.WriteRetryPolicy conn.WriteConnection streamName streamVersion encodedEvents @@ -403,53 +445,60 @@ type AccessStrategy<'event,'state> = /// (embedded in the stream as an event), generated every batchSize events using the supplied toSnapshot function /// Scanning for events concludes when any event passes the isOrigin test. /// See https://eventstore.org/docs/event-sourcing-basics/rolling-snapshots/index.html - | RollingSnapshots of isOrigin: ('event -> bool) * toSnapshot: ('state -> 'event) + | RollingSnapshots of isOrigin : ('event -> bool) * toSnapshot : ('state -> 'event) type private CompactionContext(eventsLen : int, capacityBeforeCompaction : int) = /// Determines whether writing a Compaction event is warranted (based on the existing state and the current accumulated changes) member __.IsCompactionDue = eventsLen > capacityBeforeCompaction -type private Category<'event, 'state, 'context>(context : Context, codec : FsCodec.IEventCodec<_,_,'context>, ?access : AccessStrategy<'event,'state>) = - let tryDecode (e: ResolvedEvent) = e |> UnionEncoderAdapters.encodedEventOfResolvedEvent |> codec.TryDecode +type private Category<'event, 'state, 'context>(context : Context, codec : FsCodec.IEventCodec<_, _, 'context>, ?access : AccessStrategy<'event, 'state>) = + let tryDecode (e : ResolvedEvent) = e |> UnionEncoderAdapters.encodedEventOfResolvedEvent |> codec.TryDecode + let compactionPredicate = match access with | None -> None | Some AccessStrategy.LatestKnownEvent -> Some (fun _ -> true) - | Some (AccessStrategy.RollingSnapshots (isValid,_)) -> Some isValid + | Some (AccessStrategy.RollingSnapshots (isValid, _)) -> Some isValid + let isOrigin = match access with | None | Some AccessStrategy.LatestKnownEvent -> fun _ -> true - | Some (AccessStrategy.RollingSnapshots (isValid,_)) -> isValid + | Some (AccessStrategy.RollingSnapshots (isValid, _)) -> isValid + let loadAlgorithm load streamName initial log = let batched = load initial (context.LoadBatched streamName log (tryDecode,None)) - let compacted = load initial (context.LoadBackwardsStoppingAtCompactionEvent streamName log (tryDecode,isOrigin)) + let compacted = load initial (context.LoadBackwardsStoppingAtCompactionEvent streamName log (tryDecode, isOrigin)) match access with | None -> batched | Some AccessStrategy.LatestKnownEvent | Some (AccessStrategy.RollingSnapshots _) -> compacted - let load (fold: 'state -> 'event seq -> 'state) initial f = async { + + let load (fold : 'state -> 'event seq -> 'state) initial f = async { let! token, events = f return token, fold initial events } - member __.Load (fold: 'state -> 'event seq -> 'state) (initial: 'state) (streamName : string) (log : ILogger) : Async = + + member __.Load (fold : 'state -> 'event seq -> 'state) (initial : 'state) (streamName : string) (log : ILogger) : Async = loadAlgorithm (load fold) streamName initial log - member __.LoadFromToken (fold: 'state -> 'event seq -> 'state) (state: 'state) (streamName : string) token (log : ILogger) : Async = - (load fold) state (context.LoadFromToken false streamName log token (tryDecode,compactionPredicate)) + + member __.LoadFromToken (fold : 'state -> 'event seq -> 'state) (state : 'state) (streamName : string) token (log : ILogger) : Async = + (load fold) state (context.LoadFromToken false streamName log token (tryDecode, compactionPredicate)) + member __.TrySync<'context> ( log : ILogger, fold: 'state -> 'event seq -> 'state, - (Token.StreamPos (stream,pos) as streamToken), state : 'state, events : 'event list, ctx : 'context option): Async> = async { - let encode e = codec.Encode(ctx,e) + (Token.StreamPos (stream, pos) as streamToken), state : 'state, events : 'event list, ctx : 'context option) : Async> = async { + let encode e = codec.Encode(ctx, e) let events = match access with | None | Some AccessStrategy.LatestKnownEvent -> events - | Some (AccessStrategy.RollingSnapshots (_,compact)) -> + | Some (AccessStrategy.RollingSnapshots (_, compact)) -> let cc = CompactionContext(List.length events, pos.batchCapacityLimit.Value) if cc.IsCompactionDue then events @ [fold state events |> compact] else events let encodedEvents : EventData[] = events |> Seq.map (encode >> UnionEncoderAdapters.eventDataOfEncodedEvent) |> Array.ofSeq - let! syncRes = context.TrySync log streamToken (events,encodedEvents) compactionPredicate + let! syncRes = context.TrySync log streamToken (events, encodedEvents) compactionPredicate match syncRes with | GatewaySyncResult.ConflictUnknown _ -> - return SyncResult.Conflict (load fold state (context.LoadFromToken true stream.name log streamToken (tryDecode,compactionPredicate))) + return SyncResult.Conflict (load fold state (context.LoadFromToken true stream.name log streamToken (tryDecode, compactionPredicate))) | GatewaySyncResult.Written token' -> return SyncResult.Written (token', fold state (Seq.ofList events)) } @@ -459,27 +508,30 @@ module Caching = let intercept streamName tokenAndState = async { let! _ = tee streamName tokenAndState return tokenAndState } + let loadAndIntercept load streamName = async { let! tokenAndState = load return! intercept streamName tokenAndState } + interface ICategory<'event, 'state, string, 'context> with member __.Load(log, streamName : string, opt) : Async = loadAndIntercept (inner.Load(log, streamName, opt)) streamName + member __.TrySync(log : ILogger, (Token.StreamPos (stream,_) as token), state, events : 'event list, context) : Async> = async { let! syncRes = inner.TrySync(log, token, state, events, context) match syncRes with | SyncResult.Conflict resync -> return SyncResult.Conflict (loadAndIntercept resync stream.name) - | SyncResult.Written (token',state') -> + | SyncResult.Written (token', state') -> let! intercepted = intercept stream.name (token', state') return SyncResult.Written intercepted } let applyCacheUpdatesWithSlidingExpiration - (cache: ICache) - (prefix: string) + (cache : ICache) + (prefix : string) (slidingExpiration : TimeSpan) - (category: ICategory<'event, 'state, string, 'context>) + (category : ICategory<'event, 'state, string, 'context>) : ICategory<'event, 'state, string, 'context> = - let mkCacheEntry (initialToken: StreamToken, initialState: 'state) = new CacheEntry<'state>(initialToken, initialState, Token.supersedes) + let mkCacheEntry (initialToken : StreamToken, initialState : 'state) = new CacheEntry<'state>(initialToken, initialState, Token.supersedes) let options = CacheItemOptions.RelativeExpiration slidingExpiration let addOrUpdateSlidingExpirationCacheEntry streamName value = cache.UpdateIfNewer(prefix + streamName, options, mkCacheEntry value) CategoryTee<'event, 'state, 'context>(category, addOrUpdateSlidingExpirationCacheEntry) :> _ @@ -495,6 +547,7 @@ type private Folder<'event, 'state, 'context>(category : Category<'event, 'state | None -> return! batched log streamName | Some tokenAndState when opt = Some AllowStale -> return tokenAndState | Some (token, state) -> return! category.LoadFromToken fold state streamName token log } + member __.TrySync(log : ILogger, token, initialState, events : 'event list, context) : Async> = async { let! syncRes = category.TrySync(log, fold, token, initialState, events, context) match syncRes with @@ -503,16 +556,17 @@ type private Folder<'event, 'state, 'context>(category : Category<'event, 'state [] type CachingStrategy = - | SlidingWindow of ICache * window: TimeSpan + | SlidingWindow of ICache * window : TimeSpan /// Prefix is used to segregate multiple folds per stream when they are stored in the cache - | SlidingWindowPrefixed of ICache * window: TimeSpan * prefix: string + | SlidingWindowPrefixed of ICache * window : TimeSpan * prefix : string type Resolver<'event, 'state, 'context> - ( context : Context, codec : FsCodec.IEventCodec<_,_,'context>, fold, initial, + ( context : Context, codec : FsCodec.IEventCodec<_, _, 'context>, fold, initial, /// Caching can be overkill for EventStore esp considering the degree to which its intrinsic caching is a first class feature /// e.g., A key benefit is that reads of streams more than a few pages long get completed in constant time after the initial load - []?caching, - []?access) = + [] ?caching, + [] ?access) = + do match access with | Some AccessStrategy.LatestKnownEvent when Option.isSome caching -> "Equinox.EventStore does not support (and it would make things _less_ efficient even if it did)" @@ -526,33 +580,37 @@ type Resolver<'event, 'state, 'context> | None -> None | Some (CachingStrategy.SlidingWindow(cache, _)) -> Some(cache, null) | Some (CachingStrategy.SlidingWindowPrefixed(cache, _, prefix)) -> Some(cache, prefix) + let folder = Folder<'event, 'state, 'context>(inner, fold, initial, ?readCache = readCacheOption) - let category : ICategory<_,_,_,'context> = + + let category : ICategory<_, _, _, 'context> = match caching with | None -> folder :> _ | Some (CachingStrategy.SlidingWindow(cache, window)) -> Caching.applyCacheUpdatesWithSlidingExpiration cache null window folder | Some (CachingStrategy.SlidingWindowPrefixed(cache, window, prefix)) -> Caching.applyCacheUpdatesWithSlidingExpiration cache prefix window folder + let resolveStream = Stream.create category - let loadEmpty sn = context.LoadEmpty sn,initial - member __.Resolve(streamName : FsCodec.StreamName, []?option, []?context) = + let loadEmpty sn = context.LoadEmpty sn, initial + + member __.Resolve(streamName : FsCodec.StreamName, [] ?option, [] ?context) = match FsCodec.StreamName.toString streamName, option with | sn, (None|Some AllowStale) -> resolveStream sn option context | sn, Some AssumeEmpty -> Stream.ofMemento (loadEmpty sn) (resolveStream sn option context) /// Resolve from a Memento being used in a Continuation [based on position and state typically from Stream.CreateMemento] member __.FromMemento(Token.Unpack token as streamToken, state, ?context) = - Stream.ofMemento (streamToken,state) (resolveStream token.stream.name context None) + Stream.ofMemento (streamToken, state) (resolveStream token.stream.name context None) type private SerilogAdapter(log : ILogger) = interface EventStore.ClientAPI.ILogger with - member __.Debug(format: string, args: obj []) = log.Debug(format, args) - member __.Debug(ex: exn, format: string, args: obj []) = log.Debug(ex, format, args) - member __.Info(format: string, args: obj []) = log.Information(format, args) - member __.Info(ex: exn, format: string, args: obj []) = log.Information(ex, format, args) - member __.Error(format: string, args: obj []) = log.Error(format, args) - member __.Error(ex: exn, format: string, args: obj []) = log.Error(ex, format, args) + member __.Debug(format : string, args : obj []) = log.Debug(format, args) + member __.Debug(ex : exn, format : string, args : obj []) = log.Debug(ex, format, args) + member __.Info(format : string, args : obj []) = log.Information(format, args) + member __.Info(ex : exn, format : string, args : obj []) = log.Information(ex, format, args) + member __.Error(format : string, args : obj []) = log.Error(format, args) + member __.Error(ex : exn, format : string, args : obj []) = log.Error(ex, format, args) [] type Logger = @@ -594,15 +652,19 @@ module private Discovery = ClusterSettings.Create().DiscoverClusterViaDns().KeepDiscovering() |> fun s -> match np with NodePreference.Random -> s.PreferRandomNode() | NodePreference.PreferSlave -> s.PreferSlaveNode() | _ -> s |> f |> fun s -> s.Build() + let buildSeeded np (f : GossipSeedClusterSettingsBuilder -> GossipSeedClusterSettingsBuilder) = ClusterSettings.Create().DiscoverClusterViaGossipSeeds().KeepDiscovering() |> fun s -> match np with NodePreference.Random -> s.PreferRandomNode() | NodePreference.PreferSlave -> s.PreferSlaveNode() | _ -> s |> f |> fun s -> s.Build() + let configureDns clusterDns maybeManagerPort (x : DnsClusterSettingsBuilder) = x.SetClusterDns(clusterDns) |> fun s -> match maybeManagerPort with Some port -> s.SetClusterGossipPort(port) | None -> s + let inline configureSeeded (seedEndpoints : System.Net.IPEndPoint []) (x : GossipSeedClusterSettingsBuilder) = x.SetGossipSeedEndPoints(seedEndpoints) + // converts a Discovery mode to a ClusterSettings or a Uri as appropriate let (|DiscoverViaUri|DiscoverViaGossip|) : Discovery * NodePreference -> Choice = function | (Discovery.Uri uri), _ -> DiscoverViaUri uri @@ -620,12 +682,12 @@ type ConnectionStrategy = type Connector ( username, password, reqTimeout: TimeSpan, reqRetries: int, - []?log : Logger, []?heartbeatTimeout: TimeSpan, []?concurrentOperationsLimit, - []?readRetryPolicy, []?writeRetryPolicy, - []?gossipTimeout, []?clientConnectionTimeout, + [] ?log : Logger, [] ?heartbeatTimeout: TimeSpan, [] ?concurrentOperationsLimit, + [] ?readRetryPolicy, [] ?writeRetryPolicy, + [] ?gossipTimeout, [] ?clientConnectionTimeout, /// Additional strings identifying the context of this connection; should provide enough context to disambiguate all potential connections to a cluster /// NB as this will enter server and client logs, it should not contain sensitive information - []?tags : (string*string) seq) = + [] ?tags : (string*string) seq) = let connSettings node = ConnectionSettings.Create().SetDefaultUserCredentials(SystemData.UserCredentials(username, password)) .KeepReconnecting() // ES default: .LimitReconnectionsTo(10) @@ -635,12 +697,12 @@ type Connector .LimitRetriesForOperationTo(reqRetries) // ES default: 10 |> fun s -> match node with - | NodePreference.Master -> s.PerformOnMasterOnly() // explicitly use ES default of requiring master, use default Node preference of Master - | NodePreference.PreferMaster -> s.PerformOnAnyNode() // override default [implied] PerformOnMasterOnly(), use default Node preference of Master + | NodePreference.Master -> s.PerformOnMasterOnly() // explicitly use ES default of requiring master, use default Node preference of Master + | NodePreference.PreferMaster -> s.PerformOnAnyNode() // override default [implied] PerformOnMasterOnly(), use default Node preference of Master // NB .PreferSlaveNode/.PreferRandomNode setting is ignored if using EventStoreConneciton.Create(ConnectionSettings, ClusterSettings) overload but - // this code is necessary for cases where people are using the discover:// and related URI schemes - | NodePreference.PreferSlave -> s.PerformOnAnyNode().PreferSlaveNode() // override default PerformOnMasterOnly(), override Master Node preference - | NodePreference.Random -> s.PerformOnAnyNode().PreferRandomNode() // override default PerformOnMasterOnly(), override Master Node preference + // this code is necessary for cases where people are using the discover :// and related URI schemes + | NodePreference.PreferSlave -> s.PerformOnAnyNode().PreferSlaveNode() // override default PerformOnMasterOnly(), override Master Node preference + | NodePreference.Random -> s.PerformOnAnyNode().PreferRandomNode() // override default PerformOnMasterOnly(), override Master Node preference |> fun s -> match concurrentOperationsLimit with Some col -> s.LimitConcurrentOperationsTo(col) | None -> s // ES default: 5000 |> fun s -> match heartbeatTimeout with Some v -> s.SetHeartbeatTimeout v | None -> s // default: 1500 ms |> fun s -> match gossipTimeout with Some v -> s.SetGossipTimeout v | None -> s // default: 1000 ms @@ -685,4 +747,4 @@ type Connector let! masterInParallel = Async.StartChild (__.Connect(name + "-TwinW", discovery, NodePreference.Master)) let! slave = __.Connect(name + "-TwinR", discovery, NodePreference.PreferSlave) let! master = masterInParallel - return Connection(readConnection=slave, writeConnection=master, ?readRetryPolicy=readRetryPolicy, ?writeRetryPolicy=writeRetryPolicy) } \ No newline at end of file + return Connection(readConnection = slave, writeConnection = master, ?readRetryPolicy = readRetryPolicy, ?writeRetryPolicy = writeRetryPolicy) } diff --git a/src/Equinox/Equinox.fs b/src/Equinox/Equinox.fs old mode 100644 new mode 100755 index 4679b75fd..fe0511191 --- a/src/Equinox/Equinox.fs +++ b/src/Equinox/Equinox.fs @@ -10,32 +10,41 @@ type MaxResyncsExhaustedException(count) = /// Central Application-facing API. Wraps the handling of decision or query flows in a manner that is store agnostic type Stream<'event, 'state> ( log, stream : IStream<'event, 'state>, maxAttempts : int, - []?mkAttemptsExhaustedException, - []?resyncPolicy) = + [] ?mkAttemptsExhaustedException, + [] ?resyncPolicy) = + let transact f = let resyncPolicy = defaultArg resyncPolicy (fun _log _attemptNumber f -> async { return! f }) let throwMaxResyncsExhaustedException attempts = MaxResyncsExhaustedException attempts let handleResyncsExceeded = defaultArg mkAttemptsExhaustedException throwMaxResyncsExhaustedException - Flow.transact (maxAttempts,resyncPolicy,handleResyncsExceeded) (stream, log) f + Flow.transact (maxAttempts, resyncPolicy, handleResyncsExceeded) (stream, log) f - /// 0. Invoke the supplied `interpret` function with the present state 1. attempt to sync the accumulated events to the stream - /// Tries up to `maxAttempts` times in the case of a conflict, throwing MaxResyncsExhaustedException` to signal failure. + /// 0. Invoke the supplied `interpret` function with the present state + /// 1. Attempt to sync the accumulated events to the stream + /// Tries up to `maxAttempts` times in the case of a conflict, throwing `MaxResyncsExhaustedException` to signal failure. member __.Transact(interpret : 'state -> 'event list) : Async = transact (fun state -> async { return (), interpret state }) - /// 0. Invoke the supplied `decide` function with the present state 1. attempt to sync the accumulated events to the stream 2. yield result - /// Tries up to `maxAttempts` times in the case of a conflict, throwing MaxResyncsExhaustedException` to signal failure. - member __.Transact(decide : 'state -> 'result*'event list) : Async<'result> = transact (fun state -> async { return decide state }) - /// 0. Invoke the supplied _Async_ `decide` function with the present state 1. attempt to sync the accumulated events to the stream 2. yield result - /// Tries up to `maxAttempts` times in the case of a conflict, throwing MaxResyncsExhaustedException` to signal failure. - member __.TransactAsync(decide : 'state -> Async<'result*'event list>) : Async<'result> = transact decide + + /// 0. Invoke the supplied `decide` function with the present state + /// 1. Attempt to sync the accumulated events to the stream + /// 2. Yield result + /// Tries up to `maxAttempts` times in the case of a conflict, throwing `MaxResyncsExhaustedException` to signal failure. + member __.Transact(decide : 'state -> 'result * 'event list) : Async<'result> = transact (fun state -> async { return decide state }) + + /// 0. Invoke the supplied _Async_ `decide` function with the present state + /// 1. Attempt to sync the accumulated events to the stream + /// 2. Yield result + /// Tries up to `maxAttempts` times in the case of a conflict, throwing `MaxResyncsExhaustedException` to signal failure. + member __.TransactAsync(decide : 'state -> Async<'result * 'event list>) : Async<'result> = transact decide /// Project from the folded `State` without executing a decision flow as `Decide` does member __.Query(projection : 'state -> 'view) : Async<'view> = Flow.query(stream, log, fun syncState -> projection syncState.State) + /// Project from the folded `State` (with the current version of the stream supplied for context) without executing a decision flow as `Decide` does member __.QueryEx(projection : int64 -> 'state -> 'view) : Async<'view> = Flow.query(stream, log, fun syncState -> projection syncState.Version syncState.State) /// Low-level helper to allow one to obtain a reference to a stream and state pair (including the position) in order to pass it as a continuation within the application /// Such a memento is then held within the application and passed in lieu of a StreamId to the StreamResolver in order to avoid having to reload state - member __.CreateMemento(): Async = Flow.query(stream, log, fun syncState -> syncState.Memento) + member __.CreateMemento() : Async = Flow.query(stream, log, fun syncState -> syncState.Memento) /// Store-agnostic Context.Resolve Options type ResolveOption = @@ -44,4 +53,4 @@ type ResolveOption = /// If the Cache holds a value, use that without checking the backing store for updates, implying: /// - maximizing use of OCC for `Stream.Transact` /// - enabling potentially stale reads [in the face of multiple writers)] (for `Stream.Query`) - | AllowStale \ No newline at end of file + | AllowStale diff --git a/src/Equinox/Flow.fs b/src/Equinox/Flow.fs old mode 100644 new mode 100755 index dbc9d3fce..3141bc453 --- a/src/Equinox/Flow.fs +++ b/src/Equinox/Flow.fs @@ -11,21 +11,22 @@ type StreamToken = { value : obj; version: int64 } /// Internal type used to represent the outcome of a TrySync operation [] type SyncResult<'state> = - /// The write succeeded (the supplied token and state can be used to efficiently continue the processing iff desired) + /// The write succeeded (the supplied token and state can be used to efficiently continue the processing if, and only if, desired) | Written of StreamToken * 'state /// The set of changes supplied to TrySync conflict with the present state of the underlying stream based on the configured policy for that store - /// The inner is Async as some stores (and/or states) are such that determining the conflicting state (iff required) needs an extra trip to obtain + /// The inner is Async as some stores (and/or states) are such that determining the conflicting state (if, and only if, required) needs an extra trip to obtain | Conflict of Async /// Store-agnostic interface representing interactions a Flow can have with the state of a given event stream. Not intended for direct use by consumer code. type IStream<'event, 'state> = /// Obtain the state from the target stream - abstract Load: log: ILogger + abstract Load : log: ILogger -> Async + /// Given the supplied `token` [and related `originState`], attempt to move to state `state'` by appending the supplied `events` to the underlying stream /// SyncResult.Written: implies the state is now the value represented by the Result's value /// SyncResult.Conflict: implies the `events` were not synced; if desired the consumer can use the included resync workflow in order to retry - abstract TrySync: log: ILogger * token: StreamToken * originState: 'state * events: 'event list -> Async> + abstract TrySync : log: ILogger * token: StreamToken * originState: 'state * events: 'event list -> Async> /// Internal implementation of the Store agnostic load + run/render. See Equinox.fs for App-facing APIs. module internal Flow = @@ -48,6 +49,7 @@ module internal Flow = | SyncResult.Written (token', streamState') -> tokenAndState <- token', streamState' return true } + member __.TryOrResync(runResync, attemptNumber: int, log : ILogger, events) : Async = let resyncInPreparationForRetry resync = async { let! streamState' = runResync log attemptNumber resync @@ -64,9 +66,11 @@ module internal Flow = (syncState : SyncState<'event, 'state>) (decide : 'state -> Async<'result * 'event list>) : Async<'result> = + if maxSyncAttempts < 1 then raise <| System.ArgumentOutOfRangeException("maxSyncAttempts", maxSyncAttempts, "should be >= 1") + /// Run a decision cycle - decide what events should be appended given the presented state - let rec loop attempt: Async<'result> = async { + let rec loop attempt : Async<'result> = async { let log = if attempt = 1 then log else log.ForContext("syncAttempt", attempt) let! result, events = decide syncState.State if List.isEmpty events then @@ -88,15 +92,16 @@ module internal Flow = return! loop (attempt + 1) else return result } + /// Commence, processing based on the incoming state loop 1 - let transact (maxAttempts,resyncRetryPolicy,createMaxAttemptsExhaustedException) (stream : IStream<_,_>, log) decide : Async<'result> = async { + let transact (maxAttempts,resyncRetryPolicy,createMaxAttemptsExhaustedException) (stream : IStream<_, _>, log) decide : Async<'result> = async { let! streamState = stream.Load log let syncState = SyncState(streamState, stream.TrySync) - return! run log (maxAttempts,resyncRetryPolicy,createMaxAttemptsExhaustedException) syncState decide } + return! run log (maxAttempts, resyncRetryPolicy, createMaxAttemptsExhaustedException) syncState decide } - let query (stream : IStream<'event, 'state>, log : ILogger, project: SyncState<'event,'state> -> 'result) : Async<'result> = async { + let query (stream : IStream<'event, 'state>, log : ILogger, project: SyncState<'event, 'state> -> 'result) : Async<'result> = async { let! streamState = stream.Load log let syncState = SyncState(streamState, stream.TrySync) - return project syncState } \ No newline at end of file + return project syncState }