From abb33668c78e566b10f14a686938c3c3ff248598 Mon Sep 17 00:00:00 2001 From: John Belcher Date: Wed, 17 Nov 2021 17:43:51 -0500 Subject: [PATCH] Equinox.Tool: Add support for Cosmos Autoscaling (#302) Adds an `--autoscale`/`-A` switch to the `eqx` tool to enable support for the Autoscaling feature of Cosmos Containers and Databases. Specifying this flag causes the `--rus` parameter to be interpreted as "Maximum RU/s" and changes the default value to 4000 RU/s, the minimum value allowed by Cosmos. --- CHANGELOG.md | 1 + src/Equinox.CosmosStore/CosmosStore.fs | 60 ++++++++++++++------------ tools/Equinox.Tool/Program.fs | 28 ++++++------ 3 files changed, 50 insertions(+), 39 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 62f70924d..1c408067e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ The `Unreleased` section name is replaced by the expected version of next releas ### Added - Stores: Expose `.Log.PropertyTag` Literals to enable log filtering [#298](https://github.com/jet/equinox/pull/298~~~~) +- `Equinox.Tool`: Add support for [autoscaling throughput](https://docs.microsoft.com/en-us/azure/cosmos-db/provision-throughput-autoscale) of Cosmos containers and databases [#302](https://github.com/jet/equinox/pull/302) :pray: [@belcher-rok](https://github.com/belcher-rok) ### Changed ### Removed diff --git a/src/Equinox.CosmosStore/CosmosStore.fs b/src/Equinox.CosmosStore/CosmosStore.fs index 92a1b3c31..4e5a86e9c 100644 --- a/src/Equinox.CosmosStore/CosmosStore.fs +++ b/src/Equinox.CosmosStore/CosmosStore.fs @@ -565,43 +565,49 @@ module internal Sync = module Initialization = - type [] Provisioning = Container of rus: int | Database of rus: int | Serverless - let adjustOfferC (c:Container) (rus : int) = async { + // Note: the Cosmos SDK does not (currently) support changing the Throughput mode of an existing Database or Container. + type [] Throughput = Manual of rus: int | Autoscale of maxRus: int + type [] Provisioning = Container of Throughput | Database of Throughput | Serverless + let (|ThroughputProperties|) = function + | Throughput.Manual rus -> ThroughputProperties.CreateManualThroughput(rus) + | Throughput.Autoscale maxRus -> ThroughputProperties.CreateAutoscaleThroughput(maxRus) + let (|MaybeThroughputProperties|) = Option.map (function ThroughputProperties tp -> tp) >> Option.toObj + let adjustOfferC (c:Container) (ThroughputProperties tp) = async { let! ct = Async.CancellationToken - let! _ = c.ReplaceThroughputAsync(rus, cancellationToken = ct) |> Async.AwaitTaskCorrect in () } - let adjustOfferD (d:Database) (rus : int) = async { + let! _ = c.ReplaceThroughputAsync(tp, cancellationToken = ct) |> Async.AwaitTaskCorrect in () } + let adjustOfferD (d : Database) (ThroughputProperties tp) = async { let! ct = Async.CancellationToken - let! _ = d.ReplaceThroughputAsync(rus, cancellationToken = ct) |> Async.AwaitTaskCorrect in () } - let private createDatabaseIfNotExists (client:CosmosClient) dName maybeRus = async { + let! _ = d.ReplaceThroughputAsync(tp, cancellationToken = ct) |> Async.AwaitTaskCorrect in () } + let private createDatabaseIfNotExists (client : CosmosClient) dName (MaybeThroughputProperties tpOrNull) = async { let! ct = Async.CancellationToken - let! dbr = client.CreateDatabaseIfNotExistsAsync(id = dName, throughput = Option.toNullable maybeRus, cancellationToken = ct) |> Async.AwaitTaskCorrect + let! dbr = client.CreateDatabaseIfNotExistsAsync(dName, throughputProperties = tpOrNull, cancellationToken = ct) |> Async.AwaitTaskCorrect return dbr.Database } - let private createOrProvisionDatabase (client:CosmosClient) dName mode = async { + let private createOrProvisionDatabase (client : CosmosClient) dName mode = async { match mode with - | Provisioning.Database rus -> - let! db = createDatabaseIfNotExists client dName (Some rus) - do! adjustOfferD db rus + | Provisioning.Database throughput -> + let! db = createDatabaseIfNotExists client dName (Some throughput) + do! adjustOfferD db throughput | Provisioning.Container _ | Provisioning.Serverless -> let! _ = createDatabaseIfNotExists client dName None in () } - let private createContainerIfNotExists (d:Database) (cp:ContainerProperties) maybeRus = async { + let private createContainerIfNotExists (d : Database) (cp : ContainerProperties) (MaybeThroughputProperties tpOrNull) = async { let! ct = Async.CancellationToken - let! c = d.CreateContainerIfNotExistsAsync(cp, throughput = Option.toNullable maybeRus, cancellationToken = ct) |> Async.AwaitTaskCorrect + let! c = d.CreateContainerIfNotExistsAsync(cp, throughputProperties = tpOrNull, cancellationToken = ct) |> Async.AwaitTaskCorrect return c.Container } - let private createOrProvisionContainer (d:Database) (cp:ContainerProperties) mode = async { + let private createOrProvisionContainer (d : Database) (cp : ContainerProperties) mode = async { match mode with - | Provisioning.Container rus -> - let! c = createContainerIfNotExists d cp (Some rus) - do! adjustOfferC c rus + | Provisioning.Container throughput -> + let! c = createContainerIfNotExists d cp (Some throughput) + do! adjustOfferC c throughput return c | Provisioning.Database _ | Provisioning.Serverless -> return! createContainerIfNotExists d cp None } - let private createStoredProcIfNotExists (c:Container) (name, body): Async = async { + let private createStoredProcIfNotExists (c : Container) (name, body): Async = async { try let! r = c.Scripts.CreateStoredProcedureAsync(Scripts.StoredProcedureProperties(id = name, body = body)) |> Async.AwaitTaskCorrect return r.RequestCharge with (:? Microsoft.Azure.Cosmos.CosmosException as ce) when ce.StatusCode = System.Net.HttpStatusCode.Conflict -> return ce.RequestCharge } let private mkContainerProperties containerName partitionKeyFieldName = ContainerProperties(id = containerName, partitionKeyPath = sprintf "/%s" partitionKeyFieldName) - let private createBatchAndTipContainerIfNotExists (client: CosmosClient) (dName,cName) mode : Async = + let private createBatchAndTipContainerIfNotExists (client : CosmosClient) (dName, cName) mode : Async = let def = mkContainerProperties cName Batch.PartitionKeyField def.IndexingPolicy.IndexingMode <- IndexingMode.Consistent def.IndexingPolicy.Automatic <- true @@ -611,27 +617,27 @@ module Initialization = // NB its critical to index the nominated PartitionKey field defined above or there will be runtime errors for k in Batch.IndexedFields do def.IndexingPolicy.IncludedPaths.Add(IncludedPath(Path = sprintf "/%s/?" k)) createOrProvisionContainer (client.GetDatabase dName) def mode - let createSyncStoredProcIfNotExists (log: ILogger option) container = async { - let! t, ru = createStoredProcIfNotExists container (SyncStoredProc.name,SyncStoredProc.body) |> Stopwatch.Time + let createSyncStoredProcIfNotExists (log : ILogger option) container = async { + let! t, ru = createStoredProcIfNotExists container (SyncStoredProc.name, SyncStoredProc.body) |> Stopwatch.Time match log with | None -> () | Some log -> log.Information("Created stored procedure {procName} in {ms}ms {ru}RU", SyncStoredProc.name, (let e = t.Elapsed in e.TotalMilliseconds), ru) } - let private createAuxContainerIfNotExists (client: CosmosClient) (dName,cName) mode : Async = + let private createAuxContainerIfNotExists (client: CosmosClient) (dName, cName) mode : Async = let def = mkContainerProperties cName "id" // as per Cosmos team, Partition Key must be "/id" // TL;DR no indexing of any kind; see https://github.com/Azure/azure-documentdb-changefeedprocessor-dotnet/issues/142 def.IndexingPolicy.Automatic <- false def.IndexingPolicy.IndexingMode <- IndexingMode.None createOrProvisionContainer (client.GetDatabase dName) def mode - let init log (client: CosmosClient) (dName,cName) mode skipStoredProc = async { + let init log (client : CosmosClient) (dName, cName) mode skipStoredProc = async { do! createOrProvisionDatabase client dName mode - let! container = createBatchAndTipContainerIfNotExists client (dName,cName) mode + let! container = createBatchAndTipContainerIfNotExists client (dName, cName) mode if not skipStoredProc then do! createSyncStoredProcIfNotExists (Some log) container } - let initAux (client: CosmosClient) (dName,cName) rus = async { + let initAux (client : CosmosClient) (dName, cName) rus = async { // Hardwired for now (not sure if CFP can store in a Database-allocated as it would need to be supplying partition keys) - let mode = Provisioning.Container rus + let mode = Provisioning.Container (Throughput.Manual rus) do! createOrProvisionDatabase client dName mode - return! createAuxContainerIfNotExists client (dName,cName) mode } + return! createAuxContainerIfNotExists client (dName, cName) mode } /// Holds Container state, coordinating initialization activities type internal ContainerInitializerGuard(container : Container, fallback : Container option, ?initContainer : Container -> Async) = diff --git a/tools/Equinox.Tool/Program.fs b/tools/Equinox.Tool/Program.fs index 7f0f0a5ff..9a071a971 100644 --- a/tools/Equinox.Tool/Program.fs +++ b/tools/Equinox.Tool/Program.fs @@ -13,7 +13,7 @@ open System open System.Net.Http open System.Threading -type Provisioning = Equinox.CosmosStore.Core.Initialization.Provisioning +module CosmosInit = Equinox.CosmosStore.Core.Initialization let [] appName = "equinox-tool" @@ -41,23 +41,29 @@ type Arguments = | Dump _ -> "Load and show events in a specified stream (supports all stores)." and []InitArguments = | [] Rus of int + | [] Autoscale | [] Mode of CosmosModeType | [] SkipStoredProc | [] Cosmos of ParseResults interface IArgParserTemplate with member a.Usage = a |> function - | Rus _ -> "Specify RU/s level to provision for the Container (Default: 400 RU/s)." + | Rus _ -> "Specify RU/s level to provision for the Container (Default: 400 RU/s or 4000 RU/s if autoscaling)." + | Autoscale -> "Autoscale provisioned throughput. Use --rus to specify the maximum RU/s." | Mode _ -> "Configure RU mode to use Container-level RU, Database-level RU, or Serverless allocations (Default: Use Container-level allocation)." | SkipStoredProc -> "Inhibit creation of stored procedure in specified Container." | Cosmos _ -> "Cosmos Connection parameters." and CosmosInitInfo(args : ParseResults) = member __.ProvisioningMode = + let throughput () = + if args.Contains Autoscale + then CosmosInit.Throughput.Autoscale (args.GetResult(Rus, 4000)) + else CosmosInit.Throughput.Manual (args.GetResult(Rus, 400)) match args.GetResult(Mode, CosmosModeType.Container) with - | CosmosModeType.Container -> Provisioning.Container (args.GetResult(Rus, 400)) - | CosmosModeType.Db -> Provisioning.Database (args.GetResult(Rus, 400)) + | CosmosModeType.Container -> CosmosInit.Provisioning.Container (throughput ()) + | CosmosModeType.Db -> CosmosInit.Provisioning.Database (throughput ()) | CosmosModeType.Serverless -> - if args.Contains Rus then raise (Storage.MissingArg "Cannot specify RU/s in Serverless mode") - Provisioning.Serverless + if args.Contains Rus || args.Contains Autoscale then raise (Storage.MissingArg "Cannot specify RU/s or Autoscale in Serverless mode") + CosmosInit.Provisioning.Serverless and []ConfigArguments = | [] MsSql of ParseResults | [] MySql of ParseResults @@ -312,8 +318,6 @@ let createDomainLog verbose verboseConsole maybeSeqEndpoint = module CosmosInit = - open Equinox.CosmosStore.Core.Initialization - let connect log (sargs : ParseResults) = Storage.Cosmos.connect log (Storage.Cosmos.Info sargs) |> fst @@ -324,16 +328,16 @@ module CosmosInit = let client,dName,cName = connect log sargs let mode = (CosmosInitInfo iargs).ProvisioningMode match mode with - | Provisioning.Container ru -> + | CosmosInit.Provisioning.Container ru -> let modeStr = "Container" log.Information("Provisioning `Equinox.CosmosStore` Store at {mode:l} level for {rus:n0} RU/s", modeStr, ru) - | Provisioning.Database ru -> + | CosmosInit.Provisioning.Database ru -> let modeStr = "Database" log.Information("Provisioning `Equinox.CosmosStore` Store at {mode:l} level for {rus:n0} RU/s", modeStr, ru) - | Provisioning.Serverless -> + | CosmosInit.Provisioning.Serverless -> let modeStr = "Serverless" log.Information("Provisioning `Equinox.CosmosStore` Store in {mode:l} mode with automatic RU/s as configured in account", modeStr) - return! init log client (dName,cName) mode skipStoredProc + return! CosmosInit.init log client (dName,cName) mode skipStoredProc | _ -> failwith "please specify a `cosmos` endpoint" } module SqlInit =