Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SM: execution steps; A: interrupted output #34

Merged
merged 1 commit into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Core/Stateflows.Common/Extensions/ObjectExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Collections.Generic;
using Stateflows.Activities;

namespace Stateflows.Common
Expand Down
4 changes: 2 additions & 2 deletions Core/Stateflows/Activities/Context/Classes/RootContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,13 @@ public Dictionary<string, Stream> GetStreams(Guid threadId)
}
}

internal IEnumerable<Stream> GetActivatedStreams(Node node, Guid threadId)
internal IEnumerable<Stream> GetNodeStreams(Node node, Guid threadId, bool activatedOnly)
{
lock (Streams)
{
return node.IncomingEdges
.Select(edge => GetStream(edge.Identifier, threadId))
.Where(stream => stream.IsActivated)
.Where(stream => stream.IsActivated || !activatedOnly)
.ToArray();
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
using System;
using System.Linq;
using System.Collections.Generic;
using Stateflows.Activities.Models;
using Stateflows.Activities.Context.Interfaces;
using Stateflows.Utils;
using Stateflows.Activities.Engine;
using Stateflows.Activities.Models;
using Stateflows.Activities.Context.Interfaces;

namespace Stateflows.Activities.Context.Classes
{
Expand All @@ -20,7 +20,7 @@ public SourceNodeContext(Node node, RootContext context, NodeScope nodeScope)

public IEnumerable<TToken> GetTokensOfType<TToken>()
=> Context
.GetActivatedStreams(Node, ThreadId)
.GetNodeStreams(Node, ThreadId, true)
.SelectMany(stream => stream.Tokens)
.OfType<TokenHolder<TToken>>()
.ToTokens()
Expand Down
25 changes: 18 additions & 7 deletions Core/Stateflows/Activities/Engine/Executor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
using Stateflows.Activities.Streams;
using Stateflows.Activities.Registration;
using Stateflows.Activities.Context.Classes;
using Stateflows.Activities.Enums;

namespace Stateflows.Activities.Engine
{
Expand Down Expand Up @@ -641,7 +642,7 @@ public async Task<InitializationStatus> DoInitializeActivityAsync<TInitializatio
return result;
}

public async Task<bool> HandleExceptionAsync(Node node, Exception exception, BaseContext context)
public async Task<ExceptionHandlingResult> HandleExceptionAsync(Node node, Exception exception, ActionContext context)
{
Node handler = null;
var currentNode = node;
Expand Down Expand Up @@ -689,14 +690,23 @@ public async Task<bool> HandleExceptionAsync(Node node, Exception exception, Bas

await handler.Action.WhenAll(exceptionContext);

DoHandleOutput(exceptionContext);
if (node.ExceptionHandlers.Contains(handler))
{
context.OutputTokens.AddRange(exceptionContext.OutputTokens);
}
else
{
DoHandleOutput(exceptionContext);
}

ReportExceptionHandled(node, exceptionName, exceptionContext.OutputTokens.Where(t => t is TokenHolder<ControlToken>).ToArray(), Context);

return true;
return node.ExceptionHandlers.Contains(handler)
? ExceptionHandlingResult.HandledDirectly
: ExceptionHandlingResult.HandledIndirectly;
}

return false;
return ExceptionHandlingResult.NotHandled;
}

public async Task DoHandleNodeAsync(Node node, Edge upstreamEdge, NodeScope nodeScope, IEnumerable<TokenHolder> input = null, IEnumerable<TokenHolder> selectionTokens = null)
Expand All @@ -713,7 +723,7 @@ public async Task DoHandleNodeAsync(Node node, Edge upstreamEdge, NodeScope node
lock (GetLock(node))
{
var streams = !Context.NodesToExecute.Contains(node)
? Context.GetActivatedStreams(node, nodeScope.ThreadId)
? Context.GetNodeStreams(node, nodeScope.ThreadId, node.Type != NodeType.Output)
: Array.Empty<Stream>();

activated =
Expand Down Expand Up @@ -832,13 +842,14 @@ public async Task DoHandleNodeAsync(Node node, Edge upstreamEdge, NodeScope node

ReportNodeExecuted(node, outputTokens.Where(t => t is TokenHolder<ControlToken>).ToArray(), Context);

if (outputTokens.Any(t => t is TokenHolder<ControlToken>))
//if (outputTokens.Any(t => t is TokenHolder<ControlToken>))
{
var tokenNames = outputTokens.Select(token => token.Name).Distinct().ToArray();

var nodes = (
await Task.WhenAll(
node.Edges
.Where(edge => edge.Target.Type == NodeType.Output || outputTokens.Any(t => t is TokenHolder<ControlToken>))
.Where(edge => tokenNames.Contains(edge.TokenType.GetTokenName()) || edge.Weight == 0)
.OrderBy(edge => edge.IsElse)
.Select(async edge => (
Expand Down Expand Up @@ -961,7 +972,7 @@ private static void ReportMissingFlows(IEnumerable<Edge> flows)
}
}

private void DoHandleOutput(ActionContext context)
internal void DoHandleOutput(ActionContext context)
=> context.Context.GetOutputTokens(context.Node.Identifier, context.NodeScope.ThreadId).AddRange(context.OutputTokens);

private async Task<bool> DoHandleEdgeAsync(Edge edge, ActionContext context)
Expand Down
9 changes: 9 additions & 0 deletions Core/Stateflows/Activities/Enums/ExceptionHandlingResult.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
namespace Stateflows.Activities.Enums
{
internal enum ExceptionHandlingResult
{
NotHandled,
HandledDirectly,
HandledIndirectly
}
}
37 changes: 13 additions & 24 deletions Core/Stateflows/Activities/Models/Node.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,7 @@ internal class Node : Element
public int ChunkSize { get; set; }
public bool Anchored { get; set; } = true;

public Logic<ActivityActionAsync> Action { get; } = new Logic<ActivityActionAsync>()
{
Name = Constants.Action
};
public Logic<ActivityActionAsync> Action { get; } = new Logic<ActivityActionAsync>(Constants.Action);

public List<Edge> Edges { get; set; } = new List<Edge>();
public List<Edge> IncomingEdges { get; set; } = new List<Edge>();
Expand Down Expand Up @@ -94,27 +91,19 @@ public void ScanForDeclaredTypes(Type nodeType)
.ToList();
}

private Logic<ActivityEventActionAsync> initialize = null;
public Logic<ActivityEventActionAsync> Initialize
=> initialize ??= new Logic<ActivityEventActionAsync>()
{
Name = Constants.Initialize
};
public Logic<ActivityEventActionAsync> Initialize { get; } =
new Logic<ActivityEventActionAsync>(Constants.Initialize);

private Logic<ActivityEventActionAsync> finalize = null;
public Logic<ActivityEventActionAsync> Finalize
=> finalize ??= new Logic<ActivityEventActionAsync>()
{
Name = Constants.Finalize
};
public Logic<ActivityEventActionAsync> Finalize { get; } =
new Logic<ActivityEventActionAsync>(Constants.Finalize);

private IEnumerable<Node> initialNodes = null;
private IEnumerable<Node> initialNodes;
public IEnumerable<Node> InitialNodes
=> initialNodes ??= Nodes.Values
.Where(n => n.Type == NodeType.Initial);

private Node inputNode = null;
private bool inputNodeSet = false;
private Node inputNode;
private bool inputNodeSet;
public Node InputNode
{
get
Expand All @@ -129,8 +118,8 @@ public Node InputNode
}
}

private Node outputNode = null;
private bool outputNodeSet = false;
private Node outputNode;
private bool outputNodeSet;
public Node OutputNode
{
get
Expand All @@ -145,17 +134,17 @@ public Node OutputNode
}
}

private IEnumerable<Node> acceptEventActionNodes = null;
private IEnumerable<Node> acceptEventActionNodes;
public IEnumerable<Node> AcceptEventActionNodes
=> acceptEventActionNodes ??= Nodes.Values
.Where(n => n.Type == NodeType.AcceptEventAction || n.Type == NodeType.TimeEventAction);

private IEnumerable<Node> danglingTimeEventActionNodes = null;
private IEnumerable<Node> danglingTimeEventActionNodes;
public IEnumerable<Node> DanglingTimeEventActionNodes
=> danglingTimeEventActionNodes ??= AcceptEventActionNodes
.Where(n => !n.IncomingEdges.Any() && n.ActualEventTypes.Any(type => type.IsSubclassOf(typeof(TimeEvent))));

private IEnumerable<Node> exceptionHandlers = null;
private IEnumerable<Node> exceptionHandlers;
public IEnumerable<Node> ExceptionHandlers
=> exceptionHandlers ??= Edges
.Select(e => e.Target)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,7 @@ public IActivityBuilder AddInitializer(Type initializerType, string initializerN
{
if (!Result.Initializers.TryGetValue(initializerName, out var initializer))
{
initializer = new Logic<ActivityPredicateAsync>()
{
Name = Constants.Initialize
};
initializer = new Logic<ActivityPredicateAsync>(Constants.Initialize);

Result.Initializers.Add(initializerName, initializer);
Result.InitializerTypes.Add(initializerType);
Expand All @@ -53,10 +50,7 @@ public IActivityBuilder AddInitializer(Type initializerType, string initializerN

public IActivityBuilder AddDefaultInitializer(Func<IActivityInitializationContext, Task<bool>> actionAsync)
{
Result.DefaultInitializer = new Logic<ActivityPredicateAsync>()
{
Name = Constants.Initialize
};
Result.DefaultInitializer = new Logic<ActivityPredicateAsync>(Constants.Initialize);

Result.DefaultInitializer.Actions.Add(c =>
{
Expand Down Expand Up @@ -164,16 +158,6 @@ public IActivityBuilder AddExceptionHandler<TExceptionHandler>()
return this;
}

//public IActivityBuilder AddExceptionHandler<TException>(ExceptionHandlerDelegateAsync<TException> exceptionHandler)
// where TException : Exception
//{
// AddExceptionHandler(serviceProvider
// => new AnonymousExceptionHandler<TException>(new NodeScope(serviceProvider, Node, Guid.NewGuid()), Node, exceptionHandler)
// );

// return this;
//}

public IActivityBuilder AddExceptionHandler(ActivityExceptionHandlerFactory exceptionHandlerFactory)
{
Result.ExceptionHandlerFactories.Add(exceptionHandlerFactory);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
using Stateflows.Activities.Context.Interfaces;
using Stateflows.Activities.Registration.Builders;
using Stateflows.Activities.Registration.Interfaces;
using System.Threading;
using Stateflows.Activities.Enums;

namespace Stateflows.Activities.Registration
{
Expand Down Expand Up @@ -119,17 +121,14 @@ public BaseActivityBuilder AddNode(NodeType type, string nodeName, ActionDelegat

node.Action.Actions.Add(async c =>
{
var context = c as ActionContext;
var faulty = false;
try
{
var inspector = c.Activity.GetExecutor().Inspector;
await inspector.BeforeNodeExecuteAsync(c as ActionContext);
await inspector.BeforeNodeExecuteAsync(context);
await actionAsync(c);
await inspector.AfterNodeExecuteAsync(c as ActionContext);

if (!(c as BaseContext).Context.Executor.StructuralTypes.Contains(node.Type))
{
c.Output(new ControlToken());
}
await inspector.AfterNodeExecuteAsync(context);
}
catch (Exception e)
{
Expand All @@ -139,14 +138,27 @@ public BaseActivityBuilder AddNode(NodeType type, string nodeName, ActionDelegat
}
else
{
if (!await (c as BaseContext).Context.Executor.HandleExceptionAsync(node, e, c as BaseContext))
var executor = context.Context.Executor;
var result = await executor.HandleExceptionAsync(node, e, context);
if (result == ExceptionHandlingResult.NotHandled)
{
faulty = true;
throw;
}
else
{
faulty = result == ExceptionHandlingResult.HandledIndirectly;
}
}
}
}
);
finally
{
if (!faulty && !context.Context.Executor.StructuralTypes.Contains(node.Type))
{
c.Output(new ControlToken());
}
}
});

Node.Nodes.Add(node.Identifier, node);
Result.AllNodes.Add(node.Identifier, node);
Expand Down
10 changes: 2 additions & 8 deletions Core/Stateflows/Activities/Registration/Builders/FlowBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,7 @@ public FlowBuilder(Edge edge, IServiceCollection services)

public IObjectFlowBuilder<TToken> AddGuard(GuardDelegateAsync<TToken> guardAsync)
{
var logic = new Logic<TokenPipelineActionAsync>()
{
Name = Constants.Guard
};
var logic = new Logic<TokenPipelineActionAsync>(Constants.Guard);

logic.Actions.Add(async context =>
{
Expand Down Expand Up @@ -77,10 +74,7 @@ await guardAsync(new TokenFlowContext<TToken>(context, token.Payload))

public IObjectFlowBuilder<TTransformedToken> AddTransformation<TTransformedToken>(TransformationDelegateAsync<TToken, TTransformedToken> transformationAsync)
{
var logic = new Logic<TokenPipelineActionAsync>()
{
Name = Constants.Guard
};
var logic = new Logic<TokenPipelineActionAsync>(Constants.Guard);

logic.Actions.Add(async context =>
{
Expand Down
2 changes: 1 addition & 1 deletion Core/Stateflows/Common/Attributes/BehaviorAttribute.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ public abstract class BehaviorAttribute : Attribute

public int Version { get; set; }

public BehaviorAttribute(string name, int version = 1)
protected BehaviorAttribute(string name, int version = 1)
{
Name = name;
Version = version;
Expand Down
2 changes: 1 addition & 1 deletion Core/Stateflows/Common/Engine/StateflowsEngine.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public Task StartAsync(CancellationToken cancellationToken)
{
executionTask = Task.Run(() =>
{
while (!CancellationTokenSource.Token.IsCancellationRequested)
while (!CancellationTokenSource.Token.IsCancellationRequested && !cancellationToken.IsCancellationRequested)
{
if (!EventQueue.WaitAsync(CancellationTokenSource.Token).GetAwaiter().GetResult())
{
Expand Down
7 changes: 6 additions & 1 deletion Core/Stateflows/Common/Models/Logic.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,13 @@ namespace Stateflows.Common.Models
internal class Logic<TDelegate>
where TDelegate : Delegate
{
public Logic(string name)
{
Name = name;
}

public List<TDelegate> Actions { get; set; } = new List<TDelegate>();

public string Name { get; set; }
public string Name { get; }
}
}
2 changes: 1 addition & 1 deletion Core/Stateflows/Common/Scheduler/ScheduleExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public async Task ExecuteAsync()
{
_ = SendAsyncMethod
.MakeGenericMethod(timeEvent.GetType())
.Invoke(behavior, new object[] { timeEvent });
.Invoke(behavior, new object[] { timeEvent, null });
}
}
}
Expand Down
5 changes: 4 additions & 1 deletion Core/Stateflows/StateMachines/Context/Classes/BaseContext.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Microsoft.Extensions.DependencyInjection;
using System.Collections.Generic;
using Microsoft.Extensions.DependencyInjection;
using Stateflows.Common;
using Stateflows.Common.Context;
using Stateflows.Common.Interfaces;
Expand All @@ -16,6 +17,8 @@ public BaseContext(RootContext context)

public object ExecutionTrigger => Context.ExecutionTriggerHolder.BoxedPayload;

public IEnumerable<IExecutionStep> ExecutionSteps => Context.ExecutionSteps;

public StateMachineContext stateMachine;
public StateMachineContext StateMachine => stateMachine ??= new StateMachineContext(Context);

Expand Down
Loading
Loading