Skip to content

Commit

Permalink
close #2266 clustered routers now load correct default number of rout…
Browse files Browse the repository at this point in the history
…ees (#2310)
  • Loading branch information
Aaronontheweb authored and Danthar committed Sep 21, 2016
1 parent 078b7d6 commit 458daa9
Show file tree
Hide file tree
Showing 9 changed files with 243 additions and 7 deletions.
2 changes: 1 addition & 1 deletion build.fsx
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ Target "MultiNodeTests" <| fun _ ->
|> append assembly
|> append "-Dmultinode.enable-filesink=on"
|> append (sprintf "-Dmultinode.output-directory=\"%s\"" testOutput)
|> appendIfNotNullOrEmpty spec "-Dmultinode.test-spec="
|> appendIfNotNullOrEmpty spec "-Dmultinode.spec="
|> toText

let result = ExecProcess(fun info ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,7 @@ namespace Akka.Actor
}
public class Deployer
{
protected readonly Akka.Configuration.Config Default;
public Deployer(Akka.Actor.Settings settings) { }
public Akka.Actor.Deploy Lookup(Akka.Actor.ActorPath path) { }
public Akka.Actor.Deploy Lookup(System.Collections.Generic.IEnumerable<string> path) { }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@
<Compile Include="RestartNode2Spec.cs" />
<Compile Include="RestartNode3Spec.cs" />
<Compile Include="RestartNodeSpec.cs" />
<Compile Include="Routing\ClusterBroadcastRouter2266BugfixSpec.cs" />
<Compile Include="Routing\ClusterConsistentHashingGroupSpec.cs" />
<Compile Include="Routing\ClusterConsistentHashingRouterSpec.cs" />
<Compile Include="Routing\ClusterRoundRobinSpec.cs" />
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
//-----------------------------------------------------------------------
// <copyright file="ClusterBroadcastRouter2266BugfixSpec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2016 Akka.NET project <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.Linq;
using Akka.Actor;
using Akka.Cluster.TestKit;
using Akka.Configuration;
using Akka.Remote.TestKit;
using Akka.Routing;
using FluentAssertions;

namespace Akka.Cluster.Tests.MultiNode.Routing
{
public class ClusterBroadcastGroupSpecConfig : MultiNodeConfig
{
internal interface IRouteeType { }
internal class PoolRoutee : IRouteeType { }
internal class GroupRoutee : IRouteeType { }

internal class Reply
{
public Reply(IRouteeType routeeType, IActorRef actorRef)
{
RouteeType = routeeType;
ActorRef = actorRef;
}

public IRouteeType RouteeType { get; }

public IActorRef ActorRef { get; }
}

internal class SomeActor : ReceiveActor
{
private readonly IRouteeType _routeeType;

public SomeActor() : this(new PoolRoutee())
{
}

public SomeActor(IRouteeType routeeType)
{
_routeeType = routeeType;
Receive<string>(s => s == "hit", s =>
{
Sender.Tell(new Reply(_routeeType, Self));
});
}
}

public RoleName First { get; }
public RoleName Second { get; }

public ClusterBroadcastGroupSpecConfig()
{
First = Role("first");
Second = Role("second");

CommonConfig = MultiNodeLoggingConfig.LoggingConfig
.WithFallback(DebugConfig(true))
.WithFallback(ConfigurationFactory.ParseString(@"
akka.actor.deployment {
/router1 {
router = broadcast-group
routees.paths = [""/user/myserviceA""]
cluster {
enabled = on
allow-local-routees = on
}
}
}"))
.WithFallback(MultiNodeClusterSpec.ClusterConfig());

NodeConfig(new List<RoleName> { First }, new List<Config> { ConfigurationFactory.ParseString(@"akka.cluster.roles = [""a"", ""b""]") });
NodeConfig(new List<RoleName> { Second }, new List<Config> { ConfigurationFactory.ParseString(@"akka.cluster.roles = [""a""]") });

TestTransport = true;
}
}

public class ClusterBroadcastMultiNode1 : ClusterBroadcastGroupSpec { }
public class ClusterBroadcastMultiNode2 : ClusterBroadcastGroupSpec { }

/// <summary>
/// Used to verify that https://github.com/akkadotnet/akka.net/issues/2266 is reproducible and can be fixed
/// </summary>
public abstract class ClusterBroadcastGroupSpec : MultiNodeClusterSpec
{
private readonly ClusterBroadcastGroupSpecConfig _config;
private readonly Lazy<IActorRef> _router;

protected ClusterBroadcastGroupSpec() : this(new ClusterBroadcastGroupSpecConfig())
{
}

protected ClusterBroadcastGroupSpec(ClusterBroadcastGroupSpecConfig config) : base(config)
{
_config = config;

_router = new Lazy<IActorRef>(() => Sys.ActorOf(FromConfig.Instance.Props(), "router1"));
}

private IEnumerable<Routee> CurrentRoutees(IActorRef router)
{
var routerAsk = router.Ask<Routees>(new GetRoutees(), GetTimeoutOrDefault(null));
return routerAsk.Result.Members;
}

private Address FullAddress(IActorRef actorRef)
{
if (string.IsNullOrEmpty(actorRef.Path.Address.Host) || !actorRef.Path.Address.Port.HasValue)
return Cluster.SelfAddress;
return actorRef.Path.Address;
}

private Dictionary<Address, int> ReceiveReplays(ClusterBroadcastGroupSpecConfig.IRouteeType routeeType, int expectedReplies)
{
var zero = Roles.Select(GetAddress).ToDictionary(c => c, c => 0);
var replays = ReceiveWhile(5.Seconds(), msg =>
{
var routee = msg as ClusterBroadcastGroupSpecConfig.Reply;
if (routee != null && routee.RouteeType.GetType() == routeeType.GetType())
return FullAddress(routee.ActorRef);
return null;
}, expectedReplies).Aggregate(zero, (replyMap, address) =>
{
replyMap[address]++;
return replyMap;
});

return replays;
}

[MultiNodeFact]
public void ClusterBroadcastGroup()
{
A_cluster_router_with_a_BroadcastGroup_router_must_start_cluster_with_2_nodes();
A_cluster_router_with_a_BroadcastGroup_router_must_lookup_routees_on_the_member_nodes_in_the_cluster();
}

private void A_cluster_router_with_a_BroadcastGroup_router_must_start_cluster_with_2_nodes()
{
Log.Info("Waiting for cluster up");

AwaitClusterUp(_config.First, _config.Second);

RunOn(() =>
{
Log.Info("first, roles: " + Cluster.SelfRoles);
}, _config.First);

RunOn(() =>
{
Log.Info("second, roles: " + Cluster.SelfRoles);
}, _config.Second);

Log.Info("Cluster Up");

EnterBarrier("after-1");
}

private void A_cluster_router_with_a_BroadcastGroup_router_must_lookup_routees_on_the_member_nodes_in_the_cluster()
{
// cluster consists of first and second
Sys.ActorOf(Props.Create(() => new ClusterBroadcastGroupSpecConfig.SomeActor(new ClusterBroadcastGroupSpecConfig.GroupRoutee())), "myserviceA");
EnterBarrier("myservice-started");

RunOn(() =>
{
// 2 nodes, 1 routees on each node
Within(10.Seconds(), () =>
{
AwaitAssert(() => CurrentRoutees(_router.Value).Count().Should().Be(2)); //only seems to pass with a single routee should be 2
});
var routeeCount = CurrentRoutees(_router.Value).Count();
var iterationCount = 10;
for (int i = 0; i < iterationCount; i++)
{
_router.Value.Tell("hit");
}
var replays = ReceiveReplays(new ClusterBroadcastGroupSpecConfig.GroupRoutee(), iterationCount*routeeCount);
replays[GetAddress(_config.First)].Should().Be(10);
replays[GetAddress(_config.Second)].Should().Be(10);
replays.Values.Sum().Should().Be(iterationCount*routeeCount);
}, _config.First);

EnterBarrier("after-2");
}
}
}

29 changes: 29 additions & 0 deletions src/core/Akka.Cluster.Tests/ClusterDeployerSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,15 @@ public class ClusterDeployerSpec : AkkaSpec
cluster.allow-local-routees = off
cluster.use-role = backend
}
/user/service3 {
dispatcher = mydispatcher
mailbox = mymailbox
router = broadcast-group
routees.paths = [""/user/myservice""]
cluster.enabled = on
cluster.allow-local-routees = off
cluster.use-role = backend
}
}
akka.remote.helios.tcp.port = 0");

Expand Down Expand Up @@ -85,6 +94,26 @@ public void RemoteDeployer_must_be_able_to_parse_akka_actor_deployment_with_spec
deployment.Dispatcher.ShouldBe("mydispatcher");
}

[Fact]
public void BugFix2266RemoteDeployer_must_be_able_to_parse_broadcast_group_cluster_router_with_default_nr_of_routees_routees()
{
var service = "/user/service3";
var deployment = Sys.AsInstanceOf<ActorSystemImpl>().Provider.Deployer.Lookup(service.Split('/').Drop(1));
deployment.Should().NotBeNull();

deployment.Path.ShouldBe(service);
deployment.RouterConfig.GetType().ShouldBe(typeof(ClusterRouterGroup));
deployment.RouterConfig.AsInstanceOf<ClusterRouterGroup>().Local.GetType().ShouldBe(typeof(BroadcastGroup));
deployment.RouterConfig.AsInstanceOf<ClusterRouterGroup>().Local.AsInstanceOf<BroadcastGroup>().Paths.ShouldBe(new[] { "/user/myservice" });
deployment.RouterConfig.AsInstanceOf<ClusterRouterGroup>().Settings.TotalInstances.ShouldBe(10000);
deployment.RouterConfig.AsInstanceOf<ClusterRouterGroup>().Settings.AllowLocalRoutees.ShouldBe(false);
deployment.RouterConfig.AsInstanceOf<ClusterRouterGroup>().Settings.UseRole.ShouldBe("backend");
deployment.RouterConfig.AsInstanceOf<ClusterRouterGroup>().Settings.AsInstanceOf<ClusterRouterGroupSettings>().RouteesPaths.ShouldBe(new[] { "/user/myservice" });
deployment.Scope.ShouldBe(ClusterScope.Instance);
deployment.Mailbox.ShouldBe("mymailbox");
deployment.Dispatcher.ShouldBe("mydispatcher");
}

//todo: implement "have correct router mappings" test for adaptive load-balancing routers (not yet implemented)
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka.Cluster/ClusterActorRefProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public override Deploy ParseConfig(string key, Config config)
&& !config.HasPath("nr-of-instances"))
{
var maxTotalNrOfInstances = config
.WithFallback(ClusterConfigFactory.Default())
.WithFallback(Default)
.GetInt("cluster.max-total-nr-of-instances");
config2 = ConfigurationFactory.ParseString("nr-of-instances=" + maxTotalNrOfInstances)
.WithFallback(config);
Expand Down
3 changes: 2 additions & 1 deletion src/core/Akka.Cluster/Routing/ClusterRoutingConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,8 @@ protected ClusterRouterActor(ClusterRouterSettingsBase settings)
}

