From 906f7ab703eed6fd798a52121e6be7e3533900ed Mon Sep 17 00:00:00 2001 From: Jens Geyer Date: Sat, 18 Feb 2023 14:42:49 +0100 Subject: [PATCH] Upgrade to Orleans 7 (draft) --- global.json | 5 - .../Orleans.Persistence.Cassandra.csproj | 10 +- .../Storage/CassandraGrainStorage.cs | 755 +++++++++--------- .../StorageExtensions.cs | 40 +- 4 files changed, 413 insertions(+), 397 deletions(-) delete mode 100644 global.json diff --git a/global.json b/global.json deleted file mode 100644 index ad1cbff..0000000 --- a/global.json +++ /dev/null @@ -1,5 +0,0 @@ -{ - "sdk": { - "version": "2.1.302" - } -} \ No newline at end of file diff --git a/src/Orleans.Persistence.Cassandra/Orleans.Persistence.Cassandra.csproj b/src/Orleans.Persistence.Cassandra/Orleans.Persistence.Cassandra.csproj index ca1f553..87a6cda 100644 --- a/src/Orleans.Persistence.Cassandra/Orleans.Persistence.Cassandra.csproj +++ b/src/Orleans.Persistence.Cassandra/Orleans.Persistence.Cassandra.csproj @@ -1,10 +1,12 @@  - netstandard2.0 + net7.0 + warnings - - - + + + + diff --git a/src/Orleans.Persistence.Cassandra/Storage/CassandraGrainStorage.cs b/src/Orleans.Persistence.Cassandra/Storage/CassandraGrainStorage.cs index 968549d..50ff3b6 100644 --- a/src/Orleans.Persistence.Cassandra/Storage/CassandraGrainStorage.cs +++ b/src/Orleans.Persistence.Cassandra/Storage/CassandraGrainStorage.cs @@ -1,370 +1,387 @@ -using System; -using System.Collections.Generic; -using System.Threading; -using System.Threading.Tasks; - -using Cassandra; -using Cassandra.Data.Linq; -using Cassandra.Mapping; - -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Options; - -using Newtonsoft.Json; - -using Orleans.Configuration; -using Orleans.Persistence.Cassandra.Concurrency; -using Orleans.Persistence.Cassandra.Models; -using Orleans.Persistence.Cassandra.Options; -using Orleans.Runtime; -using Orleans.Serialization; -using Orleans.Storage; - -namespace Orleans.Persistence.Cassandra.Storage -{ - internal sealed class CassandraGrainStorage : IGrainStorage, ILifecycleParticipant - { - private const ConsistencyLevel SerialConsistencyLevel = ConsistencyLevel.Serial; - private const ConsistencyLevel DefaultConsistencyLevel = ConsistencyLevel.Quorum; - - private static readonly CqlQueryOptions SerialConsistencyQueryOptions = - CqlQueryOptions.New().SetSerialConsistencyLevel(SerialConsistencyLevel); - - private static readonly CqlQueryOptions DefaultConsistencyQueryOptions = - CqlQueryOptions.New().SetConsistencyLevel(DefaultConsistencyLevel); - - private readonly string _name; - private readonly string _serviceId; - private readonly CassandraStorageOptions _cassandraStorageOptions; - private readonly ILogger _logger; - private readonly IGrainFactory _grainFactory; - private readonly ITypeResolver _typeResolver; - private readonly HashSet _concurrentStateTypes; - - private JsonSerializerSettings _jsonSettings; - private Cluster _cluster; - private Mapper _mapper; - - public CassandraGrainStorage( - string name, - CassandraStorageOptions cassandraStorageOptions, - IOptions clusterOptions, - ILogger logger, - IGrainFactory grainFactory, - ITypeResolver typeResolver, - IConcurrentGrainStateTypesProvider concurrentGrainStateTypesProvider, - ILoggerProvider loggerProvider) - { - _name = name; - _serviceId = clusterOptions.Value.ServiceId; - _cassandraStorageOptions = cassandraStorageOptions; - _logger = logger; - _grainFactory = grainFactory; - _typeResolver = typeResolver; - _concurrentStateTypes = new HashSet(concurrentGrainStateTypesProvider.GetGrainStateTypes()); - - Diagnostics.CassandraPerformanceCountersEnabled = _cassandraStorageOptions.Diagnostics.PerformanceCountersEnabled; - Diagnostics.CassandraStackTraceIncluded = _cassandraStorageOptions.Diagnostics.StackTraceIncluded; - - if (loggerProvider != null) - { - Diagnostics.AddLoggerProvider(loggerProvider); - } - } - - public async Task ReadStateAsync(string grainType, GrainReference grainReference, IGrainState grainState) - { - var isConcurrentState = _concurrentStateTypes.Contains(grainState.State.GetType()); - var (_, cassandraState) = await GetCassandraGrainState(grainType, grainReference, isConcurrentState); - if (cassandraState != null) - { - grainState.State = JsonConvert.DeserializeObject(cassandraState.State, _jsonSettings); - grainState.ETag = isConcurrentState ? cassandraState.ETag : string.Empty; - } - } - - public async Task WriteStateAsync(string grainType, GrainReference grainReference, IGrainState grainState) - { - var isConcurrentState = _concurrentStateTypes.Contains(grainState.State.GetType()); - var (id, cassandraState) = await GetCassandraGrainState(grainType, grainReference, isConcurrentState); - try - { - var json = JsonConvert.SerializeObject(grainState.State, _jsonSettings); - if (isConcurrentState) - { - var newEtag = 0; - if (cassandraState == null) - { - cassandraState = new CassandraGrainState - { - Id = id, - GrainType = grainType, - State = json, - ETag = newEtag.ToString() - }; - - await _mapper.InsertIfNotExistsAsync(cassandraState, SerialConsistencyQueryOptions) - .ConfigureAwait(false); - } - else - { - int.TryParse(grainState.ETag, out var stateEtag); - newEtag = stateEtag; - newEtag++; - - var appliedInfo = - await _mapper.UpdateIfAsync( - Cql.New( - $"SET {nameof(CassandraGrainState.State)} = ?, {nameof(CassandraGrainState.ETag)} = ? " + - $"WHERE {nameof(CassandraGrainState.Id)} = ? AND {nameof(CassandraGrainState.GrainType)} = ? " + - $"IF {nameof(CassandraGrainState.ETag)} = ?", - json, - newEtag.ToString(), - id, - grainType, - stateEtag.ToString()) - .WithOptions(x => x.SetSerialConsistencyLevel(SerialConsistencyLevel))) - .ConfigureAwait(false); - - if (!appliedInfo.Applied) - { - throw new CassandraConcurrencyException(cassandraState.Id, stateEtag.ToString(), appliedInfo.Existing.ETag); - } - } - - grainState.ETag = newEtag.ToString(); - } - else - { - if (cassandraState == null) - { - cassandraState = new CassandraGrainState - { - Id = id, - GrainType = grainType, - State = json, - ETag = string.Empty - }; - - await _mapper.InsertAsync(cassandraState, DefaultConsistencyQueryOptions) - .ConfigureAwait(false); - } - else - { - cassandraState.State = json; - await _mapper.UpdateAsync(cassandraState, DefaultConsistencyQueryOptions) - .ConfigureAwait(false); - } - } - } - catch (DriverException) - { - _logger.LogWarning("Cassandra driver error occured while creating grain state for grain {grainId}.", id); - - throw; - } - } - - public async Task ClearStateAsync(string grainType, GrainReference grainReference, IGrainState grainState) - { - var isConcurrentState = _concurrentStateTypes.Contains(grainState.State.GetType()); - var (id, cassandraState) = await GetCassandraGrainState(grainType, grainReference, isConcurrentState); - try - { - if (_cassandraStorageOptions.DeleteStateOnClear) - { - await _mapper.DeleteAsync( - Cql.New( - $"WHERE {nameof(CassandraGrainState.Id)} = ? AND {nameof(CassandraGrainState.GrainType)} = ?", - id, - grainType) - .WithOptions( - x => - { - if (isConcurrentState) - { - x.SetSerialConsistencyLevel(SerialConsistencyLevel); - } - else - { - x.SetConsistencyLevel(DefaultConsistencyLevel); - } - })) - .ConfigureAwait(false); - - grainState.ETag = string.Empty; - } - else - { - var json = JsonConvert.SerializeObject(grainState.State, _jsonSettings); - if (isConcurrentState) - { - int.TryParse(grainState.ETag, out var stateEtag); - var newEtag = stateEtag; - newEtag++; - - var appliedInfo = - await _mapper.UpdateIfAsync( - Cql.New( - $"SET {nameof(CassandraGrainState.State)} = ?, {nameof(CassandraGrainState.ETag)} = ? " + - $"WHERE {nameof(CassandraGrainState.Id)} = ? AND {nameof(CassandraGrainState.GrainType)} = ? " + - $"IF {nameof(CassandraGrainState.ETag)} = ?", - json, - newEtag.ToString(), - id, - grainType, - stateEtag.ToString()) - .WithOptions(x => x.SetSerialConsistencyLevel(SerialConsistencyLevel))) - .ConfigureAwait(false); - - if (!appliedInfo.Applied) - { - throw new CassandraConcurrencyException(id, stateEtag.ToString(), appliedInfo.Existing.ETag); - } - - grainState.ETag = newEtag.ToString(); - } - else - { - cassandraState.State = json; - await _mapper.UpdateAsync(cassandraState, DefaultConsistencyQueryOptions) - .ConfigureAwait(false); - } - } - } - catch (DriverException) - { - _logger.LogWarning("Cassandra driver error occured while clearing grain state for grain {grainId}.", id); - throw; - } - } - - public void Participate(ISiloLifecycle lifecycle) - => lifecycle.Subscribe(OptionFormattingUtilities.Name(_name), _cassandraStorageOptions.InitStage, Init, Close); - - private string GetKeyString(GrainReference grainReference) => $"{_serviceId}_{grainReference.ToKeyString()}"; - - private async Task<(string, CassandraGrainState)> GetCassandraGrainState( - string grainType, - GrainReference grainReference, - bool isConcurrentState) - { - var id = GetKeyString(grainReference); - try - { - var state = await _mapper.FirstOrDefaultAsync( - Cql.New( - $"WHERE {nameof(CassandraGrainState.Id)} = ? AND {nameof(CassandraGrainState.GrainType)} = ?", - id, - grainType) - .WithOptions( - x => - { - if (isConcurrentState) - { - x.SetSerialConsistencyLevel(SerialConsistencyLevel); - } - else - { - x.SetConsistencyLevel(DefaultConsistencyLevel); - } - })) - .ConfigureAwait(false); - - return (id, state); - } - catch (DriverException) - { - _logger.LogWarning("Cassandra driver error occured while reading grain state for grain {grainId}.", id); - - throw; - } - } - - private async Task Init(CancellationToken cancellationToken) - { - try - { - _jsonSettings = OrleansJsonSerializer.GetDefaultSerializerSettings(_typeResolver, _grainFactory); - _jsonSettings.TypeNameHandling = _cassandraStorageOptions.JsonSerialization.TypeNameHandling; - _jsonSettings.MetadataPropertyHandling = _cassandraStorageOptions.JsonSerialization.MetadataPropertyHandling; - - if (_cassandraStorageOptions.JsonSerialization.ContractResolver != null) - { - _jsonSettings.ContractResolver = _cassandraStorageOptions.JsonSerialization.ContractResolver; - } - - if (_cassandraStorageOptions.JsonSerialization.UseFullAssemblyNames) - { - _jsonSettings.TypeNameAssemblyFormatHandling = TypeNameAssemblyFormatHandling.Full; - } - - if (_cassandraStorageOptions.JsonSerialization.IndentJson) - { - _jsonSettings.Formatting = Formatting.Indented; - } - - var cassandraOptions = _cassandraStorageOptions; - _cluster = Cluster.Builder() - .AddContactPoints(cassandraOptions.ContactPoints.Split(',')) - .Build(); - - var session = await _cluster.ConnectAsync(); - await Task.Run( - () => - { - var keyspace = cassandraOptions.Keyspace; - session.CreateKeyspaceIfNotExists( - keyspace, - new Dictionary - { - { "class", "SimpleStrategy" }, - { "replication_factor", cassandraOptions.ReplicationFactor.ToString() } - }); - }, - cancellationToken) - .ConfigureAwait(false); - - var mappingConfiguration = new MappingConfiguration().Define(new EntityMappings(cassandraOptions.Keyspace, cassandraOptions.TableName)); - - await Task.Run( - async () => - { - var grainStateTable = new Table(session, mappingConfiguration); - - var systemTableTable = new Table(session, mappingConfiguration); - var result = await systemTableTable.FirstOrDefault( - t => t.KeyspaceName == grainStateTable.KeyspaceName && - t.TableName == grainStateTable.Name) - .ExecuteAsync(); - if (result == null) - { - grainStateTable.Create(); - } - }, - cancellationToken) - .ConfigureAwait(false); - - _mapper = new Mapper(session, mappingConfiguration); - } - catch (DriverException) - { - _logger.LogWarning("Cassandra driver error occured while initializing grain storage for service {serviceId}.", _serviceId); - throw; - } - } - - private async Task Close(CancellationToken cancellationToken) => await _cluster.ShutdownAsync(); - } - - public static class CassandraGrainStorageFactory - { - public static IGrainStorage Create(IServiceProvider services, string name) - { - var optionsSnapshot = services.GetRequiredService>(); - var typesProvider = services.GetRequiredServiceByName(name); - return ActivatorUtilities.CreateInstance(services, name, optionsSnapshot.Get(name), typesProvider); - } - } +using System; +using System.Collections.Generic; +using System.Diagnostics.Contracts; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; + +using Cassandra; +using Cassandra.Data.Linq; +using Cassandra.Mapping; + +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Newtonsoft.Json; + +using Orleans.Configuration; +using Orleans.GrainReferences; +using Orleans.Persistence.Cassandra.Concurrency; +using Orleans.Persistence.Cassandra.Models; +using Orleans.Persistence.Cassandra.Options; +using Orleans.Runtime; +using Orleans.Serialization; +using Orleans.Serialization.Buffers; +using Orleans.Storage; + +namespace Orleans.Persistence.Cassandra.Storage +{ + internal sealed class CassandraGrainStorage : IGrainStorage, ILifecycleParticipant + { + private const ConsistencyLevel SerialConsistencyLevel = ConsistencyLevel.Serial; + private const ConsistencyLevel DefaultConsistencyLevel = ConsistencyLevel.Quorum; + + private static readonly CqlQueryOptions SerialConsistencyQueryOptions = + CqlQueryOptions.New().SetSerialConsistencyLevel(SerialConsistencyLevel); + + private static readonly CqlQueryOptions DefaultConsistencyQueryOptions = + CqlQueryOptions.New().SetConsistencyLevel(DefaultConsistencyLevel); + + private readonly string _name; + private readonly string _serviceId; + private readonly CassandraStorageOptions _cassandraStorageOptions; + private readonly ILogger _logger; + //private readonly IGrainFactory _grainFactory; + private readonly IServiceProvider _services; + //private readonly ITypeResolver _typeResolver; + private readonly HashSet _concurrentStateTypes; + + private JsonSerializerSettings _jsonSettings; + private Cluster _cluster; + private Mapper _mapper; + + public CassandraGrainStorage( + string name, + CassandraStorageOptions cassandraStorageOptions, + IOptions clusterOptions, + ILogger logger, + IGrainFactory grainFactory, + //ITypeResolver typeResolver, + IConcurrentGrainStateTypesProvider concurrentGrainStateTypesProvider, + IServiceProvider services, + ILoggerProvider loggerProvider) + { + _name = name; + _serviceId = clusterOptions.Value.ServiceId; + _cassandraStorageOptions = cassandraStorageOptions; + _logger = logger; + _/*grainFactory*/ = grainFactory; + //_typeResolver = typeResolver; + _services = services; + _concurrentStateTypes = new HashSet(concurrentGrainStateTypesProvider.GetGrainStateTypes()); + + Diagnostics.CassandraPerformanceCountersEnabled = _cassandraStorageOptions.Diagnostics.PerformanceCountersEnabled; + Diagnostics.CassandraStackTraceIncluded = _cassandraStorageOptions.Diagnostics.StackTraceIncluded; + + if (loggerProvider != null) + { + Diagnostics.AddLoggerProvider(loggerProvider); + } + } + + public async Task ReadStateAsync(string grainType, GrainId grainReference, IGrainState grainState) + { + var isConcurrentState = _concurrentStateTypes.Contains(grainState.State.GetType()); + var (_, cassandraState) = await GetCassandraGrainState(grainType, grainReference, isConcurrentState); + if (cassandraState != null) + { + grainState.State = JsonConvert.DeserializeObject(cassandraState.State, _jsonSettings); + grainState.ETag = isConcurrentState ? cassandraState.ETag : string.Empty; + } + } + + public async Task WriteStateAsync(string grainType, GrainId grainId, IGrainState grainState) + { + var isConcurrentState = _concurrentStateTypes.Contains(grainState.State.GetType()); + var (id, cassandraState) = await GetCassandraGrainState(grainType, grainId, isConcurrentState); + try + { + var json = JsonConvert.SerializeObject(grainState.State, _jsonSettings); + if (isConcurrentState) + { + var newEtag = 0; + if (cassandraState == null) + { + cassandraState = new CassandraGrainState + { + Id = id, + GrainType = grainType, + State = json, + ETag = newEtag.ToString() + }; + + await _mapper.InsertIfNotExistsAsync(cassandraState, SerialConsistencyQueryOptions) + .ConfigureAwait(false); + } + else + { + if( !int.TryParse(grainState.ETag, out var stateEtag)) + stateEtag = 0; + newEtag = stateEtag; + newEtag++; + + var appliedInfo = + await _mapper.UpdateIfAsync( + Cql.New( + $"SET {nameof(CassandraGrainState.State)} = ?, {nameof(CassandraGrainState.ETag)} = ? " + + $"WHERE {nameof(CassandraGrainState.Id)} = ? AND {nameof(CassandraGrainState.GrainType)} = ? " + + $"IF {nameof(CassandraGrainState.ETag)} = ?", + json, + newEtag.ToString(), + id, + grainType, + stateEtag.ToString()) + .WithOptions(x => x.SetSerialConsistencyLevel(SerialConsistencyLevel))) + .ConfigureAwait(false); + + if (!appliedInfo.Applied) + { + throw new CassandraConcurrencyException(cassandraState.Id, stateEtag.ToString(), appliedInfo.Existing.ETag); + } + } + + grainState.ETag = newEtag.ToString(); + } + else + { + if (cassandraState == null) + { + cassandraState = new CassandraGrainState + { + Id = id, + GrainType = grainType, + State = json, + ETag = string.Empty + }; + + await _mapper.InsertAsync(cassandraState, DefaultConsistencyQueryOptions) + .ConfigureAwait(false); + } + else + { + cassandraState.State = json; + await _mapper.UpdateAsync(cassandraState, DefaultConsistencyQueryOptions) + .ConfigureAwait(false); + } + } + } + catch (DriverException) + { + _logger.LogWarning("Cassandra driver error occured while creating grain state for grain {grainId}.", id); + throw; + } + } + + public async Task ClearStateAsync(string grainType, GrainId grainReference, IGrainState grainState) + { + var isConcurrentState = _concurrentStateTypes.Contains(grainState.State.GetType()); + var (id, cassandraState) = await GetCassandraGrainState(grainType, grainReference, isConcurrentState); + try + { + if (_cassandraStorageOptions.DeleteStateOnClear) + { + await _mapper.DeleteAsync( + Cql.New( + $"WHERE {nameof(CassandraGrainState.Id)} = ? AND {nameof(CassandraGrainState.GrainType)} = ?", + id, + grainType) + .WithOptions( + x => + { + if (isConcurrentState) + { + x.SetSerialConsistencyLevel(SerialConsistencyLevel); + } + else + { + x.SetConsistencyLevel(DefaultConsistencyLevel); + } + })) + .ConfigureAwait(false); + + grainState.ETag = string.Empty; + } + else + { + var json = JsonConvert.SerializeObject(grainState.State, _jsonSettings); + if (isConcurrentState) + { + if(!int.TryParse(grainState.ETag, out var stateEtag)) + stateEtag = 0; + var newEtag = stateEtag; + newEtag++; + + var appliedInfo = + await _mapper.UpdateIfAsync( + Cql.New( + $"SET {nameof(CassandraGrainState.State)} = ?, {nameof(CassandraGrainState.ETag)} = ? " + + $"WHERE {nameof(CassandraGrainState.Id)} = ? AND {nameof(CassandraGrainState.GrainType)} = ? " + + $"IF {nameof(CassandraGrainState.ETag)} = ?", + json, + newEtag.ToString(), + id, + grainType, + stateEtag.ToString()) + .WithOptions(x => x.SetSerialConsistencyLevel(SerialConsistencyLevel))) + .ConfigureAwait(false); + + if (!appliedInfo.Applied) + { + throw new CassandraConcurrencyException(id, stateEtag.ToString(), appliedInfo.Existing.ETag); + } + + grainState.ETag = newEtag.ToString(); + } + else + { + cassandraState.State = json; + await _mapper.UpdateAsync(cassandraState, DefaultConsistencyQueryOptions) + .ConfigureAwait(false); + } + } + } + catch (DriverException) + { + _logger.LogWarning("Cassandra driver error occured while clearing grain state for grain {grainId}.", id); + throw; + } + } + + public void Participate(ISiloLifecycle lifecycle) + => lifecycle.Subscribe(OptionFormattingUtilities.Name(_name), _cassandraStorageOptions.InitStage, Init, Close); + + private string GetKeyString(GrainId grainId) => $"{_serviceId}_{grainId}"; + + private async Task<(string, CassandraGrainState)> GetCassandraGrainState( + string grainType, + GrainId grainReference, + bool isConcurrentState) + { + var id = GetKeyString(grainReference); + try + { + var state = await _mapper.FirstOrDefaultAsync( + Cql.New( + $"WHERE {nameof(CassandraGrainState.Id)} = ? AND {nameof(CassandraGrainState.GrainType)} = ?", + id, + grainType) + .WithOptions( + x => + { + if (isConcurrentState) + { + x.SetSerialConsistencyLevel(SerialConsistencyLevel); + } + else + { + x.SetConsistencyLevel(DefaultConsistencyLevel); + } + })) + .ConfigureAwait(false); + + return (id, state); + } + catch (DriverException) + { + _logger.LogWarning("Cassandra driver error occured while reading grain state for grain {grainId}.", id); + + throw; + } + } + + private async Task Init(CancellationToken cancellationToken) + { + try + { + _jsonSettings = OrleansJsonSerializerSettings.GetDefaultSerializerSettings(_services/*_typeResolver, _grainFactory*/); + _jsonSettings.TypeNameHandling = _cassandraStorageOptions.JsonSerialization.TypeNameHandling; + _jsonSettings.MetadataPropertyHandling = _cassandraStorageOptions.JsonSerialization.MetadataPropertyHandling; + + if (_cassandraStorageOptions.JsonSerialization.ContractResolver != null) + { + _jsonSettings.ContractResolver = _cassandraStorageOptions.JsonSerialization.ContractResolver; + } + + if (_cassandraStorageOptions.JsonSerialization.UseFullAssemblyNames) + { + _jsonSettings.TypeNameAssemblyFormatHandling = TypeNameAssemblyFormatHandling.Full; + } + + if (_cassandraStorageOptions.JsonSerialization.IndentJson) + { + _jsonSettings.Formatting = Formatting.Indented; + } + + /* + var certCollection = new System.Security.Cryptography.X509Certificates.X509Certificate2Collection(); + var amazoncert = new System.Security.Cryptography.X509Certificates.X509Certificate2("./sf-class2-root.crt"); + certCollection.Add(amazoncert); + */ + + var cassandraOptions = _cassandraStorageOptions; + _cluster = Cluster.Builder() + .AddContactPoints(cassandraOptions.ContactPoints.Split(',')) + //.WithPort(9142) + //.WithAuthProvider(new PlainTextAuthProvider("user", "password")) + //.WithSSL(new SSLOptions().SetCertificateCollection(certCollection)) + .Build(); + + var session = await _cluster.ConnectAsync(); + await Task.Run( + () => + { + var keyspace = cassandraOptions.Keyspace; + session.CreateKeyspaceIfNotExists( + keyspace, + new Dictionary + { + { "class", "SimpleStrategy" }, + { "replication_factor", cassandraOptions.ReplicationFactor.ToString() } + }); + }, + cancellationToken) + .ConfigureAwait(false); + + var mappingConfiguration = new MappingConfiguration().Define(new EntityMappings(cassandraOptions.Keyspace, cassandraOptions.TableName)); + + await Task.Run( + async () => + { + var grainStateTable = new Table(session, mappingConfiguration); + + var systemTableTable = new Table(session, mappingConfiguration); + var result = await systemTableTable.FirstOrDefault( + t => t.KeyspaceName == grainStateTable.KeyspaceName && + t.TableName == grainStateTable.Name) + .ExecuteAsync(); + if (result == null) + { + grainStateTable.Create(); + } + }, + cancellationToken) + .ConfigureAwait(false); + + _mapper = new Mapper(session, mappingConfiguration); + } + catch (DriverException) + { + _logger.LogWarning("Cassandra driver error occured while initializing grain storage for service {serviceId}.", _serviceId); + throw; + } + } + + private async Task Close(CancellationToken _) => await _cluster.ShutdownAsync(); + } + + public static class CassandraGrainStorageFactory + { + public static IGrainStorage Create(IServiceProvider services, string name) + { + var optionsSnapshot = services.GetRequiredService>(); + var typesProvider = services.GetRequiredServiceByName(name); + return ActivatorUtilities.CreateInstance(services, name, optionsSnapshot.Get(name), typesProvider, services); + } + } } \ No newline at end of file diff --git a/src/Orleans.Persistence.Cassandra/StorageExtensions.cs b/src/Orleans.Persistence.Cassandra/StorageExtensions.cs index d8cc013..80b0f2c 100644 --- a/src/Orleans.Persistence.Cassandra/StorageExtensions.cs +++ b/src/Orleans.Persistence.Cassandra/StorageExtensions.cs @@ -4,6 +4,7 @@ using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; using Microsoft.Extensions.Options; +using Microsoft.Extensions.Hosting; using Orleans.Configuration; using Orleans.Hosting; @@ -21,16 +22,16 @@ public static class StorageExtensions /// /// Configure silo to use Cassandra storage as the default grain storage. /// - public static ISiloHostBuilder AddCassandraGrainStorageAsDefault( - this ISiloHostBuilder builder, - Func configurationProvider, + public static ISiloBuilder AddCassandraGrainStorageAsDefault( + this ISiloBuilder builder, + Func configurationProvider, IConcurrentGrainStateTypesProvider concurrentGrainStateTypesProvider = null) { - return builder.ConfigureServices((context, services) => + return builder.ConfigureServices((services) => { services.AddCassandraGrainStorage( ProviderConstants.DEFAULT_STORAGE_PROVIDER_NAME, - ob => ob.Bind(configurationProvider(context.Configuration)), + ob => ob.Bind(configurationProvider(/*context.Configuration*/)), concurrentGrainStateTypesProvider); }); } @@ -38,17 +39,17 @@ public static ISiloHostBuilder AddCassandraGrainStorageAsDefault( /// /// Configure silo to use Cassandra storage for grain storage. /// - public static ISiloHostBuilder AddCassandraGrainStorage( - this ISiloHostBuilder builder, + public static ISiloBuilder AddCassandraGrainStorage( + this ISiloBuilder builder, string name, - Func configurationProvider, + Func configurationProvider, IConcurrentGrainStateTypesProvider concurrentGrainStateTypesProvider = null) { - return builder.ConfigureServices((context, services) => + return builder.ConfigureServices((services) => { services.AddCassandraGrainStorage( name, - ob => ob.Bind(configurationProvider(context.Configuration)), + ob => ob.Bind(configurationProvider(/*context.Configuration*/)), concurrentGrainStateTypesProvider); }); } @@ -56,8 +57,8 @@ public static ISiloHostBuilder AddCassandraGrainStorage( /// /// Configure silo to use Cassandra storage as the default grain storage. /// - public static ISiloHostBuilder AddCassandraGrainStorageAsDefault( - this ISiloHostBuilder builder, + public static ISiloBuilder AddCassandraGrainStorageAsDefault( + this ISiloBuilder builder, Action configureOptions, IConcurrentGrainStateTypesProvider concurrentGrainStateTypesProvider = null) { @@ -67,8 +68,8 @@ public static ISiloHostBuilder AddCassandraGrainStorageAsDefault( /// /// Configure silo to use Cassandra storage for grain storage. /// - public static ISiloHostBuilder AddCassandraGrainStorage( - this ISiloHostBuilder builder, + public static ISiloBuilder AddCassandraGrainStorage( + this ISiloBuilder builder, string name, Action configureOptions, IConcurrentGrainStateTypesProvider concurrentGrainStateTypesProvider = null) @@ -79,8 +80,8 @@ public static ISiloHostBuilder AddCassandraGrainStorage( /// /// Configure silo to use Cassandra storage as the default grain storage. /// - public static ISiloHostBuilder AddCassandraGrainStorageAsDefault( - this ISiloHostBuilder builder, + public static ISiloBuilder AddCassandraGrainStorageAsDefault( + this ISiloBuilder builder, Action> configureOptions = null, IConcurrentGrainStateTypesProvider concurrentGrainStateTypesProvider = null) { @@ -90,8 +91,8 @@ public static ISiloHostBuilder AddCassandraGrainStorageAsDefault( /// /// Configure silo to use Cassandra storage for grain storage. /// - public static ISiloHostBuilder AddCassandraGrainStorage( - this ISiloHostBuilder builder, + public static ISiloBuilder AddCassandraGrainStorage( + this ISiloBuilder builder, string name, Action> configureOptions = null, IConcurrentGrainStateTypesProvider concurrentGrainStateTypesProvider = null) @@ -154,7 +155,8 @@ public static IServiceCollection AddCassandraGrainStorage( return services .AddSingletonNamedService(name, (sp, n) => concurrentGrainStateTypesProvider ?? new NullConcurrentGrainStateTypesProvider()) .AddSingletonNamedService(name, CassandraGrainStorageFactory.Create) - .AddSingletonNamedService(name, (sp, n) => (ILifecycleParticipant)sp.GetRequiredServiceByName(n)); + .AddSingletonNamedService(name, (sp, n) => (ILifecycleParticipant)sp.GetRequiredServiceByName(n)) + ; } } } \ No newline at end of file