From 206eeee000d649c4a043725aa61bea9197efc7c6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Milewski?= Date: Thu, 11 Jul 2024 02:30:57 +0200 Subject: [PATCH] Activity: fix for token streams clearing delay --- Core/Stateflows/Activities/Engine/Executor.cs | 91 +++++++++---------- .../Tests/Decision.cs | 61 +++++++++++++ 2 files changed, 104 insertions(+), 48 deletions(-) diff --git a/Core/Stateflows/Activities/Engine/Executor.cs b/Core/Stateflows/Activities/Engine/Executor.cs index ecdfb71..99039f2 100644 --- a/Core/Stateflows/Activities/Engine/Executor.cs +++ b/Core/Stateflows/Activities/Engine/Executor.cs @@ -544,38 +544,54 @@ public async Task DoHandleNodeAsync(Node node, NodeScope nodeScope, IEnumerable< return; } - IEnumerable streams = Array.Empty(); + var activated = false; + + IEnumerable inputTokens = Array.Empty(); + + lock (node) + { + var streams = !Context.NodesToExecute.Contains(node) + ? Context.GetActivatedStreams(node, nodeScope.ThreadId) + : Array.Empty(); + + activated = + ( // initial node case + node.Type == NodeType.Initial + ) || + ( // input node case - has to have input + node.Type == NodeType.Input && + (input?.Any() ?? false) + ) || + ( // no implicit join node - has any incoming streams + streams.Any() && + !node.Options.HasFlag(NodeOptions.ImplicitJoin) + ) || + ( // implicit join node - has incoming streams on all edges + node.IncomingEdges.Count == streams.Count() && + node.Options.HasFlag(NodeOptions.ImplicitJoin) + ) || + ( + Context.NodesToExecute.Contains(node) + ); - if (!Context.NodesToExecute.Contains(node)) - { - lock (node) + inputTokens = input ?? streams.SelectMany(stream => stream.Tokens).Distinct().ToArray(); + + if (activated) { - streams = Context.GetActivatedStreams(node, nodeScope.ThreadId); + foreach (var stream in streams) + { + if (!stream.IsPersistent) + { + Context.ClearStream(stream.EdgeIdentifier, nodeScope.ThreadId); + } + } + } + else + { + ReportNodeAttemptedExecution(node, streams); } } - var activated = - ( // initial node case - node.Type == NodeType.Initial - ) || - ( // input node case - has to have input - node.Type == NodeType.Input && - (input?.Any() ?? false) - ) || - ( // no implicit join node - has any incoming streams - streams.Any() && - !node.Options.HasFlag(NodeOptions.ImplicitJoin) - ) || - ( // implicit join node - has incoming streams on all edges - node.IncomingEdges.Count == streams.Count() && - node.Options.HasFlag(NodeOptions.ImplicitJoin) - ) || - ( - Context.NodesToExecute.Contains(node) - ); - - var inputTokens = input ?? streams.SelectMany(stream => stream.Tokens).Distinct().ToArray(); - nodeScope = nodeScope.CreateChildScope(node); var actionContext = new ActionContext(Context, nodeScope, node, inputTokens, selectionTokens); @@ -602,14 +618,8 @@ public async Task DoHandleNodeAsync(Node node, NodeScope nodeScope, IEnumerable< return; } - //var inputTokens = input ?? streams.SelectMany(stream => stream.Tokens).Distinct().ToArray(); - ReportNodeExecuting(node, inputTokens); - //nodeScope = nodeScope.CreateChildScope(node); - - //var actionContext = new ActionContext(Context, nodeScope, node, inputTokens, selectionTokens); - await node.Action.WhenAll(actionContext); if (node.Type == NodeType.AcceptEventAction) @@ -677,21 +687,6 @@ public async Task DoHandleNodeAsync(Node node, NodeScope nodeScope, IEnumerable< await Task.WhenAll(nodes.Select(n => DoHandleNodeAsync(n, nodeScope)).ToArray()); } } - - lock (node) - { - foreach (var stream in streams) - { - if (!stream.IsPersistent) - { - Context.ClearStream(stream.EdgeIdentifier, nodeScope.ThreadId); - } - } - } - } - else - { - ReportNodeAttemptedExecution(node, streams); } await Inspector.AfterNodeActivateAsync(null); diff --git a/Tests/Activity.IntegrationTests/Tests/Decision.cs b/Tests/Activity.IntegrationTests/Tests/Decision.cs index 18b4d61..5bac90e 100644 --- a/Tests/Activity.IntegrationTests/Tests/Decision.cs +++ b/Tests/Activity.IntegrationTests/Tests/Decision.cs @@ -49,6 +49,53 @@ protected override void InitializeStateflows(IStateflowsBuilder builder) TokenCount2 += c.GetTokensOfType().Count(); }) ) + .AddActivity("multipleTokensDecision", b => b + .AddInitial(b => b + .AddControlFlow("generate") + ) + .AddAction( + "generate", + async c => + { + c.OutputRange(Enumerable.Range(0, 10)); + c.Output("test"); + }, + b => b + .AddFlow("main") + .AddFlow("main") + ) + .AddStructuredActivity("main", b => b + .AddInput(b => b + .AddFlow>() + .AddFlow("final1") + .AddFlow("final2") + ) + .AddDecision(b => b + .AddFlow("final1", b => b.AddGuard(async c => c.Token % 2 == 0)) + .AddElseFlow("final2") + ) + .AddAction( + "final1", + async c => + { + ExecutionCount1++; + TokenCount1 += c.GetTokensOfType().Count(); + await Task.Delay(1); + }, + b => b.AddFlow() + ) + .AddAction( + "final2", + async c => + { + ExecutionCount2++; + TokenCount2 += c.GetTokensOfType().Count(); + }, + b => b.AddFlow() + ) + .AddOutput() + ) + ) .AddActivity("controlDecision", b => b .AddInitial(b => b .AddControlFlow("setup") @@ -85,6 +132,20 @@ public async Task TokenDecision() Assert.AreEqual(5, TokenCount2); } + [TestMethod] + public async Task MultipleTokenDecision() + { + if (ActivityLocator.TryLocateActivity(new ActivityId("multipleTokensDecision", "x"), out var a)) + { + await a.InitializeAsync(); + } + + Assert.AreEqual(1, ExecutionCount1); + Assert.AreEqual(5, TokenCount1); + Assert.AreEqual(1, ExecutionCount2); + Assert.AreEqual(5, TokenCount2); + } + [TestMethod] public async Task ControlDecision() {