Skip to content

Commit

Permalink
Cosmos.Prometheus Backport from master #266 (#267)
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink authored Dec 3, 2020
1 parent 9b27c23 commit a7d5115
Show file tree
Hide file tree
Showing 10 changed files with 340 additions and 158 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ The `Unreleased` section name is replaced by the expected version of next releas
## [Unreleased]

### Added

- Prometheus integration package as `master` [#266](https://github.com/jet/equinox/pull/266) [#267](https://github.com/jet/equinox/pull/267)

### Changed
### Removed
### Fixed
Expand Down
6 changes: 6 additions & 0 deletions Equinox.sln
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ Project("{6EC3EE1D-3C4E-46DD-8F32-0CC8E7565705}") = "Equinox.SqlStreamStore.Post
EndProject
Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Equinox.Core", "src\Equinox.Core\Equinox.Core.fsproj", "{3021659A-5CA4-4E06-AF00-2457ED3F105B}"
EndProject
Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Equinox.Cosmos.Prometheus", "src\Equinox.Cosmos.Prometheus\Equinox.Cosmos.Prometheus.fsproj", "{BB1CB46F-B3A7-4F1B-BB03-FE74F93ECFC4}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -182,6 +184,10 @@ Global
{3021659A-5CA4-4E06-AF00-2457ED3F105B}.Debug|Any CPU.Build.0 = Debug|Any CPU
{3021659A-5CA4-4E06-AF00-2457ED3F105B}.Release|Any CPU.ActiveCfg = Release|Any CPU
{3021659A-5CA4-4E06-AF00-2457ED3F105B}.Release|Any CPU.Build.0 = Release|Any CPU
{BB1CB46F-B3A7-4F1B-BB03-FE74F93ECFC4}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{BB1CB46F-B3A7-4F1B-BB03-FE74F93ECFC4}.Debug|Any CPU.Build.0 = Debug|Any CPU
{BB1CB46F-B3A7-4F1B-BB03-FE74F93ECFC4}.Release|Any CPU.ActiveCfg = Release|Any CPU
{BB1CB46F-B3A7-4F1B-BB03-FE74F93ECFC4}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down
1 change: 1 addition & 0 deletions build.proj
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
<Exec Command="dotnet pack src/Equinox $(Cfg) $(PackOptions)" />
<Exec Command="dotnet pack src/Equinox.Core $(Cfg) $(PackOptions)" />
<Exec Command="dotnet pack src/Equinox.Cosmos $(Cfg) $(PackOptions)" />
<Exec Command="dotnet pack src/Equinox.Cosmos.Prometheus $(Cfg) $(PackOptions)" />
<Exec Command="dotnet pack src/Equinox.EventStore $(Cfg) $(PackOptions)" />
<Exec Command="dotnet pack src/Equinox.MemoryStore $(Cfg) $(PackOptions)" />
<Exec Command="dotnet pack src/Equinox.SqlStreamStore $(Cfg) $(PackOptions)" />
Expand Down
30 changes: 15 additions & 15 deletions samples/Store/Integration/LogIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,22 @@ module EquinoxCosmosInterop =
[<NoEquality; NoComparison>]
type FlatMetric = { action: string; stream : string; interval: StopwatchInterval; bytes: int; count: int; responses: int option; ru: float } with
override __.ToString() = sprintf "%s-Stream=%s %s-Elapsed=%O Ru=%O" __.action __.stream __.action __.interval.Elapsed __.ru
let flatten (evt : Log.Event) : FlatMetric =
let flatten (evt : Log.Metric) : FlatMetric =
let action, metric, batches, ru =
match evt with
| Log.Tip m -> "CosmosTip", m, None, m.ru
| Log.TipNotFound m -> "CosmosTip404", m, None, m.ru
| Log.TipNotModified m -> "CosmosTip302", m, None, m.ru
| Log.Query (Direction.Forward,c,m) -> "CosmosQueryF", m, Some c, m.ru
| Log.Query (Direction.Backward,c,m) -> "CosmosQueryB", m, Some c, m.ru
| Log.Response (Direction.Forward,m) -> "CosmosResponseF", m, None, m.ru
| Log.Response (Direction.Backward,m) -> "CosmosResponseB", m, None, m.ru
| Log.SyncSuccess m -> "CosmosSync200", m, None, m.ru
| Log.SyncConflict m -> "CosmosSync409", m, None, m.ru
| Log.SyncResync m -> "CosmosSyncResync", m, None, m.ru
| Log.PruneResponse m -> "CosmosPruneResponse", m, None, m.ru
| Log.Delete m -> "CosmosDelete", m, None, m.ru
| Log.Prune (events, m) -> "CosmosPrune", m, Some events, m.ru
| Log.Metric.Tip m -> "CosmosTip", m, None, m.ru
| Log.Metric.TipNotFound m -> "CosmosTip404", m, None, m.ru
| Log.Metric.TipNotModified m -> "CosmosTip302", m, None, m.ru
| Log.Metric.Query (Direction.Forward,c,m) -> "CosmosQueryF", m, Some c, m.ru
| Log.Metric.Query (Direction.Backward,c,m) -> "CosmosQueryB", m, Some c, m.ru
| Log.Metric.QueryResponse(Direction.Forward,m) -> "CosmosResponseF", m, None, m.ru
| Log.Metric.QueryResponse (Direction.Backward,m) -> "CosmosResponseB", m, None, m.ru
| Log.Metric.SyncSuccess m -> "CosmosSync200", m, None, m.ru
| Log.Metric.SyncConflict m -> "CosmosSync409", m, None, m.ru
| Log.Metric.SyncResync m -> "CosmosSyncResync", m, None, m.ru
| Log.Metric.PruneResponse m -> "CosmosPruneResponse", m, None, m.ru
| Log.Metric.Delete m -> "CosmosDelete", m, None, m.ru
| Log.Metric.Prune (events, m) -> "CosmosPrune", m, Some events, m.ru
{ action = action; stream = metric.stream; bytes = metric.bytes; count = metric.count; responses = batches
interval = StopwatchInterval(metric.interval.StartTicks,metric.interval.EndTicks); ru = ru }

Expand All @@ -65,7 +65,7 @@ type SerilogMetricsExtractor(emit : string -> unit) =
logEvent.Properties
|> Seq.tryPick (function
| KeyValue (k, SerilogScalar (:? Equinox.EventStore.Log.Event as m)) -> Some <| Choice1Of3 (k,m)
| KeyValue (k, SerilogScalar (:? Equinox.Cosmos.Store.Log.Event as m)) -> Some <| Choice2Of3 (k,m)
| KeyValue (k, SerilogScalar (:? Equinox.Cosmos.Store.Log.Metric as m)) -> Some <| Choice2Of3 (k,m)
| _ -> None)
|> Option.defaultValue (Choice3Of3 ())
let handleLogEvent logEvent =
Expand Down
2 changes: 0 additions & 2 deletions samples/Tutorial/Counter.fsx
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,6 @@ let store = Equinox.MemoryStore.VolatileStore()
let _ = store.Committed.Subscribe(fun (s, xs) -> logEvents s xs)
let codec = FsCodec.Box.Codec.Create()
let resolve = Equinox.MemoryStore.Resolver(store, codec, fold, initial).Resolve
open Serilog
let log = LoggerConfiguration().WriteTo.Console().CreateLogger()
let service = Service(log, resolve, maxAttempts=3)
let clientId = "ClientA"
service.Read(clientId) |> Async.RunSynchronously
Expand Down
116 changes: 116 additions & 0 deletions src/Equinox.Cosmos.Prometheus/CosmosPrometheus.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
namespace Equinox.CosmosStore.Prometheus

module private Impl =

let baseName stat = "equinox_" + stat
let baseDesc desc = "Equinox CosmosDB " + desc

module private Histograms =

let labelNames = [| "facet"; "op"; "app"; "db"; "con"; "cat" |]
let private mkHistogram (cfg : Prometheus.HistogramConfiguration) name desc =
let h = Prometheus.Metrics.CreateHistogram(name, desc, cfg)
fun (facet : string, op : string) app (db, con, cat : string) s ->
h.WithLabels(facet, op, app, db, con, cat).Observe(s)
// Given we also have summary metrics with equivalent labels, we focus the bucketing on LAN latencies
let private sHistogram =
let sBuckets = [| 0.0005; 0.001; 0.002; 0.004; 0.008; 0.016; 0.5; 1.; 2.; 4.; 8. |]
let sCfg = Prometheus.HistogramConfiguration(Buckets = sBuckets, LabelNames = labelNames)
mkHistogram sCfg
let private ruHistogram =
let ruBuckets = Prometheus.Histogram.ExponentialBuckets(1., 2., 11) // 1 .. 1024
let ruCfg = Prometheus.HistogramConfiguration(Buckets = ruBuckets, LabelNames = labelNames)
mkHistogram ruCfg
let sAndRuPair stat desc =
let baseName, baseDesc = Impl.baseName stat, Impl.baseDesc desc
let observeS = sHistogram (baseName + "_seconds") (baseDesc + " latency")
let observeRu = ruHistogram (baseName + "_ru") (baseDesc + " charge")
fun (facet, op) app (db, con, cat, s : System.TimeSpan, ru) ->
observeS (facet, op) app (db, con, cat) s.TotalSeconds
observeRu (facet, op) app (db, con, cat) ru

module private Summaries =

let labelNames = [| "facet"; "app"; "db"; "con" |]
let private mkSummary (cfg : Prometheus.SummaryConfiguration) name desc =
let s = Prometheus.Metrics.CreateSummary(name, desc, cfg)
fun (facet : string) app (db, con) o -> s.WithLabels(facet, app, db, con).Observe(o)
let config =
let inline qep q e = Prometheus.QuantileEpsilonPair(q, e)
let objectives = [| qep 0.50 0.05; qep 0.95 0.01; qep 0.99 0.01 |]
Prometheus.SummaryConfiguration(Objectives = objectives, LabelNames = labelNames, MaxAge = System.TimeSpan.FromMinutes 1.)
let sAndRuPair stat desc =
let baseName, baseDesc = Impl.baseName stat, Impl.baseDesc desc
let observeS = mkSummary config (baseName + "_seconds") (baseDesc + " latency")
let observeRu = mkSummary config (baseName + "_ru") (baseDesc + " charge")
fun facet app (db, con, s : System.TimeSpan, ru) ->
observeS facet app (db, con) s.TotalSeconds
observeRu facet app (db, con) ru

module private Counters =

let labelNames = [| "facet"; "op"; "outcome"; "app"; "db"; "con"; "cat" |]
let private mkCounter (cfg : Prometheus.CounterConfiguration) name desc =
let h = Prometheus.Metrics.CreateCounter(name, desc, cfg)
fun (facet : string, op : string, outcome : string) app (db, con, cat) c ->
h.WithLabels(facet, op, outcome, app, db, con, cat).Inc(c)
let config = Prometheus.CounterConfiguration(LabelNames = labelNames)
let total stat desc =
let name = Impl.baseName (stat + "_total")
let desc = Impl.baseDesc desc
mkCounter config name desc
let eventsAndBytesPair stat desc =
let observeE = total (stat + "_events") (desc + "Events")
let observeB = total (stat + "_bytes") (desc + "Bytes")
fun ctx app (db, con, cat, e, b) ->
observeE ctx app (db, con, cat) e
match b with None -> () | Some b -> observeB ctx app (db, con, cat) b

module private Stats =

let opHistogram = Histograms.sAndRuPair "op" "Operation"
let roundtripHistogram = Histograms.sAndRuPair "roundtrip" "Fragment"
let opSummary = Summaries.sAndRuPair "op_summary" "Operation Summary"
let roundtripSummary = Summaries.sAndRuPair "roundtrip_summary" "Fragment Summary"
let payloadCounters = Counters.eventsAndBytesPair "payload" "Payload, "
let cacheCounter = Counters.total "cache" "Cache"

let observeLatencyAndCharge (facet, op) app (db, con, cat, s, ru) =
opHistogram (facet, op) app (db, con, cat, s, ru)
opSummary facet app (db, con, s, ru)
let observeLatencyAndChargeWithEventCounts (facet, op, outcome) app (db, con, cat, s, ru, count, bytes) =
observeLatencyAndCharge (facet, op) app (db, con, cat, s, ru)
payloadCounters (facet, op, outcome) app (db, con, cat, float count, if bytes = -1 then None else Some (float bytes))

let inline (|CatSRu|) ({ interval = i; ru = ru } : Equinox.Cosmos.Store.Log.Measurement as m) =
let cat, _id = FsCodec.StreamName.splitCategoryAndId (FSharp.UMX.UMX.tag m.stream)
m.database, m.container, cat, i.Elapsed, ru
let observeRes (facet, _op as stat) app (CatSRu (db, con, cat, s, ru)) =
roundtripHistogram stat app (db, con, cat, s, ru)
roundtripSummary facet app (db, con, s, ru)
let observe_ stat app (CatSRu (db, con, cat, s, ru)) =
observeLatencyAndCharge stat app (db, con, cat, s, ru)
let observe (facet, op, outcome) app (CatSRu (db, con, cat, s, ru) as m) =
observeLatencyAndChargeWithEventCounts (facet, op, outcome) app (db, con, cat, s, ru, m.count, m.bytes)
let observeTip (facet, op, outcome, cacheOutcome) app (CatSRu (db, con, cat, s, ru) as m) =
observeLatencyAndChargeWithEventCounts (facet, op, outcome) app (db, con, cat, s, ru, m.count, m.bytes)
cacheCounter (facet, op, cacheOutcome) app (db, con, cat) 1.

open Equinox.Cosmos.Store.Log

type LogSink(app) =
interface Serilog.Core.ILogEventSink with
member __.Emit logEvent = logEvent |> function
| MetricEvent cm -> cm |> function
| Op (Operation.Tip, m) -> Stats.observeTip ("query", "tip", "ok", "200") app m
| Op (Operation.Tip404, m) -> Stats.observeTip ("query", "tip", "ok", "404") app m
| Op (Operation.Tip302, m) -> Stats.observeTip ("query", "tip", "ok", "302") app m
| Op (Operation.Query, m) -> Stats.observe ("query", "query", "ok") app m
| QueryRes (_direction, m) -> Stats.observeRes ("query", "queryPage") app m
| Op (Operation.Write, m) -> Stats.observe ("transact", "sync", "ok") app m
| Op (Operation.Conflict, m) -> Stats.observe ("transact", "conflict", "conflict") app m
| Op (Operation.Resync, m) -> Stats.observe ("transact", "resync", "conflict") app m
| Op (Operation.Prune, m) -> Stats.observe_ ("prune", "pruneQuery") app m
| PruneRes ( m) -> Stats.observeRes ("prune", "pruneQueryPage") app m
| Op (Operation.Delete, m) -> Stats.observe ("prune", "delete", "ok") app m
| _ -> ()
30 changes: 30 additions & 0 deletions src/Equinox.Cosmos.Prometheus/Equinox.Cosmos.Prometheus.fsproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFrameworks>netstandard2.0;net461</TargetFrameworks>
<WarningLevel>5</WarningLevel>
<IsTestProject>false</IsTestProject>
<DisableImplicitFSharpCoreReference>true</DisableImplicitFSharpCoreReference>
<DisableImplicitSystemValueTupleReference>true</DisableImplicitSystemValueTupleReference>
<DefineConstants Condition=" '$(TargetFramework)' == 'net461' ">$(DefineConstants);NET461</DefineConstants>
</PropertyGroup>

<ItemGroup>
<Compile Include="CosmosPrometheus.fs" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Equinox.Cosmos\Equinox.Cosmos.fsproj" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0" PrivateAssets="All" />
<PackageReference Include="MinVer" Version="2.0.0" PrivateAssets="All" />

<PackageReference Include="FSharp.Core" Version="3.1.2.5" Condition=" '$(TargetFramework)' != 'netstandard2.0' " />
<PackageReference Include="FSharp.Core" Version="4.3.4" Condition=" '$(TargetFramework)' == 'netstandard2.0' " />

<PackageReference Include="prometheus-net" Version="3.6.0" />
</ItemGroup>

</Project>
Loading

0 comments on commit a7d5115

Please sign in to comment.