diff --git a/sample/CustomerApi.Tests/MovieController/GetMovieTests.cs b/sample/CustomerApi.Tests/MovieController/GetMovieTests.cs deleted file mode 100644 index a3085c78..00000000 --- a/sample/CustomerApi.Tests/MovieController/GetMovieTests.cs +++ /dev/null @@ -1,35 +0,0 @@ -using System.Net; -using CustomerApi.Uris; -using FluentAssertions; -using Xunit; - -namespace CustomerApi.Tests.MovieController; - -public class GetMovieTests -{ - [Fact] - public async Task Valid_ReturnsOkWhenUsingEventStream() - { - using var customerApi = new TestCustomerApi(); - await customerApi.Given.AnExistingMovie(MovieUri.Parse("/movies/ExistingMovie"), "The Matrix"); - - var client = customerApi.CreateClient(); - - var response = await client.GetAsync("/movies/ExistingMovie/by-event"); - - response.StatusCode.Should().Be(HttpStatusCode.OK); - } - - [Fact] - public async Task Valid_ReturnsNotFoundWhenUsingSnapshotThatIsNotEnabled() - { - using var customerApi = new TestCustomerApi(); - await customerApi.Given.AnExistingMovie(MovieUri.Parse("/movies/ExistingMovie"), "The Matrix"); - - var client = customerApi.CreateClient(); - - var response = await client.GetAsync("/movies/ExistingMovie/by-snapshot"); - - response.StatusCode.Should().Be(HttpStatusCode.NotFound); - } -} diff --git a/sample/CustomerApi.Tests/MovieController/MovieTests.cs b/sample/CustomerApi.Tests/MovieController/MovieTests.cs new file mode 100644 index 00000000..884f1ac1 --- /dev/null +++ b/sample/CustomerApi.Tests/MovieController/MovieTests.cs @@ -0,0 +1,82 @@ +using System.Net; +using System.Net.Http.Json; +using CustomerApi.Controllers.Movies; +using CustomerApi.Uris; +using FluentAssertions; +using Xunit; + +namespace CustomerApi.Tests.MovieController; + +public class MovieTests +{ + [Fact] + public async Task Valid_ReturnsOkWhenUsingEventStream() + { + using var customerApi = new TestCustomerApi(); + await customerApi.Given.AnExistingMovie(MovieUri.Parse("/movies/ExistingMovie"), "The Matrix"); + + var client = customerApi.CreateClient(); + + var response = await client.GetAsync("/movies/ExistingMovie/by-event"); + + response.StatusCode.Should().Be(HttpStatusCode.OK); + } + + [Fact] + public async Task Valid_ReturnsNotFoundWhenUsingSnapshotThatIsNotEnabled() + { + using var customerApi = new TestCustomerApi(); + await customerApi.Given.AnExistingMovie(MovieUri.Parse("/movies/ExistingMovie"), "The Matrix"); + + var client = customerApi.CreateClient(); + + var response = await client.GetAsync("/movies/ExistingMovie/by-snapshot"); + + response.StatusCode.Should().Be(HttpStatusCode.NotFound); + } + + [Fact] + public async Task Valid_ReturnsOkWhenUsingHybridAndSnapshotIsNotEnabled() + { + using var customerApi = new TestCustomerApi(); + await customerApi.Given.AnExistingMovie(MovieUri.Parse("/movies/ExistingMovie"), "The Matrix"); + + var client = customerApi.CreateClient(); + + var response = await client.GetAsync("/movies/ExistingMovie/by-event"); + + response.StatusCode.Should().Be(HttpStatusCode.OK); + } + + [Fact] + public async Task Valid_SavesSnapshotWhenUsingHybridRepoToApply() + { + using var customerApi = new TestCustomerApi(); + var client = customerApi.CreateClient(); + + var response = await client.PostAsJsonAsync("/movies/create-hybrid", new { name = "Hot Fuzz" }); + + response.StatusCode.Should().Be(HttpStatusCode.Created); + response.Headers.Location.Should().NotBeNull(); + var movieUri = MovieUri.Parse(response.Headers.Location!.ToString()); + await customerApi.Then.TheMovieSnapshotShouldMatch(movieUri, snapshot => snapshot.Name == "Hot Fuzz"); + } + + [Fact] + public async Task Valid_ReturnsOkWhenUsingHybridToQueryLatestChanges() + { + using var customerApi = new TestCustomerApi(); + var movieUri = MovieUri.Parse("/movies/ExistingMovie"); + await customerApi.Given.AnExistingMovieWithAProjectedSnapshot(movieUri, "The Matrix"); + await customerApi.Given.AnExistingMovieNameIsChangedButNotProjected(movieUri, "The Matrix Reloaded"); + + var client = customerApi.CreateClient(); + + var response = await client.GetAsync("/movies/ExistingMovie/by-hybrid-query"); + + response.StatusCode.Should().Be(HttpStatusCode.OK); + var movie = await response.Content.ReadFromJsonAsync(); + movie.Should().NotBeNull(); + movie!.Name.Should().Be("The Matrix Reloaded"); + } +} diff --git a/sample/CustomerApi.Tests/TestApi/GivenSteps.cs b/sample/CustomerApi.Tests/TestApi/GivenSteps.cs index 33163e44..70a6e268 100644 --- a/sample/CustomerApi.Tests/TestApi/GivenSteps.cs +++ b/sample/CustomerApi.Tests/TestApi/GivenSteps.cs @@ -41,6 +41,16 @@ public async Task AnExistingMovie(MovieUri movieUri, Discretiona return await _movieStore.GivenAnExistingMovie(movieUri, name); } + public async Task AnExistingMovieWithAProjectedSnapshot(MovieUri movieUri, Discretionary name = default) + { + return await _movieStore.GivenAnExistingMovieSnapshot(movieUri, name); + } + + public async Task AnExistingMovieNameIsChangedButNotProjected(MovieUri movieUri, string newName) + { + return await _movieStore.GivenAnExistingMovieNameIsChanged(movieUri, newName); + } + public async Task TheCustomerIsDeleted(CustomerUri customerUri) { await _customerStore.GivenTheCustomerIsDeleted(customerUri); diff --git a/sample/CustomerApi.Tests/TestApi/MovieStore.cs b/sample/CustomerApi.Tests/TestApi/MovieStore.cs index 14ff81b2..c6dd6e3d 100644 --- a/sample/CustomerApi.Tests/TestApi/MovieStore.cs +++ b/sample/CustomerApi.Tests/TestApi/MovieStore.cs @@ -1,5 +1,7 @@ -using CustomerApi.Events.Movies; +using System.Linq.Expressions; +using CustomerApi.Events.Movies; using CustomerApi.Uris; +using FluentAssertions; using LogOtter.CosmosDb.EventStore; // ReSharper disable UnusedMethodReturnValue.Local @@ -9,10 +11,18 @@ namespace CustomerApi.Tests; public class MovieStore { private readonly EventRepository _movieEventRepository; + private readonly SnapshotRepository _movieSnapshotRepository; + private readonly HybridRepository _movieHybridRepository; - public MovieStore(EventRepository movieEventRepository) + public MovieStore( + EventRepository movieEventRepository, + SnapshotRepository movieSnapshotRepository, + HybridRepository movieHybridRepository + ) { _movieEventRepository = movieEventRepository; + _movieSnapshotRepository = movieSnapshotRepository; + _movieHybridRepository = movieHybridRepository; } public async Task GivenAnExistingMovie(MovieUri movieUri, Discretionary name) @@ -27,4 +37,36 @@ public async Task GivenAnExistingMovie(MovieUri movieUri, Discre return await _movieEventRepository.ApplyEvents(movieUri.Uri, 0, movieAdded); } + + public async Task GivenAnExistingMovieSnapshot(MovieUri movieUri, Discretionary name) + { + var movieReadModel = await _movieEventRepository.Get(movieUri.Uri); + if (movieReadModel != null) + { + return movieReadModel; + } + + var movieAdded = new MovieAdded(movieUri, name.GetValueOrDefault("Dwayne Dibley in the Duke of Dork")); + + return await _movieHybridRepository.ApplyEventsAndUpdateSnapshotImmediately(movieUri.Uri, 0, CancellationToken.None, movieAdded); + } + + public async Task GivenAnExistingMovieNameIsChanged(MovieUri movieUri, string newName) + { + var movieReadModel = await _movieEventRepository.Get(movieUri.Uri); + if (movieReadModel == null) + { + throw new Exception("Movie not found, make sure you called a setup method to create it first"); + } + + var movieNameChanged = new MovieNameChanged(movieUri, newName); + return await _movieEventRepository.ApplyEvents(movieUri.Uri, movieReadModel.Revision, movieNameChanged); + } + + public async Task ThenTheMovieSnapshotShouldMatch(MovieUri movieUri, Expression> matchFunc) + { + var movie = await _movieSnapshotRepository.GetSnapshot(movieUri.Uri, MovieReadModel.StaticPartitionKey); + movie.Should().NotBeNull(); + movie.Should().Match(matchFunc); + } } diff --git a/sample/CustomerApi.Tests/TestApi/ThenSteps.cs b/sample/CustomerApi.Tests/TestApi/ThenSteps.cs index 15566197..6eeaeed6 100644 --- a/sample/CustomerApi.Tests/TestApi/ThenSteps.cs +++ b/sample/CustomerApi.Tests/TestApi/ThenSteps.cs @@ -1,5 +1,6 @@ using System.Linq.Expressions; using CustomerApi.Events.Customers; +using CustomerApi.Events.Movies; using CustomerApi.NonEventSourcedData.CustomerInterests; using CustomerApi.Uris; @@ -8,12 +9,14 @@ namespace CustomerApi.Tests; public class ThenSteps { private readonly CustomerStore _customerStore; + private readonly MovieStore _movieStore; private readonly SearchableInterestStore _searchableInterestStore; - public ThenSteps(CustomerStore customerStore, SearchableInterestStore searchableInterestStore) + public ThenSteps(CustomerStore customerStore, SearchableInterestStore searchableInterestStore, MovieStore movieStore) { _customerStore = customerStore; _searchableInterestStore = searchableInterestStore; + _movieStore = movieStore; } public async Task TheCustomerShouldBeDeleted(CustomerUri customerUri) @@ -31,6 +34,11 @@ public async Task TheMovieShouldMatch(MovieUri movieUri, Expression> matchFunc) + { + await _movieStore.ThenTheMovieSnapshotShouldMatch(movieUri, matchFunc); + } + public async Task TheSongShouldMatch(SongUri songUri, Expression> matchFunc) { await _customerStore.ThenTheSongShouldMatch(songUri, matchFunc); diff --git a/sample/CustomerApi/Controllers/Movies/CreateMovieRequest.cs b/sample/CustomerApi/Controllers/Movies/CreateMovieRequest.cs new file mode 100644 index 00000000..39abef0a --- /dev/null +++ b/sample/CustomerApi/Controllers/Movies/CreateMovieRequest.cs @@ -0,0 +1,3 @@ +namespace CustomerApi.Controllers.Movies; + +public record CreateMovieRequest(string Name); diff --git a/sample/CustomerApi/Controllers/Movies/MovieController.cs b/sample/CustomerApi/Controllers/Movies/MovieController.cs index 26447731..c51ed88f 100644 --- a/sample/CustomerApi/Controllers/Movies/MovieController.cs +++ b/sample/CustomerApi/Controllers/Movies/MovieController.cs @@ -6,22 +6,25 @@ namespace CustomerApi.Controllers.Movies; [ApiController] -[Route("movies/{movieId}")] +[Route("movies")] public class MovieController : ControllerBase { private readonly EventRepository _movieEventRepository; private readonly SnapshotRepository _movieSnapshotRepository; + private readonly HybridRepository _movieHybridRepository; public MovieController( EventRepository movieEventRepository, - SnapshotRepository movieSnapshotRepository + SnapshotRepository movieSnapshotRepository, + HybridRepository movieHybridRepository ) { _movieEventRepository = movieEventRepository; _movieSnapshotRepository = movieSnapshotRepository; + _movieHybridRepository = movieHybridRepository; } - [HttpGet("by-event")] + [HttpGet("{movieId}/by-event")] public async Task GetByEvent(string movieId, CancellationToken cancellationToken) { var movieUri = new MovieUri(movieId); @@ -35,7 +38,7 @@ public async Task GetByEvent(string movieId, CancellationToken ca return NotFound(); } - [HttpGet("by-snapshot")] + [HttpGet("{movieId}/by-snapshot")] public async Task GetBySnapshot(string movieId, CancellationToken cancellationToken) { var movieUri = new MovieUri(movieId); @@ -48,4 +51,55 @@ public async Task GetBySnapshot(string movieId, CancellationToken return NotFound(); } + + [HttpGet("{movieId}/by-hybrid")] + public async Task GetByHybrid(string movieId, CancellationToken cancellationToken) + { + var movieUri = new MovieUri(movieId); + var movie = await _movieHybridRepository.GetSnapshotWithCatchupExpensivelyAsync( + movieUri.Uri, + MovieReadModel.StaticPartitionKey, + cancellationToken: cancellationToken + ); + + if (movie != null) + { + return Ok(movie); + } + + return NotFound(); + } + + [HttpGet("{movieId}/by-hybrid-query")] + public async Task GetByHybridQuery(string movieId, CancellationToken cancellationToken) + { + var movieUri = new MovieUri(movieId); + + var movies = await _movieHybridRepository + .QuerySnapshotsWithCatchupExpensivelyAsync( + MovieReadModel.StaticPartitionKey, + q => q.Where(m => m.MovieUri.Uri == movieUri.Uri), + cancellationToken: cancellationToken + ) + .ToListAsync(); + + var movie = movies.FirstOrDefault(); + if (movie != null) + { + return Ok(new MovieQueryResponse(movie.MovieUri, movie.Name)); + } + + return NotFound(); + } + + [HttpPost("create-hybrid")] + public async Task CreateUsingHybrid(CreateMovieRequest request, CancellationToken cancellationToken) + { + var movieUri = MovieUri.Generate(); + + var movieCreated = new MovieAdded(movieUri, request.Name); + var movie = await _movieHybridRepository.ApplyEventsAndUpdateSnapshotImmediately(movieUri.Uri, 0, cancellationToken, movieCreated); + + return Created(movieUri.Uri, movie); + } } diff --git a/sample/CustomerApi/Controllers/Movies/MovieQueryResponse.cs b/sample/CustomerApi/Controllers/Movies/MovieQueryResponse.cs new file mode 100644 index 00000000..efe5c2e2 --- /dev/null +++ b/sample/CustomerApi/Controllers/Movies/MovieQueryResponse.cs @@ -0,0 +1,5 @@ +using CustomerApi.Uris; + +namespace CustomerApi.Controllers.Movies; + +public record MovieQueryResponse(MovieUri MovieUri, string Name); diff --git a/sample/CustomerApi/Events/Movies/MovieNameChanged.cs b/sample/CustomerApi/Events/Movies/MovieNameChanged.cs new file mode 100644 index 00000000..4c7df254 --- /dev/null +++ b/sample/CustomerApi/Events/Movies/MovieNameChanged.cs @@ -0,0 +1,24 @@ +using CustomerApi.Uris; + +namespace CustomerApi.Events.Movies; + +public class MovieNameChanged : MovieEvent +{ + public string Name { get; } + + public MovieNameChanged(MovieUri movieUri, string name, DateTimeOffset? timestamp = null) + : base(movieUri, timestamp) + { + Name = name; + } + + public override void Apply(MovieReadModel model) + { + model.Name = Name; + } + + public override string GetDescription() + { + return $"Movie {MovieUri} name changed to {Name}"; + } +} diff --git a/src/LogOtter.CosmosDb.EventStore/Configure/EventSourcingBuilder.cs b/src/LogOtter.CosmosDb.EventStore/Configure/EventSourcingBuilder.cs index fcca3b4b..3c96c09d 100644 --- a/src/LogOtter.CosmosDb.EventStore/Configure/EventSourcingBuilder.cs +++ b/src/LogOtter.CosmosDb.EventStore/Configure/EventSourcingBuilder.cs @@ -63,6 +63,7 @@ public EventSourcingBuilder AddEventSource(string containerName, Act var eventRepository = typeof(EventRepository<,>); var snapshotRepository = typeof(SnapshotRepository<,>); + var hybridRepository = typeof(HybridRepository<,>); foreach (var projection in config.Projections) { Services.AddSingleton(eventRepository.MakeGenericType(typeof(TBaseEvent), projection.ProjectionType)); @@ -77,6 +78,7 @@ public EventSourcingBuilder AddEventSource(string containerName, Act ); Services.AddSingleton(snapshotRepository.MakeGenericType(typeof(TBaseEvent), projection.ProjectionType)); + Services.AddSingleton(hybridRepository.MakeGenericType(typeof(TBaseEvent), projection.ProjectionType)); } } diff --git a/src/LogOtter.CosmosDb.EventStore/Repositories/HybridRepository.cs b/src/LogOtter.CosmosDb.EventStore/Repositories/HybridRepository.cs index 0719bc3f..35e09263 100644 --- a/src/LogOtter.CosmosDb.EventStore/Repositories/HybridRepository.cs +++ b/src/LogOtter.CosmosDb.EventStore/Repositories/HybridRepository.cs @@ -4,6 +4,17 @@ namespace LogOtter.CosmosDb.EventStore; +/// +/// Uses the snapshot store and event stream in tandem to produce immediately consistent results at the cost of performance +/// This repository is suitable for use when: +/// * You are working with a relatively small amount of data (but not a single entity) +/// * You are working with a single entity that has a large number of events in the stream +/// * You require immediately consistent results +/// +/// If you are working with a single entity the is possibly a better choice if the number of events in the stream will not be large +/// If you are working with lots of data, the is almost definitely a better choice +/// Requirements for large amounts of immediately consistent data should be stored in an immediate state store, not an event sourced system. +/// public class HybridRepository where TBaseEvent : class, IEvent where TSnapshot : class, ISnapshot, new() @@ -57,12 +68,25 @@ IOptions options } } - public async Task ApplyEvents(string id, int? expectedRevision, params TBaseEvent[] events) + public async Task GetSnapshotWithCatchupExpensivelyAsync( + string id, + string partitionKey, + bool includeDeleted = false, + CancellationToken cancellationToken = default + ) { - return await ApplyEvents(id, expectedRevision, CancellationToken.None, events); + var streamId = _options.EscapeIdIfRequired(id); + var snapshot = await _snapshotRepository.GetSnapshot(id, partitionKey, includeDeleted, cancellationToken); + + return await ApplyNewEvents(snapshot, streamId, includeDeleted, cancellationToken); } - public async Task ApplyEvents(string id, int? expectedRevision, CancellationToken cancellationToken, params TBaseEvent[] events) + public async Task ApplyEventsAndUpdateSnapshotImmediately( + string id, + int? expectedRevision, + CancellationToken cancellationToken, + params TBaseEvent[] events + ) { var snapshot = await _eventRepository.ApplyEvents(id, expectedRevision, cancellationToken, events);