From 2ffbd90b0c5693c7c29ad628aeae940117272725 Mon Sep 17 00:00:00 2001 From: Charles d'Avernas Date: Mon, 25 Mar 2024 14:04:10 +0100 Subject: [PATCH] fix(EventSourcing): Enabled plugin-based injection for all event sourcing implementations fix(Plugins): Fixed the NugetPackagePluginSource to resolve the target framework using the entry assembly instead of a strong typed one --- ...astructure.EventSourcing.EventStore.csproj | 2 + .../Services/ESEventStore.cs | 13 ++-- .../Services/ESEventStoreFactory.cs | 53 +++++++++++++++ .../Services/MemoryEventStore.cs | 41 ++++++------ ....Infrastructure.EventSourcing.Redis.csproj | 2 + .../Services/RedisEventStore.cs | 48 +++++++------- .../Services/RedisEventStoreFactory.cs | 53 +++++++++++++++ .../Services/RedisDatabase.cs | 6 +- .../Services/RedisDatabaseFactory.cs | 17 ++--- .../Services/NugetPackagePluginSource.cs | 64 +++++++++---------- .../Services/PluginNugetProject.cs | 4 +- .../EventSourcing/EventStoreTestsBase.cs | 12 +--- 12 files changed, 197 insertions(+), 118 deletions(-) create mode 100644 src/Neuroglia.Data.Infrastructure.EventSourcing.EventStore/Services/ESEventStoreFactory.cs create mode 100644 src/Neuroglia.Data.Infrastructure.EventSourcing.Redis/Services/RedisEventStoreFactory.cs diff --git a/src/Neuroglia.Data.Infrastructure.EventSourcing.EventStore/Neuroglia.Data.Infrastructure.EventSourcing.EventStore.csproj b/src/Neuroglia.Data.Infrastructure.EventSourcing.EventStore/Neuroglia.Data.Infrastructure.EventSourcing.EventStore.csproj index 48a1a9844..08119969a 100644 --- a/src/Neuroglia.Data.Infrastructure.EventSourcing.EventStore/Neuroglia.Data.Infrastructure.EventSourcing.EventStore.csproj +++ b/src/Neuroglia.Data.Infrastructure.EventSourcing.EventStore/Neuroglia.Data.Infrastructure.EventSourcing.EventStore.csproj @@ -31,11 +31,13 @@ + + \ No newline at end of file diff --git a/src/Neuroglia.Data.Infrastructure.EventSourcing.EventStore/Services/ESEventStore.cs b/src/Neuroglia.Data.Infrastructure.EventSourcing.EventStore/Services/ESEventStore.cs index b0e3a9f5c..762b8d039 100644 --- a/src/Neuroglia.Data.Infrastructure.EventSourcing.EventStore/Services/ESEventStore.cs +++ b/src/Neuroglia.Data.Infrastructure.EventSourcing.EventStore/Services/ESEventStore.cs @@ -16,9 +16,10 @@ using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using Neuroglia.Data.Infrastructure.EventSourcing.Configuration; +using Neuroglia.Data.Infrastructure.EventSourcing.EventStore.Services; using Neuroglia.Data.Infrastructure.EventSourcing.Services; +using Neuroglia.Plugins; using Neuroglia.Serialization; -using System.IO; using System.Reactive.Linq; using System.Reactive.Subjects; using System.Runtime.CompilerServices; @@ -29,6 +30,7 @@ namespace Neuroglia.Data.Infrastructure.EventSourcing; /// /// Represents the default Event Store implementation of the interface /// +[Plugin(Tags = ["event-store"]), Factory(typeof(ESEventStoreFactory))] public class ESEventStore : IEventStore { @@ -36,15 +38,13 @@ public class ESEventStore /// /// Initializes a new /// - /// The current /// The service used to perform logging /// The options used to configure the /// The service used to provide s /// The service used to interact with the remove Event Store service /// The service used to interact with the remove Event Store service, exclusively for persistent subscriptions - public ESEventStore(IServiceProvider serviceProvider, ILogger logger, IOptions options, ISerializerProvider serializerProvider, EventStoreClient eventStoreClient, EventStorePersistentSubscriptionsClient eventStorePersistentSubscriptionsClient) + public ESEventStore(ILogger logger, IOptions options, ISerializerProvider serializerProvider, EventStoreClient eventStoreClient, EventStorePersistentSubscriptionsClient eventStorePersistentSubscriptionsClient) { - this.ServiceProvider = serviceProvider; this.Logger = logger; this.Options = options.Value; this.Serializer = serializerProvider.GetSerializers().First(s => this.Options.SerializerType == null || s.GetType() == this.Options.SerializerType); @@ -52,11 +52,6 @@ public ESEventStore(IServiceProvider serviceProvider, ILogger logg this.EventStorePersistentSubscriptionsClient = eventStorePersistentSubscriptionsClient; } - /// - /// Gets the current - /// - protected virtual IServiceProvider ServiceProvider { get; } - /// /// Gets the service used to perform logging /// diff --git a/src/Neuroglia.Data.Infrastructure.EventSourcing.EventStore/Services/ESEventStoreFactory.cs b/src/Neuroglia.Data.Infrastructure.EventSourcing.EventStore/Services/ESEventStoreFactory.cs new file mode 100644 index 000000000..5dd09c250 --- /dev/null +++ b/src/Neuroglia.Data.Infrastructure.EventSourcing.EventStore/Services/ESEventStoreFactory.cs @@ -0,0 +1,53 @@ +// Copyright © 2021-Present Neuroglia SRL. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"), +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using EventStore.Client; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; + +namespace Neuroglia.Data.Infrastructure.EventSourcing.EventStore.Services; + +/// +/// Represents the service used to create instances +/// +/// +/// Initializes a new +/// +/// The current +public class ESEventStoreFactory(IServiceProvider serviceProvider) + : IFactory +{ + + /// + /// Gets the name of the EventStore connection string + /// + public const string ConnectionStringName = "eventstore"; + + /// + /// Gets the current + /// + protected IServiceProvider ServiceProvider { get; } = serviceProvider; + + /// + public virtual ESEventStore Create() + { + var configuration = this.ServiceProvider.GetRequiredService(); + var connectionString = configuration.GetConnectionString(ConnectionStringName); + if (string.IsNullOrWhiteSpace(connectionString)) throw new Exception($"An error occurred while attempting to create an ESEventStore instance. The '{ConnectionStringName}' connection string is not provided or is invalid. Please ensure that the connection string is properly configured in the application settings."); + var settings = EventStoreClientSettings.Create(connectionString); + return ActivatorUtilities.CreateInstance(this.ServiceProvider, new EventStoreClient(settings), new EventStorePersistentSubscriptionsClient(settings)); + } + + object IFactory.Create() => this.Create(); + +} diff --git a/src/Neuroglia.Data.Infrastructure.EventSourcing.Memory/Services/MemoryEventStore.cs b/src/Neuroglia.Data.Infrastructure.EventSourcing.Memory/Services/MemoryEventStore.cs index 6c08aace9..816894000 100644 --- a/src/Neuroglia.Data.Infrastructure.EventSourcing.Memory/Services/MemoryEventStore.cs +++ b/src/Neuroglia.Data.Infrastructure.EventSourcing.Memory/Services/MemoryEventStore.cs @@ -27,26 +27,21 @@ namespace Neuroglia.Data.Infrastructure.EventSourcing.DistributedCache.Services; /// Represents an implementation relying on an /// /// Should not be used in production +/// +/// Initializes a new +/// +/// The cache to stream events to [Plugin(Tags = ["event-store"]), Factory(typeof(MemoryCacheEventStoreFactory))] -public class MemoryEventStore +public class MemoryEventStore(IMemoryCache cache) : IEventStore, IDisposable { - private bool _disposed; - - /// - /// Initializes a new - /// - /// The cache to stream events to - public MemoryEventStore(IMemoryCache cache) - { - this.Cache = cache; - } + bool _disposed; /// /// Gets the cache to stream events to /// - protected IMemoryCache Cache { get; } + protected IMemoryCache Cache { get; } = cache; /// /// Gets the containing all published s @@ -66,19 +61,19 @@ public virtual Task AppendAsync(string streamId, IEnumerable e if (expectedVersion < StreamPosition.EndOfStream) throw new ArgumentOutOfRangeException(nameof(expectedVersion)); this.Cache.TryGetValue>(streamId, out var stream); - var actualversion = stream == null ? (long?)null : (long)stream.Last().Offset; + var actualVersion = stream == null ? (long?)null : (long)stream.Last().Offset; if (expectedVersion.HasValue) { if(expectedVersion.Value == StreamPosition.EndOfStream) { - if (actualversion != null) throw new OptimisticConcurrencyException(expectedVersion, actualversion); + if (actualVersion != null) throw new OptimisticConcurrencyException(expectedVersion, actualVersion); } - else if(actualversion == null || actualversion != expectedVersion) throw new OptimisticConcurrencyException(expectedVersion, actualversion); + else if(actualVersion == null || actualVersion != expectedVersion) throw new OptimisticConcurrencyException(expectedVersion, actualVersion); } stream ??= []; - ulong offset = actualversion.HasValue ? (ulong)actualversion.Value + 1 : StreamPosition.StartOfStream; + ulong offset = actualVersion.HasValue ? (ulong)actualVersion.Value + 1 : StreamPosition.StartOfStream; foreach(var e in events) { var record = new EventRecord(streamId, Guid.NewGuid().ToString(), offset, (ulong)this.Stream.Count, DateTimeOffset.Now, e.Type, e.Data, e.Metadata); @@ -122,7 +117,7 @@ public virtual IAsyncEnumerable ReadAsync(string? streamId, Stream protected virtual async IAsyncEnumerable ReadFromStreamAsync(string streamId, StreamReadDirection readDirection, long offset, ulong? length = null, [EnumeratorCancellation] CancellationToken cancellationToken = default) { if (string.IsNullOrWhiteSpace(streamId)) throw new ArgumentNullException(nameof(streamId)); - if (offset < StreamPosition.EndOfStream) throw new ArgumentOutOfRangeException(nameof(offset)); + ArgumentOutOfRangeException.ThrowIfLessThan(offset, StreamPosition.EndOfStream); if (!this.Cache.TryGetValue>(streamId, out var stream) || stream == null) throw new StreamNotFoundException(streamId); @@ -157,7 +152,7 @@ protected virtual async IAsyncEnumerable ReadFromStreamAsync(strin } /// - /// Reads recorded events accross all streams + /// Reads recorded events across all streams /// /// The direction in which to read events /// The offset starting from which to read events @@ -190,12 +185,12 @@ public virtual Task> ObserveAsync(string? streamId, lo protected virtual async Task> ObserveStreamAsync(string streamId, long offset = StreamPosition.EndOfStream, string? consumerGroup = null, CancellationToken cancellationToken = default) { if (string.IsNullOrWhiteSpace(streamId)) throw new ArgumentNullException(nameof(streamId)); - if (offset < StreamPosition.EndOfStream) throw new ArgumentOutOfRangeException(nameof(offset)); + ArgumentOutOfRangeException.ThrowIfLessThan(offset, StreamPosition.EndOfStream); if (!this.Cache.TryGetValue>(streamId, out var stream) || stream == null) throw new StreamNotFoundException(streamId); var storedOffset = string.IsNullOrWhiteSpace(consumerGroup) ? offset : await this.GetOffsetAsync(consumerGroup, streamId, cancellationToken).ConfigureAwait(false) ?? offset; - var events = storedOffset == StreamPosition.EndOfStream ? Array.Empty().ToList() : await (this.ReadAsync(streamId, StreamReadDirection.Forwards, storedOffset, cancellationToken: cancellationToken)).ToListAsync(cancellationToken).ConfigureAwait(false); + var events = storedOffset == StreamPosition.EndOfStream ? [] : await (this.ReadAsync(streamId, StreamReadDirection.Forwards, storedOffset, cancellationToken: cancellationToken)).ToListAsync(cancellationToken).ConfigureAwait(false); var subject = new Subject(); var checkpointedPosition = (ulong?)null; @@ -218,10 +213,10 @@ protected virtual async Task> ObserveStreamAsync(strin /// A new used to observe events protected virtual async Task> ObserveAllAsync(long offset = StreamPosition.EndOfStream, string? consumerGroup = null, CancellationToken cancellationToken = default) { - if (offset < StreamPosition.EndOfStream) throw new ArgumentOutOfRangeException(nameof(offset)); + ArgumentOutOfRangeException.ThrowIfLessThan(offset, StreamPosition.EndOfStream); var storedOffset = string.IsNullOrWhiteSpace(consumerGroup) ? offset : await this.GetOffsetAsync(consumerGroup, cancellationToken: cancellationToken).ConfigureAwait(false) ?? offset; - var events = storedOffset == StreamPosition.EndOfStream ? Array.Empty().ToList() : await (this.ReadAsync(null, StreamReadDirection.Forwards, storedOffset, cancellationToken: cancellationToken)).ToListAsync(cancellationToken).ConfigureAwait(false); + var events = storedOffset == StreamPosition.EndOfStream ? [] : await (this.ReadAsync(null, StreamReadDirection.Forwards, storedOffset, cancellationToken: cancellationToken)).ToListAsync(cancellationToken).ConfigureAwait(false); var subject = new ReplaySubject(); var checkpointedPosition = (ulong?)null; @@ -250,7 +245,7 @@ protected virtual async Task> ObserveAllAsync(long off public virtual async Task SetOffsetAsync(string consumerGroup, long offset, string? streamId = null, CancellationToken cancellationToken = default) { if (string.IsNullOrWhiteSpace(consumerGroup)) throw new ArgumentNullException(nameof(consumerGroup)); - if (offset < StreamPosition.EndOfStream) throw new ArgumentOutOfRangeException(nameof(offset)); + ArgumentOutOfRangeException.ThrowIfLessThan(offset, StreamPosition.EndOfStream); var checkpointedPosition = (ulong?)await this.GetOffsetAsync(consumerGroup, streamId, cancellationToken).ConfigureAwait(false); if (checkpointedPosition.HasValue) await this.SetConsumerCheckpointPositionAsync(consumerGroup, streamId, checkpointedPosition.Value, cancellationToken).ConfigureAwait(false); diff --git a/src/Neuroglia.Data.Infrastructure.EventSourcing.Redis/Neuroglia.Data.Infrastructure.EventSourcing.Redis.csproj b/src/Neuroglia.Data.Infrastructure.EventSourcing.Redis/Neuroglia.Data.Infrastructure.EventSourcing.Redis.csproj index e267e0f33..80784766e 100644 --- a/src/Neuroglia.Data.Infrastructure.EventSourcing.Redis/Neuroglia.Data.Infrastructure.EventSourcing.Redis.csproj +++ b/src/Neuroglia.Data.Infrastructure.EventSourcing.Redis/Neuroglia.Data.Infrastructure.EventSourcing.Redis.csproj @@ -29,12 +29,14 @@ + + diff --git a/src/Neuroglia.Data.Infrastructure.EventSourcing.Redis/Services/RedisEventStore.cs b/src/Neuroglia.Data.Infrastructure.EventSourcing.Redis/Services/RedisEventStore.cs index 7be3cda00..44e783808 100644 --- a/src/Neuroglia.Data.Infrastructure.EventSourcing.Redis/Services/RedisEventStore.cs +++ b/src/Neuroglia.Data.Infrastructure.EventSourcing.Redis/Services/RedisEventStore.cs @@ -13,9 +13,9 @@ using Microsoft.Extensions.Options; using Neuroglia.Data.Infrastructure.EventSourcing.Configuration; +using Neuroglia.Plugins; using Neuroglia.Serialization; using StackExchange.Redis; -using System.IO; using System.Reactive.Disposables; using System.Reactive.Linq; using System.Reactive.Subjects; @@ -26,6 +26,7 @@ namespace Neuroglia.Data.Infrastructure.EventSourcing.Services; /// /// Represents a StackExchange Redis implementation of the interface /// +[Plugin(Tags = ["event-store"]), Factory(typeof(RedisEventStoreFactory))] public class RedisEventStore : IEventStore { @@ -82,25 +83,25 @@ public virtual async Task AppendAsync(string streamId, IEnumerable(); record.Metadata[EventRecordMetadata.ClrTypeName] = e.Data?.GetType().AssemblyQualifiedName!; var entryValue = this.Serializer.SerializeToByteArray(record); - await this.Database.HashSetAsync(streamId, new HashEntry[] { new(offset, entryValue) }).ConfigureAwait(false); + await this.Database.HashSetAsync(streamId, [new(offset, entryValue)]).ConfigureAwait(false); await this.AppendToGlobalStreamAsync(record, cancellationToken).ConfigureAwait(false); await this.Database.PublishAsync(this.GetRedisChannel(streamId), entryValue).ConfigureAwait(false); await this.Database.PublishAsync(this.GetRedisChannel(), entryValue).ConfigureAwait(false); @@ -118,9 +119,9 @@ public virtual async Task AppendAsync(string streamId, IEnumerable @@ -132,7 +133,7 @@ public virtual async Task GetAsync(string streamId, Canc var keys = (await this.Database.HashKeysAsync(streamId).ConfigureAwait(false)).Order().ToList(); DateTimeOffset? firstEventAt = null; DateTimeOffset? lastEventAt = null; - if(keys.Any()) + if(keys.Count != 0) { firstEventAt = this.Serializer.Deserialize((byte[])(await this.Database.HashGetAsync(streamId, keys.First()).ConfigureAwait(false))!)!.Timestamp; lastEventAt = this.Serializer.Deserialize((byte[])(await this.Database.HashGetAsync(streamId, keys.Last()).ConfigureAwait(false))!)!.Timestamp; @@ -161,7 +162,7 @@ protected virtual async IAsyncEnumerable ReadFromStreamAsync(strin { if (string.IsNullOrWhiteSpace(streamId)) throw new ArgumentNullException(nameof(streamId)); if (!await this.Database.KeyExistsAsync(streamId).ConfigureAwait(false)) throw new StreamNotFoundException(streamId); - if (offset < StreamPosition.EndOfStream) throw new ArgumentOutOfRangeException(nameof(offset)); + ArgumentOutOfRangeException.ThrowIfLessThan(offset, StreamPosition.EndOfStream); var hashKeys = (await this.Database.HashKeysAsync(streamId).ConfigureAwait(false)).Order().ToList(); @@ -193,7 +194,7 @@ protected virtual async IAsyncEnumerable ReadFromStreamAsync(strin } /// - /// Reads recorded events accross all streams + /// Reads recorded events across all streams /// /// The direction in which to read events /// The offset starting from which to read events @@ -254,13 +255,13 @@ public virtual async Task> ObserveStreamAsync(string s { if (string.IsNullOrWhiteSpace(streamId)) throw new ArgumentNullException(nameof(streamId)); if (!await this.Database.KeyExistsAsync(streamId).ConfigureAwait(false)) throw new StreamNotFoundException(streamId); - if (offset < StreamPosition.EndOfStream) throw new ArgumentOutOfRangeException(nameof(offset)); + ArgumentOutOfRangeException.ThrowIfLessThan(offset, StreamPosition.EndOfStream); var checkpointedPosition = (ulong?)null; if (!string.IsNullOrWhiteSpace(consumerGroup)) checkpointedPosition = await this.GetConsumerCheckpointedPositionAsync(consumerGroup, streamId, cancellationToken).ConfigureAwait(false); var storedOffset = string.IsNullOrWhiteSpace(consumerGroup) ? offset : await this.GetOffsetAsync(consumerGroup, streamId, cancellationToken).ConfigureAwait(false) ?? offset; - var events = storedOffset == StreamPosition.EndOfStream ? Array.Empty().ToList() : await (this.ReadAsync(streamId, StreamReadDirection.Forwards, storedOffset, cancellationToken: cancellationToken).Select(e => this.WrapEvent(e, streamId, consumerGroup, checkpointedPosition.HasValue ? checkpointedPosition.Value > e.Offset : string.IsNullOrWhiteSpace(consumerGroup) ? null : false))).ToListAsync(cancellationToken).ConfigureAwait(false); + var events = storedOffset == StreamPosition.EndOfStream ? [] : await (this.ReadAsync(streamId, StreamReadDirection.Forwards, storedOffset, cancellationToken: cancellationToken).Select(e => this.WrapEvent(e, streamId, consumerGroup, checkpointedPosition.HasValue ? checkpointedPosition.Value > e.Offset : string.IsNullOrWhiteSpace(consumerGroup) ? null : false))).ToListAsync(cancellationToken).ConfigureAwait(false); var messageQueue = await this.Subscriber.SubscribeAsync(this.GetRedisChannel(streamId)); var subject = new Subject(); @@ -280,13 +281,13 @@ public virtual async Task> ObserveStreamAsync(string s /// A new used to observe events public virtual async Task> ObserveAllAsync(long offset = StreamPosition.EndOfStream, string? consumerGroup = null, CancellationToken cancellationToken = default) { - if (offset < StreamPosition.EndOfStream) throw new ArgumentOutOfRangeException(nameof(offset)); + ArgumentOutOfRangeException.ThrowIfLessThan(offset, StreamPosition.EndOfStream); var checkpointedPosition = (ulong?)null; if (!string.IsNullOrWhiteSpace(consumerGroup)) checkpointedPosition = await this.GetConsumerCheckpointedPositionAsync(consumerGroup, null, cancellationToken).ConfigureAwait(false); var storedOffset = string.IsNullOrWhiteSpace(consumerGroup) ? offset : await this.GetOffsetAsync(consumerGroup, cancellationToken: cancellationToken).ConfigureAwait(false) ?? offset; - var events = storedOffset == StreamPosition.EndOfStream ? Array.Empty().ToList() : await (this.ReadAsync(null, StreamReadDirection.Forwards, storedOffset, cancellationToken: cancellationToken).Select(e => this.WrapEvent(e, null, consumerGroup, checkpointedPosition.HasValue ? checkpointedPosition.Value > e.Offset : string.IsNullOrWhiteSpace(consumerGroup) ? null : false))).ToListAsync(cancellationToken).ConfigureAwait(false); + var events = storedOffset == StreamPosition.EndOfStream ? [] : await (this.ReadAsync(null, StreamReadDirection.Forwards, storedOffset, cancellationToken: cancellationToken).Select(e => this.WrapEvent(e, null, consumerGroup, checkpointedPosition.HasValue ? checkpointedPosition.Value > e.Offset : string.IsNullOrWhiteSpace(consumerGroup) ? null : false))).ToListAsync(cancellationToken).ConfigureAwait(false); var messageQueue = await this.Subscriber.SubscribeAsync(this.GetRedisChannel()); var subject = new ReplaySubject(); @@ -305,7 +306,7 @@ public virtual async Task TruncateAsync(string streamId, ulong? beforeVersion = if (beforeVersion.HasValue && beforeVersion < StreamPosition.StartOfStream) throw new ArgumentOutOfRangeException(nameof(beforeVersion)); var hashKeys = (await this.Database.HashKeysAsync(streamId).ConfigureAwait(false)).Order().ToList(); - if (!hashKeys.Any()) return; + if (hashKeys.Count == 0) return; var beforeElement = hashKeys.Select(k => (ulong?)k).FirstOrDefault(o => o >= beforeVersion); if (beforeElement != null) @@ -313,7 +314,7 @@ public virtual async Task TruncateAsync(string streamId, ulong? beforeVersion = var index = hashKeys.Select(k => (ulong?)k).ToList().IndexOf(beforeElement); hashKeys = hashKeys.Take(index).ToList(); } - await this.Database.HashDeleteAsync(streamId, hashKeys.ToArray()).ConfigureAwait(false); + await this.Database.HashDeleteAsync(streamId, [.. hashKeys]).ConfigureAwait(false); } /// @@ -345,7 +346,7 @@ public virtual async Task DeleteAsync(string streamId, CancellationToken cancell public virtual async Task SetOffsetAsync(string consumerGroup, long offset, string? streamId = null, CancellationToken cancellationToken = default) { if (string.IsNullOrWhiteSpace(consumerGroup)) throw new ArgumentNullException(nameof(consumerGroup)); - if (offset < StreamPosition.EndOfStream) throw new ArgumentOutOfRangeException(nameof(offset)); + ArgumentOutOfRangeException.ThrowIfLessThan(offset, StreamPosition.EndOfStream); var checkpointedPosition = (ulong?)await this.GetOffsetAsync(consumerGroup, streamId, cancellationToken).ConfigureAwait(false); if (checkpointedPosition.HasValue) await this.SetConsumerCheckpointPositionAsync(consumerGroup, streamId, checkpointedPosition.Value, cancellationToken).ConfigureAwait(false); @@ -455,16 +456,11 @@ protected static class EventRecordMetadata } - class RedisSubscription - : IDisposable + class RedisSubscription(ChannelMessageQueue queue) + : IDisposable { - private readonly ChannelMessageQueue _queue; - - public RedisSubscription(ChannelMessageQueue queue) - { - this._queue = queue; - } + readonly ChannelMessageQueue _queue = queue; /// public void Dispose() diff --git a/src/Neuroglia.Data.Infrastructure.EventSourcing.Redis/Services/RedisEventStoreFactory.cs b/src/Neuroglia.Data.Infrastructure.EventSourcing.Redis/Services/RedisEventStoreFactory.cs new file mode 100644 index 000000000..38c125570 --- /dev/null +++ b/src/Neuroglia.Data.Infrastructure.EventSourcing.Redis/Services/RedisEventStoreFactory.cs @@ -0,0 +1,53 @@ +// Copyright © 2021-Present Neuroglia SRL. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"), +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using StackExchange.Redis; + +namespace Neuroglia.Data.Infrastructure.EventSourcing.Services; + +/// +/// Represents the service used to create instances +/// +/// +/// Initializes a new +/// +/// The current +public class RedisEventStoreFactory(IServiceProvider serviceProvider) + : IFactory +{ + + /// + /// Gets the name of the Redis connection string + /// + public const string ConnectionStringName = "redis"; + + /// + /// Gets the current + /// + protected IServiceProvider ServiceProvider { get; } = serviceProvider; + + /// + public virtual RedisEventStore Create() + { + var configuration = this.ServiceProvider.GetRequiredService(); + var connectionString = configuration.GetConnectionString(ConnectionStringName); + if (string.IsNullOrWhiteSpace(connectionString)) throw new Exception($"An error occurred while attempting to create an RedisEventStore instance. The '{ConnectionStringName}' connection string is not provided or is invalid. Please ensure that the connection string is properly configured in the application settings."); + var connectionMultiplexer = ConnectionMultiplexer.Connect(connectionString); + return ActivatorUtilities.CreateInstance(this.ServiceProvider, connectionMultiplexer); + } + + object IFactory.Create() => this.Create(); + +} diff --git a/src/Neuroglia.Data.Infrastructure.ResourceOriented.Redis/Services/RedisDatabase.cs b/src/Neuroglia.Data.Infrastructure.ResourceOriented.Redis/Services/RedisDatabase.cs index d25cbce79..790fa3816 100644 --- a/src/Neuroglia.Data.Infrastructure.ResourceOriented.Redis/Services/RedisDatabase.cs +++ b/src/Neuroglia.Data.Infrastructure.ResourceOriented.Redis/Services/RedisDatabase.cs @@ -25,7 +25,7 @@ namespace Neuroglia.Data.Infrastructure.ResourceOriented.Services; /// -/// Represents a Redis implementation of the interface +/// Represents a Redis implementation of the interface /// [Plugin(Tags = ["roa", "redis"]), Factory(typeof(RedisDatabaseFactory))] public class RedisDatabase @@ -69,7 +69,7 @@ public RedisDatabase(ILoggerFactory loggerFactory, IConnectionMultiplexer redis, protected IConnectionMultiplexer Redis { get; } /// - /// Gets the current + /// Gets the current /// protected StackExchange.Redis.IDatabase Database { get; } @@ -108,7 +108,7 @@ public virtual async Task InitializeAsync(CancellationToken cancellationTo initialized = true; await this.CreateResourceAsync(new NamespaceDefinition(), false, this.CancellationTokenSource.Token).ConfigureAwait(false); } - if ((await this.ListResourcesAsync(cancellationToken: this.CancellationTokenSource.Token).ConfigureAwait(false)).Items?.Any() == false) + if ((await this.ListResourcesAsync(cancellationToken: this.CancellationTokenSource.Token).ConfigureAwait(false)).Items?.Count == 0) { initialized = true; await this.CreateNamespaceAsync(Namespace.DefaultNamespaceName, false, this.CancellationTokenSource.Token).ConfigureAwait(false); diff --git a/src/Neuroglia.Data.Infrastructure.ResourceOriented.Redis/Services/RedisDatabaseFactory.cs b/src/Neuroglia.Data.Infrastructure.ResourceOriented.Redis/Services/RedisDatabaseFactory.cs index b45fa2e31..58e0cebf5 100644 --- a/src/Neuroglia.Data.Infrastructure.ResourceOriented.Redis/Services/RedisDatabaseFactory.cs +++ b/src/Neuroglia.Data.Infrastructure.ResourceOriented.Redis/Services/RedisDatabaseFactory.cs @@ -18,25 +18,20 @@ namespace Neuroglia.Data.Infrastructure.ResourceOriented.Services; /// /// Represents the service used to create s /// -public class RedisDatabaseFactory +/// +/// Initializes a new +/// +/// The current application's services +public class RedisDatabaseFactory(IServiceProvider applicationServices) : IFactory, IDisposable, IAsyncDisposable { private bool _disposed; - /// - /// Initializes a new - /// - /// The current application's services - public RedisDatabaseFactory(IServiceProvider applicationServices) - { - this.ApplicationServices = applicationServices; - } - /// /// Gets the current application's services /// - protected IServiceProvider ApplicationServices { get; } + protected IServiceProvider ApplicationServices { get; } = applicationServices; /// /// Gets the plugin's services diff --git a/src/Neuroglia.Plugins/Services/NugetPackagePluginSource.cs b/src/Neuroglia.Plugins/Services/NugetPackagePluginSource.cs index 6022e9364..e6ecd42ce 100644 --- a/src/Neuroglia.Plugins/Services/NugetPackagePluginSource.cs +++ b/src/Neuroglia.Plugins/Services/NugetPackagePluginSource.cs @@ -39,7 +39,7 @@ public class NugetPackagePluginSource { const string DefaultPackageSource = "https://api.nuget.org/v3/index.json"; - readonly List _assemblies = new(); + readonly List _assemblies = []; /// /// Initializes a new @@ -158,20 +158,20 @@ public virtual async Task LoadAsync(CancellationToken cancellationToken = defaul var settings = Settings.LoadDefaultSettings(string.Empty, null, new MachineWideSettings()); var packageSourceProvider = new PackageSourceProvider(settings); var sourceRepositoryProvider = new SourceRepositoryProvider(packageSourceProvider, Providers); - var framework = NuGetFramework.Parse(typeof(PluginManager).Assembly.GetCustomAttribute()!.FrameworkName!); + var framework = NuGetFramework.Parse(Assembly.GetEntryAssembly()!.GetCustomAttribute()!.FrameworkName!); var package = await this.GetPackageAsync(this.PackageId, this.PackageVersion, this.IncludePreRelease, false, cancellationToken).ConfigureAwait(false) ?? throw new ArgumentNullException($"Failed to find the specified package with id '{this.PackageId}' and version '{this.PackageVersion}'"); var project = new PluginNugetProject(this.PackagesDirectory.FullName, this.PluginDirectory.FullName, package.Identity, framework); var packageManager = new NuGetPackageManager(sourceRepositoryProvider, settings, this.PackagesDirectory.FullName) { PackagesFolderNuGetProject = project }; var clientPolicyContext = ClientPolicyContext.GetClientPolicy(settings, this.NugetLogger); - var projectContext = new PluginNugetProjectContext(this.LoggerFactory) { PackageExtractionContext = new PackageExtractionContext(PackageSaveMode.Defaultv2, PackageExtractionBehavior.XmlDocFileSaveMode, clientPolicyContext, this.NugetLogger) }; + var projectContext = new PluginNugetProjectContext(this.LoggerFactory) { PackageExtractionContext = new PackageExtractionContext(PackageSaveMode.Defaultv3, PackageExtractionBehavior.XmlDocFileSaveMode, clientPolicyContext, this.NugetLogger) }; var resolutionContext = new ResolutionContext(DependencyBehavior.Lowest, true, false, VersionConstraints.None); var downloadContext = new PackageDownloadContext(resolutionContext.SourceCacheContext, this.PackagesDirectory.FullName, resolutionContext.SourceCacheContext.DirectDownload); - await packageManager.InstallPackageAsync(project, package.Identity, resolutionContext, projectContext, downloadContext, package.SourceRepository, new List(), cancellationToken).ConfigureAwait(false); + await packageManager.InstallPackageAsync(project, package.Identity, resolutionContext, projectContext, downloadContext, package.SourceRepository, [], cancellationToken).ConfigureAwait(false); await project.PostProcessAsync(projectContext, cancellationToken).ConfigureAwait(false); await project.PreProcessAsync(projectContext, cancellationToken).ConfigureAwait(false); - await packageManager.RestorePackageAsync(package.Identity, projectContext, downloadContext, new[] { package.SourceRepository }, cancellationToken).ConfigureAwait(false); + await packageManager.RestorePackageAsync(package.Identity, projectContext, downloadContext, [package.SourceRepository], cancellationToken).ConfigureAwait(false); foreach (var pluginAssemblyFilePath in project.PluginAssemblies) { @@ -211,7 +211,7 @@ public virtual async Task LoadAsync(CancellationToken cancellationToken = defaul } /// - /// Searchs a for the specified Nuget package + /// Searches a for the specified Nuget package /// /// The id of the package to search for /// The version, if any, of the package to search for @@ -256,51 +256,45 @@ public virtual async Task LoadAsync(CancellationToken cancellationToken = defaul /// The package source uri, if any /// A new protected virtual PackageSource BuildPackageSource(Uri sourceUri) - { - if (sourceUri == null) throw new ArgumentNullException(nameof(sourceUri)); + { + ArgumentNullException.ThrowIfNull(sourceUri); - var source = sourceUri.OriginalString; - var userInfo = sourceUri.UserInfo; - if (string.IsNullOrWhiteSpace(source)) source = DefaultPackageSource; - var packageSource = new PackageSource(source); + var source = sourceUri.OriginalString; + var userInfo = sourceUri.UserInfo; + if (string.IsNullOrWhiteSpace(source)) source = DefaultPackageSource; + var packageSource = new PackageSource(source); - if (!string.IsNullOrWhiteSpace(userInfo)) - { - var components = userInfo.Split(':', StringSplitOptions.RemoveEmptyEntries); - var username = components.Length > 0 ? components[0] : null; - var password = components.Length > 1 ? components[1] : null; - packageSource.Credentials = new PackageSourceCredential(source, username, password, true, null); - } - - return packageSource; + if (!string.IsNullOrWhiteSpace(userInfo)) + { + var components = userInfo.Split(':', StringSplitOptions.RemoveEmptyEntries); + var username = components.Length > 0 ? components[0] : null; + var password = components.Length > 1 ? components[1] : null; + packageSource.Credentials = new PackageSourceCredential(source, username, password, true, null); } + return packageSource; + } + /// /// Describes a Nuget package /// - protected class NugetPackageInfo + /// + /// Initializes a new + /// + /// The the package is sourced by + /// The package's identity + protected class NugetPackageInfo(SourceRepository sourceRepository, PackageIdentity identity) { - /// - /// Initializes a new - /// - /// The the package is sourced by - /// The package's identitye - public NugetPackageInfo(SourceRepository sourceRepository, PackageIdentity identity) - { - this.SourceRepository = sourceRepository; - this.Identity = identity; - } - /// /// Gets the the package is sourced by /// - public SourceRepository SourceRepository { get; } + public SourceRepository SourceRepository { get; } = sourceRepository; /// /// Gets the package's identity /// - public PackageIdentity Identity { get; } + public PackageIdentity Identity { get; } = identity; } diff --git a/src/Neuroglia.Plugins/Services/PluginNugetProject.cs b/src/Neuroglia.Plugins/Services/PluginNugetProject.cs index 2c1f7d53c..12510080c 100644 --- a/src/Neuroglia.Plugins/Services/PluginNugetProject.cs +++ b/src/Neuroglia.Plugins/Services/PluginNugetProject.cs @@ -27,7 +27,7 @@ public class PluginNugetProject : FolderNuGetProject { - readonly List _pluginAssemblies = new(); + readonly List _pluginAssemblies = []; /// /// Initializes a new @@ -113,7 +113,7 @@ protected virtual async Task ExtractAssembliesAsync(PackageIdentity packageIdent if (mostCompatibleFramework == null) return; var matchingEntries = entriesWithTargetFramework.Where(e => e.TargetFramework == mostCompatibleFramework).ToList(); - if (!matchingEntries.Any()) return; + if (matchingEntries.Count == 0) return; foreach (var entry in matchingEntries) { diff --git a/test/Neuroglia.UnitTests/Cases/Data/Infrastructure/EventSourcing/EventStoreTestsBase.cs b/test/Neuroglia.UnitTests/Cases/Data/Infrastructure/EventSourcing/EventStoreTestsBase.cs index 0935bb025..5c71c0628 100644 --- a/test/Neuroglia.UnitTests/Cases/Data/Infrastructure/EventSourcing/EventStoreTestsBase.cs +++ b/test/Neuroglia.UnitTests/Cases/Data/Infrastructure/EventSourcing/EventStoreTestsBase.cs @@ -21,16 +21,10 @@ namespace Neuroglia.UnitTests.Cases.Data.Infrastructure.EventSourcing; -public abstract class EventStoreTestsBase +public abstract class EventStoreTestsBase(IServiceCollection services) : IAsyncLifetime { - - public EventStoreTestsBase(IServiceCollection services) - { - ServiceProvider = services.BuildServiceProvider(); - } - - protected ServiceProvider ServiceProvider { get; } + protected ServiceProvider ServiceProvider { get; } = services.BuildServiceProvider(); protected CancellationTokenSource CancellationTokenSource { get; } = new(); @@ -97,7 +91,7 @@ public async Task Append_WithValidExpectedVersion_Should_Work() } [Fact, Priority(4)] - public async Task Append_WithInvalidExpectedVersion_Should_Throw_OptmisticConcurrencyException() + public async Task Append_WithInvalidExpectedVersion_Should_Throw_OptimisticConcurrencyException() { //arrange var streamId = "fake-stream";