generated from Avanade/avanade-template
-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathCosmosDb.cs
269 lines (224 loc) · 16.9 KB
/
CosmosDb.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
// Copyright (c) Avanade. Licensed under the MIT License. See https://github.com/Avanade/CoreEx
using CoreEx.Cosmos.Model;
using CoreEx.Cosmos.Extended;
using CoreEx.Entities;
using CoreEx.Json;
using CoreEx.Mapping;
using CoreEx.Results;
using Microsoft.Azure.Cosmos;
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;
using System.Threading;
using System.Collections.Generic;
using System.Text;
using System.Text.Json;
namespace CoreEx.Cosmos
{
    /// <summary>
    /// Provides extended <b>CosmosDb</b> data access.
    /// </summary>
    /// <param name="database">The <see cref="Microsoft.Azure.Cosmos.Database"/>.</param>
    /// <param name="mapper">The <see cref="IMapper"/>.</param>
    /// <param name="invoker">Enables the <see cref="Invoker"/> to be overridden; defaults to <see cref="CosmosDbInvoker"/>.</param>
    /// <remarks>It is recommended that the <see cref="CosmosDb"/> is registered as a scoped service to enable capabilities such as <see cref="CosmosDbArgs.FilterByTenantId"/> that <i>must</i> be scoped.
    /// Use <see cref="Microsoft.Extensions.DependencyInjection.CosmosDbServiceCollectionExtensions.AddCosmosDb{TCosmosDb}(Microsoft.Extensions.DependencyInjection.IServiceCollection, Func{IServiceProvider, TCosmosDb}, string?)"/> to
    /// register the scoped <see cref="CosmosDb"/> instance.
    /// <para>The dependent <see cref="CosmosClient"/> should however be registered as a singleton as is <see href="https://learn.microsoft.com/en-us/azure/cosmos-db/nosql/best-practice-dotnet">best practice</see>.</para></remarks>
    public class CosmosDb(Database database, IMapper mapper, CosmosDbInvoker? invoker = null) : ICosmosDb
    {
        // Shared fallback invoker; lazily created the first time an instance is constructed without an explicit invoker.
        private static CosmosDbInvoker? _invoker;

        // Authorization filters keyed by the model type and container identifier combination.
        private readonly ConcurrentDictionary<Key, Func<IQueryable, IQueryable>> _filters = new();

        /// <summary>
        /// Provides key as combination of model type and container identifier.
        /// </summary>
        /// <remarks>Implements <see cref="IEquatable{T}"/> (with matching <see cref="GetHashCode"/>) so that dictionary lookups compare both members directly rather than
        /// falling back to the default <see cref="ValueType"/> equality.</remarks>
        private readonly struct Key(Type modelType, string containerId) : IEquatable<Key>
        {
            public Type ModelType { get; } = modelType;

            public string ContainerId { get; } = containerId;

            /// <inheritdoc/>
            public bool Equals(Key other) => ModelType == other.ModelType && ContainerId == other.ContainerId;

            /// <inheritdoc/>
            public override bool Equals(object? obj) => obj is Key other && Equals(other);

            /// <inheritdoc/>
            public override int GetHashCode() => HashCode.Combine(ModelType, ContainerId);
        }

        /// <inheritdoc/>
        public Database Database { get; } = database.ThrowIfNull(nameof(database));

        /// <inheritdoc/>
        public IMapper Mapper { get; } = mapper.ThrowIfNull(nameof(mapper));

        /// <inheritdoc/>
        public CosmosDbInvoker Invoker { get; } = invoker ?? (_invoker ??= new CosmosDbInvoker());

        /// <inheritdoc/>
        public CosmosDbArgs DbArgs { get; set; } = new CosmosDbArgs();

        /// <inheritdoc/>
        public Container GetCosmosContainer(string containerId) => Database.GetContainer(containerId);

        /// <inheritdoc/>
        public CosmosDbContainer<T, TModel> Container<T, TModel>(string containerId, CosmosDbArgs? dbArgs = null) where T : class, IEntityKey, new() where TModel : class, IEntityKey, new() => new(this, containerId, dbArgs);

        /// <inheritdoc/>
        public CosmosDbValueContainer<T, TModel> ValueContainer<T, TModel>(string containerId, CosmosDbArgs? dbArgs = null) where T : class, IEntityKey, new() where TModel : class, IEntityKey, new() => new(this, containerId, dbArgs);

        /// <inheritdoc/>
        public CosmosDbModelContainer<TModel> ModelContainer<TModel>(string containerId, CosmosDbArgs? dbArgs = null) where TModel : class, IEntityKey, new() => new(this, containerId, dbArgs);

        /// <inheritdoc/>
        public CosmosDbValueModelContainer<TModel> ValueModelContainer<TModel>(string containerId, CosmosDbArgs? dbArgs = null) where TModel : class, IEntityKey, new() => new(this, containerId, dbArgs);