Cluster = Cluster.Get(Context.System);
Nodes = ImmutableSortedSet.Create(Member.AddressOrdering, Cluster.ReadView.Members.Where(IsAvailable).Select(x => x.Address).ToArray());
Nodes = ImmutableSortedSet.Create(Member.AddressOrdering,
Cluster.ReadView.Members.Where(IsAvailable).Select(x => x.Address).ToArray());
}

public ClusterRouterSettingsBase Settings { get; protected set; }
Expand Down
9 changes: 6 additions & 3 deletions src/core/Akka/Actor/Deployer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,20 @@

namespace Akka.Actor
{
/// <summary>
/// Used to configure and deploy actors.
/// </summary>
public class Deployer
{
private readonly Config _default;
protected readonly Config Default;
private readonly Settings _settings;
private readonly AtomicReference<WildcardTree<Deploy>> _deployments = new AtomicReference<WildcardTree<Deploy>>(new WildcardTree<Deploy>());

public Deployer(Settings settings)
{
_settings = settings;
var config = settings.Config.GetConfig("akka.actor.deployment");
_default = config.GetConfig("default");
Default = config.GetConfig("default");

var rootObj = config.Root.GetObject();
if (rootObj == null) return;
Expand Down Expand Up @@ -90,7 +93,7 @@ public void SetDeploy(Deploy deploy)

public virtual Deploy ParseConfig(string key, Config config)
{
var deployment = config.WithFallback(_default);
var deployment = config.WithFallback(Default);
var routerType = deployment.GetString("router");
var router = CreateRouterConfig(routerType, deployment);
var dispatcher = deployment.GetString("dispatcher");
Expand Down
3 changes: 2 additions & 1 deletion src/examples/Chat/ChatServer/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ static void Main(string[] args)
applied-adapters = []
transport-protocol = tcp
port = 8081
hostname = localhost
hostname = 0.0.0.0
public-hostname = localhost
}
}
}
Expand Down

0 comments on commit 458daa9

Please sign in to comment.