Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Import statement for module 2.0 experiment #5890

Closed
wants to merge 2 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions component/all/all.go
Original file line number Diff line number Diff line change
@@ -54,6 +54,7 @@ import (
_ "github.com/grafana/agent/component/loki/source/windowsevent" // Import loki.source.windowsevent
_ "github.com/grafana/agent/component/loki/write" // Import loki.write
_ "github.com/grafana/agent/component/mimir/rules/kubernetes" // Import mimir.rules.kubernetes
_ "github.com/grafana/agent/component/module" // Import module.module_component
_ "github.com/grafana/agent/component/module/file" // Import module.file
_ "github.com/grafana/agent/component/module/git" // Import module.git
_ "github.com/grafana/agent/component/module/http" // Import module.http
119 changes: 119 additions & 0 deletions component/module/module_component.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package module

import (
"context"
"sync"
"sync/atomic"

"github.com/grafana/agent/component"
)

func init() {
component.Register(component.Registration{
Name: "module_component",
Args: Arguments{},
Exports: Exports{},

Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
return New(opts, args.(Arguments))
},
})
}

// Arguments holds values which are used to configure the module.
type Arguments = map[string]any
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would this trick also work for exports? Right now I think you'd need to get exports via namespace.comp_name.exports.something instead of namespace.compo_name.something.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice one, yes that should work. I tried it with unit tests and it worked :)


// Component implements the module.file component.
type Component struct {
opts component.Options
mod *ModuleComponent

mut sync.RWMutex
args Arguments
content string
updatedOnce atomic.Bool
}

var (
_ component.Component = (*Component)(nil)
_ component.HealthComponent = (*Component)(nil)
)

// New creates a new module.file component.
func New(o component.Options, args Arguments) (*Component, error) {
m, err := NewModuleComponent(o)
if err != nil {
return nil, err
}

c := &Component{
opts: o,
mod: m,
args: args,
}
// we don't update on create because we don't have the content yet
return c, nil
}

// Run implements component.Component.
func (c *Component) Run(ctx context.Context) error {
c.mod.RunFlowController(ctx)
return nil
}

// Update implements component.Component.
func (c *Component) Update(args component.Arguments) error {
newArgs := args.(Arguments)
c.setArgs(newArgs)
c.updatedOnce.Store(true)
return c.reload()
}

// UpdateContent reloads the module with a new config
func (c *Component) UpdateContent(content string) error {
if content != c.getContent() {
c.setContent(content)
return c.reload()
}
return nil
}

func (c *Component) reload() error {
if c.getContent() == "" || !c.updatedOnce.Load() {
return nil // the module is not yet ready
}
return c.mod.LoadFlowSource(c.getArgs(), c.getContent())
}

// CurrentHealth implements component.HealthComponent.
func (c *Component) CurrentHealth() component.Health {
return c.mod.CurrentHealth()
}

// getArgs is a goroutine safe way to get args
func (c *Component) getArgs() Arguments {
c.mut.RLock()
defer c.mut.RUnlock()
return c.args
}

// setArgs is a goroutine safe way to set args
func (c *Component) setArgs(args Arguments) {
c.mut.Lock()
c.args = args
c.mut.Unlock()
}

// getContent is a goroutine safe way to get content
func (c *Component) getContent() string {
c.mut.RLock()
defer c.mut.RUnlock()
return c.content
}

// setContent is a goroutine safe way to set content
func (c *Component) setContent(content string) {
c.mut.Lock()
c.content = content
c.mut.Unlock()
}
28 changes: 28 additions & 0 deletions integration-tests/tests/scrape-prom-metrics-modules/config.river
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
logging {
level = "debug"
}

import.file "scrape_module" {
filename = "module.river"
}

scrape_module "scrape_prom_metrics_modules" {
scrape_endpoint = "localhost:9001"
forward_to = [prometheus.remote_write.scrape_prom_metrics_modules.receiver]
}

prometheus.remote_write "scrape_prom_metrics_modules" {
endpoint {
url = "http://localhost:9009/api/v1/push"
send_native_histograms = true
metadata_config {
send_interval = "1s"
}
queue_config {
max_samples_per_send = 100
}
}
external_labels = {
test_name = "scrape_prom_metrics_modules",
}
}
19 changes: 19 additions & 0 deletions integration-tests/tests/scrape-prom-metrics-modules/module.river
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
argument "scrape_endpoint" {}