        /// <summary>
        /// Sets the filter for all operations performed on the <typeparamref name="TModel"/> for the specified <paramref name="containerId"/> to ensure authorisation is applied. Applies automatically
        /// to all queries, plus create, update, delete and get operations.
        /// </summary>
        /// <typeparam name="TModel">The model <see cref="Type"/> persisted within the container.</typeparam>
        /// <param name="containerId">The <see cref="Microsoft.Azure.Cosmos.Container"/> identifier.</param>
        /// <param name="filter">The filter query.</param>
        /// <returns>The <see cref="CosmosDb"/> instance to support fluent-style method-chaining.</returns>
        /// <exception cref="InvalidOperationException">Thrown where a filter has already been set for the <typeparamref name="TModel"/> and <paramref name="containerId"/> combination.</exception>
        public CosmosDb UseAuthorizeFilter<TModel>(string containerId, Func<IQueryable, IQueryable> filter)
        {
            var key = new Key(typeof(TModel), containerId.ThrowIfNull(nameof(containerId)));
            if (!_filters.TryAdd(key, filter.ThrowIfNull(nameof(filter))))
                throw new InvalidOperationException("A filter cannot be overridden.");

            return this;
        }

        /// <inheritdoc/>
        public Func<IQueryable, IQueryable>? GetAuthorizeFilter<TModel>(string containerId) => _filters.TryGetValue(new Key(typeof(TModel), containerId.ThrowIfNull(nameof(containerId))), out var filter) ? filter : null;

        /// <inheritdoc/>
        public Result? HandleCosmosException(CosmosException cex) => OnCosmosException(cex);

        /// <summary>
        /// Provides the <see cref="CosmosException"/> handling as a result of <see cref="HandleCosmosException(CosmosException)"/>.
        /// </summary>
        /// <param name="cex">The <see cref="CosmosException"/>.</param>
        /// <returns>The <see cref="Result"/> containing the appropriate <see cref="IResult.Error"/>.</returns>
        /// <remarks>Where overriding and the <see cref="CosmosException"/> is not specifically handled then invoke the base to ensure any standard handling is executed.</remarks>
        protected virtual Result? OnCosmosException(CosmosException cex) => cex.ThrowIfNull(nameof(cex)).StatusCode switch
        {
            System.Net.HttpStatusCode.NotFound => Result.Fail(new NotFoundException(null, cex)),
            System.Net.HttpStatusCode.Conflict => Result.Fail(new DuplicateException(null, cex)),
            System.Net.HttpStatusCode.PreconditionFailed => Result.Fail(new ConcurrencyException(null, cex)),
            _ => Result.Fail(cex)
        };

        /// <summary>
        /// Executes a multi-dataset query command with one or more <see cref="IMultiSetArgs"/>; any resulting error is thrown.
        /// </summary>
        /// <param name="partitionKey">The <see cref="PartitionKey"/>.</param>
        /// <param name="multiSetArgs">One or more <see cref="IMultiSetArgs"/>.</param>
        /// <remarks>See <see cref="SelectMultiSetWithResultAsync(PartitionKey, string?, IEnumerable{IMultiSetArgs}, CancellationToken)"/> for further details.</remarks>
        public Task SelectMultiSetAsync(PartitionKey partitionKey, params IMultiSetArgs[] multiSetArgs) => SelectMultiSetAsync(partitionKey, multiSetArgs, default);

        /// <summary>
        /// Executes a multi-dataset query command with one or more <see cref="IMultiSetArgs"/>; any resulting error is thrown.
        /// </summary>
        /// <param name="partitionKey">The <see cref="PartitionKey"/>.</param>
        /// <param name="multiSetArgs">One or more <see cref="IMultiSetArgs"/>.</param>
        /// <param name="cancellationToken">The <see cref="CancellationToken"/>.</param>
        /// <remarks>See <see cref="SelectMultiSetWithResultAsync(PartitionKey, string?, IEnumerable{IMultiSetArgs}, CancellationToken)"/> for further details.</remarks>
        public Task SelectMultiSetAsync(PartitionKey partitionKey, IEnumerable<IMultiSetArgs> multiSetArgs, CancellationToken cancellationToken = default) => SelectMultiSetAsync(partitionKey, null, multiSetArgs, cancellationToken);

