Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow other nodes than the component nodes to have dependants #6225

Merged
merged 5 commits into from
Jan 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 15 additions & 14 deletions pkg/flow/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,28 +27,29 @@
// when evaluating the configuration for a component will always be reported as
// unhealthy until the next successful evaluation.
//
// # Component Evaluation
// # Node Evaluation
//
// The process of converting the River block associated with a component into
// the appropriate Go struct is called "component evaluation."
// The process of converting the River block associated with a node into
// the appropriate Go struct is called "node evaluation."
//
// Components are only evaluated after all components they reference have been
// Nodes are only evaluated after all nodes they reference have been
// evaluated; cyclic dependencies are invalid.
//
// If a component updates its Exports at runtime, other components which directly
// or indirectly reference the updated component will have their Arguments
// If a node updates its Exports at runtime, other nodes which directly
// or indirectly reference the updated node will have their Arguments
// re-evaluated.
//
// The arguments and exports for a component will be left in their last valid
// state if a component shuts down or is given an invalid config. This prevents
// a domino effect of a single failed component taking down other components
// The arguments and exports for a node will be left in their last valid
// state if a node shuts down or is given an invalid config. This prevents
// a domino effect of a single failed node taking down other node
// which are otherwise healthy.
package flow

import (
"context"
"fmt"
"sync"
"time"

"github.com/grafana/agent/pkg/flow/internal/controller"
"github.com/grafana/agent/pkg/flow/internal/worker"
Expand Down Expand Up @@ -185,9 +186,9 @@ func newController(o controllerOptions) *Flow {
Logger: log,
TraceProvider: tracer,
DataPath: o.DataPath,
OnComponentUpdate: func(cn *controller.ComponentNode) {
// Changed components should be queued for reevaluation.
f.updateQueue.Enqueue(cn)
OnBlockNodeUpdate: func(cn controller.BlockNode) {
// Changed node should be queued for reevaluation.
f.updateQueue.Enqueue(&controller.QueuedNode{Node: cn, LastUpdatedTime: time.Now()})
},
OnExportsChange: o.OnExportsChange,
Registerer: o.Reg,
Expand Down Expand Up @@ -236,8 +237,8 @@ func (f *Flow) Run(ctx context.Context) {
return

case <-f.updateQueue.Chan():
// Evaluate all components that have been updated. Sending the entire batch together will improve
// throughput - it prevents the situation where two components have the same dependency, and the first time
// Evaluate all nodes that have been updated. Sending the entire batch together will improve
// throughput - it prevents the situation where two nodes have the same dependency, and the first time
// it's picked up by the worker pool and the second time it's enqueued again, resulting in more evaluations.
all := f.updateQueue.DequeueAll()
f.loader.EvaluateDependants(ctx, all)
Expand Down
24 changes: 13 additions & 11 deletions pkg/flow/internal/controller/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,13 +555,13 @@ func (l *Loader) OriginalGraph() *dag.Graph {
return l.originalGraph.Clone()
}

// EvaluateDependants sends components which depend directly on components in updatedNodes for evaluation to the
// workerPool. It should be called whenever components update their exports.
// It is beneficial to call EvaluateDependants with a batch of components, as it will enqueue the entire batch before
// EvaluateDependants sends nodes which depend directly on nodes in updatedNodes for evaluation to the
// workerPool. It should be called whenever nodes update their exports.
// It is beneficial to call EvaluateDependants with a batch of nodes, as it will enqueue the entire batch before
// the worker pool starts to evaluate them, resulting in smaller number of total evaluations when
// node updates are frequent. If the worker pool's queue is full, EvaluateDependants will retry with a backoff until
// it succeeds or until the ctx is cancelled.
func (l *Loader) EvaluateDependants(ctx context.Context, updatedNodes []*ComponentNode) {
func (l *Loader) EvaluateDependants(ctx context.Context, updatedNodes []*QueuedNode) {
if len(updatedNodes) == 0 {
return
}
Expand All @@ -577,12 +577,14 @@ func (l *Loader) EvaluateDependants(ctx context.Context, updatedNodes []*Compone
l.mut.RLock()
defer l.mut.RUnlock()

dependenciesToParentsMap := make(map[dag.Node]*ComponentNode)
dependenciesToParentsMap := make(map[dag.Node]*QueuedNode)
for _, parent := range updatedNodes {
// Make sure we're in-sync with the current exports of parent.
l.cache.CacheExports(parent.ID(), parent.Exports())
if componentNode, ok := parent.Node.(*ComponentNode); ok {
l.cache.CacheExports(componentNode.ID(), componentNode.Exports())
}
// We collect all nodes directly incoming to parent.
_ = dag.WalkIncomingNodes(l.graph, parent, func(n dag.Node) error {
_ = dag.WalkIncomingNodes(l.graph, parent.Node, func(n dag.Node) error {
dependenciesToParentsMap[n] = parent
return nil
})
Expand All @@ -595,7 +597,7 @@ func (l *Loader) EvaluateDependants(ctx context.Context, updatedNodes []*Compone
for n, parent := range dependenciesToParentsMap {
dependantCtx, span := tracer.Start(spanCtx, "SubmitForEvaluation", trace.WithSpanKind(trace.SpanKindInternal))
span.SetAttributes(attribute.String("node_id", n.NodeID()))
span.SetAttributes(attribute.String("originator_id", parent.NodeID()))
span.SetAttributes(attribute.String("originator_id", parent.Node.NodeID()))

// Submit for asynchronous evaluation with retries and backoff. Don't use range variables in the closure.
var (
Expand All @@ -614,7 +616,7 @@ func (l *Loader) EvaluateDependants(ctx context.Context, updatedNodes []*Compone
"and cannot keep up with evaluating components - will retry",
"err", err,
"node_id", n.NodeID(),
"originator_id", parent.NodeID(),
"originator_id", parent.Node.NodeID(),
"retries", retryBackoff.NumRetries(),
)
retryBackoff.Wait()
Expand All @@ -637,9 +639,9 @@ func (l *Loader) EvaluateDependants(ctx context.Context, updatedNodes []*Compone

// concurrentEvalFn returns a function that evaluates a node and updates the cache. This function can be submitted to
// a worker pool for asynchronous evaluation.
func (l *Loader) concurrentEvalFn(n dag.Node, spanCtx context.Context, tracer trace.Tracer, parent *ComponentNode) {
func (l *Loader) concurrentEvalFn(n dag.Node, spanCtx context.Context, tracer trace.Tracer, parent *QueuedNode) {
start := time.Now()
l.cm.dependenciesWaitTime.Observe(time.Since(parent.lastUpdateTime.Load()).Seconds())
l.cm.dependenciesWaitTime.Observe(time.Since(parent.LastUpdatedTime).Seconds())
_, span := tracer.Start(spanCtx, "EvaluateNode", trace.WithSpanKind(trace.SpanKindInternal))
span.SetAttributes(attribute.String("node_id", n.NodeID()))
defer span.End()
Expand Down
4 changes: 2 additions & 2 deletions pkg/flow/internal/controller/loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func TestLoader(t *testing.T) {
Logger: l,
TraceProvider: noop.NewTracerProvider(),
DataPath: t.TempDir(),
OnComponentUpdate: func(cn *controller.ComponentNode) { /* no-op */ },
OnBlockNodeUpdate: func(cn controller.BlockNode) { /* no-op */ },
Registerer: prometheus.NewRegistry(),
NewModuleController: func(id string) controller.ModuleController {
return nil
Expand Down Expand Up @@ -207,7 +207,7 @@ func TestScopeWithFailingComponent(t *testing.T) {
Logger: l,
TraceProvider: noop.NewTracerProvider(),
DataPath: t.TempDir(),
OnComponentUpdate: func(cn *controller.ComponentNode) { /* no-op */ },
OnBlockNodeUpdate: func(cn controller.BlockNode) { /* no-op */ },
Registerer: prometheus.NewRegistry(),
NewModuleController: func(id string) controller.ModuleController {
return fakeModuleController{}
Expand Down
11 changes: 4 additions & 7 deletions pkg/flow/internal/controller/node_component.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/grafana/river/vm"
"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel/trace"
"go.uber.org/atomic"
)

// ComponentID is a fully-qualified name of a component. Each element in
Expand Down Expand Up @@ -66,7 +65,7 @@ type ComponentGlobals struct {
Logger *logging.Logger // Logger shared between all managed components.
TraceProvider trace.TracerProvider // Tracer shared between all managed components.
DataPath string // Shared directory where component data may be stored
OnComponentUpdate func(cn *ComponentNode) // Informs controller that we need to reevaluate
OnBlockNodeUpdate func(cn BlockNode) // Informs controller that we need to reevaluate
OnExportsChange func(exports map[string]any) // Invoked when the managed component updated its exports
Registerer prometheus.Registerer // Registerer for serving agent and component metrics
ControllerID string // ID of controller.
Expand All @@ -90,8 +89,7 @@ type ComponentNode struct {
registry *prometheus.Registry
exportsType reflect.Type
moduleController ModuleController
OnComponentUpdate func(cn *ComponentNode) // Informs controller that we need to reevaluate
lastUpdateTime atomic.Time
OnBlockNodeUpdate func(cn BlockNode) // Informs controller that we need to reevaluate

mut sync.RWMutex
block *ast.BlockStmt // Current River block to derive args from
Expand Down Expand Up @@ -146,7 +144,7 @@ func NewComponentNode(globals ComponentGlobals, reg component.Registration, b *a
reg: reg,
exportsType: getExportsType(reg),
moduleController: globals.NewModuleController(globalID),
OnComponentUpdate: globals.OnComponentUpdate,
OnBlockNodeUpdate: globals.OnBlockNodeUpdate,

block: b,
eval: vm.New(b.Body),
Expand Down Expand Up @@ -379,8 +377,7 @@ func (cn *ComponentNode) setExports(e component.Exports) {

if changed {
// Inform the controller that we have new exports.
cn.lastUpdateTime.Store(time.Now())
cn.OnComponentUpdate(cn)
cn.OnBlockNodeUpdate(cn)
}
}

Expand Down
33 changes: 19 additions & 14 deletions pkg/flow/internal/controller/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,37 @@ package controller

import (
"sync"
"time"
)

// Queue is a thread-safe, insertion-ordered set of components.
// Queue is a thread-safe, insertion-ordered set of nodes.
//
// Queue is intended for tracking components that have updated their Exports
// for later reevaluation.
// Queue is intended for tracking nodes that have been updated for later reevaluation.
type Queue struct {
mut sync.Mutex
queuedSet map[*ComponentNode]struct{}
queuedOrder []*ComponentNode
queuedSet map[*QueuedNode]struct{}
queuedOrder []*QueuedNode

updateCh chan struct{}
}

type QueuedNode struct {
Node BlockNode
LastUpdatedTime time.Time
}

// NewQueue returns a new queue.
func NewQueue() *Queue {
return &Queue{
updateCh: make(chan struct{}, 1),
queuedSet: make(map[*ComponentNode]struct{}),
queuedOrder: make([]*ComponentNode, 0),
queuedSet: make(map[*QueuedNode]struct{}),
queuedOrder: make([]*QueuedNode, 0),
}
}

// Enqueue inserts a new component into the Queue. Enqueue is a no-op if the
// component is already in the Queue.
func (q *Queue) Enqueue(c *ComponentNode) {
// Enqueue inserts a new BlockNode into the Queue. Enqueue is a no-op if the
// BlockNode is already in the Queue.
func (q *Queue) Enqueue(c *QueuedNode) {
q.mut.Lock()
defer q.mut.Unlock()

Expand All @@ -47,14 +52,14 @@ func (q *Queue) Enqueue(c *ComponentNode) {
// Chan returns a channel which is written to when the queue is non-empty.
func (q *Queue) Chan() <-chan struct{} { return q.updateCh }

// DequeueAll removes all components from the queue and returns them.
func (q *Queue) DequeueAll() []*ComponentNode {
// DequeueAll removes all BlockNode from the queue and returns them.
func (q *Queue) DequeueAll() []*QueuedNode {
q.mut.Lock()
defer q.mut.Unlock()

all := q.queuedOrder
q.queuedOrder = make([]*ComponentNode, 0)
q.queuedSet = make(map[*ComponentNode]struct{})
q.queuedOrder = make([]*QueuedNode, 0)
q.queuedSet = make(map[*QueuedNode]struct{})

return all
}
8 changes: 4 additions & 4 deletions pkg/flow/internal/controller/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
)

func TestEnqueueDequeue(t *testing.T) {
tn := &ComponentNode{}
tn := &QueuedNode{}
q := NewQueue()
q.Enqueue(tn)
require.Lenf(t, q.queuedSet, 1, "queue should be 1")
Expand All @@ -26,7 +26,7 @@ func TestDequeue_Empty(t *testing.T) {
}

func TestDequeue_InOrder(t *testing.T) {
c1, c2, c3 := &ComponentNode{}, &ComponentNode{}, &ComponentNode{}
c1, c2, c3 := &QueuedNode{}, &QueuedNode{}, &QueuedNode{}
q := NewQueue()
q.Enqueue(c1)
q.Enqueue(c2)
Expand All @@ -41,7 +41,7 @@ func TestDequeue_InOrder(t *testing.T) {
}

func TestDequeue_NoDuplicates(t *testing.T) {
c1, c2 := &ComponentNode{}, &ComponentNode{}
c1, c2 := &QueuedNode{}, &QueuedNode{}
q := NewQueue()
q.Enqueue(c1)
q.Enqueue(c1)
Expand All @@ -58,7 +58,7 @@ func TestDequeue_NoDuplicates(t *testing.T) {
}

func TestEnqueue_ChannelNotification(t *testing.T) {
c1 := &ComponentNode{}
c1 := &QueuedNode{}
q := NewQueue()

notificationsCount := atomic.Int32{}
Expand Down