Skip to content

Commit

Permalink
RavenDB-23348 - Increase wait for rehab timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
garayx committed Dec 18, 2024
1 parent 53a03f2 commit dcb5fb2
Showing 1 changed file with 80 additions and 18 deletions.
98 changes: 80 additions & 18 deletions test/SlowTests/Issues/RavenDB-17650.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Net;
Expand All @@ -10,13 +11,16 @@
using Raven.Client.Exceptions;
using Raven.Client.Exceptions.Database;
using Raven.Client.Exceptions.Documents.Subscriptions;
using Raven.Client.ServerWide;
using Raven.Client.ServerWide.Operations;
using Raven.Server;
using Raven.Server.Config;
using Raven.Server.ServerWide;
using Raven.Server.ServerWide.Context;
using Raven.Server.ServerWide.Maintenance;
using Sparrow.Extensions;
using Sparrow.Json;
using Sparrow.Json.Parsing;
using Sparrow.Server;
using Tests.Infrastructure;
using Xunit;
Expand Down Expand Up @@ -96,12 +100,18 @@ public async Task Should_Retry_When_AllTopologyNodesDownException_Was_Thrown()
Server = leader
});
string id = "User/33-A";
string id2 = "User/333-A";
using (var session = store.OpenAsyncSession())
{
await session.StoreAsync(new User { Id = id, Name = "1" });
await session.StoreAsync(new User { Name = "2" });
await session.StoreAsync(new User { Id = id2, Name = "2" });
await session.SaveChangesAsync();
}
// wait for replication
Assert.True(await WaitForDocumentInClusterAsync<Core.Utils.Entities.User>(new DatabaseTopology { Members = new List<string> { nodes.First().ServerStore.NodeTag, nodes.Last().ServerStore.NodeTag } }, store.Database, id, null,
timeout: TimeSpan.FromSeconds(60)), id);
Assert.True(await WaitForDocumentInClusterAsync<Core.Utils.Entities.User>(new DatabaseTopology { Members = new List<string> { nodes.First().ServerStore.NodeTag, nodes.Last().ServerStore.NodeTag } }, store.Database, id2, null,
timeout: TimeSpan.FromSeconds(60)), id2);

