diff --git a/src/Raven.Server/Documents/DocumentDatabase.cs b/src/Raven.Server/Documents/DocumentDatabase.cs index f28362114584..ffb6bd75a373 100644 --- a/src/Raven.Server/Documents/DocumentDatabase.cs +++ b/src/Raven.Server/Documents/DocumentDatabase.cs @@ -1908,6 +1908,7 @@ internal IDisposable CallDuringDocumentDatabaseInternalDispose(Action action) internal Action Subscription_ActionToCallDuringWaitForChangedDocuments; internal Action Subscription_ActionToCallAfterRegisterSubscriptionConnection; + internal Action> ConcurrentSubscription_ActionToCallDuringWaitForSubscribe; internal IDisposable CallDuringWaitForChangedDocuments(Action action) { @@ -1921,6 +1922,12 @@ internal IDisposable CallAfterRegisterSubscriptionConnection(Action action return new DisposableAction(() => Subscription_ActionToCallAfterRegisterSubscriptionConnection = null); } + internal IDisposable CallDuringWaitForSubscribe(Action> action) + { + ConcurrentSubscription_ActionToCallDuringWaitForSubscribe = action; + + return new DisposableAction(() => ConcurrentSubscription_ActionToCallDuringWaitForSubscribe = null); + } internal ManualResetEvent DatabaseRecordLoadHold; internal ManualResetEvent HealthCheckHold; diff --git a/src/Raven.Server/Documents/Subscriptions/SubscriptionConnectionsState.cs b/src/Raven.Server/Documents/Subscriptions/SubscriptionConnectionsState.cs index 74f64271ad96..e298bcaf20c2 100644 --- a/src/Raven.Server/Documents/Subscriptions/SubscriptionConnectionsState.cs +++ b/src/Raven.Server/Documents/Subscriptions/SubscriptionConnectionsState.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; +using System.Diagnostics; using System.Linq; using System.Text; using System.Threading; @@ -23,6 +24,7 @@ using Sparrow.Server.Utils; using Sparrow.Threading; using Voron; +using static Raven.Server.Documents.Subscriptions.SubscriptionStorage; namespace Raven.Server.Documents.Subscriptions { @@ -32,7 +34,7 @@ public class SubscriptionConnectionsState : IDisposable private readonly SubscriptionStorage _subscriptionStorage; private readonly AsyncManualResetEvent _waitForMoreDocuments; private DocumentsStorage _documentsStorage; - private SubscriptionState _subscriptionState; + private SubscriptionStateWithRaftIndex _subscriptionState; private ConcurrentSet _connections; private string _subscriptionName; private int _maxConcurrentConnections; @@ -41,7 +43,6 @@ public class SubscriptionConnectionsState : IDisposable public Task GetSubscriptionInUseAwaiter => Task.WhenAll(_connections.Select(c => c.SubscriptionConnectionTask)); public string SubscriptionName => _subscriptionName; public long SubscriptionId => _subscriptionId; - public SubscriptionState SubscriptionState => _subscriptionState; public bool IsConcurrent => _connections.FirstOrDefault()?.Strategy == SubscriptionOpeningStrategy.Concurrent; @@ -57,6 +58,7 @@ public class SubscriptionConnectionsState : IDisposable public DocumentDatabase DocumentDatabase => _documentsStorage.DocumentDatabase; private readonly SemaphoreSlim _subscriptionActivelyWorkingLock; + private readonly SemaphoreSlim _subscriptionConnectingLock = new SemaphoreSlim(1); public string LastChangeVectorSent; @@ -74,25 +76,87 @@ public SubscriptionConnectionsState(long subscriptionId, SubscriptionStorage sto _waitForMoreDocuments = new AsyncManualResetEvent(CancellationTokenSource.Token); } - public void Initialize(SubscriptionConnection connection, bool afterSubscribe = false) + public async Task InitializeAsync(SubscriptionConnection connection, bool afterSubscribe = false) { _subscriptionName = connection.Options.SubscriptionName ?? _subscriptionId.ToString(); - _subscriptionState = connection.SubscriptionState; Query = connection.SubscriptionState.Query; + if (afterSubscribe == false) + return; + // update the subscription data only on new concurrent connection or regular connection - if (afterSubscribe && _connections.Count == 1) + if (IsConcurrent == false) + { + RefreshFeatures(connection); + return; + } + + DocumentDatabase.ForTestingPurposes?.ConcurrentSubscription_ActionToCallDuringWaitForSubscribe?.Invoke(_connections); + + connection.AddToStatusDescription("Starting to subscribe."); + var sp = Stopwatch.StartNew(); + while (await _subscriptionConnectingLock.WaitAsync(SubscriptionConnection.WaitForChangedDocumentsTimeoutInMs) == false) + { + connection.CancellationTokenSource.Token.ThrowIfCancellationRequested(); + await connection.SendHeartBeatAsync($"A connection from IP '{connection.ClientUri}' is waiting for other concurrent connections to subscribe."); + sp.Restart(); + } + + try { - using (var old = _disposableNotificationsRegistration) + if (_subscriptionState == null) { - _disposableNotificationsRegistration = RegisterForNotificationOnNewDocuments(connection); + // this connection is the first one, we initialize everything + RefreshFeatures(connection); + return; } - InitializeLastChangeVectorSent(connection.SubscriptionState.ChangeVectorForNextBatchStartingPoint); - PreviouslyRecordedChangeVector = LastChangeVectorSent; + if (connection.SubscriptionState.LastModifiedIndex < _subscriptionState.LastModifiedIndex) + { + // this connection was modified while waiting to subscribe, lets try to drop it + DropSingleConnection(connection, new SubscriptionClosedException($"The subscription '{_subscriptionName}' was modified, connection have to be restarted.", canReconnect: true)); + return; + } + + if (connection.SubscriptionState.LastModifiedIndex == _subscriptionState.LastModifiedIndex) + { + // no changes in the subscription + return; + } + + if (connection.SubscriptionState.LastModifiedIndex > _subscriptionState.LastModifiedIndex) + { + // we have new connection after subscription have changed + // we have to wait until old connections (with smaller raft index) will get disconnected + // then we continue and will re-initialize + while (_connections.Any(c => c.SubscriptionState.LastModifiedIndex == _subscriptionState.LastModifiedIndex)) + { + connection.CancellationTokenSource.Token.ThrowIfCancellationRequested(); + await Task.Delay(300); + await connection.SendHeartBeatIfNeededAsync(sp, $"A connection from IP '{connection.ClientUri}' is waiting for old subscription connections to disconnect."); + } + + RefreshFeatures(connection); + } + } + finally + { + _subscriptionConnectingLock.Release(); } } + private void RefreshFeatures(SubscriptionConnection connection) + { + using (var old = _disposableNotificationsRegistration) + { + _disposableNotificationsRegistration = RegisterForNotificationOnNewDocuments(connection); + } + + InitializeLastChangeVectorSent(connection.SubscriptionState.ChangeVectorForNextBatchStartingPoint); + PreviouslyRecordedChangeVector = LastChangeVectorSent; + _subscriptionState = connection.SubscriptionState; + } + internal void InitializeLastChangeVectorSent(string changeVectorForNextBatchStartingPoint) { LastChangeVectorSent = changeVectorForNextBatchStartingPoint; diff --git a/src/Raven.Server/Documents/Subscriptions/SubscriptionStorage.cs b/src/Raven.Server/Documents/Subscriptions/SubscriptionStorage.cs index 49e1bb476dc9..2cdc1da40f13 100644 --- a/src/Raven.Server/Documents/Subscriptions/SubscriptionStorage.cs +++ b/src/Raven.Server/Documents/Subscriptions/SubscriptionStorage.cs @@ -10,7 +10,6 @@ using System.Linq; using System.Threading; using System.Threading.Tasks; -using Raven.Client; using Raven.Client.Documents.Subscriptions; using Raven.Client.Exceptions.Database; using Raven.Client.Exceptions.Documents.Subscriptions; @@ -19,6 +18,7 @@ using Raven.Client.Util; using Raven.Server.Documents.Subscriptions.Stats; using Raven.Server.Documents.TcpHandlers; +using Raven.Server.Json; using Raven.Server.Rachis; using Raven.Server.ServerWide; using Raven.Server.ServerWide.Commands.Subscriptions; @@ -103,10 +103,10 @@ public async Task PutSubscription(SubscriptionCreationOptions options, str return etag; } - public SubscriptionConnectionsState OpenSubscription(SubscriptionConnection connection) + public async Task OpenSubscriptionAsync(SubscriptionConnection connection) { var subscriptionState = _subscriptions.GetOrAdd(connection.SubscriptionId, subId => new SubscriptionConnectionsState(subId, this)); - subscriptionState.Initialize(connection); + await subscriptionState.InitializeAsync(connection); return subscriptionState; } @@ -208,7 +208,7 @@ public string GetResponsibleNode(TransactionOperationContext serverContext, stri return BackupUtils.WhoseTaskIsIt(_serverStore, topology, subscription, subscription, _db.NotificationCenter); } - public async Task AssertSubscriptionConnectionDetails(long id, string name, long? registerConnectionDurationInTicks, CancellationToken token) + public async Task AssertSubscriptionConnectionDetails(long id, string name, long? registerConnectionDurationInTicks, CancellationToken token) { await _serverStore.WaitForCommitIndexChange(RachisConsensus.CommitIndexModification.GreaterOrEqual, id, token); @@ -216,7 +216,7 @@ public async Task AssertSubscriptionConnectionDetails(long id using (serverStoreContext.OpenReadTransaction()) using (var record = _serverStore.Cluster.ReadRawDatabaseRecord(serverStoreContext, _db.Name)) { - var subscription = GetSubscriptionFromServerStore(serverStoreContext, name); + var subscription = GetSubscriptionWithRaftIndexFromServerStore(serverStoreContext, name); var topology = record.Topology; var whoseTaskIsIt = BackupUtils.WhoseTaskIsIt(_serverStore, topology, subscription, subscription, _db.NotificationCenter); @@ -279,7 +279,7 @@ public async Task AssertSubscriptionConnectionDetails(long id return subscription; } - static void FillNodesAvailabilityReportForState(SubscriptionGeneralDataAndStats subscription, DatabaseTopology topology, Dictionary databaseTopologyAvailabilityExplenation, List stateGroup, string stateName) + static void FillNodesAvailabilityReportForState(SubscriptionState subscription, DatabaseTopology topology, Dictionary databaseTopologyAvailabilityExplenation, List stateGroup, string stateName) { foreach (var nodeInGroup in stateGroup) { @@ -486,8 +486,8 @@ public IEnumerable GetAllRunningSubscriptions(T if (take-- <= 0) yield break; - var subscriptionData = GetSubscriptionFromServerStore(context, subscriptionConnectionsState.SubscriptionName); - GetRunningSubscriptionInternal(history, subscriptionData, subscriptionConnectionsState); + var state = GetSubscriptionFromServerStore(context, subscriptionConnectionsState.SubscriptionName); + var subscriptionData = GetRunningSubscriptionInternal(history, state, subscriptionConnectionsState); yield return subscriptionData; } } @@ -508,27 +508,27 @@ public int GetNumberOfRunningSubscriptions() public SubscriptionGeneralDataAndStats GetSubscription(TransactionOperationContext context, long? id, string name, bool history) { - SubscriptionGeneralDataAndStats subscription; + SubscriptionState state; if (string.IsNullOrEmpty(name) == false) { - subscription = GetSubscriptionFromServerStore(context, name); + state = GetSubscriptionFromServerStore(context, name); } else if (id.HasValue) { - subscription = GetSubscriptionFromServerStore(context, id.ToString()); + state = GetSubscriptionFromServerStore(context, id.ToString()); } else { throw new ArgumentNullException("Must receive either subscription id or subscription name in order to provide subscription data"); } - GetSubscriptionInternal(subscription, history); + var subscription = GetSubscriptionInternal(state, history); return subscription; } - public SubscriptionGeneralDataAndStats GetSubscriptionFromServerStore(TransactionOperationContext context, string name) + public SubscriptionState GetSubscriptionFromServerStore(TransactionOperationContext context, string name) { var subscriptionBlittable = _serverStore.Cluster.Read(context, SubscriptionState.GenerateSubscriptionItemKeyName(_db.Name, name)); @@ -537,35 +537,45 @@ public SubscriptionGeneralDataAndStats GetSubscriptionFromServerStore(Transactio var subscriptionState = JsonDeserializationClient.SubscriptionState(subscriptionBlittable); - var subscriptionJsonValue = new SubscriptionGeneralDataAndStats(subscriptionState); + return subscriptionState; + } + + public SubscriptionStateWithRaftIndex GetSubscriptionWithRaftIndexFromServerStore(TransactionOperationContext context, string name) + { + var subscriptionBlittable = _serverStore.Cluster.Read(context, SubscriptionState.GenerateSubscriptionItemKeyName(_db.Name, name)); + + if (subscriptionBlittable == null) + throw new SubscriptionDoesNotExistException($"Subscription with name '{name}' was not found in server store"); + + var subscriptionState = JsonDeserializationServer.SubscriptionStateWithRaftIndex(subscriptionBlittable); - return subscriptionJsonValue; + return subscriptionState; } public SubscriptionGeneralDataAndStats GetRunningSubscription(TransactionOperationContext context, long? id, string name, bool history) { - SubscriptionGeneralDataAndStats subscription; + SubscriptionState state; if (string.IsNullOrEmpty(name) == false) { - subscription = GetSubscriptionFromServerStore(context, name); + state = GetSubscriptionFromServerStore(context, name); } else if (id.HasValue) { name = GetSubscriptionNameById(context, id.Value); - subscription = GetSubscriptionFromServerStore(context, name); + state = GetSubscriptionFromServerStore(context, name); } else { throw new ArgumentNullException("Must receive either subscription id or subscription name in order to provide subscription data"); } - if (_subscriptions.TryGetValue(subscription.SubscriptionId, out SubscriptionConnectionsState subscriptionConnectionsState) == false) + if (_subscriptions.TryGetValue(state.SubscriptionId, out SubscriptionConnectionsState subscriptionConnectionsState) == false) return null; if (subscriptionConnectionsState.IsSubscriptionActive() == false) return null; - GetRunningSubscriptionInternal(history, subscription, subscriptionConnectionsState); + var subscription = GetRunningSubscriptionInternal(history, state, subscriptionConnectionsState); return subscription; } @@ -590,14 +600,35 @@ public SubscriptionConnectionsState GetSubscriptionConnectionsState(Transacti if (subscriptionBlittable == null) return null; - var subscriptionState = JsonDeserializationClient.SubscriptionState(subscriptionBlittable); + if (subscriptionBlittable.TryGet(nameof(SubscriptionState.SubscriptionId), out long id) == false) + { + if (_logger.IsOperationsEnabled) + _logger.Info($"Could not figure out the Subscription Task ID for subscription named: '{subscriptionName}'."); - if (_subscriptions.TryGetValue(subscriptionState.SubscriptionId, out SubscriptionConnectionsState concurrentSubscription) == false) + return null; + } + + if (_subscriptions.TryGetValue(id, out SubscriptionConnectionsState concurrentSubscription) == false) return null; return concurrentSubscription; } + public class SubscriptionStateWithRaftIndex : SubscriptionState + { + // raft index used to create or update subscription task + public long LastModifiedIndex { get; set; } + + public SubscriptionStateWithRaftIndex() { } + + public override DynamicJsonValue ToJson() + { + var djv = base.ToJson(); + djv[nameof(LastModifiedIndex)] = LastModifiedIndex; + return djv; + } + } + public class SubscriptionGeneralDataAndStats : SubscriptionState { public List Connections; @@ -644,22 +675,31 @@ private static void SetSubscriptionHistory(SubscriptionConnectionsState subscrip subscriptionData.CurrentPendingConnections = subscriptionConnectionsState.PendingConnections; } - private static void GetRunningSubscriptionInternal(bool history, SubscriptionGeneralDataAndStats subscriptionData, SubscriptionConnectionsState subscriptionConnectionsState) + private static SubscriptionGeneralDataAndStats GetRunningSubscriptionInternal(bool history, SubscriptionState state, SubscriptionConnectionsState subscriptionConnectionsState) { - subscriptionData.Connections = subscriptionConnectionsState.GetConnections(); + var subscriptionData = new SubscriptionGeneralDataAndStats(state) + { + Connections = subscriptionConnectionsState.GetConnections() + }; + if (history) // Only valid for this node SetSubscriptionHistory(subscriptionConnectionsState, subscriptionData); + + return subscriptionData; } - private void GetSubscriptionInternal(SubscriptionGeneralDataAndStats subscriptionData, bool history) + private SubscriptionGeneralDataAndStats GetSubscriptionInternal(SubscriptionState state, bool history) { - if (_subscriptions.TryGetValue(subscriptionData.SubscriptionId, out SubscriptionConnectionsState concurrentSubscription)) + var subscriptionData = new SubscriptionGeneralDataAndStats(state); + if (_subscriptions.TryGetValue(state.SubscriptionId, out SubscriptionConnectionsState concurrentSubscription)) { subscriptionData.Connections = concurrentSubscription.GetConnections(); if (history)//Only valid if this is my subscription SetSubscriptionHistory(concurrentSubscription, subscriptionData); } + + return subscriptionData; } public void HandleDatabaseRecordChange(DatabaseRecord databaseRecord) diff --git a/src/Raven.Server/Documents/TcpHandlers/SubscriptionConnection.cs b/src/Raven.Server/Documents/TcpHandlers/SubscriptionConnection.cs index 4b3875d9be4f..5a28b7c72b1b 100644 --- a/src/Raven.Server/Documents/TcpHandlers/SubscriptionConnection.cs +++ b/src/Raven.Server/Documents/TcpHandlers/SubscriptionConnection.cs @@ -41,6 +41,8 @@ using Sparrow.Json.Parsing; using Sparrow.Logging; using Sparrow.Utils; +using static Raven.Server.Documents.Subscriptions.SubscriptionStorage; +using static Raven.Server.Documents.TcpHandlers.SubscriptionConnection; using Constants = Voron.Global.Constants; using Exception = System.Exception; using QueryParser = Raven.Server.Documents.Queries.Parser.QueryParser; @@ -129,7 +131,7 @@ public List GetBatchesPerformanceStats() public string LastSentChangeVectorInThisConnection; private bool _isDisposed; - public SubscriptionState SubscriptionState; + public SubscriptionStateWithRaftIndex SubscriptionState; public ParsedSubscription Subscription; @@ -185,7 +187,7 @@ private async Task ParseSubscriptionOptionsAsync() _logger = LoggingSource.Instance.GetLogger(TcpConnection.DocumentDatabase.Name, $"{nameof(SubscriptionConnection)}<{_options.SubscriptionName}>"); context.OpenReadTransaction(); - var subscriptionItemKey = SubscriptionState.GenerateSubscriptionItemKeyName(TcpConnection.DocumentDatabase.Name, _options.SubscriptionName); + var subscriptionItemKey = Client.Documents.Subscriptions.SubscriptionState.GenerateSubscriptionItemKeyName(TcpConnection.DocumentDatabase.Name, _options.SubscriptionName); var translation = TcpConnection.DocumentDatabase.ServerStore.Cluster.Read(context, subscriptionItemKey); if (translation == null) throw new SubscriptionDoesNotExistException("Cannot find any Subscription Task with name: " + _options.SubscriptionName); @@ -230,7 +232,9 @@ await TcpConnection.DocumentDatabase.SubscriptionStorage.AssertSubscriptionConne Subscription = ParseSubscriptionQuery(SubscriptionState.Query); // update the state if above data changed - _subscriptionConnectionsState.Initialize(this, afterSubscribe: true); + await _subscriptionConnectionsState.InitializeAsync(this, afterSubscribe: true); + + CancellationTokenSource.Token.ThrowIfCancellationRequested(); await TcpConnection.DocumentDatabase.SubscriptionStorage.UpdateClientConnectionTime(SubscriptionState.SubscriptionId, SubscriptionState.SubscriptionName, SubscriptionState.MentorNode); @@ -256,6 +260,7 @@ await WriteJsonAsync(new DynamicJsonValue try { + var sp = Stopwatch.StartNew(); while (true) { CancellationTokenSource.Token.ThrowIfCancellationRequested(); @@ -277,7 +282,7 @@ await WriteJsonAsync(new DynamicJsonValue var timeout = TimeSpan.FromMilliseconds(Math.Max(250, (long)_options.TimeToWaitBeforeConnectionRetry.TotalMilliseconds / 2) + random.Next(15, 50)); await Task.Delay(timeout); - await SendHeartBeat( + await SendHeartBeatIfNeededAsync(sp, $"A connection from IP {TcpConnection.TcpClient.Client.RemoteEndPoint} is waiting for Subscription Task that is serving a connection from IP " + $"{_subscriptionConnectionsState.GetConnectionsAsString()} to be released"); } @@ -289,6 +294,15 @@ await SendHeartBeat( } } + internal async Task SendHeartBeatIfNeededAsync(Stopwatch sp, string reason) + { + if (sp.ElapsedMilliseconds >= WaitForChangedDocumentsTimeoutInMs) + { + await SendHeartBeatAsync(reason); + sp.Restart(); + } + } + private void AssertSupportedFeatures() { if (_supportedFeatures.Subscription.Includes == false) @@ -367,7 +381,7 @@ public async Task Run(TcpConnectionOptions tcpConnectionOptions, IDisposable sub using (_pendingConnectionScope) { await InitAsync(); - _subscriptionConnectionsState = TcpConnection.DocumentDatabase.SubscriptionStorage.OpenSubscription(this); + _subscriptionConnectionsState = await TcpConnection.DocumentDatabase.SubscriptionStorage.OpenSubscriptionAsync(this); (disposeOnDisconnect, registerConnectionDurationInTicks) = await SubscribeAsync(); } @@ -714,7 +728,7 @@ await TcpConnection.DocumentDatabase.SubscriptionStorage.LegacyAcknowledgeBatchP UpdateBatchPerformanceStats(0, false); if (sendingCurrentBatchStopwatch.ElapsedMilliseconds > 1000) - await SendHeartBeat("Didn't find any documents to send and more then 1000ms passed"); + await SendHeartBeatAsync("Didn't find any documents to send and more then 1000ms passed"); using (docsContext.OpenReadTransaction()) { @@ -808,7 +822,7 @@ private async Task> WaitForClientAck(T break; } - await SendHeartBeat("Waiting for client ACK"); + await SendHeartBeatAsync("Waiting for client ACK"); await SubscriptionConnectionsState.SendNoopAck(); } @@ -918,7 +932,7 @@ private async Task TrySendingBatchToClient(DocumentsOperationContext docsC { if (sendingCurrentBatchStopwatch.ElapsedMilliseconds > 1000) { - await SendHeartBeat("Skipping docs for more than 1000ms without sending any data"); + await SendHeartBeatAsync("Skipping docs for more than 1000ms without sending any data"); sendingCurrentBatchStopwatch.Restart(); } @@ -1078,7 +1092,7 @@ private async Task TrySendingBatchToClient(DocumentsOperationContext docsC } } - private async Task SendHeartBeat(string reason) + internal async Task SendHeartBeatAsync(string reason) { try { @@ -1141,7 +1155,7 @@ private async Task WaitForChangedDocs(Task pendingReply) return true; } - await SendHeartBeat("Waiting for changed documents"); + await SendHeartBeatAsync("Waiting for changed documents"); await SubscriptionConnectionsState.SendNoopAck(); } while (CancellationTokenSource.IsCancellationRequested == false); return false; diff --git a/src/Raven.Server/Json/JsonDeserializationServer.cs b/src/Raven.Server/Json/JsonDeserializationServer.cs index 6dfa78a87932..131a5df246c8 100644 --- a/src/Raven.Server/Json/JsonDeserializationServer.cs +++ b/src/Raven.Server/Json/JsonDeserializationServer.cs @@ -55,6 +55,7 @@ using BackupConfiguration = Raven.Client.Documents.Operations.Backups.BackupConfiguration; using MigrationConfiguration = Raven.Server.Smuggler.Migration.MigrationConfiguration; using StudioConfiguration = Raven.Client.Documents.Operations.Configuration.StudioConfiguration; +using static Raven.Server.Documents.Subscriptions.SubscriptionStorage; namespace Raven.Server.Json { @@ -228,6 +229,8 @@ internal sealed class JsonDeserializationServer : JsonDeserializationBase public static readonly Func BlockingTombstoneDetails = GenerateJsonDeserializationRoutine(); + public static readonly Func SubscriptionStateWithRaftIndex = GenerateJsonDeserializationRoutine(); + public class Parameters { private Parameters() diff --git a/src/Raven.Server/ServerWide/Commands/Subscriptions/AcknowledgeSubscriptionBatchCommand.cs b/src/Raven.Server/ServerWide/Commands/Subscriptions/AcknowledgeSubscriptionBatchCommand.cs index f1e318120bdf..dbe664db70db 100644 --- a/src/Raven.Server/ServerWide/Commands/Subscriptions/AcknowledgeSubscriptionBatchCommand.cs +++ b/src/Raven.Server/ServerWide/Commands/Subscriptions/AcknowledgeSubscriptionBatchCommand.cs @@ -7,6 +7,7 @@ using Raven.Client.ServerWide; using Raven.Server.Documents.Subscriptions; using Raven.Server.Documents.TcpHandlers; +using Raven.Server.Json; using Raven.Server.Rachis; using Raven.Server.ServerWide.Context; using Sparrow.Binary; @@ -58,7 +59,7 @@ protected override UpdatedValue GetUpdatedValue(long index, RawDatabaseRecord re return new UpdatedValue(UpdatedValueActionType.Noop, value: null); } - var subscription = JsonDeserializationCluster.SubscriptionState(existingValue); + var subscription = JsonDeserializationServer.SubscriptionStateWithRaftIndex(existingValue); AssertSubscriptionState(record, subscription, subscriptionName); if (IsLegacyCommand()) diff --git a/src/Raven.Server/ServerWide/Commands/Subscriptions/PutSubscriptionCommand.cs b/src/Raven.Server/ServerWide/Commands/Subscriptions/PutSubscriptionCommand.cs index e58ec5c769c7..0c13c1f28f67 100644 --- a/src/Raven.Server/ServerWide/Commands/Subscriptions/PutSubscriptionCommand.cs +++ b/src/Raven.Server/ServerWide/Commands/Subscriptions/PutSubscriptionCommand.cs @@ -4,17 +4,17 @@ using Raven.Client.Exceptions.Documents.Subscriptions; using Raven.Client.ServerWide; using Raven.Server.Documents; +using Raven.Server.Documents.Replication; +using Raven.Server.Documents.Subscriptions; using Raven.Server.Documents.TcpHandlers; +using Raven.Server.Json; +using Raven.Server.Rachis; using Raven.Server.ServerWide.Context; using Sparrow.Json; using Sparrow.Json.Parsing; using Voron; using Voron.Data.Tables; -using Raven.Server.Documents.Replication; -using Raven.Server.Rachis; -using Voron.Impl.Paging; -using Raven.Client.Json.Serialization; -using Raven.Server.Documents.Subscriptions; +using static Raven.Server.Documents.Subscriptions.SubscriptionStorage; namespace Raven.Server.ServerWide.Commands.Subscriptions { @@ -68,7 +68,7 @@ public override unsafe void Execute(ClusterOperationContext context, Table items var ptr = tvr.Read(2, out int size); var doc = new BlittableJsonReaderObject(ptr, size, context); - var existingSubscriptionState = JsonDeserializationClient.SubscriptionState(doc); + var existingSubscriptionState = JsonDeserializationServer.SubscriptionStateWithRaftIndex(doc); if (SubscriptionId != existingSubscriptionState.SubscriptionId) { if (string.IsNullOrEmpty(originalName)) @@ -105,7 +105,7 @@ public override unsafe void Execute(ClusterOperationContext context, Table items AssertValidChangeVector(); } - using (var receivedSubscriptionState = context.ReadObject(new SubscriptionState + using (var receivedSubscriptionState = context.ReadObject(new SubscriptionStateWithRaftIndex { Query = Query, ChangeVectorForNextBatchStartingPoint = InitialChangeVector, @@ -115,7 +115,8 @@ public override unsafe void Execute(ClusterOperationContext context, Table items Disabled = Disabled, MentorNode = MentorNode, PinToMentorNode = PinToMentorNode, - LastClientConnectionTime = null + LastClientConnectionTime = null, + LastModifiedIndex = index }.ToJson(), SubscriptionName)) { ClusterStateMachine.UpdateValue(index, items, valueNameLowered, valueName, receivedSubscriptionState); diff --git a/src/Raven.Server/ServerWide/Commands/Subscriptions/RecordBatchSubscriptionDocumentsCommand.cs b/src/Raven.Server/ServerWide/Commands/Subscriptions/RecordBatchSubscriptionDocumentsCommand.cs index 69a665b3f084..d88825e48702 100644 --- a/src/Raven.Server/ServerWide/Commands/Subscriptions/RecordBatchSubscriptionDocumentsCommand.cs +++ b/src/Raven.Server/ServerWide/Commands/Subscriptions/RecordBatchSubscriptionDocumentsCommand.cs @@ -3,9 +3,9 @@ using Raven.Client.Documents.Subscriptions; using Raven.Client.Exceptions.Database; using Raven.Client.Exceptions.Documents.Subscriptions; -using Raven.Client.Json.Serialization; using Raven.Client.ServerWide; using Raven.Server.Documents.Subscriptions; +using Raven.Server.Json; using Raven.Server.Rachis; using Raven.Server.ServerWide.Context; using Raven.Server.Utils; @@ -83,7 +83,7 @@ public override unsafe void Execute(ClusterOperationContext context, Table items if (existingValue == null) throw new SubscriptionDoesNotExistException($"Subscription with name '{subscriptionName}' does not exist in database '{DatabaseName}'"); - var subscriptionState = JsonDeserializationClient.SubscriptionState(existingValue); + var subscriptionState = JsonDeserializationServer.SubscriptionStateWithRaftIndex(existingValue); var topology = record.Topology; var lastResponsibleNode = AcknowledgeSubscriptionBatchCommand.GetLastResponsibleNode(HasHighlyAvailableTasks, topology, NodeTag); diff --git a/src/Raven.Server/ServerWide/Commands/Subscriptions/ToggleSubscriptionStateCommand.cs b/src/Raven.Server/ServerWide/Commands/Subscriptions/ToggleSubscriptionStateCommand.cs index 432728cb8da8..835d394bbf93 100644 --- a/src/Raven.Server/ServerWide/Commands/Subscriptions/ToggleSubscriptionStateCommand.cs +++ b/src/Raven.Server/ServerWide/Commands/Subscriptions/ToggleSubscriptionStateCommand.cs @@ -3,6 +3,7 @@ using Raven.Client.Exceptions.Documents.Subscriptions; using Raven.Client.Json.Serialization; using Raven.Client.ServerWide; +using Raven.Server.Json; using Raven.Server.Rachis; using Raven.Server.ServerWide.Context; using Sparrow.Json; @@ -57,7 +58,7 @@ public override unsafe void Execute(ClusterOperationContext context, Table items var ptr = tvr.Read(2, out int size); var doc = new BlittableJsonReaderObject(ptr, size, context); - var subscriptionState = JsonDeserializationClient.SubscriptionState(doc); + var subscriptionState = JsonDeserializationServer.SubscriptionStateWithRaftIndex(doc); subscriptionState.Disabled = Disable; using (var obj = context.ReadObject(subscriptionState.ToJson(), "subscription")) { diff --git a/src/Raven.Server/ServerWide/Commands/Subscriptions/UpdateSubscriptionClientConnectionTime.cs b/src/Raven.Server/ServerWide/Commands/Subscriptions/UpdateSubscriptionClientConnectionTime.cs index 146a1c45f410..04ede87692da 100644 --- a/src/Raven.Server/ServerWide/Commands/Subscriptions/UpdateSubscriptionClientConnectionTime.cs +++ b/src/Raven.Server/ServerWide/Commands/Subscriptions/UpdateSubscriptionClientConnectionTime.cs @@ -3,7 +3,7 @@ using Raven.Client.Exceptions.Database; using Raven.Client.Exceptions.Documents.Subscriptions; using Raven.Client.ServerWide; -using Raven.Server.Rachis; +using Raven.Server.Json; using Sparrow.Json; using Sparrow.Json.Parsing; @@ -32,7 +32,7 @@ protected override UpdatedValue GetUpdatedValue(long index, RawDatabaseRecord re if (existingValue == null) throw new SubscriptionDoesNotExistException($"Subscription with id '{itemId}' does not exist"); - var subscription = JsonDeserializationCluster.SubscriptionState(existingValue); + var subscription = JsonDeserializationServer.SubscriptionStateWithRaftIndex(existingValue); var topology = record.Topology; var lastResponsibleNode = AcknowledgeSubscriptionBatchCommand.GetLastResponsibleNode(HasHighlyAvailableTasks, topology, NodeTag); diff --git a/test/SlowTests/Client/Subscriptions/ConcurrentSubscriptionsTests.cs b/test/SlowTests/Client/Subscriptions/ConcurrentSubscriptionsTests.cs index a77792e58ea8..c6d3d2887dfe 100644 --- a/test/SlowTests/Client/Subscriptions/ConcurrentSubscriptionsTests.cs +++ b/test/SlowTests/Client/Subscriptions/ConcurrentSubscriptionsTests.cs @@ -1018,6 +1018,279 @@ await AssertWaitForValueAsync(() => } } + [RavenTheory(RavenTestCategory.Subscriptions)] + [InlineData(1)] + [InlineData(3)] + public async Task ConcurrentSubscriptionsShouldContinueProcessOnNewConnections(int count) + { + using (var store = GetDocumentStore()) + { + var id = await store.Subscriptions.CreateAsync(); + var docs = new HashSet(); + + for (int i = 0; i < 10; i++) + { + using (var session = store.OpenSession()) + { + session.Store(new User(), $"users/{i}"); + session.SaveChanges(); + } + + var workers = new List>(); + for (int j = 0; j < count; j++) + { + workers.Add(store.Subscriptions.GetSubscriptionWorker(new SubscriptionWorkerOptions(id) + { + TimeToWaitBeforeConnectionRetry = TimeSpan.FromSeconds(1), + Strategy = SubscriptionOpeningStrategy.Concurrent, + MaxDocsPerBatch = 1 + })); + } + + try + { + foreach (var worker in workers) + { + var t = worker.Run(x => + { + foreach (var item in x.Items) + { + docs.Add(item.Id); + } + }); + } + + await AssertWaitForTrueAsync(() => Task.FromResult(docs.Count == i + 1), Convert.ToInt32(_reasonableWaitTime.TotalMilliseconds)); + await AssertNoLeftovers(store, id); + } + finally + { + foreach (var w in workers) + { + await w.DisposeAsync(); + } + } + } + } + } + + [RavenTheory(RavenTestCategory.Subscriptions)] + [InlineData(1)] + [InlineData(3)] + public async Task ConcurrentSubscriptionsShouldContinueProcessOnNewConnectionsAfterUpdate(int count) + { + using (var store = GetDocumentStore()) + { + var id = await store.Subscriptions.CreateAsync(options: new SubscriptionCreationOptions() + { + Filter = user => user.Age == 0 + }); + + var docs = new HashSet(); + + for (int i = 0; i < 10; i++) + { + if (i > 0) + { + await store.Subscriptions.UpdateAsync(options: new SubscriptionUpdateOptions() + { + Name = id, + Query = @$"declare function predicate() {{ return this.Age==={i} }} +from 'Users' as doc +where predicate.call(doc)" + }); + } + + using (var session = store.OpenSession()) + { + session.Store(new User() + { + Age = i + }, $"users/{i}"); + session.SaveChanges(); + } + + var workers = new List>(); + for (int j = 0; j < count; j++) + { + workers.Add(store.Subscriptions.GetSubscriptionWorker(new SubscriptionWorkerOptions(id) + { + TimeToWaitBeforeConnectionRetry = TimeSpan.FromSeconds(1), + Strategy = SubscriptionOpeningStrategy.Concurrent, + MaxDocsPerBatch = 1 + })); + } + + try + { + foreach (var worker in workers) + { + var t = worker.Run(x => + { + foreach (var item in x.Items) + { + docs.Add(item.Id); + } + }); + } + + await AssertWaitForTrueAsync(() => Task.FromResult(docs.Count == i + 1), Convert.ToInt32(_reasonableWaitTime.TotalMilliseconds)); + await AssertNoLeftovers(store, id); + } + finally + { + foreach (var w in workers) + { + await w.DisposeAsync(); + } + } + } + + var subs = await store.Subscriptions.GetSubscriptionsAsync(0, 1024); + Assert.Equal(1, subs.Count); + } + } + + [RavenFact(RavenTestCategory.Subscriptions)] + public async Task ConcurrentSubscriptionsShouldContinueProcessOnNewConnectionsAfterUpdate_AndDisposeWhileConnecting() + { + using (var store = GetDocumentStore()) + { + var id = await store.Subscriptions.CreateAsync(options: new SubscriptionCreationOptions() + { + Filter = user => user.Age == 0 + }); + + var docs = new HashSet(); + var workers = new List>(); + try + { + for (int i = 0; i < 10; i++) + { + if (i > 0) + { + // we update the subscription on each iteration + // so it will process only the new created document + await store.Subscriptions.UpdateAsync(options: new SubscriptionUpdateOptions() + { + Name = id, + Query = @$"declare function predicate() {{ return this.Age==={i} }} +from 'Users' as doc +where predicate.call(doc)" + }); + } + + using (var session = store.OpenSession()) + { + session.Store(new User() + { + Age = i + }, $"users/{i}"); + session.SaveChanges(); + } + + var w = store.Subscriptions.GetSubscriptionWorker(new SubscriptionWorkerOptions(id) + { + TimeToWaitBeforeConnectionRetry = TimeSpan.FromSeconds(1), + Strategy = SubscriptionOpeningStrategy.Concurrent, + MaxDocsPerBatch = 1 + }); + workers.Add(w); + + var t2 = w.Run(x => + { + foreach (var item in x.Items) + { + docs.Add(item.Id); + Thread.Sleep(1000); + } + }); + + await AssertWaitForTrueAsync(() => Task.FromResult(docs.Count == i + 1), Convert.ToInt32(_reasonableWaitTime.TotalMilliseconds)); + await AssertNoLeftovers(store, id); + } + } + finally + { + foreach (var w in workers) + { + await w.DisposeAsync(); + } + } + + var subs = await store.Subscriptions.GetSubscriptionsAsync(0, 1024); + Assert.Equal(1, subs.Count); + } + } + + [RavenFact(RavenTestCategory.Subscriptions)] + public async Task ConcurrentSubscriptionsShouldProcessWhen_2_ConnectionsAreSubscribingConcurrently() + { + using (var store = GetDocumentStore()) + { + var id = await store.Subscriptions.CreateAsync(); + + var db = await Databases.GetDocumentDatabaseInstanceFor(store); + var testingStuff = db.ForTestingPurposesOnly(); + + using (testingStuff.CallDuringWaitForSubscribe(connections => + { + while (connections.Count < 2) + { + Thread.Sleep(111); + } + })) + { + await using (var subscription = store.Subscriptions.GetSubscriptionWorker(new SubscriptionWorkerOptions(id) + { + TimeToWaitBeforeConnectionRetry = TimeSpan.FromSeconds(5), + Strategy = SubscriptionOpeningStrategy.Concurrent, + MaxDocsPerBatch = 2 + })) + await using (var secondSubscription = store.Subscriptions.GetSubscriptionWorker(new SubscriptionWorkerOptions(id) + { + Strategy = SubscriptionOpeningStrategy.Concurrent, + TimeToWaitBeforeConnectionRetry = TimeSpan.FromSeconds(5), + MaxDocsPerBatch = 2 + })) + { + using (var session = store.OpenSession()) + { + session.Store(new User(), "user/1"); + session.Store(new User(), "user/2"); + session.Store(new User(), "user/3"); + session.Store(new User(), "user/4"); + session.Store(new User(), "user/5"); + session.Store(new User(), "user/6"); + session.SaveChanges(); + } + + var con1Docs = new List(); + var con2Docs = new List(); + + var t = subscription.Run(x => + { + foreach (var item in x.Items) + { + con1Docs.Add(item.Id); + } + }); + + var _ = secondSubscription.Run(x => + { + foreach (var item in x.Items) + { + con2Docs.Add(item.Id); + } + }); + + await AssertWaitForTrueAsync(() => Task.FromResult(con1Docs.Count + con2Docs.Count == 6), Convert.ToInt32(_reasonableWaitTime.TotalMilliseconds)); + await AssertNoLeftovers(store, id); + } + } + } + } + private class GetSubscriptionResendListCommand : RavenCommand { private readonly string _database; @@ -1072,6 +1345,7 @@ private class ResendListResult private class User { public string Name; + public int Age; } } } diff --git a/test/SlowTests/Client/Subscriptions/RavenDB_9117.cs b/test/SlowTests/Client/Subscriptions/RavenDB_9117.cs index a54026d6313a..7e7e95f07c4d 100644 --- a/test/SlowTests/Client/Subscriptions/RavenDB_9117.cs +++ b/test/SlowTests/Client/Subscriptions/RavenDB_9117.cs @@ -65,7 +65,7 @@ public async Task SubscriptionWorkerRetryEventGivesCorrectError() Assert.True(await signalWhenStartedProcessingDoc.WaitAsync(_reasonableWaitTime)); var database = await GetDatabase(store.Database); - SubscriptionStorage.SubscriptionGeneralDataAndStats subscriptionState; + SubscriptionState subscriptionState; using (database.ServerStore.ContextPool.AllocateOperationContext(out TransactionOperationContext context)) using (context.OpenReadTransaction()) { diff --git a/test/SlowTests/Client/Subscriptions/SubscriptionsBasic.cs b/test/SlowTests/Client/Subscriptions/SubscriptionsBasic.cs index 8cd126d9e6f8..29675d915e36 100644 --- a/test/SlowTests/Client/Subscriptions/SubscriptionsBasic.cs +++ b/test/SlowTests/Client/Subscriptions/SubscriptionsBasic.cs @@ -341,7 +341,7 @@ public async Task RunningSubscriptionShouldJumpToNextChangeVectorIfItWasChangedB Assert.True(await ackFirstCV.WaitAsync(_reasonableWaitTime)); - SubscriptionStorage.SubscriptionGeneralDataAndStats subscriptionState; + SubscriptionState subscriptionState; using (database.ServerStore.ContextPool.AllocateOperationContext(out TransactionOperationContext context)) using (context.OpenReadTransaction()) {