        /// <summary>
        /// Executes a multi-dataset query command with one or more <see cref="IMultiSetArgs"/>; any resulting error is thrown.
        /// </summary>
        /// <param name="partitionKey">The <see cref="PartitionKey"/>.</param>
        /// <param name="sql">The override SQL statement; will default where not specified.</param>
        /// <param name="multiSetArgs">One or more <see cref="IMultiSetArgs"/>.</param>
        /// <param name="cancellationToken">The <see cref="CancellationToken"/>.</param>
        /// <remarks>See <see cref="SelectMultiSetWithResultAsync(PartitionKey, string?, IEnumerable{IMultiSetArgs}, CancellationToken)"/> for further details.</remarks>
        public async Task SelectMultiSetAsync(PartitionKey partitionKey, string? sql, IEnumerable<IMultiSetArgs> multiSetArgs, CancellationToken cancellationToken = default)
            => (await SelectMultiSetWithResultAsync(partitionKey, sql, multiSetArgs, cancellationToken).ConfigureAwait(false)).ThrowOnError();

        /// <summary>
        /// Executes a multi-dataset query command with one or more <see cref="IMultiSetArgs"/> with a <see cref="Result"/>.
        /// </summary>
        /// <param name="partitionKey">The <see cref="PartitionKey"/>.</param>
        /// <param name="multiSetArgs">One or more <see cref="IMultiSetArgs"/>.</param>
        /// <returns>The <see cref="Result"/>.</returns>
        /// <remarks>See <see cref="SelectMultiSetWithResultAsync(PartitionKey, string?, IEnumerable{IMultiSetArgs}, CancellationToken)"/> for further details.</remarks>
        public Task<Result> SelectMultiSetWithResultAsync(PartitionKey partitionKey, params IMultiSetArgs[] multiSetArgs) => SelectMultiSetWithResultAsync(partitionKey, multiSetArgs, default);

        /// <summary>
        /// Executes a multi-dataset query command with one or more <see cref="IMultiSetArgs"/> with a <see cref="Result"/>.
        /// </summary>
        /// <param name="partitionKey">The <see cref="PartitionKey"/>.</param>
        /// <param name="multiSetArgs">One or more <see cref="IMultiSetArgs"/>.</param>
        /// <param name="cancellationToken">The <see cref="CancellationToken"/>.</param>
        /// <returns>The <see cref="Result"/>.</returns>
        /// <remarks>See <see cref="SelectMultiSetWithResultAsync(PartitionKey, string?, IEnumerable{IMultiSetArgs}, CancellationToken)"/> for further details.</remarks>
        public Task<Result> SelectMultiSetWithResultAsync(PartitionKey partitionKey, IEnumerable<IMultiSetArgs> multiSetArgs, CancellationToken cancellationToken = default) => SelectMultiSetWithResultAsync(partitionKey, null, multiSetArgs, cancellationToken);

