Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow other nodes than the component nodes to have dependants #6225

Merged
merged 5 commits into from
Jan 24, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 13 additions & 13 deletions pkg/flow/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,21 @@
// when evaluating the configuration for a component will always be reported as
// unhealthy until the next successful evaluation.
//
// # Component Evaluation
// # Node Evaluation
//
// The process of converting the River block associated with a component into
// the appropriate Go struct is called "component evaluation."
// The process of converting the River block associated with a node into
// the appropriate Go struct is called "node evaluation."
//
// Components are only evaluated after all components they reference have been
// Nodes are only evaluated after all nodes they reference have been
// evaluated; cyclic dependencies are invalid.
//
// If a component updates its Exports at runtime, other components which directly
// or indirectly reference the updated component will have their Arguments
// If a node updates its Exports at runtime, other nodes which directly
// or indirectly reference the updated node will have their Arguments
// re-evaluated.
//
// The arguments and exports for a component will be left in their last valid
// state if a component shuts down or is given an invalid config. This prevents
// a domino effect of a single failed component taking down other components
// The arguments and exports for a node will be left in their last valid
// state if a node shuts down or is given an invalid config. This prevents
// a domino effect of a single failed node taking down other node
// which are otherwise healthy.
package flow

Expand Down Expand Up @@ -185,8 +185,8 @@ func newController(o controllerOptions) *Flow {
Logger: log,
TraceProvider: tracer,
DataPath: o.DataPath,
OnComponentUpdate: func(cn *controller.ComponentNode) {
// Changed components should be queued for reevaluation.
OnNodeWithDependantsUpdate: func(cn controller.NodeWithDependants) {
// Changed node with dependants should be queued for reevaluation.
f.updateQueue.Enqueue(cn)
},
OnExportsChange: o.OnExportsChange,
Expand Down Expand Up @@ -236,8 +236,8 @@ func (f *Flow) Run(ctx context.Context) {
return

case <-f.updateQueue.Chan():
// Evaluate all components that have been updated. Sending the entire batch together will improve
// throughput - it prevents the situation where two components have the same dependency, and the first time
// Evaluate all nodes that have been updated. Sending the entire batch together will improve
// throughput - it prevents the situation where two nodes have the same dependency, and the first time
// it's picked up by the worker pool and the second time it's enqueued again, resulting in more evaluations.
all := f.updateQueue.DequeueAll()
f.loader.EvaluateDependants(ctx, all)
Expand Down
14 changes: 7 additions & 7 deletions pkg/flow/internal/controller/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,13 +555,13 @@ func (l *Loader) OriginalGraph() *dag.Graph {
return l.originalGraph.Clone()
}

// EvaluateDependants sends components which depend directly on components in updatedNodes for evaluation to the
// workerPool. It should be called whenever components update their exports.
// It is beneficial to call EvaluateDependants with a batch of components, as it will enqueue the entire batch before
// EvaluateDependants sends nodes which depend directly on nodes in updatedNodes for evaluation to the
// workerPool. It should be called whenever nodes update their exports.
// It is beneficial to call EvaluateDependants with a batch of nodes, as it will enqueue the entire batch before
// the worker pool starts to evaluate them, resulting in smaller number of total evaluations when
// node updates are frequent. If the worker pool's queue is full, EvaluateDependants will retry with a backoff until
// it succeeds or until the ctx is cancelled.
func (l *Loader) EvaluateDependants(ctx context.Context, updatedNodes []*ComponentNode) {
func (l *Loader) EvaluateDependants(ctx context.Context, updatedNodes []NodeWithDependants) {
if len(updatedNodes) == 0 {
return
}
Expand All @@ -577,7 +577,7 @@ func (l *Loader) EvaluateDependants(ctx context.Context, updatedNodes []*Compone
l.mut.RLock()
defer l.mut.RUnlock()

dependenciesToParentsMap := make(map[dag.Node]*ComponentNode)
dependenciesToParentsMap := make(map[dag.Node]NodeWithDependants)
for _, parent := range updatedNodes {
// Make sure we're in-sync with the current exports of parent.
l.cache.CacheExports(parent.ID(), parent.Exports())
Expand Down Expand Up @@ -637,9 +637,9 @@ func (l *Loader) EvaluateDependants(ctx context.Context, updatedNodes []*Compone

// concurrentEvalFn returns a function that evaluates a node and updates the cache. This function can be submitted to
// a worker pool for asynchronous evaluation.
func (l *Loader) concurrentEvalFn(n dag.Node, spanCtx context.Context, tracer trace.Tracer, parent *ComponentNode) {
func (l *Loader) concurrentEvalFn(n dag.Node, spanCtx context.Context, tracer trace.Tracer, parent NodeWithDependants) {
start := time.Now()
l.cm.dependenciesWaitTime.Observe(time.Since(parent.lastUpdateTime.Load()).Seconds())
l.cm.dependenciesWaitTime.Observe(time.Since(parent.LastUpdateTime()).Seconds())
_, span := tracer.Start(spanCtx, "EvaluateNode", trace.WithSpanKind(trace.SpanKindInternal))
span.SetAttributes(attribute.String("node_id", n.NodeID()))
defer span.End()
Expand Down
20 changes: 10 additions & 10 deletions pkg/flow/internal/controller/loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,11 @@ func TestLoader(t *testing.T) {
l, _ := logging.New(os.Stderr, logging.DefaultOptions)
return controller.LoaderOptions{
ComponentGlobals: controller.ComponentGlobals{
Logger: l,
TraceProvider: noop.NewTracerProvider(),
DataPath: t.TempDir(),
OnComponentUpdate: func(cn *controller.ComponentNode) { /* no-op */ },
Registerer: prometheus.NewRegistry(),
Logger: l,
TraceProvider: noop.NewTracerProvider(),
DataPath: t.TempDir(),
OnNodeWithDependantsUpdate: func(cn controller.NodeWithDependants) { /* no-op */ },
Registerer: prometheus.NewRegistry(),
NewModuleController: func(id string) controller.ModuleController {
return nil
},
Expand Down Expand Up @@ -204,11 +204,11 @@ func TestScopeWithFailingComponent(t *testing.T) {
l, _ := logging.New(os.Stderr, logging.DefaultOptions)
return controller.LoaderOptions{
ComponentGlobals: controller.ComponentGlobals{
Logger: l,
TraceProvider: noop.NewTracerProvider(),
DataPath: t.TempDir(),
OnComponentUpdate: func(cn *controller.ComponentNode) { /* no-op */ },
Registerer: prometheus.NewRegistry(),
Logger: l,
TraceProvider: noop.NewTracerProvider(),
DataPath: t.TempDir(),
OnNodeWithDependantsUpdate: func(cn controller.NodeWithDependants) { /* no-op */ },
Registerer: prometheus.NewRegistry(),
NewModuleController: func(id string) controller.ModuleController {
return fakeModuleController{}
},
Expand Down
69 changes: 37 additions & 32 deletions pkg/flow/internal/controller/node_component.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,15 @@ type DialFunc func(ctx context.Context, network, address string) (net.Conn, erro
// ComponentGlobals are used by ComponentNodes to build managed components. All
// ComponentNodes should use the same ComponentGlobals.
type ComponentGlobals struct {
Logger *logging.Logger // Logger shared between all managed components.
TraceProvider trace.TracerProvider // Tracer shared between all managed components.
DataPath string // Shared directory where component data may be stored
OnComponentUpdate func(cn *ComponentNode) // Informs controller that we need to reevaluate
OnExportsChange func(exports map[string]any) // Invoked when the managed component updated its exports
Registerer prometheus.Registerer // Registerer for serving agent and component metrics
ControllerID string // ID of controller.
NewModuleController func(id string) ModuleController // Func to generate a module controller.
GetServiceData func(name string) (interface{}, error) // Get data for a service.
Logger *logging.Logger // Logger shared between all managed components.
TraceProvider trace.TracerProvider // Tracer shared between all managed components.
DataPath string // Shared directory where component data may be stored
OnNodeWithDependantsUpdate func(cn NodeWithDependants) // Informs controller that we need to reevaluate
OnExportsChange func(exports map[string]any) // Invoked when the managed component updated its exports
Registerer prometheus.Registerer // Registerer for serving agent and component metrics
ControllerID string // ID of controller.
NewModuleController func(id string) ModuleController // Func to generate a module controller.
GetServiceData func(name string) (interface{}, error) // Get data for a service.
}

// ComponentNode is a controller node which manages a user-defined component.
Expand All @@ -80,18 +80,18 @@ type ComponentGlobals struct {
// arguments and exports. ComponentNode manages the arguments for the component
// from a River block.
type ComponentNode struct {
id ComponentID
globalID string
label string
componentName string
nodeID string // Cached from id.String() to avoid allocating new strings every time NodeID is called.
reg component.Registration
managedOpts component.Options
registry *prometheus.Registry
exportsType reflect.Type
moduleController ModuleController
OnComponentUpdate func(cn *ComponentNode) // Informs controller that we need to reevaluate
lastUpdateTime atomic.Time
id ComponentID
globalID string
label string
componentName string
nodeID string // Cached from id.String() to avoid allocating new strings every time NodeID is called.
reg component.Registration
managedOpts component.Options
registry *prometheus.Registry
exportsType reflect.Type
moduleController ModuleController
OnNodeWithDependantsUpdate func(cn NodeWithDependants) // Informs controller that we need to reevaluate
lastUpdateTime atomic.Time

mut sync.RWMutex
block *ast.BlockStmt // Current River block to derive args from
Expand All @@ -111,7 +111,7 @@ type ComponentNode struct {
exports component.Exports // Evaluated exports for the managed component
}

var _ BlockNode = (*ComponentNode)(nil)
var _ NodeWithDependants = (*ComponentNode)(nil)

// NewComponentNode creates a new ComponentNode from an initial ast.BlockStmt.
// The underlying managed component isn't created until Evaluate is called.
Expand All @@ -138,15 +138,15 @@ func NewComponentNode(globals ComponentGlobals, reg component.Registration, b *a
}

cn := &ComponentNode{
id: id,
globalID: globalID,
label: b.Label,
nodeID: nodeID,
componentName: strings.Join(b.Name, "."),
reg: reg,
exportsType: getExportsType(reg),
moduleController: globals.NewModuleController(globalID),
OnComponentUpdate: globals.OnComponentUpdate,
id: id,
globalID: globalID,
label: b.Label,
nodeID: nodeID,
componentName: strings.Join(b.Name, "."),
reg: reg,
exportsType: getExportsType(reg),
moduleController: globals.NewModuleController(globalID),
OnNodeWithDependantsUpdate: globals.OnNodeWithDependantsUpdate,

block: b,
eval: vm.New(b.Body),
Expand Down Expand Up @@ -351,6 +351,11 @@ func (cn *ComponentNode) Exports() component.Exports {
return cn.exports
}

// LastUpdateTime returns the time corresponding to the last time where the component changed its exports.
func (cn *ComponentNode) LastUpdateTime() time.Time {
return cn.lastUpdateTime.Load()
}

// setExports is called whenever the managed component updates. e must be the
// same type as the registered exports type of the managed component.
func (cn *ComponentNode) setExports(e component.Exports) {
Expand Down Expand Up @@ -380,7 +385,7 @@ func (cn *ComponentNode) setExports(e component.Exports) {
if changed {
// Inform the controller that we have new exports.
cn.lastUpdateTime.Store(time.Now())
cn.OnComponentUpdate(cn)
cn.OnNodeWithDependantsUpdate(cn)
}
}

Expand Down
21 changes: 21 additions & 0 deletions pkg/flow/internal/controller/node_with_dependants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package controller

import (
"time"

"github.com/grafana/agent/component"
)

// NodeWithDependants must be implemented by BlockNode that can trigger other nodes to be evaluated.
type NodeWithDependants interface {
BlockNode

// LastUpdateTime returns the time corresponding to the last time where the node changed its exports.
LastUpdateTime() time.Time

// Exports returns the current set of exports from the managed component.
Exports() component.Exports

// ID returns the component ID of the managed component from its River block.
ID() ComponentID
}
Copy link
Member

@rfratto rfratto Jan 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there's a few issues with this interface:

  • The name is potentially misleading. The name implies it's about a characteristic of a particular node, but the description is more about whether a BlockNode can emit an event informing its dependents to be updated. In fact, from the perspective of the DAG, any node can have dependents regardless of whether it implements this interface.

  • The set of methods is perhaps too broad; it's not obvious why a node which can emit updates to cause other nodes to be evaluated needs to have component exports, or a component ID.

Because of the points above, I think it's hard to justify introducing this interface as a new concept to keep inside one's mental model of the controller code.

I would recommend that we aim to not introduce this interface, opting instead for the following:

  1. Move LastUpdateTime() to BlockNode instead. This would be a reasonable expectation of a BlockNode; as all BlockNodes support evaluating, it makes sense for them to also report their last update time. (Other methods should not be moved, just LastUpdateTime.)
  2. Instead of changing code to accept NodeWithDependents, update it to accept a BlockNode instead. You will need to add some type switching when a more concrete type is necessary.
    • This will also have the added benefit of making some of the changed code a bit shorter; OnNodeWithDependantsUpdate is a bit harder to read than OnBlockNodeUpdate.

WDYT?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense, I was not really happy with the name in the first place so I'm happy to change this :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made the changes to see how it looks but after putting more thought into it I'm not 100% happy with it:

  • For the component node, LastUpdateTime actually refers to the last time its exports were updated, which is not necessarily tied to when the node was updated or evaluated.
  • In the Evaluate dependant I use a type cast to cache the exports because the BlockNode does not have the Export() function but I feel like this should be the default behavior. All BlockNodes can be evaluated which means that they have a concept of "Arguments" (inputs). I think that it actually makes sense that they also have a concept of "Exports" (outputs) and that these exports should be cached if not nil.

But for now the definitions of Arguments and Exports are really tied to components and not nodes so it would require bigger changes.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the component node, LastUpdateTime actually refers to the last time its exports were updated, which is not necessarily tied to when the node was updated or evaluated.

Ah, I see. Honestly, I don't think a node should have to track its own export time anyway; the controller could be tracking that when OnBlockNodeUpdate is called. What do you think? Should that be scoped into this PR so we can take it out of the interface?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good, the change is small and the time is only used by one metric. I think that it is worth doing it in the PR as it prevents introducing redundant code. I will take care of it

26 changes: 13 additions & 13 deletions pkg/flow/internal/controller/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ import (
"sync"
)

// Queue is a thread-safe, insertion-ordered set of components.
// Queue is a thread-safe, insertion-ordered set of nodes.
//
// Queue is intended for tracking components that have updated their Exports
// Queue is intended for tracking nodes that have updated their Exports
// for later reevaluation.
type Queue struct {
mut sync.Mutex
queuedSet map[*ComponentNode]struct{}
queuedOrder []*ComponentNode
queuedSet map[NodeWithDependants]struct{}
queuedOrder []NodeWithDependants

updateCh chan struct{}
}
Expand All @@ -20,14 +20,14 @@ type Queue struct {
func NewQueue() *Queue {
return &Queue{
updateCh: make(chan struct{}, 1),
queuedSet: make(map[*ComponentNode]struct{}),
queuedOrder: make([]*ComponentNode, 0),
queuedSet: make(map[NodeWithDependants]struct{}),
queuedOrder: make([]NodeWithDependants, 0),
}
}

// Enqueue inserts a new component into the Queue. Enqueue is a no-op if the
// component is already in the Queue.
func (q *Queue) Enqueue(c *ComponentNode) {
// Enqueue inserts a new NodeWithDependants into the Queue. Enqueue is a no-op if the
// NodeWithDependants is already in the Queue.
func (q *Queue) Enqueue(c NodeWithDependants) {
q.mut.Lock()
defer q.mut.Unlock()

Expand All @@ -47,14 +47,14 @@ func (q *Queue) Enqueue(c *ComponentNode) {
// Chan returns a channel which is written to when the queue is non-empty.
func (q *Queue) Chan() <-chan struct{} { return q.updateCh }

// DequeueAll removes all components from the queue and returns them.
func (q *Queue) DequeueAll() []*ComponentNode {
// DequeueAll removes all NodeWithDependants from the queue and returns them.
func (q *Queue) DequeueAll() []NodeWithDependants {
q.mut.Lock()
defer q.mut.Unlock()

all := q.queuedOrder
q.queuedOrder = make([]*ComponentNode, 0)
q.queuedSet = make(map[*ComponentNode]struct{})
q.queuedOrder = make([]NodeWithDependants, 0)
q.queuedSet = make(map[NodeWithDependants]struct{})

return all
}