diff --git a/component/all/all.go b/component/all/all.go index 3822deee7c9c..cc4820bf1e07 100644 --- a/component/all/all.go +++ b/component/all/all.go @@ -54,6 +54,7 @@ import ( _ "github.com/grafana/agent/component/loki/source/windowsevent" // Import loki.source.windowsevent _ "github.com/grafana/agent/component/loki/write" // Import loki.write _ "github.com/grafana/agent/component/mimir/rules/kubernetes" // Import mimir.rules.kubernetes + _ "github.com/grafana/agent/component/module" // Import module.module_component _ "github.com/grafana/agent/component/module/file" // Import module.file _ "github.com/grafana/agent/component/module/git" // Import module.git _ "github.com/grafana/agent/component/module/http" // Import module.http diff --git a/component/module/module_component.go b/component/module/module_component.go new file mode 100644 index 000000000000..0f17a854c7fa --- /dev/null +++ b/component/module/module_component.go @@ -0,0 +1,119 @@ +package module + +import ( + "context" + "sync" + "sync/atomic" + + "github.com/grafana/agent/component" +) + +func init() { + component.Register(component.Registration{ + Name: "module_component", + Args: Arguments{}, + Exports: Exports{}, + + Build: func(opts component.Options, args component.Arguments) (component.Component, error) { + return New(opts, args.(Arguments)) + }, + }) +} + +// Arguments holds values which are used to configure the module. +type Arguments = map[string]any + +// Component implements the module.file component. +type Component struct { + opts component.Options + mod *ModuleComponent + + mut sync.RWMutex + args Arguments + content string + updatedOnce atomic.Bool +} + +var ( + _ component.Component = (*Component)(nil) + _ component.HealthComponent = (*Component)(nil) +) + +// New creates a new module.file component. +func New(o component.Options, args Arguments) (*Component, error) { + m, err := NewModuleComponent(o) + if err != nil { + return nil, err + } + + c := &Component{ + opts: o, + mod: m, + args: args, + } + // we don't update on create because we don't have the content yet + return c, nil +} + +// Run implements component.Component. +func (c *Component) Run(ctx context.Context) error { + c.mod.RunFlowController(ctx) + return nil +} + +// Update implements component.Component. +func (c *Component) Update(args component.Arguments) error { + newArgs := args.(Arguments) + c.setArgs(newArgs) + c.updatedOnce.Store(true) + return c.reload() +} + +// UpdateContent reloads the module with a new config +func (c *Component) UpdateContent(content string) error { + if content != c.getContent() { + c.setContent(content) + return c.reload() + } + return nil +} + +func (c *Component) reload() error { + if c.getContent() == "" || !c.updatedOnce.Load() { + return nil // the module is not yet ready + } + return c.mod.LoadFlowSource(c.getArgs(), c.getContent()) +} + +// CurrentHealth implements component.HealthComponent. +func (c *Component) CurrentHealth() component.Health { + return c.mod.CurrentHealth() +} + +// getArgs is a goroutine safe way to get args +func (c *Component) getArgs() Arguments { + c.mut.RLock() + defer c.mut.RUnlock() + return c.args +} + +// setArgs is a goroutine safe way to set args +func (c *Component) setArgs(args Arguments) { + c.mut.Lock() + c.args = args + c.mut.Unlock() +} + +// getContent is a goroutine safe way to get content +func (c *Component) getContent() string { + c.mut.RLock() + defer c.mut.RUnlock() + return c.content +} + +// setContent is a goroutine safe way to set content +func (c *Component) setContent(content string) { + c.mut.Lock() + c.content = content + c.mut.Unlock() +} diff --git a/integration-tests/tests/scrape-prom-metrics-modules/config.river b/integration-tests/tests/scrape-prom-metrics-modules/config.river new file mode 100644 index 000000000000..ecf186f8dbea --- /dev/null +++ b/integration-tests/tests/scrape-prom-metrics-modules/config.river @@ -0,0 +1,28 @@ +logging { + level = "debug" +} + +import.file "scrape_module" { + filename = "module.river" +} + +scrape_module "scrape_prom_metrics_modules" { + scrape_endpoint = "localhost:9001" + forward_to = [prometheus.remote_write.scrape_prom_metrics_modules.receiver] +} + +prometheus.remote_write "scrape_prom_metrics_modules" { + endpoint { + url = "http://localhost:9009/api/v1/push" + send_native_histograms = true + metadata_config { + send_interval = "1s" + } + queue_config { + max_samples_per_send = 100 + } + } + external_labels = { + test_name = "scrape_prom_metrics_modules", + } +} diff --git a/integration-tests/tests/scrape-prom-metrics-modules/module.river b/integration-tests/tests/scrape-prom-metrics-modules/module.river new file mode 100644 index 000000000000..702ed9134027 --- /dev/null +++ b/integration-tests/tests/scrape-prom-metrics-modules/module.river @@ -0,0 +1,19 @@ +argument "scrape_endpoint" {} + +argument "forward_to" {} + +argument "scrape_interval" { + optional = true + default = "1s" +} + +prometheus.scrape "scrape_prom_metrics_modules" { + targets = [ + {"__address__" = argument.scrape_endpoint.value}, + ] + forward_to = argument.forward_to.value + scrape_classic_histograms = true + enable_protobuf_negotiation = true + scrape_interval = argument.scrape_interval.value + scrape_timeout = "500ms" +} \ No newline at end of file diff --git a/integration-tests/tests/scrape-prom-metrics-modules/scrape_prom_metrics_modules_test.go b/integration-tests/tests/scrape-prom-metrics-modules/scrape_prom_metrics_modules_test.go new file mode 100644 index 000000000000..e4eee52da7f8 --- /dev/null +++ b/integration-tests/tests/scrape-prom-metrics-modules/scrape_prom_metrics_modules_test.go @@ -0,0 +1,78 @@ +package main + +import ( + "fmt" + "strconv" + "testing" + + "github.com/grafana/agent/integration-tests/common" + "github.com/stretchr/testify/assert" +) + +const promURL = "http://localhost:9009/prometheus/api/v1/query?query=" + +func metricQuery(metricName string) string { + return fmt.Sprintf("%s%s{test_name='scrape_prom_metrics_modules'}", promURL, metricName) +} + +func TestScrapePromMetricsModules(t *testing.T) { + metrics := []string{ + // TODO: better differentiate these metric types? + "golang_counter", + "golang_gauge", + "golang_histogram_bucket", + "golang_summary", + "golang_native_histogram", + } + + for _, metric := range metrics { + metric := metric + t.Run(metric, func(t *testing.T) { + t.Parallel() + if metric == "golang_native_histogram" { + assertHistogramData(t, metricQuery(metric), metric) + } else { + assertMetricData(t, metricQuery(metric), metric) + } + }) + } +} + +func assertHistogramData(t *testing.T, query string, expectedMetric string) { + var metricResponse common.MetricResponse + assert.EventuallyWithT(t, func(c *assert.CollectT) { + err := common.FetchDataFromURL(query, &metricResponse) + assert.NoError(c, err) + if assert.NotEmpty(c, metricResponse.Data.Result) { + assert.Equal(c, metricResponse.Data.Result[0].Metric.Name, expectedMetric) + assert.Equal(c, metricResponse.Data.Result[0].Metric.TestName, "scrape_prom_metrics_modules") + if assert.NotNil(c, metricResponse.Data.Result[0].Histogram) { + histogram := metricResponse.Data.Result[0].Histogram + if assert.NotEmpty(c, histogram.Data.Count) { + count, _ := strconv.Atoi(histogram.Data.Count) + assert.Greater(c, count, 10, "Count should be at some point greater than 10.") + } + if assert.NotEmpty(c, histogram.Data.Sum) { + sum, _ := strconv.ParseFloat(histogram.Data.Sum, 64) + assert.Greater(c, sum, 10., "Sum should be at some point greater than 10.") + } + assert.NotEmpty(c, histogram.Data.Buckets) + assert.Nil(c, metricResponse.Data.Result[0].Value) + } + } + }, common.DefaultTimeout, common.DefaultRetryInterval, "Histogram data did not satisfy the conditions within the time limit") +} + +func assertMetricData(t *testing.T, query, expectedMetric string) { + var metricResponse common.MetricResponse + assert.EventuallyWithT(t, func(c *assert.CollectT) { + err := common.FetchDataFromURL(query, &metricResponse) + assert.NoError(c, err) + if assert.NotEmpty(c, metricResponse.Data.Result) { + assert.Equal(c, metricResponse.Data.Result[0].Metric.Name, expectedMetric) + assert.Equal(c, metricResponse.Data.Result[0].Metric.TestName, "scrape_prom_metrics_modules") + assert.NotEmpty(c, metricResponse.Data.Result[0].Value.Value) + assert.Nil(c, metricResponse.Data.Result[0].Histogram) + } + }, common.DefaultTimeout, common.DefaultRetryInterval, "Data did not satisfy the conditions within the time limit") +} diff --git a/pkg/flow/flow.go b/pkg/flow/flow.go index ec16333568ee..94b7ba2471c9 100644 --- a/pkg/flow/flow.go +++ b/pkg/flow/flow.go @@ -247,6 +247,7 @@ func (f *Flow) Run(ctx context.Context) { var ( components = f.loader.Components() services = f.loader.Services() + imports = f.loader.Imports() runnables = make([]controller.RunnableNode, 0, len(components)+len(services)) ) @@ -254,6 +255,10 @@ func (f *Flow) Run(ctx context.Context) { runnables = append(runnables, c) } + for _, i := range imports { + runnables = append(runnables, i) + } + // Only the root controller should run services, since modules share the // same service instance as the root. if !f.opts.IsModule { diff --git a/pkg/flow/internal/controller/loader.go b/pkg/flow/internal/controller/loader.go index e968e8102fe0..4b3f6e66e19e 100644 --- a/pkg/flow/internal/controller/loader.go +++ b/pkg/flow/internal/controller/loader.go @@ -37,11 +37,14 @@ type Loader struct { // also prevents log spamming with errors. backoffConfig backoff.Config - mut sync.RWMutex - graph *dag.Graph - originalGraph *dag.Graph - componentNodes []*ComponentNode - serviceNodes []*ServiceNode + mut sync.RWMutex + graph *dag.Graph + originalGraph *dag.Graph + componentNodes []*ComponentNode + serviceNodes []*ServiceNode + importNodes map[string]*ImportFileConfigNode + moduleComponentNodes []*ComponentNode + cache *valueCache blocks []*ast.BlockStmt // Most recently loaded blocks, used for writing cm *controllerMetrics @@ -82,6 +85,7 @@ func NewLoader(opts LoaderOptions) *Loader { host: host, componentReg: reg, workerPool: opts.WorkerPool, + importNodes: map[string]*ImportFileConfigNode{}, // This is a reasonable default which should work for most cases. If a component is completely stuck, we would // retry and log an error every 10 seconds, at most. @@ -129,6 +133,7 @@ func (l *Loader) Apply(args map[string]any, componentBlocks []*ast.BlockStmt, co } l.cache.SyncModuleArgs(args) + l.moduleComponentNodes = make([]*ComponentNode, 0) newGraph, diags := l.loadNewGraph(args, componentBlocks, configBlocks) if diags.HasErrors() { return diags @@ -239,6 +244,17 @@ func (l *Loader) Apply(args map[string]any, componentBlocks []*ast.BlockStmt, co return diags } +func (l *Loader) onImportContentChange(importLabel string, content string) { + for _, node := range l.moduleComponentNodes { + if node.componentName == importLabel { + err := node.UpdateModuleContent(content) + if err != nil { + level.Error(l.log).Log("msg", "could not update the content of the module", "err", err) + } + } + } +} + // Cleanup unregisters any existing metrics and optionally stops the worker pool. func (l *Loader) Cleanup(stopWorkerPool bool) { if stopWorkerPool { @@ -374,7 +390,7 @@ func (l *Loader) populateConfigBlockNodes(args map[string]any, g *dag.Graph, con ) for _, block := range configBlocks { - node, newConfigNodeDiags := NewConfigNode(block, l.globals) + node, newConfigNodeDiags := NewConfigNode(block, l.globals, l.onImportContentChange) diags = append(diags, newConfigNodeDiags...) if g.GetByID(node.NodeID()) != nil { @@ -413,6 +429,8 @@ func (l *Loader) populateConfigBlockNodes(args map[string]any, g *dag.Graph, con g.Add(c) } + l.importNodes = nodeMap.importFileMap + return diags } @@ -444,11 +462,10 @@ func (l *Loader) populateComponentNodes(g *dag.Graph, componentBlocks []*ast.Blo c.UpdateBlock(block) } else { componentName := block.GetBlockName() - registration, exists := l.componentReg.Get(componentName) - if !exists { + if componentName == "module_component" { diags.Add(diag.Diagnostic{ Severity: diag.SeverityLevelError, - Message: fmt.Sprintf("Unrecognized component name %q", componentName), + Message: fmt.Sprintf("Explicitly creating a %q in a river file is not allowed", componentName), StartPos: block.NamePos.Position(), EndPos: block.NamePos.Add(len(componentName) - 1).Position(), }) @@ -465,6 +482,30 @@ func (l *Loader) populateComponentNodes(g *dag.Graph, componentBlocks []*ast.Blo continue } + if importNode, exists := l.importNodes[componentName]; exists { + reg, exists := l.componentReg.Get("module_component") + if !exists { + level.Error(l.log).Log("msg", "module_component should exist but the registration is not found") + continue + } + c = NewComponentNode(l.globals, reg, block) + g.Add(c) + g.AddEdge(dag.Edge{From: c, To: importNode}) + l.moduleComponentNodes = append(l.moduleComponentNodes, c) + continue + } + + registration, exists := l.componentReg.Get(componentName) + if !exists { + diags.Add(diag.Diagnostic{ + Severity: diag.SeverityLevelError, + Message: fmt.Sprintf("Unrecognized component name %q", componentName), + StartPos: block.NamePos.Position(), + EndPos: block.NamePos.Add(len(componentName) - 1).Position(), + }) + continue + } + // Create a new component c = NewComponentNode(l.globals, registration, block) } @@ -528,6 +569,12 @@ func (l *Loader) Services() []*ServiceNode { return l.serviceNodes } +func (l *Loader) Imports() map[string]*ImportFileConfigNode { + l.mut.RLock() + defer l.mut.RUnlock() + return l.importNodes +} + // Graph returns a copy of the DAG managed by the Loader. func (l *Loader) Graph() *dag.Graph { l.mut.RLock() diff --git a/pkg/flow/internal/controller/node_component.go b/pkg/flow/internal/controller/node_component.go index b99597809d4b..2f4b2f0b0bcc 100644 --- a/pkg/flow/internal/controller/node_component.go +++ b/pkg/flow/internal/controller/node_component.go @@ -14,6 +14,7 @@ import ( "github.com/go-kit/log" "github.com/grafana/agent/component" + "github.com/grafana/agent/component/module" "github.com/grafana/agent/pkg/flow/logging" "github.com/grafana/agent/pkg/flow/logging/level" "github.com/grafana/agent/pkg/flow/tracing" @@ -450,3 +451,30 @@ func (cn *ComponentNode) setRunHealth(t component.HealthType, msg string) { func (cn *ComponentNode) ModuleIDs() []string { return cn.moduleController.ModuleIDs() } + +// (@wildum) this function is not very nice, it will panic if the managed component is not a module_component +// and it builds the component before it is evaluated. But it's a bit of a chicken-egg problem: +// during the first evaluation cycle : +// - if we evaluate the component first, it does not have the content of its module to start its module. That means that it wont +// be able to set the expected exports for the components that depend on it. +// - if we evaluate the import first, the import node cannot update the module_component with its content because it has not been built yet +// (it exists only when its node has been evaluated) +// +// the current solution solves the problem by building the component before it is evaluated. It wont start the module yet because the first update +// has not been called. It will be called during the first eval of the component (the reflect.DeepEqual(cn.args, argsCopyValue) will always return false +// because we explicitly set the args to nil here) +func (cn *ComponentNode) UpdateModuleContent(newContent string) error { + cn.mut.RLock() + defer cn.mut.RUnlock() + + if cn.managed == nil { + empty := make(map[string]any, 0) + managed, err := cn.reg.Build(cn.managedOpts, empty) + if err != nil { + return fmt.Errorf("building component: %w", err) + } + cn.managed = managed + cn.args = nil + } + return cn.managed.(*module.Component).UpdateContent(newContent) +} diff --git a/pkg/flow/internal/controller/node_config.go b/pkg/flow/internal/controller/node_config.go index d583dc1e1061..ab526040f996 100644 --- a/pkg/flow/internal/controller/node_config.go +++ b/pkg/flow/internal/controller/node_config.go @@ -8,15 +8,16 @@ import ( ) const ( - argumentBlockID = "argument" - exportBlockID = "export" - loggingBlockID = "logging" - tracingBlockID = "tracing" + argumentBlockID = "argument" + exportBlockID = "export" + loggingBlockID = "logging" + tracingBlockID = "tracing" + importFileBlockID = "import.file" ) // NewConfigNode creates a new ConfigNode from an initial ast.BlockStmt. // The underlying config isn't applied until Evaluate is called. -func NewConfigNode(block *ast.BlockStmt, globals ComponentGlobals) (BlockNode, diag.Diagnostics) { +func NewConfigNode(block *ast.BlockStmt, globals ComponentGlobals, onImportContentChange func(importLabel string, newContent string)) (BlockNode, diag.Diagnostics) { switch block.GetBlockName() { case argumentBlockID: return NewArgumentConfigNode(block, globals), nil @@ -26,6 +27,8 @@ func NewConfigNode(block *ast.BlockStmt, globals ComponentGlobals) (BlockNode, d return NewLoggingConfigNode(block, globals), nil case tracingBlockID: return NewTracingConfigNode(block, globals), nil + case importFileBlockID: + return NewImportFileConfigNode(block, globals, onImportContentChange), nil default: var diags diag.Diagnostics diags.Add(diag.Diagnostic{ @@ -42,20 +45,22 @@ func NewConfigNode(block *ast.BlockStmt, globals ComponentGlobals) (BlockNode, d // This is helpful when validating node conditions specific to config node // types. type ConfigNodeMap struct { - logging *LoggingConfigNode - tracing *TracingConfigNode - argumentMap map[string]*ArgumentConfigNode - exportMap map[string]*ExportConfigNode + logging *LoggingConfigNode + tracing *TracingConfigNode + argumentMap map[string]*ArgumentConfigNode + exportMap map[string]*ExportConfigNode + importFileMap map[string]*ImportFileConfigNode } // NewConfigNodeMap will create an initial ConfigNodeMap. Append must be called // to populate NewConfigNodeMap. func NewConfigNodeMap() *ConfigNodeMap { return &ConfigNodeMap{ - logging: nil, - tracing: nil, - argumentMap: map[string]*ArgumentConfigNode{}, - exportMap: map[string]*ExportConfigNode{}, + logging: nil, + tracing: nil, + argumentMap: map[string]*ArgumentConfigNode{}, + exportMap: map[string]*ExportConfigNode{}, + importFileMap: map[string]*ImportFileConfigNode{}, } } @@ -73,6 +78,8 @@ func (nodeMap *ConfigNodeMap) Append(configNode BlockNode) diag.Diagnostics { nodeMap.logging = n case *TracingConfigNode: nodeMap.tracing = n + case *ImportFileConfigNode: + nodeMap.importFileMap[n.Label()] = n default: diags.Add(diag.Diagnostic{ Severity: diag.SeverityLevelError, diff --git a/pkg/flow/internal/controller/node_config_import_file.go b/pkg/flow/internal/controller/node_config_import_file.go new file mode 100644 index 000000000000..01406b7b17af --- /dev/null +++ b/pkg/flow/internal/controller/node_config_import_file.go @@ -0,0 +1,226 @@ +package controller + +import ( + "context" + "fmt" + "path" + "path/filepath" + "reflect" + "sync" + "time" + + "github.com/go-kit/log" + "github.com/grafana/agent/component" + "github.com/grafana/agent/component/local/file" + "github.com/grafana/agent/pkg/flow/logging/level" + "github.com/grafana/agent/pkg/flow/tracing" + "github.com/grafana/river/ast" + "github.com/grafana/river/vm" + "github.com/prometheus/client_golang/prometheus" +) + +type ImportFileConfigNode struct { + label string + nodeID string + componentName string + globalID string + fileComponent *file.Component + managedOpts component.Options + registry *prometheus.Registry + onImportContentChange func(importID string, newContent string) + + mut sync.RWMutex + block *ast.BlockStmt // Current River blocks to derive config from + eval *vm.Evaluator + argument component.Arguments + + healthMut sync.RWMutex + evalHealth component.Health // Health of the last evaluate + runHealth component.Health // Health of running the component +} + +var _ BlockNode = (*ImportFileConfigNode)(nil) +var _ RunnableNode = (*ImportFileConfigNode)(nil) + +// NewImportFileConfigNode creates a new ImportFileConfigNode from an initial ast.BlockStmt. +// The underlying config isn't applied until Evaluate is called. +func NewImportFileConfigNode(block *ast.BlockStmt, globals ComponentGlobals, onImportContentChange func(importLabel string, newContent string)) *ImportFileConfigNode { + var ( + id = BlockComponentID(block) + nodeID = id.String() + ) + + initHealth := component.Health{ + Health: component.HealthTypeUnknown, + Message: "component created", + UpdateTime: time.Now(), + } + globalID := nodeID + if globals.ControllerID != "" { + globalID = path.Join(globals.ControllerID, nodeID) + } + cn := &ImportFileConfigNode{ + globalID: globalID, + label: block.Label, + nodeID: BlockComponentID(block).String(), + componentName: block.GetBlockName(), + onImportContentChange: onImportContentChange, + + block: block, + eval: vm.New(block.Body), + evalHealth: initHealth, + runHealth: initHealth, + } + cn.managedOpts = getImportManagedOptions(globals, cn) + return cn +} + +func getImportManagedOptions(globals ComponentGlobals, cn *ImportFileConfigNode) component.Options { + cn.registry = prometheus.NewRegistry() + return component.Options{ + ID: cn.globalID, + Logger: log.With(globals.Logger, "component", cn.globalID), + Registerer: prometheus.WrapRegistererWith(prometheus.Labels{ + "component_id": cn.globalID, + }, cn.registry), + Tracer: tracing.WrapTracer(globals.TraceProvider, cn.globalID), + + DataPath: filepath.Join(globals.DataPath, cn.globalID), + + OnStateChange: cn.UpdateModulesContent, + + GetServiceData: func(name string) (interface{}, error) { + return globals.GetServiceData(name) + }, + } +} + +type importFileConfigBlock struct { + LocalFileArguments file.Arguments `river:",squash"` +} + +// SetToDefault implements river.Defaulter. +func (a *importFileConfigBlock) SetToDefault() { + a.LocalFileArguments = file.DefaultArguments +} + +// Evaluate implements BlockNode and updates the arguments for the managed config block +// by re-evaluating its River block with the provided scope. The managed config block +// will be built the first time Evaluate is called. +// +// Evaluate will return an error if the River block cannot be evaluated or if +// decoding to arguments fails. +func (cn *ImportFileConfigNode) Evaluate(scope *vm.Scope) error { + err := cn.evaluate(scope) + + switch err { + case nil: + cn.setEvalHealth(component.HealthTypeHealthy, "component evaluated") + default: + msg := fmt.Sprintf("component evaluation failed: %s", err) + cn.setEvalHealth(component.HealthTypeUnhealthy, msg) + } + return err +} + +func (cn *ImportFileConfigNode) setEvalHealth(t component.HealthType, msg string) { + cn.healthMut.Lock() + defer cn.healthMut.Unlock() + + cn.evalHealth = component.Health{ + Health: t, + Message: msg, + UpdateTime: time.Now(), + } +} + +func (cn *ImportFileConfigNode) evaluate(scope *vm.Scope) error { + cn.mut.Lock() + defer cn.mut.Unlock() + + var argument importFileConfigBlock + if err := cn.eval.Evaluate(scope, &argument); err != nil { + return fmt.Errorf("decoding River: %w", err) + } + if cn.fileComponent == nil { + var err error + cn.fileComponent, err = file.New(cn.managedOpts, argument.LocalFileArguments) + if err != nil { + return fmt.Errorf("creating file component: %w", err) + } + cn.argument = argument + } + + if reflect.DeepEqual(cn.argument, argument) { + // Ignore components which haven't changed. This reduces the cost of + // calling evaluate for components where evaluation is expensive (e.g., if + // re-evaluating requires re-starting some internal logic). + return nil + } + + // Update the existing managed component + if err := cn.fileComponent.Update(argument); err != nil { + return fmt.Errorf("updating component: %w", err) + } + + return nil +} + +// Run runs the managed component in the calling goroutine until ctx is +// canceled. Evaluate must have been called at least once without returning an +// error before calling Run. +// +// Run will immediately return ErrUnevaluated if Evaluate has never been called +// successfully. Otherwise, Run will return nil. +func (cn *ImportFileConfigNode) Run(ctx context.Context) error { + cn.mut.RLock() + managed := cn.fileComponent + cn.mut.RUnlock() + + if managed == nil { + return ErrUnevaluated + } + + cn.setRunHealth(component.HealthTypeHealthy, "started component") + err := cn.fileComponent.Run(ctx) + + var exitMsg string + logger := cn.managedOpts.Logger + if err != nil { + level.Error(logger).Log("msg", "component exited with error", "err", err) + exitMsg = fmt.Sprintf("component shut down with error: %s", err) + } else { + level.Info(logger).Log("msg", "component exited") + exitMsg = "component shut down normally" + } + + cn.setRunHealth(component.HealthTypeExited, exitMsg) + return err +} + +func (cn *ImportFileConfigNode) UpdateModulesContent(e component.Exports) { + cn.onImportContentChange(cn.label, e.(file.Exports).Content.Value) +} + +func (cn *ImportFileConfigNode) setRunHealth(t component.HealthType, msg string) { + cn.healthMut.Lock() + defer cn.healthMut.Unlock() + + cn.runHealth = component.Health{ + Health: t, + Message: msg, + UpdateTime: time.Now(), + } +} + +func (cn *ImportFileConfigNode) Label() string { return cn.label } + +// Block implements BlockNode and returns the current block of the managed config node. +func (cn *ImportFileConfigNode) Block() *ast.BlockStmt { + cn.mut.RLock() + defer cn.mut.RUnlock() + return cn.block +} + +// NodeID implements dag.Node and returns the unique ID for the config node. +func (cn *ImportFileConfigNode) NodeID() string { return cn.nodeID } diff --git a/pkg/flow/module_import_test.go b/pkg/flow/module_import_test.go new file mode 100644 index 000000000000..c6f0b7cd1fb0 --- /dev/null +++ b/pkg/flow/module_import_test.go @@ -0,0 +1,172 @@ +package flow_test + +// This file contains tests which verify that the Flow controller correctly updates and caches modules' arguments +// and exports in presence of multiple components. + +import ( + "context" + "os" + "testing" + "time" + + "github.com/grafana/agent/pkg/flow" + "github.com/grafana/agent/pkg/flow/internal/testcomponents" + "github.com/stretchr/testify/require" + + _ "github.com/grafana/agent/component/module/string" +) + +func TestImportModule(t *testing.T) { + // We use this module in a Flow config below. + module := ` + argument "input" { + optional = false + } + + testcomponents.passthrough "pt" { + input = argument.input.value + lag = "1ms" + } + + export "output" { + value = testcomponents.passthrough.pt.output + } +` + filename := "my_module" + require.NoError(t, os.WriteFile(filename, []byte(module), 0664)) + + // We send the count increments via module and to the summation component and verify that the updates propagate. + config := ` + testcomponents.count "inc" { + frequency = "10ms" + max = 10 + } + + import.file "test" { + filename = "my_module" + } + + test "myModule" { + input = testcomponents.count.inc.count + } + + testcomponents.summation "sum" { + input = test.myModule.exports.output + } +` + + ctrl := flow.New(testOptions(t)) + f, err := flow.ParseSource(t.Name(), []byte(config)) + require.NoError(t, err) + require.NotNil(t, f) + + err = ctrl.LoadSource(f, nil) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan struct{}) + go func() { + ctrl.Run(ctx) + close(done) + }() + defer func() { + cancel() + <-done + }() + + require.Eventually(t, func() bool { + export := getExport[testcomponents.SummationExports](t, ctrl, "", "testcomponents.summation.sum") + return export.LastAdded == 10 + }, 3*time.Second, 10*time.Millisecond) + + newModule := ` + argument "input" { + optional = false + } + + testcomponents.passthrough "pt" { + input = argument.input.value + lag = "1ms" + } + + export "output" { + value = -10 + } + ` + require.NoError(t, os.WriteFile(filename, []byte(newModule), 0664)) + require.Eventually(t, func() bool { + export := getExport[testcomponents.SummationExports](t, ctrl, "", "testcomponents.summation.sum") + return export.LastAdded == -10 + }, 3*time.Second, 10*time.Millisecond) + require.NoError(t, os.Remove(filename)) +} + +func TestImportModuleNoArgs(t *testing.T) { + // We use this module in a Flow config below. + module := ` +testcomponents.passthrough "pt" { + input = 10 + lag = "1ms" +} + +export "output" { + value = testcomponents.passthrough.pt.output +} +` + filename := "my_module" + require.NoError(t, os.WriteFile(filename, []byte(module), 0664)) + + config := ` +import.file "test" { + filename = "my_module" +} + +test "myModule" { +} + +testcomponents.summation "sum" { + input = test.myModule.exports.output +} +` + + ctrl := flow.New(testOptions(t)) + f, err := flow.ParseSource(t.Name(), []byte(config)) + require.NoError(t, err) + require.NotNil(t, f) + + err = ctrl.LoadSource(f, nil) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan struct{}) + go func() { + ctrl.Run(ctx) + close(done) + }() + defer func() { + cancel() + <-done + }() + + require.Eventually(t, func() bool { + export := getExport[testcomponents.SummationExports](t, ctrl, "", "testcomponents.summation.sum") + return export.LastAdded == 10 + }, 3*time.Second, 10*time.Millisecond) + + newModule := ` + testcomponents.passthrough "pt" { + input = -10 + lag = "1ms" + } + + export "output" { + value = testcomponents.passthrough.pt.output + } +` + require.NoError(t, os.WriteFile(filename, []byte(newModule), 0664)) + require.Eventually(t, func() bool { + export := getExport[testcomponents.SummationExports](t, ctrl, "", "testcomponents.summation.sum") + return export.LastAdded == -10 + }, 3*time.Second, 10*time.Millisecond) + require.NoError(t, os.Remove(filename)) +} diff --git a/pkg/flow/source.go b/pkg/flow/source.go index acd7d2ce2f58..cee34831d00f 100644 --- a/pkg/flow/source.go +++ b/pkg/flow/source.go @@ -60,7 +60,7 @@ func ParseSource(name string, bb []byte) (*Source, error) { case *ast.BlockStmt: fullName := strings.Join(stmt.Name, ".") switch fullName { - case "logging", "tracing", "argument", "export": + case "logging", "tracing", "argument", "export", "import.file": configs = append(configs, stmt) default: components = append(components, stmt)