Skip to content

Commit

Permalink
RavenDB-20832 - add sharding version for test
Browse files Browse the repository at this point in the history
  • Loading branch information
garayx committed Dec 12, 2024
1 parent d67670c commit 0d1acf2
Showing 1 changed file with 158 additions and 33 deletions.
191 changes: 158 additions & 33 deletions test/SlowTests/Issues/RavenDB-17650.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
using Raven.Server.Config;
using Raven.Server.ServerWide.Context;
using Raven.Server.ServerWide.Maintenance;
using Raven.Server.Utils;
using Sparrow.Extensions;
using Sparrow.Json;
using Sparrow.Json.Parsing;
using Sparrow.Server;
using Tests.Infrastructure;
using Xunit;
Expand Down Expand Up @@ -152,30 +154,57 @@ await store.Subscriptions
var tt = Task.Run(() => revivedNodes.Add(ReviveNode(result1.DataDirectory, result1.Url)), cts.Token);
await Task.WhenAll(t, tt);

await WaitForRehabAndAssert(revivedNodes, store, subscriptionLog);
await WaitForRehabAndAssert(revivedNodes, store, options, subscriptionLog);

if (await successMre.WaitAsync(TimeSpan.FromSeconds(15)) == false)
{
subscriptionLog.Add((DateTime.UtcNow, $"Could not reconnect subscription on {result0.Url} & {result1.Url}"));

foreach (var node in revivedNodes)
if (options.DatabaseMode == RavenDatabaseMode.Sharded)
{
using (node.ServerStore.ContextPool.AllocateOperationContext(out TransactionOperationContext context))
using (context.OpenReadTransaction())
Dictionary<string, List<string>> relevantDatabases = GetRelevantShardedDatabasesForNode(revivedNodes, store);
foreach (var node in revivedNodes)
{
try
using (node.ServerStore.ContextPool.AllocateOperationContext(out TransactionOperationContext context))
using (context.OpenReadTransaction())
{
var json = node.ServerStore.Cluster.ReadDatabaseTopology(context, store.Database).ToJson();

using var bjro = context.ReadObject(json, "ReadDatabaseTopology", BlittableJsonDocumentBuilder.UsageMode.ToDisk);
subscriptionLog.Add((DateTime.UtcNow, $"ReadDatabaseTopology for ['{node.ServerStore.NodeTag}', {node.WebUrl}]{Environment.NewLine}{bjro}"));
try
{
foreach (var shard in relevantDatabases[node.ServerStore.NodeTag])
{

var json = node.ServerStore.Cluster.ReadDatabaseTopologyForShard(context, store.Database, ShardHelper.GetShardNumberFromDatabaseName(shard)).ToJson();
LogDatabaseTopologyOnFailure(context, json, subscriptionLog, store.Database, node);
}
}
catch (Exception e)
{
LogDatabaseTopologyErrorOnFailure(subscriptionLog, store.Database, node, e);
}
}
catch (Exception e)

}
}
else
{
foreach (var node in revivedNodes)
{
using (node.ServerStore.ContextPool.AllocateOperationContext(out TransactionOperationContext context))
using (context.OpenReadTransaction())
{
subscriptionLog.Add((DateTime.UtcNow, $"Could not ReadDatabaseTopology for ['{node.ServerStore.NodeTag}', {node.WebUrl}]{Environment.NewLine}{e}"));
try
{
var json = node.ServerStore.Cluster.ReadDatabaseTopology(context, store.Database).ToJson();

LogDatabaseTopologyOnFailure(context, json, subscriptionLog, store.Database, node);
}
catch (Exception e)
{
LogDatabaseTopologyErrorOnFailure(subscriptionLog, store.Database, node, e);
}
}
subscriptionLog.Add((DateTime.UtcNow, $"GetClusterTopology for ['{node.ServerStore.NodeTag}', {node.WebUrl}]{Environment.NewLine}{node.ServerStore.GetClusterTopology()}"));
}
subscriptionLog.Add((DateTime.UtcNow, $"GetClusterTopology for ['{node.ServerStore.NodeTag}', {node.WebUrl}]{Environment.NewLine}{node.ServerStore.GetClusterTopology()}"));
}

List<ClusterObserverLogEntry> logs = new List<ClusterObserverLogEntry>();
Expand Down Expand Up @@ -206,36 +235,132 @@ await ActionWithLeader((l) =>
}
}

private static async Task WaitForRehabAndAssert(List<RavenServer> revivedNodes, DocumentStore store, List<(DateTime, string)> subscriptionLog)
private static void LogDatabaseTopologyErrorOnFailure(List<(DateTime, string)> subscriptionLog, string database, RavenServer node, Exception e)
{
foreach (var node in revivedNodes)
subscriptionLog.Add((DateTime.UtcNow, $"Could not ReadDatabaseTopology for ['{database}' @ '{node.ServerStore.NodeTag}', {node.WebUrl}]{Environment.NewLine}{e}"));
}

private static void LogDatabaseTopologyOnFailure(TransactionOperationContext context, DynamicJsonValue json, List<(DateTime, string)> subscriptionLog, string database, RavenServer node)
{
using var bjro = context.ReadObject(json, "ReadDatabaseTopology", BlittableJsonDocumentBuilder.UsageMode.ToDisk);
subscriptionLog.Add((DateTime.UtcNow, $"ReadDatabaseTopology for ['{database}' @ '{node.ServerStore.NodeTag}', {node.WebUrl}]{Environment.NewLine}{bjro}"));
}

