Skip to content

Commit

Permalink
watcher feature complete
Browse files Browse the repository at this point in the history
  • Loading branch information
dj-nitehawk committed Jul 27, 2020
1 parent 8a796c8 commit 35bb0eb
Show file tree
Hide file tree
Showing 10 changed files with 30 additions and 19 deletions.
36 changes: 22 additions & 14 deletions MongoDB.Entities/Commands/Watcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,13 @@ public class Watcher<T> where T : IEntity
public string Name { get; }

/// <summary>
/// Returns true if watching can be restarted if it's stopped due to an error or invalidate event.
/// Returns true if watching can be restarted if it was stopped due to an error or invalidate event.
/// Will always return false after cancellation is requested via the cancellation token.
/// </summary>
public bool CanRestart { get => !cancelToken.IsCancellationRequested; }

/// <summary>
/// The last resume token received from mongodb server. Can be used to resume watching with .StartWithToken() method
/// The last resume token received from mongodb server. Can be used to resume watching with .StartWithToken() method.
/// </summary>
public BsonDocument ResumeToken => options?.ResumeAfter;

Expand All @@ -57,14 +57,14 @@ public class Watcher<T> where T : IEntity
internal Watcher(string name) => Name = name;

/// <summary>
/// Starts the watcher instance with the supplied configuration
/// Starts the watcher instance with the supplied parameters
/// </summary>
/// <param name="eventTypes">Type of event to watch for. Multiple can be specified as: EventType.Created | EventType.Updated | EventType.Deleted</param>
/// <param name="eventTypes">Type of event to watch for. Specify multiple like: EventType.Created | EventType.Updated | EventType.Deleted</param>
/// <param name="filter">x => x.FullDocument.Prop1 == "SomeValue"</param>
/// <param name="batchSize">The max number of entities to receive for a single event occurence</param>
/// <param name="onlyGetIDs">Set to true if you don't want the complete entity details. All properties except the ID will then be null.</param>
/// <param name="autoResume">Set to false if you'd like to skip the changes that happened while the watching was stopped</param>
/// <param name="cancellation">A cancellation token for ending the watch/ change stream</param>
/// <param name="cancellation">A cancellation token for ending the watching/change stream</param>
public void Start(
EventType eventTypes,
Expression<Func<ChangeStreamDocument<T>, bool>> filter = null,
Expand All @@ -78,11 +78,11 @@ public void Start(
/// Starts the watcher instance with the supplied configuration
/// </summary>
/// <param name="resumeToken">A resume token to start receiving changes after some point back in time</param>
/// <param name="eventTypes">Type of event to watch for. Multiple can be specified as: EventType.Created | EventType.Updated | EventType.Deleted</param>
/// <param name="eventTypes">Type of event to watch for. Specify multiple like: EventType.Created | EventType.Updated | EventType.Deleted</param>
/// <param name="filter">x => x.FullDocument.Prop1 == "SomeValue"</param>
/// <param name="batchSize">The max number of entities to receive for a single event occurence</param>
/// <param name="onlyGetIDs">Set to true if you don't want the complete entity details. All properties except the ID will then be null.</param>
/// <param name="cancellation">A cancellation token for ending the watch/ change stream</param>
/// <param name="cancellation">A cancellation token for ending the watching/change stream</param>
public void StartWithToken(
BsonDocument resumeToken,
EventType eventTypes,
Expand All @@ -95,11 +95,11 @@ public void StartWithToken(
private void Init(
BsonDocument resumeToken,
EventType eventTypes,
Expression<Func<ChangeStreamDocument<T>, bool>> filter = null,
int batchSize = 25,
bool onlyGetIDs = false,
bool autoResume = true,
CancellationToken cancellation = default)
Expression<Func<ChangeStreamDocument<T>, bool>> filter,
int batchSize,
bool onlyGetIDs,
bool autoResume,
CancellationToken cancellation)
{
if (initialized)
throw new InvalidOperationException("This watcher has already been initialized!");
Expand All @@ -121,6 +121,14 @@ private void Init(
if ((eventTypes & EventType.Deleted) != 0)
ops.Add(ChangeStreamOperationType.Delete);

if (ops.Contains(ChangeStreamOperationType.Delete) && filter != null)
{
throw new ArgumentException(
"Filtering is not supported when watching for deletions " +
"as the entity data no longer exists in the db " +
"at the time of receiving the event.");
}

var filters = Builders<ChangeStreamDocument<T>>.Filter.Where(x => ops.Contains(x.OperationType));

if (filter != null)
Expand Down Expand Up @@ -183,9 +191,9 @@ private void StartWatching()
if (resume) options.StartAfter = cursor.Current.Last().ResumeToken;
OnChanges?.Invoke(cursor.Current.Select(x => x.FullDocument));
}
else
else if (resume)
{
if (resume) options.StartAfter = cursor.Current.First().ResumeToken;
options.StartAfter = cursor.Current.First().ResumeToken;
}
}
}
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
6 changes: 3 additions & 3 deletions MongoDB.Entities/MongoDB.Entities.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
<AssemblyVersion>13.6.0.0</AssemblyVersion>
<FileVersion>13.6.0.0</FileVersion>
<Copyright>Đĵ ΝιΓΞΗΛψΚ</Copyright>
<PackageReleaseNotes>- simplified change-stream watcher api
- ability to resume change streams with tokens
- ability to filter change events with an expression
<PackageReleaseNotes>- simplified change-stream api
- ability to resume change-streams with tokens
- ability to filter insert &amp; update events with an expression
- misc. code changes</PackageReleaseNotes>
<PackageId>MongoDB.Entities</PackageId>
<Product>MongoDB.Entities</Product>
Expand Down
7 changes: 5 additions & 2 deletions Tests/TestWatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ public void watching_works()
var watcher = DB.Watcher<Flower>("test");
var allFlowers = new List<Flower>();

watcher.Start(EventType.Created | EventType.Deleted, 5);
watcher.Start(
EventType.Created | EventType.Updated,
f => f.FullDocument.Name == "test");

Task.Delay(1000).Wait();

Expand All @@ -29,11 +31,12 @@ public void watching_works()

var flower = new Flower { Name = "test" };
flower.Save();

flower.Delete();

Task.Delay(1000).Wait();

Assert.AreEqual(5, allFlowers.Count);
Assert.AreEqual(4, allFlowers.Count);
}
}
}

0 comments on commit 35bb0eb

Please sign in to comment.