Skip to content

Commit

Permalink
fix(EventSourcing): Enabled plugin-based injection for all event sour…
Browse files Browse the repository at this point in the history
…cing implementations

fix(Plugins): Fixed the NugetPackagePluginSource to resolve the target framework using the entry assembly instead of a strong typed one
  • Loading branch information
cdavernas committed Mar 25, 2024
1 parent 67fc277 commit 2ffbd90
Show file tree
Hide file tree
Showing 12 changed files with 197 additions and 118 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,13 @@
<ItemGroup>
<PackageReference Include="EventStore.Client.Grpc.PersistentSubscriptions" Version="23.2.1" />
<PackageReference Include="EventStore.Client.Grpc.Streams" Version="23.2.1" />
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="8.0.0" />
<PackageReference Include="System.Reactive" Version="6.0.0" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Neuroglia.Data.Infrastructure.EventSourcing\Neuroglia.Data.Infrastructure.EventSourcing.csproj" />
<ProjectReference Include="..\Neuroglia.Plugins.Abstractions\Neuroglia.Plugins.Abstractions.csproj" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Neuroglia.Data.Infrastructure.EventSourcing.Configuration;
using Neuroglia.Data.Infrastructure.EventSourcing.EventStore.Services;
using Neuroglia.Data.Infrastructure.EventSourcing.Services;
using Neuroglia.Plugins;
using Neuroglia.Serialization;
using System.IO;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Runtime.CompilerServices;
Expand All @@ -29,34 +30,28 @@ namespace Neuroglia.Data.Infrastructure.EventSourcing;
/// <summary>
/// Represents the default <see href="https://www.eventstore.com/">Event Store</see> implementation of the <see cref="IEventStore"/> interface
/// </summary>
[Plugin(Tags = ["event-store"]), Factory(typeof(ESEventStoreFactory))]
public class ESEventStore
: IEventStore
{

/// <summary>
/// Initializes a new <see cref="ESEventStore"/>
/// </summary>
/// <param name="serviceProvider">The current <see cref="IServiceProvider"/></param>
/// <param name="logger">The service used to perform logging</param>
/// <param name="options">The options used to configure the <see cref="ESEventStore"/></param>
/// <param name="serializerProvider">The service used to provide <see cref="ISerializer"/>s</param>
/// <param name="eventStoreClient">The service used to interact with the remove <see href="https://www.eventstore.com/">Event Store</see> service</param>
/// <param name="eventStorePersistentSubscriptionsClient">The service used to interact with the remove <see href="https://www.eventstore.com/">Event Store</see> service, exclusively for persistent subscriptions</param>
public ESEventStore(IServiceProvider serviceProvider, ILogger<ESEventStore> logger, IOptions<EventStoreOptions> options, ISerializerProvider serializerProvider, EventStoreClient eventStoreClient, EventStorePersistentSubscriptionsClient eventStorePersistentSubscriptionsClient)
public ESEventStore(ILogger<ESEventStore> logger, IOptions<EventStoreOptions> options, ISerializerProvider serializerProvider, EventStoreClient eventStoreClient, EventStorePersistentSubscriptionsClient eventStorePersistentSubscriptionsClient)
{
this.ServiceProvider = serviceProvider;
this.Logger = logger;
this.Options = options.Value;
this.Serializer = serializerProvider.GetSerializers().First(s => this.Options.SerializerType == null || s.GetType() == this.Options.SerializerType);
this.EventStoreClient = eventStoreClient;
this.EventStorePersistentSubscriptionsClient = eventStorePersistentSubscriptionsClient;
}

/// <summary>
/// Gets the current <see cref="IServiceProvider"/>
/// </summary>
protected virtual IServiceProvider ServiceProvider { get; }

/// <summary>
/// Gets the service used to perform logging
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright © 2021-Present Neuroglia SRL. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"),
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using EventStore.Client;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;

namespace Neuroglia.Data.Infrastructure.EventSourcing.EventStore.Services;

/// <summary>
/// Represents the service used to create <see cref="ESEventStore"/> instances
/// </summary>
/// <remarks>
/// Initializes a new <see cref="ESEventStoreFactory"/>
/// </remarks>
/// <param name="serviceProvider">The current <see cref="IServiceProvider"/></param>
public class ESEventStoreFactory(IServiceProvider serviceProvider)
: IFactory<ESEventStore>
{

/// <summary>
/// Gets the name of the EventStore connection string
/// </summary>
public const string ConnectionStringName = "eventstore";

/// <summary>
/// Gets the current <see cref="IServiceProvider"/>
/// </summary>
protected IServiceProvider ServiceProvider { get; } = serviceProvider;

/// <inheritdoc/>
public virtual ESEventStore Create()
{
var configuration = this.ServiceProvider.GetRequiredService<IConfiguration>();
var connectionString = configuration.GetConnectionString(ConnectionStringName);
if (string.IsNullOrWhiteSpace(connectionString)) throw new Exception($"An error occurred while attempting to create an ESEventStore instance. The '{ConnectionStringName}' connection string is not provided or is invalid. Please ensure that the connection string is properly configured in the application settings.");
var settings = EventStoreClientSettings.Create(connectionString);
return ActivatorUtilities.CreateInstance<ESEventStore>(this.ServiceProvider, new EventStoreClient(settings), new EventStorePersistentSubscriptionsClient(settings));
}

object IFactory.Create() => this.Create();

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,26 +27,21 @@ namespace Neuroglia.Data.Infrastructure.EventSourcing.DistributedCache.Services;
/// Represents an <see cref="IEventStore"/> implementation relying on an <see cref="IMemoryCache"/>
/// </summary>
/// <remarks>Should not be used in production</remarks>
/// <remarks>
/// Initializes a new <see cref="MemoryEventStore"/>
/// </remarks>
/// <param name="cache">The cache to stream events to</param>
[Plugin(Tags = ["event-store"]), Factory(typeof(MemoryCacheEventStoreFactory))]
public class MemoryEventStore
public class MemoryEventStore(IMemoryCache cache)
: IEventStore, IDisposable
{

private bool _disposed;

/// <summary>
/// Initializes a new <see cref="MemoryEventStore"/>
/// </summary>
/// <param name="cache">The cache to stream events to</param>
public MemoryEventStore(IMemoryCache cache)
{
this.Cache = cache;
}
bool _disposed;

/// <summary>
/// Gets the cache to stream events to
/// </summary>
protected IMemoryCache Cache { get; }
protected IMemoryCache Cache { get; } = cache;

/// <summary>
/// Gets the <see cref="ConcurrentDictionary{TKey, TValue}"/> containing all published <see cref="IEventRecord"/>s
Expand All @@ -66,19 +61,19 @@ public virtual Task AppendAsync(string streamId, IEnumerable<IEventDescriptor> e
if (expectedVersion < StreamPosition.EndOfStream) throw new ArgumentOutOfRangeException(nameof(expectedVersion));

this.Cache.TryGetValue<ObservableCollection<IEventRecord>>(streamId, out var stream);
var actualversion = stream == null ? (long?)null : (long)stream.Last().Offset;
var actualVersion = stream == null ? (long?)null : (long)stream.Last().Offset;

if (expectedVersion.HasValue)
{
if(expectedVersion.Value == StreamPosition.EndOfStream)
{
if (actualversion != null) throw new OptimisticConcurrencyException(expectedVersion, actualversion);
if (actualVersion != null) throw new OptimisticConcurrencyException(expectedVersion, actualVersion);
}
else if(actualversion == null || actualversion != expectedVersion) throw new OptimisticConcurrencyException(expectedVersion, actualversion);
else if(actualVersion == null || actualVersion != expectedVersion) throw new OptimisticConcurrencyException(expectedVersion, actualVersion);
}

stream ??= [];
ulong offset = actualversion.HasValue ? (ulong)actualversion.Value + 1 : StreamPosition.StartOfStream;
ulong offset = actualVersion.HasValue ? (ulong)actualVersion.Value + 1 : StreamPosition.StartOfStream;
foreach(var e in events)
{
var record = new EventRecord(streamId, Guid.NewGuid().ToString(), offset, (ulong)this.Stream.Count, DateTimeOffset.Now, e.Type, e.Data, e.Metadata);
Expand Down Expand Up @@ -122,7 +117,7 @@ public virtual IAsyncEnumerable<IEventRecord> ReadAsync(string? streamId, Stream
protected virtual async IAsyncEnumerable<IEventRecord> ReadFromStreamAsync(string streamId, StreamReadDirection readDirection, long offset, ulong? length = null, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
if (string.IsNullOrWhiteSpace(streamId)) throw new ArgumentNullException(nameof(streamId));
if (offset < StreamPosition.EndOfStream) throw new ArgumentOutOfRangeException(nameof(offset));
ArgumentOutOfRangeException.ThrowIfLessThan(offset, StreamPosition.EndOfStream);

if (!this.Cache.TryGetValue<ObservableCollection<IEventRecord>>(streamId, out var stream) || stream == null) throw new StreamNotFoundException(streamId);

Expand Down Expand Up @@ -157,7 +152,7 @@ protected virtual async IAsyncEnumerable<IEventRecord> ReadFromStreamAsync(strin
}

/// <summary>
/// Reads recorded events accross all streams
/// Reads recorded events across all streams
/// </summary>
/// <param name="readDirection">The direction in which to read events</param>
/// <param name="offset">The offset starting from which to read events</param>
Expand Down Expand Up @@ -190,12 +185,12 @@ public virtual Task<IObservable<IEventRecord>> ObserveAsync(string? streamId, lo
protected virtual async Task<IObservable<IEventRecord>> ObserveStreamAsync(string streamId, long offset = StreamPosition.EndOfStream, string? consumerGroup = null, CancellationToken cancellationToken = default)
{
if (string.IsNullOrWhiteSpace(streamId)) throw new ArgumentNullException(nameof(streamId));
if (offset < StreamPosition.EndOfStream) throw new ArgumentOutOfRangeException(nameof(offset));
ArgumentOutOfRangeException.ThrowIfLessThan(offset, StreamPosition.EndOfStream);

if (!this.Cache.TryGetValue<ObservableCollection<IEventRecord>>(streamId, out var stream) || stream == null) throw new StreamNotFoundException(streamId);

var storedOffset = string.IsNullOrWhiteSpace(consumerGroup) ? offset : await this.GetOffsetAsync(consumerGroup, streamId, cancellationToken).ConfigureAwait(false) ?? offset;
var events = storedOffset == StreamPosition.EndOfStream ? Array.Empty<IEventRecord>().ToList() : await (this.ReadAsync(streamId, StreamReadDirection.Forwards, storedOffset, cancellationToken: cancellationToken)).ToListAsync(cancellationToken).ConfigureAwait(false);
var events = storedOffset == StreamPosition.EndOfStream ? [] : await (this.ReadAsync(streamId, StreamReadDirection.Forwards, storedOffset, cancellationToken: cancellationToken)).ToListAsync(cancellationToken).ConfigureAwait(false);
var subject = new Subject<IEventRecord>();

var checkpointedPosition = (ulong?)null;
Expand All @@ -218,10 +213,10 @@ protected virtual async Task<IObservable<IEventRecord>> ObserveStreamAsync(strin
/// <returns>A new <see cref="IObservable{T}"/> used to observe events</returns>
protected virtual async Task<IObservable<IEventRecord>> ObserveAllAsync(long offset = StreamPosition.EndOfStream, string? consumerGroup = null, CancellationToken cancellationToken = default)
{
if (offset < StreamPosition.EndOfStream) throw new ArgumentOutOfRangeException(nameof(offset));
ArgumentOutOfRangeException.ThrowIfLessThan(offset, StreamPosition.EndOfStream);

var storedOffset = string.IsNullOrWhiteSpace(consumerGroup) ? offset : await this.GetOffsetAsync(consumerGroup, cancellationToken: cancellationToken).ConfigureAwait(false) ?? offset;
var events = storedOffset == StreamPosition.EndOfStream ? Array.Empty<IEventRecord>().ToList() : await (this.ReadAsync(null, StreamReadDirection.Forwards, storedOffset, cancellationToken: cancellationToken)).ToListAsync(cancellationToken).ConfigureAwait(false);
var events = storedOffset == StreamPosition.EndOfStream ? [] : await (this.ReadAsync(null, StreamReadDirection.Forwards, storedOffset, cancellationToken: cancellationToken)).ToListAsync(cancellationToken).ConfigureAwait(false);
var subject = new ReplaySubject<IEventRecord>();

var checkpointedPosition = (ulong?)null;
Expand Down Expand Up @@ -250,7 +245,7 @@ protected virtual async Task<IObservable<IEventRecord>> ObserveAllAsync(long off
public virtual async Task SetOffsetAsync(string consumerGroup, long offset, string? streamId = null, CancellationToken cancellationToken = default)
{
if (string.IsNullOrWhiteSpace(consumerGroup)) throw new ArgumentNullException(nameof(consumerGroup));
if (offset < StreamPosition.EndOfStream) throw new ArgumentOutOfRangeException(nameof(offset));
ArgumentOutOfRangeException.ThrowIfLessThan(offset, StreamPosition.EndOfStream);

var checkpointedPosition = (ulong?)await this.GetOffsetAsync(consumerGroup, streamId, cancellationToken).ConfigureAwait(false);
if (checkpointedPosition.HasValue) await this.SetConsumerCheckpointPositionAsync(consumerGroup, streamId, checkpointedPosition.Value, cancellationToken).ConfigureAwait(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,14 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="8.0.0" />
<PackageReference Include="StackExchange.Redis" Version="2.7.33" />
<PackageReference Include="System.Reactive" Version="6.0.0" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Neuroglia.Data.Infrastructure.EventSourcing\Neuroglia.Data.Infrastructure.EventSourcing.csproj" />
<ProjectReference Include="..\Neuroglia.Plugins.Abstractions\Neuroglia.Plugins.Abstractions.csproj" />
</ItemGroup>

</Project>
Loading

0 comments on commit 2ffbd90

Please sign in to comment.