Skip to content

Commit

Permalink
Allow other nodes than the component nodes to have dependants (grafan…
Browse files Browse the repository at this point in the history
wildum authored and BarunKGP committed Feb 20, 2024
1 parent 421f461 commit 67905f0
Showing 6 changed files with 57 additions and 52 deletions.
29 changes: 15 additions & 14 deletions pkg/flow/flow.go
Original file line number Diff line number Diff line change
@@ -27,28 +27,29 @@
// when evaluating the configuration for a component will always be reported as
// unhealthy until the next successful evaluation.
//
// # Component Evaluation
// # Node Evaluation
//
// The process of converting the River block associated with a component into
// the appropriate Go struct is called "component evaluation."
// The process of converting the River block associated with a node into
// the appropriate Go struct is called "node evaluation."
//
// Components are only evaluated after all components they reference have been
// Nodes are only evaluated after all nodes they reference have been
// evaluated; cyclic dependencies are invalid.
//
// If a component updates its Exports at runtime, other components which directly
// or indirectly reference the updated component will have their Arguments
// If a node updates its Exports at runtime, other nodes which directly
// or indirectly reference the updated node will have their Arguments
// re-evaluated.
//
// The arguments and exports for a component will be left in their last valid
// state if a component shuts down or is given an invalid config. This prevents
// a domino effect of a single failed component taking down other components
// The arguments and exports for a node will be left in their last valid
// state if a node shuts down or is given an invalid config. This prevents
// a domino effect of a single failed node taking down other nodes
// which are otherwise healthy.
package flow

import (
"context"
"fmt"
"sync"
"time"

"github.com/grafana/agent/pkg/flow/internal/controller"
"github.com/grafana/agent/pkg/flow/internal/worker"
@@ -185,9 +186,9 @@ func newController(o controllerOptions) *Flow {
Logger: log,
TraceProvider: tracer,
DataPath: o.DataPath,
OnComponentUpdate: func(cn *controller.ComponentNode) {
// Changed components should be queued for reevaluation.
f.updateQueue.Enqueue(cn)
OnBlockNodeUpdate: func(cn controller.BlockNode) {
// Changed nodes should be queued for reevaluation.
f.updateQueue.Enqueue(&controller.QueuedNode{Node: cn, LastUpdatedTime: time.Now()})
},
OnExportsChange: o.OnExportsChange,
Registerer: o.Reg,
@@ -236,8 +237,8 @@ func (f *Flow) Run(ctx context.Context) {
return

case <-f.updateQueue.Chan():
// Evaluate all components that have been updated. Sending the entire batch together will improve
// throughput - it prevents the situation where two components have the same dependency, and the first time
// Evaluate all nodes that have been updated. Sending the entire batch together will improve
// throughput - it prevents the situation where two nodes have the same dependency, and the first time
// it's picked up by the worker pool and the second time it's enqueued again, resulting in more evaluations.
all := f.updateQueue.DequeueAll()
f.loader.EvaluateDependants(ctx, all)
24 changes: 13 additions & 11 deletions pkg/flow/internal/controller/loader.go
Original file line number Diff line number Diff line change
@@ -555,13 +555,13 @@ func (l *Loader) OriginalGraph() *dag.Graph {
return l.originalGraph.Clone()
}

// EvaluateDependants sends components which depend directly on components in updatedNodes for evaluation to the
// workerPool. It should be called whenever components update their exports.
// It is beneficial to call EvaluateDependants with a batch of components, as it will enqueue the entire batch before
// EvaluateDependants sends nodes which depend directly on nodes in updatedNodes for evaluation to the
// workerPool. It should be called whenever nodes update their exports.
// It is beneficial to call EvaluateDependants with a batch of nodes, as it will enqueue the entire batch before
// the worker pool starts to evaluate them, resulting in a smaller number of total evaluations when
// node updates are frequent. If the worker pool's queue is full, EvaluateDependants will retry with a backoff until
// it succeeds or until the ctx is cancelled.
func (l *Loader) EvaluateDependants(ctx context.Context, updatedNodes []*ComponentNode) {
func (l *Loader) EvaluateDependants(ctx context.Context, updatedNodes []*QueuedNode) {
if len(updatedNodes) == 0 {
return
}
@@ -577,12 +577,14 @@ func (l *Loader) EvaluateDependants(ctx context.Context, updatedNodes []*Compone
l.mut.RLock()
defer l.mut.RUnlock()

dependenciesToParentsMap := make(map[dag.Node]*ComponentNode)
dependenciesToParentsMap := make(map[dag.Node]*QueuedNode)
for _, parent := range updatedNodes {
// Make sure we're in-sync with the current exports of parent.
l.cache.CacheExports(parent.ID(), parent.Exports())
if componentNode, ok := parent.Node.(*ComponentNode); ok {
l.cache.CacheExports(componentNode.ID(), componentNode.Exports())
}
// We collect all nodes directly incoming to parent.
_ = dag.WalkIncomingNodes(l.graph, parent, func(n dag.Node) error {
_ = dag.WalkIncomingNodes(l.graph, parent.Node, func(n dag.Node) error {
dependenciesToParentsMap[n] = parent
return nil
})
@@ -595,7 +597,7 @@ func (l *Loader) EvaluateDependants(ctx context.Context, updatedNodes []*Compone
for n, parent := range dependenciesToParentsMap {
dependantCtx, span := tracer.Start(spanCtx, "SubmitForEvaluation", trace.WithSpanKind(trace.SpanKindInternal))
span.SetAttributes(attribute.String("node_id", n.NodeID()))
span.SetAttributes(attribute.String("originator_id", parent.NodeID()))
span.SetAttributes(attribute.String("originator_id", parent.Node.NodeID()))

// Submit for asynchronous evaluation with retries and backoff. Don't use range variables in the closure.
var (
@@ -614,7 +616,7 @@ func (l *Loader) EvaluateDependants(ctx context.Context, updatedNodes []*Compone
"and cannot keep up with evaluating components - will retry",
"err", err,
"node_id", n.NodeID(),
"originator_id", parent.NodeID(),
"originator_id", parent.Node.NodeID(),
"retries", retryBackoff.NumRetries(),
)
retryBackoff.Wait()
@@ -637,9 +639,9 @@ func (l *Loader) EvaluateDependants(ctx context.Context, updatedNodes []*Compone

// concurrentEvalFn returns a function that evaluates a node and updates the cache. This function can be submitted to
// a worker pool for asynchronous evaluation.
func (l *Loader) concurrentEvalFn(n dag.Node, spanCtx context.Context, tracer trace.Tracer, parent *ComponentNode) {
func (l *Loader) concurrentEvalFn(n dag.Node, spanCtx context.Context, tracer trace.Tracer, parent *QueuedNode) {
start := time.Now()
l.cm.dependenciesWaitTime.Observe(time.Since(parent.lastUpdateTime.Load()).Seconds())
l.cm.dependenciesWaitTime.Observe(time.Since(parent.LastUpdatedTime).Seconds())
_, span := tracer.Start(spanCtx, "EvaluateNode", trace.WithSpanKind(trace.SpanKindInternal))
span.SetAttributes(attribute.String("node_id", n.NodeID()))
defer span.End()
4 changes: 2 additions & 2 deletions pkg/flow/internal/controller/loader_test.go
Original file line number Diff line number Diff line change
@@ -73,7 +73,7 @@ func TestLoader(t *testing.T) {
Logger: l,
TraceProvider: noop.NewTracerProvider(),
DataPath: t.TempDir(),
OnComponentUpdate: func(cn *controller.ComponentNode) { /* no-op */ },
OnBlockNodeUpdate: func(cn controller.BlockNode) { /* no-op */ },
Registerer: prometheus.NewRegistry(),
NewModuleController: func(id string) controller.ModuleController {
return nil
@@ -207,7 +207,7 @@ func TestScopeWithFailingComponent(t *testing.T) {
Logger: l,
TraceProvider: noop.NewTracerProvider(),
DataPath: t.TempDir(),
OnComponentUpdate: func(cn *controller.ComponentNode) { /* no-op */ },
OnBlockNodeUpdate: func(cn controller.BlockNode) { /* no-op */ },
Registerer: prometheus.NewRegistry(),
NewModuleController: func(id string) controller.ModuleController {
return fakeModuleController{}
11 changes: 4 additions & 7 deletions pkg/flow/internal/controller/node_component.go
Original file line number Diff line number Diff line change
@@ -21,7 +21,6 @@ import (
"github.com/grafana/river/vm"
"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel/trace"
"go.uber.org/atomic"
)

// ComponentID is a fully-qualified name of a component. Each element in
@@ -66,7 +65,7 @@ type ComponentGlobals struct {
Logger *logging.Logger // Logger shared between all managed components.
TraceProvider trace.TracerProvider // Tracer shared between all managed components.
DataPath string // Shared directory where component data may be stored
OnComponentUpdate func(cn *ComponentNode) // Informs controller that we need to reevaluate
OnBlockNodeUpdate func(cn BlockNode) // Informs controller that we need to reevaluate
OnExportsChange func(exports map[string]any) // Invoked when the managed component updated its exports
Registerer prometheus.Registerer // Registerer for serving agent and component metrics
ControllerID string // ID of controller.
@@ -90,8 +89,7 @@ type ComponentNode struct {
registry *prometheus.Registry
exportsType reflect.Type
moduleController ModuleController
OnComponentUpdate func(cn *ComponentNode) // Informs controller that we need to reevaluate
lastUpdateTime atomic.Time
OnBlockNodeUpdate func(cn BlockNode) // Informs controller that we need to reevaluate

mut sync.RWMutex
block *ast.BlockStmt // Current River block to derive args from
@@ -146,7 +144,7 @@ func NewComponentNode(globals ComponentGlobals, reg component.Registration, b *a
reg: reg,
exportsType: getExportsType(reg),
moduleController: globals.NewModuleController(globalID),
OnComponentUpdate: globals.OnComponentUpdate,
OnBlockNodeUpdate: globals.OnBlockNodeUpdate,

block: b,
eval: vm.New(b.Body),
@@ -379,8 +377,7 @@ func (cn *ComponentNode) setExports(e component.Exports) {

if changed {
// Inform the controller that we have new exports.
cn.lastUpdateTime.Store(time.Now())
cn.OnComponentUpdate(cn)
cn.OnBlockNodeUpdate(cn)
}
}

33 changes: 19 additions & 14 deletions pkg/flow/internal/controller/queue.go
Original file line number Diff line number Diff line change
@@ -2,32 +2,37 @@ package controller

import (
"sync"
"time"
)

// Queue is a thread-safe, insertion-ordered set of components.
// Queue is a thread-safe, insertion-ordered set of nodes.
//
// Queue is intended for tracking components that have updated their Exports
// for later reevaluation.
// Queue is intended for tracking nodes that have been updated for later reevaluation.
type Queue struct {
mut sync.Mutex
queuedSet map[*ComponentNode]struct{}
queuedOrder []*ComponentNode
queuedSet map[*QueuedNode]struct{}
queuedOrder []*QueuedNode

updateCh chan struct{}
}

// QueuedNode is the element type stored in a Queue. It pairs a BlockNode
// with a timestamp so that consumers of the queue can tell how long the node
// has been waiting since it was reported as updated.
type QueuedNode struct {
// Node is the block node that was updated and whose dependants need
// to be reevaluated.
Node BlockNode
// LastUpdatedTime is the time at which the update of Node was reported;
// it is set by whoever constructs the QueuedNode.
LastUpdatedTime time.Time
}

// NewQueue returns a new queue.
func NewQueue() *Queue {
return &Queue{
updateCh: make(chan struct{}, 1),
queuedSet: make(map[*ComponentNode]struct{}),
queuedOrder: make([]*ComponentNode, 0),
queuedSet: make(map[*QueuedNode]struct{}),
queuedOrder: make([]*QueuedNode, 0),
}
}

// Enqueue inserts a new component into the Queue. Enqueue is a no-op if the
// component is already in the Queue.
func (q *Queue) Enqueue(c *ComponentNode) {
// Enqueue inserts a new QueuedNode into the Queue. Enqueue is a no-op if the
// QueuedNode is already in the Queue.
func (q *Queue) Enqueue(c *QueuedNode) {
q.mut.Lock()
defer q.mut.Unlock()

@@ -47,14 +52,14 @@ func (q *Queue) Enqueue(c *ComponentNode) {
// Chan returns a channel which is written to when the queue is non-empty.
func (q *Queue) Chan() <-chan struct{} { return q.updateCh }

// DequeueAll removes all components from the queue and returns them.
func (q *Queue) DequeueAll() []*ComponentNode {
// DequeueAll removes all queued nodes from the queue and returns them.
func (q *Queue) DequeueAll() []*QueuedNode {
q.mut.Lock()
defer q.mut.Unlock()

all := q.queuedOrder
q.queuedOrder = make([]*ComponentNode, 0)
q.queuedSet = make(map[*ComponentNode]struct{})
q.queuedOrder = make([]*QueuedNode, 0)
q.queuedSet = make(map[*QueuedNode]struct{})

return all
}
8 changes: 4 additions & 4 deletions pkg/flow/internal/controller/queue_test.go
Original file line number Diff line number Diff line change
@@ -9,7 +9,7 @@ import (
)

func TestEnqueueDequeue(t *testing.T) {
tn := &ComponentNode{}
tn := &QueuedNode{}
q := NewQueue()
q.Enqueue(tn)
require.Lenf(t, q.queuedSet, 1, "queue should be 1")
@@ -26,7 +26,7 @@ func TestDequeue_Empty(t *testing.T) {
}

func TestDequeue_InOrder(t *testing.T) {
c1, c2, c3 := &ComponentNode{}, &ComponentNode{}, &ComponentNode{}
c1, c2, c3 := &QueuedNode{}, &QueuedNode{}, &QueuedNode{}
q := NewQueue()
q.Enqueue(c1)
q.Enqueue(c2)
@@ -41,7 +41,7 @@ func TestDequeue_InOrder(t *testing.T) {
}

func TestDequeue_NoDuplicates(t *testing.T) {
c1, c2 := &ComponentNode{}, &ComponentNode{}
c1, c2 := &QueuedNode{}, &QueuedNode{}
q := NewQueue()
q.Enqueue(c1)
q.Enqueue(c1)
@@ -58,7 +58,7 @@ func TestDequeue_NoDuplicates(t *testing.T) {
}

func TestEnqueue_ChannelNotification(t *testing.T) {
c1 := &ComponentNode{}
c1 := &QueuedNode{}
q := NewQueue()

notificationsCount := atomic.Int32{}

0 comments on commit 67905f0

Please sign in to comment.