Skip to content

Commit

Permalink
Style/consistency changes; add .editorconfig (#191)
Browse files Browse the repository at this point in the history
  • Loading branch information
CumpsD authored Feb 3, 2020
1 parent 7755946 commit 278a7b0
Show file tree
Hide file tree
Showing 11 changed files with 221 additions and 128 deletions.
9 changes: 9 additions & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
root = true


[*.fs]
indent_style = space
indent_size = 4
trim_trailing_whitespace = true
insert_final_newline = true
end_of_line = lf
2 changes: 1 addition & 1 deletion DOCUMENTATION.md
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -884,7 +884,7 @@ This covers what the most complete possible implementation of the JS Stored Proc
The `sync` stored procedure takes as input, a document that is almost identical to the format of the _`Tip`_ batch (in fact, if the stream is found to be empty, it is pretty much the template for the first document created in the stream). The request includes the following elements:

- `expectedVersion`: the position the requester has based their [proposed] events on (no, [providing an `etag` to save on Request Charges is not possible in the Stored Proc](https://stackoverflow.com/questions/53355886/azure-cosmosdb-stored-procedure-ifmatch-predicate))
- `e`: array of Events (see Event, above) to append iff the expectedVersion check is fulfilled
- `e`: array of Events (see Event, above) to append if, and only if, the expectedVersion check is fulfilled
- `u`: array of `unfold`ed events (aka snapshots) that supersede items with equivalent `c`ase values
- `maxEvents`: the maximum number of events in an individual batch prior to starting a new one. For example:

Expand Down
4 changes: 2 additions & 2 deletions src/Equinox.Core/AsyncCacheCell.fs
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ type AsyncLazy<'T>(workflow : Async<'T>) =
type AsyncCacheCell<'T>(workflow : Async<'T>, ?isExpired : 'T -> bool) =
let mutable currentCell = AsyncLazy workflow

let initializationFailed (value: System.Threading.Tasks.Task<_>) =
let initializationFailed (value : System.Threading.Tasks.Task<_>) =
// for TMI on this, see https://stackoverflow.com/a/33946166/11635
value.IsCompleted && value.Status <> System.Threading.Tasks.TaskStatus.RanToCompletion

Expand Down Expand Up @@ -44,4 +44,4 @@ type AsyncCacheCell<'T>(workflow : Async<'T>, ?isExpired : 'T -> bool) =
match isExpired with
| Some f when f current -> return! update cell
| _ -> return current
}
}
11 changes: 6 additions & 5 deletions src/Equinox.Core/Cache.fs
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,20 @@ type CacheItemOptions =
[<AllowNullLiteral>]
type CacheEntry<'state>(initialToken: StreamToken, initialState: 'state, supersedes: StreamToken -> StreamToken -> bool) =
let mutable currentToken, currentState = initialToken, initialState
member __.UpdateIfNewer(other: CacheEntry<'state>) =
member __.UpdateIfNewer(other : CacheEntry<'state>) =
lock __ <| fun () ->
let otherToken, otherState = other.Value
if otherToken |> supersedes currentToken then
currentToken <- otherToken
currentState <- otherState

member __.Value : StreamToken * 'state =
lock __ <| fun () ->
currentToken, currentState

type ICache =
abstract member UpdateIfNewer : key: string * options: CacheItemOptions * entry: CacheEntry<'state> -> Async<unit>
abstract member TryGet: key: string -> Async<(StreamToken * 'state) option>
abstract member TryGet : key: string -> Async<(StreamToken * 'state) option>

namespace Equinox

Expand All @@ -40,16 +41,16 @@ type Cache(name, sizeMb : int) =
| RelativeExpiration relative -> new CacheItemPolicy(SlidingExpiration = relative)

interface ICache with
member this.UpdateIfNewer(key, options, entry) = async {
member __.UpdateIfNewer(key, options, entry) = async {
let policy = toPolicy options
match cache.AddOrGetExisting(key, box entry, policy) with
| null -> ()
| :? CacheEntry<'state> as existingEntry -> existingEntry.UpdateIfNewer entry
| x -> failwithf "UpdateIfNewer Incompatible cache entry %A" x }

member this.TryGet key = async {
member __.TryGet key = async {
return
match cache.Get key with
| null -> None
| :? CacheEntry<'state> as existingEntry -> Some existingEntry.Value
| x -> failwithf "TryGet Incompatible cache entry %A" x }
| x -> failwithf "TryGet Incompatible cache entry %A" x }
20 changes: 11 additions & 9 deletions src/Equinox.Core/Infrastructure.fs
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ module internal Equinox.Core.Infrastructure
open FSharp.Control
open System
open System.Diagnostics
open System.Threading.Tasks

type OAttribute = System.Runtime.InteropServices.OptionalAttribute
type DAttribute = System.Runtime.InteropServices.DefaultParameterValueAttribute
Expand Down Expand Up @@ -42,26 +43,27 @@ type Async with
/// </summary>
/// <param name="task">Task to be awaited.</param>
[<DebuggerStepThrough>]
static member AwaitTaskCorrect(task : System.Threading.Tasks.Task<'T>) : Async<'T> =
Async.FromContinuations(fun (sc,ec,_) ->
task.ContinueWith(fun (t : System.Threading.Tasks.Task<'T>) ->
static member AwaitTaskCorrect(task : Task<'T>) : Async<'T> =
Async.FromContinuations(fun (sc, ec, _) ->
task.ContinueWith(fun (t : Task<'T>) ->
if t.IsFaulted then
let e = t.Exception
if e.InnerExceptions.Count = 1 then ec e.InnerExceptions.[0]
else ec e
elif t.IsCanceled then ec(new System.Threading.Tasks.TaskCanceledException())
elif t.IsCanceled then ec(new TaskCanceledException())
else sc t.Result)
|> ignore)

[<DebuggerStepThrough>]
static member AwaitTaskCorrect(task : System.Threading.Tasks.Task) : Async<unit> =
Async.FromContinuations(fun (sc,ec,_) ->
task.ContinueWith(fun (task : System.Threading.Tasks.Task) ->
static member AwaitTaskCorrect(task : Task) : Async<unit> =
Async.FromContinuations(fun (sc, ec, _) ->
task.ContinueWith(fun (task : Task) ->
if task.IsFaulted then
let e = task.Exception
if e.InnerExceptions.Count = 1 then ec e.InnerExceptions.[0]
else ec e
elif task.IsCanceled then
ec(System.Threading.Tasks.TaskCanceledException())
ec(TaskCanceledException())
else
sc ())
|> ignore)
Expand Down Expand Up @@ -98,4 +100,4 @@ module Regex =
/// Active pattern for branching on successful regex matches
let (|Match|_|) (pattern : string) (input : string) =
let m = (mkRegex pattern).Match input
if m.Success then Some m else None
if m.Success then Some m else None
4 changes: 3 additions & 1 deletion src/Equinox.Core/Retry.fs
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ module Retry =
/// (until `attempts` exhausted) on an exception matching the `filter`, waiting for the timespan chosen by `backoff` before retrying
let withBackoff (maxAttempts : int) (backoff : int -> System.TimeSpan option) (f : int -> Async<'a>) : Async<'a> =
if maxAttempts < 1 then raise (invalidArg "maxAttempts" "Should be >= 1")

let rec go attempt = async {
try
let! res = f attempt
Expand All @@ -21,4 +22,5 @@ module Retry =
| Some timespan -> do! Async.Sleep (int timespan.TotalMilliseconds)
| None -> ()
return! go (attempt + 1) }
go 1

go 1
4 changes: 3 additions & 1 deletion src/Equinox.Core/Stream.fs
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ type private Stream<'event, 'state, 'streamId, 'context>(category : ICategory<'e
interface IStream<'event, 'state> with
member __.Load log =
category.Load(log, streamId, opt)

member __.TrySync(log: Serilog.ILogger, token: StreamToken, originState: 'state, events: 'event list) =
category.TrySync(log, token, originState, events, context)

Expand All @@ -19,7 +20,8 @@ type private InitializedStream<'event, 'state>(inner : IStream<'event, 'state>,
match preloadedTokenAndState with
| Some value -> async { preloadedTokenAndState <- None; return value }
| None -> inner.Load log

member __.TrySync(log: Serilog.ILogger, token: StreamToken, originState: 'state, events: 'event list) =
inner.TrySync(log, token, originState, events)

let ofMemento (memento : StreamToken * 'state) (inner : IStream<'event,'state>) : IStream<'event, 'state> = InitializedStream(inner, memento) :> _
let ofMemento (memento : StreamToken * 'state) (inner : IStream<'event,'state>) : IStream<'event, 'state> = InitializedStream(inner, memento) :> _
13 changes: 7 additions & 6 deletions src/Equinox.Core/Types.fs
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ open System.Diagnostics
type ICategory<'event, 'state, 'streamId, 'context> =
/// Obtain the state from the target stream
abstract Load : log: ILogger * 'streamId * ResolveOption option -> Async<StreamToken * 'state>

/// Given the supplied `token`, attempt to sync to the proposed updated `state'` by appending the supplied `events` to the underlying stream, yielding:
/// - Written: signifies synchronization has succeeded, implying the included StreamState should now be assumed to be the state of the stream
/// - Conflict: signifies the sync failed, and the proposed decision hence needs to be reconsidered in light of the supplied conflicting Stream State
Expand All @@ -23,7 +24,7 @@ type StopwatchInterval (startTicks : int64, endTicks : int64) =

// Converts a tick count as measured by stopwatch into a TimeSpan value
let timeSpanFromStopwatchTicks (ticks : int64) =
let ticksPerSecond = double System.Diagnostics.Stopwatch.Frequency
let ticksPerSecond = double Stopwatch.Frequency
let totalSeconds = double ticks / ticksPerSecond
TimeSpan.FromSeconds totalSeconds

Expand All @@ -39,9 +40,9 @@ type Stopwatch =
/// <param name="f">Function to execute & time.</param>
[<DebuggerStepThrough>]
static member Time(f : unit -> 'T) : StopwatchInterval * 'T =
let startTicks = System.Diagnostics.Stopwatch.GetTimestamp()
let startTicks = Stopwatch.GetTimestamp()
let result = f ()
let endTicks = System.Diagnostics.Stopwatch.GetTimestamp()
let endTicks = Stopwatch.GetTimestamp()
let tr = StopwatchInterval(startTicks, endTicks)
tr, result

Expand All @@ -51,9 +52,9 @@ type Stopwatch =
/// <param name="f">Function to execute & time.</param>
[<DebuggerStepThrough>]
static member Time(f : Async<'T>) : Async<StopwatchInterval * 'T> = async {
let startTicks = System.Diagnostics.Stopwatch.GetTimestamp()
let startTicks = Stopwatch.GetTimestamp()
let! result = f
let endTicks = System.Diagnostics.Stopwatch.GetTimestamp()
let endTicks = Stopwatch.GetTimestamp()
let tr = StopwatchInterval(startTicks, endTicks)
return tr, result
}
}
Loading

0 comments on commit 278a7b0

Please sign in to comment.