diff --git a/src/Raven.Server/Documents/DocumentDatabase.cs b/src/Raven.Server/Documents/DocumentDatabase.cs
index f28362114584..ad980cd86324 100644
--- a/src/Raven.Server/Documents/DocumentDatabase.cs
+++ b/src/Raven.Server/Documents/DocumentDatabase.cs
@@ -1908,6 +1908,7 @@ internal IDisposable CallDuringDocumentDatabaseInternalDispose(Action action)
 
             internal Action Subscription_ActionToCallDuringWaitForChangedDocuments;
             internal Action<long> Subscription_ActionToCallAfterRegisterSubscriptionConnection;
+            internal Action<ConcurrentSet<SubscriptionConnection>> Subscription_ActionToCallDuringWaitForSubscribe;
 
             internal IDisposable CallDuringWaitForChangedDocuments(Action action)
             {
@@ -1921,6 +1922,12 @@ internal IDisposable CallAfterRegisterSubscriptionConnection(Action<long> action
 
                 return new DisposableAction(() => Subscription_ActionToCallAfterRegisterSubscriptionConnection = null);
             }
+            internal IDisposable CallDuringWaitForSubscribe(Action<ConcurrentSet<SubscriptionConnection>> action)
+            {
+                Subscription_ActionToCallDuringWaitForSubscribe = action;
+
+                return new DisposableAction(() => Subscription_ActionToCallDuringWaitForSubscribe = null);
+            }
 
             internal ManualResetEvent DatabaseRecordLoadHold;
             internal ManualResetEvent HealthCheckHold;
diff --git a/src/Raven.Server/Documents/Subscriptions/SubscriptionConnectionsState.cs b/src/Raven.Server/Documents/Subscriptions/SubscriptionConnectionsState.cs
index 74f64271ad96..5ddefadd544a 100644
--- a/src/Raven.Server/Documents/Subscriptions/SubscriptionConnectionsState.cs
+++ b/src/Raven.Server/Documents/Subscriptions/SubscriptionConnectionsState.cs
@@ -1,6 +1,7 @@
 using System;
 using System.Collections.Concurrent;
 using System.Collections.Generic;
+using System.Diagnostics;
 using System.Linq;
 using System.Text;
 using System.Threading;
@@ -23,6 +24,7 @@
 using Sparrow.Server.Utils;
 using Sparrow.Threading;
 using Voron;
+using static Raven.Server.Documents.Subscriptions.SubscriptionStorage;
 
 namespace Raven.Server.Documents.Subscriptions
 {
@@ -32,7 +34,7 @@ public class SubscriptionConnectionsState : IDisposable
         private readonly SubscriptionStorage _subscriptionStorage;
         private readonly AsyncManualResetEvent _waitForMoreDocuments;
         private DocumentsStorage _documentsStorage;
-        private SubscriptionState _subscriptionState;
+        private SubscriptionStateWithRaftIndex _subscriptionState;
         private ConcurrentSet<SubscriptionConnection> _connections;
         private string _subscriptionName;
         private int _maxConcurrentConnections;
@@ -41,7 +43,6 @@ public class SubscriptionConnectionsState : IDisposable
         public Task GetSubscriptionInUseAwaiter => Task.WhenAll(_connections.Select(c => c.SubscriptionConnectionTask));
         public string SubscriptionName => _subscriptionName;
         public long SubscriptionId => _subscriptionId;
-        public SubscriptionState SubscriptionState => _subscriptionState;
 
         public bool IsConcurrent => _connections.FirstOrDefault()?.Strategy == SubscriptionOpeningStrategy.Concurrent;
 
@@ -57,6 +58,7 @@ public class SubscriptionConnectionsState : IDisposable
         public DocumentDatabase DocumentDatabase => _documentsStorage.DocumentDatabase;
 
         private readonly SemaphoreSlim _subscriptionActivelyWorkingLock;
+        private readonly SemaphoreSlim _subscriptionConnectingLock = new SemaphoreSlim(1);
 
         public string LastChangeVectorSent;
 
@@ -74,25 +76,87 @@ public SubscriptionConnectionsState(long subscriptionId, SubscriptionStorage sto
             _waitForMoreDocuments = new AsyncManualResetEvent(CancellationTokenSource.Token);
         }
 
-        public void Initialize(SubscriptionConnection connection, bool afterSubscribe = false)
+        public async Task InitializeAsync(SubscriptionConnection connection, bool afterSubscribe = false)
         {
             _subscriptionName = connection.Options.SubscriptionName ?? _subscriptionId.ToString();
-            _subscriptionState = connection.SubscriptionState;
             Query = connection.SubscriptionState.Query;
 
+            if (afterSubscribe == false)
+                return;
+
             // update the subscription data only on new concurrent connection or regular connection
-            if (afterSubscribe && _connections.Count == 1)
+            if (IsConcurrent == false)
+            {
+                RefreshFeatures(connection);
+                return;
+            }
+
+            DocumentDatabase.ForTestingPurposes?.Subscription_ActionToCallDuringWaitForSubscribe?.Invoke(_connections);
+
+            connection.AddToStatusDescription("Starting to subscribe.");
+            var sp = Stopwatch.StartNew();
+            while (await _subscriptionConnectingLock.WaitAsync(SubscriptionConnection.WaitForChangedDocumentsTimeoutInMs) == false)
+            {
+                connection.CancellationTokenSource.Token.ThrowIfCancellationRequested();
+                await connection.SendHeartBeatAsync($"A connection from IP '{connection.ClientUri}' is waiting for other concurrent connections to subscribe.");
+                sp.Restart();
+            }
+
+            try
             {
-                using (var old = _disposableNotificationsRegistration)
+                if (_subscriptionState == null)
                 {
-                    _disposableNotificationsRegistration = RegisterForNotificationOnNewDocuments(connection);
+                    // this connection is the first one, we initialize everything
+                    RefreshFeatures(connection);
+                    return;
                 }
 
-                InitializeLastChangeVectorSent(connection.SubscriptionState.ChangeVectorForNextBatchStartingPoint);
-                PreviouslyRecordedChangeVector = LastChangeVectorSent;
+                if (connection.SubscriptionState.RaftIndex < _subscriptionState.RaftIndex)
+                {
+                    // this connection was modified while waiting to subscribe, lets try to drop it
+                    DropSingleConnection(connection, new SubscriptionClosedException($"The subscription '{_subscriptionName}' was modified, connection have to be restarted.", canReconnect: true));
+                    return;
+                }
+
+                if (connection.SubscriptionState.RaftIndex == _subscriptionState.RaftIndex)
+                {
+                    // no changes in the subscription
+                    return;
+                }
+
+                if (connection.SubscriptionState.RaftIndex > _subscriptionState.RaftIndex)
+                {
+                    // we have new connection after subscription have changed
+                    // we have to wait until old connections (with smaller raft index) will get disconnected
+                    // then we continue and will re-initialize 
+                    while (_connections.Any(c => c.SubscriptionState.RaftIndex == _subscriptionState.RaftIndex))
+                    {
+                        connection.CancellationTokenSource.Token.ThrowIfCancellationRequested();
+                        await Task.Delay(300);
+                        await connection.SendHeartBeatIfNeededAsync(sp, $"A connection from IP '{connection.ClientUri}' is waiting for old subscription connections to disconnect.");
+                    }
+
+                    RefreshFeatures(connection);
+                }
+            }
+            finally
+            {
+                _subscriptionConnectingLock.Release();
             }
         }
 
+        private void RefreshFeatures(SubscriptionConnection connection)
+        {
+            using (var old = _disposableNotificationsRegistration)
+            {
+                _disposableNotificationsRegistration = RegisterForNotificationOnNewDocuments(connection);
+            }
+
+            InitializeLastChangeVectorSent(connection.SubscriptionState.ChangeVectorForNextBatchStartingPoint);
+            PreviouslyRecordedChangeVector = LastChangeVectorSent;
+            _subscriptionState = connection.SubscriptionState;
+        }
+
         internal void InitializeLastChangeVectorSent(string changeVectorForNextBatchStartingPoint)
         {
             LastChangeVectorSent = changeVectorForNextBatchStartingPoint;
diff --git a/src/Raven.Server/Documents/Subscriptions/SubscriptionStorage.cs b/src/Raven.Server/Documents/Subscriptions/SubscriptionStorage.cs
index 49e1bb476dc9..5c9a2c58ab16 100644
--- a/src/Raven.Server/Documents/Subscriptions/SubscriptionStorage.cs
+++ b/src/Raven.Server/Documents/Subscriptions/SubscriptionStorage.cs
@@ -10,7 +10,6 @@
 using System.Linq;
 using System.Threading;
 using System.Threading.Tasks;
-using Raven.Client;
 using Raven.Client.Documents.Subscriptions;
 using Raven.Client.Exceptions.Database;
 using Raven.Client.Exceptions.Documents.Subscriptions;
@@ -19,6 +18,7 @@
 using Raven.Client.Util;
 using Raven.Server.Documents.Subscriptions.Stats;
 using Raven.Server.Documents.TcpHandlers;
+using Raven.Server.Json;
 using Raven.Server.Rachis;
 using Raven.Server.ServerWide;
 using Raven.Server.ServerWide.Commands.Subscriptions;
@@ -103,10 +103,10 @@ public async Task<long> PutSubscription(SubscriptionCreationOptions options, str
             return etag;
         }
 
-        public SubscriptionConnectionsState OpenSubscription(SubscriptionConnection connection)
+        public async Task<SubscriptionConnectionsState> OpenSubscriptionAsync(SubscriptionConnection connection)
         {
             var subscriptionState = _subscriptions.GetOrAdd(connection.SubscriptionId, subId => new SubscriptionConnectionsState(subId, this));
-            subscriptionState.Initialize(connection);
+            await subscriptionState.InitializeAsync(connection);
             return subscriptionState;
         }
 
@@ -208,7 +208,7 @@ public string GetResponsibleNode(TransactionOperationContext serverContext, stri
             return BackupUtils.WhoseTaskIsIt(_serverStore, topology, subscription, subscription, _db.NotificationCenter);
         }
 
-        public async Task<SubscriptionState> AssertSubscriptionConnectionDetails(long id, string name, long? registerConnectionDurationInTicks, CancellationToken token)
+        public async Task<SubscriptionStateWithRaftIndex> AssertSubscriptionConnectionDetails(long id, string name, long? registerConnectionDurationInTicks, CancellationToken token)
         {
             await _serverStore.WaitForCommitIndexChange(RachisConsensus.CommitIndexModification.GreaterOrEqual, id, token);
 
@@ -216,7 +216,7 @@ public async Task<SubscriptionState> AssertSubscriptionConnectionDetails(long id
             using (serverStoreContext.OpenReadTransaction())
             using (var record = _serverStore.Cluster.ReadRawDatabaseRecord(serverStoreContext, _db.Name))
             {
-                var subscription = GetSubscriptionFromServerStore(serverStoreContext, name);
+                var subscription = GetSubscriptionWithRaftIndexFromServerStore(serverStoreContext, name);
                 var topology = record.Topology;
 
                 var whoseTaskIsIt = BackupUtils.WhoseTaskIsIt(_serverStore, topology, subscription, subscription, _db.NotificationCenter);
@@ -279,7 +279,7 @@ public async Task<SubscriptionState> AssertSubscriptionConnectionDetails(long id
                 return subscription;
             }
 
-            static void FillNodesAvailabilityReportForState(SubscriptionGeneralDataAndStats subscription, DatabaseTopology topology, Dictionary<string, string> databaseTopologyAvailabilityExplenation, List<string> stateGroup, string stateName)
+            static void FillNodesAvailabilityReportForState(SubscriptionState subscription, DatabaseTopology topology, Dictionary<string, string> databaseTopologyAvailabilityExplenation, List<string> stateGroup, string stateName)
             {
                 foreach (var nodeInGroup in stateGroup)
                 {
@@ -486,8 +486,8 @@ public IEnumerable<SubscriptionGeneralDataAndStats> GetAllRunningSubscriptions(T
                 if (take-- <= 0)
                     yield break;
 
-                var subscriptionData = GetSubscriptionFromServerStore(context, subscriptionConnectionsState.SubscriptionName);
-                GetRunningSubscriptionInternal(history, subscriptionData, subscriptionConnectionsState);
+                var state = GetSubscriptionFromServerStore(context, subscriptionConnectionsState.SubscriptionName);
+                var subscriptionData = GetRunningSubscriptionInternal(history, state, subscriptionConnectionsState);
                 yield return subscriptionData;
             }
         }
@@ -508,27 +508,27 @@ public int GetNumberOfRunningSubscriptions()
 
         public SubscriptionGeneralDataAndStats GetSubscription(TransactionOperationContext context, long? id, string name, bool history)
         {
-            SubscriptionGeneralDataAndStats subscription;
+            SubscriptionState state;
 
             if (string.IsNullOrEmpty(name) == false)
             {
-                subscription = GetSubscriptionFromServerStore(context, name);
+                state = GetSubscriptionFromServerStore(context, name);
             }
             else if (id.HasValue)
             {
-                subscription = GetSubscriptionFromServerStore(context, id.ToString());
+                state = GetSubscriptionFromServerStore(context, id.ToString());
             }
             else
             {
                 throw new ArgumentNullException("Must receive either subscription id or subscription name in order to provide subscription data");
             }
 
-            GetSubscriptionInternal(subscription, history);
+            var subscription = GetSubscriptionInternal(state, history);
 
             return subscription;
         }
 
-        public SubscriptionGeneralDataAndStats GetSubscriptionFromServerStore(TransactionOperationContext context, string name)
+        public SubscriptionState GetSubscriptionFromServerStore(TransactionOperationContext context, string name)
         {
             var subscriptionBlittable = _serverStore.Cluster.Read(context, SubscriptionState.GenerateSubscriptionItemKeyName(_db.Name, name));
 
@@ -537,35 +537,45 @@ public SubscriptionGeneralDataAndStats GetSubscriptionFromServerStore(Transactio
 
             var subscriptionState = JsonDeserializationClient.SubscriptionState(subscriptionBlittable);
 
-            var subscriptionJsonValue = new SubscriptionGeneralDataAndStats(subscriptionState);
+            return subscriptionState;
+        }
+
+        public SubscriptionStateWithRaftIndex GetSubscriptionWithRaftIndexFromServerStore(TransactionOperationContext context, string name)
+        {
+            var subscriptionBlittable = _serverStore.Cluster.Read(context, SubscriptionState.GenerateSubscriptionItemKeyName(_db.Name, name));
+
+            if (subscriptionBlittable == null)
+                throw new SubscriptionDoesNotExistException($"Subscription with name '{name}' was not found in server store");
+
+            var subscriptionState = JsonDeserializationServer.SubscriptionStateWithRaftIndex(subscriptionBlittable);
 
-            return subscriptionJsonValue;
+            return subscriptionState;
         }
 
         public SubscriptionGeneralDataAndStats GetRunningSubscription(TransactionOperationContext context, long? id, string name, bool history)
         {
-            SubscriptionGeneralDataAndStats subscription;
+            SubscriptionState state;
             if (string.IsNullOrEmpty(name) == false)
             {
-                subscription = GetSubscriptionFromServerStore(context, name);
+                state = GetSubscriptionFromServerStore(context, name);
             }
             else if (id.HasValue)
             {
                 name = GetSubscriptionNameById(context, id.Value);
-                subscription = GetSubscriptionFromServerStore(context, name);
+                state = GetSubscriptionFromServerStore(context, name);
             }
             else
             {
                 throw new ArgumentNullException("Must receive either subscription id or subscription name in order to provide subscription data");
             }
 
-            if (_subscriptions.TryGetValue(subscription.SubscriptionId, out SubscriptionConnectionsState subscriptionConnectionsState) == false)
+            if (_subscriptions.TryGetValue(state.SubscriptionId, out SubscriptionConnectionsState subscriptionConnectionsState) == false)
                 return null;
 
             if (subscriptionConnectionsState.IsSubscriptionActive() == false)
                 return null;
 
-            GetRunningSubscriptionInternal(history, subscription, subscriptionConnectionsState);
+            var subscription = GetRunningSubscriptionInternal(history, state, subscriptionConnectionsState);
             return subscription;
         }
 
@@ -590,14 +600,35 @@ public SubscriptionConnectionsState GetSubscriptionConnectionsState<T>(Transacti
             if (subscriptionBlittable == null)
                 return null;
 
-            var subscriptionState = JsonDeserializationClient.SubscriptionState(subscriptionBlittable);
+            if (subscriptionBlittable.TryGet(nameof(SubscriptionState.SubscriptionId), out long id) == false)
+            {
+                if (_logger.IsOperationsEnabled)
+                    _logger.Info($"Could not figure out the Subscription Task ID for subscription named: '{subscriptionName}'.");
 
-            if (_subscriptions.TryGetValue(subscriptionState.SubscriptionId, out SubscriptionConnectionsState concurrentSubscription) == false)
+                return null;
+            }
+
+            if (_subscriptions.TryGetValue(id, out SubscriptionConnectionsState concurrentSubscription) == false)
                 return null;
 
             return concurrentSubscription;
         }
 
+        public class SubscriptionStateWithRaftIndex : SubscriptionState
+        {
+            // raft index used to create or update subscription task
+            public long RaftIndex { get; set; }
+
+            public SubscriptionStateWithRaftIndex() { }
+
+            public override DynamicJsonValue ToJson()
+            {
+                var djv = base.ToJson();
+                djv[nameof(RaftIndex)] = RaftIndex;
+                return djv;
+            }
+        }
+
         public class SubscriptionGeneralDataAndStats : SubscriptionState
         {
             public List<SubscriptionConnection> Connections;
@@ -644,22 +675,31 @@ private static void SetSubscriptionHistory(SubscriptionConnectionsState subscrip
             subscriptionData.CurrentPendingConnections = subscriptionConnectionsState.PendingConnections;
         }
 
-        private static void GetRunningSubscriptionInternal(bool history, SubscriptionGeneralDataAndStats subscriptionData, SubscriptionConnectionsState subscriptionConnectionsState)
+        private static SubscriptionGeneralDataAndStats GetRunningSubscriptionInternal(bool history, SubscriptionState state, SubscriptionConnectionsState subscriptionConnectionsState)
         {
-            subscriptionData.Connections = subscriptionConnectionsState.GetConnections();
+            var subscriptionData = new SubscriptionGeneralDataAndStats(state)
+            {
+                Connections = subscriptionConnectionsState.GetConnections()
+            };
+
             if (history) // Only valid for this node
                 SetSubscriptionHistory(subscriptionConnectionsState, subscriptionData);
+
+            return subscriptionData;
         }
 
-        private void GetSubscriptionInternal(SubscriptionGeneralDataAndStats subscriptionData, bool history)
+        private SubscriptionGeneralDataAndStats GetSubscriptionInternal(SubscriptionState state, bool history)
         {
-            if (_subscriptions.TryGetValue(subscriptionData.SubscriptionId, out SubscriptionConnectionsState concurrentSubscription))
+            var subscriptionData = new SubscriptionGeneralDataAndStats(state);
+            if (_subscriptions.TryGetValue(state.SubscriptionId, out SubscriptionConnectionsState concurrentSubscription))
             {
                 subscriptionData.Connections = concurrentSubscription.GetConnections();
 
                 if (history)//Only valid if this is my subscription
                     SetSubscriptionHistory(concurrentSubscription, subscriptionData);
             }
+
+            return subscriptionData;
         }
 
         public void HandleDatabaseRecordChange(DatabaseRecord databaseRecord)
diff --git a/src/Raven.Server/Documents/TcpHandlers/SubscriptionConnection.cs b/src/Raven.Server/Documents/TcpHandlers/SubscriptionConnection.cs
index 4b3875d9be4f..5a28b7c72b1b 100644
--- a/src/Raven.Server/Documents/TcpHandlers/SubscriptionConnection.cs
+++ b/src/Raven.Server/Documents/TcpHandlers/SubscriptionConnection.cs
@@ -41,6 +41,8 @@
 using Sparrow.Json.Parsing;
 using Sparrow.Logging;
 using Sparrow.Utils;
+using static Raven.Server.Documents.Subscriptions.SubscriptionStorage;
+using static Raven.Server.Documents.TcpHandlers.SubscriptionConnection;
 using Constants = Voron.Global.Constants;
 using Exception = System.Exception;
 using QueryParser = Raven.Server.Documents.Queries.Parser.QueryParser;
@@ -129,7 +131,7 @@ public List<SubscriptionBatchStatsAggregator> GetBatchesPerformanceStats()
         public string LastSentChangeVectorInThisConnection;
 
         private bool _isDisposed;
-        public SubscriptionState SubscriptionState;
+        public SubscriptionStateWithRaftIndex SubscriptionState;
 
         public ParsedSubscription Subscription;
 
@@ -185,7 +187,7 @@ private async Task ParseSubscriptionOptionsAsync()
                 _logger = LoggingSource.Instance.GetLogger(TcpConnection.DocumentDatabase.Name, $"{nameof(SubscriptionConnection)}<{_options.SubscriptionName}>");
                 context.OpenReadTransaction();
 
-                var subscriptionItemKey = SubscriptionState.GenerateSubscriptionItemKeyName(TcpConnection.DocumentDatabase.Name, _options.SubscriptionName);
+                var subscriptionItemKey = Client.Documents.Subscriptions.SubscriptionState.GenerateSubscriptionItemKeyName(TcpConnection.DocumentDatabase.Name, _options.SubscriptionName);
                 var translation = TcpConnection.DocumentDatabase.ServerStore.Cluster.Read(context, subscriptionItemKey);
                 if (translation == null)
                     throw new SubscriptionDoesNotExistException("Cannot find any Subscription Task with name: " + _options.SubscriptionName);
@@ -230,7 +232,9 @@ await TcpConnection.DocumentDatabase.SubscriptionStorage.AssertSubscriptionConne
             Subscription = ParseSubscriptionQuery(SubscriptionState.Query);
 
             // update the state if above data changed
-            _subscriptionConnectionsState.Initialize(this, afterSubscribe: true);
+            await _subscriptionConnectionsState.InitializeAsync(this, afterSubscribe: true);
+
+            CancellationTokenSource.Token.ThrowIfCancellationRequested();
 
             await TcpConnection.DocumentDatabase.SubscriptionStorage.UpdateClientConnectionTime(SubscriptionState.SubscriptionId,
                 SubscriptionState.SubscriptionName, SubscriptionState.MentorNode);
@@ -256,6 +260,7 @@ await WriteJsonAsync(new DynamicJsonValue
 
             try
             {
+                var sp = Stopwatch.StartNew();
                 while (true)
                 {
                     CancellationTokenSource.Token.ThrowIfCancellationRequested();
@@ -277,7 +282,7 @@ await WriteJsonAsync(new DynamicJsonValue
 
                         var timeout = TimeSpan.FromMilliseconds(Math.Max(250, (long)_options.TimeToWaitBeforeConnectionRetry.TotalMilliseconds / 2) + random.Next(15, 50));
                         await Task.Delay(timeout);
-                        await SendHeartBeat(
+                        await SendHeartBeatIfNeededAsync(sp,
                             $"A connection from IP {TcpConnection.TcpClient.Client.RemoteEndPoint} is waiting for Subscription Task that is serving a connection from IP " +
                             $"{_subscriptionConnectionsState.GetConnectionsAsString()} to be released");
                     }
@@ -289,6 +294,15 @@ await SendHeartBeat(
             }
         }
 
+        internal async Task SendHeartBeatIfNeededAsync(Stopwatch sp, string reason)
+        {
+            if (sp.ElapsedMilliseconds >= WaitForChangedDocumentsTimeoutInMs)
+            {
+                await SendHeartBeatAsync(reason);
+                sp.Restart();
+            }
+        }
+
         private void AssertSupportedFeatures()
         {
             if (_supportedFeatures.Subscription.Includes == false)
@@ -367,7 +381,7 @@ public async Task Run(TcpConnectionOptions tcpConnectionOptions, IDisposable sub
                         using (_pendingConnectionScope)
                         {
                             await InitAsync();
-                            _subscriptionConnectionsState = TcpConnection.DocumentDatabase.SubscriptionStorage.OpenSubscription(this);
+                            _subscriptionConnectionsState = await TcpConnection.DocumentDatabase.SubscriptionStorage.OpenSubscriptionAsync(this);
                             (disposeOnDisconnect, registerConnectionDurationInTicks) = await SubscribeAsync();
                         }
 
@@ -714,7 +728,7 @@ await TcpConnection.DocumentDatabase.SubscriptionStorage.LegacyAcknowledgeBatchP
                                     UpdateBatchPerformanceStats(0, false);
 
                                     if (sendingCurrentBatchStopwatch.ElapsedMilliseconds > 1000)
-                                        await SendHeartBeat("Didn't find any documents to send and more then 1000ms passed");
+                                        await SendHeartBeatAsync("Didn't find any documents to send and more then 1000ms passed");
 
                                     using (docsContext.OpenReadTransaction())
                                     {
@@ -808,7 +822,7 @@ private async Task<Task<SubscriptionConnectionClientMessage>> WaitForClientAck(T
                     break;
                 }
 
-                await SendHeartBeat("Waiting for client ACK");
+                await SendHeartBeatAsync("Waiting for client ACK");
                 await SubscriptionConnectionsState.SendNoopAck();
             }
 
@@ -918,7 +932,7 @@ private async Task<bool> TrySendingBatchToClient(DocumentsOperationContext docsC
                                 {
                                     if (sendingCurrentBatchStopwatch.ElapsedMilliseconds > 1000)
                                     {
-                                        await SendHeartBeat("Skipping docs for more than 1000ms without sending any data");
+                                        await SendHeartBeatAsync("Skipping docs for more than 1000ms without sending any data");
                                         sendingCurrentBatchStopwatch.Restart();
                                     }
 
@@ -1078,7 +1092,7 @@ private async Task<bool> TrySendingBatchToClient(DocumentsOperationContext docsC
             }
         }
 
-        private async Task SendHeartBeat(string reason)
+        internal async Task SendHeartBeatAsync(string reason)
         {
             try
             {
@@ -1141,7 +1155,7 @@ private async Task<bool> WaitForChangedDocs(Task pendingReply)
                     return true;
                 }
 
-                await SendHeartBeat("Waiting for changed documents");
+                await SendHeartBeatAsync("Waiting for changed documents");
                 await SubscriptionConnectionsState.SendNoopAck();
             } while (CancellationTokenSource.IsCancellationRequested == false);
             return false;
diff --git a/src/Raven.Server/Json/JsonDeserializationServer.cs b/src/Raven.Server/Json/JsonDeserializationServer.cs
index 6dfa78a87932..131a5df246c8 100644
--- a/src/Raven.Server/Json/JsonDeserializationServer.cs
+++ b/src/Raven.Server/Json/JsonDeserializationServer.cs
@@ -55,6 +55,7 @@
 using BackupConfiguration = Raven.Client.Documents.Operations.Backups.BackupConfiguration;
 using MigrationConfiguration = Raven.Server.Smuggler.Migration.MigrationConfiguration;
 using StudioConfiguration = Raven.Client.Documents.Operations.Configuration.StudioConfiguration;
+using static Raven.Server.Documents.Subscriptions.SubscriptionStorage;
 
 namespace Raven.Server.Json
 {
@@ -228,6 +229,8 @@ internal sealed class JsonDeserializationServer : JsonDeserializationBase
 
         public static readonly Func<BlittableJsonReaderObject, BlockingTombstoneDetails> BlockingTombstoneDetails = GenerateJsonDeserializationRoutine<BlockingTombstoneDetails>();
 
+        public static readonly Func<BlittableJsonReaderObject, SubscriptionStateWithRaftIndex> SubscriptionStateWithRaftIndex = GenerateJsonDeserializationRoutine<SubscriptionStateWithRaftIndex>();
+
         public class Parameters
         {
             private Parameters()
diff --git a/src/Raven.Server/ServerWide/Commands/Subscriptions/AcknowledgeSubscriptionBatchCommand.cs b/src/Raven.Server/ServerWide/Commands/Subscriptions/AcknowledgeSubscriptionBatchCommand.cs
index f1e318120bdf..dbe664db70db 100644
--- a/src/Raven.Server/ServerWide/Commands/Subscriptions/AcknowledgeSubscriptionBatchCommand.cs
+++ b/src/Raven.Server/ServerWide/Commands/Subscriptions/AcknowledgeSubscriptionBatchCommand.cs
@@ -7,6 +7,7 @@
 using Raven.Client.ServerWide;
 using Raven.Server.Documents.Subscriptions;
 using Raven.Server.Documents.TcpHandlers;
+using Raven.Server.Json;
 using Raven.Server.Rachis;
 using Raven.Server.ServerWide.Context;
 using Sparrow.Binary;
@@ -58,7 +59,7 @@ protected override UpdatedValue GetUpdatedValue(long index, RawDatabaseRecord re
                 return new UpdatedValue(UpdatedValueActionType.Noop, value: null);
             }
 
-            var subscription = JsonDeserializationCluster.SubscriptionState(existingValue);
+            var subscription = JsonDeserializationServer.SubscriptionStateWithRaftIndex(existingValue);
             AssertSubscriptionState(record, subscription, subscriptionName);
 
             if (IsLegacyCommand())
diff --git a/src/Raven.Server/ServerWide/Commands/Subscriptions/PutSubscriptionCommand.cs b/src/Raven.Server/ServerWide/Commands/Subscriptions/PutSubscriptionCommand.cs
index e58ec5c769c7..d6408f642c30 100644
--- a/src/Raven.Server/ServerWide/Commands/Subscriptions/PutSubscriptionCommand.cs
+++ b/src/Raven.Server/ServerWide/Commands/Subscriptions/PutSubscriptionCommand.cs
@@ -4,17 +4,17 @@
 using Raven.Client.Exceptions.Documents.Subscriptions;
 using Raven.Client.ServerWide;
 using Raven.Server.Documents;
+using Raven.Server.Documents.Replication;
+using Raven.Server.Documents.Subscriptions;
 using Raven.Server.Documents.TcpHandlers;
+using Raven.Server.Json;
+using Raven.Server.Rachis;
 using Raven.Server.ServerWide.Context;
 using Sparrow.Json;
 using Sparrow.Json.Parsing;
 using Voron;
 using Voron.Data.Tables;
-using Raven.Server.Documents.Replication;
-using Raven.Server.Rachis;
-using Voron.Impl.Paging;
-using Raven.Client.Json.Serialization;
-using Raven.Server.Documents.Subscriptions;
+using static Raven.Server.Documents.Subscriptions.SubscriptionStorage;
 
 namespace Raven.Server.ServerWide.Commands.Subscriptions
 {
@@ -68,7 +68,7 @@ public override unsafe void Execute(ClusterOperationContext context, Table items
                         var ptr = tvr.Read(2, out int size);
                         var doc = new BlittableJsonReaderObject(ptr, size, context);
 
-                        var existingSubscriptionState = JsonDeserializationClient.SubscriptionState(doc);
+                        var existingSubscriptionState = JsonDeserializationServer.SubscriptionStateWithRaftIndex(doc);
                         if (SubscriptionId != existingSubscriptionState.SubscriptionId)
                         {
                             if (string.IsNullOrEmpty(originalName))
@@ -105,7 +105,7 @@ public override unsafe void Execute(ClusterOperationContext context, Table items
                         AssertValidChangeVector();
                     }
 
-                    using (var receivedSubscriptionState = context.ReadObject(new SubscriptionState
+                    using (var receivedSubscriptionState = context.ReadObject(new SubscriptionStateWithRaftIndex
                     {
                         Query = Query,
                         ChangeVectorForNextBatchStartingPoint = InitialChangeVector,
@@ -115,7 +115,8 @@ public override unsafe void Execute(ClusterOperationContext context, Table items
                         Disabled = Disabled,
                         MentorNode = MentorNode,
                         PinToMentorNode = PinToMentorNode,
-                        LastClientConnectionTime = null
+                        LastClientConnectionTime = null,
+                        RaftIndex = index
                     }.ToJson(), SubscriptionName))
                     {
                         ClusterStateMachine.UpdateValue(index, items, valueNameLowered, valueName, receivedSubscriptionState);
diff --git a/src/Raven.Server/ServerWide/Commands/Subscriptions/RecordBatchSubscriptionDocumentsCommand.cs b/src/Raven.Server/ServerWide/Commands/Subscriptions/RecordBatchSubscriptionDocumentsCommand.cs
index 69a665b3f084..d88825e48702 100644
--- a/src/Raven.Server/ServerWide/Commands/Subscriptions/RecordBatchSubscriptionDocumentsCommand.cs
+++ b/src/Raven.Server/ServerWide/Commands/Subscriptions/RecordBatchSubscriptionDocumentsCommand.cs
@@ -3,9 +3,9 @@
 using Raven.Client.Documents.Subscriptions;
 using Raven.Client.Exceptions.Database;
 using Raven.Client.Exceptions.Documents.Subscriptions;
-using Raven.Client.Json.Serialization;
 using Raven.Client.ServerWide;
 using Raven.Server.Documents.Subscriptions;
+using Raven.Server.Json;
 using Raven.Server.Rachis;
 using Raven.Server.ServerWide.Context;
 using Raven.Server.Utils;
@@ -83,7 +83,7 @@ public override unsafe void Execute(ClusterOperationContext context, Table items
                 if (existingValue == null)
                     throw new SubscriptionDoesNotExistException($"Subscription with name '{subscriptionName}' does not exist in database '{DatabaseName}'");
 
-                var subscriptionState = JsonDeserializationClient.SubscriptionState(existingValue);
+                var subscriptionState = JsonDeserializationServer.SubscriptionStateWithRaftIndex(existingValue);
 
                 var topology = record.Topology;
                 var lastResponsibleNode = AcknowledgeSubscriptionBatchCommand.GetLastResponsibleNode(HasHighlyAvailableTasks, topology, NodeTag);
diff --git a/src/Raven.Server/ServerWide/Commands/Subscriptions/ToggleSubscriptionStateCommand.cs b/src/Raven.Server/ServerWide/Commands/Subscriptions/ToggleSubscriptionStateCommand.cs
index 432728cb8da8..835d394bbf93 100644
--- a/src/Raven.Server/ServerWide/Commands/Subscriptions/ToggleSubscriptionStateCommand.cs
+++ b/src/Raven.Server/ServerWide/Commands/Subscriptions/ToggleSubscriptionStateCommand.cs
@@ -3,6 +3,7 @@
 using Raven.Client.Exceptions.Documents.Subscriptions;
 using Raven.Client.Json.Serialization;
 using Raven.Client.ServerWide;
+using Raven.Server.Json;
 using Raven.Server.Rachis;
 using Raven.Server.ServerWide.Context;
 using Sparrow.Json;
@@ -57,7 +58,7 @@ public override unsafe void Execute(ClusterOperationContext context, Table items
                 var ptr = tvr.Read(2, out int size);
                 var doc = new BlittableJsonReaderObject(ptr, size, context);
 
-                var subscriptionState = JsonDeserializationClient.SubscriptionState(doc);
+                var subscriptionState = JsonDeserializationServer.SubscriptionStateWithRaftIndex(doc);
                 subscriptionState.Disabled = Disable;
                 using (var obj = context.ReadObject(subscriptionState.ToJson(), "subscription"))
                 {
diff --git a/src/Raven.Server/ServerWide/Commands/Subscriptions/UpdateSubscriptionClientConnectionTime.cs b/src/Raven.Server/ServerWide/Commands/Subscriptions/UpdateSubscriptionClientConnectionTime.cs
index 146a1c45f410..04ede87692da 100644
--- a/src/Raven.Server/ServerWide/Commands/Subscriptions/UpdateSubscriptionClientConnectionTime.cs
+++ b/src/Raven.Server/ServerWide/Commands/Subscriptions/UpdateSubscriptionClientConnectionTime.cs
@@ -3,7 +3,7 @@
 using Raven.Client.Exceptions.Database;
 using Raven.Client.Exceptions.Documents.Subscriptions;
 using Raven.Client.ServerWide;
-using Raven.Server.Rachis;
+using Raven.Server.Json;
 using Sparrow.Json;
 using Sparrow.Json.Parsing;
 
@@ -32,7 +32,7 @@ protected override UpdatedValue GetUpdatedValue(long index, RawDatabaseRecord re
             if (existingValue == null)
                 throw new SubscriptionDoesNotExistException($"Subscription with id '{itemId}' does not exist");
 
-            var subscription = JsonDeserializationCluster.SubscriptionState(existingValue);
+            var subscription = JsonDeserializationServer.SubscriptionStateWithRaftIndex(existingValue);
 
             var topology = record.Topology;
             var lastResponsibleNode = AcknowledgeSubscriptionBatchCommand.GetLastResponsibleNode(HasHighlyAvailableTasks, topology, NodeTag);
diff --git a/test/SlowTests/Client/Subscriptions/ConcurrentSubscriptionsTests.cs b/test/SlowTests/Client/Subscriptions/ConcurrentSubscriptionsTests.cs
index a77792e58ea8..e04e009efcd8 100644
--- a/test/SlowTests/Client/Subscriptions/ConcurrentSubscriptionsTests.cs
+++ b/test/SlowTests/Client/Subscriptions/ConcurrentSubscriptionsTests.cs
@@ -1018,6 +1018,279 @@ await AssertWaitForValueAsync(() =>
             }
         }
 
+        [Theory]
+        [InlineData(1)]
+        [InlineData(3)]
+        public async Task ConcurrentSubscriptionsShouldContinueProcessOnNewConnections(int count)
+        {
+            using (var store = GetDocumentStore())
+            {
+                var id = await store.Subscriptions.CreateAsync<User>();
+                var docs = new HashSet<string>();
+
+                for (int i = 0; i < 10; i++)
+                {
+                    using (var session = store.OpenSession())
+                    {
+                        session.Store(new User(), $"users/{i}");
+                        session.SaveChanges();
+                    }
+
+                    var workers = new List<SubscriptionWorker<User>>();
+                    for (int j = 0; j < count; j++)
+                    {
+                        workers.Add(store.Subscriptions.GetSubscriptionWorker<User>(new SubscriptionWorkerOptions(id)
+                        {
+                            TimeToWaitBeforeConnectionRetry = TimeSpan.FromSeconds(1),
+                            Strategy = SubscriptionOpeningStrategy.Concurrent,
+                            MaxDocsPerBatch = 1
+                        }));
+                    }
+
+                    try
+                    {
+                        foreach (var worker in workers)
+                        {
+                            var t = worker.Run(x =>
+                            {
+                                foreach (var item in x.Items)
+                                {
+                                    docs.Add(item.Id);
+                                }
+                            });
+                        }
+
+                        await AssertWaitForTrueAsync(() => Task.FromResult(docs.Count == i + 1), Convert.ToInt32(_reasonableWaitTime.TotalMilliseconds));
+                        await AssertNoLeftovers(store, id);
+                    }
+                    finally
+                    {
+                        foreach (var w in workers)
+                        {
+                            await w.DisposeAsync();
+                        }
+                    }
+                }
+            }
+        }
+
+        [Theory]
+        [InlineData(1)]
+        [InlineData(3)]
+        public async Task ConcurrentSubscriptionsShouldContinueProcessOnNewConnectionsAfterUpdate(int count)
+        {
+            using (var store = GetDocumentStore())
+            {
+                var id = await store.Subscriptions.CreateAsync<User>(options: new SubscriptionCreationOptions<User>()
+                {
+                    Filter = user => user.Age == 0
+                });
+
+                var docs = new HashSet<string>();
+
+                for (int i = 0; i < 10; i++)
+                {
+                    if (i > 0)
+                    {
+                        await store.Subscriptions.UpdateAsync(options: new SubscriptionUpdateOptions()
+                        {
+                            Name = id,
+                            Query = @$"declare function predicate() {{ return this.Age==={i} }}
+from 'Users' as doc
+where predicate.call(doc)"
+                        });
+                    }
+
+                    using (var session = store.OpenSession())
+                    {
+                        session.Store(new User()
+                        {
+                            Age = i
+                        }, $"users/{i}");
+                        session.SaveChanges();
+                    }
+
+                    var workers = new List<SubscriptionWorker<User>>();
+                    for (int j = 0; j < count; j++)
+                    {
+                        workers.Add(store.Subscriptions.GetSubscriptionWorker<User>(new SubscriptionWorkerOptions(id)
+                        {
+                            TimeToWaitBeforeConnectionRetry = TimeSpan.FromSeconds(1),
+                            Strategy = SubscriptionOpeningStrategy.Concurrent,
+                            MaxDocsPerBatch = 1
+                        }));
+                    }
+
+                    try
+                    {
+                        foreach (var worker in workers)
+                        {
+                            var t = worker.Run(x =>
+                            {
+                                foreach (var item in x.Items)
+                                {
+                                    docs.Add(item.Id);
+                                }
+                            });
+                        }
+
+                        await AssertWaitForTrueAsync(() => Task.FromResult(docs.Count == i + 1), Convert.ToInt32(_reasonableWaitTime.TotalMilliseconds));
+                        await AssertNoLeftovers(store, id);
+                    }
+                    finally
+                    {
+                        foreach (var w in workers)
+                        {
+                            await w.DisposeAsync();
+                        }
+                    }
+                }
+
+                var subs = await store.Subscriptions.GetSubscriptionsAsync(0, 1024);
+                Assert.Equal(1, subs.Count);
+            }
+        }
+
+        [Fact]
+        public async Task ConcurrentSubscriptionsShouldContinueProcessOnNewConnectionsAfterUpdate_AndDisposeWhileConnecting()
+        {
+            using (var store = GetDocumentStore())
+            {
+                var id = await store.Subscriptions.CreateAsync<User>(options: new SubscriptionCreationOptions<User>()
+                {
+                    Filter = user => user.Age == 0
+                });
+
+                var docs = new HashSet<string>();
+                var workers = new List<SubscriptionWorker<User>>();
+                try
+                {
+                    for (int i = 0; i < 10; i++)
+                    {
+                        if (i > 0)
+                        {
+                            // we update the subscription on each iteration
+                            // so it will process only the new created document
+                            await store.Subscriptions.UpdateAsync(options: new SubscriptionUpdateOptions()
+                            {
+                                Name = id,
+                                Query = @$"declare function predicate() {{ return this.Age==={i} }}
+from 'Users' as doc
+where predicate.call(doc)"
+                            });
+                        }
+
+                        using (var session = store.OpenSession())
+                        {
+                            session.Store(new User()
+                            {
+                                Age = i
+                            }, $"users/{i}");
+                            session.SaveChanges();
+                        }
+
+                        var w = store.Subscriptions.GetSubscriptionWorker<User>(new SubscriptionWorkerOptions(id)
+                        {
+                            TimeToWaitBeforeConnectionRetry = TimeSpan.FromSeconds(1),
+                            Strategy = SubscriptionOpeningStrategy.Concurrent,
+                            MaxDocsPerBatch = 1
+                        });
+                        workers.Add(w);
+
+                        var t2 = w.Run(x =>
+                        {
+                            foreach (var item in x.Items)
+                            {
+                                docs.Add(item.Id);
+                                Thread.Sleep(1000);
+                            }
+                        });
+
+                        await AssertWaitForTrueAsync(() => Task.FromResult(docs.Count == i + 1), Convert.ToInt32(_reasonableWaitTime.TotalMilliseconds));
+                        await AssertNoLeftovers(store, id);
+                    }
+                }
+                finally
+                {
+                    foreach (var w in workers)
+                    {
+                        await w.DisposeAsync();
+                    }
+                }
+
+                var subs = await store.Subscriptions.GetSubscriptionsAsync(0, 1024);
+                Assert.Equal(1, subs.Count);
+            }
+        }
+
+        [Fact]
+        public async Task ConcurrentSubscriptionsShouldProcessWhen_2_ConnectionsAreSubscribingConcurrently()
+        {
+            using (var store = GetDocumentStore())
+            {
+                var id = await store.Subscriptions.CreateAsync<User>();
+
+                var db = await Databases.GetDocumentDatabaseInstanceFor(store);
+                var testingStuff = db.ForTestingPurposesOnly();
+
+                using (testingStuff.CallDuringWaitForSubscribe(connections =>
+                {
+                    while (connections.Count < 2)
+                    {
+                        Thread.Sleep(111);
+                    }
+                }))
+                {
+                    await using (var subscription = store.Subscriptions.GetSubscriptionWorker(new SubscriptionWorkerOptions(id)
+                    {
+                        TimeToWaitBeforeConnectionRetry = TimeSpan.FromSeconds(5),
+                        Strategy = SubscriptionOpeningStrategy.Concurrent,
+                        MaxDocsPerBatch = 2
+                    }))
+                    await using (var secondSubscription = store.Subscriptions.GetSubscriptionWorker(new SubscriptionWorkerOptions(id)
+                    {
+                        Strategy = SubscriptionOpeningStrategy.Concurrent,
+                        TimeToWaitBeforeConnectionRetry = TimeSpan.FromSeconds(5),
+                        MaxDocsPerBatch = 2
+                    }))
+                    {
+                        using (var session = store.OpenSession())
+                        {
+                            session.Store(new User(), "user/1");
+                            session.Store(new User(), "user/2");
+                            session.Store(new User(), "user/3");
+                            session.Store(new User(), "user/4");
+                            session.Store(new User(), "user/5");
+                            session.Store(new User(), "user/6");
+                            session.SaveChanges();
+                        }
+
+                        var con1Docs = new List<string>();
+                        var con2Docs = new List<string>();
+
+                        var t = subscription.Run(x =>
+                        {
+                            foreach (var item in x.Items)
+                            {
+                                con1Docs.Add(item.Id);
+                            }
+                        });
+
+                        var _ = secondSubscription.Run(x =>
+                        {
+                            foreach (var item in x.Items)
+                            {
+                                con2Docs.Add(item.Id);
+                            }
+                        });
+
+                        await AssertWaitForTrueAsync(() => Task.FromResult(con1Docs.Count + con2Docs.Count == 6), Convert.ToInt32(_reasonableWaitTime.TotalMilliseconds));
+                        await AssertNoLeftovers(store, id);
+                    }
+                }
+            }
+        }
+
         private class GetSubscriptionResendListCommand : RavenCommand<ResendListResults>
         {
             private readonly string _database;
@@ -1072,6 +1345,7 @@ private class ResendListResult
         private class User
         {
             public string Name;
+            public int Age;
         }
     }
 }
diff --git a/test/SlowTests/Client/Subscriptions/RavenDB_9117.cs b/test/SlowTests/Client/Subscriptions/RavenDB_9117.cs
index a54026d6313a..7e7e95f07c4d 100644
--- a/test/SlowTests/Client/Subscriptions/RavenDB_9117.cs
+++ b/test/SlowTests/Client/Subscriptions/RavenDB_9117.cs
@@ -65,7 +65,7 @@ public async Task SubscriptionWorkerRetryEventGivesCorrectError()
                 Assert.True(await signalWhenStartedProcessingDoc.WaitAsync(_reasonableWaitTime));
                 var database = await GetDatabase(store.Database);
 
-                SubscriptionStorage.SubscriptionGeneralDataAndStats subscriptionState;
+                SubscriptionState subscriptionState;
                 using (database.ServerStore.ContextPool.AllocateOperationContext(out TransactionOperationContext context))
                 using (context.OpenReadTransaction())
                 {
diff --git a/test/SlowTests/Client/Subscriptions/SubscriptionsBasic.cs b/test/SlowTests/Client/Subscriptions/SubscriptionsBasic.cs
index 8cd126d9e6f8..29675d915e36 100644
--- a/test/SlowTests/Client/Subscriptions/SubscriptionsBasic.cs
+++ b/test/SlowTests/Client/Subscriptions/SubscriptionsBasic.cs
@@ -341,7 +341,7 @@ public async Task RunningSubscriptionShouldJumpToNextChangeVectorIfItWasChangedB
 
                     Assert.True(await ackFirstCV.WaitAsync(_reasonableWaitTime));
 
-                    SubscriptionStorage.SubscriptionGeneralDataAndStats subscriptionState;
+                    SubscriptionState subscriptionState;
                     using (database.ServerStore.ContextPool.AllocateOperationContext(out TransactionOperationContext context))
                     using (context.OpenReadTransaction())
                     {