await store.Subscriptions
.CreateAsync(new SubscriptionCreationOptions<User>
Expand Down Expand Up @@ -145,15 +155,46 @@ await store.Subscriptions
//revive node
Assert.True(await failMre.WaitAsync(TimeSpan.FromSeconds(15)), "Subscription didn't fail as expected.");
var revivedNodes = new List<RavenServer>();
var t = Task.Run(() => revivedNodes.Add(ReviveNode(result0.DataDirectory, result0.Url)), cts.Token);
var tt = Task.Run(() => revivedNodes.Add(ReviveNode(result1.DataDirectory, result1.Url)), cts.Token);
ConcurrentDictionary<string, List<string>> initLogs = new ConcurrentDictionary<string, List<string>>();

var loadMre1 = new AsyncManualResetEvent();
var loadMre2 = new AsyncManualResetEvent();

var t = Task.Run(() => revivedNodes.Add(ReviveNode(result0.DataDirectory, result0.Url, serverStore =>
{
serverStore.DatabasesLandlord.OnDatabaseLoaded += (name) =>
{
if (serverStore.DatabasesLandlord.InitLog.TryGetValue(name, out ConcurrentQueue<string> q))
{
initLogs.TryAdd($"{name} @ {serverStore.NodeTag}", q.ToList());
}

loadMre1.Set();
};
})), cts.Token);
var tt = Task.Run(() => revivedNodes.Add(ReviveNode(result1.DataDirectory, result1.Url, serverStore =>
{
serverStore.DatabasesLandlord.OnDatabaseLoaded += (name) =>
{
if (serverStore.DatabasesLandlord.InitLog.TryGetValue(name, out ConcurrentQueue<string> q))
{
initLogs.TryAdd($"{name} @ {serverStore.NodeTag}", q.ToList());
}
loadMre2.Set();
};
})), cts.Token);
await Task.WhenAll(t, tt);

await WaitForRehabAndAssert(revivedNodes, store, subscriptionLog);

if (await successMre.WaitAsync(TimeSpan.FromSeconds(15)) == false)
//Wait for DBs to load
Assert.True(await loadMre1.WaitAsync(cts.Token));
Assert.True(await loadMre2.WaitAsync(cts.Token));

var rehabsCount = await WaitForRehabCount(revivedNodes, store, subscriptionLog);
var mreWait = await successMre.WaitAsync(TimeSpan.FromSeconds(60));

if (rehabsCount != 0 || mreWait == false)
{
subscriptionLog.Add((DateTime.UtcNow, $"Could not reconnect subscription on {result0.Url} & {result1.Url}"));
subscriptionLog.Add((DateTime.UtcNow, $"Could not reconnect subscription on {result0.Url} & {result1.Url}, {nameof(rehabsCount)}: {rehabsCount}, {nameof(mreWait)}: {mreWait}"));

foreach (var node in revivedNodes)
{
Expand Down Expand Up @@ -199,12 +240,27 @@ await ActionWithLeader((l) =>
}

subscriptionLog.Add((DateTime.UtcNow, sb.ToString()));
Assert.Fail(string.Join(Environment.NewLine, subscriptionLog.Select(x => $"#### {x.Item1.GetDefaultRavenFormat()}: {x.Item2}")));

var str = string.Join(Environment.NewLine, subscriptionLog.Select(x => $"#### {x.Item1.GetDefaultRavenFormat()}: {x.Item2}"));
str = str + Environment.NewLine + "#### InitLogs:" + Environment.NewLine;
foreach (var kvp in initLogs)
{
str = str + Environment.NewLine + $"$$$$ Database: {kvp.Key}";
foreach (var log in kvp.Value)
{
str = str + Environment.NewLine + log;
}
str += Environment.NewLine;
}

Assert.Fail(str);
}
}

private static async Task WaitForRehabAndAssert(List<RavenServer> revivedNodes, DocumentStore store, List<(DateTime, string)> subscriptionLog)
private static async Task<int> WaitForRehabCount(List<RavenServer> revivedNodes, DocumentStore store, List<(DateTime, string)> subscriptionLog)
{
var rehabsCount = 0;

foreach (var node in revivedNodes)
{
var rehabs = await WaitForValueAsync(() =>
Expand All @@ -215,11 +271,7 @@ private static async Task WaitForRehabAndAssert(List<RavenServer> revivedNodes,
try
{
var dbTopology = node.ServerStore.Cluster.ReadDatabaseTopology(context, store.Database);
var json = dbTopology.ToJson();

using var bjro = context.ReadObject(json, "ReadDatabaseTopology", BlittableJsonDocumentBuilder.UsageMode.ToDisk);
subscriptionLog.Add((DateTime.UtcNow,
$"ReadDatabaseTopology in WaitForValueAsync for ['{node.ServerStore.NodeTag}', {node.WebUrl}]{Environment.NewLine}{bjro}"));
LogTopologyToSubscriptionLog(subscriptionLog, context, store.Database, dbTopology.ToJson(), node);
return dbTopology.Rehabs.Count;
}
catch (Exception e)
Expand All @@ -229,13 +281,22 @@ private static async Task WaitForRehabAndAssert(List<RavenServer> revivedNodes,
return int.MaxValue;
}
}
}, expectedVal: 0, interval: 322 * 2);
}, expectedVal: 0, timeout: 60_000, interval: 322 * 2);

Assert.Equal(0, rehabs);
rehabsCount += rehabs;
}

return rehabsCount;
}

private static void LogTopologyToSubscriptionLog(List<(DateTime, string)> subscriptionLog, TransactionOperationContext context, string database, DynamicJsonValue json, RavenServer node)
{
using var bjro = context.ReadObject(json, $"ReadDatabaseTopology_{database}", BlittableJsonDocumentBuilder.UsageMode.ToDisk);
subscriptionLog.Add((DateTime.UtcNow,
$"ReadDatabaseTopology in WaitForValueAsync for ['{database}' @ '{node.ServerStore.NodeTag}', {node.WebUrl}]{Environment.NewLine}{bjro}"));
}

private RavenServer ReviveNode(string nodeDataDirectory, string nodeUrl)
private RavenServer ReviveNode(string nodeDataDirectory, string nodeUrl, Action<ServerStore> beforeDatabasesStartup = null)
{
var cs = new Dictionary<string, string>(DefaultClusterSettings);
cs[RavenConfiguration.GetKey(x => x.Core.ServerUrls)] = nodeUrl;
Expand All @@ -244,7 +305,8 @@ private RavenServer ReviveNode(string nodeDataDirectory, string nodeUrl)
DeletePrevious = false,
RunInMemory = false,
DataDirectory = nodeDataDirectory,
CustomSettings = cs
CustomSettings = cs,
BeforeDatabasesStartup = beforeDatabasesStartup
});
}

Expand Down

0 comments on commit dcb5fb2

Please sign in to comment.