Skip to content

Commit

Permalink
feat(Cosmos): Use Azure.Cosmos STJ impl (#467)
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink authored Jan 31, 2025
1 parent 58dc069 commit 831b69e
Show file tree
Hide file tree
Showing 7 changed files with 56 additions and 36 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ The `Unreleased` section name is replaced by the expected version of next releas

### Added

- `Equinox.CosmosStore`: Use `Microsoft.Azure.Cosmos` integrated `System.Text.Json` support; added ability to specify `serializerOptions` [#467](https://github.com/jet/equinox/pull/467)
- `Equinox.CosmosStore`: Group metrics by Container Name [#449](https://github.com/jet/equinox/pull/449)
- `Equinox.CosmosStore`: Group metrics by Category; split out `Tip` activity [#453](https://github.com/jet/equinox/pull/453)
- `Equinox.CosmosStore`: Support Ingesting unfolds [#460](https://github.com/jet/equinox/pull/460)
Expand All @@ -21,6 +22,8 @@ The `Unreleased` section name is replaced by the expected version of next releas

- `Equinox.*Store`,`Equinox.*Store.Prometheus`: Pin `Equinox` dependencies to `[4.0.0, 5.0.0)`] [#448](https://github.com/jet/equinox/pull/448)
- `Equinox.CosmosStore`: Update `System.Text.Json` dep to `6.0.10` per [CVE-2024-43485](https://github.com/advisories/GHSA-8g4q-xg66-9fp4) [#470](https://github.com/jet/equinox/pull/470)
- `Equinox.CosmosStore`: Minimum `Microsoft.Azure.Cosmos` requirement updated to `3.43.1` to avail of integrated `System.Text.Json` support [#467](https://github.com/jet/equinox/pull/467)
- `Equinox.CosmosStore.CosmosStoreConnector`: Removed mandatory `requestTimeout` argument [#467](https://github.com/jet/equinox/pull/467)
- `Equinox.MessageDb`: Up min `Npgsql` to v `7.0.7` as `7.0.0` is on CVE blacklist

### Removed
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ The components within this repository are delivered as multi-targeted Nuget pack

- `Equinox.Core` [![NuGet](https://img.shields.io/nuget/v/Equinox.Core.svg)](https://www.nuget.org/packages/Equinox.Core/): Hosts generic utility types frequently useful alongside Equinox: [`TaskCell`](https://github.com/jet/equinox/blob/master/src/Equinox.Core/TaskCell.fs#L36), [`Batcher`, `BatcherCache`, `BatcherDictionary`](https://github.com/jet/equinox/blob/master/src/Equinox.Core/Batching.fs#L44). ([depends](https://www.fuget.org/packages/Equinox.Core) on `System.Runtime.Caching`)
- `Equinox.MemoryStore` [![MemoryStore NuGet](https://img.shields.io/nuget/v/Equinox.MemoryStore.svg)](https://www.nuget.org/packages/Equinox.MemoryStore/): In-memory store for integration testing/performance base-lining/providing out-of-the-box zero dependency storage for examples. ([depends](https://www.fuget.org/packages/Equinox.MemoryStore) on `Equinox`)
- `Equinox.CosmosStore` [![CosmosStore NuGet](https://img.shields.io/nuget/v/Equinox.CosmosStore.svg)](https://www.nuget.org/packages/Equinox.CosmosStore/): Azure CosmosDB Adapter with integrated 'unfolds' feature, facilitating optimal read performance in terms of latency and RU costs, instrumented to meet Jet's production monitoring requirements. ([depends](https://www.fuget.org/packages/Equinox.CosmosStore) on `Equinox`, `Equinox`, `Microsoft.Azure.Cosmos` >= `3.35.4`, `System.Text.Json`, `FSharp.Control.TaskSeq`)
- `Equinox.CosmosStore` [![CosmosStore NuGet](https://img.shields.io/nuget/v/Equinox.CosmosStore.svg)](https://www.nuget.org/packages/Equinox.CosmosStore/): Azure CosmosDB Adapter with integrated 'unfolds' feature, facilitating optimal read performance in terms of latency and RU costs, instrumented to meet Jet's production monitoring requirements. ([depends](https://www.fuget.org/packages/Equinox.CosmosStore) on `Equinox`, `Equinox`, `Microsoft.Azure.Cosmos` >= `3.43.1`, `System.Text.Json`, `FSharp.Control.TaskSeq`)
- `Equinox.CosmosStore.Prometheus` [![CosmosStore.Prometheus NuGet](https://img.shields.io/nuget/v/Equinox.CosmosStore.Prometheus.svg)](https://www.nuget.org/packages/Equinox.CosmosStore.Prometheus/): Integration package providing a `Serilog.Core.ILogEventSink` that extracts detailed metrics information attached to the `LogEvent`s and feeds them to the `prometheus-net`'s `Prometheus.Metrics` static instance. ([depends](https://www.fuget.org/packages/Equinox.CosmosStore.Prometheus) on `Equinox.CosmosStore`, `prometheus-net >= 3.6.0`)
- `Equinox.DynamoStore` [![DynamoStore NuGet](https://img.shields.io/nuget/v/Equinox.DynamoStore.svg)](https://www.nuget.org/packages/Equinox.DynamoStore/): Amazon DynamoDB Adapter with integrated 'unfolds' feature, facilitating optimal read performance in terms of latency and RC costs, patterned after `Equinox.CosmosStore`. ([depends](https://www.fuget.org/packages/Equinox.DynamoStore) on `Equinox`, `FSharp.AWS.DynamoDB` >= `0.12.0-beta`, `FSharp.Control.TaskSeq`)
- `Equinox.DynamoStore.Prometheus` [![DynamoStore.Prometheus NuGet](https://img.shields.io/nuget/v/Equinox.DynamoStore.Prometheus.svg)](https://www.nuget.org/packages/Equinox.DynamoStore.Prometheus/): Integration package providing a `Serilog.Core.ILogEventSink` that extracts detailed metrics information attached to the `LogEvent`s and feeds them to the `prometheus-net`'s `Prometheus.Metrics` static instance. ([depends](https://www.fuget.org/packages/Equinox.CosmosStore.Prometheus) on `Equinox.DynamoStore`, `prometheus-net >= 3.6.0`)
Expand Down
2 changes: 1 addition & 1 deletion samples/Infrastructure/Store.fs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ module Cosmos =
// - In hot-warm scenarios, the Archive Container will frequently be within the same account and hence can share a CosmosClient
// For these typical purposes, CosmosStoreClient.Connect should be used to establish the Client, not custom wiring as we have here
let createConnector (a: Arguments) connectionString =
CosmosStoreConnector(Discovery.ConnectionString connectionString, a.Timeout, a.Retries, a.MaxRetryWaitTime, ?mode = a.Mode)
CosmosStoreConnector(Discovery.ConnectionString connectionString, a.Retries, a.MaxRetryWaitTime, ?mode = a.Mode, timeout = a.Timeout)
let connect (log: ILogger) (a: Arguments) =
let primaryConnector, primaryDatabase, primaryContainer as primary = createConnector a a.Connection, a.Database, a.Container
logContainer log "Primary" (a.Mode, primaryConnector.Endpoint, primaryDatabase, primaryContainer)
Expand Down
79 changes: 48 additions & 31 deletions src/Equinox.CosmosStore/CosmosStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,24 @@ type EventBody = JsonElement

/// A single Domain Event from the array held in a Batch
[<NoEquality; NoComparison>]
type Event = // TODO for STJ v5: All fields required unless explicitly optional
type Event =
{ /// Creation datetime (as opposed to system-defined _lastUpdated which is touched by triggers, replication etc.)
t: DateTimeOffset // ISO 8601
t: DateTimeOffset // Will be saved ISO 8601 formatted

/// The Case (Event Type); used to drive deserialization
c: string // required
/// The Case (Event Type); used to drive deserialization (Required)
c: string

/// Event body, as UTF-8 encoded json ready to be injected into the Json being rendered for CosmosDB
d: EventBody // TODO for STJ v5: Required, but can be null so Nullary cases can work
/// Event body, as UTF-8 encoded json ready to be injected into the JSON being rendered for CosmosDB
d: EventBody // Can be JSON Null for Nullary cases

/// Optional metadata, as UTF-8 encoded json, ready to emit directly
m: EventBody // TODO for STJ v5: Optional, not serialized if missing
m: EventBody

/// Optional correlationId
correlationId: string // TODO for STJ v5: Optional, not serialized if missing
correlationId: string

/// Optional causationId
causationId: string } // TODO for STJ v5: Optional, not serialized if missing
causationId: string }
interface IEventData<EventBody> with
member x.EventType = x.c
member x.Data = x.d
Expand All @@ -42,20 +42,20 @@ type Event = // TODO for STJ v5: All fields required unless explicitly optional

/// A 'normal' (frozen, not Tip) Batch of Events (without any Unfolds)
[<NoEquality; NoComparison>]
type Batch = // TODO for STJ v5: All fields required unless explicitly optional
type Batch =
{ /// CosmosDB-mandated Partition Key, must be maintained within the document
/// Not actually required if running in single partition mode, but for simplicity, we always write it
p: string // "{streamName}" TODO for STJ v5: Optional, not requested in queries
/// Technically not required if running in single partition mode, but being over simplistic would be more confusing
p: string // "{streamName}"

/// CosmosDB-mandated unique row key; needs to be unique within any partition it is maintained; must be string
/// At the present time, one can't perform an ORDER BY on this field, hence we also have i shadowing it
/// NB Tip uses a well known value here while it's actively 'open'
/// There's no way to usefully ORDER BY on it; hence we have i shadowing it and use that instead
/// NB Tip uses a well known value ("-1") for the `id`; that document lives for the life of the stream
id: string // "{index}"

/// When we read, we need to capture the value so we can retain it for caching purposes
/// NB this is not relevant to fill in when we pass it to the writing stored procedure
/// as it will do: 1. read 2. merge 3. write merged version contingent on the _etag not having changed
_etag: string // TODO for STJ v5: Optional, not serialized if missing
_etag: string

/// base 'i' value for the Events held herein
i: int64 // {index}
Expand Down Expand Up @@ -84,31 +84,31 @@ type Unfold =
/// The Case (Event Type) of this compaction/snapshot, used to drive deserialization
c: string // required

/// Event body - Json -> Deflate -> Base64 -> JsonElement
/// Event body - JSON -> Deflate -> Base64 -> JsonElement
[<Serialization.JsonConverter(typeof<JsonCompressedBase64Converter>)>]
d: EventBody // required

/// Optional metadata, same encoding as `d` (can be null; not written if missing)
[<Serialization.JsonConverter(typeof<JsonCompressedBase64Converter>)>]
m: EventBody } // TODO for STJ v5: Optional, not serialized if missing
m: EventBody }
// Arrays are not indexed by default. 1. enable filtering by `c`ase 2. index uncompressed fields within unfolds for filtering
static member internal IndexedPaths = [| "/u/[]/c/?"; "/u/[]/d/*" |]

/// The special-case 'Pending' Batch Format used to read the currently active (and mutable) document
/// Stored representation has the following diffs vs a 'normal' (frozen/completed) Batch: a) `id` = `-1` b) contains unfolds (`u`)
/// NB the type does double duty as a) model for when we read it b) encoding a batch being sent to the stored proc
[<NoEquality; NoComparison>]
type Tip = // TODO for STJ v5: All fields required unless explicitly optional
type Tip =
{ /// Partition key, as per Batch
p: string // "{streamName}" TODO for STJ v5: Optional, not requested in queries
p: string // "{streamName}"

/// Document Id within partition, as per Batch
id: string // "{-1}" - Well known IdConstant used while this remains the pending batch
id: string // "{-1}" - Well known Id Constant used for the tail document (the only one that get mutated)

/// When we read, we need to capture the value so we can retain it for caching purposes
/// NB this is not relevant to fill in when we pass it to the writing stored procedure
/// as it will do: 1. read 2. merge 3. write merged version contingent on the _etag not having changed
_etag: string // TODO for STJ v5: Optional, not serialized if missing
_etag: string

/// base 'i' value for the Events held herein
i: int64
Expand Down Expand Up @@ -1161,14 +1161,17 @@ type DiscoveryMode =
| DiscoveryMode.AccountUriAndKey (u, _k) -> u
| DiscoveryMode.ConnectionString (ConnectionString.AccountEndpoint e) -> e

/// Manages establishing a CosmosClient, which is used by CosmosStoreClient to read from the underlying Cosmos DB Container.
/// Manages establishing a CosmosClient, which is used by CosmosStoreClient to read from the underlying Cosmos DB Container
type CosmosClientFactory(options) =
[<Obsolete "Will be removed in V5; please use the overload that includes `serializerOptions`">]
static member CreateDefaultOptions(requestTimeout: TimeSpan, maxRetryAttemptsOnRateLimitedRequests: int, maxRetryWaitTimeOnRateLimitedRequests: TimeSpan) =
CosmosClientOptions(
MaxRetryAttemptsOnRateLimitedRequests = maxRetryAttemptsOnRateLimitedRequests,
MaxRetryWaitTimeOnRateLimitedRequests = maxRetryWaitTimeOnRateLimitedRequests,
RequestTimeout = requestTimeout,
Serializer = CosmosJsonSerializer(JsonSerializerOptions()))
UseSystemTextJsonSerializerWithOptions = JsonSerializerOptions())
/// Default when rendering/parsing Batch/Tip/Event/Unfold - omitting null values
static member val DefaultJsonSerializerOptions = JsonSerializerOptions(DefaultIgnoreCondition = Serialization.JsonIgnoreCondition.WhenWritingNull)
/// CosmosClientOptions for this CosmosClientFactory as configured (NOTE while the Options object is not immutable, it should not have setters called on it)
member val Options = options
/// Creates an instance of CosmosClient without actually validating or establishing the connection
Expand Down Expand Up @@ -1200,28 +1203,42 @@ type Discovery =
| Discovery.ConnectionString c -> DiscoveryMode.ConnectionString c

/// Manages establishing a CosmosClient, which is used by CosmosStoreClient to read from the underlying Cosmos DB Container.
type CosmosStoreConnector
( // CosmosDB endpoint/credentials specification.
type CosmosStoreConnector(discovery: Discovery, factory: CosmosClientFactory) =
let discoveryMode = discovery.ToDiscoveryMode()
new(// CosmosDB endpoint/credentials specification.
discovery: Discovery,
// Timeout to apply to individual reads/write round-trips going to CosmosDB. CosmosDB Default: 1m.
requestTimeout: TimeSpan,
// Maximum number of times to attempt when failure reason is a 429 from CosmosDB, signifying RU limits have been breached. CosmosDB default: 9
maxRetryAttemptsOnRateLimitedRequests: int,
// Maximum number of seconds to wait (especially if a higher wait delay is suggested by CosmosDB in the 429 response). CosmosDB default: 30s
maxRetryWaitTimeOnRateLimitedRequests: TimeSpan,
// Connection mode (default: ConnectionMode.Direct (best performance, same as Microsoft.Azure.Cosmos SDK default)
// NOTE: default for Equinox.Cosmos.Connector (i.e. V2) was Gateway (worst performance, least trouble, Microsoft.Azure.DocumentDb SDK default)
[<O; D null>] ?mode: ConnectionMode,
// Timeout to apply to individual reads/write round-trips going to CosmosDB. CosmosDB Default: 6s
// NOTE Per CosmosDB Client guidance, it's recommended to leave this at its default
[<O; D null>] ?timeout: TimeSpan,
// System.Text.Json SerializerOptions used for rendering Batches, Events and Unfolds internally
// NOTE as Events and/or Unfolds are serialized to `JsonElement`, there should rarely be a need to control the options at this level
[<O; D null>] ?serializerOptions: System.Text.Json.JsonSerializerOptions,
// consistency mode (default: use configuration specified for Database)
[<O; D null>] ?defaultConsistencyLevel: ConsistencyLevel,
[<O; D null>] ?customize: Action<CosmosClientOptions>) =
let discoveryMode = discovery.ToDiscoveryMode()
let factory =
let o = CosmosClientFactory.CreateDefaultOptions(requestTimeout, maxRetryAttemptsOnRateLimitedRequests, maxRetryWaitTimeOnRateLimitedRequests)
let o =
CosmosClientOptions(
MaxRetryAttemptsOnRateLimitedRequests = maxRetryAttemptsOnRateLimitedRequests,
MaxRetryWaitTimeOnRateLimitedRequests = maxRetryWaitTimeOnRateLimitedRequests,
UseSystemTextJsonSerializerWithOptions = defaultArg serializerOptions CosmosClientFactory.DefaultJsonSerializerOptions)
mode |> Option.iter (fun x -> o.ConnectionMode <- x)
timeout |> Option.iter (fun x -> o.RequestTimeout <- x)
defaultConsistencyLevel |> Option.iter (fun x -> o.ConsistencyLevel <- x)
customize |> Option.iter (fun c -> c.Invoke o)
CosmosClientFactory o
CosmosStoreConnector(discovery, CosmosClientFactory o)

[<Obsolete "For backcompat only; will be removed in V5">]
new(discovery, requestTimeout: TimeSpan, maxRetryAttemptsOnRateLimitedRequests: int, maxRetryWaitTimeOnRateLimitedRequests,
?mode, ?defaultConsistencyLevel, ?customize) =
CosmosStoreConnector(discovery, maxRetryAttemptsOnRateLimitedRequests, maxRetryWaitTimeOnRateLimitedRequests, ?mode = mode,
?defaultConsistencyLevel = defaultConsistencyLevel, timeout = requestTimeout, ?customize = customize)

/// The <c>CosmosClientOptions</c> used when connecting to CosmosDB
member _.Options = factory.Options
Expand Down
1 change: 1 addition & 0 deletions src/Equinox.CosmosStore/CosmosStoreSerialization.fs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ module JsonElement =
if value.ValueKind = JsonValueKind.Null then value
else value |> toUtf8Bytes |> Deflate.compress |> JsonSerializer.SerializeToElement

[<System.Obsolete "Unused internal type; will be removed in V5">]
type CosmosJsonSerializer(options : JsonSerializerOptions) =
inherit Microsoft.Azure.Cosmos.CosmosSerializer()

Expand Down
2 changes: 1 addition & 1 deletion src/Equinox.CosmosStore/Equinox.CosmosStore.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
<PackageReference Include="FSharp.Core" Version="6.0.7" ExcludeAssets="contentfiles" />

<PackageReference Include="FSharp.Control.TaskSeq" Version="0.4.0" />
<PackageReference Include="Microsoft.Azure.Cosmos" Version="3.35.4" />
<PackageReference Include="Microsoft.Azure.Cosmos" Version="3.43.1" />
<PackageReference Include="System.Text.Json" Version="6.0.10" />
</ItemGroup>

Expand Down
3 changes: 1 addition & 2 deletions tests/Equinox.CosmosStore.Integration/CosmosFixtures.fs
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,7 @@ let discoverConnection () =

let createConnector (log: Serilog.ILogger) =
let name, discovery = discoverConnection ()
let connector = CosmosStoreConnector(discovery, requestTimeout = TimeSpan.FromSeconds 3.,
maxRetryAttemptsOnRateLimitedRequests = 2, maxRetryWaitTimeOnRateLimitedRequests = TimeSpan.FromMinutes 1.)
let connector = CosmosStoreConnector(discovery, 9, TimeSpan.FromMinutes 1.)
log.Information("CosmosStore {name} {endpoint}", name, connector.Endpoint)
connector

Expand Down

0 comments on commit 831b69e

Please sign in to comment.