Skip to content

Commit

Permalink
Allow other nodes than the component nodes to have dependants (#6225)
Browse files Browse the repository at this point in the history
  • Loading branch information
wildum authored Jan 24, 2024
1 parent 2338ed5 commit 5e9b0bd
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 52 deletions.
29 changes: 15 additions & 14 deletions pkg/flow/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,28 +27,29 @@
// when evaluating the configuration for a component will always be reported as
// unhealthy until the next successful evaluation.
//
// # Component Evaluation
// # Node Evaluation
//
// The process of converting the River block associated with a component into
// the appropriate Go struct is called "component evaluation."
// The process of converting the River block associated with a node into
// the appropriate Go struct is called "node evaluation."
//
// Components are only evaluated after all components they reference have been
// Nodes are only evaluated after all nodes they reference have been
// evaluated; cyclic dependencies are invalid.
//
// If a component updates its Exports at runtime, other components which directly
// or indirectly reference the updated component will have their Arguments
// If a node updates its Exports at runtime, other nodes which directly
// or indirectly reference the updated node will have their Arguments
// re-evaluated.
//
// The arguments and exports for a component will be left in their last valid
// state if a component shuts down or is given an invalid config. This prevents
// a domino effect of a single failed component taking down other components
// The arguments and exports for a node will be left in their last valid
// state if a node shuts down or is given an invalid config. This prevents
// a domino effect of a single failed node taking down other node
// which are otherwise healthy.
package flow

import (
"context"
"fmt"
"sync"
"time"

"github.com/grafana/agent/pkg/flow/internal/controller"
"github.com/grafana/agent/pkg/flow/internal/worker"
Expand Down Expand Up @@ -185,9 +186,9 @@ func newController(o controllerOptions) *Flow {
Logger: log,
TraceProvider: tracer,
DataPath: o.DataPath,
OnComponentUpdate: func(cn *controller.ComponentNode) {
// Changed components should be queued for reevaluation.
f.updateQueue.Enqueue(cn)
OnBlockNodeUpdate: func(cn controller.BlockNode) {
// Changed node should be queued for reevaluation.
f.updateQueue.Enqueue(&controller.QueuedNode{Node: cn, LastUpdatedTime: time.Now()})
},
OnExportsChange: o.OnExportsChange,
Registerer: o.Reg,
Expand Down Expand Up @@ -236,8 +237,8 @@ func (f *Flow) Run(ctx context.Context) {
return

case <-f.updateQueue.Chan():
// Evaluate all components that have been updated. Sending the entire batch together will improve
// throughput - it prevents the situation where two components have the same dependency, and the first time
// Evaluate all nodes that have been updated. Sending the entire batch together will improve
// throughput - it prevents the situation where two nodes have the same dependency, and the first time
// it's picked up by the worker pool and the second time it's enqueued again, resulting in more evaluations.
all := f.updateQueue.DequeueAll()
f.loader.EvaluateDependants(ctx, all)
Expand Down
24 changes: 13 additions & 11 deletions pkg/flow/internal/controller/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,13 +555,13 @@ func (l *Loader) OriginalGraph() *dag.Graph {
return l.originalGraph.Clone()
}

// EvaluateDependants sends components which depend directly on components in updatedNodes for evaluation to the
// workerPool. It should be called whenever components update their exports.
// It is beneficial to call EvaluateDependants with a batch of components, as it will enqueue the entire batch before
// EvaluateDependants sends nodes which depend directly on nodes in updatedNodes for evaluation to the
// workerPool. It should be called whenever nodes update their exports.
// It is beneficial to call EvaluateDependants with a batch of nodes, as it will enqueue the entire batch before
// the worker pool starts to evaluate them, resulting in smaller number of total evaluations when
// node updates are frequent. If the worker pool's queue is full, EvaluateDependants will retry with a backoff until
// it succeeds or until the ctx is cancelled.
func (l *Loader) EvaluateDependants(ctx context.Context, updatedNodes []*ComponentNode) {
func (l *Loader) EvaluateDependants(ctx context.Context, updatedNodes []*QueuedNode) {
if len(updatedNodes) == 0 {
return
}
Expand All @@ -577,12 +577,14 @@ func (l *Loader) EvaluateDependants(ctx context.Context, updatedNodes []*Compone
l.mut.RLock()
defer l.mut.RUnlock()

dependenciesToParentsMap := make(map[dag.Node]*ComponentNode)
dependenciesToParentsMap := make(map[dag.Node]*QueuedNode)
for _, parent := range updatedNodes {
// Make sure we're in-sync with the current exports of parent.
l.cache.CacheExports(parent.ID(), parent.Exports())
if componentNode, ok := parent.Node.(*ComponentNode); ok {
l.cache.CacheExports(componentNode.ID(), componentNode.Exports())
}
// We collect all nodes directly incoming to parent.
_ = dag.WalkIncomingNodes(l.graph, parent, func(n dag.Node) error {
_ = dag.WalkIncomingNodes(l.graph, parent.Node, func(n dag.Node) error {
dependenciesToParentsMap[n] = parent
return nil
})
Expand All @@ -595,7 +597,7 @@ func (l *Loader) EvaluateDependants(ctx context.Context, updatedNodes []*Compone
for n, parent := range dependenciesToParentsMap {
dependantCtx, span := tracer.Start(spanCtx, "SubmitForEvaluation", trace.WithSpanKind(trace.SpanKindInternal))
span.SetAttributes(attribute.String("node_id", n.NodeID()))
span.SetAttributes(attribute.String("originator_id", parent.NodeID()))
span.SetAttributes(attribute.String("originator_id", parent.Node.NodeID()))

// Submit for asynchronous evaluation with retries and backoff. Don't use range variables in the closure.
var (
Expand All @@ -614,7 +616,7 @@ func (l *Loader) EvaluateDependants(ctx context.Context, updatedNodes []*Compone
"and cannot keep up with evaluating components - will retry",
"err", err,
"node_id", n.NodeID(),
"originator_id", parent.NodeID(),
"originator_id", parent.Node.NodeID(),
"retries", retryBackoff.NumRetries(),
)
retryBackoff.Wait()
Expand All @@ -637,9 +639,9 @@ func (l *Loader) EvaluateDependants(ctx context.Context, updatedNodes []*Compone

// concurrentEvalFn returns a function that evaluates a node and updates the cache. This function can be submitted to
// a worker pool for asynchronous evaluation.
func (l *Loader) concurrentEvalFn(n dag.Node, spanCtx context.Context, tracer trace.Tracer, parent *ComponentNode) {
func (l *Loader) concurrentEvalFn(n dag.Node, spanCtx context.Context, tracer trace.Tracer, parent *QueuedNode) {
start := time.Now()
l.cm.dependenciesWaitTime.Observe(time.Since(parent.lastUpdateTime.Load()).Seconds())
l.cm.dependenciesWaitTime.Observe(time.Since(parent.LastUpdatedTime).Seconds())
_, span := tracer.Start(spanCtx, "EvaluateNode", trace.WithSpanKind(trace.SpanKindInternal))
span.SetAttributes(attribute.String("node_id", n.NodeID()))
defer span.End()
Expand Down
4 changes: 2 additions & 2 deletions pkg/flow/internal/controller/loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func TestLoader(t *testing.T) {
Logger: l,
TraceProvider: noop.NewTracerProvider(),
DataPath: t.TempDir(),
OnComponentUpdate: func(cn *controller.ComponentNode) { /* no-op */ },
OnBlockNodeUpdate: func(cn controller.BlockNode) { /* no-op */ },
Registerer: prometheus.NewRegistry(),
NewModuleController: func(id string) controller.ModuleController {
return nil
Expand Down Expand Up @@ -207,7 +207,7 @@ func TestScopeWithFailingComponent(t *testing.T) {
Logger: l,
TraceProvider: noop.NewTracerProvider(),
DataPath: t.TempDir(),
OnComponentUpdate: func(cn *controller.ComponentNode) { /* no-op */ },
OnBlockNodeUpdate: func(cn controller.BlockNode) { /* no-op */ },
Registerer: prometheus.NewRegistry(),
NewModuleController: func(id string) controller.ModuleController {
return fakeModuleController{}
Expand Down
11 changes: 4 additions & 7 deletions pkg/flow/internal/controller/node_component.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/grafana/river/vm"
"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel/trace"
"go.uber.org/atomic"
)

// ComponentID is a fully-qualified name of a component. Each element in
Expand Down Expand Up @@ -66,7 +65,7 @@ type ComponentGlobals struct {
Logger *logging.Logger // Logger shared between all managed components.
TraceProvider trace.TracerProvider // Tracer shared between all managed components.
DataPath string // Shared directory where component data may be stored
OnComponentUpdate func(cn *ComponentNode) // Informs controller that we need to reevaluate
OnBlockNodeUpdate func(cn BlockNode) // Informs controller that we need to reevaluate
OnExportsChange func(exports map[string]any) // Invoked when the managed component updated its exports
Registerer prometheus.Registerer // Registerer for serving agent and component metrics
ControllerID string // ID of controller.
Expand All @@ -90,8 +89,7 @@ type ComponentNode struct {
registry *prometheus.Registry
exportsType reflect.Type
moduleController ModuleController
OnComponentUpdate func(cn *ComponentNode) // Informs controller that we need to reevaluate
lastUpdateTime atomic.Time
OnBlockNodeUpdate func(cn BlockNode) // Informs controller that we need to reevaluate

mut sync.RWMutex
block *ast.BlockStmt // Current River block to derive args from
Expand Down Expand Up @@ -146,7 +144,7 @@ func NewComponentNode(globals ComponentGlobals, reg component.Registration, b *a
reg: reg,
exportsType: getExportsType(reg),
moduleController: globals.NewModuleController(globalID),
OnComponentUpdate: globals.OnComponentUpdate,
OnBlockNodeUpdate: globals.OnBlockNodeUpdate,

block: b,
eval: vm.New(b.Body),
Expand Down Expand Up @@ -379,8 +377,7 @@ func (cn *ComponentNode) setExports(e component.Exports) {

if changed {
// Inform the controller that we have new exports.
cn.lastUpdateTime.Store(time.Now())
cn.OnComponentUpdate(cn)
cn.OnBlockNodeUpdate(cn)
}
}

Expand Down
33 changes: 19 additions & 14 deletions pkg/flow/internal/controller/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,37 @@ package controller

import (
"sync"
"time"
)

// Queue is a thread-safe, insertion-ordered set of components.
// Queue is a thread-safe, insertion-ordered set of nodes.
//
// Queue is intended for tracking components that have updated their Exports
// for later reevaluation.
// Queue is intended for tracking nodes that have been updated for later reevaluation.
type Queue struct {
mut sync.Mutex
queuedSet map[*ComponentNode]struct{}
queuedOrder []*ComponentNode
queuedSet map[*QueuedNode]struct{}
queuedOrder []*QueuedNode

updateCh chan struct{}
}

type QueuedNode struct {
Node BlockNode
LastUpdatedTime time.Time
}

// NewQueue returns a new queue.
func NewQueue() *Queue {
return &Queue{
updateCh: make(chan struct{}, 1),
queuedSet: make(map[*ComponentNode]struct{}),
queuedOrder: make([]*ComponentNode, 0),
queuedSet: make(map[*QueuedNode]struct{}),
queuedOrder: make([]*QueuedNode, 0),
}
}

// Enqueue inserts a new component into the Queue. Enqueue is a no-op if the
// component is already in the Queue.
func (q *Queue) Enqueue(c *ComponentNode) {
// Enqueue inserts a new BlockNode into the Queue. Enqueue is a no-op if the
// BlockNode is already in the Queue.
func (q *Queue) Enqueue(c *QueuedNode) {
q.mut.Lock()
defer q.mut.Unlock()

Expand All @@ -47,14 +52,14 @@ func (q *Queue) Enqueue(c *ComponentNode) {
// Chan returns a channel which is written to when the queue is non-empty.
func (q *Queue) Chan() <-chan struct{} { return q.updateCh }

// DequeueAll removes all components from the queue and returns them.
func (q *Queue) DequeueAll() []*ComponentNode {
// DequeueAll removes all BlockNode from the queue and returns them.
func (q *Queue) DequeueAll() []*QueuedNode {
q.mut.Lock()
defer q.mut.Unlock()

all := q.queuedOrder
q.queuedOrder = make([]*ComponentNode, 0)
q.queuedSet = make(map[*ComponentNode]struct{})
q.queuedOrder = make([]*QueuedNode, 0)
q.queuedSet = make(map[*QueuedNode]struct{})

return all
}
8 changes: 4 additions & 4 deletions pkg/flow/internal/controller/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
)

func TestEnqueueDequeue(t *testing.T) {
tn := &ComponentNode{}
tn := &QueuedNode{}
q := NewQueue()
q.Enqueue(tn)
require.Lenf(t, q.queuedSet, 1, "queue should be 1")
Expand All @@ -26,7 +26,7 @@ func TestDequeue_Empty(t *testing.T) {
}

func TestDequeue_InOrder(t *testing.T) {
c1, c2, c3 := &ComponentNode{}, &ComponentNode{}, &ComponentNode{}
c1, c2, c3 := &QueuedNode{}, &QueuedNode{}, &QueuedNode{}
q := NewQueue()
q.Enqueue(c1)
q.Enqueue(c2)
Expand All @@ -41,7 +41,7 @@ func TestDequeue_InOrder(t *testing.T) {
}

func TestDequeue_NoDuplicates(t *testing.T) {
c1, c2 := &ComponentNode{}, &ComponentNode{}
c1, c2 := &QueuedNode{}, &QueuedNode{}
q := NewQueue()
q.Enqueue(c1)
q.Enqueue(c1)
Expand All @@ -58,7 +58,7 @@ func TestDequeue_NoDuplicates(t *testing.T) {
}

func TestEnqueue_ChannelNotification(t *testing.T) {
c1 := &ComponentNode{}
c1 := &QueuedNode{}
q := NewQueue()

notificationsCount := atomic.Int32{}
Expand Down

0 comments on commit 5e9b0bd

Please sign in to comment.