From 8db52208ea72de03ae72fdb435ffcd0abaedee61 Mon Sep 17 00:00:00 2001 From: Jens Geyer Date: Sat, 18 Feb 2023 14:42:49 +0100 Subject: [PATCH] Upgrade to Orleans 7 (draft) --- global.json | 5 - .../Orleans.Persistence.Cassandra.csproj | 10 +- .../Storage/CassandraGrainStorage.cs | 776 +++++++++--------- .../Storage/MyGrainReferenceJsonConverter.cs | 74 ++ .../StorageExtensions.cs | 40 +- 5 files changed, 510 insertions(+), 395 deletions(-) delete mode 100644 global.json create mode 100644 src/Orleans.Persistence.Cassandra/Storage/MyGrainReferenceJsonConverter.cs diff --git a/global.json b/global.json deleted file mode 100644 index ad1cbff..0000000 --- a/global.json +++ /dev/null @@ -1,5 +0,0 @@ -{ - "sdk": { - "version": "2.1.302" - } -} \ No newline at end of file diff --git a/src/Orleans.Persistence.Cassandra/Orleans.Persistence.Cassandra.csproj b/src/Orleans.Persistence.Cassandra/Orleans.Persistence.Cassandra.csproj index ca1f553..87a6cda 100644 --- a/src/Orleans.Persistence.Cassandra/Orleans.Persistence.Cassandra.csproj +++ b/src/Orleans.Persistence.Cassandra/Orleans.Persistence.Cassandra.csproj @@ -1,10 +1,12 @@  - netstandard2.0 + net7.0 + warnings - - - + + + + diff --git a/src/Orleans.Persistence.Cassandra/Storage/CassandraGrainStorage.cs b/src/Orleans.Persistence.Cassandra/Storage/CassandraGrainStorage.cs index 968549d..a7c36be 100644 --- a/src/Orleans.Persistence.Cassandra/Storage/CassandraGrainStorage.cs +++ b/src/Orleans.Persistence.Cassandra/Storage/CassandraGrainStorage.cs @@ -1,370 +1,412 @@ -using System; -using System.Collections.Generic; -using System.Threading; -using System.Threading.Tasks; - -using Cassandra; -using Cassandra.Data.Linq; -using Cassandra.Mapping; - -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Options; - -using Newtonsoft.Json; - -using Orleans.Configuration; -using Orleans.Persistence.Cassandra.Concurrency; -using Orleans.Persistence.Cassandra.Models; -using Orleans.Persistence.Cassandra.Options; -using Orleans.Runtime; -using Orleans.Serialization; -using Orleans.Storage; - -namespace Orleans.Persistence.Cassandra.Storage -{ - internal sealed class CassandraGrainStorage : IGrainStorage, ILifecycleParticipant - { - private const ConsistencyLevel SerialConsistencyLevel = ConsistencyLevel.Serial; +using System; +using System.Collections.Generic; +using System.Diagnostics.Contracts; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; + +using Cassandra; +using Cassandra.Data.Linq; +using Cassandra.Mapping; + +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Newtonsoft.Json; + +using Orleans.Configuration; +using Orleans.GrainReferences; +using Orleans.Persistence.Cassandra.Concurrency; +using Orleans.Persistence.Cassandra.Models; +using Orleans.Persistence.Cassandra.Options; +using Orleans.Runtime; +using Orleans.Serialization; +using Orleans.Serialization.Buffers; +using Orleans.Storage; + +namespace Orleans.Persistence.Cassandra.Storage +{ + internal sealed class CassandraGrainStorage : IGrainStorage, ILifecycleParticipant + { + private const ConsistencyLevel SerialConsistencyLevel = ConsistencyLevel.Serial; private const ConsistencyLevel DefaultConsistencyLevel = ConsistencyLevel.Quorum; - - private static readonly CqlQueryOptions SerialConsistencyQueryOptions = - CqlQueryOptions.New().SetSerialConsistencyLevel(SerialConsistencyLevel); - - private static readonly CqlQueryOptions DefaultConsistencyQueryOptions = + + private static readonly CqlQueryOptions SerialConsistencyQueryOptions = + CqlQueryOptions.New().SetSerialConsistencyLevel(SerialConsistencyLevel); + + private static readonly CqlQueryOptions DefaultConsistencyQueryOptions = CqlQueryOptions.New().SetConsistencyLevel(DefaultConsistencyLevel); - - private readonly string _name; - private readonly string _serviceId; - private readonly CassandraStorageOptions _cassandraStorageOptions; - private readonly ILogger _logger; - private readonly IGrainFactory _grainFactory; - private readonly ITypeResolver _typeResolver; - private readonly HashSet _concurrentStateTypes; - - private JsonSerializerSettings _jsonSettings; - private Cluster _cluster; - private Mapper _mapper; - - public CassandraGrainStorage( - string name, - CassandraStorageOptions cassandraStorageOptions, - IOptions clusterOptions, - ILogger logger, - IGrainFactory grainFactory, - ITypeResolver typeResolver, - IConcurrentGrainStateTypesProvider concurrentGrainStateTypesProvider, - ILoggerProvider loggerProvider) - { - _name = name; - _serviceId = clusterOptions.Value.ServiceId; - _cassandraStorageOptions = cassandraStorageOptions; - _logger = logger; - _grainFactory = grainFactory; - _typeResolver = typeResolver; - _concurrentStateTypes = new HashSet(concurrentGrainStateTypesProvider.GetGrainStateTypes()); - - Diagnostics.CassandraPerformanceCountersEnabled = _cassandraStorageOptions.Diagnostics.PerformanceCountersEnabled; - Diagnostics.CassandraStackTraceIncluded = _cassandraStorageOptions.Diagnostics.StackTraceIncluded; - - if (loggerProvider != null) - { - Diagnostics.AddLoggerProvider(loggerProvider); - } - } - - public async Task ReadStateAsync(string grainType, GrainReference grainReference, IGrainState grainState) - { - var isConcurrentState = _concurrentStateTypes.Contains(grainState.State.GetType()); - var (_, cassandraState) = await GetCassandraGrainState(grainType, grainReference, isConcurrentState); - if (cassandraState != null) - { - grainState.State = JsonConvert.DeserializeObject(cassandraState.State, _jsonSettings); - grainState.ETag = isConcurrentState ? cassandraState.ETag : string.Empty; - } - } - - public async Task WriteStateAsync(string grainType, GrainReference grainReference, IGrainState grainState) - { - var isConcurrentState = _concurrentStateTypes.Contains(grainState.State.GetType()); - var (id, cassandraState) = await GetCassandraGrainState(grainType, grainReference, isConcurrentState); - try - { - var json = JsonConvert.SerializeObject(grainState.State, _jsonSettings); - if (isConcurrentState) - { - var newEtag = 0; - if (cassandraState == null) - { - cassandraState = new CassandraGrainState - { - Id = id, - GrainType = grainType, - State = json, - ETag = newEtag.ToString() - }; - - await _mapper.InsertIfNotExistsAsync(cassandraState, SerialConsistencyQueryOptions) - .ConfigureAwait(false); - } - else - { - int.TryParse(grainState.ETag, out var stateEtag); - newEtag = stateEtag; - newEtag++; - - var appliedInfo = - await _mapper.UpdateIfAsync( - Cql.New( - $"SET {nameof(CassandraGrainState.State)} = ?, {nameof(CassandraGrainState.ETag)} = ? " + - $"WHERE {nameof(CassandraGrainState.Id)} = ? AND {nameof(CassandraGrainState.GrainType)} = ? " + - $"IF {nameof(CassandraGrainState.ETag)} = ?", - json, - newEtag.ToString(), - id, - grainType, - stateEtag.ToString()) - .WithOptions(x => x.SetSerialConsistencyLevel(SerialConsistencyLevel))) - .ConfigureAwait(false); - - if (!appliedInfo.Applied) - { - throw new CassandraConcurrencyException(cassandraState.Id, stateEtag.ToString(), appliedInfo.Existing.ETag); - } - } - - grainState.ETag = newEtag.ToString(); - } - else - { - if (cassandraState == null) - { - cassandraState = new CassandraGrainState - { - Id = id, - GrainType = grainType, - State = json, - ETag = string.Empty - }; - - await _mapper.InsertAsync(cassandraState, DefaultConsistencyQueryOptions) - .ConfigureAwait(false); - } - else - { - cassandraState.State = json; - await _mapper.UpdateAsync(cassandraState, DefaultConsistencyQueryOptions) - .ConfigureAwait(false); - } - } - } - catch (DriverException) - { - _logger.LogWarning("Cassandra driver error occured while creating grain state for grain {grainId}.", id); - - throw; - } - } - - public async Task ClearStateAsync(string grainType, GrainReference grainReference, IGrainState grainState) - { - var isConcurrentState = _concurrentStateTypes.Contains(grainState.State.GetType()); - var (id, cassandraState) = await GetCassandraGrainState(grainType, grainReference, isConcurrentState); - try - { - if (_cassandraStorageOptions.DeleteStateOnClear) - { - await _mapper.DeleteAsync( - Cql.New( - $"WHERE {nameof(CassandraGrainState.Id)} = ? AND {nameof(CassandraGrainState.GrainType)} = ?", - id, - grainType) - .WithOptions( - x => - { - if (isConcurrentState) - { - x.SetSerialConsistencyLevel(SerialConsistencyLevel); - } - else - { - x.SetConsistencyLevel(DefaultConsistencyLevel); - } - })) - .ConfigureAwait(false); - - grainState.ETag = string.Empty; - } - else - { - var json = JsonConvert.SerializeObject(grainState.State, _jsonSettings); - if (isConcurrentState) - { - int.TryParse(grainState.ETag, out var stateEtag); - var newEtag = stateEtag; - newEtag++; - - var appliedInfo = - await _mapper.UpdateIfAsync( - Cql.New( - $"SET {nameof(CassandraGrainState.State)} = ?, {nameof(CassandraGrainState.ETag)} = ? " + - $"WHERE {nameof(CassandraGrainState.Id)} = ? AND {nameof(CassandraGrainState.GrainType)} = ? " + - $"IF {nameof(CassandraGrainState.ETag)} = ?", - json, - newEtag.ToString(), - id, - grainType, - stateEtag.ToString()) - .WithOptions(x => x.SetSerialConsistencyLevel(SerialConsistencyLevel))) - .ConfigureAwait(false); - - if (!appliedInfo.Applied) - { - throw new CassandraConcurrencyException(id, stateEtag.ToString(), appliedInfo.Existing.ETag); - } - - grainState.ETag = newEtag.ToString(); - } - else - { - cassandraState.State = json; - await _mapper.UpdateAsync(cassandraState, DefaultConsistencyQueryOptions) - .ConfigureAwait(false); - } - } - } - catch (DriverException) - { - _logger.LogWarning("Cassandra driver error occured while clearing grain state for grain {grainId}.", id); - throw; - } - } - - public void Participate(ISiloLifecycle lifecycle) - => lifecycle.Subscribe(OptionFormattingUtilities.Name(_name), _cassandraStorageOptions.InitStage, Init, Close); - - private string GetKeyString(GrainReference grainReference) => $"{_serviceId}_{grainReference.ToKeyString()}"; - - private async Task<(string, CassandraGrainState)> GetCassandraGrainState( - string grainType, - GrainReference grainReference, - bool isConcurrentState) - { - var id = GetKeyString(grainReference); - try - { - var state = await _mapper.FirstOrDefaultAsync( - Cql.New( - $"WHERE {nameof(CassandraGrainState.Id)} = ? AND {nameof(CassandraGrainState.GrainType)} = ?", - id, - grainType) - .WithOptions( - x => - { - if (isConcurrentState) - { - x.SetSerialConsistencyLevel(SerialConsistencyLevel); - } - else - { - x.SetConsistencyLevel(DefaultConsistencyLevel); - } - })) - .ConfigureAwait(false); - - return (id, state); - } - catch (DriverException) - { - _logger.LogWarning("Cassandra driver error occured while reading grain state for grain {grainId}.", id); - - throw; - } - } - - private async Task Init(CancellationToken cancellationToken) - { - try - { - _jsonSettings = OrleansJsonSerializer.GetDefaultSerializerSettings(_typeResolver, _grainFactory); - _jsonSettings.TypeNameHandling = _cassandraStorageOptions.JsonSerialization.TypeNameHandling; - _jsonSettings.MetadataPropertyHandling = _cassandraStorageOptions.JsonSerialization.MetadataPropertyHandling; - - if (_cassandraStorageOptions.JsonSerialization.ContractResolver != null) - { - _jsonSettings.ContractResolver = _cassandraStorageOptions.JsonSerialization.ContractResolver; - } - - if (_cassandraStorageOptions.JsonSerialization.UseFullAssemblyNames) - { - _jsonSettings.TypeNameAssemblyFormatHandling = TypeNameAssemblyFormatHandling.Full; - } - - if (_cassandraStorageOptions.JsonSerialization.IndentJson) - { - _jsonSettings.Formatting = Formatting.Indented; - } - - var cassandraOptions = _cassandraStorageOptions; - _cluster = Cluster.Builder() - .AddContactPoints(cassandraOptions.ContactPoints.Split(',')) - .Build(); - - var session = await _cluster.ConnectAsync(); - await Task.Run( - () => - { - var keyspace = cassandraOptions.Keyspace; - session.CreateKeyspaceIfNotExists( - keyspace, - new Dictionary - { - { "class", "SimpleStrategy" }, - { "replication_factor", cassandraOptions.ReplicationFactor.ToString() } - }); - }, - cancellationToken) - .ConfigureAwait(false); - - var mappingConfiguration = new MappingConfiguration().Define(new EntityMappings(cassandraOptions.Keyspace, cassandraOptions.TableName)); - - await Task.Run( - async () => - { - var grainStateTable = new Table(session, mappingConfiguration); - - var systemTableTable = new Table(session, mappingConfiguration); - var result = await systemTableTable.FirstOrDefault( - t => t.KeyspaceName == grainStateTable.KeyspaceName && - t.TableName == grainStateTable.Name) - .ExecuteAsync(); - if (result == null) - { - grainStateTable.Create(); - } - }, - cancellationToken) - .ConfigureAwait(false); - - _mapper = new Mapper(session, mappingConfiguration); - } - catch (DriverException) - { - _logger.LogWarning("Cassandra driver error occured while initializing grain storage for service {serviceId}.", _serviceId); - throw; - } - } - - private async Task Close(CancellationToken cancellationToken) => await _cluster.ShutdownAsync(); - } - - public static class CassandraGrainStorageFactory - { - public static IGrainStorage Create(IServiceProvider services, string name) - { - var optionsSnapshot = services.GetRequiredService>(); - var typesProvider = services.GetRequiredServiceByName(name); - return ActivatorUtilities.CreateInstance(services, name, optionsSnapshot.Get(name), typesProvider); - } - } + + private readonly string _name; + private readonly string _serviceId; + private readonly CassandraStorageOptions _cassandraStorageOptions; + private readonly ILogger _logger; + //private readonly IGrainFactory _grainFactory; + private readonly IServiceProvider _services; + //private readonly ITypeResolver _typeResolver; + private readonly HashSet _concurrentStateTypes; + + private JsonSerializerSettings _jsonSettings; + private Cluster _cluster; + private Mapper _mapper; + + public CassandraGrainStorage( + string name, + CassandraStorageOptions cassandraStorageOptions, + IOptions clusterOptions, + ILogger logger, + IGrainFactory grainFactory, + //ITypeResolver typeResolver, + IConcurrentGrainStateTypesProvider concurrentGrainStateTypesProvider, + IServiceProvider services, + ILoggerProvider loggerProvider) + { + _name = name; + _serviceId = clusterOptions.Value.ServiceId; + _cassandraStorageOptions = cassandraStorageOptions; + _logger = logger; + _/*grainFactory*/ = grainFactory; + //_typeResolver = typeResolver; + _services = services; + _concurrentStateTypes = new HashSet(concurrentGrainStateTypesProvider.GetGrainStateTypes()); + + Diagnostics.CassandraPerformanceCountersEnabled = _cassandraStorageOptions.Diagnostics.PerformanceCountersEnabled; + Diagnostics.CassandraStackTraceIncluded = _cassandraStorageOptions.Diagnostics.StackTraceIncluded; + + if (loggerProvider != null) + { + Diagnostics.AddLoggerProvider(loggerProvider); + } + } + + public async Task ReadStateAsync(string grainType, GrainId grainReference, IGrainState grainState) + { + var isConcurrentState = _concurrentStateTypes.Contains(grainState.State.GetType()); + var (_, cassandraState) = await GetCassandraGrainState(grainType, grainReference, isConcurrentState); + if (cassandraState != null) + { + grainState.State = JsonConvert.DeserializeObject(cassandraState.State, _jsonSettings); + grainState.ETag = isConcurrentState ? cassandraState.ETag : string.Empty; + } + } + + public async Task WriteStateAsync(string grainType, GrainId grainId, IGrainState grainState) + { + var isConcurrentState = _concurrentStateTypes.Contains(grainState.State.GetType()); + var (id, cassandraState) = await GetCassandraGrainState(grainType, grainId, isConcurrentState); + try + { + var json = JsonConvert.SerializeObject(grainState.State, _jsonSettings); + if (isConcurrentState) + { + var newEtag = 0; + if (cassandraState == null) + { + cassandraState = new CassandraGrainState + { + Id = id, + GrainType = grainType, + State = json, + ETag = newEtag.ToString() + }; + + await _mapper.InsertIfNotExistsAsync(cassandraState, SerialConsistencyQueryOptions) + .ConfigureAwait(false); + } + else + { + if( !int.TryParse(grainState.ETag, out var stateEtag)) + stateEtag = 0; + newEtag = stateEtag; + newEtag++; + + var appliedInfo = + await _mapper.UpdateIfAsync( + Cql.New( + $"SET {nameof(CassandraGrainState.State)} = ?, {nameof(CassandraGrainState.ETag)} = ? " + + $"WHERE {nameof(CassandraGrainState.Id)} = ? AND {nameof(CassandraGrainState.GrainType)} = ? " + + $"IF {nameof(CassandraGrainState.ETag)} = ?", + json, + newEtag.ToString(), + id, + grainType, + stateEtag.ToString()) + .WithOptions(x => x.SetSerialConsistencyLevel(SerialConsistencyLevel))) + .ConfigureAwait(false); + + if (!appliedInfo.Applied) + { + throw new CassandraConcurrencyException(cassandraState.Id, stateEtag.ToString(), appliedInfo.Existing.ETag); + } + } + + grainState.ETag = newEtag.ToString(); + } + else + { + if (cassandraState == null) + { + cassandraState = new CassandraGrainState + { + Id = id, + GrainType = grainType, + State = json, + ETag = string.Empty + }; + + await _mapper.InsertAsync(cassandraState, DefaultConsistencyQueryOptions) + .ConfigureAwait(false); + } + else + { + cassandraState.State = json; + await _mapper.UpdateAsync(cassandraState, DefaultConsistencyQueryOptions) + .ConfigureAwait(false); + } + } + } + catch (DriverException) + { + _logger.LogWarning("Cassandra driver error occured while creating grain state for grain {grainId}.", id); + throw; + } + } + + public async Task ClearStateAsync(string grainType, GrainId grainReference, IGrainState grainState) + { + var isConcurrentState = _concurrentStateTypes.Contains(grainState.State.GetType()); + var (id, cassandraState) = await GetCassandraGrainState(grainType, grainReference, isConcurrentState); + try + { + if (_cassandraStorageOptions.DeleteStateOnClear) + { + await _mapper.DeleteAsync( + Cql.New( + $"WHERE {nameof(CassandraGrainState.Id)} = ? AND {nameof(CassandraGrainState.GrainType)} = ?", + id, + grainType) + .WithOptions( + x => + { + if (isConcurrentState) + { + x.SetSerialConsistencyLevel(SerialConsistencyLevel); + } + else + { + x.SetConsistencyLevel(DefaultConsistencyLevel); + } + })) + .ConfigureAwait(false); + + grainState.ETag = string.Empty; + } + else + { + var json = JsonConvert.SerializeObject(grainState.State, _jsonSettings); + if (isConcurrentState) + { + if(!int.TryParse(grainState.ETag, out var stateEtag)) + stateEtag = 0; + var newEtag = stateEtag; + newEtag++; + + var appliedInfo = + await _mapper.UpdateIfAsync( + Cql.New( + $"SET {nameof(CassandraGrainState.State)} = ?, {nameof(CassandraGrainState.ETag)} = ? " + + $"WHERE {nameof(CassandraGrainState.Id)} = ? AND {nameof(CassandraGrainState.GrainType)} = ? " + + $"IF {nameof(CassandraGrainState.ETag)} = ?", + json, + newEtag.ToString(), + id, + grainType, + stateEtag.ToString()) + .WithOptions(x => x.SetSerialConsistencyLevel(SerialConsistencyLevel))) + .ConfigureAwait(false); + + if (!appliedInfo.Applied) + { + throw new CassandraConcurrencyException(id, stateEtag.ToString(), appliedInfo.Existing.ETag); + } + + grainState.ETag = newEtag.ToString(); + } + else + { + cassandraState.State = json; + await _mapper.UpdateAsync(cassandraState, DefaultConsistencyQueryOptions) + .ConfigureAwait(false); + } + } + } + catch (DriverException) + { + _logger.LogWarning("Cassandra driver error occured while clearing grain state for grain {grainId}.", id); + throw; + } + } + + public void Participate(ISiloLifecycle lifecycle) + => lifecycle.Subscribe(OptionFormattingUtilities.Name(_name), _cassandraStorageOptions.InitStage, Init, Close); + + private string GetKeyString(GrainId grainId) => $"{_serviceId}_{grainId}"; + + private async Task<(string, CassandraGrainState)> GetCassandraGrainState( + string grainType, + GrainId grainReference, + bool isConcurrentState) + { + var id = GetKeyString(grainReference); + try + { + var state = await _mapper.FirstOrDefaultAsync( + Cql.New( + $"WHERE {nameof(CassandraGrainState.Id)} = ? AND {nameof(CassandraGrainState.GrainType)} = ?", + id, + grainType) + .WithOptions( + x => + { + if (isConcurrentState) + { + x.SetSerialConsistencyLevel(SerialConsistencyLevel); + } + else + { + x.SetConsistencyLevel(DefaultConsistencyLevel); + } + })) + .ConfigureAwait(false); + + return (id, state); + } + catch (DriverException) + { + _logger.LogWarning("Cassandra driver error occured while reading grain state for grain {grainId}.", id); + + throw; + } + } + + private async Task Init(CancellationToken cancellationToken) + { + try + { + _jsonSettings = OrleansJsonSerializerSettings.GetDefaultSerializerSettings(_services/*_typeResolver, _grainFactory*/); + _jsonSettings.TypeNameHandling = _cassandraStorageOptions.JsonSerialization.TypeNameHandling; + _jsonSettings.MetadataPropertyHandling = _cassandraStorageOptions.JsonSerialization.MetadataPropertyHandling; + + // clear and fill again to work around "Unable to cast object ... to type 'Orleans.Runtime.GrainReference'." + // bei Orleans.Serialization.GrainReferenceJsonConverter.WriteJson(JsonWriter writer, Object value, JsonSerializer serializer) + // bei Newtonsoft.Json.Serialization.JsonSerializerInternalWriter.SerializeConvertable(JsonWriter writer, JsonConverter converter, Object value, JsonContract contract, JsonContainerContract collectionContract, JsonProperty containerProperty) + // bei Newtonsoft.Json.Serialization.JsonSerializerInternalWriter.SerializeValue(JsonWriter writer, Object value, JsonContract valueContract, JsonProperty member, JsonContainerContract containerContract, JsonProperty containerProperty) + // bei Newtonsoft.Json.Serialization.JsonSerializerInternalWriter.SerializeObject(JsonWriter writer, Object value, JsonObjectContract contract, JsonProperty member, JsonContainerContract collectionContract, JsonProperty containerProperty) + // bei Newtonsoft.Json.Serialization.JsonSerializerInternalWriter.SerializeValue(JsonWriter writer, Object value, JsonContract valueContract, JsonProperty member, JsonContainerContract containerContract, JsonProperty containerProperty) + // bei Newtonsoft.Json.Serialization.JsonSerializerInternalWriter.Serialize(JsonWriter jsonWriter, Object value, Type objectType) + // bei Newtonsoft.Json.JsonSerializer.SerializeInternal(JsonWriter jsonWriter, Object value, Type objectType) + // bei Newtonsoft.Json.JsonSerializer.Serialize(JsonWriter jsonWriter, Object value, Type objectType) + // bei Newtonsoft.Json.JsonConvert.SerializeObjectInternal(Object value, Type type, JsonSerializer jsonSerializer) + // bei Newtonsoft.Json.JsonConvert.SerializeObject(Object value, Type type, JsonSerializerSettings settings) + // bei Newtonsoft.Json.JsonConvert.SerializeObject(Object value, JsonSerializerSettings settings) + // bei Orleans.Persistence.Cassandra.Storage.CassandraGrainStorage.< WriteStateAsync > d__15`1.MoveNext() in .\lib\3rdparty\Orleans.Persistence.Cassandra\src\Orleans.Persistence.Cassandra\Storage\CassandraGrainStorage.cs: Zeile99 + _jsonSettings.Converters.Clear(); + _jsonSettings.Converters.Add(new IPAddressConverter()); + _jsonSettings.Converters.Add(new IPEndPointConverter()); + _jsonSettings.Converters.Add(new GrainIdConverter()); + _jsonSettings.Converters.Add(new Serialization.ActivationIdConverter()); + _jsonSettings.Converters.Add(new SiloAddressJsonConverter()); + _jsonSettings.Converters.Add(new MembershipVersionJsonConverter()); + _jsonSettings.Converters.Add(new UniqueKeyConverter()); + //_jsonSettings.Converters.Add(new GrainReferenceJsonConverter(services.GetRequiredService())); + _jsonSettings.Converters.Add(new MyGrainReferenceJsonConverter(_services.GetRequiredService())); + + + if (_cassandraStorageOptions.JsonSerialization.ContractResolver != null) + { + _jsonSettings.ContractResolver = _cassandraStorageOptions.JsonSerialization.ContractResolver; + } + + if (_cassandraStorageOptions.JsonSerialization.UseFullAssemblyNames) + { + _jsonSettings.TypeNameAssemblyFormatHandling = TypeNameAssemblyFormatHandling.Full; + } + + if (_cassandraStorageOptions.JsonSerialization.IndentJson) + { + _jsonSettings.Formatting = Formatting.Indented; + } + + /* + var certCollection = new System.Security.Cryptography.X509Certificates.X509Certificate2Collection(); + var amazoncert = new System.Security.Cryptography.X509Certificates.X509Certificate2("./sf-class2-root.crt"); + certCollection.Add(amazoncert); + */ + + var cassandraOptions = _cassandraStorageOptions; + _cluster = Cluster.Builder() + .AddContactPoints(cassandraOptions.ContactPoints.Split(',')) + //.WithPort(9142) + //.WithAuthProvider(new PlainTextAuthProvider("user", "password")) + //.WithSSL(new SSLOptions().SetCertificateCollection(certCollection)) + .Build(); + + var session = await _cluster.ConnectAsync(); + await Task.Run( + () => + { + var keyspace = cassandraOptions.Keyspace; + session.CreateKeyspaceIfNotExists( + keyspace, + new Dictionary + { + { "class", "SimpleStrategy" }, + { "replication_factor", cassandraOptions.ReplicationFactor.ToString() } + }); + }, + cancellationToken) + .ConfigureAwait(false); + + var mappingConfiguration = new MappingConfiguration().Define(new EntityMappings(cassandraOptions.Keyspace, cassandraOptions.TableName)); + + await Task.Run( + async () => + { + var grainStateTable = new Table(session, mappingConfiguration); + + var systemTableTable = new Table(session, mappingConfiguration); + var result = await systemTableTable.FirstOrDefault( + t => t.KeyspaceName == grainStateTable.KeyspaceName && + t.TableName == grainStateTable.Name) + .ExecuteAsync(); + if (result == null) + { + grainStateTable.Create(); + } + }, + cancellationToken) + .ConfigureAwait(false); + + _mapper = new Mapper(session, mappingConfiguration); + } + catch (DriverException) + { + _logger.LogWarning("Cassandra driver error occured while initializing grain storage for service {serviceId}.", _serviceId); + throw; + } + } + + private async Task Close(CancellationToken _) => await _cluster.ShutdownAsync(); + } + + public static class CassandraGrainStorageFactory + { + public static IGrainStorage Create(IServiceProvider services, string name) + { + var optionsSnapshot = services.GetRequiredService>(); + var typesProvider = services.GetRequiredServiceByName(name); + return ActivatorUtilities.CreateInstance(services, name, optionsSnapshot.Get(name), typesProvider, services); + } + } } \ No newline at end of file diff --git a/src/Orleans.Persistence.Cassandra/Storage/MyGrainReferenceJsonConverter.cs b/src/Orleans.Persistence.Cassandra/Storage/MyGrainReferenceJsonConverter.cs new file mode 100644 index 0000000..9257122 --- /dev/null +++ b/src/Orleans.Persistence.Cassandra/Storage/MyGrainReferenceJsonConverter.cs @@ -0,0 +1,74 @@ +using System; +using System.Net; +using Microsoft.Extensions.DependencyInjection; +using Newtonsoft.Json; +using Newtonsoft.Json.Linq; +using Orleans.Runtime; +using Orleans.GrainReferences; +using Orleans.Serialization.TypeSystem; +using Microsoft.Extensions.Options; +using System.Globalization; +using System.Diagnostics; + +namespace Orleans.Serialization +{ + + /// + /// implementation for . + /// + /// + public class MyGrainReferenceJsonConverter : JsonConverter + { + private static readonly Type AddressableType = typeof(IAddressable); + private readonly GrainReferenceActivator referenceActivator; + + /// + /// Initializes a new instance of the class. + /// + /// The grain reference activator. + public MyGrainReferenceJsonConverter(GrainReferenceActivator referenceActivator) + { + this.referenceActivator = referenceActivator; + } + + /// + public override bool CanConvert(Type objectType) + { + return AddressableType.IsAssignableFrom(objectType); + } + + /// + public override void WriteJson(JsonWriter writer, object value, JsonSerializer serializer) + { + var val = (GrainReference)value; + Debug.Assert(val != null); + + writer.WriteStartObject(); + writer.WritePropertyName("Id"); + writer.WriteStartObject(); + writer.WritePropertyName("Type"); + writer.WriteValue(val.GrainId.Type.ToString()); + writer.WritePropertyName("Key"); + writer.WriteValue(val.GrainId.Key.ToString()); + writer.WriteEndObject(); + writer.WritePropertyName("Interface"); + writer.WriteValue(val.InterfaceType.ToString()); + writer.WriteEndObject(); + } + + /// + public override object ReadJson(JsonReader reader, Type objectType, object existingValue, JsonSerializer serializer) + { + JObject jo = JObject.Load(reader); + var id = jo["Id"] ?? throw new Exception("'Id' property missing"); + + var type = id["Type"]?.ToObject() ?? throw new Exception("'Type' property missing"); + var key = id["Key"]?.ToObject() ?? throw new Exception("'Key' property missing"); + var intf = jo["Interface"]?.ToString() ?? throw new Exception("'Interface' property missing"); + + GrainId grainId = GrainId.Create(type, key); + var iface = GrainInterfaceType.Create(intf); + return this.referenceActivator.CreateReference(grainId, iface); + } + } +} diff --git a/src/Orleans.Persistence.Cassandra/StorageExtensions.cs b/src/Orleans.Persistence.Cassandra/StorageExtensions.cs index d8cc013..80b0f2c 100644 --- a/src/Orleans.Persistence.Cassandra/StorageExtensions.cs +++ b/src/Orleans.Persistence.Cassandra/StorageExtensions.cs @@ -4,6 +4,7 @@ using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; using Microsoft.Extensions.Options; +using Microsoft.Extensions.Hosting; using Orleans.Configuration; using Orleans.Hosting; @@ -21,16 +22,16 @@ public static class StorageExtensions /// /// Configure silo to use Cassandra storage as the default grain storage. /// - public static ISiloHostBuilder AddCassandraGrainStorageAsDefault( - this ISiloHostBuilder builder, - Func configurationProvider, + public static ISiloBuilder AddCassandraGrainStorageAsDefault( + this ISiloBuilder builder, + Func configurationProvider, IConcurrentGrainStateTypesProvider concurrentGrainStateTypesProvider = null) { - return builder.ConfigureServices((context, services) => + return builder.ConfigureServices((services) => { services.AddCassandraGrainStorage( ProviderConstants.DEFAULT_STORAGE_PROVIDER_NAME, - ob => ob.Bind(configurationProvider(context.Configuration)), + ob => ob.Bind(configurationProvider(/*context.Configuration*/)), concurrentGrainStateTypesProvider); }); } @@ -38,17 +39,17 @@ public static ISiloHostBuilder AddCassandraGrainStorageAsDefault( /// /// Configure silo to use Cassandra storage for grain storage. /// - public static ISiloHostBuilder AddCassandraGrainStorage( - this ISiloHostBuilder builder, + public static ISiloBuilder AddCassandraGrainStorage( + this ISiloBuilder builder, string name, - Func configurationProvider, + Func configurationProvider, IConcurrentGrainStateTypesProvider concurrentGrainStateTypesProvider = null) { - return builder.ConfigureServices((context, services) => + return builder.ConfigureServices((services) => { services.AddCassandraGrainStorage( name, - ob => ob.Bind(configurationProvider(context.Configuration)), + ob => ob.Bind(configurationProvider(/*context.Configuration*/)), concurrentGrainStateTypesProvider); }); } @@ -56,8 +57,8 @@ public static ISiloHostBuilder AddCassandraGrainStorage( /// /// Configure silo to use Cassandra storage as the default grain storage. /// - public static ISiloHostBuilder AddCassandraGrainStorageAsDefault( - this ISiloHostBuilder builder, + public static ISiloBuilder AddCassandraGrainStorageAsDefault( + this ISiloBuilder builder, Action configureOptions, IConcurrentGrainStateTypesProvider concurrentGrainStateTypesProvider = null) { @@ -67,8 +68,8 @@ public static ISiloHostBuilder AddCassandraGrainStorageAsDefault( /// /// Configure silo to use Cassandra storage for grain storage. /// - public static ISiloHostBuilder AddCassandraGrainStorage( - this ISiloHostBuilder builder, + public static ISiloBuilder AddCassandraGrainStorage( + this ISiloBuilder builder, string name, Action configureOptions, IConcurrentGrainStateTypesProvider concurrentGrainStateTypesProvider = null) @@ -79,8 +80,8 @@ public static ISiloHostBuilder AddCassandraGrainStorage( /// /// Configure silo to use Cassandra storage as the default grain storage. /// - public static ISiloHostBuilder AddCassandraGrainStorageAsDefault( - this ISiloHostBuilder builder, + public static ISiloBuilder AddCassandraGrainStorageAsDefault( + this ISiloBuilder builder, Action> configureOptions = null, IConcurrentGrainStateTypesProvider concurrentGrainStateTypesProvider = null) { @@ -90,8 +91,8 @@ public static ISiloHostBuilder AddCassandraGrainStorageAsDefault( /// /// Configure silo to use Cassandra storage for grain storage. /// - public static ISiloHostBuilder AddCassandraGrainStorage( - this ISiloHostBuilder builder, + public static ISiloBuilder AddCassandraGrainStorage( + this ISiloBuilder builder, string name, Action> configureOptions = null, IConcurrentGrainStateTypesProvider concurrentGrainStateTypesProvider = null) @@ -154,7 +155,8 @@ public static IServiceCollection AddCassandraGrainStorage( return services .AddSingletonNamedService(name, (sp, n) => concurrentGrainStateTypesProvider ?? new NullConcurrentGrainStateTypesProvider()) .AddSingletonNamedService(name, CassandraGrainStorageFactory.Create) - .AddSingletonNamedService(name, (sp, n) => (ILifecycleParticipant)sp.GetRequiredServiceByName(n)); + .AddSingletonNamedService(name, (sp, n) => (ILifecycleParticipant)sp.GetRequiredServiceByName(n)) + ; } } } \ No newline at end of file