From 35bb0eb6f4bcaf258632beab33d21bb5c4c6bfe3 Mon Sep 17 00:00:00 2001 From: Damith Gunner Date: Mon, 27 Jul 2020 20:07:15 +0530 Subject: [PATCH] watcher feature complete --- MongoDB.Entities/Commands/Watcher.cs | 36 +++++++++++-------- .../{Utilities => Core}/Attributes.cs | 0 MongoDB.Entities/{Utilities => Core}/Date.cs | 0 .../{Utilities => Core}/FileEntity.cs | 0 .../{Utilities => Core}/FuzzyString.cs | 0 .../{Utilities => Core}/GeoNear.cs | 0 .../{Utilities => Core}/SequenceCounter.cs | 0 .../{Utilities => Core}/Template.cs | 0 MongoDB.Entities/MongoDB.Entities.csproj | 6 ++-- Tests/TestWatcher.cs | 7 ++-- 10 files changed, 30 insertions(+), 19 deletions(-) rename MongoDB.Entities/{Utilities => Core}/Attributes.cs (100%) rename MongoDB.Entities/{Utilities => Core}/Date.cs (100%) rename MongoDB.Entities/{Utilities => Core}/FileEntity.cs (100%) rename MongoDB.Entities/{Utilities => Core}/FuzzyString.cs (100%) rename MongoDB.Entities/{Utilities => Core}/GeoNear.cs (100%) rename MongoDB.Entities/{Utilities => Core}/SequenceCounter.cs (100%) rename MongoDB.Entities/{Utilities => Core}/Template.cs (100%) diff --git a/MongoDB.Entities/Commands/Watcher.cs b/MongoDB.Entities/Commands/Watcher.cs index e40b81b58..59ea14edf 100644 --- a/MongoDB.Entities/Commands/Watcher.cs +++ b/MongoDB.Entities/Commands/Watcher.cs @@ -38,13 +38,13 @@ public class Watcher where T : IEntity public string Name { get; } /// - /// Returns true if watching can be restarted if it's stopped due to an error or invalidate event. + /// Returns true if watching can be restarted if it was stopped due to an error or invalidate event. /// Will always return false after cancellation is requested via the cancellation token. /// public bool CanRestart { get => !cancelToken.IsCancellationRequested; } /// - /// The last resume token received from mongodb server. Can be used to resume watching with .StartWithToken() method + /// The last resume token received from mongodb server. Can be used to resume watching with .StartWithToken() method. /// public BsonDocument ResumeToken => options?.ResumeAfter; @@ -57,14 +57,14 @@ public class Watcher where T : IEntity internal Watcher(string name) => Name = name; /// - /// Starts the watcher instance with the supplied configuration + /// Starts the watcher instance with the supplied parameters /// - /// Type of event to watch for. Multiple can be specified as: EventType.Created | EventType.Updated | EventType.Deleted + /// Type of event to watch for. Specify multiple like: EventType.Created | EventType.Updated | EventType.Deleted /// x => x.FullDocument.Prop1 == "SomeValue" /// The max number of entities to receive for a single event occurence /// Set to true if you don't want the complete entity details. All properties except the ID will then be null. /// Set to false if you'd like to skip the changes that happened while the watching was stopped - /// A cancellation token for ending the watch/ change stream + /// A cancellation token for ending the watching/change stream public void Start( EventType eventTypes, Expression, bool>> filter = null, @@ -78,11 +78,11 @@ public void Start( /// Starts the watcher instance with the supplied configuration /// /// A resume token to start receiving changes after some point back in time - /// Type of event to watch for. Multiple can be specified as: EventType.Created | EventType.Updated | EventType.Deleted + /// Type of event to watch for. Specify multiple like: EventType.Created | EventType.Updated | EventType.Deleted /// x => x.FullDocument.Prop1 == "SomeValue" /// The max number of entities to receive for a single event occurence /// Set to true if you don't want the complete entity details. All properties except the ID will then be null. - /// A cancellation token for ending the watch/ change stream + /// A cancellation token for ending the watching/change stream public void StartWithToken( BsonDocument resumeToken, EventType eventTypes, @@ -95,11 +95,11 @@ public void StartWithToken( private void Init( BsonDocument resumeToken, EventType eventTypes, - Expression, bool>> filter = null, - int batchSize = 25, - bool onlyGetIDs = false, - bool autoResume = true, - CancellationToken cancellation = default) + Expression, bool>> filter, + int batchSize, + bool onlyGetIDs, + bool autoResume, + CancellationToken cancellation) { if (initialized) throw new InvalidOperationException("This watcher has already been initialized!"); @@ -121,6 +121,14 @@ private void Init( if ((eventTypes & EventType.Deleted) != 0) ops.Add(ChangeStreamOperationType.Delete); + if (ops.Contains(ChangeStreamOperationType.Delete) && filter != null) + { + throw new ArgumentException( + "Filtering is not supported when watching for deletions " + + "as the entity data no longer exists in the db " + + "at the time of receiving the event."); + } + var filters = Builders>.Filter.Where(x => ops.Contains(x.OperationType)); if (filter != null) @@ -183,9 +191,9 @@ private void StartWatching() if (resume) options.StartAfter = cursor.Current.Last().ResumeToken; OnChanges?.Invoke(cursor.Current.Select(x => x.FullDocument)); } - else + else if (resume) { - if (resume) options.StartAfter = cursor.Current.First().ResumeToken; + options.StartAfter = cursor.Current.First().ResumeToken; } } } diff --git a/MongoDB.Entities/Utilities/Attributes.cs b/MongoDB.Entities/Core/Attributes.cs similarity index 100% rename from MongoDB.Entities/Utilities/Attributes.cs rename to MongoDB.Entities/Core/Attributes.cs diff --git a/MongoDB.Entities/Utilities/Date.cs b/MongoDB.Entities/Core/Date.cs similarity index 100% rename from MongoDB.Entities/Utilities/Date.cs rename to MongoDB.Entities/Core/Date.cs diff --git a/MongoDB.Entities/Utilities/FileEntity.cs b/MongoDB.Entities/Core/FileEntity.cs similarity index 100% rename from MongoDB.Entities/Utilities/FileEntity.cs rename to MongoDB.Entities/Core/FileEntity.cs diff --git a/MongoDB.Entities/Utilities/FuzzyString.cs b/MongoDB.Entities/Core/FuzzyString.cs similarity index 100% rename from MongoDB.Entities/Utilities/FuzzyString.cs rename to MongoDB.Entities/Core/FuzzyString.cs diff --git a/MongoDB.Entities/Utilities/GeoNear.cs b/MongoDB.Entities/Core/GeoNear.cs similarity index 100% rename from MongoDB.Entities/Utilities/GeoNear.cs rename to MongoDB.Entities/Core/GeoNear.cs diff --git a/MongoDB.Entities/Utilities/SequenceCounter.cs b/MongoDB.Entities/Core/SequenceCounter.cs similarity index 100% rename from MongoDB.Entities/Utilities/SequenceCounter.cs rename to MongoDB.Entities/Core/SequenceCounter.cs diff --git a/MongoDB.Entities/Utilities/Template.cs b/MongoDB.Entities/Core/Template.cs similarity index 100% rename from MongoDB.Entities/Utilities/Template.cs rename to MongoDB.Entities/Core/Template.cs diff --git a/MongoDB.Entities/MongoDB.Entities.csproj b/MongoDB.Entities/MongoDB.Entities.csproj index 65e8d0b87..9b8a5373b 100644 --- a/MongoDB.Entities/MongoDB.Entities.csproj +++ b/MongoDB.Entities/MongoDB.Entities.csproj @@ -12,9 +12,9 @@ 13.6.0.0 13.6.0.0 Đĵ ΝιΓΞΗΛψΚ - - simplified change-stream watcher api -- ability to resume change streams with tokens -- ability to filter change events with an expression + - simplified change-stream api +- ability to resume change-streams with tokens +- ability to filter insert & update events with an expression - misc. code changes MongoDB.Entities MongoDB.Entities diff --git a/Tests/TestWatcher.cs b/Tests/TestWatcher.cs index 67ce08b51..99b10e5aa 100644 --- a/Tests/TestWatcher.cs +++ b/Tests/TestWatcher.cs @@ -14,7 +14,9 @@ public void watching_works() var watcher = DB.Watcher("test"); var allFlowers = new List(); - watcher.Start(EventType.Created | EventType.Deleted, 5); + watcher.Start( + EventType.Created | EventType.Updated, + f => f.FullDocument.Name == "test"); Task.Delay(1000).Wait(); @@ -29,11 +31,12 @@ public void watching_works() var flower = new Flower { Name = "test" }; flower.Save(); + flower.Delete(); Task.Delay(1000).Wait(); - Assert.AreEqual(5, allFlowers.Count); + Assert.AreEqual(4, allFlowers.Count); } } }