Skip to content

Commit

Permalink
Merge branch 'dev'
Browse files Browse the repository at this point in the history
  • Loading branch information
dj-nitehawk committed Sep 28, 2020
2 parents 1ad0f11 + c7c3300 commit e3bbc7e
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 23 deletions.
15 changes: 9 additions & 6 deletions MongoDB.Entities/Commands/Transaction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -249,9 +249,10 @@ public Task<UpdateResult> SavePreservingAsync<T>(T entity, Expression<Func<T, ob
/// </summary>
/// <typeparam name="T">The type of entity</typeparam>
/// <param name="ID">The Id of the entity to delete</param>
public Task<DeleteResult> DeleteAsync<T>(string ID) where T : IEntity
/// <param name="cancellation">An optional cancellation token</param>
public Task<DeleteResult> DeleteAsync<T>(string ID, CancellationToken cancellation = default) where T : IEntity
{
return DB.DeleteAsync<T>(ID, Session);
return DB.DeleteAsync<T>(ID, Session, cancellation);
}

/// <summary>
Expand All @@ -261,9 +262,10 @@ public Task<DeleteResult> DeleteAsync<T>(string ID) where T : IEntity
/// </summary>
/// <typeparam name="T">The type of entity</typeparam>
/// <param name="expression">A lambda expression for matching entities to delete.</param>
public Task<DeleteResult> DeleteAsync<T>(Expression<Func<T, bool>> expression) where T : IEntity
/// <param name="cancellation">An optional cancellation token</param>
public Task<DeleteResult> DeleteAsync<T>(Expression<Func<T, bool>> expression, CancellationToken cancellation = default) where T : IEntity
{
return DB.DeleteAsync(expression, Session);
return DB.DeleteAsync(expression, Session, cancellation);
}

/// <summary>
Expand All @@ -273,9 +275,10 @@ public Task<DeleteResult> DeleteAsync<T>(Expression<Func<T, bool>> expression) w
/// </summary>
/// <typeparam name="T">The type of entity</typeparam>
/// <param name="IDs">An IEnumerable of entity IDs</param>
public Task<DeleteResult> DeleteAsync<T>(IEnumerable<string> IDs) where T : IEntity
/// <param name="cancellation">An optional cancellation token</param>
public Task<DeleteResult> DeleteAsync<T>(IEnumerable<string> IDs, CancellationToken cancellation = default) where T : IEntity
{
return DB.DeleteAsync<T>(IDs, Session);
return DB.DeleteAsync<T>(IDs, Session, cancellation);
}

/// <summary>
Expand Down
56 changes: 41 additions & 15 deletions MongoDB.Entities/DB.Delete.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;

namespace MongoDB.Entities
Expand All @@ -11,7 +12,7 @@ public static partial class DB
{
private static readonly int deleteBatchSize = 100000;

private static async Task<DeleteResult> DeleteCascadingAsync<T>(IEnumerable<string> IDs, IClientSessionHandle session = null) where T : IEntity
private static async Task<DeleteResult> DeleteCascadingAsync<T>(IEnumerable<string> IDs, IClientSessionHandle session = null, CancellationToken cancellation = default) where T : IEntity
{
// note: cancellation should not be enabled because multiple collections are involved
// and premature cancellation could cause data inconsistencies.
Expand All @@ -24,18 +25,22 @@ private static async Task<DeleteResult> DeleteCascadingAsync<T>(IEnumerable<stri

var tasks = new HashSet<Task>();

foreach (var cName in await db.ListCollectionNames(options).ToListAsync().ConfigureAwait(false))
// note: db.listCollections() does not support transactions.
// so don't add session support here:
var collNamesCursor = await db.ListCollectionNamesAsync(options, cancellation).ConfigureAwait(false);

foreach (var cName in await collNamesCursor.ToListAsync(cancellation).ConfigureAwait(false))
{
tasks.Add(
session == null
? db.GetCollection<JoinRecord>(cName).DeleteManyAsync(r => IDs.Contains(r.ChildID) || IDs.Contains(r.ParentID))
: db.GetCollection<JoinRecord>(cName).DeleteManyAsync(session, r => IDs.Contains(r.ChildID) || IDs.Contains(r.ParentID), null));
: db.GetCollection<JoinRecord>(cName).DeleteManyAsync(session, r => IDs.Contains(r.ChildID) || IDs.Contains(r.ParentID), null, cancellation));
}

var delResTask =
session == null
? Collection<T>().DeleteManyAsync(x => IDs.Contains(x.ID))
: Collection<T>().DeleteManyAsync(session, x => IDs.Contains(x.ID), null);
: Collection<T>().DeleteManyAsync(session, x => IDs.Contains(x.ID), null, cancellation);

tasks.Add(delResTask);

Expand All @@ -44,7 +49,7 @@ private static async Task<DeleteResult> DeleteCascadingAsync<T>(IEnumerable<stri
tasks.Add(
session == null
? db.GetCollection<FileChunk>(CollectionName<FileChunk>()).DeleteManyAsync(x => IDs.Contains(x.FileID))
: db.GetCollection<FileChunk>(CollectionName<FileChunk>()).DeleteManyAsync(session, x => IDs.Contains(x.FileID), null));
: db.GetCollection<FileChunk>(CollectionName<FileChunk>()).DeleteManyAsync(session, x => IDs.Contains(x.FileID), null, cancellation));
}

await Task.WhenAll(tasks).ConfigureAwait(false);
Expand All @@ -58,10 +63,12 @@ private static async Task<DeleteResult> DeleteCascadingAsync<T>(IEnumerable<stri
/// </summary>
/// <typeparam name="T">Any class that implements IEntity</typeparam>
/// <param name="ID">The Id of the entity to delete</param>
/// <param name = "session" > An optional session if using within a transaction</param>
public static Task<DeleteResult> DeleteAsync<T>(string ID, IClientSessionHandle session = null) where T : IEntity
/// <param name = "session" >An optional session if using within a transaction</param>
/// <param name="cancellation">An optional cancellation token</param>
public static Task<DeleteResult> DeleteAsync<T>(string ID, IClientSessionHandle session = null, CancellationToken cancellation = default) where T : IEntity
{
return DeleteCascadingAsync<T>(new[] { ID }, session);
ThrowIfCancellationNotSupported(session, cancellation);
return DeleteCascadingAsync<T>(new[] { ID }, session, cancellation);
}

/// <summary>
Expand All @@ -72,16 +79,26 @@ public static Task<DeleteResult> DeleteAsync<T>(string ID, IClientSessionHandle
/// <typeparam name="T">Any class that implements IEntity</typeparam>
/// <param name="expression">A lambda expression for matching entities to delete.</param>
/// <param name = "session" >An optional session if using within a transaction</param>
public static async Task<DeleteResult> DeleteAsync<T>(Expression<Func<T, bool>> expression, IClientSessionHandle session = null) where T : IEntity
/// <param name="cancellation">An optional cancellation token</param>
public static async Task<DeleteResult> DeleteAsync<T>(Expression<Func<T, bool>> expression, IClientSessionHandle session = null, CancellationToken cancellation = default) where T : IEntity
{
ThrowIfCancellationNotSupported(session, cancellation);

long deletedCount = 0;

using (var cursor = await new Find<T, string>(session).Match(expression).Project(e => e.ID).Option(o => o.BatchSize = deleteBatchSize).ExecuteCursorAsync().ConfigureAwait(false))
var cursor = await new Find<T, string>(session)
.Match(expression)
.Project(e => e.ID)
.Option(o => o.BatchSize = deleteBatchSize)
.ExecuteCursorAsync(cancellation)
.ConfigureAwait(false);

using (cursor)
{
while (await cursor.MoveNextAsync().ConfigureAwait(false))
while (await cursor.MoveNextAsync(cancellation).ConfigureAwait(false))
{
if (cursor.Current.Any())
deletedCount += (await DeleteCascadingAsync<T>(cursor.Current, session).ConfigureAwait(false)).DeletedCount;
deletedCount += (await DeleteCascadingAsync<T>(cursor.Current, session, cancellation).ConfigureAwait(false)).DeletedCount;
}
}

Expand All @@ -96,19 +113,28 @@ public static async Task<DeleteResult> DeleteAsync<T>(Expression<Func<T, bool>>
/// <typeparam name="T">Any class that implements IEntity</typeparam>
/// <param name="IDs">An IEnumerable of entity IDs</param>
/// <param name = "session" > An optional session if using within a transaction</param>
public static async Task<DeleteResult> DeleteAsync<T>(IEnumerable<string> IDs, IClientSessionHandle session = null) where T : IEntity
/// <param name="cancellation">An optional cancellation token</param>
public static async Task<DeleteResult> DeleteAsync<T>(IEnumerable<string> IDs, IClientSessionHandle session = null, CancellationToken cancellation = default) where T : IEntity
{
ThrowIfCancellationNotSupported(session, cancellation);

if (IDs.Count() <= deleteBatchSize)
return await DeleteCascadingAsync<T>(IDs, session).ConfigureAwait(false);
return await DeleteCascadingAsync<T>(IDs, session, cancellation).ConfigureAwait(false);

long deletedCount = 0;

foreach (var batch in IDs.ToBatches(deleteBatchSize))
{
deletedCount += (await DeleteCascadingAsync<T>(batch, session).ConfigureAwait(false)).DeletedCount;
deletedCount += (await DeleteCascadingAsync<T>(batch, session, cancellation).ConfigureAwait(false)).DeletedCount;
}

return new DeleteResult.Acknowledged(deletedCount);
}

private static void ThrowIfCancellationNotSupported(IClientSessionHandle session = null, CancellationToken cancellation = default)
{
if (cancellation != default && session == null)
throw new NotSupportedException("Cancellation is only supported within transactions for delete operations!");
}
}
}
5 changes: 3 additions & 2 deletions MongoDB.Entities/MongoDB.Entities.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@
<Description>A data access library for MongoDB with an elegant api, LINQ support and built-in entity relationship management.</Description>
<PackageLicenseFile>LICENSE</PackageLicenseFile>
<PackageProjectUrl>https://mongodb-entities.com</PackageProjectUrl>
<Version>20.1.0-rc</Version>
<Version>20.1.0</Version>
<Copyright>Đĵ ΝιΓΞΗΛψΚ</Copyright>
<PackageReleaseNotes>- added methods for counting entities to the DB class
- added transaction support for the new count methods
- added cancellation support for deletions
- improved deletions by performing high volume deletes in batches
- improved high concurrency handling
- improved deletes by performing high volume deletes in batches
- new tests</PackageReleaseNotes>
<PackageId>MongoDB.Entities</PackageId>
<Product>MongoDB.Entities</Product>
Expand Down

0 comments on commit e3bbc7e

Please sign in to comment.