Skip to content

Commit

Permalink
conn wip
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Sep 13, 2023
1 parent 47e91a4 commit cc3f1e1
Show file tree
Hide file tree
Showing 7 changed files with 102 additions and 97 deletions.
11 changes: 4 additions & 7 deletions DOCUMENTATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -1860,7 +1860,6 @@ following key benefits:
### Example Code

```fsharp
open Equinox.CosmosStore.Core
// open MyCodecs.Json // example of using specific codec which can yield UTF-8
// byte arrays from a type using `Json.toBytes` via Fleece
Expand All @@ -1872,22 +1871,20 @@ type EventData with
// Load connection string from your Key Vault (example here is the CosmosDB
// simulator's well known key)
// see https://github.com/jet/equinox-provisioning-cosmosdb
// see https://github.com/jet/equinox#provisioning-cosmosdb
let connectionString: string =
"AccountEndpoint=https://localhost:8081;AccountKey=C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==;"
// Forward to Log (you can use `Log.Logger` and/or `Log.ForContext` if your app
// uses Serilog already)
let outputLog = LoggerConfiguration().WriteTo.NLog().CreateLogger()
// Forward to Log (use `Log.Logger` if your app already uses Serilog)
let outputLog = LoggerConfiguration().WriteTo.Console().CreateLogger()
// Serilog has a `ForContext<T>()`, but if you are using a `module` for the
// wiring, you might create a tagged logger like this:
let gatewayLog =
outputLog.ForContext(Serilog.Core.Constants.SourceContextPropertyName, "Equinox")
let discovery = Discovery.ConnectionString (read "EQUINOX_COSMOS_CONNECTION")
let connector: Equinox.CosmosStore.CosmosStoreConnector =
CosmosStoreConnector(
discovery,
Equinox.CosmosStore.Discovery.ConnectionString connectionString,
requestTimeout = TimeSpan.FromSeconds 5.,
maxRetryAttemptsOnRateLimitedRequests = 1,
maxRetryWaitTimeOnRateLimitedRequests = TimeSpan.FromSeconds 3.)
Expand Down
22 changes: 14 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -665,21 +665,27 @@ For more complete instructions, follow https://developers.eventstore.com/server/
#### Using Azure Cosmos DB Service
dotnet run --project tools/Equinox.Tool -- init -ru 400 `
cosmos -s $env:EQUINOX_COSMOS_CONNECTION -d $env:EQUINOX_COSMOS_DATABASE -c $env:EQUINOX_COSMOS_CONTAINER
# Same for a Archive Container for integration testing of the archive store fallback mechanism
$env:EQUINOX_COSMOS_CONTAINER_ARCHIVE="equinox-test-archive"
dotnet run --project tools/Equinox.Tool -- init -ru 400 `
cosmos -s $env:EQUINOX_COSMOS_CONNECTION -d $env:EQUINOX_COSMOS_DATABASE -c $env:EQUINOX_COSMOS_CONTAINER_ARCHIVE
```bash
dotnet run --project tools/Equinox.Tool -- init -ru 400 `
cosmos -s $env:EQUINOX_COSMOS_CONNECTION -d $env:EQUINOX_COSMOS_DATABASE -c $env:EQUINOX_COSMOS_CONTAINER
# Same for a Archive Container for integration testing of the archive store fallback mechanism
$env:EQUINOX_COSMOS_CONTAINER_ARCHIVE="equinox-test-archive"
dotnet run --project tools/Equinox.Tool -- init -ru 400 `
cosmos -s $env:EQUINOX_COSMOS_CONNECTION -d $env:EQUINOX_COSMOS_DATABASE -c $env:EQUINOX_COSMOS_CONTAINER_ARCHIVE
```
#### Using Cosmos Emulator on an Intel Mac
NOTE There's [no Apple Silicon emulator available as yet](https://github.com/Azure/azure-cosmos-db-emulator-docker/issues/54#issuecomment-1399067365).
NOTE Have not tested with the Windows Emulator, but it should work with analogous steps.
docker compose up equinox-cosmos -d
bash docker-compose-cosmos.sh
```bash
# magic connection string value that CosmosStoreConnector supports to a) avoid having to copy values b) having to add code to trust cert
export EQUINOX_COSMOS_CONNECTION="TrustLocalEmulator=true"
docker compose up equinox-cosmos -d
bash docker-compose-cosmos.sh
```
### Provisioning SqlStreamStore
Expand Down
11 changes: 8 additions & 3 deletions docker-compose-cosmos.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,14 @@

set -e # exit on any non-zero exit code

tmpf=$(mktemp)
curl -k https://localhost:8081/_explorer/emulator.pem > $tmpf
sudo security add-trusted-cert -d -r trustRoot -k ~/Library/Keychains/login.keychain $tmpf
if [ "$EQUINOX_COSMOS_CONNECTION" == "TrustLocalEmulator=true" ]; then
echo "Skipping downloading/trusting CosmosDb Emulator Certificate as \$EQUINOX_COSMOS_CONNECTION == \"TrustLocalEmulator=true\""
else
echo "Downloading/trusting CosmosDb Emulator Certificate as \$EQUINOX_COSMOS_CONNECTION is not \"TrustLocalEmulator=true\""
tmpf=$(mktemp)
curl -k https://localhost:8081/_explorer/emulator.pem > $tmpf
sudo security add-trusted-cert -d -r trustRoot -k ~/Library/Keychains/login.keychain $tmpf
fi

dotnet run --project tools/Equinox.Tool -- init cosmos
dotnet run --project tools/Equinox.Tool -- init cosmos -c equinox-test-archive
3 changes: 1 addition & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,9 @@ services:
restart: unless-stopped
environment:
- AZURE_COSMOS_EMULATOR_PARTITION_COUNT=3
- AZURE_COSMOS_EMULATOR_IP_ADDRESS_OVERRIDE=127.0.0.1
ports:
- "8081:8081" # so docker-cosmos-init.sh can get the cert and/or humans can use https://localhost:8081/_explorer/index.html
- "10250-10256:10250-10256" # tests connect using Direct mode
- "10250-10255:10250-10255" # tests connect using Direct mode

equinox-mssql:
container_name: equinox-mssql
Expand Down
142 changes: 70 additions & 72 deletions src/Equinox.CosmosStore/CosmosStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -1102,85 +1102,74 @@ type internal StoreCategory<'event, 'state, 'req>

module ConnectionString =

let [<Literal>] DefaultEmulatorEndpoint = "https://localhost:8081"
let [<Literal>] DefaultEmulatorAccountKey = "C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw=="
let (|AccountEndpoint|) connectionString =
match System.Data.Common.DbConnectionStringBuilder(ConnectionString = connectionString).TryGetValue "AccountEndpoint" with
| true, (:? string as s) when not (String.IsNullOrEmpty s) -> s
| _ -> invalidOp "Connection string does not contain an \"AccountEndpoint\""

[<RequireQualifiedAccess; NoComparison>]
type DiscoveryMode =
| AccountUriAndKey of accountUri: string * key: string
| ConnectionString of connectionString: string
member x.Endpoint = x |> function
| DiscoveryMode.AccountUriAndKey (u, _k) -> u
| DiscoveryMode.ConnectionString (ConnectionString.AccountEndpoint e) -> e

/// Manages establishing a CosmosClient, which is used by CosmosStoreClient to read from the underlying Cosmos DB Container.
type CosmosClientFactory(options) =
static member CreateDefaultOptions(requestTimeout: TimeSpan, maxRetryAttemptsOnRateLimitedRequests: int, maxRetryWaitTimeOnRateLimitedRequests: TimeSpan) =
CosmosClientOptions(
MaxRetryAttemptsOnRateLimitedRequests = maxRetryAttemptsOnRateLimitedRequests,
MaxRetryWaitTimeOnRateLimitedRequests = maxRetryWaitTimeOnRateLimitedRequests,
RequestTimeout = requestTimeout,
Serializer = CosmosJsonSerializer(JsonSerializerOptions()))
/// CosmosClientOptions for this CosmosClientFactory as configured (NOTE while the Options object is not immutable, it should not have setters called on it)
member val Options = options
/// Creates an instance of CosmosClient without actually validating or establishing the connection
/// It's recommended to use <c>CreateAndInitializeAsync</c> in preference to this API
/// in order to avoid latency spikes, and/or deferring discovery of connectivity or permission issues.
member x.CreateUninitialized(discovery: DiscoveryMode) = discovery |> function
| DiscoveryMode.AccountUriAndKey (accountUri = uri; key = key) -> new CosmosClient(uri, key, x.Options)
| DiscoveryMode.ConnectionString cs -> new CosmosClient(cs, x.Options)
/// Creates and validates a CosmosClient [including loading metadata](https://devblogs.microsoft.com/cosmosdb/improve-net-sdk-initialization) for the specified containers
member x.CreateAndInitializeAsync(discovery: DiscoveryMode, containers, ct) = discovery |> function
| DiscoveryMode.AccountUriAndKey (accountUri = uri; key = key) -> CosmosClient.CreateAndInitializeAsync(uri, key, containers, x.Options, ct)
| DiscoveryMode.ConnectionString cs -> CosmosClient.CreateAndInitializeAsync(cs, containers, x.Options, ct)

namespace Equinox.CosmosStore

open Equinox.Core
open Equinox.CosmosStore.Core
open Microsoft.Azure.Cosmos
open System

module Discovery =

/// <summary>Use <c>Equinox.CosmosStore.Core.ConnectionString.defaultEmulatorEndpoint</c> and <c>Equinox.CosmosStore.Core.ConnectionString.defaultEmulatorAccountKey</c>; force <c>bypassCertificateValidation</c>.</summary>
let [<Literal>] TrustLocalEmulatorConnectionString = "TrustLocalEmulator=true"

[<RequireQualifiedAccess; NoComparison>]
type Discovery =
/// Separated Account Uri and Key (for interop with previous versions)
| AccountUriAndKey of accountUri: Uri * key: string
/// Cosmos SDK Connection String
/// <summary>Cosmos SDK Connection String<nbr/>
/// NOTE the magic value <c>TrustLocalEmulator=true</c> overrides the mode to <c>Discovery.TrustLocalEmulator</c></summary>
| ConnectionString of connectionString: string
member x.Endpoint: Uri = x |> function
| Discovery.AccountUriAndKey (u, _k) -> u
| Discovery.ConnectionString (ConnectionString.AccountEndpoint e) -> Uri e

/// Manages establishing a CosmosClient, which is used by CosmosStoreClient to read from the underlying Cosmos DB Container.
[<System.ComponentModel.EditorBrowsable(System.ComponentModel.EditorBrowsableState.Never)>]
type CosmosClientFactory
( // Timeout to apply to individual reads/write round-trips going to CosmosDB. CosmosDB Default: 1m.
requestTimeout: TimeSpan,
// Maximum number of times to attempt when failure reason is a 429 from CosmosDB, signifying RU limits have been breached. CosmosDB default: 9
maxRetryAttemptsOnRateLimitedRequests: int,
// Maximum number of seconds to wait (especially if a higher wait delay is suggested by CosmosDB in the 429 response). CosmosDB default: 30s
maxRetryWaitTimeOnRateLimitedRequests: TimeSpan,
// Connection mode (default: ConnectionMode.Direct (best performance, same as Microsoft.Azure.Cosmos SDK default)
// NOTE: default for Equinox.Cosmos.Connector (i.e. V2) was Gateway (worst performance, least trouble, Microsoft.Azure.DocumentDb SDK default)
[<O; D null>] ?mode: ConnectionMode,
// Connection limit for Gateway Mode. CosmosDB default: 50
[<O; D null>] ?gatewayModeMaxConnectionLimit,
// consistency mode (default: ConsistencyLevel.Session)
[<O; D null>] ?defaultConsistencyLevel: ConsistencyLevel,
// Inhibits certificate verification when set to <c>true</c>, i.e. for working with the CosmosDB Emulator (default <c>false</c>)
[<O; D null>] ?bypassCertificateValidation: bool) =

/// CosmosClientOptions for this CosmosClientFactory as configured
member val Options =
let co = CosmosClientOptions(
MaxRetryAttemptsOnRateLimitedRequests = maxRetryAttemptsOnRateLimitedRequests,
MaxRetryWaitTimeOnRateLimitedRequests = maxRetryWaitTimeOnRateLimitedRequests,
RequestTimeout = requestTimeout,
Serializer = CosmosJsonSerializer(System.Text.Json.JsonSerializerOptions()))
match mode with
| None | Some ConnectionMode.Direct -> co.ConnectionMode <- ConnectionMode.Direct
| Some ConnectionMode.Gateway | Some _ (* enum total match :( *) -> co.ConnectionMode <- ConnectionMode.Gateway // only supports Https
match gatewayModeMaxConnectionLimit with
| Some _ when co.ConnectionMode = ConnectionMode.Direct -> invalidArg "gatewayModeMaxConnectionLimit" "Not admissible in Direct mode"
| x -> if co.ConnectionMode = ConnectionMode.Gateway then co.GatewayModeMaxConnectionLimit <- defaultArg x 50
match defaultConsistencyLevel with
| Some x -> co.ConsistencyLevel <- x
| None -> ()
// https://github.com/Azure/azure-cosmos-dotnet-v3/blob/1ef6e399f114a0fd580272d4cdca86b9f8732cf3/Microsoft.Azure.Cosmos.Samples/Usage/HttpClientFactory/Program.cs#L96
if bypassCertificateValidation = Some true && co.ConnectionMode = ConnectionMode.Gateway then
let cb = System.Net.Http.HttpClientHandler.DangerousAcceptAnyServerCertificateValidator
let ch = new System.Net.Http.HttpClientHandler(ServerCertificateCustomValidationCallback = cb)
co.HttpClientFactory <- fun () -> new System.Net.Http.HttpClient(ch)
co

/// Creates an instance of CosmosClient without actually validating or establishing the connection
/// It's recommended to use <c>CreateAndInitializeAsync</c> in preference to this API
/// in order to avoid latency spikes, and/or deferring discovery of connectivity or permission issues.
member x.CreateUninitialized(discovery: Discovery) = discovery |> function
| Discovery.AccountUriAndKey (accountUri = uri; key = key) -> new CosmosClient(string uri, key, x.Options)
| Discovery.ConnectionString cs -> new CosmosClient(cs, x.Options)

/// Creates and validates a CosmosClient [including loading metadata](https://devblogs.microsoft.com/cosmosdb/improve-net-sdk-initialization) for the specified containers
member x.CreateAndInitializeAsync(discovery: Discovery, containers, ct) = discovery |> function
| Discovery.AccountUriAndKey (accountUri = uri; key = key) -> CosmosClient.CreateAndInitializeAsync(string uri, key, containers, x.Options, ct)
| Discovery.ConnectionString cs -> CosmosClient.CreateAndInitializeAsync(cs, containers, x.Options, ct)
/// <summary>Use <c>Equinox.CosmosStore.Core.ConnectionString.DefaultEmulatorEndpoint</c> and <c>Equinox.CosmosStore.Core.ConnectionString.DefaultEmulatorAccountKey</c>; force <c>bypassCertificateValidation</c>.<br/>
/// See https://learn.microsoft.com/en-us/azure/cosmos-db/local-emulator for details.</summary>
| TrustLocalEmulator
member x.ToDiscoveryMode() = x |> function
| Discovery.AccountUriAndKey (u, k) -> DiscoveryMode.AccountUriAndKey (string u, k)
| Discovery.ConnectionString Discovery.TrustLocalEmulatorConnectionString
| Discovery.TrustLocalEmulator -> DiscoveryMode.AccountUriAndKey (Core.ConnectionString.DefaultEmulatorEndpoint, Core.ConnectionString.DefaultEmulatorAccountKey)
| Discovery.ConnectionString c -> DiscoveryMode.ConnectionString c

/// Manages establishing a CosmosClient, which is used by CosmosStoreClient to read from the underlying Cosmos DB Container.
type CosmosStoreConnector
( // CosmosDB endpoint/credentials specification.
// NOTE using the Default Local Emulator endpoint causes bypassCertificateValidation to default to true
discovery: Discovery,
// Timeout to apply to individual reads/write round-trips going to CosmosDB. CosmosDB Default: 1m.
requestTimeout: TimeSpan,
Expand All @@ -1191,32 +1180,41 @@ type CosmosStoreConnector
// Connection mode (default: ConnectionMode.Direct (best performance, same as Microsoft.Azure.Cosmos SDK default)
// NOTE: default for Equinox.Cosmos.Connector (i.e. V2) was Gateway (worst performance, least trouble, Microsoft.Azure.DocumentDb SDK default)
[<O; D null>] ?mode: ConnectionMode,
// Connection limit for Gateway Mode. CosmosDB default: 50
[<O; D null>] ?gatewayModeMaxConnectionLimit,
// consistency mode (default: ConsistencyLevel.Session)
// consistency mode (default: use configuration specified for Database)
[<O; D null>] ?defaultConsistencyLevel: ConsistencyLevel,
// Inhibits certificate verification when set to <c>true</c>, i.e. for working with the CosmosDB Emulator (default <c>false</c>)
[<O; D null>] ?bypassCertificateValidation: bool) =

// Inhibits certificate verification when set to `true`.
// Defaults to `true` when targeting Local Emulator, otherwise false
[<O; D null>] ?bypassCertificateValidation: bool,
[<O; D null>] ?customize: Action<CosmosClientOptions>) =
let discoveryMode = discovery.ToDiscoveryMode()
let bypassCertificateValidation =
defaultArg bypassCertificateValidation false
|| (match discoveryMode with DiscoveryMode.AccountUriAndKey (ConnectionString.DefaultEmulatorEndpoint, _) -> true | _ -> false)
let factory =
CosmosClientFactory
( requestTimeout, maxRetryAttemptsOnRateLimitedRequests, maxRetryWaitTimeOnRateLimitedRequests, ?mode = mode,
?gatewayModeMaxConnectionLimit = gatewayModeMaxConnectionLimit, ?defaultConsistencyLevel = defaultConsistencyLevel,
?bypassCertificateValidation = bypassCertificateValidation)
let o = CosmosClientFactory.CreateDefaultOptions(requestTimeout, maxRetryAttemptsOnRateLimitedRequests, maxRetryWaitTimeOnRateLimitedRequests)
mode |> Option.iter (fun x -> o.ConnectionMode <- x)
defaultConsistencyLevel |> Option.iter (fun x -> o.ConsistencyLevel <- x)
// https://github.com/Azure/azure-cosmos-dotnet-v3/blob/1ef6e399f114a0fd580272d4cdca86b9f8732cf3/Microsoft.Azure.Cosmos.Samples/Usage/HttpClientFactory/Program.cs#L96
if bypassCertificateValidation then
let cb = System.Net.Http.HttpClientHandler.DangerousAcceptAnyServerCertificateValidator
let ch = new System.Net.Http.HttpClientHandler(ServerCertificateCustomValidationCallback = cb)
o.HttpClientFactory <- fun () -> new System.Net.Http.HttpClient(ch)
customize |> Option.iter (fun c -> c.Invoke o)
CosmosClientFactory o

/// The <c>CosmosClientOptions</c> used when connecting to CosmosDB
member _.Options = factory.Options

/// The Endpoint Uri for the target CosmosDB
member _.Endpoint = discovery.Endpoint
member val Endpoint = discoveryMode.Endpoint |> Uri

/// Creates an instance of CosmosClient without actually validating or establishing the connection
/// It's recommended to use <c>Connect</c> and/or <c>CreateAndInitialize</c> in preference to this API
/// in order to avoid latency spikes, and/or deferring discovery of connectivity or permission issues.
member _.CreateUninitialized() = factory.CreateUninitialized(discovery)
member _.CreateUninitialized() = factory.CreateUninitialized(discoveryMode)

/// Creates and validates a CosmosClient [including loading metadata](https://devblogs.microsoft.com/cosmosdb/improve-net-sdk-initialization) for the specified containers
member _.CreateAndInitializeAsync(containers, ct): Task<CosmosClient> = factory.CreateAndInitializeAsync(discovery, containers, ct)
member _.CreateAndInitializeAsync(containers, ct): Task<CosmosClient> = factory.CreateAndInitializeAsync(discoveryMode, containers, ct)
/// Creates and validates a CosmosClient [including loading metadata](https://devblogs.microsoft.com/cosmosdb/improve-net-sdk-initialization) for the specified containers
member x.CreateAndInitialize(databaseAndContainerIds: struct (string * string)[]) =
Async.call (fun ct -> x.CreateAndInitializeAsync(databaseAndContainerIds, ct))
Expand All @@ -1226,7 +1224,7 @@ type CosmosStoreConnector

/// Creates and validates a CosmosStoreClient [including loading metadata](https://devblogs.microsoft.com/cosmosdb/improve-net-sdk-initialization) for the specified containers
member _.ConnectAsync(containers, ct): Task<CosmosStoreClient> = task {
let! cosmosClient = factory.CreateAndInitializeAsync(discovery, containers, ct)
let! cosmosClient = factory.CreateAndInitializeAsync(discoveryMode, containers, ct)
return CosmosStoreClient(cosmosClient) }
/// Creates and validates a CosmosStoreClient [including loading metadata](https://devblogs.microsoft.com/cosmosdb/improve-net-sdk-initialization) for the specified containers
member x.Connect(databaseAndContainerIds: struct (string * string)[]) =
Expand Down
Loading

0 comments on commit cc3f1e1

Please sign in to comment.