argument "forward_to" {}

argument "scrape_interval" {
optional = true
default = "1s"
}

prometheus.scrape "scrape_prom_metrics_modules" {
targets = [
{"__address__" = argument.scrape_endpoint.value},
]
forward_to = argument.forward_to.value
scrape_classic_histograms = true
enable_protobuf_negotiation = true
scrape_interval = argument.scrape_interval.value
scrape_timeout = "500ms"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package main

import (
"fmt"
"strconv"
"testing"

"github.com/grafana/agent/integration-tests/common"
"github.com/stretchr/testify/assert"
)

const promURL = "http://localhost:9009/prometheus/api/v1/query?query="

func metricQuery(metricName string) string {
return fmt.Sprintf("%s%s{test_name='scrape_prom_metrics_modules'}", promURL, metricName)
}

func TestScrapePromMetricsModules(t *testing.T) {
metrics := []string{
// TODO: better differentiate these metric types?
"golang_counter",
"golang_gauge",
"golang_histogram_bucket",
"golang_summary",
"golang_native_histogram",
}

for _, metric := range metrics {
metric := metric
t.Run(metric, func(t *testing.T) {
t.Parallel()
if metric == "golang_native_histogram" {
assertHistogramData(t, metricQuery(metric), metric)
} else {
assertMetricData(t, metricQuery(metric), metric)
}
})
}
}

func assertHistogramData(t *testing.T, query string, expectedMetric string) {
var metricResponse common.MetricResponse
assert.EventuallyWithT(t, func(c *assert.CollectT) {
err := common.FetchDataFromURL(query, &metricResponse)
assert.NoError(c, err)
if assert.NotEmpty(c, metricResponse.Data.Result) {
assert.Equal(c, metricResponse.Data.Result[0].Metric.Name, expectedMetric)
assert.Equal(c, metricResponse.Data.Result[0].Metric.TestName, "scrape_prom_metrics_modules")
if assert.NotNil(c, metricResponse.Data.Result[0].Histogram) {
histogram := metricResponse.Data.Result[0].Histogram
if assert.NotEmpty(c, histogram.Data.Count) {
count, _ := strconv.Atoi(histogram.Data.Count)
assert.Greater(c, count, 10, "Count should be at some point greater than 10.")
}
if assert.NotEmpty(c, histogram.Data.Sum) {
sum, _ := strconv.ParseFloat(histogram.Data.Sum, 64)
assert.Greater(c, sum, 10., "Sum should be at some point greater than 10.")
}
assert.NotEmpty(c, histogram.Data.Buckets)
assert.Nil(c, metricResponse.Data.Result[0].Value)
}
}
}, common.DefaultTimeout, common.DefaultRetryInterval, "Histogram data did not satisfy the conditions within the time limit")
}

func assertMetricData(t *testing.T, query, expectedMetric string) {
var metricResponse common.MetricResponse
assert.EventuallyWithT(t, func(c *assert.CollectT) {
err := common.FetchDataFromURL(query, &metricResponse)
assert.NoError(c, err)
if assert.NotEmpty(c, metricResponse.Data.Result) {
assert.Equal(c, metricResponse.Data.Result[0].Metric.Name, expectedMetric)
assert.Equal(c, metricResponse.Data.Result[0].Metric.TestName, "scrape_prom_metrics_modules")
assert.NotEmpty(c, metricResponse.Data.Result[0].Value.Value)
assert.Nil(c, metricResponse.Data.Result[0].Histogram)
}
}, common.DefaultTimeout, common.DefaultRetryInterval, "Data did not satisfy the conditions within the time limit")
}
5 changes: 5 additions & 0 deletions pkg/flow/flow.go
Original file line number Diff line number Diff line change
@@ -247,13 +247,18 @@ func (f *Flow) Run(ctx context.Context) {
var (
components = f.loader.Components()
services = f.loader.Services()
imports = f.loader.Imports()

runnables = make([]controller.RunnableNode, 0, len(components)+len(services))
)
for _, c := range components {
runnables = append(runnables, c)
}

for _, i := range imports {
runnables = append(runnables, i)
}

// Only the root controller should run services, since modules share the
// same service instance as the root.
if !f.opts.IsModule {
65 changes: 56 additions & 9 deletions pkg/flow/internal/controller/loader.go
Original file line number Diff line number Diff line change
@@ -37,11 +37,14 @@ type Loader struct {
// also prevents log spamming with errors.
backoffConfig backoff.Config

mut sync.RWMutex
graph *dag.Graph
originalGraph *dag.Graph
componentNodes []*ComponentNode
serviceNodes []*ServiceNode
mut sync.RWMutex
graph *dag.Graph
originalGraph *dag.Graph
componentNodes []*ComponentNode
serviceNodes []*ServiceNode
importNodes map[string]*ImportFileConfigNode
moduleComponentNodes []*ComponentNode

cache *valueCache
blocks []*ast.BlockStmt // Most recently loaded blocks, used for writing
cm *controllerMetrics
@@ -82,6 +85,7 @@ func NewLoader(opts LoaderOptions) *Loader {
host: host,
componentReg: reg,
workerPool: opts.WorkerPool,
importNodes: map[string]*ImportFileConfigNode{},

// This is a reasonable default which should work for most cases. If a component is completely stuck, we would
// retry and log an error every 10 seconds, at most.
@@ -129,6 +133,7 @@ func (l *Loader) Apply(args map[string]any, componentBlocks []*ast.BlockStmt, co
}
l.cache.SyncModuleArgs(args)

l.moduleComponentNodes = make([]*ComponentNode, 0)
newGraph, diags := l.loadNewGraph(args, componentBlocks, configBlocks)
if diags.HasErrors() {
return diags
@@ -239,6 +244,17 @@ func (l *Loader) Apply(args map[string]any, componentBlocks []*ast.BlockStmt, co
return diags
}

func (l *Loader) onImportContentChange(importLabel string, content string) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we have a few other options for how the updates to contents of an import can be populated and some of them may alleviate many of the issues that you mention in the comments. Let's discuss on our call.

for _, node := range l.moduleComponentNodes {
if node.componentName == importLabel {
err := node.UpdateModuleContent(content)
if err != nil {
level.Error(l.log).Log("msg", "could not update the content of the module", "err", err)
}
}
}
}

// Cleanup unregisters any existing metrics and optionally stops the worker pool.
func (l *Loader) Cleanup(stopWorkerPool bool) {
if stopWorkerPool {
@@ -374,7 +390,7 @@ func (l *Loader) populateConfigBlockNodes(args map[string]any, g *dag.Graph, con
)

for _, block := range configBlocks {
node, newConfigNodeDiags := NewConfigNode(block, l.globals)
node, newConfigNodeDiags := NewConfigNode(block, l.globals, l.onImportContentChange)
diags = append(diags, newConfigNodeDiags...)

if g.GetByID(node.NodeID()) != nil {
@@ -413,6 +429,8 @@ func (l *Loader) populateConfigBlockNodes(args map[string]any, g *dag.Graph, con
g.Add(c)
}

l.importNodes = nodeMap.importFileMap

return diags
}

@@ -444,11 +462,10 @@ func (l *Loader) populateComponentNodes(g *dag.Graph, componentBlocks []*ast.Blo
c.UpdateBlock(block)
} else {
componentName := block.GetBlockName()
registration, exists := l.componentReg.Get(componentName)
if !exists {
if componentName == "module_component" {
diags.Add(diag.Diagnostic{
Severity: diag.SeverityLevelError,
Message: fmt.Sprintf("Unrecognized component name %q", componentName),
Message: fmt.Sprintf("Explicitly creating a %q in a river file is not allowed", componentName),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we could be not adding this component to registry at all. It's already quite special.

StartPos: block.NamePos.Position(),
EndPos: block.NamePos.Add(len(componentName) - 1).Position(),
})
@@ -465,6 +482,30 @@ func (l *Loader) populateComponentNodes(g *dag.Graph, componentBlocks []*ast.Blo
continue
}

if importNode, exists := l.importNodes[componentName]; exists {
reg, exists := l.componentReg.Get("module_component")
if !exists {
level.Error(l.log).Log("msg", "module_component should exist but the registration is not found")
continue
}
c = NewComponentNode(l.globals, reg, block)
g.Add(c)
g.AddEdge(dag.Edge{From: c, To: importNode})
l.moduleComponentNodes = append(l.moduleComponentNodes, c)
continue
}

registration, exists := l.componentReg.Get(componentName)
if !exists {
diags.Add(diag.Diagnostic{
Severity: diag.SeverityLevelError,
Message: fmt.Sprintf("Unrecognized component name %q", componentName),
StartPos: block.NamePos.Position(),
EndPos: block.NamePos.Add(len(componentName) - 1).Position(),
})
continue
}

// Create a new component
c = NewComponentNode(l.globals, registration, block)
}
@@ -528,6 +569,12 @@ func (l *Loader) Services() []*ServiceNode {
return l.serviceNodes
}

func (l *Loader) Imports() map[string]*ImportFileConfigNode {
l.mut.RLock()
defer l.mut.RUnlock()
return l.importNodes
}

// Graph returns a copy of the DAG managed by the Loader.
func (l *Loader) Graph() *dag.Graph {
l.mut.RLock()
28 changes: 28 additions & 0 deletions pkg/flow/internal/controller/node_component.go
Original file line number Diff line number Diff line change
@@ -14,6 +14,7 @@ import (

"github.com/go-kit/log"
"github.com/grafana/agent/component"
"github.com/grafana/agent/component/module"
"github.com/grafana/agent/pkg/flow/logging"
"github.com/grafana/agent/pkg/flow/logging/level"
"github.com/grafana/agent/pkg/flow/tracing"
@@ -450,3 +451,30 @@ func (cn *ComponentNode) setRunHealth(t component.HealthType, msg string) {
func (cn *ComponentNode) ModuleIDs() []string {
return cn.moduleController.ModuleIDs()
}

// (@wildum) this function is not very nice, it will panic if the managed component is not a module_component
// and it builds the component before it is evaluated. But it's a bit of a chicken-egg problem:
// during the first evaluation cycle :
// - if we evaluate the component first, it does not have the content of its module to start its module. That means that it wont
// be able to set the expected exports for the components that depend on it.
// - if we evaluate the import first, the import node cannot update the module_component with its content because it has not been built yet
// (it exists only when its node has been evaluated)
//
// the current solution solves the problem by building the component before it is evaluated. It wont start the module yet because the first update
// has not been called. It will be called during the first eval of the component (the reflect.DeepEqual(cn.args, argsCopyValue) will always return false
// because we explicitly set the args to nil here)
func (cn *ComponentNode) UpdateModuleContent(newContent string) error {
cn.mut.RLock()
defer cn.mut.RUnlock()

if cn.managed == nil {
empty := make(map[string]any, 0)
managed, err := cn.reg.Build(cn.managedOpts, empty)
if err != nil {
return fmt.Errorf("building component: %w", err)
}
cn.managed = managed
cn.args = nil
}
return cn.managed.(*module.Component).UpdateContent(newContent)
}
33 changes: 20 additions & 13 deletions pkg/flow/internal/controller/node_config.go
Original file line number Diff line number Diff line change
@@ -8,15 +8,16 @@ import (
)

const (
argumentBlockID = "argument"
exportBlockID = "export"
loggingBlockID = "logging"
tracingBlockID = "tracing"
argumentBlockID = "argument"
exportBlockID = "export"
loggingBlockID = "logging"
tracingBlockID = "tracing"
importFileBlockID = "import.file"
)

// NewConfigNode creates a new ConfigNode from an initial ast.BlockStmt.
// The underlying config isn't applied until Evaluate is called.
func NewConfigNode(block *ast.BlockStmt, globals ComponentGlobals) (BlockNode, diag.Diagnostics) {
func NewConfigNode(block *ast.BlockStmt, globals ComponentGlobals, onImportContentChange func(importLabel string, newContent string)) (BlockNode, diag.Diagnostics) {
switch block.GetBlockName() {
case argumentBlockID:
return NewArgumentConfigNode(block, globals), nil
@@ -26,6 +27,8 @@ func NewConfigNode(block *ast.BlockStmt, globals ComponentGlobals) (BlockNode, d
return NewLoggingConfigNode(block, globals), nil
case tracingBlockID:
return NewTracingConfigNode(block, globals), nil
case importFileBlockID:
return NewImportFileConfigNode(block, globals, onImportContentChange), nil
default:
var diags diag.Diagnostics
diags.Add(diag.Diagnostic{
@@ -42,20 +45,22 @@ func NewConfigNode(block *ast.BlockStmt, globals ComponentGlobals) (BlockNode, d
// This is helpful when validating node conditions specific to config node
// types.
type ConfigNodeMap struct {
logging *LoggingConfigNode
tracing *TracingConfigNode
argumentMap map[string]*ArgumentConfigNode
exportMap map[string]*ExportConfigNode
logging *LoggingConfigNode
tracing *TracingConfigNode
argumentMap map[string]*ArgumentConfigNode
exportMap map[string]*ExportConfigNode
importFileMap map[string]*ImportFileConfigNode
}

// NewConfigNodeMap will create an initial ConfigNodeMap. Append must be called
// to populate NewConfigNodeMap.
func NewConfigNodeMap() *ConfigNodeMap {
return &ConfigNodeMap{
logging: nil,
tracing: nil,
argumentMap: map[string]*ArgumentConfigNode{},
exportMap: map[string]*ExportConfigNode{},
logging: nil,
tracing: nil,
argumentMap: map[string]*ArgumentConfigNode{},
exportMap: map[string]*ExportConfigNode{},
importFileMap: map[string]*ImportFileConfigNode{},
}
}

@@ -73,6 +78,8 @@ func (nodeMap *ConfigNodeMap) Append(configNode BlockNode) diag.Diagnostics {
nodeMap.logging = n
case *TracingConfigNode:
nodeMap.tracing = n
case *ImportFileConfigNode:
nodeMap.importFileMap[n.Label()] = n
default:
diags.Add(diag.Diagnostic{
Severity: diag.SeverityLevelError,
226 changes: 226 additions & 0 deletions pkg/flow/internal/controller/node_config_import_file.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
package controller

import (
"context"
"fmt"
"path"
"path/filepath"
"reflect"
"sync"
"time"

"github.com/go-kit/log"
"github.com/grafana/agent/component"
"github.com/grafana/agent/component/local/file"
"github.com/grafana/agent/pkg/flow/logging/level"
"github.com/grafana/agent/pkg/flow/tracing"
"github.com/grafana/river/ast"
"github.com/grafana/river/vm"
"github.com/prometheus/client_golang/prometheus"
)

type ImportFileConfigNode struct {
label string
nodeID string
componentName string
globalID string
fileComponent *file.Component
managedOpts component.Options
registry *prometheus.Registry
onImportContentChange func(importID string, newContent string)

mut sync.RWMutex
block *ast.BlockStmt // Current River blocks to derive config from
eval *vm.Evaluator
argument component.Arguments

healthMut sync.RWMutex
evalHealth component.Health // Health of the last evaluate
runHealth component.Health // Health of running the component
}

var _ BlockNode = (*ImportFileConfigNode)(nil)
var _ RunnableNode = (*ImportFileConfigNode)(nil)

// NewImportFileConfigNode creates a new ImportFileConfigNode from an initial ast.BlockStmt.
// The underlying config isn't applied until Evaluate is called.
func NewImportFileConfigNode(block *ast.BlockStmt, globals ComponentGlobals, onImportContentChange func(importLabel string, newContent string)) *ImportFileConfigNode {
var (
id = BlockComponentID(block)
nodeID = id.String()
)

initHealth := component.Health{
Health: component.HealthTypeUnknown,
Message: "component created",
UpdateTime: time.Now(),
}
globalID := nodeID
if globals.ControllerID != "" {
globalID = path.Join(globals.ControllerID, nodeID)
}
cn := &ImportFileConfigNode{
globalID: globalID,
label: block.Label,
nodeID: BlockComponentID(block).String(),
componentName: block.GetBlockName(),
onImportContentChange: onImportContentChange,

block: block,
eval: vm.New(block.Body),
evalHealth: initHealth,
runHealth: initHealth,
}
cn.managedOpts = getImportManagedOptions(globals, cn)
return cn
}

func getImportManagedOptions(globals ComponentGlobals, cn *ImportFileConfigNode) component.Options {
cn.registry = prometheus.NewRegistry()
return component.Options{
ID: cn.globalID,
Logger: log.With(globals.Logger, "component", cn.globalID),
Registerer: prometheus.WrapRegistererWith(prometheus.Labels{
"component_id": cn.globalID,
}, cn.registry),
Tracer: tracing.WrapTracer(globals.TraceProvider, cn.globalID),

DataPath: filepath.Join(globals.DataPath, cn.globalID),

OnStateChange: cn.UpdateModulesContent,

GetServiceData: func(name string) (interface{}, error) {
return globals.GetServiceData(name)
},
}
}

type importFileConfigBlock struct {
LocalFileArguments file.Arguments `river:",squash"`
}

// SetToDefault implements river.Defaulter.
func (a *importFileConfigBlock) SetToDefault() {
a.LocalFileArguments = file.DefaultArguments
}

// Evaluate implements BlockNode and updates the arguments for the managed config block
// by re-evaluating its River block with the provided scope. The managed config block
// will be built the first time Evaluate is called.
//
// Evaluate will return an error if the River block cannot be evaluated or if
// decoding to arguments fails.
func (cn *ImportFileConfigNode) Evaluate(scope *vm.Scope) error {
err := cn.evaluate(scope)

switch err {
case nil:
cn.setEvalHealth(component.HealthTypeHealthy, "component evaluated")
default:
msg := fmt.Sprintf("component evaluation failed: %s", err)
cn.setEvalHealth(component.HealthTypeUnhealthy, msg)
}
return err
}

func (cn *ImportFileConfigNode) setEvalHealth(t component.HealthType, msg string) {
cn.healthMut.Lock()
defer cn.healthMut.Unlock()

cn.evalHealth = component.Health{
Health: t,
Message: msg,
UpdateTime: time.Now(),
}
}

func (cn *ImportFileConfigNode) evaluate(scope *vm.Scope) error {
cn.mut.Lock()
defer cn.mut.Unlock()

var argument importFileConfigBlock
if err := cn.eval.Evaluate(scope, &argument); err != nil {
return fmt.Errorf("decoding River: %w", err)
}
if cn.fileComponent == nil {
var err error
cn.fileComponent, err = file.New(cn.managedOpts, argument.LocalFileArguments)
if err != nil {
return fmt.Errorf("creating file component: %w", err)
}
cn.argument = argument
}

if reflect.DeepEqual(cn.argument, argument) {
// Ignore components which haven't changed. This reduces the cost of
// calling evaluate for components where evaluation is expensive (e.g., if
// re-evaluating requires re-starting some internal logic).
return nil
}

// Update the existing managed component
if err := cn.fileComponent.Update(argument); err != nil {
return fmt.Errorf("updating component: %w", err)
}

return nil
}

// Run runs the managed component in the calling goroutine until ctx is
// canceled. Evaluate must have been called at least once without returning an
// error before calling Run.
//
// Run will immediately return ErrUnevaluated if Evaluate has never been called
// successfully. Otherwise, Run will return nil.
func (cn *ImportFileConfigNode) Run(ctx context.Context) error {
cn.mut.RLock()
managed := cn.fileComponent
cn.mut.RUnlock()

if managed == nil {
return ErrUnevaluated
}

cn.setRunHealth(component.HealthTypeHealthy, "started component")
err := cn.fileComponent.Run(ctx)

var exitMsg string
logger := cn.managedOpts.Logger
if err != nil {
level.Error(logger).Log("msg", "component exited with error", "err", err)
exitMsg = fmt.Sprintf("component shut down with error: %s", err)
} else {
level.Info(logger).Log("msg", "component exited")
exitMsg = "component shut down normally"
}

cn.setRunHealth(component.HealthTypeExited, exitMsg)
return err
}

func (cn *ImportFileConfigNode) UpdateModulesContent(e component.Exports) {
cn.onImportContentChange(cn.label, e.(file.Exports).Content.Value)
}

func (cn *ImportFileConfigNode) setRunHealth(t component.HealthType, msg string) {
cn.healthMut.Lock()
defer cn.healthMut.Unlock()

cn.runHealth = component.Health{
Health: t,
Message: msg,
UpdateTime: time.Now(),
}
}

func (cn *ImportFileConfigNode) Label() string { return cn.label }

// Block implements BlockNode and returns the current block of the managed config node.
func (cn *ImportFileConfigNode) Block() *ast.BlockStmt {
cn.mut.RLock()
defer cn.mut.RUnlock()
return cn.block
}

// NodeID implements dag.Node and returns the unique ID for the config node.
func (cn *ImportFileConfigNode) NodeID() string { return cn.nodeID }
172 changes: 172 additions & 0 deletions pkg/flow/module_import_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
package flow_test

// This file contains tests which verify that the Flow controller correctly updates and caches modules' arguments
// and exports in presence of multiple components.

import (
"context"
"os"
"testing"
"time"

"github.com/grafana/agent/pkg/flow"
"github.com/grafana/agent/pkg/flow/internal/testcomponents"
"github.com/stretchr/testify/require"

_ "github.com/grafana/agent/component/module/string"
)

func TestImportModule(t *testing.T) {
// We use this module in a Flow config below.
module := `
argument "input" {
optional = false
}
testcomponents.passthrough "pt" {
input = argument.input.value
lag = "1ms"
}
export "output" {
value = testcomponents.passthrough.pt.output
}
`
filename := "my_module"
require.NoError(t, os.WriteFile(filename, []byte(module), 0664))

// We send the count increments via module and to the summation component and verify that the updates propagate.
config := `
testcomponents.count "inc" {
frequency = "10ms"
max = 10
}
import.file "test" {
filename = "my_module"
}
test "myModule" {
input = testcomponents.count.inc.count
}
testcomponents.summation "sum" {
input = test.myModule.exports.output
}
`

ctrl := flow.New(testOptions(t))
f, err := flow.ParseSource(t.Name(), []byte(config))
require.NoError(t, err)
require.NotNil(t, f)

err = ctrl.LoadSource(f, nil)
require.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
done := make(chan struct{})
go func() {
ctrl.Run(ctx)
close(done)
}()
defer func() {
cancel()
<-done
}()

require.Eventually(t, func() bool {
export := getExport[testcomponents.SummationExports](t, ctrl, "", "testcomponents.summation.sum")
return export.LastAdded == 10
}, 3*time.Second, 10*time.Millisecond)

newModule := `
argument "input" {
optional = false
}
testcomponents.passthrough "pt" {
input = argument.input.value
lag = "1ms"
}
export "output" {
value = -10
}
`
require.NoError(t, os.WriteFile(filename, []byte(newModule), 0664))
require.Eventually(t, func() bool {
export := getExport[testcomponents.SummationExports](t, ctrl, "", "testcomponents.summation.sum")
return export.LastAdded == -10
}, 3*time.Second, 10*time.Millisecond)
require.NoError(t, os.Remove(filename))
}

func TestImportModuleNoArgs(t *testing.T) {
// We use this module in a Flow config below.
module := `
testcomponents.passthrough "pt" {
input = 10
lag = "1ms"
}
export "output" {
value = testcomponents.passthrough.pt.output
}
`
filename := "my_module"
require.NoError(t, os.WriteFile(filename, []byte(module), 0664))

config := `
import.file "test" {
filename = "my_module"
}
test "myModule" {
}
testcomponents.summation "sum" {
input = test.myModule.exports.output
}
`

ctrl := flow.New(testOptions(t))
f, err := flow.ParseSource(t.Name(), []byte(config))
require.NoError(t, err)
require.NotNil(t, f)

err = ctrl.LoadSource(f, nil)
require.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
done := make(chan struct{})
go func() {
ctrl.Run(ctx)
close(done)
}()
defer func() {
cancel()
<-done
}()

require.Eventually(t, func() bool {
export := getExport[testcomponents.SummationExports](t, ctrl, "", "testcomponents.summation.sum")
return export.LastAdded == 10
}, 3*time.Second, 10*time.Millisecond)

newModule := `
testcomponents.passthrough "pt" {
input = -10
lag = "1ms"
}
export "output" {
value = testcomponents.passthrough.pt.output
}
`
require.NoError(t, os.WriteFile(filename, []byte(newModule), 0664))
require.Eventually(t, func() bool {
export := getExport[testcomponents.SummationExports](t, ctrl, "", "testcomponents.summation.sum")
return export.LastAdded == -10
}, 3*time.Second, 10*time.Millisecond)
require.NoError(t, os.Remove(filename))
}
2 changes: 1 addition & 1 deletion pkg/flow/source.go
Original file line number Diff line number Diff line change
@@ -60,7 +60,7 @@ func ParseSource(name string, bb []byte) (*Source, error) {
case *ast.BlockStmt:
fullName := strings.Join(stmt.Name, ".")
switch fullName {
case "logging", "tracing", "argument", "export":
case "logging", "tracing", "argument", "export", "import.file":
configs = append(configs, stmt)
default:
components = append(components, stmt)