diff --git a/pkg/flow/flow.go b/pkg/flow/flow.go index 76d62bdb9cb2..6e839abda36f 100644 --- a/pkg/flow/flow.go +++ b/pkg/flow/flow.go @@ -27,21 +27,21 @@ // when evaluating the configuration for a component will always be reported as // unhealthy until the next successful evaluation. // -// # Component Evaluation +// # Node Evaluation // -// The process of converting the River block associated with a component into -// the appropriate Go struct is called "component evaluation." +// The process of converting the River block associated with a node into +// the appropriate Go struct is called "node evaluation." // -// Components are only evaluated after all components they reference have been +// Nodes are only evaluated after all nodes they reference have been // evaluated; cyclic dependencies are invalid. // -// If a component updates its Exports at runtime, other components which directly -// or indirectly reference the updated component will have their Arguments +// If a node updates its Exports at runtime, other nodes which directly +// or indirectly reference the updated node will have their Arguments // re-evaluated. // -// The arguments and exports for a component will be left in their last valid -// state if a component shuts down or is given an invalid config. This prevents -// a domino effect of a single failed component taking down other components +// The arguments and exports for a node will be left in their last valid +// state if a node shuts down or is given an invalid config. This prevents +// a domino effect of a single failed node taking down other nodes // which are otherwise healthy. 
package flow @@ -49,6 +49,7 @@ import ( "context" "fmt" "sync" + "time" "github.com/grafana/agent/pkg/flow/internal/controller" "github.com/grafana/agent/pkg/flow/internal/worker" @@ -185,9 +186,9 @@ func newController(o controllerOptions) *Flow { Logger: log, TraceProvider: tracer, DataPath: o.DataPath, - OnComponentUpdate: func(cn *controller.ComponentNode) { - // Changed components should be queued for reevaluation. - f.updateQueue.Enqueue(cn) + OnBlockNodeUpdate: func(cn controller.BlockNode) { + // Changed node should be queued for reevaluation. + f.updateQueue.Enqueue(&controller.QueuedNode{Node: cn, LastUpdatedTime: time.Now()}) }, OnExportsChange: o.OnExportsChange, Registerer: o.Reg, @@ -236,8 +237,8 @@ func (f *Flow) Run(ctx context.Context) { return case <-f.updateQueue.Chan(): - // Evaluate all components that have been updated. Sending the entire batch together will improve - // throughput - it prevents the situation where two components have the same dependency, and the first time + // Evaluate all nodes that have been updated. Sending the entire batch together will improve + // throughput - it prevents the situation where two nodes have the same dependency, and the first time // it's picked up by the worker pool and the second time it's enqueued again, resulting in more evaluations. all := f.updateQueue.DequeueAll() f.loader.EvaluateDependants(ctx, all) diff --git a/pkg/flow/internal/controller/loader.go b/pkg/flow/internal/controller/loader.go index 4a22b2419f0a..280741b53b1a 100644 --- a/pkg/flow/internal/controller/loader.go +++ b/pkg/flow/internal/controller/loader.go @@ -555,13 +555,13 @@ func (l *Loader) OriginalGraph() *dag.Graph { return l.originalGraph.Clone() } -// EvaluateDependants sends components which depend directly on components in updatedNodes for evaluation to the -// workerPool. It should be called whenever components update their exports. 
-// It is beneficial to call EvaluateDependants with a batch of components, as it will enqueue the entire batch before +// EvaluateDependants sends nodes which depend directly on nodes in updatedNodes for evaluation to the +// workerPool. It should be called whenever nodes update their exports. +// It is beneficial to call EvaluateDependants with a batch of nodes, as it will enqueue the entire batch before // the worker pool starts to evaluate them, resulting in smaller number of total evaluations when // node updates are frequent. If the worker pool's queue is full, EvaluateDependants will retry with a backoff until // it succeeds or until the ctx is cancelled. -func (l *Loader) EvaluateDependants(ctx context.Context, updatedNodes []*ComponentNode) { +func (l *Loader) EvaluateDependants(ctx context.Context, updatedNodes []*QueuedNode) { if len(updatedNodes) == 0 { return } @@ -577,12 +577,14 @@ func (l *Loader) EvaluateDependants(ctx context.Context, updatedNodes []*Compone l.mut.RLock() defer l.mut.RUnlock() - dependenciesToParentsMap := make(map[dag.Node]*ComponentNode) + dependenciesToParentsMap := make(map[dag.Node]*QueuedNode) for _, parent := range updatedNodes { // Make sure we're in-sync with the current exports of parent. - l.cache.CacheExports(parent.ID(), parent.Exports()) + if componentNode, ok := parent.Node.(*ComponentNode); ok { + l.cache.CacheExports(componentNode.ID(), componentNode.Exports()) + } // We collect all nodes directly incoming to parent. 
- _ = dag.WalkIncomingNodes(l.graph, parent, func(n dag.Node) error { + _ = dag.WalkIncomingNodes(l.graph, parent.Node, func(n dag.Node) error { dependenciesToParentsMap[n] = parent return nil }) @@ -595,7 +597,7 @@ func (l *Loader) EvaluateDependants(ctx context.Context, updatedNodes []*Compone for n, parent := range dependenciesToParentsMap { dependantCtx, span := tracer.Start(spanCtx, "SubmitForEvaluation", trace.WithSpanKind(trace.SpanKindInternal)) span.SetAttributes(attribute.String("node_id", n.NodeID())) - span.SetAttributes(attribute.String("originator_id", parent.NodeID())) + span.SetAttributes(attribute.String("originator_id", parent.Node.NodeID())) // Submit for asynchronous evaluation with retries and backoff. Don't use range variables in the closure. var ( @@ -614,7 +616,7 @@ func (l *Loader) EvaluateDependants(ctx context.Context, updatedNodes []*Compone "and cannot keep up with evaluating components - will retry", "err", err, "node_id", n.NodeID(), - "originator_id", parent.NodeID(), + "originator_id", parent.Node.NodeID(), "retries", retryBackoff.NumRetries(), ) retryBackoff.Wait() @@ -637,9 +639,9 @@ func (l *Loader) EvaluateDependants(ctx context.Context, updatedNodes []*Compone // concurrentEvalFn returns a function that evaluates a node and updates the cache. This function can be submitted to // a worker pool for asynchronous evaluation. 
-func (l *Loader) concurrentEvalFn(n dag.Node, spanCtx context.Context, tracer trace.Tracer, parent *ComponentNode) { +func (l *Loader) concurrentEvalFn(n dag.Node, spanCtx context.Context, tracer trace.Tracer, parent *QueuedNode) { start := time.Now() - l.cm.dependenciesWaitTime.Observe(time.Since(parent.lastUpdateTime.Load()).Seconds()) + l.cm.dependenciesWaitTime.Observe(time.Since(parent.LastUpdatedTime).Seconds()) _, span := tracer.Start(spanCtx, "EvaluateNode", trace.WithSpanKind(trace.SpanKindInternal)) span.SetAttributes(attribute.String("node_id", n.NodeID())) defer span.End() diff --git a/pkg/flow/internal/controller/loader_test.go b/pkg/flow/internal/controller/loader_test.go index e93f757b1a2f..1322db4a69c8 100644 --- a/pkg/flow/internal/controller/loader_test.go +++ b/pkg/flow/internal/controller/loader_test.go @@ -73,7 +73,7 @@ func TestLoader(t *testing.T) { Logger: l, TraceProvider: noop.NewTracerProvider(), DataPath: t.TempDir(), - OnComponentUpdate: func(cn *controller.ComponentNode) { /* no-op */ }, + OnBlockNodeUpdate: func(cn controller.BlockNode) { /* no-op */ }, Registerer: prometheus.NewRegistry(), NewModuleController: func(id string) controller.ModuleController { return nil @@ -207,7 +207,7 @@ func TestScopeWithFailingComponent(t *testing.T) { Logger: l, TraceProvider: noop.NewTracerProvider(), DataPath: t.TempDir(), - OnComponentUpdate: func(cn *controller.ComponentNode) { /* no-op */ }, + OnBlockNodeUpdate: func(cn controller.BlockNode) { /* no-op */ }, Registerer: prometheus.NewRegistry(), NewModuleController: func(id string) controller.ModuleController { return fakeModuleController{} diff --git a/pkg/flow/internal/controller/node_component.go b/pkg/flow/internal/controller/node_component.go index b99597809d4b..9543bee728e9 100644 --- a/pkg/flow/internal/controller/node_component.go +++ b/pkg/flow/internal/controller/node_component.go @@ -21,7 +21,6 @@ import ( "github.com/grafana/river/vm" 
"github.com/prometheus/client_golang/prometheus" "go.opentelemetry.io/otel/trace" - "go.uber.org/atomic" ) // ComponentID is a fully-qualified name of a component. Each element in @@ -66,7 +65,7 @@ type ComponentGlobals struct { Logger *logging.Logger // Logger shared between all managed components. TraceProvider trace.TracerProvider // Tracer shared between all managed components. DataPath string // Shared directory where component data may be stored - OnComponentUpdate func(cn *ComponentNode) // Informs controller that we need to reevaluate + OnBlockNodeUpdate func(cn BlockNode) // Informs controller that we need to reevaluate OnExportsChange func(exports map[string]any) // Invoked when the managed component updated its exports Registerer prometheus.Registerer // Registerer for serving agent and component metrics ControllerID string // ID of controller. @@ -90,8 +89,7 @@ type ComponentNode struct { registry *prometheus.Registry exportsType reflect.Type moduleController ModuleController - OnComponentUpdate func(cn *ComponentNode) // Informs controller that we need to reevaluate - lastUpdateTime atomic.Time + OnBlockNodeUpdate func(cn BlockNode) // Informs controller that we need to reevaluate mut sync.RWMutex block *ast.BlockStmt // Current River block to derive args from @@ -146,7 +144,7 @@ func NewComponentNode(globals ComponentGlobals, reg component.Registration, b *a reg: reg, exportsType: getExportsType(reg), moduleController: globals.NewModuleController(globalID), - OnComponentUpdate: globals.OnComponentUpdate, + OnBlockNodeUpdate: globals.OnBlockNodeUpdate, block: b, eval: vm.New(b.Body), @@ -379,8 +377,7 @@ func (cn *ComponentNode) setExports(e component.Exports) { if changed { // Inform the controller that we have new exports. 
- cn.lastUpdateTime.Store(time.Now()) - cn.OnComponentUpdate(cn) + cn.OnBlockNodeUpdate(cn) } } diff --git a/pkg/flow/internal/controller/queue.go b/pkg/flow/internal/controller/queue.go index a8cd1b5bae05..65c1448573b9 100644 --- a/pkg/flow/internal/controller/queue.go +++ b/pkg/flow/internal/controller/queue.go @@ -2,32 +2,37 @@ package controller import ( "sync" + "time" ) -// Queue is a thread-safe, insertion-ordered set of components. +// Queue is a thread-safe, insertion-ordered set of nodes. // -// Queue is intended for tracking components that have updated their Exports -// for later reevaluation. +// Queue is intended for tracking nodes that have been updated for later reevaluation. type Queue struct { mut sync.Mutex - queuedSet map[*ComponentNode]struct{} - queuedOrder []*ComponentNode + queuedSet map[*QueuedNode]struct{} + queuedOrder []*QueuedNode updateCh chan struct{} } +type QueuedNode struct { + Node BlockNode + LastUpdatedTime time.Time +} + // NewQueue returns a new queue. func NewQueue() *Queue { return &Queue{ updateCh: make(chan struct{}, 1), - queuedSet: make(map[*ComponentNode]struct{}), - queuedOrder: make([]*ComponentNode, 0), + queuedSet: make(map[*QueuedNode]struct{}), + queuedOrder: make([]*QueuedNode, 0), } } -// Enqueue inserts a new component into the Queue. Enqueue is a no-op if the -// component is already in the Queue. -func (q *Queue) Enqueue(c *ComponentNode) { +// Enqueue inserts a new QueuedNode into the Queue. Enqueue is a no-op if the +// QueuedNode is already in the Queue. +func (q *Queue) Enqueue(c *QueuedNode) { q.mut.Lock() defer q.mut.Unlock() @@ -47,14 +52,14 @@ func (q *Queue) Enqueue(c *ComponentNode) { // Chan returns a channel which is written to when the queue is non-empty. func (q *Queue) Chan() <-chan struct{} { return q.updateCh } -// DequeueAll removes all components from the queue and returns them. 
-func (q *Queue) DequeueAll() []*ComponentNode { +// DequeueAll removes all queued nodes from the queue and returns them. +func (q *Queue) DequeueAll() []*QueuedNode { q.mut.Lock() defer q.mut.Unlock() all := q.queuedOrder - q.queuedOrder = make([]*ComponentNode, 0) - q.queuedSet = make(map[*ComponentNode]struct{}) + q.queuedOrder = make([]*QueuedNode, 0) + q.queuedSet = make(map[*QueuedNode]struct{}) return all } diff --git a/pkg/flow/internal/controller/queue_test.go b/pkg/flow/internal/controller/queue_test.go index c93fb14ef8fc..c0a7cd930675 100644 --- a/pkg/flow/internal/controller/queue_test.go +++ b/pkg/flow/internal/controller/queue_test.go @@ -9,7 +9,7 @@ import ( ) func TestEnqueueDequeue(t *testing.T) { - tn := &ComponentNode{} + tn := &QueuedNode{} q := NewQueue() q.Enqueue(tn) require.Lenf(t, q.queuedSet, 1, "queue should be 1") @@ -26,7 +26,7 @@ func TestDequeue_Empty(t *testing.T) { } func TestDequeue_InOrder(t *testing.T) { - c1, c2, c3 := &ComponentNode{}, &ComponentNode{}, &ComponentNode{} + c1, c2, c3 := &QueuedNode{}, &QueuedNode{}, &QueuedNode{} q := NewQueue() q.Enqueue(c1) q.Enqueue(c2) @@ -41,7 +41,7 @@ func TestDequeue_InOrder(t *testing.T) { } func TestDequeue_NoDuplicates(t *testing.T) { - c1, c2 := &ComponentNode{}, &ComponentNode{} + c1, c2 := &QueuedNode{}, &QueuedNode{} q := NewQueue() q.Enqueue(c1) q.Enqueue(c1) @@ -58,7 +58,7 @@ func TestDequeue_NoDuplicates(t *testing.T) { } func TestEnqueue_ChannelNotification(t *testing.T) { - c1 := &ComponentNode{} + c1 := &QueuedNode{} q := NewQueue() notificationsCount := atomic.Int32{}