private static async Task WaitForRehabAndAssert(List<RavenServer> revivedNodes, DocumentStore store, Options options, List<(DateTime, string)> subscriptionLog)
{
if (options.DatabaseMode == RavenDatabaseMode.Sharded)
{
Dictionary<string, List<string>> relevantDatabases = GetRelevantShardedDatabasesForNode(revivedNodes, store);

foreach (var node in revivedNodes)
{
var rehabs = await WaitForValueAsync(() => GetRehabsCountForShards(store, subscriptionLog, node, relevantDatabases), expectedVal: 0, interval: 322 * 2);

Assert.Equal(0, rehabs);
}
}
else
{
var rehabs = await WaitForValueAsync(() =>
foreach (var node in revivedNodes)
{
using (node.ServerStore.ContextPool.AllocateOperationContext(out TransactionOperationContext context))
using (context.OpenReadTransaction())
var rehabs = await WaitForValueAsync(() =>
{
try
using (node.ServerStore.ContextPool.AllocateOperationContext(out TransactionOperationContext context))
using (context.OpenReadTransaction())
{
var dbTopology = node.ServerStore.Cluster.ReadDatabaseTopology(context, store.Database);
var json = dbTopology.ToJson();

using var bjro = context.ReadObject(json, "ReadDatabaseTopology", BlittableJsonDocumentBuilder.UsageMode.ToDisk);
subscriptionLog.Add((DateTime.UtcNow,
$"ReadDatabaseTopology in WaitForValueAsync for ['{node.ServerStore.NodeTag}', {node.WebUrl}]{Environment.NewLine}{bjro}"));
return dbTopology.Rehabs.Count;
try
{
var dbTopology = node.ServerStore.Cluster.ReadDatabaseTopology(context, store.Database);
LogTopologyToSubscriptionLog(subscriptionLog, context, store.Database, dbTopology.ToJson(), node);
return dbTopology.Rehabs.Count;

}
catch (Exception e)
{
AddErrorToSubscriptionLog(subscriptionLog, node, e);
return int.MaxValue;
}
}
catch (Exception e)
}, expectedVal: 0, interval: 322 * 2);

Assert.Equal(0, rehabs);
}
}
}

private static void LogTopologyToSubscriptionLog(List<(DateTime, string)> subscriptionLog, TransactionOperationContext context, string database, DynamicJsonValue json, RavenServer node)
{
using var bjro = context.ReadObject(json, $"ReadDatabaseTopology_{database}", BlittableJsonDocumentBuilder.UsageMode.ToDisk);
subscriptionLog.Add((DateTime.UtcNow,
$"ReadDatabaseTopology in WaitForValueAsync for ['{database}' @ '{node.ServerStore.NodeTag}', {node.WebUrl}]{Environment.NewLine}{bjro}"));
}

private static int GetRehabsCountForShards(DocumentStore store, List<(DateTime, string)> subscriptionLog, RavenServer node, Dictionary<string, List<string>> relevantDatabases)
{
using (node.ServerStore.ContextPool.AllocateOperationContext(out TransactionOperationContext context))
using (context.OpenReadTransaction())
{
try
{
if (relevantDatabases.TryGetValue(node.ServerStore.NodeTag, out List<string> shards) == false)
return int.MaxValue;

var rehabsForShards = 0;
foreach (var shard in shards)
{
var dbTopology = node.ServerStore.Cluster.ReadDatabaseTopologyForShard(context, store.Database, ShardHelper.GetShardNumberFromDatabaseName(shard));
LogTopologyToSubscriptionLog(subscriptionLog, context, shard, dbTopology.ToJson(), node);
rehabsForShards += dbTopology.Rehabs.Count;
}

return rehabsForShards;
}
catch (Exception e)
{
AddErrorToSubscriptionLog(subscriptionLog, node, e);
return int.MaxValue;
}
}
}

private static void AddErrorToSubscriptionLog(List<(DateTime, string)> subscriptionLog, RavenServer node, Exception e)
{
subscriptionLog.Add((DateTime.UtcNow,
$"Could not ReadDatabaseTopology in WaitForValueAsync for ['{node.ServerStore.NodeTag}', {node.WebUrl}]{Environment.NewLine}{e}"));
}

private static Dictionary<string, List<string>> GetRelevantShardedDatabasesForNode(List<RavenServer> revivedNodes, DocumentStore store)
{
var relevantDatabases = new Dictionary<string, List<string>>();
foreach (var node in revivedNodes)
{
using (node.ServerStore.ContextPool.AllocateOperationContext(out TransactionOperationContext context))
using (context.OpenReadTransaction())
using (var databaseRecord = node.ServerStore.Cluster.ReadRawDatabaseRecord(context, store.Database))
{
if (databaseRecord == null)
continue;

foreach (var shard in databaseRecord.Topologies)
{
if (shard.Topology.RelevantFor(node.ServerStore.NodeTag))
{
subscriptionLog.Add((DateTime.UtcNow,
$"Could not ReadDatabaseTopology in WaitForValueAsync for ['{node.ServerStore.NodeTag}', {node.WebUrl}]{Environment.NewLine}{e}"));
return int.MaxValue;
if (relevantDatabases.ContainsKey(node.ServerStore.NodeTag))
{
var shards = relevantDatabases[node.ServerStore.NodeTag];
shards.Add(shard.Name);
relevantDatabases[node.ServerStore.NodeTag] = shards;
}
else
{
relevantDatabases.Add(node.ServerStore.NodeTag, new List<string>() { shard.Name });
}
}
}
}, expectedVal: 0, interval: 322 * 2);

Assert.Equal(0, rehabs);
}
}

return relevantDatabases;
}

private RavenServer ReviveNode(string nodeDataDirectory, string nodeUrl)
Expand Down

0 comments on commit 0d1acf2

Please sign in to comment.