Skip to content

Commit

Permalink
RavenDB-19957: fix concurrently connecting subscription connections i…
Browse files Browse the repository at this point in the history
…n ConcurrentSubscription
  • Loading branch information
garayx committed Oct 23, 2023
1 parent 56ef38a commit 0ac77c2
Show file tree
Hide file tree
Showing 13 changed files with 466 additions and 61 deletions.
7 changes: 7 additions & 0 deletions src/Raven.Server/Documents/DocumentDatabase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1908,6 +1908,7 @@ internal IDisposable CallDuringDocumentDatabaseInternalDispose(Action action)

internal Action Subscription_ActionToCallDuringWaitForChangedDocuments;
internal Action<long> Subscription_ActionToCallAfterRegisterSubscriptionConnection;
internal Action<ConcurrentSet<SubscriptionConnection>> ConcurrentSubscription_ActionToCallDuringWaitForSubscribe;

internal IDisposable CallDuringWaitForChangedDocuments(Action action)
{
Expand All @@ -1921,6 +1922,12 @@ internal IDisposable CallAfterRegisterSubscriptionConnection(Action<long> action

return new DisposableAction(() => Subscription_ActionToCallAfterRegisterSubscriptionConnection = null);
}
/// <summary>
/// Stores <paramref name="action"/> in
/// <see cref="ConcurrentSubscription_ActionToCallDuringWaitForSubscribe"/> and hands back a
/// disposable that sets that field back to <c>null</c> when it is disposed.
/// </summary>
/// <param name="action">Callback that is given a set of subscription connections when the hook is invoked.</param>
/// <returns>A <see cref="DisposableAction"/> whose disposal clears the stored callback.</returns>
internal IDisposable CallDuringWaitForSubscribe(Action<ConcurrentSet<SubscriptionConnection>> action)
{
    // Build the clean-up handle up front; its lambda only touches the field once it is disposed.
    var unregister = new DisposableAction(() => ConcurrentSubscription_ActionToCallDuringWaitForSubscribe = null);

    ConcurrentSubscription_ActionToCallDuringWaitForSubscribe = action;
    return unregister;
}

internal ManualResetEvent DatabaseRecordLoadHold;
internal ManualResetEvent HealthCheckHold;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
Expand All @@ -23,6 +24,7 @@
using Sparrow.Server.Utils;
using Sparrow.Threading;
using Voron;
using static Raven.Server.Documents.Subscriptions.SubscriptionStorage;

namespace Raven.Server.Documents.Subscriptions
{
Expand All @@ -32,7 +34,7 @@ public class SubscriptionConnectionsState : IDisposable
private readonly SubscriptionStorage _subscriptionStorage;
private readonly AsyncManualResetEvent _waitForMoreDocuments;
private DocumentsStorage _documentsStorage;
private SubscriptionState _subscriptionState;
private SubscriptionStateWithRaftIndex _subscriptionState;
private ConcurrentSet<SubscriptionConnection> _connections;
private string _subscriptionName;
private int _maxConcurrentConnections;
Expand All @@ -41,7 +43,6 @@ public class SubscriptionConnectionsState : IDisposable
public Task GetSubscriptionInUseAwaiter => Task.WhenAll(_connections.Select(c => c.SubscriptionConnectionTask));
public string SubscriptionName => _subscriptionName;
public long SubscriptionId => _subscriptionId;
public SubscriptionState SubscriptionState => _subscriptionState;

public bool IsConcurrent => _connections.FirstOrDefault()?.Strategy == SubscriptionOpeningStrategy.Concurrent;

Expand All @@ -57,6 +58,7 @@ public class SubscriptionConnectionsState : IDisposable
public DocumentDatabase DocumentDatabase => _documentsStorage.DocumentDatabase;

private readonly SemaphoreSlim _subscriptionActivelyWorkingLock;
private readonly SemaphoreSlim _subscriptionConnectingLock = new SemaphoreSlim(1);

public string LastChangeVectorSent;

Expand All @@ -74,25 +76,87 @@ public SubscriptionConnectionsState(long subscriptionId, SubscriptionStorage sto
_waitForMoreDocuments = new AsyncManualResetEvent(CancellationTokenSource.Token);
}

// Prepares this state object for the given connection. The subscription name and query are
// always recorded; when afterSubscribe is true the shared data (notification registration,
// change vectors, subscription state) is additionally synchronized with the connection,
// one connection at a time for concurrent subscriptions.
// NOTE(review): this span comes from a commit diff rendering, so replaced and replacement
// lines appear back to back (e.g. the two signatures directly below, or the stacked
// `using (...)` / `if (...)` lines further down) - confirm against the real file which line
// of each pair is the current one.
public void Initialize(SubscriptionConnection connection, bool afterSubscribe = false)
public async Task InitializeAsync(SubscriptionConnection connection, bool afterSubscribe = false)
{
// Fall back to the numeric subscription id when the connection options carry no name.
_subscriptionName = connection.Options.SubscriptionName ?? _subscriptionId.ToString();
_subscriptionState = connection.SubscriptionState;
Query = connection.SubscriptionState.Query;

// afterSubscribe is false: only the assignments above are performed.
if (afterSubscribe == false)
return;

// update the subscription data only on new concurrent connection or regular connection
if (afterSubscribe && _connections.Count == 1)
// IsConcurrent is false: refresh the shared data from this connection immediately,
// without taking the connecting lock.
if (IsConcurrent == false)
{
RefreshFeatures(connection);
return;
}

// Optional hook reached through ForTestingPurposes; it receives the current connection set.
DocumentDatabase.ForTestingPurposes?.ConcurrentSubscription_ActionToCallDuringWaitForSubscribe?.Invoke(_connections);

connection.AddToStatusDescription("Starting to subscribe.");
var sp = Stopwatch.StartNew();
// Acquire the connecting lock with a timed wait. Every time the wait times out, stop if the
// connection was cancelled; otherwise send a heartbeat and restart the stopwatch.
while (await _subscriptionConnectingLock.WaitAsync(SubscriptionConnection.WaitForChangedDocumentsTimeoutInMs) == false)
{
connection.CancellationTokenSource.Token.ThrowIfCancellationRequested();
await connection.SendHeartBeatAsync($"A connection from IP '{connection.ClientUri}' is waiting for other concurrent connections to subscribe.");
sp.Restart();
}

// From here on the connecting lock is held; the finally block releases it on every exit path.
try
{
using (var old = _disposableNotificationsRegistration)
if (_subscriptionState == null)
{
_disposableNotificationsRegistration = RegisterForNotificationOnNewDocuments(connection);
// this connection is the first one, we initialize everything
RefreshFeatures(connection);
return;
}

InitializeLastChangeVectorSent(connection.SubscriptionState.ChangeVectorForNextBatchStartingPoint);
PreviouslyRecordedChangeVector = LastChangeVectorSent;
// The connection carries a smaller raft index than the state currently held here.
if (connection.SubscriptionState.LastModifiedIndex < _subscriptionState.LastModifiedIndex)
{
// the subscription was modified while this connection was waiting to subscribe, let's try to drop the connection
DropSingleConnection(connection, new SubscriptionClosedException($"The subscription '{_subscriptionName}' was modified, connection have to be restarted.", canReconnect: true));
return;
}

if (connection.SubscriptionState.LastModifiedIndex == _subscriptionState.LastModifiedIndex)
{
// no changes in the subscription
return;
}

if (connection.SubscriptionState.LastModifiedIndex > _subscriptionState.LastModifiedIndex)
{
// we have a new connection after the subscription has changed
// we have to wait until the old connections (with the smaller raft index) get disconnected
// then we continue and re-initialize
// The loop re-checks roughly every 300 ms, testing for cancellation before each delay and
// calling SendHeartBeatIfNeededAsync with the stopwatch after it.
// NOTE(review): Task.Delay(300) is not passed the cancellation token, so a cancellation
// is only observed at the start of the next iteration.
while (_connections.Any(c => c.SubscriptionState.LastModifiedIndex == _subscriptionState.LastModifiedIndex))
{
connection.CancellationTokenSource.Token.ThrowIfCancellationRequested();
await Task.Delay(300);
await connection.SendHeartBeatIfNeededAsync(sp, $"A connection from IP '{connection.ClientUri}' is waiting for old subscription connections to disconnect.");
}

RefreshFeatures(connection);
}
}
finally
{
_subscriptionConnectingLock.Release();
}
}

/// <summary>
/// Replaces the new-document notification registration with one created for
/// <paramref name="connection"/>, then resets the tracked change vectors and the stored
/// subscription state from that connection.
/// </summary>
/// <param name="connection">Connection whose subscription state becomes the current one.</param>
private void RefreshFeatures(SubscriptionConnection connection)
{
    // The previous registration is disposed only after its replacement has been stored.
    using (var previousRegistration = _disposableNotificationsRegistration)
    {
        _disposableNotificationsRegistration = RegisterForNotificationOnNewDocuments(connection);
    }

    var incomingState = connection.SubscriptionState;

    InitializeLastChangeVectorSent(incomingState.ChangeVectorForNextBatchStartingPoint);
    PreviouslyRecordedChangeVector = LastChangeVectorSent;
    _subscriptionState = incomingState;
}

internal void InitializeLastChangeVectorSent(string changeVectorForNextBatchStartingPoint)
{
LastChangeVectorSent = changeVectorForNextBatchStartingPoint;
Expand Down
92 changes: 66 additions & 26 deletions src/Raven.Server/Documents/Subscriptions/SubscriptionStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Raven.Client;
using Raven.Client.Documents.Subscriptions;
using Raven.Client.Exceptions.Database;
using Raven.Client.Exceptions.Documents.Subscriptions;
Expand All @@ -19,6 +18,7 @@
using Raven.Client.Util;
using Raven.Server.Documents.Subscriptions.Stats;
using Raven.Server.Documents.TcpHandlers;
using Raven.Server.Json;
using Raven.Server.Rachis;
using Raven.Server.ServerWide;
using Raven.Server.ServerWide.Commands.Subscriptions;
Expand Down Expand Up @@ -103,10 +103,10 @@ public async Task<long> PutSubscription(SubscriptionCreationOptions options, str
return etag;
}

// Returns the SubscriptionConnectionsState registered in _subscriptions for the connection's
// subscription id, creating and registering a new one when none exists yet, and initializes
// it with the connection (afterSubscribe is left at its default).
// NOTE(review): diff rendering - the two signatures and the two Initialize calls below are
// replaced/replacement pairs; confirm against the real file which of each pair is current.
public SubscriptionConnectionsState OpenSubscription(SubscriptionConnection connection)
public async Task<SubscriptionConnectionsState> OpenSubscriptionAsync(SubscriptionConnection connection)
{
var subscriptionState = _subscriptions.GetOrAdd(connection.SubscriptionId, subId => new SubscriptionConnectionsState(subId, this));
subscriptionState.Initialize(connection);
await subscriptionState.InitializeAsync(connection);
return subscriptionState;
}

Expand Down Expand Up @@ -208,15 +208,15 @@ public string GetResponsibleNode(TransactionOperationContext serverContext, stri
return BackupUtils.WhoseTaskIsIt(_serverStore, topology, subscription, subscription, _db.NotificationCenter);
}

public async Task<SubscriptionState> AssertSubscriptionConnectionDetails(long id, string name, long? registerConnectionDurationInTicks, CancellationToken token)
public async Task<SubscriptionStateWithRaftIndex> AssertSubscriptionConnectionDetails(long id, string name, long? registerConnectionDurationInTicks, CancellationToken token)
{
await _serverStore.WaitForCommitIndexChange(RachisConsensus.CommitIndexModification.GreaterOrEqual, id, token);

using (_serverStore.ContextPool.AllocateOperationContext(out TransactionOperationContext serverStoreContext))
using (serverStoreContext.OpenReadTransaction())
using (var record = _serverStore.Cluster.ReadRawDatabaseRecord(serverStoreContext, _db.Name))
{
var subscription = GetSubscriptionFromServerStore(serverStoreContext, name);
var subscription = GetSubscriptionWithRaftIndexFromServerStore(serverStoreContext, name);
var topology = record.Topology;

var whoseTaskIsIt = BackupUtils.WhoseTaskIsIt(_serverStore, topology, subscription, subscription, _db.NotificationCenter);
Expand Down Expand Up @@ -279,7 +279,7 @@ public async Task<SubscriptionState> AssertSubscriptionConnectionDetails(long id
return subscription;
}

static void FillNodesAvailabilityReportForState(SubscriptionGeneralDataAndStats subscription, DatabaseTopology topology, Dictionary<string, string> databaseTopologyAvailabilityExplenation, List<string> stateGroup, string stateName)
static void FillNodesAvailabilityReportForState(SubscriptionState subscription, DatabaseTopology topology, Dictionary<string, string> databaseTopologyAvailabilityExplenation, List<string> stateGroup, string stateName)
{
foreach (var nodeInGroup in stateGroup)
{
Expand Down Expand Up @@ -486,8 +486,8 @@ public IEnumerable<SubscriptionGeneralDataAndStats> GetAllRunningSubscriptions(T
if (take-- <= 0)
yield break;

var subscriptionData = GetSubscriptionFromServerStore(context, subscriptionConnectionsState.SubscriptionName);
GetRunningSubscriptionInternal(history, subscriptionData, subscriptionConnectionsState);
var state = GetSubscriptionFromServerStore(context, subscriptionConnectionsState.SubscriptionName);
var subscriptionData = GetRunningSubscriptionInternal(history, state, subscriptionConnectionsState);
yield return subscriptionData;
}
}
Expand All @@ -508,27 +508,27 @@ public int GetNumberOfRunningSubscriptions()

// Reads a subscription from the server store, looked up by name when one is given and
// otherwise by id, and wraps it via GetSubscriptionInternal.
// Throws ArgumentNullException when neither a name nor an id is supplied.
// NOTE(review): diff rendering - declarations/assignments of `subscription` and `state`
// appear as replaced/replacement pairs; confirm against the real file which is current.
public SubscriptionGeneralDataAndStats GetSubscription(TransactionOperationContext context, long? id, string name, bool history)
{
SubscriptionGeneralDataAndStats subscription;
SubscriptionState state;

if (string.IsNullOrEmpty(name) == false)
{
subscription = GetSubscriptionFromServerStore(context, name);
state = GetSubscriptionFromServerStore(context, name);
}
else if (id.HasValue)
{
// The id's string form is used directly as the lookup name here.
subscription = GetSubscriptionFromServerStore(context, id.ToString());
state = GetSubscriptionFromServerStore(context, id.ToString());
}
else
{
// NOTE(review): the single-string ArgumentNullException constructor takes the parameter
// name, so this text ends up as ParamName rather than as the message - consider the
// (paramName, message) overload.
throw new ArgumentNullException("Must receive either subscription id or subscription name in order to provide subscription data");
}

GetSubscriptionInternal(subscription, history);
var subscription = GetSubscriptionInternal(state, history);

return subscription;
}

public SubscriptionGeneralDataAndStats GetSubscriptionFromServerStore(TransactionOperationContext context, string name)
public SubscriptionState GetSubscriptionFromServerStore(TransactionOperationContext context, string name)
{
var subscriptionBlittable = _serverStore.Cluster.Read(context, SubscriptionState.GenerateSubscriptionItemKeyName(_db.Name, name));

Expand All @@ -537,35 +537,45 @@ public SubscriptionGeneralDataAndStats GetSubscriptionFromServerStore(Transactio

var subscriptionState = JsonDeserializationClient.SubscriptionState(subscriptionBlittable);

var subscriptionJsonValue = new SubscriptionGeneralDataAndStats(subscriptionState);
return subscriptionState;
}

// Reads the subscription item stored for this database under the given name from the cluster
// and deserializes it as a SubscriptionStateWithRaftIndex.
// Throws SubscriptionDoesNotExistException when the cluster read returns null.
public SubscriptionStateWithRaftIndex GetSubscriptionWithRaftIndexFromServerStore(TransactionOperationContext context, string name)
{
var subscriptionBlittable = _serverStore.Cluster.Read(context, SubscriptionState.GenerateSubscriptionItemKeyName(_db.Name, name));

if (subscriptionBlittable == null)
throw new SubscriptionDoesNotExistException($"Subscription with name '{name}' was not found in server store");

var subscriptionState = JsonDeserializationServer.SubscriptionStateWithRaftIndex(subscriptionBlittable);

// NOTE(review): diff rendering - `subscriptionJsonValue` is not declared in this method, so
// the first return below looks like a replaced line left over from the diff; confirm.
return subscriptionJsonValue;
return subscriptionState;
}

// Reads a subscription from the server store (by name, or by resolving the id to a name
// first) and returns its data together with the connections held on this node.
// Returns null when no connections state is registered in _subscriptions for the
// subscription id, or when IsSubscriptionActive() reports false.
// Throws ArgumentNullException when neither a name nor an id is supplied.
// NOTE(review): diff rendering - declarations/assignments of `subscription` and `state`
// appear as replaced/replacement pairs; confirm against the real file which is current.
public SubscriptionGeneralDataAndStats GetRunningSubscription(TransactionOperationContext context, long? id, string name, bool history)
{
SubscriptionGeneralDataAndStats subscription;
SubscriptionState state;
if (string.IsNullOrEmpty(name) == false)
{
subscription = GetSubscriptionFromServerStore(context, name);
state = GetSubscriptionFromServerStore(context, name);
}
else if (id.HasValue)
{
// Translate the id into the subscription name before reading from the server store.
name = GetSubscriptionNameById(context, id.Value);
subscription = GetSubscriptionFromServerStore(context, name);
state = GetSubscriptionFromServerStore(context, name);
}
else
{
// NOTE(review): the single-string ArgumentNullException constructor takes the parameter
// name, so this text ends up as ParamName rather than as the message - consider the
// (paramName, message) overload.
throw new ArgumentNullException("Must receive either subscription id or subscription name in order to provide subscription data");
}

if (_subscriptions.TryGetValue(subscription.SubscriptionId, out SubscriptionConnectionsState subscriptionConnectionsState) == false)
if (_subscriptions.TryGetValue(state.SubscriptionId, out SubscriptionConnectionsState subscriptionConnectionsState) == false)
return null;

if (subscriptionConnectionsState.IsSubscriptionActive() == false)
return null;

GetRunningSubscriptionInternal(history, subscription, subscriptionConnectionsState);
var subscription = GetRunningSubscriptionInternal(history, state, subscriptionConnectionsState);
return subscription;
}

Expand All @@ -590,14 +600,35 @@ public SubscriptionConnectionsState GetSubscriptionConnectionsState<T>(Transacti
if (subscriptionBlittable == null)
return null;

var subscriptionState = JsonDeserializationClient.SubscriptionState(subscriptionBlittable);
if (subscriptionBlittable.TryGet(nameof(SubscriptionState.SubscriptionId), out long id) == false)
{
if (_logger.IsOperationsEnabled)
_logger.Info($"Could not figure out the Subscription Task ID for subscription named: '{subscriptionName}'.");

if (_subscriptions.TryGetValue(subscriptionState.SubscriptionId, out SubscriptionConnectionsState concurrentSubscription) == false)
return null;
}

if (_subscriptions.TryGetValue(id, out SubscriptionConnectionsState concurrentSubscription) == false)
return null;

return concurrentSubscription;
}

/// <summary>
/// A <see cref="SubscriptionState"/> that additionally carries the raft index used to create
/// or update the subscription task.
/// </summary>
public class SubscriptionStateWithRaftIndex : SubscriptionState
{
    /// <summary>
    /// Parameterless constructor; performs no initialization of its own.
    /// </summary>
    public SubscriptionStateWithRaftIndex()
    {
    }

    /// <summary>
    /// Raft index used to create or update the subscription task.
    /// </summary>
    public long LastModifiedIndex { get; set; }

    /// <summary>
    /// Produces the base class JSON and adds <see cref="LastModifiedIndex"/> to it, keyed by
    /// the property's own name.
    /// </summary>
    /// <returns>The JSON value returned by the base implementation, with the extra entry set.</returns>
    public override DynamicJsonValue ToJson()
    {
        var json = base.ToJson();
        json[nameof(LastModifiedIndex)] = LastModifiedIndex;
        return json;
    }
}

public class SubscriptionGeneralDataAndStats : SubscriptionState
{
public List<SubscriptionConnection> Connections;
Expand Down Expand Up @@ -644,22 +675,31 @@ private static void SetSubscriptionHistory(SubscriptionConnectionsState subscrip
subscriptionData.CurrentPendingConnections = subscriptionConnectionsState.PendingConnections;
}

// Builds the SubscriptionGeneralDataAndStats for a running subscription: wraps the given
// state, attaches the connections reported by the connections state and, when history is
// requested, fills in the history via SetSubscriptionHistory.
// NOTE(review): diff rendering - the two signatures and the stand-alone Connections
// assignment below are replaced/replacement lines; confirm against the real file.
private static void GetRunningSubscriptionInternal(bool history, SubscriptionGeneralDataAndStats subscriptionData, SubscriptionConnectionsState subscriptionConnectionsState)
private static SubscriptionGeneralDataAndStats GetRunningSubscriptionInternal(bool history, SubscriptionState state, SubscriptionConnectionsState subscriptionConnectionsState)
{
subscriptionData.Connections = subscriptionConnectionsState.GetConnections();
var subscriptionData = new SubscriptionGeneralDataAndStats(state)
{
Connections = subscriptionConnectionsState.GetConnections()
};

if (history) // Only valid for this node
SetSubscriptionHistory(subscriptionConnectionsState, subscriptionData);

return subscriptionData;
}

// Builds the SubscriptionGeneralDataAndStats for a subscription state. Connections (and,
// when requested, history) are filled in only if a connections state for the subscription
// id is registered in _subscriptions; otherwise the wrapped state is returned without them.
// NOTE(review): diff rendering - the two signatures and the two TryGetValue conditions below
// are replaced/replacement pairs; confirm against the real file which is current.
private void GetSubscriptionInternal(SubscriptionGeneralDataAndStats subscriptionData, bool history)
private SubscriptionGeneralDataAndStats GetSubscriptionInternal(SubscriptionState state, bool history)
{
if (_subscriptions.TryGetValue(subscriptionData.SubscriptionId, out SubscriptionConnectionsState concurrentSubscription))
var subscriptionData = new SubscriptionGeneralDataAndStats(state);
if (_subscriptions.TryGetValue(state.SubscriptionId, out SubscriptionConnectionsState concurrentSubscription))
{
subscriptionData.Connections = concurrentSubscription.GetConnections();

if (history)//Only valid if this is my subscription
SetSubscriptionHistory(concurrentSubscription, subscriptionData);
}

return subscriptionData;
}

public void HandleDatabaseRecordChange(DatabaseRecord databaseRecord)
Expand Down
Loading

0 comments on commit 0ac77c2

Please sign in to comment.