        /// <summary>
        /// Executes a multi-dataset query command with one or more <see cref="IMultiSetArgs"/> with a <see cref="Result"/>.
        /// </summary>
        /// <param name="partitionKey">The <see cref="PartitionKey"/>.</param>
        /// <param name="sql">The override SQL statement; will default where not specified.</param>
        /// <param name="multiSetArgs">One or more <see cref="IMultiSetArgs"/>.</param>
        /// <param name="cancellationToken">The <see cref="CancellationToken"/>.</param>
        /// <returns>The <see cref="Result"/>; a failure where the query, an item addition, or a multi-set verification fails.</returns>
        /// <exception cref="ArgumentException">Thrown where the <paramref name="multiSetArgs"/> are empty, are not from this <see cref="CosmosDb"/>, are not value-model containers,
        /// do not share the same container identifier, or repeat a model type.</exception>
        /// <remarks>The <paramref name="multiSetArgs"/> must all be from the same <see cref="CosmosDb"/>, be of type <see cref="CosmosDbValueContainer{T, TModel}"/>, and reference the same <see cref="Container.Id"/>. Each
        /// <paramref name="multiSetArgs"/> is verified and executed in the order specified.
        /// <para>The underlying SQL will be automatically created from the specified <paramref name="multiSetArgs"/> where not explicitly supplied. Essentially, it is a simple query where all <i>types</i> inferred from the <paramref name="multiSetArgs"/>
        /// are included, for example: <c>SELECT * FROM c WHERE c.type in ("TypeNameA", "TypeNameB")</c></para>
        /// <para>Example usage is:
        /// <code>
        /// private async Task&lt;Result&lt;MemberDetail?&gt;&gt; GetDetailOnImplementationAsync(int id)
        /// {
        ///     MemberDetail? md = null;
        ///     return await Result.GoAsync(() => _cosmos.SelectMultiSetWithResultAsync(new AzCosmos.PartitionKey(id.ToString()),
        ///         _cosmos.Members.CreateMultiSetSingleArgs(m => md = m.CreateCopyFromAs&lt;MemberDetail&gt;(), isMandatory: false, stopOnNull: true),
        ///         _cosmos.MemberAddresses.CreateMultiSetCollArgs(mac => md.Adjust(x => x.Addresses = new (mac)))))
        ///         .ThenAs(() => md).ConfigureAwait(false);
        /// }
        /// </code></para></remarks>
        public async Task<Result> SelectMultiSetWithResultAsync(PartitionKey partitionKey, string? sql, IEnumerable<IMultiSetArgs> multiSetArgs, CancellationToken cancellationToken = default)
        {
            // Verify that the multi set arguments are valid for this type of get query; materialize once to avoid multiple enumeration.
            var multiSetList = multiSetArgs?.ToArray();
            if (multiSetList == null || multiSetList.Length == 0)
                throw new ArgumentException($"At least one {nameof(IMultiSetArgs)} must be supplied.", nameof(multiSetArgs));

            if (multiSetList.Any(x => x.Container.CosmosDb != this))
                throw new ArgumentException($"All {nameof(IMultiSetArgs)} containers must be from this same database.", nameof(multiSetArgs));

            if (multiSetList.Any(x => !x.Container.IsCosmosDbValueModel))
                throw new ArgumentException($"All {nameof(IMultiSetArgs)} containers must be of type CosmosDbValueContainer.", nameof(multiSetArgs));

            // Build the Cosmos SQL statement (only where not explicitly supplied) and the model type name to args lookup.
            var container = multiSetList[0].Container;
            var types = new Dictionary<string, IMultiSetArgs>([ new KeyValuePair<string, IMultiSetArgs>(container.ModelType.Name, multiSetList[0]) ]);
            var sb = string.IsNullOrEmpty(sql) ? new StringBuilder($"SELECT * FROM c WHERE c.type in (\"{container.ModelType.Name}\"") : null;

            for (int i = 1; i < multiSetList.Length; i++)
            {
                var args = multiSetList[i];
                if (args.Container.Container.Id != container.Container.Id)
                    throw new ArgumentException($"All {nameof(IMultiSetArgs)} containers must reference the same container id.", nameof(multiSetArgs));

                if (!types.TryAdd(args.Container.ModelType.Name, args))
                    throw new ArgumentException($"All {nameof(IMultiSetArgs)} containers must be of different model type.", nameof(multiSetArgs));

                sb?.Append($", \"{args.Container.ModelType.Name}\"");
            }

            sb?.Append(')');

            // Execute the Cosmos DB query.
            var result = await Invoker.InvokeAsync(this, container, sb?.ToString() ?? sql, types, async (_, queryContainer, querySql, typeMap, ct) =>
            {
                // Set up for work.
                var da = new CosmosDbArgs(queryContainer.DbArgs, partitionKey);
                using var qsi = queryContainer.Container.GetItemQueryStreamIterator(querySql, requestOptions: da.GetQueryRequestOptions());
                IJsonSerializer js = ExecutionContext.GetService<IJsonSerializer>() ?? CoreEx.Json.JsonSerializer.Default;
                var isStj = js is Text.Json.JsonSerializer;

                while (qsi.HasMoreResults)
                {
                    // Both the response message and the parsed document are disposable; release them per page rather than leaving to the GC.
                    using var rm = await qsi.ReadNextAsync(ct).ConfigureAwait(false);
                    if (!rm.IsSuccessStatusCode)
                        return Result.Fail(new InvalidOperationException(rm.ErrorMessage));

                    using var json = JsonDocument.Parse(rm.Content);
                    if (!json.RootElement.TryGetProperty("Documents", out var jds) || jds.ValueKind != JsonValueKind.Array)
                        return Result.Fail(new InvalidOperationException("Cosmos response JSON 'Documents' property either not found in result or is not an array."));

                    foreach (var jd in jds.EnumerateArray())
                    {
                        if (!jd.TryGetProperty("type", out var jt) || jt.ValueKind != JsonValueKind.String)
                            return Result.Fail(new InvalidOperationException("Cosmos response documents item 'type' property either not found in result or is not a string."));

                        if (!typeMap.TryGetValue(jt.GetString()!, out var msa))
                            continue;  // Ignore any unexpected type.

                        // Deserialize directly from the element where System.Text.Json is in use; otherwise, round-trip via the JSON text.
                        var model = isStj
                            ? jd.Deserialize(msa.Container.ModelValueType, (JsonSerializerOptions)js.Options)
                            : js.Deserialize(jd.ToString(), msa.Container.ModelValueType);

                        if (!msa.Container.IsModelValid(model, msa.Container.DbArgs, true))
                            continue;

                        var added = msa.AddItem(msa.Container.MapToValue(model));
                        if (added.IsFailure)
                            return added;
                    }
                }

                return Result.Success;
            }, cancellationToken).ConfigureAwait(false);

            if (result.IsFailure)
                return result;

            // Validate the multi-set args and action each accordingly.
            foreach (var msa in multiSetList)
            {
                var r = msa.Verify();
                if (r.IsFailure)
                    return r.AsResult();

                if (!r.Value && msa.StopOnNull)
                    break;

                msa.Invoke();
            }

            return Result.Success;
        }
    }
}