diff --git a/.github/workflows/integration-tests.yml b/.github/workflows/integration-tests.yml index 4b9f7077ed57..11e6f260ca05 100644 --- a/.github/workflows/integration-tests.yml +++ b/.github/workflows/integration-tests.yml @@ -19,5 +19,7 @@ jobs: go-version: "1.21" - name: Set OTEL Exporter Endpoint run: echo "OTEL_EXPORTER_ENDPOINT=172.17.0.1:4318" >> $GITHUB_ENV + - name: Set Module HTTP Endpoint + run: echo "MODULE_HTTP_ENDPOINT=http://172.17.0.1:8090/module.river" >> $GITHUB_ENV - name: Run tests run: make integration-test diff --git a/component/module/file/file.go b/component/module/file/file.go index e40c5dc9ca48..22d37bbe44bf 100644 --- a/component/module/file/file.go +++ b/component/module/file/file.go @@ -9,13 +9,15 @@ import ( "github.com/grafana/agent/component" "github.com/grafana/agent/component/local/file" "github.com/grafana/agent/component/module" + "github.com/grafana/agent/pkg/flow/config" "github.com/grafana/river/rivertypes" ) func init() { component.Register(component.Registration{ - Name: "module.file", - Args: Arguments{}, + Name: "module.file", + Args: Arguments{}, + //nolint:staticcheck Exports: module.Exports{}, Build: func(opts component.Options, args component.Arguments) (component.Component, error) { @@ -58,6 +60,7 @@ var ( // New creates a new module.file component. func New(o component.Options, args Arguments) (*Component, error) { + //nolint:staticcheck m, err := module.NewModuleComponent(o) if err != nil { return nil, err @@ -88,7 +91,7 @@ func (c *Component) newManagedLocalComponent(o component.Options) (*file.Compone if !c.inUpdate.Load() && c.isCreated.Load() { // Any errors found here are reported via component health - _ = c.mod.LoadFlowSource(c.getArgs().Arguments, c.getContent().Value) + _ = c.mod.LoadFlowSource(c.getArgs().Arguments, c.getContent().Value, config.DefaultLoaderConfigOptions()) } } @@ -135,7 +138,7 @@ func (c *Component) Update(args component.Arguments) error { // Force a content load here and bubble up any error. This will catch problems // on initial load. - return c.mod.LoadFlowSource(newArgs.Arguments, c.getContent().Value) + return c.mod.LoadFlowSource(newArgs.Arguments, c.getContent().Value, config.DefaultLoaderConfigOptions()) } // CurrentHealth implements component.HealthComponent. diff --git a/component/module/git/git.go b/component/module/git/git.go index 607fcd4577a6..047ef5722f68 100644 --- a/component/module/git/git.go +++ b/component/module/git/git.go @@ -13,13 +13,15 @@ import ( "github.com/grafana/agent/component" "github.com/grafana/agent/component/module" "github.com/grafana/agent/internal/vcs" + "github.com/grafana/agent/pkg/flow/config" "github.com/grafana/agent/pkg/flow/logging/level" ) func init() { component.Register(component.Registration{ - Name: "module.git", - Args: Arguments{}, + Name: "module.git", + Args: Arguments{}, + //nolint:staticcheck Exports: module.Exports{}, Build: func(opts component.Options, args component.Arguments) (component.Component, error) { @@ -74,6 +76,7 @@ var ( // New creates a new module.git component. func New(o component.Options, args Arguments) (*Component, error) { + //nolint:staticcheck m, err := module.NewModuleComponent(o) if err != nil { return nil, err @@ -239,7 +242,7 @@ func (c *Component) pollFile(ctx context.Context, args Arguments) error { return err } - return c.mod.LoadFlowSource(args.Arguments, string(bb)) + return c.mod.LoadFlowSource(args.Arguments, string(bb), config.DefaultLoaderConfigOptions()) } // CurrentHealth implements component.HealthComponent. diff --git a/component/module/http/http.go b/component/module/http/http.go index bc1be2158fdb..0bcb6f4d9de7 100644 --- a/component/module/http/http.go +++ b/component/module/http/http.go @@ -9,13 +9,15 @@ import ( "github.com/grafana/agent/component" "github.com/grafana/agent/component/module" remote_http "github.com/grafana/agent/component/remote/http" + "github.com/grafana/agent/pkg/flow/config" "github.com/grafana/river/rivertypes" ) func init() { component.Register(component.Registration{ - Name: "module.http", - Args: Arguments{}, + Name: "module.http", + Args: Arguments{}, + //nolint:staticcheck Exports: module.Exports{}, Build: func(opts component.Options, args component.Arguments) (component.Component, error) { @@ -57,6 +59,7 @@ var ( // New creates a new module.http component. func New(o component.Options, args Arguments) (*Component, error) { + //nolint:staticcheck m, err := module.NewModuleComponent(o) if err != nil { return nil, err @@ -87,7 +90,7 @@ func (c *Component) newManagedLocalComponent(o component.Options) (*remote_http. if !c.inUpdate.Load() && c.isCreated.Load() { // Any errors found here are reported via component health - _ = c.mod.LoadFlowSource(c.getArgs().Arguments, c.getContent().Value) + _ = c.mod.LoadFlowSource(c.getArgs().Arguments, c.getContent().Value, config.DefaultLoaderConfigOptions()) } } @@ -134,7 +137,7 @@ func (c *Component) Update(args component.Arguments) error { // Force a content load here and bubble up any error. This will catch problems // on initial load. - return c.mod.LoadFlowSource(newArgs.Arguments, c.getContent().Value) + return c.mod.LoadFlowSource(newArgs.Arguments, c.getContent().Value, config.DefaultLoaderConfigOptions()) } // CurrentHealth implements component.HealthComponent. diff --git a/component/module/module.go b/component/module/module.go index 7995fdbca5c9..4c0a4c62bb27 100644 --- a/component/module/module.go +++ b/component/module/module.go @@ -8,6 +8,7 @@ import ( "time" "github.com/grafana/agent/component" + "github.com/grafana/agent/pkg/flow/config" "github.com/grafana/agent/pkg/flow/logging/level" ) @@ -16,22 +17,40 @@ type ModuleComponent struct { opts component.Options mod component.Module - mut sync.RWMutex - health component.Health - latestContent string - latestArgs map[string]any + mut sync.RWMutex + health component.Health + latestContent string + latestArgs map[string]any + latestLoaderConfigOptions config.LoaderConfigOptions } -// Exports holds values which are exported from the run module. +// Deprecated: Exports holds values which are exported from the run module. New modules use map[string]any directly. type Exports struct { // Exports exported from the running module. Exports map[string]any `river:"exports,block"` } -// NewModuleComponent initializes a new ModuleComponent. +var _ component.Component = (*ModuleComponent)(nil) + +// NewModuleComponentV2 initializes a new ModuleComponent. +// Compared to the previous constructor, the export is simply map[string]any instead of the Exports type containing the map. +func NewModuleComponentV2(o component.Options) (*ModuleComponent, error) { + c := &ModuleComponent{ + opts: o, + latestLoaderConfigOptions: config.DefaultLoaderConfigOptions(), + } + var err error + c.mod, err = o.ModuleController.NewModule("", func(exports map[string]any) { + c.opts.OnStateChange(exports) + }) + return c, err +} + +// Deprecated: Use NewModuleComponentV2 instead. func NewModuleComponent(o component.Options) (*ModuleComponent, error) { c := &ModuleComponent{ - opts: o, + opts: o, + latestLoaderConfigOptions: config.DefaultLoaderConfigOptions(), } var err error c.mod, err = o.ModuleController.NewModule("", func(exports map[string]any) { @@ -43,12 +62,12 @@ func NewModuleComponent(o component.Options) (*ModuleComponent, error) { // LoadFlowSource loads the flow controller with the current component source. // It will set the component health in addition to return the error so that the consumer can rely on either or both. // If the content is the same as the last time it was successfully loaded, it will not be reloaded. -func (c *ModuleComponent) LoadFlowSource(args map[string]any, contentValue string) error { - if reflect.DeepEqual(args, c.getLatestArgs()) && contentValue == c.getLatestContent() { +func (c *ModuleComponent) LoadFlowSource(args map[string]any, contentValue string, options config.LoaderConfigOptions) error { + if reflect.DeepEqual(args, c.getLatestArgs()) && contentValue == c.getLatestContent() && reflect.DeepEqual(options, c.getLatestLoaderConfigOptions()) { return nil } - err := c.mod.LoadConfig([]byte(contentValue), args) + err := c.mod.LoadConfig([]byte(contentValue), args, options) if err != nil { c.setHealth(component.Health{ Health: component.HealthTypeUnhealthy, @@ -61,6 +80,7 @@ func (c *ModuleComponent) LoadFlowSource(args map[string]any, contentValue strin c.setLatestArgs(args) c.setLatestContent(contentValue) + c.setLatestLoaderConfigOptions(options) c.setHealth(component.Health{ Health: component.HealthTypeHealthy, Message: "module content loaded", @@ -70,6 +90,17 @@ func (c *ModuleComponent) LoadFlowSource(args map[string]any, contentValue strin return nil } +// Run implements component.Component. +func (c *ModuleComponent) Run(ctx context.Context) error { + <-ctx.Done() + return nil +} + +// Update implements component.Component. +func (c *ModuleComponent) Update(_ component.Arguments) error { + return nil +} + // RunFlowController runs the flow controller that all module components start. func (c *ModuleComponent) RunFlowController(ctx context.Context) { err := c.mod.Run(ctx) @@ -104,6 +135,18 @@ func (c *ModuleComponent) getLatestContent() string { return c.latestContent } +func (c *ModuleComponent) setLatestLoaderConfigOptions(options config.LoaderConfigOptions) { + c.mut.Lock() + defer c.mut.Unlock() + c.latestLoaderConfigOptions = options +} + +func (c *ModuleComponent) getLatestLoaderConfigOptions() config.LoaderConfigOptions { + c.mut.RLock() + defer c.mut.RUnlock() + return c.latestLoaderConfigOptions +} + func (c *ModuleComponent) setLatestArgs(args map[string]any) { c.mut.Lock() defer c.mut.Unlock() diff --git a/component/module/string/string.go b/component/module/string/string.go index bd3e6193f441..1d6550085413 100644 --- a/component/module/string/string.go +++ b/component/module/string/string.go @@ -5,13 +5,15 @@ import ( "github.com/grafana/agent/component" "github.com/grafana/agent/component/module" + "github.com/grafana/agent/pkg/flow/config" "github.com/grafana/river/rivertypes" ) func init() { component.Register(component.Registration{ - Name: "module.string", - Args: Arguments{}, + Name: "module.string", + Args: Arguments{}, + //nolint:staticcheck Exports: module.Exports{}, Build: func(opts component.Options, args component.Arguments) (component.Component, error) { @@ -42,6 +44,7 @@ var ( // New creates a new module.string component. func New(o component.Options, args Arguments) (*Component, error) { + //nolint:staticcheck m, err := module.NewModuleComponent(o) if err != nil { return nil, err @@ -66,7 +69,7 @@ func (c *Component) Run(ctx context.Context) error { func (c *Component) Update(args component.Arguments) error { newArgs := args.(Arguments) - return c.mod.LoadFlowSource(newArgs.Arguments, newArgs.Content.Value) + return c.mod.LoadFlowSource(newArgs.Arguments, newArgs.Content.Value, config.DefaultLoaderConfigOptions()) } // CurrentHealth implements component.HealthComponent. diff --git a/component/registry.go b/component/registry.go index 11cc593b0ddd..0d348d61074d 100644 --- a/component/registry.go +++ b/component/registry.go @@ -7,6 +7,7 @@ import ( "strings" "github.com/go-kit/log" + "github.com/grafana/agent/pkg/flow/config" "github.com/grafana/regexp" "github.com/prometheus/client_golang/prometheus" "go.opentelemetry.io/otel/trace" @@ -44,7 +45,7 @@ type Module interface { // LoadConfig parses River config and loads it into the Module. // LoadConfig can be called multiple times, and called prior to // [Module.Run]. - LoadConfig(config []byte, args map[string]any) error + LoadConfig(config []byte, args map[string]any, options config.LoaderConfigOptions) error // Run starts the Module. No components within the Module // will be run until Run is called. diff --git a/integration-tests/configs/http-module/module.river b/integration-tests/configs/http-module/module.river new file mode 100644 index 000000000000..808b79a86b4b --- /dev/null +++ b/integration-tests/configs/http-module/module.river @@ -0,0 +1,21 @@ +declare "myModule" { + argument "scrape_endpoint" {} + + argument "forward_to" {} + + argument "scrape_interval" { + optional = true + default = "1s" + } + + prometheus.scrape "scrape_prom_metrics_module_file" { + targets = [ + {"__address__" = argument.scrape_endpoint.value}, + ] + forward_to = argument.forward_to.value + scrape_classic_histograms = true + enable_protobuf_negotiation = true + scrape_interval = argument.scrape_interval.value + scrape_timeout = "500ms" + } +} \ No newline at end of file diff --git a/integration-tests/docker-compose.yaml b/integration-tests/docker-compose.yaml index a94a05db21d9..d5503de64b62 100644 --- a/integration-tests/docker-compose.yaml +++ b/integration-tests/docker-compose.yaml @@ -29,4 +29,11 @@ services: dockerfile: ./integration-tests/configs/prom-gen/Dockerfile context: .. ports: - - "9001:9001" \ No newline at end of file + - "9001:9001" + + http-module: + image: nginx:alpine + ports: + - "8090:80" + volumes: + - ./configs/http-module/module.river:/usr/share/nginx/html/module.river:ro \ No newline at end of file diff --git a/integration-tests/tests/module-file/config.river b/integration-tests/tests/module-file/config.river new file mode 100644 index 000000000000..a8e776d5cd24 --- /dev/null +++ b/integration-tests/tests/module-file/config.river @@ -0,0 +1,38 @@ +import.file "import_loki" { + filename = "loki.river" +} + +import_loki.loki "loki" {} + +import.file "import_prom_scrape" { + filename = "prom-scrape.river" +} + +declare "target" { + export "output" { + value = "localhost:9001" + } +} + +target "promTarget" {} + +import_prom_scrape.scrape "promScraper" { + scrape_endpoint = target.promTarget.output + forward_to = [prometheus.remote_write.module_file.receiver] +} + +prometheus.remote_write "module_file" { + endpoint { + url = "http://localhost:9009/api/v1/push" + send_native_histograms = true + metadata_config { + send_interval = "1s" + } + queue_config { + max_samples_per_send = 100 + } + } + external_labels = { + test_name = "module_file", + } +} \ No newline at end of file diff --git a/integration-tests/tests/module-file/logfile.river b/integration-tests/tests/module-file/logfile.river new file mode 100644 index 000000000000..a9b2b9762b3a --- /dev/null +++ b/integration-tests/tests/module-file/logfile.river @@ -0,0 +1,5 @@ +declare "getLogFile" { + export "output" { + value = [{__path__ = "logs.txt"}] + } +} \ No newline at end of file diff --git a/integration-tests/tests/module-file/logs.txt b/integration-tests/tests/module-file/logs.txt new file mode 100644 index 000000000000..ed6c24c81170 --- /dev/null +++ b/integration-tests/tests/module-file/logs.txt @@ -0,0 +1,13 @@ +[2023-10-02 14:25:43] INFO: Starting the web application... +[2023-10-02 14:25:45] DEBUG: Database connection established. +[2023-10-02 14:26:01] INFO: User 'john_doe' logged in. +[2023-10-02 14:26:05] WARNING: User 'john_doe' attempted to access restricted area. +[2023-10-02 14:26:10] ERROR: Failed to retrieve data for item ID: 1234. +[2023-10-02 14:26:15] INFO: User 'john_doe' logged out. +[2023-10-02 14:27:00] INFO: User 'admin' logged in. +[2023-10-02 14:27:05] INFO: Data backup started. +[2023-10-02 14:30:00] INFO: Data backup completed successfully. +[2023-10-02 14:31:23] ERROR: Database connection lost. Retrying in 5 seconds... +[2023-10-02 14:31:28] INFO: Database reconnected. +[2023-10-02 14:32:00] INFO: User 'admin' logged out. +[2023-10-02 14:32:05] INFO: Shutting down the web application... diff --git a/integration-tests/tests/module-file/loki.river b/integration-tests/tests/module-file/loki.river new file mode 100644 index 000000000000..24f5af09e874 --- /dev/null +++ b/integration-tests/tests/module-file/loki.river @@ -0,0 +1,17 @@ +import.file "logTarget" { + filename = "logfile.river" +} + +import.file "write" { + filename = "loki_write.river" +} + +declare "loki" { + logTarget.getLogFile "logFile" {} + loki.source.file "test" { + targets = logTarget.getLogFile.logFile.output + forward_to = [write.loki_write.default.receiver] + } + write.loki_write "default" {} +} + diff --git a/integration-tests/tests/module-file/loki_write.river b/integration-tests/tests/module-file/loki_write.river new file mode 100644 index 000000000000..03c9ed44b66b --- /dev/null +++ b/integration-tests/tests/module-file/loki_write.river @@ -0,0 +1,13 @@ +declare "loki_write" { + loki.write "test" { + endpoint { + url = "http://localhost:3100/loki/api/v1/push" + } + external_labels = { + test_name = "module_file", + } + } + export "receiver" { + value = loki.write.test.receiver + } +} \ No newline at end of file diff --git a/integration-tests/tests/module-file/module_file_test.go b/integration-tests/tests/module-file/module_file_test.go new file mode 100644 index 000000000000..1bc6e1a4d722 --- /dev/null +++ b/integration-tests/tests/module-file/module_file_test.go @@ -0,0 +1,30 @@ +package main + +import ( + "testing" + + "github.com/grafana/agent/integration-tests/common" + "github.com/stretchr/testify/assert" +) + +const lokiUrl = "http://localhost:3100/loki/api/v1/query?query={test_name=%22module_file%22}" + +func TestScrapePromMetricsModuleFile(t *testing.T) { + common.MimirMetricsTest(t, common.PromDefaultMetrics, common.PromDefaultHistogramMetric, "module_file") +} + +func TestReadLogFile(t *testing.T) { + var logResponse common.LogResponse + assert.EventuallyWithT(t, func(c *assert.CollectT) { + err := common.FetchDataFromURL(lokiUrl, &logResponse) + assert.NoError(c, err) + if assert.NotEmpty(c, logResponse.Data.Result) { + assert.Equal(c, logResponse.Data.Result[0].Stream["filename"], "logs.txt") + logs := make([]string, len(logResponse.Data.Result[0].Values)) + for i, valuePair := range logResponse.Data.Result[0].Values { + logs[i] = valuePair[1] + } + assert.Contains(c, logs, "[2023-10-02 14:25:43] INFO: Starting the web application...") + } + }, common.DefaultTimeout, common.DefaultRetryInterval, "Data did not satisfy the conditions within the time limit") +} diff --git a/integration-tests/tests/module-file/prom-scrape.river b/integration-tests/tests/module-file/prom-scrape.river new file mode 100644 index 000000000000..f6e0b31aadcd --- /dev/null +++ b/integration-tests/tests/module-file/prom-scrape.river @@ -0,0 +1,21 @@ +declare "scrape" { + argument "scrape_endpoint" {} + + argument "forward_to" {} + + argument "scrape_interval" { + optional = true + default = "1s" + } + + prometheus.scrape "module_file" { + targets = [ + {"__address__" = argument.scrape_endpoint.value}, + ] + forward_to = argument.forward_to.value + scrape_classic_histograms = true + enable_protobuf_negotiation = true + scrape_interval = argument.scrape_interval.value + scrape_timeout = "500ms" + } +} diff --git a/integration-tests/tests/scrape-prom-metrics-module-git/config.river b/integration-tests/tests/scrape-prom-metrics-module-git/config.river new file mode 100644 index 000000000000..68dffaef6ffd --- /dev/null +++ b/integration-tests/tests/scrape-prom-metrics-module-git/config.river @@ -0,0 +1,27 @@ +import.git "scrape_module" { + // TODO: change this to use either a docker container hosting a git repo just for the integration tests (not easy) + // or put the module.river file in the agent-modules repo (easy) + repository = "https://github.com/wildum/module.git" + path = "module.river" +} + +scrape_module.myModule "scrape_prom_metrics_module_git" { + scrape_endpoint = "localhost:9001" + forward_to = [prometheus.remote_write.scrape_prom_metrics_module_git.receiver] +} + +prometheus.remote_write "scrape_prom_metrics_module_git" { + endpoint { + url = "http://localhost:9009/api/v1/push" + send_native_histograms = true + metadata_config { + send_interval = "1s" + } + queue_config { + max_samples_per_send = 100 + } + } + external_labels = { + test_name = "scrape_prom_metrics_module_git", + } +} diff --git a/integration-tests/tests/scrape-prom-metrics-module-git/scrape_prom_metrics_module_git_test.go b/integration-tests/tests/scrape-prom-metrics-module-git/scrape_prom_metrics_module_git_test.go new file mode 100644 index 000000000000..ad309d54bb92 --- /dev/null +++ b/integration-tests/tests/scrape-prom-metrics-module-git/scrape_prom_metrics_module_git_test.go @@ -0,0 +1,11 @@ +package main + +import ( + "testing" + + "github.com/grafana/agent/integration-tests/common" +) + +func TestScrapePromMetricsModuleGit(t *testing.T) { + common.MimirMetricsTest(t, common.PromDefaultMetrics, common.PromDefaultHistogramMetric, "scrape_prom_metrics_module_git") +} diff --git a/integration-tests/tests/scrape-prom-metrics-module-http/config.river b/integration-tests/tests/scrape-prom-metrics-module-http/config.river new file mode 100644 index 000000000000..dcb93d4d86a3 --- /dev/null +++ b/integration-tests/tests/scrape-prom-metrics-module-http/config.river @@ -0,0 +1,24 @@ +import.http "scrape_module" { + url = coalesce(env("MODULE_HTTP_ENDPOINT"), "http://localhost:8090/module.river") +} + +scrape_module.myModule "scrape_prom_metrics_module_http" { + scrape_endpoint = "localhost:9001" + forward_to = [prometheus.remote_write.scrape_prom_metrics_module_http.receiver] +} + +prometheus.remote_write "scrape_prom_metrics_module_http" { + endpoint { + url = "http://localhost:9009/api/v1/push" + send_native_histograms = true + metadata_config { + send_interval = "1s" + } + queue_config { + max_samples_per_send = 100 + } + } + external_labels = { + test_name = "scrape_prom_metrics_module_http", + } +} diff --git a/integration-tests/tests/scrape-prom-metrics-module-http/scrape_prom_metrics_module_http_test.go b/integration-tests/tests/scrape-prom-metrics-module-http/scrape_prom_metrics_module_http_test.go new file mode 100644 index 000000000000..d6cb64a2ac4e --- /dev/null +++ b/integration-tests/tests/scrape-prom-metrics-module-http/scrape_prom_metrics_module_http_test.go @@ -0,0 +1,11 @@ +package main + +import ( + "testing" + + "github.com/grafana/agent/integration-tests/common" +) + +func TestScrapePromMetricsModuleHTTP(t *testing.T) { + common.MimirMetricsTest(t, common.PromDefaultMetrics, common.PromDefaultHistogramMetric, "scrape_prom_metrics_module_http") +} diff --git a/pkg/flow/componenttest/testfailmodule.go b/pkg/flow/componenttest/testfailmodule.go index 011659f95564..f7d624166f75 100644 --- a/pkg/flow/componenttest/testfailmodule.go +++ b/pkg/flow/componenttest/testfailmodule.go @@ -6,15 +6,18 @@ import ( "github.com/grafana/agent/component" mod "github.com/grafana/agent/component/module" + "github.com/grafana/agent/pkg/flow/config" ) func init() { component.Register(component.Registration{ - Name: "test.fail.module", - Args: TestFailArguments{}, + Name: "test.fail.module", + Args: TestFailArguments{}, + //nolint:staticcheck Exports: mod.Exports{}, Build: func(opts component.Options, args component.Arguments) (component.Component, error) { + //nolint:staticcheck m, err := mod.NewModuleComponent(opts) if err != nil { return nil, err @@ -22,7 +25,7 @@ func init() { if args.(TestFailArguments).Fail { return nil, fmt.Errorf("module told to fail") } - err = m.LoadFlowSource(nil, args.(TestFailArguments).Content) + err = m.LoadFlowSource(nil, args.(TestFailArguments).Content, config.DefaultLoaderConfigOptions()) if err != nil { return nil, err } @@ -58,7 +61,7 @@ func (t *TestFailModule) Run(ctx context.Context) error { func (t *TestFailModule) UpdateContent(content string) error { t.content = content - err := t.mc.LoadFlowSource(nil, t.content) + err := t.mc.LoadFlowSource(nil, t.content, config.DefaultLoaderConfigOptions()) return err } diff --git a/pkg/flow/config/loader.go b/pkg/flow/config/loader.go new file mode 100644 index 000000000000..539e267b212a --- /dev/null +++ b/pkg/flow/config/loader.go @@ -0,0 +1,13 @@ +package config + +// LoaderConfigOptions is used to provide a set of options when a new config is loaded. +type LoaderConfigOptions struct { + // AdditionalDeclareContents can be used to pass custom components definition to the loader. + // This is needed when a custom component is instantiated within a custom component and the corresponding + // declare of the nested custom component is imported/declared at the root. + AdditionalDeclareContents map[string]string +} + +func DefaultLoaderConfigOptions() LoaderConfigOptions { + return LoaderConfigOptions{} +} diff --git a/pkg/flow/flow.go b/pkg/flow/flow.go index 6e839abda36f..0a58cc9081a9 100644 --- a/pkg/flow/flow.go +++ b/pkg/flow/flow.go @@ -51,6 +51,7 @@ import ( "sync" "time" + "github.com/grafana/agent/pkg/flow/config" "github.com/grafana/agent/pkg/flow/internal/controller" "github.com/grafana/agent/pkg/flow/internal/worker" "github.com/grafana/agent/pkg/flow/logging" @@ -248,13 +249,18 @@ func (f *Flow) Run(ctx context.Context) { var ( components = f.loader.Components() services = f.loader.Services() + imports = f.loader.Imports() - runnables = make([]controller.RunnableNode, 0, len(components)+len(services)) + runnables = make([]controller.RunnableNode, 0, len(components)+len(services)+len(imports)) ) for _, c := range components { runnables = append(runnables, c) } + for _, i := range imports { + runnables = append(runnables, i) + } + // Only the root controller should run services, since modules share the // same service instance as the root. if !f.opts.IsModule { @@ -278,10 +284,14 @@ func (f *Flow) Run(ctx context.Context) { // The controller will only start running components after Load is called once // without any configuration errors. func (f *Flow) LoadSource(source *Source, args map[string]any) error { + return f.loadSource(source, args, config.DefaultLoaderConfigOptions()) +} + +func (f *Flow) loadSource(source *Source, args map[string]any, options config.LoaderConfigOptions) error { f.loadMut.Lock() defer f.loadMut.Unlock() - diags := f.loader.Apply(args, source.components, source.configBlocks) + diags := f.loader.Apply(args, source.blocks, source.configBlocks, source.declares, options) if !f.loadedOnce.Load() && diags.HasErrors() { // The first call to Load should not run any components if there were // errors in the configuration file. diff --git a/pkg/flow/flow_services_test.go b/pkg/flow/flow_services_test.go index a4bf2b4cb848..19881f5c7542 100644 --- a/pkg/flow/flow_services_test.go +++ b/pkg/flow/flow_services_test.go @@ -6,6 +6,7 @@ import ( "time" "github.com/grafana/agent/component" + "github.com/grafana/agent/pkg/flow/config" "github.com/grafana/agent/pkg/flow/internal/controller" "github.com/grafana/agent/pkg/flow/internal/testcomponents" "github.com/grafana/agent/pkg/flow/internal/testservices" @@ -276,7 +277,7 @@ func TestComponents_Using_Services_In_Modules(t *testing.T) { mod, err := opts.ModuleController.NewModule("", nil) require.NoError(t, err, "Failed to create module") - err = mod.LoadConfig([]byte(`service_consumer "example" {}`), nil) + err = mod.LoadConfig([]byte(`service_consumer "example" {}`), nil, config.DefaultLoaderConfigOptions()) require.NoError(t, err, "Failed to load module config") return &testcomponents.Fake{ diff --git a/pkg/flow/internal/controller/component_node_manager.go b/pkg/flow/internal/controller/component_node_manager.go new file mode 100644 index 000000000000..cd9eb5f08e92 --- /dev/null +++ b/pkg/flow/internal/controller/component_node_manager.go @@ -0,0 +1,247 @@ +package controller + +import ( + "fmt" + "strings" + + "github.com/grafana/river/ast" +) + +// ComponentNodeManager manages component nodes. +type ComponentNodeManager struct { + importNodes map[string]*ImportConfigNode + declareNodes map[string]*DeclareNode + globals ComponentGlobals + componentReg ComponentRegistry + customComponentDependencies map[string][]CustomComponentDependency + additionalDeclareContents map[string]string +} + +// NewComponentNodeManager creates a new ComponentNodeManager. +func NewComponentNodeManager(globals ComponentGlobals, componentReg ComponentRegistry) *ComponentNodeManager { + return &ComponentNodeManager{ + importNodes: map[string]*ImportConfigNode{}, + declareNodes: map[string]*DeclareNode{}, + customComponentDependencies: map[string][]CustomComponentDependency{}, + globals: globals, + componentReg: componentReg, + } +} + +// Reload resets the state of the component node manager and stores the provided additionalDeclareContents. +func (m *ComponentNodeManager) Reload(additionalDeclareContents map[string]string) { + m.additionalDeclareContents = additionalDeclareContents + m.customComponentDependencies = make(map[string][]CustomComponentDependency) + m.importNodes = map[string]*ImportConfigNode{} + m.declareNodes = map[string]*DeclareNode{} +} + +// CreateComponentNode creates a new builtin component or a new custom component. +func (m *ComponentNodeManager) CreateComponentNode(componentName string, block *ast.BlockStmt) (ComponentNode, error) { + if m.isCustomComponent(componentName) { + return NewCustomComponentNode(m.globals, block, m.getCustomComponentConfig), nil + } else { + registration, exists := m.componentReg.Get(componentName) + if !exists { + return nil, fmt.Errorf("unrecognized component name %q", componentName) + } + return NewBuiltinComponentNode(m.globals, registration, block), nil + } +} + +// GetCustomComponentDependencies retrieves and caches the dependencies that declare might have to other declares. +func (m *ComponentNodeManager) getCustomComponentDependencies(declareNode *DeclareNode) ([]CustomComponentDependency, error) { + var dependencies []CustomComponentDependency + var err error + if deps, ok := m.customComponentDependencies[declareNode.label]; ok { + dependencies = deps + } else { + dependencies, err = m.FindCustomComponentDependencies(declareNode.Declare()) + if err != nil { + return nil, err + } + m.customComponentDependencies[declareNode.label] = dependencies + } + return dependencies, nil +} + +// isCustomComponent searches for a declare corresponding to the given component name. +func (m *ComponentNodeManager) isCustomComponent(componentName string) bool { + namespace := strings.Split(componentName, ".")[0] + _, declareExists := m.declareNodes[namespace] + _, importExists := m.importNodes[namespace] + _, additionalDeclareContentExists := m.additionalDeclareContents[componentName] + + return declareExists || importExists || additionalDeclareContentExists +} + +// GetCorrespondingLocalDeclare returns the declareNode matching the declareLabel of the provided CustomComponentNode if present. +func (m *ComponentNodeManager) GetCorrespondingLocalDeclare(cc *CustomComponentNode) (*DeclareNode, bool) { + declareNode, exist := m.declareNodes[cc.declareLabel] + return declareNode, exist +} + +// GetCorrespondingImportedDeclare returns the importNode matching the importLabel of the provided CustomComponentNode if present. +func (m *ComponentNodeManager) GetCorrespondingImportedDeclare(cc *CustomComponentNode) (*ImportConfigNode, bool) { + importNode, exist := m.importNodes[cc.importLabel] + return importNode, exist +} + +// CustomComponentConfig represents the config needed by a custom component to load. +type CustomComponentConfig struct { + declareContent string // represents the corresponding declare as plain string + additionalDeclareContents map[string]string // represents the additional declare that might be needed by the component to build custom components +} + +// getCustomComponentConfig returns the custom component config for a given custom component. +func (m *ComponentNodeManager) getCustomComponentConfig(cc *CustomComponentNode) (CustomComponentConfig, error) { + var customComponentConfig CustomComponentConfig + var found bool + var err error + if cc.importLabel == "" { + customComponentConfig, found = m.getCustomComponentConfigFromLocalDeclares(cc) + if !found { + customComponentConfig, found = m.getCustomComponentConfigFromParent(cc) + } + } else { + customComponentConfig, found, err = m.getCustomComponentConfigFromImportedDeclares(cc) + if err != nil { + return customComponentConfig, err + } + if !found { + customComponentConfig, found = m.getCustomComponentConfigFromParent(cc) + // Custom components that receive their config from imported declares in a parent controller can only access the imported declares coming from the same import. + customComponentConfig.additionalDeclareContents = filterAdditionalDeclareContents(cc.importLabel, customComponentConfig.additionalDeclareContents) + } + } + if !found { + return customComponentConfig, fmt.Errorf("custom component config not found for component %s", cc.componentName) + } + return customComponentConfig, nil +} + +// getCustomComponentConfigFromLocalDeclares retrieves the config of a custom component from the local declares. +func (m *ComponentNodeManager) getCustomComponentConfigFromLocalDeclares(cc *CustomComponentNode) (CustomComponentConfig, bool) { + node, exists := m.declareNodes[cc.declareLabel] + if !exists { + return CustomComponentConfig{}, false + } + return CustomComponentConfig{ + declareContent: node.Declare().content, + additionalDeclareContents: m.getLocalAdditionalDeclareContents(cc.componentName), + }, true +} + +// getCustomComponentConfigFromParent retrieves the config of a custom component from the parent controller. +func (m *ComponentNodeManager) getCustomComponentConfigFromParent(cc *CustomComponentNode) (CustomComponentConfig, bool) { + declareContent, exists := m.additionalDeclareContents[cc.componentName] + if !exists { + return CustomComponentConfig{}, false + } + return CustomComponentConfig{ + declareContent: declareContent, + additionalDeclareContents: m.additionalDeclareContents, + }, true +} + +// getImportedCustomComponentConfig retrieves the config of a custom component from the imported declares. +func (m *ComponentNodeManager) getCustomComponentConfigFromImportedDeclares(cc *CustomComponentNode) (CustomComponentConfig, bool, error) { + node, exists := m.importNodes[cc.importLabel] + if !exists { + return CustomComponentConfig{}, false, nil + } + declare, err := node.GetImportedDeclareByLabel(cc.declareLabel) + if err != nil { + return CustomComponentConfig{}, false, err + } + return CustomComponentConfig{ + declareContent: declare.content, + additionalDeclareContents: m.getImportAdditionalDeclareContents(node), + }, true, nil +} + +// getImportAdditionalDeclareContents provides the additional declares that a custom component might need. +func (m *ComponentNodeManager) getImportAdditionalDeclareContents(node *ImportConfigNode) map[string]string { + additionalDeclareContents := make(map[string]string, len(node.ImportedDeclares())) + for importedDeclareLabel, importedDeclare := range node.ImportedDeclares() { + additionalDeclareContents[importedDeclareLabel] = importedDeclare.content + } + return additionalDeclareContents +} + +// getLocalAdditionalDeclareContents provides the additional declares that a custom component might need. +func (m *ComponentNodeManager) getLocalAdditionalDeclareContents(componentName string) map[string]string { + additionalDeclareContents := make(map[string]string) + for _, customComponentDependency := range m.customComponentDependencies[componentName] { + if customComponentDependency.importNode != nil { + for importedDeclareLabel, importedDeclare := range customComponentDependency.importNode.ImportedDeclares() { + // The label of the importNode is added as a prefix to the declare label to create a scope. + // This is useful in the scenario where a custom component of an imported declare is defined inside of a local declare. + // In this case, this custom component should only have have access to the imported declares of its corresponding import node. + additionalDeclareContents[customComponentDependency.importNode.label+"."+importedDeclareLabel] = importedDeclare.content + } + } else if customComponentDependency.declareNode != nil { + additionalDeclareContents[customComponentDependency.declareLabel] = customComponentDependency.declareNode.Declare().content + } else { + additionalDeclareContents[customComponentDependency.componentName] = m.additionalDeclareContents[customComponentDependency.componentName] + } + } + return additionalDeclareContents +} + +// filterAdditionalDeclareContents prevents custom components from accessing declared content out of their scope. +func filterAdditionalDeclareContents(importLabel string, additionalDeclareContents map[string]string) map[string]string { + filteredAdditionalDeclareContents := make(map[string]string) + for declareLabel, declareContent := range additionalDeclareContents { + // The scope is defined by the importLabel prefix in the declareLabel of the declare block. + if strings.HasPrefix(declareLabel, importLabel) { + filteredAdditionalDeclareContents[strings.TrimPrefix(declareLabel, importLabel+".")] = declareContent + } + } + return filteredAdditionalDeclareContents +} + +// CustomComponentDependency represents a dependency that a custom component has to a declare block. +type CustomComponentDependency struct { + componentName string + importLabel string + declareLabel string + importNode *ImportConfigNode + declareNode *DeclareNode +} + +// FindCustomComponentDependencies traverses the AST of the provided declare and collects references to known custom components. +// Panics if declare is nil. +func (m *ComponentNodeManager) FindCustomComponentDependencies(declare *Declare) ([]CustomComponentDependency, error) { + uniqueReferences := make(map[string]CustomComponentDependency) + m.findCustomComponentDependencies(declare.block.Body, uniqueReferences) + + references := make([]CustomComponentDependency, 0, len(uniqueReferences)) + for _, ref := range uniqueReferences { + references = append(references, ref) + } + + return references, nil +} + +func (m *ComponentNodeManager) findCustomComponentDependencies(stmts ast.Body, uniqueReferences map[string]CustomComponentDependency) { + for _, stmt := range stmts { + switch stmt := stmt.(type) { + case *ast.BlockStmt: + componentName := strings.Join(stmt.Name, ".") + switch componentName { + case "declare": + m.findCustomComponentDependencies(stmt.Body, uniqueReferences) + default: + potentialImportLabel, potentialDeclareLabel := ExtractImportAndDeclareLabels(componentName) + if declareNode, ok := m.declareNodes[potentialDeclareLabel]; ok { + uniqueReferences[componentName] = CustomComponentDependency{componentName: componentName, importLabel: "", declareLabel: potentialDeclareLabel, declareNode: declareNode} + } else if importNode, ok := m.importNodes[potentialImportLabel]; ok { + uniqueReferences[componentName] = CustomComponentDependency{componentName: componentName, importLabel: potentialImportLabel, declareLabel: potentialDeclareLabel, importNode: importNode} + } else if _, ok := m.additionalDeclareContents[componentName]; ok { + uniqueReferences[componentName] = CustomComponentDependency{componentName: componentName, importLabel: potentialImportLabel, declareLabel: potentialDeclareLabel} + } + } + } + } +} diff --git a/pkg/flow/internal/controller/declare.go b/pkg/flow/internal/controller/declare.go new file mode 100644 index 000000000000..20465ddaef9f --- /dev/null +++ b/pkg/flow/internal/controller/declare.go @@ -0,0 +1,16 @@ +package controller + +import "github.com/grafana/river/ast" + +// Declare represents the content of a declare block as AST and as plain string. +type Declare struct { + block *ast.BlockStmt + // TODO: we would not need this content field if the content of the block was saved in ast.BlockStmt when parsing. + // Not only it looks redundant but it allows discrepancies between the block and the content. + content string +} + +// NewDeclare creates a new Declare from its AST and its plain string content. +func NewDeclare(block *ast.BlockStmt, content string) *Declare { + return &Declare{block: block, content: content} +} diff --git a/pkg/flow/internal/controller/loader.go b/pkg/flow/internal/controller/loader.go index 5ff95ab864f9..7dd29a0ecba3 100644 --- a/pkg/flow/internal/controller/loader.go +++ b/pkg/flow/internal/controller/loader.go @@ -9,6 +9,7 @@ import ( "time" "github.com/go-kit/log" + "github.com/grafana/agent/pkg/flow/config" "github.com/grafana/agent/pkg/flow/internal/dag" "github.com/grafana/agent/pkg/flow/internal/worker" "github.com/grafana/agent/pkg/flow/logging/level" @@ -25,26 +26,26 @@ import ( // The Loader builds and evaluates ComponentNodes from River blocks. type Loader struct { - log log.Logger - tracer trace.TracerProvider - globals ComponentGlobals - services []service.Service - host service.Host - componentReg ComponentRegistry - workerPool worker.Pool + log log.Logger + tracer trace.TracerProvider + globals ComponentGlobals + services []service.Service + host service.Host + workerPool worker.Pool // backoffConfig is used to backoff when an updated component's dependencies cannot be submitted to worker // pool for evaluation in EvaluateDependants, because the queue is full. This is an unlikely scenario, but when // it happens we should avoid retrying too often to give other goroutines a chance to progress. Having a backoff // also prevents log spamming with errors. backoffConfig backoff.Config - mut sync.RWMutex - graph *dag.Graph - originalGraph *dag.Graph - componentNodes []ComponentNode - serviceNodes []*ServiceNode + mut sync.RWMutex + graph *dag.Graph + originalGraph *dag.Graph + componentNodes []ComponentNode + serviceNodes []*ServiceNode + componentNodeManager *ComponentNodeManager + cache *valueCache - blocks []*ast.BlockStmt // Most recently loaded blocks, used for writing cm *controllerMetrics cc *controllerCollector moduleExportIndex int @@ -76,13 +77,13 @@ func NewLoader(opts LoaderOptions) *Loader { } l := &Loader{ - log: log.With(globals.Logger, "controller_id", globals.ControllerID), - tracer: tracing.WrapTracerForLoader(globals.TraceProvider, globals.ControllerID), - globals: globals, - services: services, - host: host, - componentReg: reg, - workerPool: opts.WorkerPool, + log: log.With(globals.Logger, "controller_id", globals.ControllerID), + tracer: tracing.WrapTracerForLoader(globals.TraceProvider, globals.ControllerID), + globals: globals, + services: services, + host: host, + workerPool: opts.WorkerPool, + componentNodeManager: NewComponentNodeManager(globals, reg), // This is a reasonable default which should work for most cases. If a component is completely stuck, we would // retry and log an error every 10 seconds, at most. @@ -118,7 +119,11 @@ func NewLoader(opts LoaderOptions) *Loader { // The provided parentContext can be used to provide global variables and // functions to components. A child context will be constructed from the parent // to expose values of other components. -func (l *Loader) Apply(args map[string]any, componentBlocks []*ast.BlockStmt, configBlocks []*ast.BlockStmt) diag.Diagnostics { +// +// Declares are pieces of config that can be used as a blueprints to instantiate custom components. +// The declares argument corresponds to declares written directly in the river config. +// Other declares may come via the import config blocks (provided by the configBlocks argument) and via the LoaderConfigOptions. +func (l *Loader) Apply(args map[string]any, blocks []*ast.BlockStmt, configBlocks []*ast.BlockStmt, declares []*Declare, options config.LoaderConfigOptions) diag.Diagnostics { start := time.Now() l.mut.Lock() defer l.mut.Unlock() @@ -130,14 +135,18 @@ func (l *Loader) Apply(args map[string]any, componentBlocks []*ast.BlockStmt, co } l.cache.SyncModuleArgs(args) - newGraph, diags := l.loadNewGraph(args, componentBlocks, configBlocks) + // Reload the component node manager when a new config is applied. + // This step is important to remove configs that might have been removed in the new config. + l.componentNodeManager.Reload(options.AdditionalDeclareContents) + + newGraph, diags := l.loadNewGraph(args, blocks, configBlocks, declares) if diags.HasErrors() { return diags } var ( - components = make([]ComponentNode, 0, len(componentBlocks)) - componentIDs = make([]ComponentID, 0, len(componentBlocks)) + components = make([]ComponentNode, 0) + componentIDs = make([]ComponentID, 0) services = make([]*ServiceNode, 0, len(l.services)) ) @@ -186,7 +195,6 @@ func (l *Loader) Apply(args map[string]any, componentBlocks []*ast.BlockStmt, co }) } } - case *ServiceNode: services = append(services, n) @@ -203,7 +211,6 @@ func (l *Loader) Apply(args map[string]any, componentBlocks []*ast.BlockStmt, co }) } } - case BlockNode: if err = l.evaluate(logger, n); err != nil { diags.Add(diag.Diagnostic{ @@ -232,7 +239,6 @@ func (l *Loader) Apply(args map[string]any, componentBlocks []*ast.BlockStmt, co l.serviceNodes = services l.graph = &newGraph l.cache.SyncIDs(componentIDs) - l.blocks = componentBlocks if l.globals.OnExportsChange != nil && l.cache.ExportChangeIndex() != l.moduleExportIndex { l.moduleExportIndex = l.cache.ExportChangeIndex() l.globals.OnExportsChange(l.cache.CreateModuleExports()) @@ -253,11 +259,11 @@ func (l *Loader) Cleanup(stopWorkerPool bool) { } // loadNewGraph creates a new graph from the provided blocks and validates it. -func (l *Loader) loadNewGraph(args map[string]any, componentBlocks []*ast.BlockStmt, configBlocks []*ast.BlockStmt) (dag.Graph, diag.Diagnostics) { +func (l *Loader) loadNewGraph(args map[string]any, blocks []*ast.BlockStmt, configBlocks []*ast.BlockStmt, declares []*Declare) (dag.Graph, diag.Diagnostics) { var g dag.Graph - // Split component blocks into blocks for components and services. - componentBlocks, serviceBlocks := l.splitComponentBlocks(componentBlocks) + // Split blocks into blocks for components and services. + componentBlocks, serviceBlocks := l.categorizeBlocks(blocks) // Fill our graph with service blocks, which must be added before any other // block. @@ -267,6 +273,10 @@ func (l *Loader) loadNewGraph(args map[string]any, componentBlocks []*ast.BlockS configBlockDiags := l.populateConfigBlockNodes(args, &g, configBlocks) diags = append(diags, configBlockDiags...) + // Fill our graph with declare nodes + declareDiags := l.populateDeclareNodes(&g, declares) + diags = append(diags, declareDiags...) + // Fill our graph with components. componentNodeDiags := l.populateComponentNodes(&g, componentBlocks) diags = append(diags, componentNodeDiags...) @@ -291,7 +301,7 @@ func (l *Loader) loadNewGraph(args map[string]any, componentBlocks []*ast.BlockS return g, diags } -func (l *Loader) splitComponentBlocks(blocks []*ast.BlockStmt) (componentBlocks, serviceBlocks []*ast.BlockStmt) { +func (l *Loader) categorizeBlocks(blocks []*ast.BlockStmt) (componentBlocks, serviceBlocks []*ast.BlockStmt) { componentBlocks = make([]*ast.BlockStmt, 0, len(blocks)) serviceBlocks = make([]*ast.BlockStmt, 0, len(l.services)) @@ -311,6 +321,24 @@ func (l *Loader) splitComponentBlocks(blocks []*ast.BlockStmt) (componentBlocks, return componentBlocks, serviceBlocks } +func (l *Loader) populateDeclareNodes(g *dag.Graph, declares []*Declare) diag.Diagnostics { + var diags diag.Diagnostics + l.componentNodeManager.declareNodes = map[string]*DeclareNode{} + for _, declare := range declares { + node := NewDeclareNode(declare) + if g.GetByID(node.NodeID()) != nil { + diags.Add(diag.Diagnostic{ + Severity: diag.SeverityLevelError, + Message: fmt.Sprintf("cannot add declare node %q; node with same ID already exists", node.NodeID()), + }) + continue + } + g.Add(node) + l.componentNodeManager.declareNodes[node.label] = node + } + return diags +} + // populateServiceNodes adds service nodes to the graph. func (l *Loader) populateServiceNodes(g *dag.Graph, serviceBlocks []*ast.BlockStmt) diag.Diagnostics { var diags diag.Diagnostics @@ -425,6 +453,7 @@ func (l *Loader) populateConfigBlockNodes(args map[string]any, g *dag.Graph, con g.Add(c) } + l.componentNodeManager.importNodes = nodeMap.importMap return diags } @@ -449,44 +478,52 @@ func (l *Loader) populateComponentNodes(g *dag.Graph, componentBlocks []*ast.Blo } blockMap[id] = block - // Check the graph from the previous call to Load to see we can copy an - // existing instance of ComponentNode. + // Check the graph from the previous call to Load to see we can copy an existing instance of ComponentNode. if exist := l.graph.GetByID(id); exist != nil { c = exist.(ComponentNode) c.UpdateBlock(block) } else { componentName := block.GetBlockName() - registration, exists := l.componentReg.Get(componentName) - if !exists { + if block.Label == "" { diags.Add(diag.Diagnostic{ Severity: diag.SeverityLevelError, - Message: fmt.Sprintf("Unrecognized component name %q", componentName), + Message: fmt.Sprintf("Component %q must have a label", componentName), StartPos: block.NamePos.Position(), EndPos: block.NamePos.Add(len(componentName) - 1).Position(), }) continue } - - if block.Label == "" { + var err error + c, err = l.componentNodeManager.CreateComponentNode(componentName, block) + if err != nil { diags.Add(diag.Diagnostic{ Severity: diag.SeverityLevelError, - Message: fmt.Sprintf("Component %q must have a label", componentName), + Message: err.Error(), StartPos: block.NamePos.Position(), EndPos: block.NamePos.Add(len(componentName) - 1).Position(), }) continue } - - // Create a new component - c = NewBuiltinComponentNode(l.globals, registration, block) } - g.Add(c) } - return diags } +func (l *Loader) wireCustomComponentDependencies(g *dag.Graph, cc *CustomComponentNode, declareNode *DeclareNode) error { + dependencies, err := l.componentNodeManager.getCustomComponentDependencies(declareNode) + if err != nil { + return err + } + // Add edges between the CustomComponentNode and all import nodes that it needs. + for _, dependency := range dependencies { + if dependency.importNode != nil { + g.AddEdge(dag.Edge{From: cc, To: dependency.importNode}) + } + } + return nil +} + // Wire up all the related nodes func (l *Loader) wireGraphEdges(g *dag.Graph) diag.Diagnostics { var diags diag.Diagnostics @@ -507,6 +544,20 @@ func (l *Loader) wireGraphEdges(g *dag.Graph) diag.Diagnostics { g.AddEdge(dag.Edge{From: n, To: dep}) } + case *DeclareNode: + // A DeclareNode has no edge, it only holds a static content. + continue + case *CustomComponentNode: + err := l.wireCustomComponentNode(g, n) + if err != nil { + diags.Add(diag.Diagnostic{ + Severity: diag.SeverityLevelError, + Message: fmt.Sprintf("Error while wiring the custom component %s: %v", n.label, err), + StartPos: n.block.NamePos.Position(), + EndPos: n.block.NamePos.Add(len(n.componentName) - 1).Position(), + }) + continue + } } // Finally, wire component references. @@ -520,6 +571,18 @@ func (l *Loader) wireGraphEdges(g *dag.Graph) diag.Diagnostics { return diags } +func (l *Loader) wireCustomComponentNode(g *dag.Graph, cc *CustomComponentNode) error { + if declareNode, exists := l.componentNodeManager.GetCorrespondingLocalDeclare(cc); exists { + err := l.wireCustomComponentDependencies(g, cc, declareNode) + if err != nil { + return err + } + } else if importNode, exists := l.componentNodeManager.GetCorrespondingImportedDeclare(cc); exists { + g.AddEdge(dag.Edge{From: cc, To: importNode}) + } + return nil +} + // Variables returns the Variables the Loader exposes for other Flow components // to reference. func (l *Loader) Variables() map[string]interface{} { @@ -540,6 +603,12 @@ func (l *Loader) Services() []*ServiceNode { return l.serviceNodes } +func (l *Loader) Imports() map[string]*ImportConfigNode { + l.mut.RLock() + defer l.mut.RUnlock() + return l.componentNodeManager.importNodes +} + // Graph returns a copy of the DAG managed by the Loader. func (l *Loader) Graph() *dag.Graph { l.mut.RLock() diff --git a/pkg/flow/internal/controller/loader_test.go b/pkg/flow/internal/controller/loader_test.go index 1322db4a69c8..06a5529a8608 100644 --- a/pkg/flow/internal/controller/loader_test.go +++ b/pkg/flow/internal/controller/loader_test.go @@ -7,6 +7,7 @@ import ( "testing" "github.com/grafana/agent/component" + "github.com/grafana/agent/pkg/flow/config" "github.com/grafana/agent/pkg/flow/internal/controller" "github.com/grafana/agent/pkg/flow/internal/dag" "github.com/grafana/agent/pkg/flow/logging" @@ -129,7 +130,7 @@ func TestLoader(t *testing.T) { ` l := controller.NewLoader(newLoaderOptions()) diags := applyFromContent(t, l, []byte(invalidFile), nil) - require.ErrorContains(t, diags.ErrorOrNil(), `Unrecognized component name "doesnotexist`) + require.ErrorContains(t, diags.ErrorOrNil(), `unrecognized component name "doesnotexist`) }) t.Run("Partial load with invalid reference", func(t *testing.T) { @@ -227,12 +228,13 @@ func applyFromContent(t *testing.T, l *controller.Loader, componentBytes []byte, t.Helper() var ( - diags diag.Diagnostics - componentBlocks []*ast.BlockStmt - configBlocks []*ast.BlockStmt = nil + diags diag.Diagnostics + blocks []*ast.BlockStmt + configBlocks []*ast.BlockStmt = nil + declares []*controller.Declare ) - componentBlocks, diags = fileToBlock(t, componentBytes) + blocks, diags = fileToBlock(t, componentBytes) if diags.HasErrors() { return diags } @@ -244,7 +246,7 @@ func applyFromContent(t *testing.T, l *controller.Loader, componentBytes []byte, } } - applyDiags := l.Apply(nil, componentBlocks, configBlocks) + applyDiags := l.Apply(nil, blocks, configBlocks, declares, config.DefaultLoaderConfigOptions()) diags = append(diags, applyDiags...) return diags diff --git a/pkg/flow/internal/controller/node_config.go b/pkg/flow/internal/controller/node_config.go index d583dc1e1061..0b679f074109 100644 --- a/pkg/flow/internal/controller/node_config.go +++ b/pkg/flow/internal/controller/node_config.go @@ -3,6 +3,7 @@ package controller import ( "fmt" + "github.com/grafana/agent/pkg/flow/internal/importsource" "github.com/grafana/river/ast" "github.com/grafana/river/diag" ) @@ -26,6 +27,8 @@ func NewConfigNode(block *ast.BlockStmt, globals ComponentGlobals) (BlockNode, d return NewLoggingConfigNode(block, globals), nil case tracingBlockID: return NewTracingConfigNode(block, globals), nil + case importsource.BlockImportFile, importsource.BlockImportGit, importsource.BlockImportHTTP: + return NewImportConfigNode(block, globals, importsource.GetSourceType(block.GetBlockName())), nil default: var diags diag.Diagnostics diags.Add(diag.Diagnostic{ @@ -46,6 +49,7 @@ type ConfigNodeMap struct { tracing *TracingConfigNode argumentMap map[string]*ArgumentConfigNode exportMap map[string]*ExportConfigNode + importMap map[string]*ImportConfigNode } // NewConfigNodeMap will create an initial ConfigNodeMap. Append must be called @@ -56,6 +60,7 @@ func NewConfigNodeMap() *ConfigNodeMap { tracing: nil, argumentMap: map[string]*ArgumentConfigNode{}, exportMap: map[string]*ExportConfigNode{}, + importMap: map[string]*ImportConfigNode{}, } } @@ -73,6 +78,8 @@ func (nodeMap *ConfigNodeMap) Append(configNode BlockNode) diag.Diagnostics { nodeMap.logging = n case *TracingConfigNode: nodeMap.tracing = n + case *ImportConfigNode: + nodeMap.importMap[n.Label()] = n default: diags.Add(diag.Diagnostic{ Severity: diag.SeverityLevelError, diff --git a/pkg/flow/internal/controller/node_config_import.go b/pkg/flow/internal/controller/node_config_import.go new file mode 100644 index 000000000000..118c5b808c38 --- /dev/null +++ b/pkg/flow/internal/controller/node_config_import.go @@ -0,0 +1,457 @@ +package controller + +import ( + "context" + "fmt" + "path" + "path/filepath" + "strings" + "sync" + "time" + + "github.com/go-kit/log" + "github.com/grafana/agent/component" + "github.com/grafana/agent/pkg/flow/internal/importsource" + "github.com/grafana/agent/pkg/flow/logging/level" + "github.com/grafana/agent/pkg/flow/tracing" + "github.com/grafana/river/ast" + "github.com/grafana/river/parser" + "github.com/grafana/river/vm" + "github.com/prometheus/client_golang/prometheus" +) + +type ImportConfigNode struct { + id ComponentID + label string + nodeID string + componentName string + globalID string + globals ComponentGlobals // Need a copy of the globals to create other import nodes. + block *ast.BlockStmt // Current River blocks to derive config from + source importsource.ImportSource + + registry *prometheus.Registry + OnBlockNodeUpdate func(cn BlockNode) + logger log.Logger + + importChildrenUpdateChan chan struct{} + + importChildrenMut sync.RWMutex + importConfigNodesChildren map[string]*ImportConfigNode + importChildrenRunning bool + + contentMut sync.RWMutex + importedDeclares map[string]*Declare + inContentUpdate bool + content string + + healthMut sync.RWMutex + evalHealth component.Health // Health of the last evaluate + runHealth component.Health // Health of running the component +} + +var _ RunnableNode = (*ImportConfigNode)(nil) + +// NewImportConfigNode creates a new ImportConfigNode from an initial ast.BlockStmt. +// The underlying config isn't applied until Evaluate is called. +func NewImportConfigNode(block *ast.BlockStmt, globals ComponentGlobals, sourceType importsource.SourceType) *ImportConfigNode { + var ( + id = BlockComponentID(block) + nodeID = id.String() + ) + + initHealth := component.Health{ + Health: component.HealthTypeUnknown, + Message: "component created", + UpdateTime: time.Now(), + } + globalID := nodeID + if globals.ControllerID != "" { + globalID = path.Join(globals.ControllerID, nodeID) + } + cn := &ImportConfigNode{ + id: id, + globalID: globalID, + label: block.Label, + globals: globals, + nodeID: BlockComponentID(block).String(), + componentName: block.GetBlockName(), + OnBlockNodeUpdate: globals.OnBlockNodeUpdate, + block: block, + evalHealth: initHealth, + runHealth: initHealth, + importChildrenUpdateChan: make(chan struct{}), + } + managedOpts := getImportManagedOptions(globals, cn) + cn.logger = managedOpts.Logger + cn.source = importsource.NewImportSource(sourceType, managedOpts, vm.New(block.Body), cn.onContentUpdate) + return cn +} + +func getImportManagedOptions(globals ComponentGlobals, cn *ImportConfigNode) component.Options { + cn.registry = prometheus.NewRegistry() + return component.Options{ + ID: cn.globalID, + Logger: log.With(globals.Logger, "component", cn.globalID), + Registerer: prometheus.WrapRegistererWith(prometheus.Labels{ + "component_id": cn.globalID, + }, cn.registry), + Tracer: tracing.WrapTracer(globals.TraceProvider, cn.globalID), + + DataPath: filepath.Join(globals.DataPath, cn.globalID), + + GetServiceData: func(name string) (interface{}, error) { + return globals.GetServiceData(name) + }, + } +} + +// Evaluate implements BlockNode and updates the arguments for the managed config block +// by re-evaluating its River block with the provided scope. The managed config block +// will be built the first time Evaluate is called. +// +// Evaluate will return an error if the River block cannot be evaluated or if +// decoding to arguments fails. +func (cn *ImportConfigNode) Evaluate(scope *vm.Scope) error { + err := cn.evaluate(scope) + + switch err { + case nil: + cn.setEvalHealth(component.HealthTypeHealthy, "component evaluated") + default: + msg := fmt.Sprintf("component evaluation failed: %s", err) + cn.setEvalHealth(component.HealthTypeUnhealthy, msg) + } + return err +} + +func (cn *ImportConfigNode) setEvalHealth(t component.HealthType, msg string) { + cn.healthMut.Lock() + defer cn.healthMut.Unlock() + + cn.evalHealth = component.Health{ + Health: t, + Message: msg, + UpdateTime: time.Now(), + } +} + +func (cn *ImportConfigNode) evaluate(scope *vm.Scope) error { + return cn.source.Evaluate(scope) +} + +// processNodeBody processes the body of a node. +func (cn *ImportConfigNode) processNodeBody(node *ast.File, content string) { + for _, stmt := range node.Body { + switch stmt := stmt.(type) { + case *ast.BlockStmt: + fullName := strings.Join(stmt.Name, ".") + switch fullName { + case "declare": + cn.processDeclareBlock(stmt, content) + case importsource.BlockImportFile, importsource.BlockImportGit, importsource.BlockImportHTTP: + cn.processImportBlock(stmt, fullName) + default: + level.Error(cn.logger).Log("msg", "only declare and import blocks are allowed in a module", "forbidden", fullName) + } + default: + level.Error(cn.logger).Log("msg", "only declare and import blocks are allowed in a module") + } + } +} + +// processDeclareBlock processes a declare block. +func (cn *ImportConfigNode) processDeclareBlock(stmt *ast.BlockStmt, content string) { + cn.contentMut.Lock() + defer cn.contentMut.Unlock() + if _, ok := cn.importedDeclares[stmt.Label]; ok { + level.Error(cn.logger).Log("msg", "declare block redefined", "name", stmt.Label) + return + } + cn.importedDeclares[stmt.Label] = NewDeclare(stmt, content[stmt.LCurlyPos.Position().Offset+1:stmt.RCurlyPos.Position().Offset-1]) +} + +// processDeclareBlock processes an import block. +func (cn *ImportConfigNode) processImportBlock(stmt *ast.BlockStmt, fullName string) { + sourceType := importsource.GetSourceType(fullName) + if _, ok := cn.importConfigNodesChildren[stmt.Label]; ok { + level.Error(cn.logger).Log("msg", "import block redefined", "name", stmt.Label) + return + } + childGlobals := cn.globals + // Children have a special OnNodeWithDependantsUpdate function which will surface all the imported declares to the root import config node. + childGlobals.OnBlockNodeUpdate = cn.OnChildrenContentUpdate + cn.importConfigNodesChildren[stmt.Label] = NewImportConfigNode(stmt, childGlobals, sourceType) +} + +// onContentUpdate is triggered every time the managed import component has new content. +func (cn *ImportConfigNode) onContentUpdate(content string) { + cn.importChildrenMut.Lock() + defer cn.importChildrenMut.Unlock() + cn.contentMut.Lock() + // If the source sent the same content, there is no need to reload. + if cn.content == content { + return + } + cn.content = content + cn.contentMut.Unlock() + cn.importConfigNodesChildren = make(map[string]*ImportConfigNode) + + cn.contentMut.Lock() + cn.inContentUpdate = true + defer func() { + cn.contentMut.Lock() + cn.inContentUpdate = false + cn.contentMut.Unlock() + }() + cn.importedDeclares = make(map[string]*Declare) + cn.contentMut.Unlock() + + node, err := parser.ParseFile(cn.label, []byte(content)) + if err != nil { + level.Error(cn.logger).Log("msg", "failed to parse file on update", "err", err) + return + } + cn.processNodeBody(node, content) + err = cn.evaluateChildren() + if err != nil { + level.Error(cn.logger).Log("msg", "failed to evaluate nested import", "err", err) + return + } + + if cn.importChildrenRunning { + cn.importChildrenUpdateChan <- struct{}{} + } + + cn.OnBlockNodeUpdate(cn) +} + +// evaluateChildren evaluates the import nodes managed by this import node. +func (cn *ImportConfigNode) evaluateChildren() error { + for _, child := range cn.importConfigNodesChildren { + err := child.Evaluate(&vm.Scope{ + Parent: nil, + Variables: make(map[string]interface{}), + }) + if err != nil { + return fmt.Errorf("imported node %s failed to evaluate, %v", child.label, err) + } + } + return nil +} + +// runChildren run the import nodes managed by this import node. +// The children list can be updated onContentUpdate. In this case we need to stop the running children and run the new set of children. +func (cn *ImportConfigNode) runChildren(parentCtx context.Context) error { + errChildrenChan := make(chan error) + var wg sync.WaitGroup + var ctx context.Context + var cancel context.CancelFunc + + startChildren := func(ctx context.Context, children map[string]*ImportConfigNode, wg *sync.WaitGroup) { + for _, child := range children { + wg.Add(1) + go func(child *ImportConfigNode) { + defer wg.Done() + if err := child.Run(ctx); err != nil { + errChildrenChan <- err + } + }(child) + } + } + + childrenDone := func(wg *sync.WaitGroup, doneChan chan struct{}) { + wg.Wait() + close(doneChan) + } + + ctx, cancel = context.WithCancel(parentCtx) + cn.importChildrenMut.Lock() + startChildren(ctx, cn.importConfigNodesChildren, &wg) // initial start of children + cn.importChildrenRunning = true + cn.importChildrenMut.Unlock() + + doneChan := make(chan struct{}) + go childrenDone(&wg, doneChan) // start goroutine to check in case all children finish + + for { + select { + case <-cn.importChildrenUpdateChan: + cancel() // cancel all running children + <-doneChan // wait for the children to finish + + wg = sync.WaitGroup{} + errChildrenChan = make(chan error) + doneChan = make(chan struct{}) + + ctx, cancel = context.WithCancel(parentCtx) // create a new context + cn.importChildrenMut.Lock() + startChildren(ctx, cn.importConfigNodesChildren, &wg) // start the new set of children + cn.importChildrenMut.Unlock() + go childrenDone(&wg, doneChan) // start goroutine to check in case all new children finish + case err := <-errChildrenChan: + // One child stopped because of an error. + cancel() + return err + case <-doneChan: + // All children were cancelled without error. + cancel() + return nil + } + } +} + +// OnChildrenContentUpdate passes their imported content to their parents. +// To avoid collisions, the content is scoped via namespaces. +func (cn *ImportConfigNode) OnChildrenContentUpdate(child BlockNode) { + cn.contentMut.Lock() + defer cn.contentMut.Unlock() + switch child := child.(type) { + case *ImportConfigNode: + for importedDeclareLabel, importedDeclare := range child.importedDeclares { + label := child.label + "." + importedDeclareLabel + cn.importedDeclares[label] = importedDeclare + } + } + // This avoids OnNodeWithDependantsUpdate to be called multiple times in a row when the content changes. + if !cn.inContentUpdate { + cn.OnBlockNodeUpdate(cn) + } +} + +// GetImportedDeclareByLabel returns a declare block imported by the node. +func (cn *ImportConfigNode) GetImportedDeclareByLabel(declareLabel string) (*Declare, error) { + cn.contentMut.Lock() + defer cn.contentMut.Unlock() + if declare, ok := cn.importedDeclares[declareLabel]; ok { + return declare, nil + } + return nil, fmt.Errorf("declareLabel %s not found in imported node %s", declareLabel, cn.label) +} + +// Run runs the managed component in the calling goroutine until ctx is +// canceled. Evaluate must have been called at least once without returning an +// error before calling Run. +// +// Run will immediately return ErrUnevaluated if Evaluate has never been called +// successfully. Otherwise, Run will return nil. +func (cn *ImportConfigNode) Run(ctx context.Context) error { + cn.importChildrenMut.Lock() + importChildren := len(cn.importConfigNodesChildren) + cn.importChildrenMut.Unlock() + if cn.source == nil { + return ErrUnevaluated + } + + ctx, cancel := context.WithCancel(ctx) + defer cancel() // This will stop the children and the managed component. + + errChan := make(chan error, 1) + + if importChildren > 0 { + go func() { + errChan <- cn.runChildren(ctx) + }() + } + + cn.setRunHealth(component.HealthTypeHealthy, "started component") + + go func() { + errChan <- cn.source.Run(ctx) + }() + + err := <-errChan + + var exitMsg string + if err != nil { + level.Error(cn.logger).Log("msg", "component exited with error", "err", err) + exitMsg = fmt.Sprintf("component shut down with error: %s", err) + } else { + level.Info(cn.logger).Log("msg", "component exited") + exitMsg = "component shut down normally" + } + + cn.setRunHealth(component.HealthTypeExited, exitMsg) + return err +} + +func (cn *ImportConfigNode) setRunHealth(t component.HealthType, msg string) { + cn.healthMut.Lock() + defer cn.healthMut.Unlock() + + cn.runHealth = component.Health{ + Health: t, + Message: msg, + UpdateTime: time.Now(), + } +} + +func (cn *ImportConfigNode) Label() string { return cn.label } + +// Block implements BlockNode and returns the current block of the managed config node. +func (cn *ImportConfigNode) Block() *ast.BlockStmt { + return cn.block +} + +// NodeID implements dag.Node and returns the unique ID for the config node. +func (cn *ImportConfigNode) NodeID() string { return cn.nodeID } + +// This node has no exports. +func (cn *ImportConfigNode) Exports() component.Exports { + return nil +} + +func (cn *ImportConfigNode) ID() ComponentID { return cn.id } + +// Arguments returns the current arguments of the managed component. +func (cn *ImportConfigNode) Arguments() component.Arguments { + return cn.source.Arguments() +} + +// Component returns the instance of the managed component. Component may be +// nil if the ComponentNode has not been successfully evaluated yet. +func (cn *ImportConfigNode) Component() component.Component { + return cn.source.Component() +} + +// ImportedDeclares returns all declare blocks that it imported. +func (cn *ImportConfigNode) ImportedDeclares() map[string]*Declare { + cn.contentMut.RLock() + defer cn.contentMut.RUnlock() + return cn.importedDeclares +} + +// ImportConfigNodesChildren returns the ImportConfigNodesChildren of this ImportConfigNode. +func (cn *ImportConfigNode) ImportConfigNodesChildren() map[string]*ImportConfigNode { + cn.importChildrenMut.Lock() + defer cn.importChildrenMut.Unlock() + return cn.importConfigNodesChildren +} + +// CurrentHealth returns the current health of the ComponentNode. +// +// The health of a ComponentNode is determined by combining: +// +// 1. Health from the call to Run(). +// 2. Health from the last call to Evaluate(). +// 3. Health reported from the component. +func (cn *ImportConfigNode) CurrentHealth() component.Health { + cn.healthMut.RLock() + defer cn.healthMut.RUnlock() + return component.LeastHealthy(cn.runHealth, cn.evalHealth, cn.source.CurrentHealth()) +} + +// FileComponent does not have DebugInfo +func (cn *ImportConfigNode) DebugInfo() interface{} { + return nil +} + +// This component does not manage modules. +func (cn *ImportConfigNode) ModuleIDs() []string { + return nil +} + +// Registry returns the prometheus registry of the component. +func (cn *ImportConfigNode) Registry() *prometheus.Registry { + return cn.registry +} diff --git a/pkg/flow/internal/controller/node_custom_component.go b/pkg/flow/internal/controller/node_custom_component.go new file mode 100644 index 000000000000..a14c4a167da2 --- /dev/null +++ b/pkg/flow/internal/controller/node_custom_component.go @@ -0,0 +1,368 @@ +package controller + +import ( + "context" + "fmt" + "path" + "path/filepath" + "reflect" + "strings" + "sync" + "time" + + "github.com/go-kit/log" + "github.com/grafana/agent/component" + "github.com/grafana/agent/component/module" + "github.com/grafana/agent/pkg/flow/config" + "github.com/grafana/agent/pkg/flow/logging/level" + "github.com/grafana/agent/pkg/flow/tracing" + "github.com/grafana/river/ast" + "github.com/grafana/river/vm" + "github.com/prometheus/client_golang/prometheus" +) + +// CustomComponentNode is a controller node which manages a custom component. +// +// CustomComponentNode manages the underlying custom component and caches its current +// arguments and exports. +type CustomComponentNode struct { + id ComponentID + globalID string + label string + componentName string + importLabel string + declareLabel string + nodeID string // Cached from id.String() to avoid allocating new strings every time NodeID is called. + managedOpts component.Options + registry *prometheus.Registry + moduleController ModuleController + OnBlockNodeUpdate func(cn BlockNode) // Informs controller that we need to reevaluate + + GetCustomComponentConfig func(*CustomComponentNode) (CustomComponentConfig, error) // Retrieve the custom component config. + + mut sync.RWMutex + block *ast.BlockStmt // Current River block to derive args from + eval *vm.Evaluator + managed *module.ModuleComponent // Inner managed custom component + args component.Arguments // Evaluated arguments for the managed component + + // NOTE(rfratto): health and exports have their own mutex because they may be + // set asynchronously while mut is still being held (i.e., when calling Evaluate + // and the managed custom component immediately creates new exports) + + healthMut sync.RWMutex + evalHealth component.Health // Health of the last evaluate + runHealth component.Health // Health of running the component + + exportsMut sync.RWMutex + exports component.Exports // Evaluated exports for the managed custom component +} + +// ExtractImportAndDeclareLabels extract an importLabel and a declareLabel from a componentName. +func ExtractImportAndDeclareLabels(componentName string) (string, string) { + parts := strings.Split(componentName, ".") + if len(parts) == 0 { + return "", "" + } + // If this is a local declare. + importLabel := "" + declareLabel := parts[0] + // If this is an imported custom component. + if len(parts) > 1 { + importLabel = parts[0] + declareLabel = parts[1] + } + return importLabel, declareLabel +} + +var _ ComponentNode = (*CustomComponentNode)(nil) + +// NewCustomComponentNode creates a new CustomComponentNode from an initial ast.BlockStmt. +// The underlying managed custom component isn't created until Evaluate is called. +func NewCustomComponentNode(globals ComponentGlobals, b *ast.BlockStmt, GetCustomComponentConfig func(*CustomComponentNode) (CustomComponentConfig, error)) *CustomComponentNode { + var ( + id = BlockComponentID(b) + nodeID = id.String() + ) + + initHealth := component.Health{ + Health: component.HealthTypeUnknown, + Message: "node declare component created", + UpdateTime: time.Now(), + } + + // We need to generate a globally unique component ID to give to the + // component and for use with telemetry data which doesn't support + // reconstructing the global ID. For everything else (HTTP, data), we can + // just use the controller-local ID as those values are guaranteed to be + // globally unique. + globalID := nodeID + if globals.ControllerID != "" { + globalID = path.Join(globals.ControllerID, nodeID) + } + + componentName := b.GetBlockName() + + importLabel, declareLabel := ExtractImportAndDeclareLabels(componentName) + + cn := &CustomComponentNode{ + id: id, + globalID: globalID, + label: b.Label, + nodeID: nodeID, + componentName: componentName, + importLabel: importLabel, + declareLabel: declareLabel, + moduleController: globals.NewModuleController(globalID), + OnBlockNodeUpdate: globals.OnBlockNodeUpdate, + GetCustomComponentConfig: GetCustomComponentConfig, + + block: b, + eval: vm.New(b.Body), + + evalHealth: initHealth, + runHealth: initHealth, + } + cn.managedOpts = getCustomManagedOptions(globals, cn) + + return cn +} + +func getCustomManagedOptions(globals ComponentGlobals, cn *CustomComponentNode) component.Options { + cn.registry = prometheus.NewRegistry() + return component.Options{ + ID: cn.globalID, + Logger: log.With(globals.Logger, "component", cn.globalID), + Registerer: prometheus.WrapRegistererWith(prometheus.Labels{ + "component_id": cn.globalID, + }, cn.registry), + Tracer: tracing.WrapTracer(globals.TraceProvider, cn.globalID), + + DataPath: filepath.Join(globals.DataPath, cn.globalID), + + OnStateChange: cn.setExports, + ModuleController: cn.moduleController, + + GetServiceData: func(name string) (interface{}, error) { + return globals.GetServiceData(name) + }, + } +} + +// ID returns the component ID of the managed component from its River block. +func (cn *CustomComponentNode) ID() ComponentID { return cn.id } + +// Label returns the label for the block or "" if none was specified. +func (cn *CustomComponentNode) Label() string { return cn.label } + +// NodeID implements dag.Node and returns the unique ID for this node. The +// NodeID is the string representation of the component's ID from its River +// block. +func (cn *CustomComponentNode) NodeID() string { return cn.nodeID } + +// UpdateBlock updates the River block used to construct arguments for the +// managed component. The new block isn't used until the next time Evaluate is +// invoked. +// +// UpdateBlock will panic if the block does not match the component ID of the +// CustomComponentNode. +func (cn *CustomComponentNode) UpdateBlock(b *ast.BlockStmt) { + if !BlockComponentID(b).Equals(cn.id) { + panic("UpdateBlock called with an River block with a different component ID") + } + + cn.mut.Lock() + defer cn.mut.Unlock() + cn.block = b + cn.eval = vm.New(b.Body) +} + +// Evaluate implements BlockNode and updates the arguments by re-evaluating its River block with the provided scope and the custom component by +// retrieving the component definition from the corresponding import or declare node. +// The managed custom component will be built the first time Evaluate is called. +// +// Evaluate will return an error if the River block cannot be evaluated, if +// decoding to arguments fails or if the custom component definition cannot be retrieved. +func (cn *CustomComponentNode) Evaluate(scope *vm.Scope) error { + err := cn.evaluate(scope) + + switch err { + case nil: + cn.setEvalHealth(component.HealthTypeHealthy, "component evaluated") + default: + msg := fmt.Sprintf("component evaluation failed: %s", err) + cn.setEvalHealth(component.HealthTypeUnhealthy, msg) + } + return err +} + +func (cn *CustomComponentNode) evaluate(scope *vm.Scope) error { + cn.mut.Lock() + defer cn.mut.Unlock() + + var args map[string]any + if err := cn.eval.Evaluate(scope, &args); err != nil { + return fmt.Errorf("decoding River: %w", err) + } + + cn.args = args + + if cn.managed == nil { + // We haven't built the managed custom component successfully yet. + managed, err := module.NewModuleComponentV2(cn.managedOpts) + if err != nil { + return fmt.Errorf("building custom component: %w", err) + } + cn.managed = managed + } + + customComponentConfig, err := cn.GetCustomComponentConfig(cn) + if err != nil { + return fmt.Errorf("retrieving custom component config: %w", err) + } + + loaderConfig := config.LoaderConfigOptions{ + AdditionalDeclareContents: customComponentConfig.additionalDeclareContents, + } + + // Reload the custom component with new config + if err := cn.managed.LoadFlowSource(args, customComponentConfig.declareContent, loaderConfig); err != nil { + return fmt.Errorf("updating component: %w", err) + } + return nil +} + +func (cn *CustomComponentNode) Run(ctx context.Context) error { + cn.mut.RLock() + managed := cn.managed + logger := cn.managedOpts.Logger + cn.mut.RUnlock() + + if managed == nil { + return ErrUnevaluated + } + + cn.setRunHealth(component.HealthTypeHealthy, "started custom component") + cn.managed.RunFlowController(ctx) + + level.Info(logger).Log("msg", "custom component exited") + cn.setRunHealth(component.HealthTypeExited, "custom component shut down") + return nil +} + +// Arguments returns the current arguments of the managed custom component. +func (cn *CustomComponentNode) Arguments() component.Arguments { + cn.mut.RLock() + defer cn.mut.RUnlock() + return cn.args +} + +// Block implements BlockNode and returns the current block of the managed custom component. +func (cn *CustomComponentNode) Block() *ast.BlockStmt { + cn.mut.RLock() + defer cn.mut.RUnlock() + return cn.block +} + +// Exports returns the current set of exports from the managed custom component. +// Exports returns nil if the managed custom component does not have exports. +func (cn *CustomComponentNode) Exports() component.Exports { + cn.exportsMut.RLock() + defer cn.exportsMut.RUnlock() + return cn.exports +} + +// setExports is called whenever the managed custom component updates. e must be the +// same type as the registered exports type of the managed custom component. +func (cn *CustomComponentNode) setExports(e component.Exports) { + // Some components may aggressively reexport values even though no exposed + // state has changed. This may be done for components which always supply + // exports whenever their arguments are evaluated without tracking internal + // state to see if anything actually changed. + // + // To avoid needlessly reevaluating components we'll ignore unchanged + // exports. + var changed bool + + cn.exportsMut.Lock() + if !reflect.DeepEqual(cn.exports, e) { + changed = true + cn.exports = e + } + cn.exportsMut.Unlock() + + if changed { + // Inform the controller that we have new exports. + cn.OnBlockNodeUpdate(cn) + } +} + +// CurrentHealth returns the current health of the CustomComponentNode. +// +// The health of a CustomComponentNode is determined by combining: +// +// 1. Health from the call to Run(). +// 2. Health from the last call to Evaluate(). +// 3. Health reported from the custom component. +func (cn *CustomComponentNode) CurrentHealth() component.Health { + cn.healthMut.RLock() + defer cn.healthMut.RUnlock() + return component.LeastHealthy(cn.runHealth, cn.evalHealth, cn.managed.CurrentHealth()) +} + +// TODO implement debugInfo? +func (cn *CustomComponentNode) DebugInfo() interface{} { + cn.mut.RLock() + defer cn.mut.RUnlock() + return nil +} + +// setEvalHealth sets the internal health from a call to Evaluate. See Health +// for information on how overall health is calculated. +func (cn *CustomComponentNode) setEvalHealth(t component.HealthType, msg string) { + cn.healthMut.Lock() + defer cn.healthMut.Unlock() + + cn.evalHealth = component.Health{ + Health: t, + Message: msg, + UpdateTime: time.Now(), + } +} + +// setRunHealth sets the internal health from a call to Run. See Health for +// information on how overall health is calculated. +func (cn *CustomComponentNode) setRunHealth(t component.HealthType, msg string) { + cn.healthMut.Lock() + defer cn.healthMut.Unlock() + + cn.runHealth = component.Health{ + Health: t, + Message: msg, + UpdateTime: time.Now(), + } +} + +// ModuleIDs returns the current list of custom components that this component is +// managing. +func (cn *CustomComponentNode) ModuleIDs() []string { + return cn.moduleController.ModuleIDs() +} + +// ComponentName returns the name of the component. +func (cn *CustomComponentNode) ComponentName() string { + return cn.componentName +} + +// Component returns the instance of the managed component. Component may be +// nil if the CustomComponentNode has not been successfully evaluated yet. +func (cn *CustomComponentNode) Component() component.Component { + cn.mut.RLock() + defer cn.mut.RUnlock() + return cn.managed +} + +// Registry returns the prometheus registry of the component. +func (cn *CustomComponentNode) Registry() *prometheus.Registry { + return cn.registry +} diff --git a/pkg/flow/internal/controller/node_declare.go b/pkg/flow/internal/controller/node_declare.go new file mode 100644 index 000000000000..ba097008ab5f --- /dev/null +++ b/pkg/flow/internal/controller/node_declare.go @@ -0,0 +1,45 @@ +package controller + +import ( + "github.com/grafana/river/ast" + "github.com/grafana/river/vm" +) + +type DeclareNode struct { + label string + nodeID string + componentName string + // A declare content is static, it does not change during the lifetime of the node. + declare *Declare +} + +var _ BlockNode = (*DeclareNode)(nil) + +// NewDeclareNode creates a new declare node with a content which will be loaded by custom components. +func NewDeclareNode(declare *Declare) *DeclareNode { + return &DeclareNode{ + label: declare.block.Label, + nodeID: BlockComponentID(declare.block).String(), + componentName: declare.block.GetBlockName(), + declare: declare, + } +} + +func (cn *DeclareNode) Declare() *Declare { + return cn.declare +} + +// Evaluate does nothing for this node. +func (cn *DeclareNode) Evaluate(scope *vm.Scope) error { + return nil +} + +func (cn *DeclareNode) Label() string { return cn.label } + +// Block implements BlockNode and returns the current block of the managed config node. +func (cn *DeclareNode) Block() *ast.BlockStmt { + return cn.declare.block +} + +// NodeID implements dag.Node and returns the unique ID for the config node. +func (cn *DeclareNode) NodeID() string { return cn.nodeID } diff --git a/pkg/flow/internal/importsource/import_file.go b/pkg/flow/internal/importsource/import_file.go new file mode 100644 index 000000000000..178064c91872 --- /dev/null +++ b/pkg/flow/internal/importsource/import_file.go @@ -0,0 +1,86 @@ +package importsource + +import ( + "context" + "fmt" + "reflect" + + "github.com/grafana/agent/component" + "github.com/grafana/agent/component/local/file" + "github.com/grafana/river/vm" +) + +type ImportFile struct { + fileComponent *file.Component + arguments component.Arguments + managedOpts component.Options + eval *vm.Evaluator +} + +var _ ImportSource = (*ImportFile)(nil) + +func NewImportFile(managedOpts component.Options, eval *vm.Evaluator, onContentChange func(string)) *ImportFile { + opts := managedOpts + opts.OnStateChange = func(e component.Exports) { + onContentChange(e.(file.Exports).Content.Value) + } + return &ImportFile{ + managedOpts: opts, + eval: eval, + } +} + +type importFileConfigBlock struct { + LocalFileArguments file.Arguments `river:",squash"` +} + +// SetToDefault implements river.Defaulter. +func (a *importFileConfigBlock) SetToDefault() { + a.LocalFileArguments = file.DefaultArguments +} + +func (im *ImportFile) Evaluate(scope *vm.Scope) error { + var arguments importFileConfigBlock + if err := im.eval.Evaluate(scope, &arguments); err != nil { + return fmt.Errorf("decoding River: %w", err) + } + if im.fileComponent == nil { + var err error + im.fileComponent, err = file.New(im.managedOpts, arguments.LocalFileArguments) + if err != nil { + return fmt.Errorf("creating file component: %w", err) + } + im.arguments = arguments + } + + if reflect.DeepEqual(im.arguments, arguments) { + return nil + } + + // Update the existing managed component + if err := im.fileComponent.Update(arguments); err != nil { + return fmt.Errorf("updating component: %w", err) + } + return nil +} + +func (im *ImportFile) Run(ctx context.Context) error { + return im.fileComponent.Run(ctx) +} + +func (im *ImportFile) Arguments() component.Arguments { + return im.arguments +} + +func (im *ImportFile) Component() component.Component { + return im.fileComponent +} + +func (im *ImportFile) CurrentHealth() component.Health { + return im.fileComponent.CurrentHealth() +} + +// DebugInfo() is not implemented by the file component. +func (im *ImportFile) DebugInfo() interface{} { + return nil +} diff --git a/pkg/flow/internal/importsource/import_git.go b/pkg/flow/internal/importsource/import_git.go new file mode 100644 index 000000000000..1dc8ad385550 --- /dev/null +++ b/pkg/flow/internal/importsource/import_git.go @@ -0,0 +1,280 @@ +package importsource + +import ( + "context" + "errors" + "fmt" + "path/filepath" + "reflect" + "sync" + "time" + + "github.com/go-kit/log" + + "github.com/grafana/agent/component" + "github.com/grafana/agent/internal/vcs" + "github.com/grafana/agent/pkg/flow/logging/level" + "github.com/grafana/river/vm" +) + +// The difference between this import source and the others is that there is no git component. +type ImportGit struct { + opts component.Options + log log.Logger + eval *vm.Evaluator + mut sync.RWMutex + repo *vcs.GitRepo + repoOpts vcs.GitRepoOptions + args Arguments + onContentChange func(string) + + lastContent string + + argsChanged chan struct{} + + healthMut sync.RWMutex + health component.Health +} + +var ( + _ ImportSource = (*ImportGit)(nil) + _ component.Component = (*ImportGit)(nil) + _ component.HealthComponent = (*ImportGit)(nil) +) + +type Arguments struct { + Repository string `river:"repository,attr"` + Revision string `river:"revision,attr,optional"` + Path string `river:"path,attr"` + PullFrequency time.Duration `river:"pull_frequency,attr,optional"` + GitAuthConfig vcs.GitAuthConfig `river:",squash"` +} + +var DefaultArguments = Arguments{ + Revision: "HEAD", + PullFrequency: time.Minute, +} + +// SetToDefault implements river.Defaulter. +func (args *Arguments) SetToDefault() { + *args = DefaultArguments +} + +func NewImportGit(managedOpts component.Options, eval *vm.Evaluator, onContentChange func(string)) *ImportGit { + return &ImportGit{ + opts: managedOpts, + log: managedOpts.Logger, + eval: eval, + argsChanged: make(chan struct{}, 1), + onContentChange: onContentChange, + } +} + +func (im *ImportGit) Evaluate(scope *vm.Scope) error { + var arguments Arguments + if err := im.eval.Evaluate(scope, &arguments); err != nil { + return fmt.Errorf("decoding River: %w", err) + } + + if reflect.DeepEqual(im.args, arguments) { + return nil + } + + if err := im.Update(arguments); err != nil { + return fmt.Errorf("updating component: %w", err) + } + return nil +} + +func (im *ImportGit) Run(ctx context.Context) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + var ( + ticker *time.Ticker + tickerC <-chan time.Time + ) + + for { + select { + case <-ctx.Done(): + return nil + + case <-im.argsChanged: + im.mut.Lock() + pullFrequency := im.args.PullFrequency + im.mut.Unlock() + ticker, tickerC = im.updateTicker(pullFrequency, ticker, tickerC) + + case <-tickerC: + level.Info(im.log).Log("msg", "updating repository") + im.tickPollFile(ctx) + } + } +} + +func (im *ImportGit) updateTicker(pullFrequency time.Duration, ticker *time.Ticker, tickerC <-chan time.Time) (*time.Ticker, <-chan time.Time) { + level.Info(im.log).Log("msg", "updating repository pull frequency, next pull attempt will be done according to the pullFrequency", "new_frequency", pullFrequency) + + if pullFrequency > 0 { + if ticker == nil { + ticker = time.NewTicker(pullFrequency) + tickerC = ticker.C + } else { + ticker.Reset(pullFrequency) + } + return ticker, tickerC + } + + if ticker != nil { + ticker.Stop() + } + return nil, nil +} + +func (im *ImportGit) tickPollFile(ctx context.Context) { + im.mut.Lock() + err := im.pollFile(ctx, im.args) + pullFrequency := im.args.PullFrequency + im.mut.Unlock() + + im.updateHealth(err) + + if err != nil { + level.Error(im.log).Log("msg", "failed to update repository", "pullFrequency", pullFrequency, "err", err) + } +} + +func (im *ImportGit) updateHealth(err error) { + im.healthMut.Lock() + defer im.healthMut.Unlock() + + if err != nil { + im.health = component.Health{ + Health: component.HealthTypeUnhealthy, + Message: err.Error(), + UpdateTime: time.Now(), + } + } else { + im.health = component.Health{ + Health: component.HealthTypeHealthy, + Message: "module updated", + UpdateTime: time.Now(), + } + } +} + +// Update implements component.Component. +// Only acknowledge the error from Update if it's not a +// vcs.UpdateFailedError; vcs.UpdateFailedError means that the Git repo +// exists, but we were unable to update it. It makes sense to retry on the next poll and it may succeed. +func (im *ImportGit) Update(args component.Arguments) (err error) { + defer func() { + im.updateHealth(err) + }() + im.mut.Lock() + defer im.mut.Unlock() + + newArgs := args.(Arguments) + + // TODO(rfratto): store in a repo-specific directory so changing repositories + // doesn't risk break the module loader if there's a SHA collision between + // the two different repositories. + repoPath := filepath.Join(im.opts.DataPath, "repo") + + repoOpts := vcs.GitRepoOptions{ + Repository: newArgs.Repository, + Revision: newArgs.Revision, + Auth: newArgs.GitAuthConfig, + } + + // Create or update the repo field. + // Failure to update repository makes the module loader temporarily use cached contents on disk + if im.repo == nil || !reflect.DeepEqual(repoOpts, im.repoOpts) { + r, err := vcs.NewGitRepo(context.Background(), repoPath, repoOpts) + if err != nil { + if errors.As(err, &vcs.UpdateFailedError{}) { + level.Error(im.log).Log("msg", "failed to update repository", "err", err) + im.updateHealth(err) + } else { + return err + } + } + im.repo = r + im.repoOpts = repoOpts + } + + if err := im.pollFile(context.Background(), newArgs); err != nil { + if errors.As(err, &vcs.UpdateFailedError{}) { + level.Error(im.log).Log("msg", "failed to poll file from repository", "err", err) + // We don't update the health here because it will be updated via the defer call. + // This is not very good because if we reassign the err before exiting the function it will not update the health correctly. + // TODO improve the error health handling. + } else { + return err + } + } + + // Schedule an update for handling the changed arguments. + select { + case im.argsChanged <- struct{}{}: + default: + } + + im.args = newArgs + return nil +} + +// pollFile fetches the latest content from the repository and updates the +// controller. pollFile must only be called with im.mut held. +func (im *ImportGit) pollFile(ctx context.Context, args Arguments) error { + // Make sure our repo is up-to-date. + if err := im.repo.Update(ctx); err != nil { + return err + } + + // Finally, configure our controller. + bb, err := im.repo.ReadFile(args.Path) + if err != nil { + return err + } + content := string(bb) + if im.lastContent != content { + im.onContentChange(content) + im.lastContent = content + } + return nil +} + +// CurrentHealth implements component.HealthComponent. +func (im *ImportGit) CurrentHealth() component.Health { + im.healthMut.RLock() + defer im.healthMut.RUnlock() + return im.health +} + +// DebugInfo implements component.DebugComponent. +func (im *ImportGit) DebugInfo() interface{} { + type DebugInfo struct { + SHA string `river:"sha,attr"` + RepoError string `river:"repo_error,attr,optional"` + } + + im.mut.RLock() + defer im.mut.RUnlock() + + rev, err := im.repo.CurrentRevision() + if err != nil { + return DebugInfo{RepoError: err.Error()} + } else { + return DebugInfo{SHA: rev} + } +} + +func (im *ImportGit) Arguments() component.Arguments { + return im.args +} + +func (im *ImportGit) Component() component.Component { + return im +} diff --git a/pkg/flow/internal/importsource/import_http.go b/pkg/flow/internal/importsource/import_http.go new file mode 100644 index 000000000000..9bc0ece91fbc --- /dev/null +++ b/pkg/flow/internal/importsource/import_http.go @@ -0,0 +1,87 @@ +package importsource + +import ( + "context" + "fmt" + "reflect" + + "github.com/grafana/agent/component" + "github.com/grafana/agent/component/remote/http" + remote_http "github.com/grafana/agent/component/remote/http" + "github.com/grafana/river/vm" +) + +type ImportHTTP struct { + managedRemoteHTTP *remote_http.Component + arguments component.Arguments + managedOpts component.Options + eval *vm.Evaluator +} + +var _ ImportSource = (*ImportHTTP)(nil) + +func NewImportHTTP(managedOpts component.Options, eval *vm.Evaluator, onContentChange func(string)) *ImportHTTP { + opts := managedOpts + opts.OnStateChange = func(e component.Exports) { + onContentChange(e.(http.Exports).Content.Value) + } + return &ImportHTTP{ + managedOpts: opts, + eval: eval, + } +} + +type ImportHTTPConfigBlock struct { + RemoteHTTPArguments remote_http.Arguments `river:",squash"` +} + +// SetToDefault implements river.Defaulter. +func (a *ImportHTTPConfigBlock) SetToDefault() { + a.RemoteHTTPArguments.SetToDefault() +} + +func (im *ImportHTTP) Evaluate(scope *vm.Scope) error { + var arguments ImportHTTPConfigBlock + if err := im.eval.Evaluate(scope, &arguments); err != nil { + return fmt.Errorf("decoding River: %w", err) + } + if im.managedRemoteHTTP == nil { + var err error + im.managedRemoteHTTP, err = remote_http.New(im.managedOpts, arguments.RemoteHTTPArguments) + if err != nil { + return fmt.Errorf("creating http component: %w", err) + } + im.arguments = arguments + } + + if reflect.DeepEqual(im.arguments, arguments) { + return nil + } + + // Update the existing managed component + if err := im.managedRemoteHTTP.Update(arguments); err != nil { + return fmt.Errorf("updating component: %w", err) + } + return nil +} + +func (im *ImportHTTP) Run(ctx context.Context) error { + return im.managedRemoteHTTP.Run(ctx) +} + +func (im *ImportHTTP) Arguments() component.Arguments { + return im.arguments +} + +func (im *ImportHTTP) Component() component.Component { + return im.managedRemoteHTTP +} + +func (im *ImportHTTP) CurrentHealth() component.Health { + return im.managedRemoteHTTP.CurrentHealth() +} + +// DebugInfo() is not implemented by the http component. +func (im *ImportHTTP) DebugInfo() interface{} { + return nil +} diff --git a/pkg/flow/internal/importsource/import_source.go b/pkg/flow/internal/importsource/import_source.go new file mode 100644 index 000000000000..4841c12a7e61 --- /dev/null +++ b/pkg/flow/internal/importsource/import_source.go @@ -0,0 +1,58 @@ +package importsource + +import ( + "context" + "fmt" + + "github.com/grafana/agent/component" + "github.com/grafana/river/vm" +) + +type SourceType int + +const ( + File SourceType = iota + Http + Git +) + +const ( + BlockImportFile = "import.file" + BlockImportHTTP = "import.http" + BlockImportGit = "import.git" +) + +type ImportSource interface { + Evaluate(scope *vm.Scope) error + Run(ctx context.Context) error + Component() component.Component + CurrentHealth() component.Health + DebugInfo() interface{} + Arguments() component.Arguments +} + +func NewImportSource(sourceType SourceType, managedOpts component.Options, eval *vm.Evaluator, onContentChange func(string)) ImportSource { + switch sourceType { + case File: + return NewImportFile(managedOpts, eval, onContentChange) + case Http: + return NewImportHTTP(managedOpts, eval, onContentChange) + case Git: + return NewImportGit(managedOpts, eval, onContentChange) + } + // This is a programming error, not a config error so this is ok to panic. + panic(fmt.Errorf("unsupported source type: %v", sourceType)) +} + +func GetSourceType(fullName string) SourceType { + switch fullName { + case BlockImportFile: + return File + case BlockImportGit: + return Git + case BlockImportHTTP: + return Http + } + // This is a programming error, not a config error so this is ok to panic. + panic(fmt.Errorf("name does not map to a know source type: %v", fullName)) +} diff --git a/pkg/flow/module.go b/pkg/flow/module.go index ec97aab093d5..689ccefae6c2 100644 --- a/pkg/flow/module.go +++ b/pkg/flow/module.go @@ -7,6 +7,7 @@ import ( "sync" "github.com/grafana/agent/component" + "github.com/grafana/agent/pkg/flow/config" "github.com/grafana/agent/pkg/flow/internal/controller" "github.com/grafana/agent/pkg/flow/internal/worker" "github.com/grafana/agent/pkg/flow/logging" @@ -128,12 +129,12 @@ func newModule(o *moduleOptions) *module { } // LoadConfig parses River config and loads it. -func (c *module) LoadConfig(config []byte, args map[string]any) error { - ff, err := ParseSource(c.o.ID, config) +func (c *module) LoadConfig(cfg []byte, args map[string]any, options config.LoaderConfigOptions) error { + ff, err := ParseSource(c.o.ID, cfg) if err != nil { return err } - return c.f.LoadSource(ff, args) + return c.f.loadSource(ff, args, options) } // Run starts the Module. No components within the Module diff --git a/pkg/flow/module_declare_test.go b/pkg/flow/module_declare_test.go new file mode 100644 index 000000000000..91abb01f59a6 --- /dev/null +++ b/pkg/flow/module_declare_test.go @@ -0,0 +1,227 @@ +package flow_test + +import ( + "context" + "testing" + "time" + + "github.com/grafana/agent/pkg/flow" + "github.com/grafana/agent/pkg/flow/internal/testcomponents" + "github.com/stretchr/testify/require" +) + +type testCase struct { + name string + config string + expected int +} + +func TestDeclare(t *testing.T) { + tt := []testCase{ + { + name: "BasicDeclare", + config: ` + declare "test" { + argument "input" { + optional = false + } + + testcomponents.passthrough "pt" { + input = argument.input.value + lag = "1ms" + } + + export "output" { + value = testcomponents.passthrough.pt.output + } + } + testcomponents.count "inc" { + frequency = "10ms" + max = 10 + } + + test "myModule" { + input = testcomponents.count.inc.count + } + + testcomponents.summation "sum" { + input = test.myModule.output + } + `, + expected: 10, + }, + { + name: "NestedDeclares", + config: ` + declare "test" { + argument "input" { + optional = false + } + + declare "nested" { + argument "input" { + optional = false + } + export "output" { + value = argument.input.value + } + } + + testcomponents.passthrough "pt" { + input = argument.input.value + lag = "1ms" + } + + nested "default" { + input = testcomponents.passthrough.pt.output + } + + export "output" { + value = nested.default.output + } + } + testcomponents.count "inc" { + frequency = "10ms" + max = 10 + } + + test "myModule" { + input = testcomponents.count.inc.count + } + + testcomponents.summation "sum" { + input = test.myModule.output + } + `, + expected: 10, + }, + { + name: "DeclaredInParentDepth1", + config: ` + declare "test" { + argument "input" { + optional = false + } + + testcomponents.passthrough "pt" { + input = argument.input.value + lag = "1ms" + } + + rootDeclare "default" { + input = testcomponents.passthrough.pt.output + } + + export "output" { + value = rootDeclare.default.output + } + } + declare "rootDeclare" { + argument "input" { + optional = false + } + export "output" { + value = argument.input.value + } + } + testcomponents.count "inc" { + frequency = "10ms" + max = 10 + } + + test "myModule" { + input = testcomponents.count.inc.count + } + + testcomponents.summation "sum" { + input = test.myModule.output + } + `, + expected: 10, + }, + { + name: "DeclaredInParentDepth2", + config: ` + declare "test" { + argument "input" { + optional = false + } + + testcomponents.passthrough "pt" { + input = argument.input.value + lag = "1ms" + } + + declare "anotherDeclare" { + argument "input" { + optional = false + } + rootDeclare "default" { + input = argument.input.value + } + export "output" { + value = rootDeclare.default.output + } + } + + anotherDeclare "myOtherDeclare" { + input = testcomponents.passthrough.pt.output + } + + export "output" { + value = anotherDeclare.myOtherDeclare.output + } + } + declare "rootDeclare" { + argument "input" { + optional = false + } + export "output" { + value = argument.input.value + } + } + testcomponents.count "inc" { + frequency = "10ms" + max = 10 + } + + test "myModule" { + input = testcomponents.count.inc.count + } + + testcomponents.summation "sum" { + input = test.myModule.output + } + `, + expected: 10, + }, + } + + for _, tc := range tt { + t.Run(tc.name, func(t *testing.T) { + ctrl := flow.New(testOptions(t)) + f, err := flow.ParseSource(t.Name(), []byte(tc.config)) + require.NoError(t, err) + require.NotNil(t, f) + + err = ctrl.LoadSource(f, nil) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan struct{}) + go func() { + ctrl.Run(ctx) + close(done) + }() + defer func() { + cancel() + <-done + }() + + require.Eventually(t, func() bool { + export := getExport[testcomponents.SummationExports](t, ctrl, "", "testcomponents.summation.sum") + return export.LastAdded == tc.expected + }, 3*time.Second, 10*time.Millisecond) + }) + } +} diff --git a/pkg/flow/module_import_test.go b/pkg/flow/module_import_test.go new file mode 100644 index 000000000000..98ea9b1e57f0 --- /dev/null +++ b/pkg/flow/module_import_test.go @@ -0,0 +1,742 @@ +package flow_test + +import ( + "context" + "os" + "testing" + "time" + + "github.com/grafana/agent/pkg/flow" + "github.com/grafana/agent/pkg/flow/internal/testcomponents" + "github.com/stretchr/testify/require" + + _ "github.com/grafana/agent/component/module/string" +) + +func TestImportModule(t *testing.T) { + const defaultModuleUpdate = ` + declare "test" { + argument "input" { + optional = false + } + + testcomponents.passthrough "pt" { + input = argument.input.value + lag = "1ms" + } + + export "testOutput" { + value = -10 + } + } +` + testCases := []struct { + name string + module string + otherModule string + config string + updateModule func(filename string) string + updateFile string + }{ + { + name: "TestImportModule", + module: ` + declare "test" { + argument "input" { + optional = false + } + + testcomponents.passthrough "pt" { + input = argument.input.value + lag = "1ms" + } + + export "testOutput" { + value = testcomponents.passthrough.pt.output + } + }`, + config: ` + testcomponents.count "inc" { + frequency = "10ms" + max = 10 + } + + import.file "testImport" { + filename = "module" + } + + testImport.test "myModule" { + input = testcomponents.count.inc.count + } + + testcomponents.summation "sum" { + input = testImport.test.myModule.testOutput + } + `, + updateModule: func(filename string) string { + return defaultModuleUpdate + }, + updateFile: "module", + }, + { + name: "TestImportModuleNoArgs", + module: ` + declare "test" { + testcomponents.passthrough "pt" { + input = 10 + lag = "1ms" + } + + export "testOutput" { + value = testcomponents.passthrough.pt.output + } + }`, + config: ` + import.file "testImport" { + filename = "module" + } + + testImport.test "myModule" { + } + + testcomponents.summation "sum" { + input = testImport.test.myModule.testOutput + } + `, + updateModule: func(filename string) string { + return ` + declare "test" { + testcomponents.passthrough "pt" { + input = -10 + lag = "1ms" + } + + export "testOutput" { + value = testcomponents.passthrough.pt.output + } + } + ` + }, + updateFile: "module", + }, + { + name: "TestImportModuleInDeclare", + module: ` + declare "test" { + argument "input" { + optional = false + } + + testcomponents.passthrough "pt" { + input = argument.input.value + lag = "1ms" + } + + export "testOutput" { + value = testcomponents.passthrough.pt.output + } + } + `, + config: ` + testcomponents.count "inc" { + frequency = "10ms" + max = 10 + } + + import.file "testImport" { + filename = "module" + } + + declare "anotherModule" { + testcomponents.count "inc" { + frequency = "10ms" + max = 10 + } + + testImport.test "myModule" { + input = testcomponents.count.inc.count + } + + export "anotherModuleOutput" { + value = testImport.test.myModule.testOutput + } + } + + anotherModule "myOtherModule" {} + + testcomponents.summation "sum" { + input = anotherModule.myOtherModule.anotherModuleOutput + } + `, + updateModule: func(filename string) string { + return defaultModuleUpdate + }, + updateFile: "module", + }, + { + name: "TestImportModuleInNestedDeclare", + module: ` + declare "test" { + argument "input" { + optional = false + } + + export "testOutput" { + value = argument.input.value + } + } + `, + config: ` + testcomponents.count "inc" { + frequency = "10ms" + max = 10 + } + + import.file "testImport" { + filename = "module" + } + + declare "yetAgainAnotherModule" { + declare "anotherModule" { + testcomponents.count "inc" { + frequency = "10ms" + max = 10 + } + + testcomponents.passthrough "pt" { + input = testcomponents.count.inc.count + lag = "1ms" + } + + testImport.test "myModule" { + input = testcomponents.passthrough.pt.output + } + + export "anotherModuleOutput" { + value = testImport.test.myModule.testOutput + } + } + anotherModule "myOtherModule" {} + + export "yetAgainAnotherModuleOutput" { + value = anotherModule.myOtherModule.anotherModuleOutput + } + } + + yetAgainAnotherModule "default" {} + + testcomponents.summation "sum" { + input = yetAgainAnotherModule.default.yetAgainAnotherModuleOutput + } + `, + updateModule: func(filename string) string { + return defaultModuleUpdate + }, + updateFile: "module", + }, + { + name: "TestImportModuleWithImportBlock", + module: ` + import.file "otherModule" { + filename = "other_module" + } + declare "anotherModule" { + testcomponents.count "inc" { + frequency = "10ms" + max = 10 + } + + otherModule.test "default" { + input = testcomponents.count.inc.count + } + + export "anotherModuleOutput" { + value = otherModule.test.default.testOutput + } + } + `, + otherModule: ` + declare "test" { + argument "input" { + optional = false + } + + testcomponents.passthrough "pt" { + input = argument.input.value + lag = "1ms" + } + + export "testOutput" { + value = testcomponents.passthrough.pt.output + } + } + `, + config: ` + import.file "testImport" { + filename = "module" + } + + testImport.anotherModule "myOtherModule" {} + + testcomponents.summation "sum" { + input = testImport.anotherModule.myOtherModule.anotherModuleOutput + } + `, + updateModule: func(filename string) string { + return defaultModuleUpdate + }, + updateFile: "other_module", + }, + { + name: "TestImportModuleWithNestedDeclareUsingModule", + module: ` + import.file "default" { + filename = "other_module" + } + declare "anotherModule" { + testcomponents.count "inc" { + frequency = "10ms" + max = 10 + } + + declare "blabla" { + argument "input" {} + default.test "default" { + input = argument.input.value + } + + export "blablaOutput" { + value = default.test.default.testOutput + } + } + + blabla "default" { + input = testcomponents.count.inc.count + } + + export "anotherModuleOutput" { + value = blabla.default.blablaOutput + } + } + `, + otherModule: ` + declare "test" { + argument "input" { + optional = false + } + + testcomponents.passthrough "pt" { + input = argument.input.value + lag = "1ms" + } + + export "testOutput" { + value = testcomponents.passthrough.pt.output + } + } + `, + config: ` + import.file "testImport" { + filename = "module" + } + + testImport.anotherModule "myOtherModule" {} + + testcomponents.summation "sum" { + input = testImport.anotherModule.myOtherModule.anotherModuleOutput + } + `, + }, + { + name: "TestImportModuleWithNestedDeclareDependency", + module: ` + declare "other_test" { + argument "input" { + optional = false + } + + testcomponents.passthrough "pt" { + input = argument.input.value + lag = "1ms" + } + + export "other_testOutput" { + value = testcomponents.passthrough.pt.output + } + } + + declare "test" { + argument "input" { + optional = false + } + + other_test "default" { + input = argument.input.value + } + + export "testOutput" { + value = other_test.default.other_testOutput + } + } + `, + config: ` + testcomponents.count "inc" { + frequency = "10ms" + max = 10 + } + + import.file "testImport" { + filename = "module" + } + + declare "anotherModule" { + testcomponents.count "inc" { + frequency = "10ms" + max = 10 + } + + testImport.test "myModule" { + input = testcomponents.count.inc.count + } + + export "anotherModuleOutput" { + value = testImport.test.myModule.testOutput + } + } + + anotherModule "myOtherModule" {} + + testcomponents.summation "sum" { + input = anotherModule.myOtherModule.anotherModuleOutput + } + `, + updateModule: func(filename string) string { + return ` + declare "other_test" { + argument "input" { + optional = false + } + export "output" { + value = -10 + } + } + + declare "test" { + argument "input" { + optional = false + } + + other_test "default" { + input = argument.input.value + } + + export "testOutput" { + value = other_test.default.output + } + } + ` + }, + updateFile: "module", + }, + { + name: "TestImportModuleWithMoreNesting", + module: ` + import.file "importOtherTest" { + filename = "other_module" + } + declare "test" { + argument "input" { + optional = false + } + + importOtherTest.other_test "default" { + input = argument.input.value + } + + export "testOutput" { + value = importOtherTest.other_test.default.other_testOutput + } + } + `, + otherModule: ` + declare "other_test" { + argument "input" { + optional = false + } + + testcomponents.passthrough "pt" { + input = argument.input.value + lag = "1ms" + } + + export "other_testOutput" { + value = testcomponents.passthrough.pt.output + } + }`, + config: ` + testcomponents.count "inc" { + frequency = "10ms" + max = 10 + } + + import.file "testImport" { + filename = "module" + } + + declare "anotherModule" { + testcomponents.count "inc" { + frequency = "10ms" + max = 10 + } + + testImport.test "myModule" { + input = testcomponents.count.inc.count + } + + export "anotherModuleOutput" { + value = testImport.test.myModule.testOutput + } + } + + anotherModule "myOtherModule" {} + + testcomponents.summation "sum" { + input = anotherModule.myOtherModule.anotherModuleOutput + } + `, + updateModule: func(filename string) string { + return ` + declare "other_test" { + argument "input" { + optional = false + } + export "other_testOutput" { + value = -10 + } + } + ` + }, + updateFile: "other_module", + }, + { + name: "TestImportModuleWithMoreNestingAndMoreNesting", + module: ` + import.file "importOtherTest" { + filename = "other_module" + } + declare "test" { + argument "input" { + optional = false + } + + declare "anotherOne" { + argument "input" { + optional = false + } + importOtherTest.other_test "default" { + input = argument.input.value + } + export "anotherOneOutput" { + value = importOtherTest.other_test.default.other_testOutput + } + } + + anotherOne "default" { + input = argument.input.value + } + + export "testOutput" { + value = anotherOne.default.anotherOneOutput + } + } + `, + otherModule: ` + declare "other_test" { + argument "input" { + optional = false + } + + testcomponents.passthrough "pt" { + input = argument.input.value + lag = "5ms" + } + + export "other_testOutput" { + value = testcomponents.passthrough.pt.output + } + }`, + config: ` + testcomponents.count "inc" { + frequency = "10ms" + max = 10 + } + + import.file "testImport" { + filename = "module" + } + + declare "anotherModule" { + testcomponents.count "inc" { + frequency = "10ms" + max = 10 + } + + testImport.test "myModule" { + input = testcomponents.count.inc.count + } + + export "anotherModuleOutput" { + value = testImport.test.myModule.testOutput + } + } + + anotherModule "myOtherModule" {} + + testcomponents.summation "sum" { + input = anotherModule.myOtherModule.anotherModuleOutput + } + `, + updateModule: func(filename string) string { + return ` + declare "other_test" { + argument "input" { + optional = false + } + export "other_testOutput" { + value = -10 + } + } + ` + }, + updateFile: "other_module", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + defer verifyNoGoroutineLeaks(t) + filename := "module" + require.NoError(t, os.WriteFile(filename, []byte(tc.module), 0664)) + defer os.Remove(filename) + + otherFilename := "other_module" + if tc.otherModule != "" { + require.NoError(t, os.WriteFile(otherFilename, []byte(tc.otherModule), 0664)) + defer os.Remove(otherFilename) + } + + ctrl := flow.New(testOptions(t)) + f, err := flow.ParseSource(t.Name(), []byte(tc.config)) + require.NoError(t, err) + require.NotNil(t, f) + + err = ctrl.LoadSource(f, nil) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan struct{}) + go func() { + ctrl.Run(ctx) + close(done) + }() + defer func() { + cancel() + <-done + }() + + // Check for initial condition + require.Eventually(t, func() bool { + export := getExport[testcomponents.SummationExports](t, ctrl, "", "testcomponents.summation.sum") + return export.LastAdded == 10 + }, 3*time.Second, 10*time.Millisecond) + + // Update module if needed + if tc.updateModule != nil { + newModule := tc.updateModule(tc.updateFile) + require.NoError(t, os.WriteFile(tc.updateFile, []byte(newModule), 0664)) + + require.Eventually(t, func() bool { + export := getExport[testcomponents.SummationExports](t, ctrl, "", "testcomponents.summation.sum") + return export.LastAdded == -10 + }, 3*time.Second, 10*time.Millisecond) + } + }) + } +} + +func TestImportModuleError(t *testing.T) { + testCases := []struct { + name string + module string + otherModule string + config string + expectedError string + }{ + { + name: "TestImportedModuleTriesAccessingDeclareOnRoot", + module: ` + declare "test" { + argument "input" { + optional = false + } + + cantAccessThis "default" {} + + testcomponents.passthrough "pt" { + input = argument.input.value + lag = "1ms" + } + + export "output" { + value = testcomponents.passthrough.pt.output + } + }`, + config: ` + declare "cantAccessThis" { + export "output" { + value = -1 + } + } + testcomponents.count "inc" { + frequency = "10ms" + max = 10 + } + + import.file "testImport" { + filename = "module" + } + + testImport.test "myModule" { + input = testcomponents.count.inc.count + } + + testcomponents.summation "sum" { + input = testImport.test.myModule.output + } + `, + expectedError: `unrecognized component name "cantAccessThis"`, + }, // TODO: add more tests + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + filename := "module" + require.NoError(t, os.WriteFile(filename, []byte(tc.module), 0664)) + defer os.Remove(filename) + + otherFilename := "other_module" + if tc.otherModule != "" { + require.NoError(t, os.WriteFile(otherFilename, []byte(tc.otherModule), 0664)) + defer os.Remove(otherFilename) + } + + ctrl := flow.New(testOptions(t)) + f, err := flow.ParseSource(t.Name(), []byte(tc.config)) + require.NoError(t, err) + require.NotNil(t, f) + + err = ctrl.LoadSource(f, nil) + require.ErrorContains(t, err, tc.expectedError) + }) + } +} diff --git a/pkg/flow/module_test.go b/pkg/flow/module_test.go index c5f4417c84c3..4966a7169993 100644 --- a/pkg/flow/module_test.go +++ b/pkg/flow/module_test.go @@ -7,6 +7,7 @@ import ( "time" "github.com/grafana/agent/component" + "github.com/grafana/agent/pkg/flow/config" "github.com/grafana/agent/pkg/flow/internal/controller" "github.com/grafana/agent/pkg/flow/internal/worker" "github.com/grafana/agent/pkg/flow/logging" @@ -314,7 +315,7 @@ func (t *testModule) Run(ctx context.Context) error { return err } - err = m.LoadConfig([]byte(t.content), t.args) + err = m.LoadConfig([]byte(t.content), t.args, config.DefaultLoaderConfigOptions()) if err != nil { return err } diff --git a/pkg/flow/source.go b/pkg/flow/source.go index acd7d2ce2f58..7196339380c8 100644 --- a/pkg/flow/source.go +++ b/pkg/flow/source.go @@ -7,6 +7,7 @@ import ( "strings" "github.com/grafana/agent/pkg/config/encoder" + "github.com/grafana/agent/pkg/flow/internal/controller" "github.com/grafana/river/ast" "github.com/grafana/river/diag" "github.com/grafana/river/parser" @@ -17,10 +18,11 @@ type Source struct { sourceMap map[string][]byte // Map that links parsed Flow source's name with its content. hash [sha256.Size]byte // Hash of all files in sourceMap sorted by name. - // Components holds the list of raw River AST blocks describing components. + // Components holds the list of raw River AST blocks describing components and services. // The Flow controller can interpret them. - components []*ast.BlockStmt + blocks []*ast.BlockStmt configBlocks []*ast.BlockStmt + declares []*controller.Declare } // ParseSource parses the River file specified by bb into a File. name should be @@ -43,8 +45,9 @@ func ParseSource(name string, bb []byte) (*Source, error) { // TODO(rfratto): should this code be brought into a helper somewhere? Maybe // in ast? var ( - components []*ast.BlockStmt - configs []*ast.BlockStmt + blocks []*ast.BlockStmt + configs []*ast.BlockStmt + declares []*controller.Declare ) for _, stmt := range node.Body { @@ -60,10 +63,12 @@ func ParseSource(name string, bb []byte) (*Source, error) { case *ast.BlockStmt: fullName := strings.Join(stmt.Name, ".") switch fullName { - case "logging", "tracing", "argument", "export": + case "declare": + declares = append(declares, controller.NewDeclare(stmt, string(bb[stmt.LCurlyPos.Position().Offset+1:stmt.RCurlyPos.Position().Offset-1]))) + case "logging", "tracing", "argument", "export", "import.file", "import.git", "import.http": configs = append(configs, stmt) default: - components = append(components, stmt) + blocks = append(blocks, stmt) } default: @@ -77,8 +82,9 @@ func ParseSource(name string, bb []byte) (*Source, error) { } return &Source{ - components: components, + blocks: blocks, configBlocks: configs, + declares: declares, sourceMap: map[string][]byte{name: bb}, hash: sha256.Sum256(bb), }, nil @@ -118,8 +124,9 @@ func ParseSources(sources map[string][]byte) (*Source, error) { return nil, err } - mergedSource.components = append(mergedSource.components, sourceFragment.components...) + mergedSource.blocks = append(mergedSource.blocks, sourceFragment.blocks...) mergedSource.configBlocks = append(mergedSource.configBlocks, sourceFragment.configBlocks...) + mergedSource.declares = append(mergedSource.declares, sourceFragment.declares...) } mergedSource.hash = [32]byte(hash.Sum(nil)) diff --git a/pkg/flow/source_test.go b/pkg/flow/source_test.go index fa79c8c1e9e1..9daa935f52fa 100644 --- a/pkg/flow/source_test.go +++ b/pkg/flow/source_test.go @@ -26,9 +26,9 @@ func TestParseSource(t *testing.T) { require.NoError(t, err) require.NotNil(t, f) - require.Len(t, f.components, 2) - require.Equal(t, "testcomponents.tick.ticker_a", getBlockID(f.components[0])) - require.Equal(t, "testcomponents.passthrough.static", getBlockID(f.components[1])) + require.Len(t, f.blocks, 2) + require.Equal(t, "testcomponents.tick.ticker_a", getBlockID(f.blocks[0])) + require.Equal(t, "testcomponents.passthrough.static", getBlockID(f.blocks[1])) } func TestParseSourceWithConfigBlock(t *testing.T) { @@ -46,8 +46,8 @@ func TestParseSourceWithConfigBlock(t *testing.T) { require.NoError(t, err) require.NotNil(t, f) - require.Len(t, f.components, 1) - require.Equal(t, "testcomponents.tick.ticker_with_config_block", getBlockID(f.components[0])) + require.Len(t, f.blocks, 1) + require.Equal(t, "testcomponents.tick.ticker_with_config_block", getBlockID(f.blocks[0])) require.Len(t, f.configBlocks, 1) require.Equal(t, "logging", getBlockID(f.configBlocks[0])) } @@ -57,7 +57,7 @@ func TestParseSource_Defaults(t *testing.T) { require.NotNil(t, f) require.NoError(t, err) - require.Len(t, f.components, 0) + require.Len(t, f.blocks, 0) } func TestParseSources_DuplicateComponent(t *testing.T) {