From a73e2e3137633f88b9b8dccbe136cb0b944fdd57 Mon Sep 17 00:00:00 2001 From: William Dumont Date: Thu, 30 Nov 2023 13:57:52 +0100 Subject: [PATCH] Implement new modules with import and declare keywords. --- .github/workflows/integration-tests.yml | 2 + cmd/internal/flowmode/cmd_run.go | 4 +- component/component_provider.go | 6 +- component/module/file/file.go | 6 +- component/module/git/git.go | 6 +- component/module/http/http.go | 6 +- component/module/module.go | 41 +- component/module/string/string.go | 4 +- component/registry.go | 2 +- converter/internal/test_common/testing.go | 2 +- .../configs/http-module/module.river | 21 + integration-tests/docker-compose.yaml | 9 +- .../tests/module-file/config.river | 38 + .../tests/module-file/logfile.river | 5 + integration-tests/tests/module-file/logs.txt | 13 + .../tests/module-file/loki.river | 17 + .../tests/module-file/loki_write.river | 13 + .../tests/module-file/module_file_test.go | 95 +++ .../tests/module-file/prom-scrape.river | 21 + .../config.river | 27 + .../scrape_prom_metrics_module_git_test.go | 78 ++ .../config.river | 24 + .../scrape_prom_metrics_module_http_test.go | 78 ++ pkg/flow/componenttest/testfailmodule.go | 6 +- pkg/flow/flow.go | 22 +- pkg/flow/flow_components.go | 36 +- pkg/flow/flow_services_test.go | 14 +- pkg/flow/flow_test.go | 4 +- pkg/flow/flow_updates_test.go | 10 +- .../internal/controller/component_node.go | 34 + pkg/flow/internal/controller/declare.go | 9 + pkg/flow/internal/controller/loader.go | 229 ++++-- pkg/flow/internal/controller/loader_test.go | 7 +- pkg/flow/internal/controller/module_info.go | 111 +++ .../internal/controller/module_references.go | 77 ++ pkg/flow/internal/controller/node_config.go | 7 + .../internal/controller/node_config_import.go | 396 ++++++++++ pkg/flow/internal/controller/node_declare.go | 55 ++ .../controller/node_declare_component.go | 361 +++++++++ ..._component.go => node_native_component.go} | 70 +- ..._test.go => node_native_component_test.go} | 4 +- .../controller/node_with_dependants.go | 18 + pkg/flow/internal/controller/queue.go | 16 +- pkg/flow/internal/controller/queue_test.go | 8 +- .../internal/import-source/import_file.go | 86 ++ pkg/flow/internal/import-source/import_git.go | 281 +++++++ .../internal/import-source/import_http.go | 87 ++ .../internal/import-source/import_source.go | 58 ++ pkg/flow/module.go | 4 +- pkg/flow/module_caching_test.go | 4 +- pkg/flow/module_declare_test.go | 227 ++++++ pkg/flow/module_fail_test.go | 2 +- pkg/flow/module_import_test.go | 741 ++++++++++++++++++ pkg/flow/module_test.go | 8 +- pkg/flow/source.go | 9 +- pkg/flow/source_test.go | 4 +- .../git/internal/vcs => pkg/util/git}/auth.go | 0 .../internal/vcs => pkg/util/git}/errors.go | 0 .../git/internal/vcs => pkg/util/git}/git.go | 0 .../internal/vcs => pkg/util/git}/git_test.go | 2 +- 60 files changed, 3357 insertions(+), 168 deletions(-) create mode 100644 integration-tests/configs/http-module/module.river create mode 100644 integration-tests/tests/module-file/config.river create mode 100644 integration-tests/tests/module-file/logfile.river create mode 100644 integration-tests/tests/module-file/logs.txt create mode 100644 integration-tests/tests/module-file/loki.river create mode 100644 integration-tests/tests/module-file/loki_write.river create mode 100644 integration-tests/tests/module-file/module_file_test.go create mode 100644 integration-tests/tests/module-file/prom-scrape.river create mode 100644 integration-tests/tests/scrape-prom-metrics-module-git/config.river create mode 100644 integration-tests/tests/scrape-prom-metrics-module-git/scrape_prom_metrics_module_git_test.go create mode 100644 integration-tests/tests/scrape-prom-metrics-module-http/config.river create mode 100644 integration-tests/tests/scrape-prom-metrics-module-http/scrape_prom_metrics_module_http_test.go create mode 100644 pkg/flow/internal/controller/component_node.go create mode 100644 pkg/flow/internal/controller/declare.go create mode 100644 pkg/flow/internal/controller/module_info.go create mode 100644 pkg/flow/internal/controller/module_references.go create mode 100644 pkg/flow/internal/controller/node_config_import.go create mode 100644 pkg/flow/internal/controller/node_declare.go create mode 100644 pkg/flow/internal/controller/node_declare_component.go rename pkg/flow/internal/controller/{node_component.go => node_native_component.go} (85%) rename pkg/flow/internal/controller/{node_component_test.go => node_native_component_test.go} (93%) create mode 100644 pkg/flow/internal/controller/node_with_dependants.go create mode 100644 pkg/flow/internal/import-source/import_file.go create mode 100644 pkg/flow/internal/import-source/import_git.go create mode 100644 pkg/flow/internal/import-source/import_http.go create mode 100644 pkg/flow/internal/import-source/import_source.go create mode 100644 pkg/flow/module_declare_test.go create mode 100644 pkg/flow/module_import_test.go rename {component/module/git/internal/vcs => pkg/util/git}/auth.go (100%) rename {component/module/git/internal/vcs => pkg/util/git}/errors.go (100%) rename {component/module/git/internal/vcs => pkg/util/git}/git.go (100%) rename {component/module/git/internal/vcs => pkg/util/git}/git_test.go (97%) diff --git a/.github/workflows/integration-tests.yml b/.github/workflows/integration-tests.yml index 4b9f7077ed57..11e6f260ca05 100644 --- a/.github/workflows/integration-tests.yml +++ b/.github/workflows/integration-tests.yml @@ -19,5 +19,7 @@ jobs: go-version: "1.21" - name: Set OTEL Exporter Endpoint run: echo "OTEL_EXPORTER_ENDPOINT=172.17.0.1:4318" >> $GITHUB_ENV + - name: Set Module HTTP Endpoint + run: echo "MODULE_HTTP_ENDPOINT=http://172.17.0.1:8090/module.river" >> $GITHUB_ENV - name: Run tests run: make integration-test diff --git a/cmd/internal/flowmode/cmd_run.go b/cmd/internal/flowmode/cmd_run.go index c8618b928b85..c70905dfcb1f 100644 --- a/cmd/internal/flowmode/cmd_run.go +++ b/cmd/internal/flowmode/cmd_run.go @@ -274,7 +274,7 @@ func (fr *flowRun) Run(configPath string) error { if err != nil { return nil, fmt.Errorf("reading config path %q: %w", configPath, err) } - if err := f.LoadSource(flowSource, nil); err != nil { + if err := f.LoadSource(flowSource, nil, nil); err != nil { return flowSource, fmt.Errorf("error during the initial grafana/agent load: %w", err) } @@ -360,7 +360,7 @@ func getEnabledComponentsFunc(f *flow.Flow) func() map[string]interface{} { components := component.GetAllComponents(f, component.InfoOptions{}) componentNames := map[string]struct{}{} for _, c := range components { - componentNames[c.Registration.Name] = struct{}{} + componentNames[c.BlockName] = struct{}{} } return map[string]interface{}{"enabled-components": maps.Keys(componentNames)} } diff --git a/component/component_provider.go b/component/component_provider.go index 90454b5b04c3..b299f7479bf9 100644 --- a/component/component_provider.go +++ b/component/component_provider.go @@ -93,8 +93,8 @@ type Info struct { // this component depends on, or is depended on by, respectively. References, ReferencedBy []string - Registration Registration // Component registration. - Health Health // Current component health. + BlockName string // Component block name. + Health Health // Current component health. Arguments Arguments // Current arguments value of the component. Exports Exports // Current exports value of the component. @@ -157,7 +157,7 @@ func (info *Info) MarshalJSON() ([]byte, error) { } return json.Marshal(&componentDetailJSON{ - Name: info.Registration.Name, + Name: info.BlockName, Type: "block", ModuleID: info.ID.ModuleID, LocalID: info.ID.LocalID, diff --git a/component/module/file/file.go b/component/module/file/file.go index e40c5dc9ca48..fdc19804eb69 100644 --- a/component/module/file/file.go +++ b/component/module/file/file.go @@ -58,7 +58,7 @@ var ( // New creates a new module.file component. func New(o component.Options, args Arguments) (*Component, error) { - m, err := module.NewModuleComponent(o) + m, err := module.NewModuleComponentDeprecated(o) if err != nil { return nil, err } @@ -88,7 +88,7 @@ func (c *Component) newManagedLocalComponent(o component.Options) (*file.Compone if !c.inUpdate.Load() && c.isCreated.Load() { // Any errors found here are reported via component health - _ = c.mod.LoadFlowSource(c.getArgs().Arguments, c.getContent().Value) + _ = c.mod.LoadFlowSource(c.getArgs().Arguments, c.getContent().Value, nil) } } @@ -135,7 +135,7 @@ func (c *Component) Update(args component.Arguments) error { // Force a content load here and bubble up any error. This will catch problems // on initial load. - return c.mod.LoadFlowSource(newArgs.Arguments, c.getContent().Value) + return c.mod.LoadFlowSource(newArgs.Arguments, c.getContent().Value, nil) } // CurrentHealth implements component.HealthComponent. diff --git a/component/module/git/git.go b/component/module/git/git.go index dfe17ef2cb4a..a7d7c5f27dd9 100644 --- a/component/module/git/git.go +++ b/component/module/git/git.go @@ -12,8 +12,8 @@ import ( "github.com/go-kit/log" "github.com/grafana/agent/component" "github.com/grafana/agent/component/module" - "github.com/grafana/agent/component/module/git/internal/vcs" "github.com/grafana/agent/pkg/flow/logging/level" + vcs "github.com/grafana/agent/pkg/util/git" ) func init() { @@ -74,7 +74,7 @@ var ( // New creates a new module.git component. func New(o component.Options, args Arguments) (*Component, error) { - m, err := module.NewModuleComponent(o) + m, err := module.NewModuleComponentDeprecated(o) if err != nil { return nil, err } @@ -239,7 +239,7 @@ func (c *Component) pollFile(ctx context.Context, args Arguments) error { return err } - return c.mod.LoadFlowSource(args.Arguments, string(bb)) + return c.mod.LoadFlowSource(args.Arguments, string(bb), nil) } // CurrentHealth implements component.HealthComponent. diff --git a/component/module/http/http.go b/component/module/http/http.go index bc1be2158fdb..39855b9b8595 100644 --- a/component/module/http/http.go +++ b/component/module/http/http.go @@ -57,7 +57,7 @@ var ( // New creates a new module.http component. func New(o component.Options, args Arguments) (*Component, error) { - m, err := module.NewModuleComponent(o) + m, err := module.NewModuleComponentDeprecated(o) if err != nil { return nil, err } @@ -87,7 +87,7 @@ func (c *Component) newManagedLocalComponent(o component.Options) (*remote_http. if !c.inUpdate.Load() && c.isCreated.Load() { // Any errors found here are reported via component health - _ = c.mod.LoadFlowSource(c.getArgs().Arguments, c.getContent().Value) + _ = c.mod.LoadFlowSource(c.getArgs().Arguments, c.getContent().Value, nil) } } @@ -134,7 +134,7 @@ func (c *Component) Update(args component.Arguments) error { // Force a content load here and bubble up any error. This will catch problems // on initial load. - return c.mod.LoadFlowSource(newArgs.Arguments, c.getContent().Value) + return c.mod.LoadFlowSource(newArgs.Arguments, c.getContent().Value, nil) } // CurrentHealth implements component.HealthComponent. diff --git a/component/module/module.go b/component/module/module.go index 7995fdbca5c9..40e2d19d138c 100644 --- a/component/module/module.go +++ b/component/module/module.go @@ -16,13 +16,15 @@ type ModuleComponent struct { opts component.Options mod component.Module - mut sync.RWMutex - health component.Health - latestContent string - latestArgs map[string]any + mut sync.RWMutex + health component.Health + latestContent string + latestArgs map[string]any + latestParentModuleDefinitions map[string]string } // Exports holds values which are exported from the run module. +// This export type is deprecated. type Exports struct { // Exports exported from the running module. Exports map[string]any `river:"exports,block"` @@ -30,6 +32,18 @@ type Exports struct { // NewModuleComponent initializes a new ModuleComponent. func NewModuleComponent(o component.Options) (*ModuleComponent, error) { + c := &ModuleComponent{ + opts: o, + } + var err error + c.mod, err = o.ModuleController.NewModule("", func(exports map[string]any) { + c.opts.OnStateChange(exports) + }) + return c, err +} + +// TODO: Remove when getting rid of old modules +func NewModuleComponentDeprecated(o component.Options) (*ModuleComponent, error) { c := &ModuleComponent{ opts: o, } @@ -43,12 +57,12 @@ func NewModuleComponent(o component.Options) (*ModuleComponent, error) { // LoadFlowSource loads the flow controller with the current component source. // It will set the component health in addition to return the error so that the consumer can rely on either or both. // If the content is the same as the last time it was successfully loaded, it will not be reloaded. -func (c *ModuleComponent) LoadFlowSource(args map[string]any, contentValue string) error { - if reflect.DeepEqual(args, c.getLatestArgs()) && contentValue == c.getLatestContent() { +func (c *ModuleComponent) LoadFlowSource(args map[string]any, contentValue string, parentModuleDefinitions map[string]string) error { + if reflect.DeepEqual(args, c.getLatestArgs()) && contentValue == c.getLatestContent() && reflect.DeepEqual(args, c.getLatestParentModuleDefinitions()) { return nil } - err := c.mod.LoadConfig([]byte(contentValue), args) + err := c.mod.LoadConfig([]byte(contentValue), args, parentModuleDefinitions) if err != nil { c.setHealth(component.Health{ Health: component.HealthTypeUnhealthy, @@ -61,6 +75,7 @@ func (c *ModuleComponent) LoadFlowSource(args map[string]any, contentValue strin c.setLatestArgs(args) c.setLatestContent(contentValue) + c.setLatestParentModuleDefinitions(parentModuleDefinitions) c.setHealth(component.Health{ Health: component.HealthTypeHealthy, Message: "module content loaded", @@ -104,6 +119,18 @@ func (c *ModuleComponent) getLatestContent() string { return c.latestContent } +func (c *ModuleComponent) setLatestParentModuleDefinitions(parentModuleDefinitions map[string]string) { + c.mut.Lock() + defer c.mut.Unlock() + c.latestParentModuleDefinitions = parentModuleDefinitions +} + +func (c *ModuleComponent) getLatestParentModuleDefinitions() map[string]string { + c.mut.RLock() + defer c.mut.RUnlock() + return c.latestParentModuleDefinitions +} + func (c *ModuleComponent) setLatestArgs(args map[string]any) { c.mut.Lock() defer c.mut.Unlock() diff --git a/component/module/string/string.go b/component/module/string/string.go index bd3e6193f441..65e2cdf6190d 100644 --- a/component/module/string/string.go +++ b/component/module/string/string.go @@ -42,7 +42,7 @@ var ( // New creates a new module.string component. func New(o component.Options, args Arguments) (*Component, error) { - m, err := module.NewModuleComponent(o) + m, err := module.NewModuleComponentDeprecated(o) if err != nil { return nil, err } @@ -66,7 +66,7 @@ func (c *Component) Run(ctx context.Context) error { func (c *Component) Update(args component.Arguments) error { newArgs := args.(Arguments) - return c.mod.LoadFlowSource(newArgs.Arguments, newArgs.Content.Value) + return c.mod.LoadFlowSource(newArgs.Arguments, newArgs.Content.Value, nil) } // CurrentHealth implements component.HealthComponent. diff --git a/component/registry.go b/component/registry.go index 11cc593b0ddd..81ead2f9e0d7 100644 --- a/component/registry.go +++ b/component/registry.go @@ -44,7 +44,7 @@ type Module interface { // LoadConfig parses River config and loads it into the Module. // LoadConfig can be called multiple times, and called prior to // [Module.Run]. - LoadConfig(config []byte, args map[string]any) error + LoadConfig(config []byte, args map[string]any, moduleDefinitions map[string]string) error // Run starts the Module. No components within the Module // will be run until Run is called. diff --git a/converter/internal/test_common/testing.go b/converter/internal/test_common/testing.go index 03855fc2ca31..3d6a04c03880 100644 --- a/converter/internal/test_common/testing.go +++ b/converter/internal/test_common/testing.go @@ -198,7 +198,7 @@ func attemptLoadingFlowConfig(t *testing.T, river []byte) { labelstore.New(nil, prometheus.DefaultRegisterer), }, }) - err = f.LoadSource(cfg, nil) + err = f.LoadSource(cfg, nil, nil) // Many components will fail to build as e.g. the cert files are missing, so we ignore these errors. // This is not ideal, but we still validate for other potential issues. diff --git a/integration-tests/configs/http-module/module.river b/integration-tests/configs/http-module/module.river new file mode 100644 index 000000000000..808b79a86b4b --- /dev/null +++ b/integration-tests/configs/http-module/module.river @@ -0,0 +1,21 @@ +declare "myModule" { + argument "scrape_endpoint" {} + + argument "forward_to" {} + + argument "scrape_interval" { + optional = true + default = "1s" + } + + prometheus.scrape "scrape_prom_metrics_module_file" { + targets = [ + {"__address__" = argument.scrape_endpoint.value}, + ] + forward_to = argument.forward_to.value + scrape_classic_histograms = true + enable_protobuf_negotiation = true + scrape_interval = argument.scrape_interval.value + scrape_timeout = "500ms" + } +} \ No newline at end of file diff --git a/integration-tests/docker-compose.yaml b/integration-tests/docker-compose.yaml index a94a05db21d9..d5503de64b62 100644 --- a/integration-tests/docker-compose.yaml +++ b/integration-tests/docker-compose.yaml @@ -29,4 +29,11 @@ services: dockerfile: ./integration-tests/configs/prom-gen/Dockerfile context: .. ports: - - "9001:9001" \ No newline at end of file + - "9001:9001" + + http-module: + image: nginx:alpine + ports: + - "8090:80" + volumes: + - ./configs/http-module/module.river:/usr/share/nginx/html/module.river:ro \ No newline at end of file diff --git a/integration-tests/tests/module-file/config.river b/integration-tests/tests/module-file/config.river new file mode 100644 index 000000000000..a8e776d5cd24 --- /dev/null +++ b/integration-tests/tests/module-file/config.river @@ -0,0 +1,38 @@ +import.file "import_loki" { + filename = "loki.river" +} + +import_loki.loki "loki" {} + +import.file "import_prom_scrape" { + filename = "prom-scrape.river" +} + +declare "target" { + export "output" { + value = "localhost:9001" + } +} + +target "promTarget" {} + +import_prom_scrape.scrape "promScraper" { + scrape_endpoint = target.promTarget.output + forward_to = [prometheus.remote_write.module_file.receiver] +} + +prometheus.remote_write "module_file" { + endpoint { + url = "http://localhost:9009/api/v1/push" + send_native_histograms = true + metadata_config { + send_interval = "1s" + } + queue_config { + max_samples_per_send = 100 + } + } + external_labels = { + test_name = "module_file", + } +} \ No newline at end of file diff --git a/integration-tests/tests/module-file/logfile.river b/integration-tests/tests/module-file/logfile.river new file mode 100644 index 000000000000..a9b2b9762b3a --- /dev/null +++ b/integration-tests/tests/module-file/logfile.river @@ -0,0 +1,5 @@ +declare "getLogFile" { + export "output" { + value = [{__path__ = "logs.txt"}] + } +} \ No newline at end of file diff --git a/integration-tests/tests/module-file/logs.txt b/integration-tests/tests/module-file/logs.txt new file mode 100644 index 000000000000..ed6c24c81170 --- /dev/null +++ b/integration-tests/tests/module-file/logs.txt @@ -0,0 +1,13 @@ +[2023-10-02 14:25:43] INFO: Starting the web application... +[2023-10-02 14:25:45] DEBUG: Database connection established. +[2023-10-02 14:26:01] INFO: User 'john_doe' logged in. +[2023-10-02 14:26:05] WARNING: User 'john_doe' attempted to access restricted area. +[2023-10-02 14:26:10] ERROR: Failed to retrieve data for item ID: 1234. +[2023-10-02 14:26:15] INFO: User 'john_doe' logged out. +[2023-10-02 14:27:00] INFO: User 'admin' logged in. +[2023-10-02 14:27:05] INFO: Data backup started. +[2023-10-02 14:30:00] INFO: Data backup completed successfully. +[2023-10-02 14:31:23] ERROR: Database connection lost. Retrying in 5 seconds... +[2023-10-02 14:31:28] INFO: Database reconnected. +[2023-10-02 14:32:00] INFO: User 'admin' logged out. +[2023-10-02 14:32:05] INFO: Shutting down the web application... diff --git a/integration-tests/tests/module-file/loki.river b/integration-tests/tests/module-file/loki.river new file mode 100644 index 000000000000..24f5af09e874 --- /dev/null +++ b/integration-tests/tests/module-file/loki.river @@ -0,0 +1,17 @@ +import.file "logTarget" { + filename = "logfile.river" +} + +import.file "write" { + filename = "loki_write.river" +} + +declare "loki" { + logTarget.getLogFile "logFile" {} + loki.source.file "test" { + targets = logTarget.getLogFile.logFile.output + forward_to = [write.loki_write.default.receiver] + } + write.loki_write "default" {} +} + diff --git a/integration-tests/tests/module-file/loki_write.river b/integration-tests/tests/module-file/loki_write.river new file mode 100644 index 000000000000..03c9ed44b66b --- /dev/null +++ b/integration-tests/tests/module-file/loki_write.river @@ -0,0 +1,13 @@ +declare "loki_write" { + loki.write "test" { + endpoint { + url = "http://localhost:3100/loki/api/v1/push" + } + external_labels = { + test_name = "module_file", + } + } + export "receiver" { + value = loki.write.test.receiver + } +} \ No newline at end of file diff --git a/integration-tests/tests/module-file/module_file_test.go b/integration-tests/tests/module-file/module_file_test.go new file mode 100644 index 000000000000..bb7db88feb44 --- /dev/null +++ b/integration-tests/tests/module-file/module_file_test.go @@ -0,0 +1,95 @@ +package main + +import ( + "fmt" + "strconv" + "testing" + + "github.com/grafana/agent/integration-tests/common" + "github.com/stretchr/testify/assert" +) + +const promURL = "http://localhost:9009/prometheus/api/v1/query?query=" +const lokiUrl = "http://localhost:3100/loki/api/v1/query?query={test_name=%22module_file%22}" + +func metricQuery(metricName string) string { + return fmt.Sprintf("%s%s{test_name='module_file'}", promURL, metricName) +} + +func TestScrapePromMetricsModuleFile(t *testing.T) { + metrics := []string{ + // TODO: better differentiate these metric types? + "golang_counter", + "golang_gauge", + "golang_histogram_bucket", + "golang_summary", + "golang_native_histogram", + } + + for _, metric := range metrics { + metric := metric + t.Run(metric, func(t *testing.T) { + t.Parallel() + if metric == "golang_native_histogram" { + assertHistogramData(t, metricQuery(metric), metric) + } else { + assertMetricData(t, metricQuery(metric), metric) + } + }) + } +} + +func assertHistogramData(t *testing.T, query string, expectedMetric string) { + var metricResponse common.MetricResponse + assert.EventuallyWithT(t, func(c *assert.CollectT) { + err := common.FetchDataFromURL(query, &metricResponse) + assert.NoError(c, err) + if assert.NotEmpty(c, metricResponse.Data.Result) { + assert.Equal(c, metricResponse.Data.Result[0].Metric.Name, expectedMetric) + assert.Equal(c, metricResponse.Data.Result[0].Metric.TestName, "module_file") + if assert.NotNil(c, metricResponse.Data.Result[0].Histogram) { + histogram := metricResponse.Data.Result[0].Histogram + if assert.NotEmpty(c, histogram.Data.Count) { + count, _ := strconv.Atoi(histogram.Data.Count) + assert.Greater(c, count, 10, "Count should be at some point greater than 10.") + } + if assert.NotEmpty(c, histogram.Data.Sum) { + sum, _ := strconv.ParseFloat(histogram.Data.Sum, 64) + assert.Greater(c, sum, 10., "Sum should be at some point greater than 10.") + } + assert.NotEmpty(c, histogram.Data.Buckets) + assert.Nil(c, metricResponse.Data.Result[0].Value) + } + } + }, common.DefaultTimeout, common.DefaultRetryInterval, "Histogram data did not satisfy the conditions within the time limit") +} + +func TestReadLogFile(t *testing.T) { + var logResponse common.LogResponse + assert.EventuallyWithT(t, func(c *assert.CollectT) { + err := common.FetchDataFromURL(lokiUrl, &logResponse) + assert.NoError(c, err) + if assert.NotEmpty(c, logResponse.Data.Result) { + assert.Equal(c, logResponse.Data.Result[0].Stream["filename"], "logs.txt") + logs := make([]string, len(logResponse.Data.Result[0].Values)) + for i, valuePair := range logResponse.Data.Result[0].Values { + logs[i] = valuePair[1] + } + assert.Contains(c, logs, "[2023-10-02 14:25:43] INFO: Starting the web application...") + } + }, common.DefaultTimeout, common.DefaultRetryInterval, "Data did not satisfy the conditions within the time limit") +} + +func assertMetricData(t *testing.T, query, expectedMetric string) { + var metricResponse common.MetricResponse + assert.EventuallyWithT(t, func(c *assert.CollectT) { + err := common.FetchDataFromURL(query, &metricResponse) + assert.NoError(c, err) + if assert.NotEmpty(c, metricResponse.Data.Result) { + assert.Equal(c, metricResponse.Data.Result[0].Metric.Name, expectedMetric) + assert.Equal(c, metricResponse.Data.Result[0].Metric.TestName, "module_file") + assert.NotEmpty(c, metricResponse.Data.Result[0].Value.Value) + assert.Nil(c, metricResponse.Data.Result[0].Histogram) + } + }, common.DefaultTimeout, common.DefaultRetryInterval, "Data did not satisfy the conditions within the time limit") +} diff --git a/integration-tests/tests/module-file/prom-scrape.river b/integration-tests/tests/module-file/prom-scrape.river new file mode 100644 index 000000000000..f6e0b31aadcd --- /dev/null +++ b/integration-tests/tests/module-file/prom-scrape.river @@ -0,0 +1,21 @@ +declare "scrape" { + argument "scrape_endpoint" {} + + argument "forward_to" {} + + argument "scrape_interval" { + optional = true + default = "1s" + } + + prometheus.scrape "module_file" { + targets = [ + {"__address__" = argument.scrape_endpoint.value}, + ] + forward_to = argument.forward_to.value + scrape_classic_histograms = true + enable_protobuf_negotiation = true + scrape_interval = argument.scrape_interval.value + scrape_timeout = "500ms" + } +} diff --git a/integration-tests/tests/scrape-prom-metrics-module-git/config.river b/integration-tests/tests/scrape-prom-metrics-module-git/config.river new file mode 100644 index 000000000000..68dffaef6ffd --- /dev/null +++ b/integration-tests/tests/scrape-prom-metrics-module-git/config.river @@ -0,0 +1,27 @@ +import.git "scrape_module" { + // TODO: change this to use either a docker container hosting a git repo just for the integration tests (not easy) + // or put the module.river file in the agent-modules repo (easy) + repository = "https://github.com/wildum/module.git" + path = "module.river" +} + +scrape_module.myModule "scrape_prom_metrics_module_git" { + scrape_endpoint = "localhost:9001" + forward_to = [prometheus.remote_write.scrape_prom_metrics_module_git.receiver] +} + +prometheus.remote_write "scrape_prom_metrics_module_git" { + endpoint { + url = "http://localhost:9009/api/v1/push" + send_native_histograms = true + metadata_config { + send_interval = "1s" + } + queue_config { + max_samples_per_send = 100 + } + } + external_labels = { + test_name = "scrape_prom_metrics_module_git", + } +} diff --git a/integration-tests/tests/scrape-prom-metrics-module-git/scrape_prom_metrics_module_git_test.go b/integration-tests/tests/scrape-prom-metrics-module-git/scrape_prom_metrics_module_git_test.go new file mode 100644 index 000000000000..1c095f2dd6bf --- /dev/null +++ b/integration-tests/tests/scrape-prom-metrics-module-git/scrape_prom_metrics_module_git_test.go @@ -0,0 +1,78 @@ +package main + +import ( + "fmt" + "strconv" + "testing" + + "github.com/grafana/agent/integration-tests/common" + "github.com/stretchr/testify/assert" +) + +const promURL = "http://localhost:9009/prometheus/api/v1/query?query=" + +func metricQuery(metricName string) string { + return fmt.Sprintf("%s%s{test_name='scrape_prom_metrics_module_git'}", promURL, metricName) +} + +func TestScrapePromMetricsModuleGit(t *testing.T) { + metrics := []string{ + // TODO: better differentiate these metric types? + "golang_counter", + "golang_gauge", + "golang_histogram_bucket", + "golang_summary", + "golang_native_histogram", + } + + for _, metric := range metrics { + metric := metric + t.Run(metric, func(t *testing.T) { + t.Parallel() + if metric == "golang_native_histogram" { + assertHistogramData(t, metricQuery(metric), metric) + } else { + assertMetricData(t, metricQuery(metric), metric) + } + }) + } +} + +func assertHistogramData(t *testing.T, query string, expectedMetric string) { + var metricResponse common.MetricResponse + assert.EventuallyWithT(t, func(c *assert.CollectT) { + err := common.FetchDataFromURL(query, &metricResponse) + assert.NoError(c, err) + if assert.NotEmpty(c, metricResponse.Data.Result) { + assert.Equal(c, metricResponse.Data.Result[0].Metric.Name, expectedMetric) + assert.Equal(c, metricResponse.Data.Result[0].Metric.TestName, "scrape_prom_metrics_module_git") + if assert.NotNil(c, metricResponse.Data.Result[0].Histogram) { + histogram := metricResponse.Data.Result[0].Histogram + if assert.NotEmpty(c, histogram.Data.Count) { + count, _ := strconv.Atoi(histogram.Data.Count) + assert.Greater(c, count, 10, "Count should be at some point greater than 10.") + } + if assert.NotEmpty(c, histogram.Data.Sum) { + sum, _ := strconv.ParseFloat(histogram.Data.Sum, 64) + assert.Greater(c, sum, 10., "Sum should be at some point greater than 10.") + } + assert.NotEmpty(c, histogram.Data.Buckets) + assert.Nil(c, metricResponse.Data.Result[0].Value) + } + } + }, common.DefaultTimeout, common.DefaultRetryInterval, "Histogram data did not satisfy the conditions within the time limit") +} + +func assertMetricData(t *testing.T, query, expectedMetric string) { + var metricResponse common.MetricResponse + assert.EventuallyWithT(t, func(c *assert.CollectT) { + err := common.FetchDataFromURL(query, &metricResponse) + assert.NoError(c, err) + if assert.NotEmpty(c, metricResponse.Data.Result) { + assert.Equal(c, metricResponse.Data.Result[0].Metric.Name, expectedMetric) + assert.Equal(c, metricResponse.Data.Result[0].Metric.TestName, "scrape_prom_metrics_module_git") + assert.NotEmpty(c, metricResponse.Data.Result[0].Value.Value) + assert.Nil(c, metricResponse.Data.Result[0].Histogram) + } + }, common.DefaultTimeout, common.DefaultRetryInterval, "Data did not satisfy the conditions within the time limit") +} diff --git a/integration-tests/tests/scrape-prom-metrics-module-http/config.river b/integration-tests/tests/scrape-prom-metrics-module-http/config.river new file mode 100644 index 000000000000..dcb93d4d86a3 --- /dev/null +++ b/integration-tests/tests/scrape-prom-metrics-module-http/config.river @@ -0,0 +1,24 @@ +import.http "scrape_module" { + url = coalesce(env("MODULE_HTTP_ENDPOINT"), "http://localhost:8090/module.river") +} + +scrape_module.myModule "scrape_prom_metrics_module_http" { + scrape_endpoint = "localhost:9001" + forward_to = [prometheus.remote_write.scrape_prom_metrics_module_http.receiver] +} + +prometheus.remote_write "scrape_prom_metrics_module_http" { + endpoint { + url = "http://localhost:9009/api/v1/push" + send_native_histograms = true + metadata_config { + send_interval = "1s" + } + queue_config { + max_samples_per_send = 100 + } + } + external_labels = { + test_name = "scrape_prom_metrics_module_http", + } +} diff --git a/integration-tests/tests/scrape-prom-metrics-module-http/scrape_prom_metrics_module_http_test.go b/integration-tests/tests/scrape-prom-metrics-module-http/scrape_prom_metrics_module_http_test.go new file mode 100644 index 000000000000..b847de78d444 --- /dev/null +++ b/integration-tests/tests/scrape-prom-metrics-module-http/scrape_prom_metrics_module_http_test.go @@ -0,0 +1,78 @@ +package main + +import ( + "fmt" + "strconv" + "testing" + + "github.com/grafana/agent/integration-tests/common" + "github.com/stretchr/testify/assert" +) + +const promURL = "http://localhost:9009/prometheus/api/v1/query?query=" + +func metricQuery(metricName string) string { + return fmt.Sprintf("%s%s{test_name='scrape_prom_metrics_module_http'}", promURL, metricName) +} + +func TestScrapePromMetricsModuleHTTP(t *testing.T) { + metrics := []string{ + // TODO: better differentiate these metric types? + "golang_counter", + "golang_gauge", + "golang_histogram_bucket", + "golang_summary", + "golang_native_histogram", + } + + for _, metric := range metrics { + metric := metric + t.Run(metric, func(t *testing.T) { + t.Parallel() + if metric == "golang_native_histogram" { + assertHistogramData(t, metricQuery(metric), metric) + } else { + assertMetricData(t, metricQuery(metric), metric) + } + }) + } +} + +func assertHistogramData(t *testing.T, query string, expectedMetric string) { + var metricResponse common.MetricResponse + assert.EventuallyWithT(t, func(c *assert.CollectT) { + err := common.FetchDataFromURL(query, &metricResponse) + assert.NoError(c, err) + if assert.NotEmpty(c, metricResponse.Data.Result) { + assert.Equal(c, metricResponse.Data.Result[0].Metric.Name, expectedMetric) + assert.Equal(c, metricResponse.Data.Result[0].Metric.TestName, "scrape_prom_metrics_module_http") + if assert.NotNil(c, metricResponse.Data.Result[0].Histogram) { + histogram := metricResponse.Data.Result[0].Histogram + if assert.NotEmpty(c, histogram.Data.Count) { + count, _ := strconv.Atoi(histogram.Data.Count) + assert.Greater(c, count, 10, "Count should be at some point greater than 10.") + } + if assert.NotEmpty(c, histogram.Data.Sum) { + sum, _ := strconv.ParseFloat(histogram.Data.Sum, 64) + assert.Greater(c, sum, 10., "Sum should be at some point greater than 10.") + } + assert.NotEmpty(c, histogram.Data.Buckets) + assert.Nil(c, metricResponse.Data.Result[0].Value) + } + } + }, common.DefaultTimeout, common.DefaultRetryInterval, "Histogram data did not satisfy the conditions within the time limit") +} + +func assertMetricData(t *testing.T, query, expectedMetric string) { + var metricResponse common.MetricResponse + assert.EventuallyWithT(t, func(c *assert.CollectT) { + err := common.FetchDataFromURL(query, &metricResponse) + assert.NoError(c, err) + if assert.NotEmpty(c, metricResponse.Data.Result) { + assert.Equal(c, metricResponse.Data.Result[0].Metric.Name, expectedMetric) + assert.Equal(c, metricResponse.Data.Result[0].Metric.TestName, "scrape_prom_metrics_module_http") + assert.NotEmpty(c, metricResponse.Data.Result[0].Value.Value) + assert.Nil(c, metricResponse.Data.Result[0].Histogram) + } + }, common.DefaultTimeout, common.DefaultRetryInterval, "Data did not satisfy the conditions within the time limit") +} diff --git a/pkg/flow/componenttest/testfailmodule.go b/pkg/flow/componenttest/testfailmodule.go index 011659f95564..3481965ef9f5 100644 --- a/pkg/flow/componenttest/testfailmodule.go +++ b/pkg/flow/componenttest/testfailmodule.go @@ -15,14 +15,14 @@ func init() { Exports: mod.Exports{}, Build: func(opts component.Options, args component.Arguments) (component.Component, error) { - m, err := mod.NewModuleComponent(opts) + m, err := mod.NewModuleComponentDeprecated(opts) if err != nil { return nil, err } if args.(TestFailArguments).Fail { return nil, fmt.Errorf("module told to fail") } - err = m.LoadFlowSource(nil, args.(TestFailArguments).Content) + err = m.LoadFlowSource(nil, args.(TestFailArguments).Content, nil) if err != nil { return nil, err } @@ -58,7 +58,7 @@ func (t *TestFailModule) Run(ctx context.Context) error { func (t *TestFailModule) UpdateContent(content string) error { t.content = content - err := t.mc.LoadFlowSource(nil, t.content) + err := t.mc.LoadFlowSource(nil, t.content, nil) return err } diff --git a/pkg/flow/flow.go b/pkg/flow/flow.go index 76d62bdb9cb2..8d0d776fcd6f 100644 --- a/pkg/flow/flow.go +++ b/pkg/flow/flow.go @@ -185,7 +185,7 @@ func newController(o controllerOptions) *Flow { Logger: log, TraceProvider: tracer, DataPath: o.DataPath, - OnComponentUpdate: func(cn *controller.ComponentNode) { + OnComponentUpdate: func(cn controller.NodeWithDependants) { // Changed components should be queued for reevaluation. f.updateQueue.Enqueue(cn) }, @@ -245,15 +245,25 @@ func (f *Flow) Run(ctx context.Context) { level.Info(f.log).Log("msg", "scheduling loaded components and services") var ( - components = f.loader.Components() - services = f.loader.Services() + components = f.loader.Components() + services = f.loader.Services() + imports = f.loader.Imports() + declareComponents = f.loader.DeclareComponent() - runnables = make([]controller.RunnableNode, 0, len(components)+len(services)) + runnables = make([]controller.RunnableNode, 0, len(components)+len(services)+len(imports)+len(declareComponents)) ) for _, c := range components { runnables = append(runnables, c) } + for _, i := range imports { + runnables = append(runnables, i) + } + + for _, d := range declareComponents { + runnables = append(runnables, d) + } + // Only the root controller should run services, since modules share the // same service instance as the root. if !f.opts.IsModule { @@ -276,11 +286,11 @@ func (f *Flow) Run(ctx context.Context) { // // The controller will only start running components after Load is called once // without any configuration errors. -func (f *Flow) LoadSource(source *Source, args map[string]any) error { +func (f *Flow) LoadSource(source *Source, args map[string]any, parentModuleDefinitions map[string]string) error { f.loadMut.Lock() defer f.loadMut.Unlock() - diags := f.loader.Apply(args, source.components, source.configBlocks) + diags := f.loader.Apply(args, source.components, source.configBlocks, source.declares, parentModuleDefinitions) if !f.loadedOnce.Load() && diags.HasErrors() { // The first call to Load should not run any components if there were // errors in the configuration file. diff --git a/pkg/flow/flow_components.go b/pkg/flow/flow_components.go index 0899971339b7..e4f4c7734b0a 100644 --- a/pkg/flow/flow_components.go +++ b/pkg/flow/flow_components.go @@ -29,9 +29,9 @@ func (f *Flow) GetComponent(id component.ID, opts component.InfoOptions) (*compo return nil, component.ErrComponentNotFound } - cn, ok := node.(*controller.ComponentNode) + cn, ok := node.(controller.ComponentNode) if !ok { - return nil, fmt.Errorf("%q is not a component", id) + return nil, fmt.Errorf("%q is not a ComponentNode", id) } return f.getComponentDetail(cn, graph, opts), nil @@ -52,18 +52,30 @@ func (f *Flow) ListComponents(moduleID string, opts component.InfoOptions) ([]*c } var ( - components = f.loader.Components() - graph = f.loader.OriginalGraph() + components = f.loader.Components() + imports = f.loader.Imports() + declareComponents = f.loader.DeclareComponent() + graph = f.loader.OriginalGraph() ) - detail := make([]*component.Info, len(components)) - for i, component := range components { - detail[i] = f.getComponentDetail(component, graph, opts) + detail := make([]*component.Info, len(components)+len(imports)+len(declareComponents)) + idx := 0 + for _, component := range components { + detail[idx] = f.getComponentDetail(component, graph, opts) + idx++ + } + for _, importNode := range imports { + detail[idx] = f.getComponentDetail(importNode, graph, opts) + idx++ + } + for _, declareComponent := range declareComponents { + detail[idx] = f.getComponentDetail(declareComponent, graph, opts) + idx++ } return detail, nil } -func (f *Flow) getComponentDetail(cn *controller.ComponentNode, graph *dag.Graph, opts component.InfoOptions) *component.Info { +func (f *Flow) getComponentDetail(cn controller.ComponentNode, graph *dag.Graph, opts component.InfoOptions) *component.Info { var references, referencedBy []string // Skip over any edge which isn't between two component nodes. This is a @@ -75,12 +87,12 @@ func (f *Flow) getComponentDetail(cn *controller.ComponentNode, graph *dag.Graph // // TODO(rfratto): add support for config block nodes in the API and UI. for _, dep := range graph.Dependencies(cn) { - if _, ok := dep.(*controller.ComponentNode); ok { + if _, ok := dep.(controller.ComponentNode); ok { references = append(references, dep.NodeID()) } } for _, dep := range graph.Dependants(cn) { - if _, ok := dep.(*controller.ComponentNode); ok { + if _, ok := dep.(controller.ComponentNode); ok { referencedBy = append(referencedBy, dep.NodeID()) } } @@ -119,8 +131,8 @@ func (f *Flow) getComponentDetail(cn *controller.ComponentNode, graph *dag.Graph References: references, ReferencedBy: referencedBy, - Registration: cn.Registration(), - Health: health, + BlockName: cn.BlockName(), + Health: health, Arguments: arguments, Exports: exports, diff --git a/pkg/flow/flow_services_test.go b/pkg/flow/flow_services_test.go index a4bf2b4cb848..077a5c2cd4e7 100644 --- a/pkg/flow/flow_services_test.go +++ b/pkg/flow/flow_services_test.go @@ -37,7 +37,7 @@ func TestServices(t *testing.T) { opts.Services = append(opts.Services, svc) ctrl := New(opts) - require.NoError(t, ctrl.LoadSource(makeEmptyFile(t), nil)) + require.NoError(t, ctrl.LoadSource(makeEmptyFile(t), nil, nil)) // Start the controller. This should cause our service to run. go ctrl.Run(ctx) @@ -88,7 +88,7 @@ func TestServices_Configurable(t *testing.T) { ctrl := New(opts) - require.NoError(t, ctrl.LoadSource(f, nil)) + require.NoError(t, ctrl.LoadSource(f, nil, nil)) // Start the controller. This should cause our service to run. go ctrl.Run(ctx) @@ -134,7 +134,7 @@ func TestServices_Configurable_Optional(t *testing.T) { ctrl := New(opts) - require.NoError(t, ctrl.LoadSource(makeEmptyFile(t), nil)) + require.NoError(t, ctrl.LoadSource(makeEmptyFile(t), nil, nil)) // Start the controller. This should cause our service to run. go ctrl.Run(ctx) @@ -168,7 +168,7 @@ func TestFlow_GetServiceConsumers(t *testing.T) { ctrl := New(opts) defer cleanUpController(ctrl) - require.NoError(t, ctrl.LoadSource(makeEmptyFile(t), nil)) + require.NoError(t, ctrl.LoadSource(makeEmptyFile(t), nil, nil)) expectConsumers := []service.Consumer{{ Type: service.ConsumerTypeService, @@ -246,7 +246,7 @@ func TestComponents_Using_Services(t *testing.T) { ComponentRegistry: registry, ModuleRegistry: newModuleRegistry(), }) - require.NoError(t, ctrl.LoadSource(f, nil)) + require.NoError(t, ctrl.LoadSource(f, nil, nil)) go ctrl.Run(ctx) require.NoError(t, componentBuilt.Wait(5*time.Second), "Component should have been built") @@ -276,7 +276,7 @@ func TestComponents_Using_Services_In_Modules(t *testing.T) { mod, err := opts.ModuleController.NewModule("", nil) require.NoError(t, err, "Failed to create module") - err = mod.LoadConfig([]byte(`service_consumer "example" {}`), nil) + err = mod.LoadConfig([]byte(`service_consumer "example" {}`), nil, nil) require.NoError(t, err, "Failed to load module config") return &testcomponents.Fake{ @@ -321,7 +321,7 @@ func TestComponents_Using_Services_In_Modules(t *testing.T) { ComponentRegistry: registry, ModuleRegistry: newModuleRegistry(), }) - require.NoError(t, ctrl.LoadSource(f, nil)) + require.NoError(t, ctrl.LoadSource(f, nil, nil)) go ctrl.Run(ctx) require.NoError(t, componentBuilt.Wait(5*time.Second), "Component should have been built") diff --git a/pkg/flow/flow_test.go b/pkg/flow/flow_test.go index 590f97a424f1..17a2b60d41d9 100644 --- a/pkg/flow/flow_test.go +++ b/pkg/flow/flow_test.go @@ -42,7 +42,7 @@ func TestController_LoadSource_Evaluation(t *testing.T) { require.NoError(t, err) require.NotNil(t, f) - err = ctrl.LoadSource(f, nil) + err = ctrl.LoadSource(f, nil, nil) require.NoError(t, err) require.Len(t, ctrl.loader.Components(), 4) @@ -59,7 +59,7 @@ func getFields(t *testing.T, g *dag.Graph, nodeID string) (component.Arguments, n := g.GetByID(nodeID) require.NotNil(t, n, "couldn't find node %q in graph", nodeID) - uc := n.(*controller.ComponentNode) + uc := n.(*controller.NativeComponentNode) return uc.Arguments(), uc.Exports() } diff --git a/pkg/flow/flow_updates_test.go b/pkg/flow/flow_updates_test.go index c2349928f06a..cf77237b7c76 100644 --- a/pkg/flow/flow_updates_test.go +++ b/pkg/flow/flow_updates_test.go @@ -42,7 +42,7 @@ func TestController_Updates(t *testing.T) { require.NoError(t, err) require.NotNil(t, f) - err = ctrl.LoadSource(f, nil) + err = ctrl.LoadSource(f, nil, nil) require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) @@ -122,7 +122,7 @@ func TestController_Updates_WithQueueFull(t *testing.T) { require.NoError(t, err) require.NotNil(t, f) - err = ctrl.LoadSource(f, nil) + err = ctrl.LoadSource(f, nil, nil) require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) @@ -195,7 +195,7 @@ func TestController_Updates_WithLag(t *testing.T) { require.NoError(t, err) require.NotNil(t, f) - err = ctrl.LoadSource(f, nil) + err = ctrl.LoadSource(f, nil, nil) require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) @@ -269,7 +269,7 @@ func TestController_Updates_WithOtherLaggingPipeline(t *testing.T) { require.NoError(t, err) require.NotNil(t, f) - err = ctrl.LoadSource(f, nil) + err = ctrl.LoadSource(f, nil, nil) require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) @@ -338,7 +338,7 @@ func TestController_Updates_WithLaggingComponent(t *testing.T) { require.NoError(t, err) require.NotNil(t, f) - err = ctrl.LoadSource(f, nil) + err = ctrl.LoadSource(f, nil, nil) require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) diff --git a/pkg/flow/internal/controller/component_node.go b/pkg/flow/internal/controller/component_node.go new file mode 100644 index 000000000000..269e313b99fa --- /dev/null +++ b/pkg/flow/internal/controller/component_node.go @@ -0,0 +1,34 @@ +package controller + +import ( + "github.com/grafana/agent/component" + "github.com/grafana/agent/pkg/flow/internal/dag" +) + +type ComponentNode interface { + dag.Node + + // CurrentHealth returns the current health of the node. + CurrentHealth() component.Health + + // DebugInfo returns debugging information from the managed component (if any). + DebugInfo() interface{} + + // Arguments returns the current arguments of the managed component. + Arguments() component.Arguments + + // Exports returns the current set of exports from the managed component. + Exports() component.Exports + + // Component returns the instance of the managed component. + Component() component.Component + + // ModuleIDs returns the current list of modules that this component is managing. + ModuleIDs() []string + + // Label returns the label for the block or "" if none was specified. + Label() string + + // BlockName returns the name of the block. + BlockName() string +} diff --git a/pkg/flow/internal/controller/declare.go b/pkg/flow/internal/controller/declare.go new file mode 100644 index 000000000000..e97554396c32 --- /dev/null +++ b/pkg/flow/internal/controller/declare.go @@ -0,0 +1,9 @@ +package controller + +import "github.com/grafana/river/ast" + +// Should this be defined somewhere else? +type Declare struct { + Block *ast.BlockStmt + Content string +} diff --git a/pkg/flow/internal/controller/loader.go b/pkg/flow/internal/controller/loader.go index 10a6f37965ab..2c4ddb098b34 100644 --- a/pkg/flow/internal/controller/loader.go +++ b/pkg/flow/internal/controller/loader.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "strings" "sync" "time" @@ -37,11 +38,17 @@ type Loader struct { // also prevents log spamming with errors. backoffConfig backoff.Config - mut sync.RWMutex - graph *dag.Graph - originalGraph *dag.Graph - componentNodes []*ComponentNode - serviceNodes []*ServiceNode + mut sync.RWMutex + graph *dag.Graph + originalGraph *dag.Graph + componentNodes []*NativeComponentNode + serviceNodes []*ServiceNode + declareComponentNodes []*DeclareComponentNode + importNodes map[string]*ImportConfigNode + declareNodes map[string]*DeclareNode + parentModuleDefinitions map[string]string + moduleReferences map[string][]ModuleReference + cache *valueCache blocks []*ast.BlockStmt // Most recently loaded blocks, used for writing cm *controllerMetrics @@ -75,13 +82,16 @@ func NewLoader(opts LoaderOptions) *Loader { } l := &Loader{ - log: log.With(globals.Logger, "controller_id", globals.ControllerID), - tracer: tracing.WrapTracerForLoader(globals.TraceProvider, globals.ControllerID), - globals: globals, - services: services, - host: host, - componentReg: reg, - workerPool: opts.WorkerPool, + log: log.With(globals.Logger, "controller_id", globals.ControllerID), + tracer: tracing.WrapTracerForLoader(globals.TraceProvider, globals.ControllerID), + globals: globals, + services: services, + host: host, + componentReg: reg, + workerPool: opts.WorkerPool, + importNodes: map[string]*ImportConfigNode{}, + declareNodes: map[string]*DeclareNode{}, + moduleReferences: map[string][]ModuleReference{}, // This is a reasonable default which should work for most cases. If a component is completely stuck, we would // retry and log an error every 10 seconds, at most. @@ -117,7 +127,7 @@ func NewLoader(opts LoaderOptions) *Loader { // The provided parentContext can be used to provide global variables and // functions to components. A child context will be constructed from the parent // to expose values of other components. -func (l *Loader) Apply(args map[string]any, componentBlocks []*ast.BlockStmt, configBlocks []*ast.BlockStmt) diag.Diagnostics { +func (l *Loader) Apply(args map[string]any, componentBlocks []*ast.BlockStmt, configBlocks []*ast.BlockStmt, declares []Declare, parentModuleDefinitions map[string]string) diag.Diagnostics { start := time.Now() l.mut.Lock() defer l.mut.Unlock() @@ -129,15 +139,17 @@ func (l *Loader) Apply(args map[string]any, componentBlocks []*ast.BlockStmt, co } l.cache.SyncModuleArgs(args) - newGraph, diags := l.loadNewGraph(args, componentBlocks, configBlocks) + l.parentModuleDefinitions = parentModuleDefinitions + newGraph, diags := l.loadNewGraph(args, componentBlocks, configBlocks, declares) if diags.HasErrors() { return diags } var ( - components = make([]*ComponentNode, 0, len(componentBlocks)) - componentIDs = make([]ComponentID, 0, len(componentBlocks)) - services = make([]*ServiceNode, 0, len(l.services)) + components = make([]*NativeComponentNode, 0, len(componentBlocks)) + componentIDs = make([]ComponentID, 0, len(componentBlocks)) + services = make([]*ServiceNode, 0, len(l.services)) + declareComponentNodes = make([]*DeclareComponentNode, 0, len(l.declareComponentNodes)) ) tracer := l.tracer.Tracer("") @@ -168,7 +180,7 @@ func (l *Loader) Apply(args map[string]any, componentBlocks []*ast.BlockStmt, co var err error switch n := n.(type) { - case *ComponentNode: + case *NativeComponentNode: components = append(components, n) componentIDs = append(componentIDs, n.ID()) @@ -185,7 +197,6 @@ func (l *Loader) Apply(args map[string]any, componentBlocks []*ast.BlockStmt, co }) } } - case *ServiceNode: services = append(services, n) @@ -202,7 +213,22 @@ func (l *Loader) Apply(args map[string]any, componentBlocks []*ast.BlockStmt, co }) } } - + case *DeclareComponentNode: + declareComponentNodes = append(declareComponentNodes, n) + componentIDs = append(componentIDs, n.ID()) + if err = l.evaluate(logger, n); err != nil { + var evalDiags diag.Diagnostics + if errors.As(err, &evalDiags) { + diags = append(diags, evalDiags...) + } else { + diags.Add(diag.Diagnostic{ + Severity: diag.SeverityLevelError, + Message: fmt.Sprintf("Failed to build declared component: %s", err), + StartPos: ast.StartPos(n.Block()).Position(), + EndPos: ast.EndPos(n.Block()).Position(), + }) + } + } case BlockNode: if err = l.evaluate(logger, n); err != nil { diags.Add(diag.Diagnostic{ @@ -227,6 +253,7 @@ func (l *Loader) Apply(args map[string]any, componentBlocks []*ast.BlockStmt, co return nil }) + l.declareComponentNodes = declareComponentNodes l.componentNodes = components l.serviceNodes = services l.graph = &newGraph @@ -252,9 +279,11 @@ func (l *Loader) Cleanup(stopWorkerPool bool) { } // loadNewGraph creates a new graph from the provided blocks and validates it. -func (l *Loader) loadNewGraph(args map[string]any, componentBlocks []*ast.BlockStmt, configBlocks []*ast.BlockStmt) (dag.Graph, diag.Diagnostics) { +func (l *Loader) loadNewGraph(args map[string]any, componentBlocks []*ast.BlockStmt, configBlocks []*ast.BlockStmt, declares []Declare) (dag.Graph, diag.Diagnostics) { var g dag.Graph + l.moduleReferences = make(map[string][]ModuleReference) + // Split component blocks into blocks for components and services. componentBlocks, serviceBlocks := l.splitComponentBlocks(componentBlocks) @@ -266,6 +295,10 @@ func (l *Loader) loadNewGraph(args map[string]any, componentBlocks []*ast.BlockS configBlockDiags := l.populateConfigBlockNodes(args, &g, configBlocks) diags = append(diags, configBlockDiags...) + // Fill our graph with declare nodes + declareDiags := l.populateDeclareNodes(&g, declares) + diags = append(diags, declareDiags...) + // Fill our graph with components. componentNodeDiags := l.populateComponentNodes(&g, componentBlocks) diags = append(diags, componentNodeDiags...) @@ -310,6 +343,24 @@ func (l *Loader) splitComponentBlocks(blocks []*ast.BlockStmt) (componentBlocks, return componentBlocks, serviceBlocks } +func (l *Loader) populateDeclareNodes(g *dag.Graph, declares []Declare) diag.Diagnostics { + var diags diag.Diagnostics + l.declareNodes = map[string]*DeclareNode{} + for _, declare := range declares { + node := NewDeclareNode(declare.Block, declare.Content) + if g.GetByID(node.NodeID()) != nil { + diags.Add(diag.Diagnostic{ + Severity: diag.SeverityLevelError, + Message: fmt.Sprintf("cannot add declare node %q; node with same ID already exists", node.NodeID()), + }) + continue + } + g.Add(node) + l.declareNodes[node.label] = node + } + return diags +} + // populateServiceNodes adds service nodes to the graph. func (l *Loader) populateServiceNodes(g *dag.Graph, serviceBlocks []*ast.BlockStmt) diag.Diagnostics { var diags diag.Diagnostics @@ -413,6 +464,8 @@ func (l *Loader) populateConfigBlockNodes(args map[string]any, g *dag.Graph, con g.Add(c) } + l.importNodes = nodeMap.importMap + return diags } @@ -423,7 +476,6 @@ func (l *Loader) populateComponentNodes(g *dag.Graph, componentBlocks []*ast.Blo blockMap = make(map[string]*ast.BlockStmt, len(componentBlocks)) ) for _, block := range componentBlocks { - var c *ComponentNode id := BlockComponentID(block).String() if orig, redefined := blockMap[id]; redefined { @@ -438,23 +490,18 @@ func (l *Loader) populateComponentNodes(g *dag.Graph, componentBlocks []*ast.Blo blockMap[id] = block // Check the graph from the previous call to Load to see we can copy an - // existing instance of ComponentNode. + // existing instance of NativeComponentNode and DeclareComponentNode. if exist := l.graph.GetByID(id); exist != nil { - c = exist.(*ComponentNode) - c.UpdateBlock(block) + switch v := exist.(type) { + case *NativeComponentNode: + v.UpdateBlock(block) + g.Add(v) + case *DeclareComponentNode: + v.UpdateBlock(block) + g.Add(v) + } } else { componentName := block.GetBlockName() - registration, exists := l.componentReg.Get(componentName) - if !exists { - diags.Add(diag.Diagnostic{ - Severity: diag.SeverityLevelError, - Message: fmt.Sprintf("Unrecognized component name %q", componentName), - StartPos: block.NamePos.Position(), - EndPos: block.NamePos.Add(len(componentName) - 1).Position(), - }) - continue - } - if block.Label == "" { diags.Add(diag.Diagnostic{ Severity: diag.SeverityLevelError, @@ -464,17 +511,57 @@ func (l *Loader) populateComponentNodes(g *dag.Graph, componentBlocks []*ast.Blo }) continue } - - // Create a new component - c = NewComponentNode(l.globals, registration, block) + firstPart := strings.Split(componentName, ".")[0] + if l.shouldAddDeclareComponentNode(firstPart, componentName) { + g.Add(NewDeclareComponentNode(l.globals, block, l.getModuleInfo)) + } else { + registration, exists := l.componentReg.Get(componentName) + if !exists { + diags.Add(diag.Diagnostic{ + Severity: diag.SeverityLevelError, + Message: fmt.Sprintf("Unrecognized component name %q", componentName), + StartPos: block.NamePos.Position(), + EndPos: block.NamePos.Add(len(componentName) - 1).Position(), + }) + continue + } + g.Add(NewComponentNode(l.globals, registration, block)) + } } - - g.Add(c) } return diags } +func (l *Loader) shouldAddDeclareComponentNode(firstPart, componentName string) bool { + _, declareExists := l.declareNodes[firstPart] + _, importExists := l.importNodes[firstPart] + _, moduleDepExists := l.parentModuleDefinitions[componentName] + + return declareExists || importExists || moduleDepExists +} + +func (l *Loader) wireModuleReferences(g *dag.Graph, dc *DeclareComponentNode, declareNode *DeclareNode) error { + var references []ModuleReference + if deps, ok := l.moduleReferences[declareNode.label]; ok { + references = deps + } else { + var err error + references, err = GetModuleReferences(declareNode.content, l.importNodes, l.declareNodes, l.parentModuleDefinitions) + if err != nil { + return err + } + l.moduleReferences[declareNode.label] = references + } + // Add edges between the DeclareComponentNode and all import nodes that it needs. + for _, ref := range references { + if ref.importNode != nil { + g.AddEdge(dag.Edge{From: dc, To: ref.importNode}) + } + } + return nil +} + // Wire up all the related nodes func (l *Loader) wireGraphEdges(g *dag.Graph) diag.Diagnostics { var diags diag.Diagnostics @@ -495,6 +582,20 @@ func (l *Loader) wireGraphEdges(g *dag.Graph) diag.Diagnostics { g.AddEdge(dag.Edge{From: n, To: dep}) } + case *DeclareNode: + // A DeclareNode has no edge, it only holds a static content. + continue + case *DeclareComponentNode: + err := l.wireDeclareComponentNode(g, n) + if err != nil { + diags.Add(diag.Diagnostic{ + Severity: diag.SeverityLevelError, + Message: fmt.Sprintf("Error while parsing the declare component %s: %v", n.label, err), + StartPos: n.block.NamePos.Position(), + EndPos: n.block.NamePos.Add(len(n.componentName) - 1).Position(), + }) + continue + } } // Finally, wire component references. @@ -508,6 +609,18 @@ func (l *Loader) wireGraphEdges(g *dag.Graph) diag.Diagnostics { return diags } +func (l *Loader) wireDeclareComponentNode(g *dag.Graph, dc *DeclareComponentNode) error { + if declareNode, exists := l.declareNodes[dc.declareLabel]; exists { + err := l.wireModuleReferences(g, dc, declareNode) + if err != nil { + return err + } + } else if importNode, exists := l.importNodes[dc.importLabel]; exists { + g.AddEdge(dag.Edge{From: dc, To: importNode}) + } + return nil +} + // Variables returns the Variables the Loader exposes for other Flow components // to reference. func (l *Loader) Variables() map[string]interface{} { @@ -515,12 +628,18 @@ func (l *Loader) Variables() map[string]interface{} { } // Components returns the current set of loaded components. -func (l *Loader) Components() []*ComponentNode { +func (l *Loader) Components() []*NativeComponentNode { l.mut.RLock() defer l.mut.RUnlock() return l.componentNodes } +func (l *Loader) DeclareComponent() []*DeclareComponentNode { + l.mut.RLock() + defer l.mut.RUnlock() + return l.declareComponentNodes +} + // Services returns the current set of service nodes. func (l *Loader) Services() []*ServiceNode { l.mut.RLock() @@ -528,6 +647,12 @@ func (l *Loader) Services() []*ServiceNode { return l.serviceNodes } +func (l *Loader) Imports() map[string]*ImportConfigNode { + l.mut.RLock() + defer l.mut.RUnlock() + return l.importNodes +} + // Graph returns a copy of the DAG managed by the Loader. func (l *Loader) Graph() *dag.Graph { l.mut.RLock() @@ -549,7 +674,7 @@ func (l *Loader) OriginalGraph() *dag.Graph { // the worker pool starts to evaluate them, resulting in smaller number of total evaluations when // node updates are frequent. If the worker pool's queue is full, EvaluateDependants will retry with a backoff until // it succeeds or until the ctx is cancelled. -func (l *Loader) EvaluateDependants(ctx context.Context, updatedNodes []*ComponentNode) { +func (l *Loader) EvaluateDependants(ctx context.Context, updatedNodes []NodeWithDependants) { if len(updatedNodes) == 0 { return } @@ -565,7 +690,7 @@ func (l *Loader) EvaluateDependants(ctx context.Context, updatedNodes []*Compone l.mut.RLock() defer l.mut.RUnlock() - dependenciesToParentsMap := make(map[dag.Node]*ComponentNode) + dependenciesToParentsMap := make(map[dag.Node]NodeWithDependants) for _, parent := range updatedNodes { // Make sure we're in-sync with the current exports of parent. l.cache.CacheExports(parent.ID(), parent.Exports()) @@ -624,9 +749,9 @@ func (l *Loader) EvaluateDependants(ctx context.Context, updatedNodes []*Compone // concurrentEvalFn returns a function that evaluates a node and updates the cache. This function can be submitted to // a worker pool for asynchronous evaluation. -func (l *Loader) concurrentEvalFn(n dag.Node, spanCtx context.Context, tracer trace.Tracer, parent *ComponentNode) { +func (l *Loader) concurrentEvalFn(n dag.Node, spanCtx context.Context, tracer trace.Tracer, parent NodeWithDependants) { start := time.Now() - l.cm.dependenciesWaitTime.Observe(time.Since(parent.lastUpdateTime.Load()).Seconds()) + l.cm.dependenciesWaitTime.Observe(time.Since(parent.LastUpdateTime()).Seconds()) _, span := tracer.Start(spanCtx, "EvaluateNode", trace.WithSpanKind(trace.SpanKindInternal)) span.SetAttributes(attribute.String("node_id", n.NodeID())) defer span.End() @@ -687,11 +812,14 @@ func (l *Loader) evaluate(logger log.Logger, bn BlockNode) error { // mut must be held when calling postEvaluate. func (l *Loader) postEvaluate(logger log.Logger, bn BlockNode, err error) error { switch c := bn.(type) { - case *ComponentNode: + case *NativeComponentNode: // Always update the cache both the arguments and exports, since both might // change when a component gets re-evaluated. We also want to cache the arguments and exports in case of an error l.cache.CacheArguments(c.ID(), c.Arguments()) l.cache.CacheExports(c.ID(), c.Exports()) + case *DeclareComponentNode: + l.cache.CacheArguments(c.ID(), c.Arguments()) + l.cache.CacheExports(c.ID(), c.Exports()) case *ArgumentConfigNode: if _, found := l.cache.moduleArguments[c.Label()]; !found { if c.Optional() { @@ -711,6 +839,13 @@ func (l *Loader) postEvaluate(logger log.Logger, bn BlockNode, err error) error return nil } +func (l *Loader) getModuleInfo(componentName string, importLabel string, declareLabel string) (ModuleInfo, error) { + if importLabel == "" { + return getLocalModuleInfo(l.declareNodes, l.moduleReferences, l.parentModuleDefinitions, componentName, declareLabel) + } + return getImportedModuleInfo(l.importNodes, l.parentModuleDefinitions, componentName, declareLabel, importLabel) +} + func multierrToDiags(errors error) diag.Diagnostics { var diags diag.Diagnostics for _, err := range errors.(*multierror.Error).Errors { diff --git a/pkg/flow/internal/controller/loader_test.go b/pkg/flow/internal/controller/loader_test.go index e93f757b1a2f..9351dda73657 100644 --- a/pkg/flow/internal/controller/loader_test.go +++ b/pkg/flow/internal/controller/loader_test.go @@ -73,7 +73,7 @@ func TestLoader(t *testing.T) { Logger: l, TraceProvider: noop.NewTracerProvider(), DataPath: t.TempDir(), - OnComponentUpdate: func(cn *controller.ComponentNode) { /* no-op */ }, + OnComponentUpdate: func(cn controller.NodeWithDependants) { /* no-op */ }, Registerer: prometheus.NewRegistry(), NewModuleController: func(id string) controller.ModuleController { return nil @@ -207,7 +207,7 @@ func TestScopeWithFailingComponent(t *testing.T) { Logger: l, TraceProvider: noop.NewTracerProvider(), DataPath: t.TempDir(), - OnComponentUpdate: func(cn *controller.ComponentNode) { /* no-op */ }, + OnComponentUpdate: func(cn controller.NodeWithDependants) { /* no-op */ }, Registerer: prometheus.NewRegistry(), NewModuleController: func(id string) controller.ModuleController { return fakeModuleController{} @@ -230,6 +230,7 @@ func applyFromContent(t *testing.T, l *controller.Loader, componentBytes []byte, diags diag.Diagnostics componentBlocks []*ast.BlockStmt configBlocks []*ast.BlockStmt = nil + declares []controller.Declare ) componentBlocks, diags = fileToBlock(t, componentBytes) @@ -244,7 +245,7 @@ func applyFromContent(t *testing.T, l *controller.Loader, componentBytes []byte, } } - applyDiags := l.Apply(nil, componentBlocks, configBlocks) + applyDiags := l.Apply(nil, componentBlocks, configBlocks, declares, nil) diags = append(diags, applyDiags...) return diags diff --git a/pkg/flow/internal/controller/module_info.go b/pkg/flow/internal/controller/module_info.go new file mode 100644 index 000000000000..7273a3cce685 --- /dev/null +++ b/pkg/flow/internal/controller/module_info.go @@ -0,0 +1,111 @@ +package controller + +import ( + "fmt" + "strings" +) + +type ModuleInfo struct { + content string + moduleDefinitions map[string]string +} + +func getLocalModuleInfo( + declareNodes map[string]*DeclareNode, + moduleReferences map[string][]ModuleReference, + parentModuleDefinitions map[string]string, + componentName string, + declareLabel string, +) (ModuleInfo, error) { + + var moduleInfo ModuleInfo + var content string + var err error + + if node, exists := declareNodes[declareLabel]; exists { + moduleInfo.moduleDefinitions, err = getLocalModuleDefinitions(componentName, moduleReferences, parentModuleDefinitions) + if err != nil { + return moduleInfo, err + } + + content, err = node.ModuleContent() + if err != nil { + return moduleInfo, err + } + } else if c, ok := parentModuleDefinitions[componentName]; ok { + content = c + moduleInfo.moduleDefinitions = parentModuleDefinitions + } else { + return moduleInfo, fmt.Errorf("could not find a definition for the declared module %s", componentName) + } + moduleInfo.content = content + return moduleInfo, nil +} + +func getLocalModuleDefinitions(componentName string, + localModuleReferences map[string][]ModuleReference, + parentModuleDefinitions map[string]string, +) (map[string]string, error) { + + moduleReferences := make(map[string]string) + for _, moduleDependency := range localModuleReferences[componentName] { + if moduleDependency.importNode != nil { + for importModulePath, importModuleContent := range moduleDependency.importNode.importedDeclares { + moduleReferences[moduleDependency.importNode.label+"."+importModulePath] = importModuleContent + } + } else if moduleDependency.declareNode != nil { + ref, err := moduleDependency.declareNode.ModuleContent() + if err != nil { + return moduleReferences, nil + } + moduleReferences[moduleDependency.declareLabel] = ref + } else { + // Nested declares have access to their parents module definitions. + if c, ok := parentModuleDefinitions[moduleDependency.componentName]; ok { + moduleReferences[moduleDependency.componentName] = c + } else { + return moduleReferences, fmt.Errorf("could not find the required module dependency %s for the module %s", moduleDependency.componentName, componentName) + } + } + } + return moduleReferences, nil +} + +func getImportedModuleInfo( + importNodes map[string]*ImportConfigNode, + parentModuleDefinitions map[string]string, + componentName string, + declareLabel string, + importLabel string, +) (ModuleInfo, error) { + + var moduleInfo ModuleInfo + var content string + var err error + if node, exists := importNodes[importLabel]; exists { + moduleInfo.moduleDefinitions = node.importedDeclares + content, err = node.ModuleContent(declareLabel) + if err != nil { + return moduleInfo, err + } + } else if c, ok := parentModuleDefinitions[componentName]; ok { + content = c + moduleInfo.moduleDefinitions = filterParentModuleDefinitions(importLabel, parentModuleDefinitions) + } else { + return moduleInfo, fmt.Errorf("could not find a definition for the imported module %s", componentName) + } + moduleInfo.content = content + return moduleInfo, nil +} + +// filterParentModuleDefinitions prevents modules from accessing other module definitions which are not in their scope. +func filterParentModuleDefinitions(importLabel string, parentModuleDefinitions map[string]string) map[string]string { + filteredParentModuleDefinitions := make(map[string]string) + for importPath, content := range parentModuleDefinitions { + // The scope is defined by the importLabel prefix in the importPath of the modules. + if strings.HasPrefix(importPath, importLabel) { + filteredParentModuleDefinitions[strings.TrimPrefix(importPath, importLabel+".")] = content + } + } + return filteredParentModuleDefinitions +} diff --git a/pkg/flow/internal/controller/module_references.go b/pkg/flow/internal/controller/module_references.go new file mode 100644 index 000000000000..2d975be9d169 --- /dev/null +++ b/pkg/flow/internal/controller/module_references.go @@ -0,0 +1,77 @@ +package controller + +import ( + "strings" + + "github.com/grafana/river/ast" + "github.com/grafana/river/parser" +) + +type ModuleReference struct { + componentName string + importLabel string + declareLabel string + importNode *ImportConfigNode + declareNode *DeclareNode +} + +// This function will parse the provided river content and collect references to known modules. +func GetModuleReferences( + content string, + importNodes map[string]*ImportConfigNode, + declareNodes map[string]*DeclareNode, + parentModuleDefinitions map[string]string, +) ([]ModuleReference, error) { + + uniqueReferences := make(map[string]ModuleReference) + err := getModuleReferences(content, importNodes, declareNodes, uniqueReferences, parentModuleDefinitions) + if err != nil { + return nil, err + } + + references := make([]ModuleReference, 0, len(uniqueReferences)) + for _, ref := range uniqueReferences { + references = append(references, ref) + } + + return references, nil +} + +func getModuleReferences( + content string, + importNodes map[string]*ImportConfigNode, + declareNodes map[string]*DeclareNode, + uniqueReferences map[string]ModuleReference, + parentModuleDefinitions map[string]string, +) error { + + node, err := parser.ParseFile("", []byte(content)) + if err != nil { + return err + } + + for _, stmt := range node.Body { + switch stmt := stmt.(type) { + case *ast.BlockStmt: + componentName := strings.Join(stmt.Name, ".") + switch componentName { + case "declare": + declareContent := content[stmt.LCurlyPos.Position().Offset+1 : stmt.RCurlyPos.Position().Offset-1] + err = getModuleReferences(declareContent, importNodes, declareNodes, uniqueReferences, parentModuleDefinitions) + if err != nil { + return err + } + default: + potentialImportLabel, potentialDeclareLabel := ExtractImportAndDeclareLabels(componentName) + if declareNode, ok := declareNodes[potentialDeclareLabel]; ok { + uniqueReferences[componentName] = ModuleReference{componentName: componentName, importLabel: "", declareLabel: potentialDeclareLabel, declareNode: declareNode} + } else if importNode, ok := importNodes[potentialImportLabel]; ok { + uniqueReferences[componentName] = ModuleReference{componentName: componentName, importLabel: potentialImportLabel, declareLabel: potentialDeclareLabel, importNode: importNode} + } else if _, ok := parentModuleDefinitions[componentName]; ok { + uniqueReferences[componentName] = ModuleReference{componentName: componentName, importLabel: potentialImportLabel, declareLabel: potentialDeclareLabel} + } + } + } + } + return nil +} diff --git a/pkg/flow/internal/controller/node_config.go b/pkg/flow/internal/controller/node_config.go index d583dc1e1061..7b84067cea4c 100644 --- a/pkg/flow/internal/controller/node_config.go +++ b/pkg/flow/internal/controller/node_config.go @@ -3,6 +3,7 @@ package controller import ( "fmt" + importsource "github.com/grafana/agent/pkg/flow/internal/import-source" "github.com/grafana/river/ast" "github.com/grafana/river/diag" ) @@ -26,6 +27,8 @@ func NewConfigNode(block *ast.BlockStmt, globals ComponentGlobals) (BlockNode, d return NewLoggingConfigNode(block, globals), nil case tracingBlockID: return NewTracingConfigNode(block, globals), nil + case importsource.BlockImportFile, importsource.BlockImportGit, importsource.BlockImportHTTP: + return NewImportConfigNode(block, globals, importsource.GetSourceType(block.GetBlockName())), nil default: var diags diag.Diagnostics diags.Add(diag.Diagnostic{ @@ -46,6 +49,7 @@ type ConfigNodeMap struct { tracing *TracingConfigNode argumentMap map[string]*ArgumentConfigNode exportMap map[string]*ExportConfigNode + importMap map[string]*ImportConfigNode } // NewConfigNodeMap will create an initial ConfigNodeMap. Append must be called @@ -56,6 +60,7 @@ func NewConfigNodeMap() *ConfigNodeMap { tracing: nil, argumentMap: map[string]*ArgumentConfigNode{}, exportMap: map[string]*ExportConfigNode{}, + importMap: map[string]*ImportConfigNode{}, } } @@ -73,6 +78,8 @@ func (nodeMap *ConfigNodeMap) Append(configNode BlockNode) diag.Diagnostics { nodeMap.logging = n case *TracingConfigNode: nodeMap.tracing = n + case *ImportConfigNode: + nodeMap.importMap[n.Label()] = n default: diags.Add(diag.Diagnostic{ Severity: diag.SeverityLevelError, diff --git a/pkg/flow/internal/controller/node_config_import.go b/pkg/flow/internal/controller/node_config_import.go new file mode 100644 index 000000000000..856e1cb1bdf8 --- /dev/null +++ b/pkg/flow/internal/controller/node_config_import.go @@ -0,0 +1,396 @@ +package controller + +import ( + "context" + "fmt" + "path" + "path/filepath" + "strings" + "sync" + "time" + + "github.com/go-kit/log" + "github.com/grafana/agent/component" + importsource "github.com/grafana/agent/pkg/flow/internal/import-source" + "github.com/grafana/agent/pkg/flow/logging/level" + "github.com/grafana/agent/pkg/flow/tracing" + "github.com/grafana/river/ast" + "github.com/grafana/river/parser" + "github.com/grafana/river/vm" + "github.com/prometheus/client_golang/prometheus" + "go.uber.org/atomic" +) + +type ImportConfigNode struct { + id ComponentID + label string + nodeID string + componentName string + globalID string + globals ComponentGlobals // Need a copy of the globals to create other import nodes. + source importsource.ImportSource + registry *prometheus.Registry + importedDeclares map[string]string + importConfigNodesChildren map[string]*ImportConfigNode + OnComponentUpdate func(cn NodeWithDependants) + logger log.Logger + inContentUpdate bool + + mut sync.RWMutex + importedContentMut sync.RWMutex + block *ast.BlockStmt // Current River blocks to derive config from + lastUpdateTime atomic.Time + + healthMut sync.RWMutex + evalHealth component.Health // Health of the last evaluate + runHealth component.Health // Health of running the component +} + +var _ NodeWithDependants = (*ImportConfigNode)(nil) +var _ RunnableNode = (*ImportConfigNode)(nil) +var _ ComponentNode = (*ImportConfigNode)(nil) + +// NewImportConfigNode creates a new ImportConfigNode from an initial ast.BlockStmt. +// The underlying config isn't applied until Evaluate is called. +func NewImportConfigNode(block *ast.BlockStmt, globals ComponentGlobals, sourceType importsource.SourceType) *ImportConfigNode { + var ( + id = BlockComponentID(block) + nodeID = id.String() + ) + + initHealth := component.Health{ + Health: component.HealthTypeUnknown, + Message: "component created", + UpdateTime: time.Now(), + } + globalID := nodeID + if globals.ControllerID != "" { + globalID = path.Join(globals.ControllerID, nodeID) + } + cn := &ImportConfigNode{ + id: id, + globalID: globalID, + label: block.Label, + globals: globals, + nodeID: BlockComponentID(block).String(), + componentName: block.GetBlockName(), + importedDeclares: make(map[string]string), + OnComponentUpdate: globals.OnComponentUpdate, + block: block, + evalHealth: initHealth, + runHealth: initHealth, + } + managedOpts := getImportManagedOptions(globals, cn) + cn.logger = managedOpts.Logger + cn.source = importsource.NewImportSource(sourceType, managedOpts, vm.New(block.Body), cn.onContentUpdate) + return cn +} + +func getImportManagedOptions(globals ComponentGlobals, cn *ImportConfigNode) component.Options { + cn.registry = prometheus.NewRegistry() + return component.Options{ + ID: cn.globalID, + Logger: log.With(globals.Logger, "component", cn.globalID), + Registerer: prometheus.WrapRegistererWith(prometheus.Labels{ + "component_id": cn.globalID, + }, cn.registry), + Tracer: tracing.WrapTracer(globals.TraceProvider, cn.globalID), + + DataPath: filepath.Join(globals.DataPath, cn.globalID), + + GetServiceData: func(name string) (interface{}, error) { + return globals.GetServiceData(name) + }, + } +} + +// Evaluate implements BlockNode and updates the arguments for the managed config block +// by re-evaluating its River block with the provided scope. The managed config block +// will be built the first time Evaluate is called. +// +// Evaluate will return an error if the River block cannot be evaluated or if +// decoding to arguments fails. +func (cn *ImportConfigNode) Evaluate(scope *vm.Scope) error { + err := cn.evaluate(scope) + + switch err { + case nil: + cn.setEvalHealth(component.HealthTypeHealthy, "component evaluated") + default: + msg := fmt.Sprintf("component evaluation failed: %s", err) + cn.setEvalHealth(component.HealthTypeUnhealthy, msg) + } + return err +} + +func (cn *ImportConfigNode) setEvalHealth(t component.HealthType, msg string) { + cn.healthMut.Lock() + defer cn.healthMut.Unlock() + + cn.evalHealth = component.Health{ + Health: t, + Message: msg, + UpdateTime: time.Now(), + } +} + +func (cn *ImportConfigNode) evaluate(scope *vm.Scope) error { + cn.mut.Lock() + defer cn.mut.Unlock() + return cn.source.Evaluate(scope) +} + +// processNodeBody processes the body of a node. +func (cn *ImportConfigNode) processNodeBody(node *ast.File, content string) { + for _, stmt := range node.Body { + switch stmt := stmt.(type) { + case *ast.BlockStmt: + fullName := strings.Join(stmt.Name, ".") + switch fullName { + case "declare": + cn.processDeclareBlock(stmt, content) + case importsource.BlockImportFile, importsource.BlockImportGit, importsource.BlockImportHTTP: + cn.processImportBlock(stmt, fullName) + default: + level.Error(cn.logger).Log("msg", "only declare and import blocks are allowed in a module", "forbidden", fullName) + } + default: + level.Error(cn.logger).Log("msg", "only declare and import blocks are allowed in a module") + } + } +} + +// processDeclareBlock processes a declare block. +func (cn *ImportConfigNode) processDeclareBlock(stmt *ast.BlockStmt, content string) { + if _, ok := cn.importedDeclares[stmt.Label]; ok { + level.Error(cn.logger).Log("msg", "declare block redefined", "name", stmt.Label) + return + } + cn.importedDeclares[stmt.Label] = content[stmt.LCurlyPos.Position().Offset+1 : stmt.RCurlyPos.Position().Offset-1] +} + +// processDeclareBlock processes an import block. +func (cn *ImportConfigNode) processImportBlock(stmt *ast.BlockStmt, fullName string) { + sourceType := importsource.GetSourceType(fullName) + if _, ok := cn.importConfigNodesChildren[stmt.Label]; ok { + level.Error(cn.logger).Log("msg", "import block redefined", "name", stmt.Label) + return + } + childGlobals := cn.globals + // Children have a special OnComponentUpdate function which will surface all the imported declares to the root import config node. + childGlobals.OnComponentUpdate = cn.OnChildrenContentUpdate + cn.importConfigNodesChildren[stmt.Label] = NewImportConfigNode(stmt, childGlobals, sourceType) +} + +// onContentUpdate is triggered every time the managed import component has new content. +func (cn *ImportConfigNode) onContentUpdate(content string) { + cn.importedContentMut.Lock() + defer cn.importedContentMut.Unlock() + cn.inContentUpdate = true + defer func() { + cn.inContentUpdate = false + }() + cn.importedDeclares = make(map[string]string) + // We recreate the nodes when the content changes. Can we copy instead for optimization? + cn.importConfigNodesChildren = make(map[string]*ImportConfigNode) + node, err := parser.ParseFile(cn.label, []byte(content)) + if err != nil { + level.Error(cn.logger).Log("msg", "failed to parse file on update", "err", err) + return + } + cn.processNodeBody(node, content) + err = cn.evaluateChildren() + if err != nil { + level.Error(cn.logger).Log("msg", "failed to update content", "err", err) + return + } + cn.lastUpdateTime.Store(time.Now()) + cn.OnComponentUpdate(cn) +} + +// evaluateChildren evaluates the import nodes managed by this import node. +func (cn *ImportConfigNode) evaluateChildren() error { + for _, child := range cn.importConfigNodesChildren { + err := child.Evaluate(&vm.Scope{ + Parent: nil, + Variables: make(map[string]interface{}), + }) + if err != nil { + return fmt.Errorf("imported node %s failed to evaluate, %v", child.label, err) + } + } + return nil +} + +// runChildren run the import nodes managed by this import node. +func (cn *ImportConfigNode) runChildren(ctx context.Context) error { + var wg sync.WaitGroup + errChildrenChan := make(chan error, len(cn.importConfigNodesChildren)) + + for _, child := range cn.importConfigNodesChildren { + wg.Add(1) + go func(child *ImportConfigNode) { + defer wg.Done() + if err := child.Run(ctx); err != nil { + errChildrenChan <- err + } + }(child) + } + + go func() { + wg.Wait() + close(errChildrenChan) + }() + + return <-errChildrenChan +} + +// OnChildrenContentUpdate passes their imported content to their parents. +// To avoid collisions, the content is scoped via namespaces. +func (cn *ImportConfigNode) OnChildrenContentUpdate(child NodeWithDependants) { + switch child := child.(type) { + case *ImportConfigNode: + for importedDeclareLabel, content := range child.importedDeclares { + label := child.label + "." + importedDeclareLabel + cn.importedDeclares[label] = content + } + } + // This avoids OnComponentUpdate to be called multiple times in a row when the content changes. + if !cn.inContentUpdate { + cn.OnComponentUpdate(cn) + } +} + +// ModuleContent returns the content of a declare block imported by the node. +func (cn *ImportConfigNode) ModuleContent(declareLabel string) (string, error) { + cn.importedContentMut.Lock() + defer cn.importedContentMut.Unlock() + if content, ok := cn.importedDeclares[declareLabel]; ok { + return content, nil + } + return "", fmt.Errorf("declareLabel %s not found in imported node %s", declareLabel, cn.label) +} + +// Run runs the managed component in the calling goroutine until ctx is +// canceled. Evaluate must have been called at least once without returning an +// error before calling Run. +// +// Run will immediately return ErrUnevaluated if Evaluate has never been called +// successfully. Otherwise, Run will return nil. +func (cn *ImportConfigNode) Run(ctx context.Context) error { + cn.mut.RLock() + managed := cn.source + cn.mut.RUnlock() + + if managed == nil { + return ErrUnevaluated + } + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + errChan := make(chan error, 1) + + if len(cn.importConfigNodesChildren) > 0 { + go func() { + errChan <- cn.runChildren(ctx) + }() + } + + cn.setRunHealth(component.HealthTypeHealthy, "started component") + + go func() { + errChan <- managed.Run(ctx) + }() + + err := <-errChan + + var exitMsg string + if err != nil { + level.Error(cn.logger).Log("msg", "component exited with error", "err", err) + exitMsg = fmt.Sprintf("component shut down with error: %s", err) + } else { + level.Info(cn.logger).Log("msg", "component exited") + exitMsg = "component shut down normally" + } + + cn.setRunHealth(component.HealthTypeExited, exitMsg) + return err +} + +func (cn *ImportConfigNode) setRunHealth(t component.HealthType, msg string) { + cn.healthMut.Lock() + defer cn.healthMut.Unlock() + + cn.runHealth = component.Health{ + Health: t, + Message: msg, + UpdateTime: time.Now(), + } +} + +func (cn *ImportConfigNode) Label() string { return cn.label } + +// Block implements BlockNode and returns the current block of the managed config node. +func (cn *ImportConfigNode) Block() *ast.BlockStmt { + cn.mut.RLock() + defer cn.mut.RUnlock() + return cn.block +} + +// NodeID implements dag.Node and returns the unique ID for the config node. +func (cn *ImportConfigNode) NodeID() string { return cn.nodeID } + +// This node has no exports. +func (cn *ImportConfigNode) Exports() component.Exports { + return nil +} + +func (cn *ImportConfigNode) ID() ComponentID { return cn.id } + +func (cn *ImportConfigNode) LastUpdateTime() time.Time { + return cn.lastUpdateTime.Load() +} + +// Arguments returns the current arguments of the managed component. +func (cn *ImportConfigNode) Arguments() component.Arguments { + cn.mut.RLock() + defer cn.mut.RUnlock() + return cn.source.Arguments() +} + +// Component returns the instance of the managed component. Component may be +// nil if the ComponentNode has not been successfully evaluated yet. +func (cn *ImportConfigNode) Component() component.Component { + cn.mut.RLock() + defer cn.mut.RUnlock() + return cn.source.Component() +} + +// CurrentHealth returns the current health of the ComponentNode. +// +// The health of a ComponentNode is determined by combining: +// +// 1. Health from the call to Run(). +// 2. Health from the last call to Evaluate(). +// 3. Health reported from the component. +func (cn *ImportConfigNode) CurrentHealth() component.Health { + cn.healthMut.RLock() + defer cn.healthMut.RUnlock() + return component.LeastHealthy(cn.runHealth, cn.evalHealth, cn.source.CurrentHealth()) +} + +// FileComponent does not have DebugInfo +func (cn *ImportConfigNode) DebugInfo() interface{} { + return nil +} + +// This component does not manage modules. +func (cn *ImportConfigNode) ModuleIDs() []string { + return nil +} + +// BlockName returns the name of the block. +func (cn *ImportConfigNode) BlockName() string { + return cn.componentName +} diff --git a/pkg/flow/internal/controller/node_declare.go b/pkg/flow/internal/controller/node_declare.go new file mode 100644 index 000000000000..ec9ff4596f83 --- /dev/null +++ b/pkg/flow/internal/controller/node_declare.go @@ -0,0 +1,55 @@ +package controller + +import ( + "sync" + + "github.com/grafana/river/ast" + "github.com/grafana/river/vm" +) + +type DeclareNode struct { + label string + nodeID string + componentName string + content string + + mut sync.RWMutex + block *ast.BlockStmt +} + +var _ BlockNode = (*DeclareNode)(nil) + +// NewDeclareNode creates a new declare node with a content which will be loaded by declare component nodes. +func NewDeclareNode(block *ast.BlockStmt, content string) *DeclareNode { + return &DeclareNode{ + label: block.Label, + nodeID: BlockComponentID(block).String(), + componentName: block.GetBlockName(), + content: content, + + block: block, + } +} + +func (cn *DeclareNode) ModuleContent() (string, error) { + cn.mut.Lock() + defer cn.mut.Unlock() + return cn.content, nil +} + +// Evaluate does nothing for this node. +func (cn *DeclareNode) Evaluate(scope *vm.Scope) error { + return nil +} + +func (cn *DeclareNode) Label() string { return cn.label } + +// Block implements BlockNode and returns the current block of the managed config node. +func (cn *DeclareNode) Block() *ast.BlockStmt { + cn.mut.RLock() + defer cn.mut.RUnlock() + return cn.block +} + +// NodeID implements dag.Node and returns the unique ID for the config node. +func (cn *DeclareNode) NodeID() string { return cn.nodeID } diff --git a/pkg/flow/internal/controller/node_declare_component.go b/pkg/flow/internal/controller/node_declare_component.go new file mode 100644 index 000000000000..b389d2c8989e --- /dev/null +++ b/pkg/flow/internal/controller/node_declare_component.go @@ -0,0 +1,361 @@ +package controller + +import ( + "context" + "fmt" + "path" + "path/filepath" + "reflect" + "strings" + "sync" + "time" + + "github.com/go-kit/log" + "github.com/grafana/agent/component" + "github.com/grafana/agent/component/module" + "github.com/grafana/agent/pkg/flow/logging/level" + "github.com/grafana/agent/pkg/flow/tracing" + "github.com/grafana/river/ast" + "github.com/grafana/river/vm" + "github.com/prometheus/client_golang/prometheus" + "go.uber.org/atomic" +) + +// DeclareComponentNode is a controller node which manages a module. +// +// DeclareComponentNode manages the underlying module and caches its current +// arguments and exports. +type DeclareComponentNode struct { + id ComponentID + globalID string + label string + componentName string + importLabel string + declareLabel string + nodeID string // Cached from id.String() to avoid allocating new strings every time NodeID is called. + managedOpts component.Options + registry *prometheus.Registry + moduleController ModuleController + OnComponentUpdate func(cn NodeWithDependants) // Informs controller that we need to reevaluate + + GetModuleInfo func(fullName string, importLabel string, declareLabel string) (ModuleInfo, error) // Retrieve the module config. + lastUpdateTime atomic.Time + + mut sync.RWMutex + block *ast.BlockStmt // Current River block to derive args from + eval *vm.Evaluator + managed *module.ModuleComponent // Inner managed module + args component.Arguments // Evaluated arguments for the managed component + + // NOTE(rfratto): health and exports have their own mutex because they may be + // set asynchronously while mut is still being held (i.e., when calling Evaluate + // and the managed module immediately creates new exports) + + healthMut sync.RWMutex + evalHealth component.Health // Health of the last evaluate + runHealth component.Health // Health of running the component + + exportsMut sync.RWMutex + exports component.Exports // Evaluated exports for the managed module +} + +// ExtractImportAndDeclareLabels extract an importLabel and a declareLabel from a componentName. +func ExtractImportAndDeclareLabels(componentName string) (string, string) { + parts := strings.Split(componentName, ".") + if len(parts) == 0 { + return "", "" + } + // If this is a local declare. + importLabel := "" + declareLabel := parts[0] + // If this is an imported module. + if len(parts) > 1 { + importLabel = parts[0] + declareLabel = parts[1] + } + return importLabel, declareLabel +} + +var _ NodeWithDependants = (*DeclareComponentNode)(nil) +var _ ComponentNode = (*DeclareComponentNode)(nil) + +// NewDeclareComponentNode creates a new DeclareComponentNode from an initial ast.BlockStmt. +// The underlying managed module isn't created until Evaluate is called. +func NewDeclareComponentNode(globals ComponentGlobals, b *ast.BlockStmt, GetModuleInfo func(string, string, string) (ModuleInfo, error)) *DeclareComponentNode { + var ( + id = BlockComponentID(b) + nodeID = id.String() + ) + + initHealth := component.Health{ + Health: component.HealthTypeUnknown, + Message: "node declare component created", + UpdateTime: time.Now(), + } + + // We need to generate a globally unique component ID to give to the + // component and for use with telemetry data which doesn't support + // reconstructing the global ID. For everything else (HTTP, data), we can + // just use the controller-local ID as those values are guaranteed to be + // globally unique. + globalID := nodeID + if globals.ControllerID != "" { + globalID = path.Join(globals.ControllerID, nodeID) + } + + componentName := b.GetBlockName() + + importLabel, declareLabel := ExtractImportAndDeclareLabels(componentName) + + cn := &DeclareComponentNode{ + id: id, + globalID: globalID, + label: b.Label, + nodeID: nodeID, + componentName: componentName, + importLabel: importLabel, + declareLabel: declareLabel, + moduleController: globals.NewModuleController(globalID), + OnComponentUpdate: globals.OnComponentUpdate, + GetModuleInfo: GetModuleInfo, + + block: b, + eval: vm.New(b.Body), + + evalHealth: initHealth, + runHealth: initHealth, + } + cn.managedOpts = getDeclareManagedOptions(globals, cn) + + return cn +} + +func getDeclareManagedOptions(globals ComponentGlobals, cn *DeclareComponentNode) component.Options { + cn.registry = prometheus.NewRegistry() + return component.Options{ + ID: cn.globalID, + Logger: log.With(globals.Logger, "component", cn.globalID), + Registerer: prometheus.WrapRegistererWith(prometheus.Labels{ + "component_id": cn.globalID, + }, cn.registry), + Tracer: tracing.WrapTracer(globals.TraceProvider, cn.globalID), + + DataPath: filepath.Join(globals.DataPath, cn.globalID), + + OnStateChange: cn.setExports, + ModuleController: cn.moduleController, + + GetServiceData: func(name string) (interface{}, error) { + return globals.GetServiceData(name) + }, + } +} + +// ID returns the component ID of the managed component from its River block. +func (cn *DeclareComponentNode) ID() ComponentID { return cn.id } + +// Label returns the label for the block or "" if none was specified. +func (cn *DeclareComponentNode) Label() string { return cn.label } + +// NodeID implements dag.Node and returns the unique ID for this node. The +// NodeID is the string representation of the component's ID from its River +// block. +func (cn *DeclareComponentNode) NodeID() string { return cn.nodeID } + +// UpdateBlock updates the River block used to construct arguments for the +// managed module. The new block isn't used until the next time Evaluate is +// invoked. +// +// UpdateBlock will panic if the block does not match the component ID of the +// DeclareComponentNode. +func (cn *DeclareComponentNode) UpdateBlock(b *ast.BlockStmt) { + if !BlockComponentID(b).Equals(cn.id) { + panic("UpdateBlock called with an River block with a different component ID") + } + + cn.mut.Lock() + defer cn.mut.Unlock() + cn.block = b + cn.eval = vm.New(b.Body) +} + +// Evaluate implements BlockNode and updates the arguments by re-evaluating its River block with the provided scope and the module content by +// retrieving it from the corresponding import or declare node for the managed module. +// The managed module will be built the first time Evaluate is called. +// +// Evaluate will return an error if the River block cannot be evaluated, if +// decoding to arguments fails or if the module content cannot be retrieved. +func (cn *DeclareComponentNode) Evaluate(scope *vm.Scope) error { + err := cn.evaluate(scope) + + switch err { + case nil: + cn.setEvalHealth(component.HealthTypeHealthy, "component evaluated") + default: + msg := fmt.Sprintf("component evaluation failed: %s", err) + cn.setEvalHealth(component.HealthTypeUnhealthy, msg) + } + return err +} + +func (cn *DeclareComponentNode) evaluate(scope *vm.Scope) error { + cn.mut.Lock() + defer cn.mut.Unlock() + + var args map[string]any + if err := cn.eval.Evaluate(scope, &args); err != nil { + return fmt.Errorf("decoding River: %w", err) + } + + if cn.managed == nil { + // We haven't built the managed module successfully yet. + managed, err := module.NewModuleComponent(cn.managedOpts) + if err != nil { + return fmt.Errorf("building module: %w", err) + } + cn.managed = managed + } + + moduleInfo, err := cn.GetModuleInfo(cn.componentName, cn.importLabel, cn.declareLabel) + if err != nil { + return fmt.Errorf("retrieving module info: %w", err) + } + + // Reload the module with new config + if err := cn.managed.LoadFlowSource(args, moduleInfo.content, moduleInfo.moduleDefinitions); err != nil { + return fmt.Errorf("updating component: %w", err) + } + return nil +} + +func (cn *DeclareComponentNode) Run(ctx context.Context) error { + cn.mut.RLock() + managed := cn.managed + logger := cn.managedOpts.Logger + cn.mut.RUnlock() + + if managed == nil { + return ErrUnevaluated + } + + cn.setRunHealth(component.HealthTypeHealthy, "started module") + cn.managed.RunFlowController(ctx) + + level.Info(logger).Log("msg", "module exited") + cn.setRunHealth(component.HealthTypeExited, "module shut down") + return nil +} + +// Arguments returns the current arguments of the managed module. +func (cn *DeclareComponentNode) Arguments() component.Arguments { + cn.mut.RLock() + defer cn.mut.RUnlock() + return cn.args +} + +// Block implements BlockNode and returns the current block of the managed module. +func (cn *DeclareComponentNode) Block() *ast.BlockStmt { + cn.mut.RLock() + defer cn.mut.RUnlock() + return cn.block +} + +// Exports returns the current set of exports from the managed module. +// Exports returns nil if the managed module does not have exports. +func (cn *DeclareComponentNode) Exports() component.Exports { + cn.exportsMut.RLock() + defer cn.exportsMut.RUnlock() + return cn.exports +} + +func (cn *DeclareComponentNode) LastUpdateTime() time.Time { + return cn.lastUpdateTime.Load() +} + +// setExports is called whenever the managed module updates. e must be the +// same type as the registered exports type of the managed module. +func (cn *DeclareComponentNode) setExports(e component.Exports) { + // Some components may aggressively reexport values even though no exposed + // state has changed. This may be done for components which always supply + // exports whenever their arguments are evaluated without tracking internal + // state to see if anything actually changed. + // + // To avoid needlessly reevaluating components we'll ignore unchanged + // exports. + var changed bool + + cn.exportsMut.Lock() + if !reflect.DeepEqual(cn.exports, e) { + changed = true + cn.exports = e + } + cn.exportsMut.Unlock() + + if changed { + // Inform the controller that we have new exports. + cn.lastUpdateTime.Store(time.Now()) + cn.OnComponentUpdate(cn) + } +} + +// CurrentHealth returns the current health of the DeclareComponentNode. +// +// The health of a DeclareComponentNode is determined by combining: +// +// 1. Health from the call to Run(). +// 2. Health from the last call to Evaluate(). +// 3. Health reported from the module. +func (cn *DeclareComponentNode) CurrentHealth() component.Health { + cn.healthMut.RLock() + defer cn.healthMut.RUnlock() + return component.LeastHealthy(cn.runHealth, cn.evalHealth, cn.managed.CurrentHealth()) +} + +// TODO implement debugInfo? +func (cn *DeclareComponentNode) DebugInfo() interface{} { + cn.mut.RLock() + defer cn.mut.RUnlock() + return nil +} + +// setEvalHealth sets the internal health from a call to Evaluate. See Health +// for information on how overall health is calculated. +func (cn *DeclareComponentNode) setEvalHealth(t component.HealthType, msg string) { + cn.healthMut.Lock() + defer cn.healthMut.Unlock() + + cn.evalHealth = component.Health{ + Health: t, + Message: msg, + UpdateTime: time.Now(), + } +} + +// setRunHealth sets the internal health from a call to Run. See Health for +// information on how overall health is calculated. +func (cn *DeclareComponentNode) setRunHealth(t component.HealthType, msg string) { + cn.healthMut.Lock() + defer cn.healthMut.Unlock() + + cn.runHealth = component.Health{ + Health: t, + Message: msg, + UpdateTime: time.Now(), + } +} + +// ModuleIDs returns the current list of modules that this component is +// managing. +func (cn *DeclareComponentNode) ModuleIDs() []string { + return cn.moduleController.ModuleIDs() +} + +// BlockName returns the name of the block. +func (cn *DeclareComponentNode) BlockName() string { + return cn.componentName +} + +// This node does not manage any component. +func (cn *DeclareComponentNode) Component() component.Component { + return nil +} diff --git a/pkg/flow/internal/controller/node_component.go b/pkg/flow/internal/controller/node_native_component.go similarity index 85% rename from pkg/flow/internal/controller/node_component.go rename to pkg/flow/internal/controller/node_native_component.go index b99597809d4b..ef14722caf78 100644 --- a/pkg/flow/internal/controller/node_component.go +++ b/pkg/flow/internal/controller/node_native_component.go @@ -66,7 +66,7 @@ type ComponentGlobals struct { Logger *logging.Logger // Logger shared between all managed components. TraceProvider trace.TracerProvider // Tracer shared between all managed components. DataPath string // Shared directory where component data may be stored - OnComponentUpdate func(cn *ComponentNode) // Informs controller that we need to reevaluate + OnComponentUpdate func(cn NodeWithDependants) // Informs controller that we need to reevaluate OnExportsChange func(exports map[string]any) // Invoked when the managed component updated its exports Registerer prometheus.Registerer // Registerer for serving agent and component metrics ControllerID string // ID of controller. @@ -74,12 +74,12 @@ type ComponentGlobals struct { GetServiceData func(name string) (interface{}, error) // Get data for a service. } -// ComponentNode is a controller node which manages a user-defined component. +// NativeComponentNode is a controller node which manages a user-defined component. // -// ComponentNode manages the underlying component and caches its current -// arguments and exports. ComponentNode manages the arguments for the component +// NativeComponentNode manages the underlying component and caches its current +// arguments and exports. NativeComponentNode manages the arguments for the component // from a River block. -type ComponentNode struct { +type NativeComponentNode struct { id ComponentID globalID string label string @@ -90,7 +90,7 @@ type ComponentNode struct { registry *prometheus.Registry exportsType reflect.Type moduleController ModuleController - OnComponentUpdate func(cn *ComponentNode) // Informs controller that we need to reevaluate + OnComponentUpdate func(cn NodeWithDependants) // Informs controller that we need to reevaluate lastUpdateTime atomic.Time mut sync.RWMutex @@ -111,11 +111,12 @@ type ComponentNode struct { exports component.Exports // Evaluated exports for the managed component } -var _ BlockNode = (*ComponentNode)(nil) +var _ NodeWithDependants = (*NativeComponentNode)(nil) +var _ ComponentNode = (*NativeComponentNode)(nil) // NewComponentNode creates a new ComponentNode from an initial ast.BlockStmt. // The underlying managed component isn't created until Evaluate is called. -func NewComponentNode(globals ComponentGlobals, reg component.Registration, b *ast.BlockStmt) *ComponentNode { +func NewComponentNode(globals ComponentGlobals, reg component.Registration, b *ast.BlockStmt) *NativeComponentNode { var ( id = BlockComponentID(b) nodeID = id.String() @@ -137,12 +138,12 @@ func NewComponentNode(globals ComponentGlobals, reg component.Registration, b *a globalID = path.Join(globals.ControllerID, nodeID) } - cn := &ComponentNode{ + cn := &NativeComponentNode{ id: id, globalID: globalID, label: b.Label, nodeID: nodeID, - componentName: strings.Join(b.Name, "."), + componentName: b.GetBlockName(), reg: reg, exportsType: getExportsType(reg), moduleController: globals.NewModuleController(globalID), @@ -163,7 +164,7 @@ func NewComponentNode(globals ComponentGlobals, reg component.Registration, b *a return cn } -func getManagedOptions(globals ComponentGlobals, cn *ComponentNode) component.Options { +func getManagedOptions(globals ComponentGlobals, cn *NativeComponentNode) component.Options { cn.registry = prometheus.NewRegistry() return component.Options{ ID: cn.globalID, @@ -192,29 +193,29 @@ func getExportsType(reg component.Registration) reflect.Type { } // Registration returns the original registration of the component. -func (cn *ComponentNode) Registration() component.Registration { return cn.reg } +func (cn *NativeComponentNode) Registration() component.Registration { return cn.reg } // Component returns the instance of the managed component. Component may be // nil if the ComponentNode has not been successfully evaluated yet. -func (cn *ComponentNode) Component() component.Component { +func (cn *NativeComponentNode) Component() component.Component { cn.mut.RLock() defer cn.mut.RUnlock() return cn.managed } // ID returns the component ID of the managed component from its River block. -func (cn *ComponentNode) ID() ComponentID { return cn.id } +func (cn *NativeComponentNode) ID() ComponentID { return cn.id } // Label returns the label for the block or "" if none was specified. -func (cn *ComponentNode) Label() string { return cn.label } +func (cn *NativeComponentNode) Label() string { return cn.label } // ComponentName returns the component's type, i.e. `local.file.test` returns `local.file`. -func (cn *ComponentNode) ComponentName() string { return cn.componentName } +func (cn *NativeComponentNode) ComponentName() string { return cn.componentName } // NodeID implements dag.Node and returns the unique ID for this node. The // NodeID is the string representation of the component's ID from its River // block. -func (cn *ComponentNode) NodeID() string { return cn.nodeID } +func (cn *NativeComponentNode) NodeID() string { return cn.nodeID } // UpdateBlock updates the River block used to construct arguments for the // managed component. The new block isn't used until the next time Evaluate is @@ -222,7 +223,7 @@ func (cn *ComponentNode) NodeID() string { return cn.nodeID } // // UpdateBlock will panic if the block does not match the component ID of the // ComponentNode. -func (cn *ComponentNode) UpdateBlock(b *ast.BlockStmt) { +func (cn *NativeComponentNode) UpdateBlock(b *ast.BlockStmt) { if !BlockComponentID(b).Equals(cn.id) { panic("UpdateBlock called with an River block with a different component ID") } @@ -239,7 +240,7 @@ func (cn *ComponentNode) UpdateBlock(b *ast.BlockStmt) { // // Evaluate will return an error if the River block cannot be evaluated or if // decoding to arguments fails. -func (cn *ComponentNode) Evaluate(scope *vm.Scope) error { +func (cn *NativeComponentNode) Evaluate(scope *vm.Scope) error { err := cn.evaluate(scope) switch err { @@ -252,7 +253,7 @@ func (cn *ComponentNode) Evaluate(scope *vm.Scope) error { return err } -func (cn *ComponentNode) evaluate(scope *vm.Scope) error { +func (cn *NativeComponentNode) evaluate(scope *vm.Scope) error { cn.mut.Lock() defer cn.mut.Unlock() @@ -299,7 +300,7 @@ func (cn *ComponentNode) evaluate(scope *vm.Scope) error { // // Run will immediately return ErrUnevaluated if Evaluate has never been called // successfully. Otherwise, Run will return nil. -func (cn *ComponentNode) Run(ctx context.Context) error { +func (cn *NativeComponentNode) Run(ctx context.Context) error { cn.mut.RLock() managed := cn.managed cn.mut.RUnlock() @@ -330,14 +331,14 @@ func (cn *ComponentNode) Run(ctx context.Context) error { var ErrUnevaluated = errors.New("managed component not built") // Arguments returns the current arguments of the managed component. -func (cn *ComponentNode) Arguments() component.Arguments { +func (cn *NativeComponentNode) Arguments() component.Arguments { cn.mut.RLock() defer cn.mut.RUnlock() return cn.args } // Block implements BlockNode and returns the current block of the managed component. -func (cn *ComponentNode) Block() *ast.BlockStmt { +func (cn *NativeComponentNode) Block() *ast.BlockStmt { cn.mut.RLock() defer cn.mut.RUnlock() return cn.block @@ -345,15 +346,19 @@ func (cn *ComponentNode) Block() *ast.BlockStmt { // Exports returns the current set of exports from the managed component. // Exports returns nil if the managed component does not have exports. -func (cn *ComponentNode) Exports() component.Exports { +func (cn *NativeComponentNode) Exports() component.Exports { cn.exportsMut.RLock() defer cn.exportsMut.RUnlock() return cn.exports } +func (cn *NativeComponentNode) LastUpdateTime() time.Time { + return cn.lastUpdateTime.Load() +} + // setExports is called whenever the managed component updates. e must be the // same type as the registered exports type of the managed component. -func (cn *ComponentNode) setExports(e component.Exports) { +func (cn *NativeComponentNode) setExports(e component.Exports) { if cn.exportsType == nil { panic(fmt.Sprintf("Component %s called OnStateChange but never registered an Exports type", cn.nodeID)) } @@ -391,7 +396,7 @@ func (cn *ComponentNode) setExports(e component.Exports) { // 1. Health from the call to Run(). // 2. Health from the last call to Evaluate(). // 3. Health reported from the component. -func (cn *ComponentNode) CurrentHealth() component.Health { +func (cn *NativeComponentNode) CurrentHealth() component.Health { cn.healthMut.RLock() defer cn.healthMut.RUnlock() @@ -409,7 +414,7 @@ func (cn *ComponentNode) CurrentHealth() component.Health { } // DebugInfo returns debugging information from the managed component (if any). -func (cn *ComponentNode) DebugInfo() interface{} { +func (cn *NativeComponentNode) DebugInfo() interface{} { cn.mut.RLock() defer cn.mut.RUnlock() @@ -421,7 +426,7 @@ func (cn *ComponentNode) DebugInfo() interface{} { // setEvalHealth sets the internal health from a call to Evaluate. See Health // for information on how overall health is calculated. -func (cn *ComponentNode) setEvalHealth(t component.HealthType, msg string) { +func (cn *NativeComponentNode) setEvalHealth(t component.HealthType, msg string) { cn.healthMut.Lock() defer cn.healthMut.Unlock() @@ -434,7 +439,7 @@ func (cn *ComponentNode) setEvalHealth(t component.HealthType, msg string) { // setRunHealth sets the internal health from a call to Run. See Health for // information on how overall health is calculated. -func (cn *ComponentNode) setRunHealth(t component.HealthType, msg string) { +func (cn *NativeComponentNode) setRunHealth(t component.HealthType, msg string) { cn.healthMut.Lock() defer cn.healthMut.Unlock() @@ -447,6 +452,11 @@ func (cn *ComponentNode) setRunHealth(t component.HealthType, msg string) { // ModuleIDs returns the current list of modules that this component is // managing. -func (cn *ComponentNode) ModuleIDs() []string { +func (cn *NativeComponentNode) ModuleIDs() []string { return cn.moduleController.ModuleIDs() } + +// BlockName returns the name of the block. +func (cn *NativeComponentNode) BlockName() string { + return cn.componentName +} diff --git a/pkg/flow/internal/controller/node_component_test.go b/pkg/flow/internal/controller/node_native_component_test.go similarity index 93% rename from pkg/flow/internal/controller/node_component_test.go rename to pkg/flow/internal/controller/node_native_component_test.go index 6eb46f004601..e2f734352030 100644 --- a/pkg/flow/internal/controller/node_component_test.go +++ b/pkg/flow/internal/controller/node_native_component_test.go @@ -14,7 +14,7 @@ func TestGlobalID(t *testing.T) { NewModuleController: func(id string) ModuleController { return nil }, - }, &ComponentNode{ + }, &NativeComponentNode{ nodeID: "local.id", globalID: "module.file/local.id", }) @@ -28,7 +28,7 @@ func TestLocalID(t *testing.T) { NewModuleController: func(id string) ModuleController { return nil }, - }, &ComponentNode{ + }, &NativeComponentNode{ nodeID: "local.id", globalID: "local.id", }) diff --git a/pkg/flow/internal/controller/node_with_dependants.go b/pkg/flow/internal/controller/node_with_dependants.go new file mode 100644 index 000000000000..a7c47360c7ff --- /dev/null +++ b/pkg/flow/internal/controller/node_with_dependants.go @@ -0,0 +1,18 @@ +package controller + +import ( + "time" + + "github.com/grafana/agent/component" +) + +// NodeWithDependants must be implemented by nodes which can trigger other nodes to be evaluated. +type NodeWithDependants interface { + BlockNode + + LastUpdateTime() time.Time + + Exports() component.Exports + + ID() ComponentID +} diff --git a/pkg/flow/internal/controller/queue.go b/pkg/flow/internal/controller/queue.go index a8cd1b5bae05..d4708cecf806 100644 --- a/pkg/flow/internal/controller/queue.go +++ b/pkg/flow/internal/controller/queue.go @@ -10,8 +10,8 @@ import ( // for later reevaluation. type Queue struct { mut sync.Mutex - queuedSet map[*ComponentNode]struct{} - queuedOrder []*ComponentNode + queuedSet map[NodeWithDependants]struct{} + queuedOrder []NodeWithDependants updateCh chan struct{} } @@ -20,14 +20,14 @@ type Queue struct { func NewQueue() *Queue { return &Queue{ updateCh: make(chan struct{}, 1), - queuedSet: make(map[*ComponentNode]struct{}), - queuedOrder: make([]*ComponentNode, 0), + queuedSet: make(map[NodeWithDependants]struct{}), + queuedOrder: make([]NodeWithDependants, 0), } } // Enqueue inserts a new component into the Queue. Enqueue is a no-op if the // component is already in the Queue. -func (q *Queue) Enqueue(c *ComponentNode) { +func (q *Queue) Enqueue(c NodeWithDependants) { q.mut.Lock() defer q.mut.Unlock() @@ -48,13 +48,13 @@ func (q *Queue) Enqueue(c *ComponentNode) { func (q *Queue) Chan() <-chan struct{} { return q.updateCh } // DequeueAll removes all components from the queue and returns them. -func (q *Queue) DequeueAll() []*ComponentNode { +func (q *Queue) DequeueAll() []NodeWithDependants { q.mut.Lock() defer q.mut.Unlock() all := q.queuedOrder - q.queuedOrder = make([]*ComponentNode, 0) - q.queuedSet = make(map[*ComponentNode]struct{}) + q.queuedOrder = make([]NodeWithDependants, 0) + q.queuedSet = make(map[NodeWithDependants]struct{}) return all } diff --git a/pkg/flow/internal/controller/queue_test.go b/pkg/flow/internal/controller/queue_test.go index c93fb14ef8fc..8fe953ba31ba 100644 --- a/pkg/flow/internal/controller/queue_test.go +++ b/pkg/flow/internal/controller/queue_test.go @@ -9,7 +9,7 @@ import ( ) func TestEnqueueDequeue(t *testing.T) { - tn := &ComponentNode{} + tn := &NativeComponentNode{} q := NewQueue() q.Enqueue(tn) require.Lenf(t, q.queuedSet, 1, "queue should be 1") @@ -26,7 +26,7 @@ func TestDequeue_Empty(t *testing.T) { } func TestDequeue_InOrder(t *testing.T) { - c1, c2, c3 := &ComponentNode{}, &ComponentNode{}, &ComponentNode{} + c1, c2, c3 := &NativeComponentNode{}, &NativeComponentNode{}, &NativeComponentNode{} q := NewQueue() q.Enqueue(c1) q.Enqueue(c2) @@ -41,7 +41,7 @@ func TestDequeue_InOrder(t *testing.T) { } func TestDequeue_NoDuplicates(t *testing.T) { - c1, c2 := &ComponentNode{}, &ComponentNode{} + c1, c2 := &NativeComponentNode{}, &NativeComponentNode{} q := NewQueue() q.Enqueue(c1) q.Enqueue(c1) @@ -58,7 +58,7 @@ func TestDequeue_NoDuplicates(t *testing.T) { } func TestEnqueue_ChannelNotification(t *testing.T) { - c1 := &ComponentNode{} + c1 := &NativeComponentNode{} q := NewQueue() notificationsCount := atomic.Int32{} diff --git a/pkg/flow/internal/import-source/import_file.go b/pkg/flow/internal/import-source/import_file.go new file mode 100644 index 000000000000..178064c91872 --- /dev/null +++ b/pkg/flow/internal/import-source/import_file.go @@ -0,0 +1,86 @@ +package importsource + +import ( + "context" + "fmt" + "reflect" + + "github.com/grafana/agent/component" + "github.com/grafana/agent/component/local/file" + "github.com/grafana/river/vm" +) + +type ImportFile struct { + fileComponent *file.Component + arguments component.Arguments + managedOpts component.Options + eval *vm.Evaluator +} + +var _ ImportSource = (*ImportFile)(nil) + +func NewImportFile(managedOpts component.Options, eval *vm.Evaluator, onContentChange func(string)) *ImportFile { + opts := managedOpts + opts.OnStateChange = func(e component.Exports) { + onContentChange(e.(file.Exports).Content.Value) + } + return &ImportFile{ + managedOpts: opts, + eval: eval, + } +} + +type importFileConfigBlock struct { + LocalFileArguments file.Arguments `river:",squash"` +} + +// SetToDefault implements river.Defaulter. +func (a *importFileConfigBlock) SetToDefault() { + a.LocalFileArguments = file.DefaultArguments +} + +func (im *ImportFile) Evaluate(scope *vm.Scope) error { + var arguments importFileConfigBlock + if err := im.eval.Evaluate(scope, &arguments); err != nil { + return fmt.Errorf("decoding River: %w", err) + } + if im.fileComponent == nil { + var err error + im.fileComponent, err = file.New(im.managedOpts, arguments.LocalFileArguments) + if err != nil { + return fmt.Errorf("creating file component: %w", err) + } + im.arguments = arguments + } + + if reflect.DeepEqual(im.arguments, arguments) { + return nil + } + + // Update the existing managed component + if err := im.fileComponent.Update(arguments); err != nil { + return fmt.Errorf("updating component: %w", err) + } + return nil +} + +func (im *ImportFile) Run(ctx context.Context) error { + return im.fileComponent.Run(ctx) +} + +func (im *ImportFile) Arguments() component.Arguments { + return im.arguments +} + +func (im *ImportFile) Component() component.Component { + return im.fileComponent +} + +func (im *ImportFile) CurrentHealth() component.Health { + return im.fileComponent.CurrentHealth() +} + +// DebugInfo() is not implemented by the file component. +func (im *ImportFile) DebugInfo() interface{} { + return nil +} diff --git a/pkg/flow/internal/import-source/import_git.go b/pkg/flow/internal/import-source/import_git.go new file mode 100644 index 000000000000..b889f3a9d157 --- /dev/null +++ b/pkg/flow/internal/import-source/import_git.go @@ -0,0 +1,281 @@ +package importsource + +import ( + "context" + "errors" + "fmt" + "path/filepath" + "reflect" + "sync" + "time" + + "github.com/go-kit/log" + + "github.com/grafana/agent/component" + "github.com/grafana/agent/pkg/flow/logging/level" + vcs "github.com/grafana/agent/pkg/util/git" + "github.com/grafana/river/vm" +) + +// The difference between this import source and the others is that there is no git component. +// The git logic in the internal package is a copy of the one used in the old module. +type ImportGit struct { + opts component.Options + log log.Logger + eval *vm.Evaluator + mut sync.RWMutex + repo *vcs.GitRepo + repoOpts vcs.GitRepoOptions + args Arguments + onContentChange func(string) + + lastContent string + + argsChanged chan struct{} + + healthMut sync.RWMutex + health component.Health +} + +var ( + _ ImportSource = (*ImportGit)(nil) + _ component.Component = (*ImportGit)(nil) + _ component.HealthComponent = (*ImportGit)(nil) +) + +type Arguments struct { + Repository string `river:"repository,attr"` + Revision string `river:"revision,attr,optional"` + Path string `river:"path,attr"` + PullFrequency time.Duration `river:"pull_frequency,attr,optional"` + GitAuthConfig vcs.GitAuthConfig `river:",squash"` +} + +var DefaultArguments = Arguments{ + Revision: "HEAD", + PullFrequency: time.Minute, +} + +// SetToDefault implements river.Defaulter. +func (args *Arguments) SetToDefault() { + *args = DefaultArguments +} + +func NewImportGit(managedOpts component.Options, eval *vm.Evaluator, onContentChange func(string)) *ImportGit { + return &ImportGit{ + opts: managedOpts, + log: managedOpts.Logger, + eval: eval, + argsChanged: make(chan struct{}, 1), + onContentChange: onContentChange, + } +} + +func (im *ImportGit) Evaluate(scope *vm.Scope) error { + var arguments Arguments + if err := im.eval.Evaluate(scope, &arguments); err != nil { + return fmt.Errorf("decoding River: %w", err) + } + + if reflect.DeepEqual(im.args, arguments) { + return nil + } + + if err := im.Update(arguments); err != nil { + return fmt.Errorf("updating component: %w", err) + } + return nil +} + +func (im *ImportGit) Run(ctx context.Context) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + var ( + ticker *time.Ticker + tickerC <-chan time.Time + ) + + for { + select { + case <-ctx.Done(): + return nil + + case <-im.argsChanged: + im.mut.Lock() + pullFrequency := im.args.PullFrequency + im.mut.Unlock() + ticker, tickerC = im.updateTicker(pullFrequency, ticker, tickerC) + + case <-tickerC: + level.Info(im.log).Log("msg", "updating repository") + im.tickPollFile(ctx) + } + } +} + +func (im *ImportGit) updateTicker(pullFrequency time.Duration, ticker *time.Ticker, tickerC <-chan time.Time) (*time.Ticker, <-chan time.Time) { + level.Info(im.log).Log("msg", "updating repository pull frequency, next pull attempt will be done according to the pullFrequency", "new_frequency", pullFrequency) + + if pullFrequency > 0 { + if ticker == nil { + ticker = time.NewTicker(pullFrequency) + tickerC = ticker.C + } else { + ticker.Reset(pullFrequency) + } + return ticker, tickerC + } + + if ticker != nil { + ticker.Stop() + } + return nil, nil +} + +func (im *ImportGit) tickPollFile(ctx context.Context) { + im.mut.Lock() + err := im.pollFile(ctx, im.args) + pullFrequency := im.args.PullFrequency + im.mut.Unlock() + + im.updateHealth(err) + + if err != nil { + level.Error(im.log).Log("msg", "failed to update repository", "pullFrequency", pullFrequency, "err", err) + } +} + +func (im *ImportGit) updateHealth(err error) { + im.healthMut.Lock() + defer im.healthMut.Unlock() + + if err != nil { + im.health = component.Health{ + Health: component.HealthTypeUnhealthy, + Message: err.Error(), + UpdateTime: time.Now(), + } + } else { + im.health = component.Health{ + Health: component.HealthTypeHealthy, + Message: "module updated", + UpdateTime: time.Now(), + } + } +} + +// Update implements component.Component. +// Only acknowledge the error from Update if it's not a +// vcs.UpdateFailedError; vcs.UpdateFailedError means that the Git repo +// exists, but we were unable to update it. It makes sense to retry on the next poll and it may succeed. +func (im *ImportGit) Update(args component.Arguments) (err error) { + defer func() { + im.updateHealth(err) + }() + im.mut.Lock() + defer im.mut.Unlock() + + newArgs := args.(Arguments) + + // TODO(rfratto): store in a repo-specific directory so changing repositories + // doesn't risk break the module loader if there's a SHA collision between + // the two different repositories. + repoPath := filepath.Join(im.opts.DataPath, "repo") + + repoOpts := vcs.GitRepoOptions{ + Repository: newArgs.Repository, + Revision: newArgs.Revision, + Auth: newArgs.GitAuthConfig, + } + + // Create or update the repo field. + // Failure to update repository makes the module loader temporarily use cached contents on disk + if im.repo == nil || !reflect.DeepEqual(repoOpts, im.repoOpts) { + r, err := vcs.NewGitRepo(context.Background(), repoPath, repoOpts) + if err != nil { + if errors.As(err, &vcs.UpdateFailedError{}) { + level.Error(im.log).Log("msg", "failed to update repository", "err", err) + im.updateHealth(err) + } else { + return err + } + } + im.repo = r + im.repoOpts = repoOpts + } + + if err := im.pollFile(context.Background(), newArgs); err != nil { + if errors.As(err, &vcs.UpdateFailedError{}) { + level.Error(im.log).Log("msg", "failed to poll file from repository", "err", err) + // We don't update the health here because it will be updated via the defer call. + // This is not very good because if we reassign the err before exiting the function it will not update the health correctly. + // TODO improve the error health handling. + } else { + return err + } + } + + // Schedule an update for handling the changed arguments. + select { + case im.argsChanged <- struct{}{}: + default: + } + + im.args = newArgs + return nil +} + +// pollFile fetches the latest content from the repository and updates the +// controller. pollFile must only be called with im.mut held. +func (im *ImportGit) pollFile(ctx context.Context, args Arguments) error { + // Make sure our repo is up-to-date. + if err := im.repo.Update(ctx); err != nil { + return err + } + + // Finally, configure our controller. + bb, err := im.repo.ReadFile(args.Path) + if err != nil { + return err + } + content := string(bb) + if im.lastContent != content { + im.onContentChange(content) + im.lastContent = content + } + return nil +} + +// CurrentHealth implements component.HealthComponent. +func (im *ImportGit) CurrentHealth() component.Health { + im.healthMut.RLock() + defer im.healthMut.RUnlock() + return im.health +} + +// DebugInfo implements component.DebugComponent. +func (im *ImportGit) DebugInfo() interface{} { + type DebugInfo struct { + SHA string `river:"sha,attr"` + RepoError string `river:"repo_error,attr,optional"` + } + + im.mut.RLock() + defer im.mut.RUnlock() + + rev, err := im.repo.CurrentRevision() + if err != nil { + return DebugInfo{RepoError: err.Error()} + } else { + return DebugInfo{SHA: rev} + } +} + +func (im *ImportGit) Arguments() component.Arguments { + return im.args +} + +func (im *ImportGit) Component() component.Component { + return im +} diff --git a/pkg/flow/internal/import-source/import_http.go b/pkg/flow/internal/import-source/import_http.go new file mode 100644 index 000000000000..9bc0ece91fbc --- /dev/null +++ b/pkg/flow/internal/import-source/import_http.go @@ -0,0 +1,87 @@ +package importsource + +import ( + "context" + "fmt" + "reflect" + + "github.com/grafana/agent/component" + "github.com/grafana/agent/component/remote/http" + remote_http "github.com/grafana/agent/component/remote/http" + "github.com/grafana/river/vm" +) + +type ImportHTTP struct { + managedRemoteHTTP *remote_http.Component + arguments component.Arguments + managedOpts component.Options + eval *vm.Evaluator +} + +var _ ImportSource = (*ImportHTTP)(nil) + +func NewImportHTTP(managedOpts component.Options, eval *vm.Evaluator, onContentChange func(string)) *ImportHTTP { + opts := managedOpts + opts.OnStateChange = func(e component.Exports) { + onContentChange(e.(http.Exports).Content.Value) + } + return &ImportHTTP{ + managedOpts: opts, + eval: eval, + } +} + +type ImportHTTPConfigBlock struct { + RemoteHTTPArguments remote_http.Arguments `river:",squash"` +} + +// SetToDefault implements river.Defaulter. +func (a *ImportHTTPConfigBlock) SetToDefault() { + a.RemoteHTTPArguments.SetToDefault() +} + +func (im *ImportHTTP) Evaluate(scope *vm.Scope) error { + var arguments ImportHTTPConfigBlock + if err := im.eval.Evaluate(scope, &arguments); err != nil { + return fmt.Errorf("decoding River: %w", err) + } + if im.managedRemoteHTTP == nil { + var err error + im.managedRemoteHTTP, err = remote_http.New(im.managedOpts, arguments.RemoteHTTPArguments) + if err != nil { + return fmt.Errorf("creating http component: %w", err) + } + im.arguments = arguments + } + + if reflect.DeepEqual(im.arguments, arguments) { + return nil + } + + // Update the existing managed component + if err := im.managedRemoteHTTP.Update(arguments); err != nil { + return fmt.Errorf("updating component: %w", err) + } + return nil +} + +func (im *ImportHTTP) Run(ctx context.Context) error { + return im.managedRemoteHTTP.Run(ctx) +} + +func (im *ImportHTTP) Arguments() component.Arguments { + return im.arguments +} + +func (im *ImportHTTP) Component() component.Component { + return im.managedRemoteHTTP +} + +func (im *ImportHTTP) CurrentHealth() component.Health { + return im.managedRemoteHTTP.CurrentHealth() +} + +// DebugInfo() is not implemented by the http component. +func (im *ImportHTTP) DebugInfo() interface{} { + return nil +} diff --git a/pkg/flow/internal/import-source/import_source.go b/pkg/flow/internal/import-source/import_source.go new file mode 100644 index 000000000000..1467234c1d7e --- /dev/null +++ b/pkg/flow/internal/import-source/import_source.go @@ -0,0 +1,58 @@ +package importsource + +import ( + "context" + "fmt" + + "github.com/grafana/agent/component" + "github.com/grafana/river/vm" +) + +type SourceType int + +const ( + FILE SourceType = iota + HTTP + GIT +) + +const ( + BlockImportFile = "import.file" + BlockImportHTTP = "import.http" + BlockImportGit = "import.git" +) + +type ImportSource interface { + Evaluate(scope *vm.Scope) error + Run(ctx context.Context) error + Component() component.Component + CurrentHealth() component.Health + DebugInfo() interface{} + Arguments() component.Arguments +} + +func NewImportSource(sourceType SourceType, managedOpts component.Options, eval *vm.Evaluator, onContentChange func(string)) ImportSource { + switch sourceType { + case FILE: + return NewImportFile(managedOpts, eval, onContentChange) + case HTTP: + return NewImportHTTP(managedOpts, eval, onContentChange) + case GIT: + return NewImportGit(managedOpts, eval, onContentChange) + } + // This is a programming error, not a config error so this is ok to panic. + panic(fmt.Errorf("unsupported source type: %v", sourceType)) +} + +func GetSourceType(fullName string) SourceType { + switch fullName { + case BlockImportFile: + return FILE + case BlockImportGit: + return GIT + case BlockImportHTTP: + return HTTP + } + // This is a programming error, not a config error so this is ok to panic. + panic(fmt.Errorf("name does not map to a know source type: %v", fullName)) +} diff --git a/pkg/flow/module.go b/pkg/flow/module.go index ec97aab093d5..d67eefcb0508 100644 --- a/pkg/flow/module.go +++ b/pkg/flow/module.go @@ -128,12 +128,12 @@ func newModule(o *moduleOptions) *module { } // LoadConfig parses River config and loads it. -func (c *module) LoadConfig(config []byte, args map[string]any) error { +func (c *module) LoadConfig(config []byte, args map[string]any, parentModuleDefinitions map[string]string) error { ff, err := ParseSource(c.o.ID, config) if err != nil { return err } - return c.f.LoadSource(ff, args) + return c.f.LoadSource(ff, args, parentModuleDefinitions) } // Run starts the Module. No components within the Module diff --git a/pkg/flow/module_caching_test.go b/pkg/flow/module_caching_test.go index e22e0583cbda..bdfce702b6fb 100644 --- a/pkg/flow/module_caching_test.go +++ b/pkg/flow/module_caching_test.go @@ -60,7 +60,7 @@ func TestUpdates_EmptyModule(t *testing.T) { require.NoError(t, err) require.NotNil(t, f) - err = ctrl.LoadSource(f, nil) + err = ctrl.LoadSource(f, nil, nil) require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) @@ -121,7 +121,7 @@ func TestUpdates_ThroughModule(t *testing.T) { require.NoError(t, err) require.NotNil(t, f) - err = ctrl.LoadSource(f, nil) + err = ctrl.LoadSource(f, nil, nil) require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) diff --git a/pkg/flow/module_declare_test.go b/pkg/flow/module_declare_test.go new file mode 100644 index 000000000000..c471bf38beea --- /dev/null +++ b/pkg/flow/module_declare_test.go @@ -0,0 +1,227 @@ +package flow_test + +import ( + "context" + "testing" + "time" + + "github.com/grafana/agent/pkg/flow" + "github.com/grafana/agent/pkg/flow/internal/testcomponents" + "github.com/stretchr/testify/require" +) + +type testCase struct { + name string + config string + expected int +} + +func TestDeclareComponent(t *testing.T) { + tt := []testCase{ + { + name: "BasicDeclare", + config: ` + declare "test" { + argument "input" { + optional = false + } + + testcomponents.passthrough "pt" { + input = argument.input.value + lag = "1ms" + } + + export "output" { + value = testcomponents.passthrough.pt.output + } + } + testcomponents.count "inc" { + frequency = "10ms" + max = 10 + } + + test "myModule" { + input = testcomponents.count.inc.count + } + + testcomponents.summation "sum" { + input = test.myModule.output + } + `, + expected: 10, + }, + { + name: "NestedDeclares", + config: ` + declare "test" { + argument "input" { + optional = false + } + + declare "nested" { + argument "input" { + optional = false + } + export "output" { + value = argument.input.value + } + } + + testcomponents.passthrough "pt" { + input = argument.input.value + lag = "1ms" + } + + nested "default" { + input = testcomponents.passthrough.pt.output + } + + export "output" { + value = nested.default.output + } + } + testcomponents.count "inc" { + frequency = "10ms" + max = 10 + } + + test "myModule" { + input = testcomponents.count.inc.count + } + + testcomponents.summation "sum" { + input = test.myModule.output + } + `, + expected: 10, + }, + { + name: "DeclaredInParentDepth1", + config: ` + declare "test" { + argument "input" { + optional = false + } + + testcomponents.passthrough "pt" { + input = argument.input.value + lag = "1ms" + } + + rootDeclare "default" { + input = testcomponents.passthrough.pt.output + } + + export "output" { + value = rootDeclare.default.output + } + } + declare "rootDeclare" { + argument "input" { + optional = false + } + export "output" { + value = argument.input.value + } + } + testcomponents.count "inc" { + frequency = "10ms" + max = 10 + } + + test "myModule" { + input = testcomponents.count.inc.count + } + + testcomponents.summation "sum" { + input = test.myModule.output + } + `, + expected: 10, + }, + { + name: "DeclaredInParentDepth2", + config: ` + declare "test" { + argument "input" { + optional = false + } + + testcomponents.passthrough "pt" { + input = argument.input.value + lag = "1ms" + } + + declare "anotherDeclare" { + argument "input" { + optional = false + } + rootDeclare "default" { + input = argument.input.value + } + export "output" { + value = rootDeclare.default.output + } + } + + anotherDeclare "myOtherDeclare" { + input = testcomponents.passthrough.pt.output + } + + export "output" { + value = anotherDeclare.myOtherDeclare.output + } + } + declare "rootDeclare" { + argument "input" { + optional = false + } + export "output" { + value = argument.input.value + } + } + testcomponents.count "inc" { + frequency = "10ms" + max = 10 + } + + test "myModule" { + input = testcomponents.count.inc.count + } + + testcomponents.summation "sum" { + input = test.myModule.output + } + `, + expected: 10, + }, + } + + for _, tc := range tt { + t.Run(tc.name, func(t *testing.T) { + ctrl := flow.New(testOptions(t)) + f, err := flow.ParseSource(t.Name(), []byte(tc.config)) + require.NoError(t, err) + require.NotNil(t, f) + + err = ctrl.LoadSource(f, nil, nil) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan struct{}) + go func() { + ctrl.Run(ctx) + close(done) + }() + defer func() { + cancel() + <-done + }() + + require.Eventually(t, func() bool { + export := getExport[testcomponents.SummationExports](t, ctrl, "", "testcomponents.summation.sum") + return export.LastAdded == tc.expected + }, 3*time.Second, 10*time.Millisecond) + }) + } +} diff --git a/pkg/flow/module_fail_test.go b/pkg/flow/module_fail_test.go index 28fb0923a892..5ff28f20c9f0 100644 --- a/pkg/flow/module_fail_test.go +++ b/pkg/flow/module_fail_test.go @@ -15,7 +15,7 @@ func TestIDRemovalIfFailedToLoad(t *testing.T) { fullContent := "test.fail.module \"t1\" { content = \"\" }" fl, err := ParseSource("test", []byte(fullContent)) require.NoError(t, err) - err = f.LoadSource(fl, nil) + err = f.LoadSource(fl, nil, nil) require.NoError(t, err) ctx := context.Background() ctx, cnc := context.WithTimeout(ctx, 600*time.Second) diff --git a/pkg/flow/module_import_test.go b/pkg/flow/module_import_test.go new file mode 100644 index 000000000000..87f182bbac7f --- /dev/null +++ b/pkg/flow/module_import_test.go @@ -0,0 +1,741 @@ +package flow_test + +import ( + "context" + "os" + "testing" + "time" + + "github.com/grafana/agent/pkg/flow" + "github.com/grafana/agent/pkg/flow/internal/testcomponents" + "github.com/stretchr/testify/require" + + _ "github.com/grafana/agent/component/module/string" +) + +func TestImportModule(t *testing.T) { + const defaultModuleUpdate = ` + declare "test" { + argument "input" { + optional = false + } + + testcomponents.passthrough "pt" { + input = argument.input.value + lag = "1ms" + } + + export "output" { + value = -10 + } + } +` + testCases := []struct { + name string + module string + otherModule string + config string + updateModule func(filename string) string + updateFile string + }{ + { + name: "TestImportModule", + module: ` + declare "test" { + argument "input" { + optional = false + } + + testcomponents.passthrough "pt" { + input = argument.input.value + lag = "1ms" + } + + export "output" { + value = testcomponents.passthrough.pt.output + } + }`, + config: ` + testcomponents.count "inc" { + frequency = "10ms" + max = 10 + } + + import.file "testImport" { + filename = "module" + } + + testImport.test "myModule" { + input = testcomponents.count.inc.count + } + + testcomponents.summation "sum" { + input = testImport.test.myModule.output + } + `, + updateModule: func(filename string) string { + return defaultModuleUpdate + }, + updateFile: "module", + }, + { + name: "TestImportModuleNoArgs", + module: ` + declare "test" { + testcomponents.passthrough "pt" { + input = 10 + lag = "1ms" + } + + export "output" { + value = testcomponents.passthrough.pt.output + } + }`, + config: ` + import.file "testImport" { + filename = "module" + } + + testImport.test "myModule" { + } + + testcomponents.summation "sum" { + input = testImport.test.myModule.output + } + `, + updateModule: func(filename string) string { + return ` + declare "test" { + testcomponents.passthrough "pt" { + input = -10 + lag = "1ms" + } + + export "output" { + value = testcomponents.passthrough.pt.output + } + } + ` + }, + updateFile: "module", + }, + { + name: "TestImportModuleInDeclare", + module: ` + declare "test" { + argument "input" { + optional = false + } + + testcomponents.passthrough "pt" { + input = argument.input.value + lag = "1ms" + } + + export "output" { + value = testcomponents.passthrough.pt.output + } + } + `, + config: ` + testcomponents.count "inc" { + frequency = "10ms" + max = 10 + } + + import.file "testImport" { + filename = "module" + } + + declare "anotherModule" { + testcomponents.count "inc" { + frequency = "10ms" + max = 10 + } + + testImport.test "myModule" { + input = testcomponents.count.inc.count + } + + export "output" { + value = testImport.test.myModule.output + } + } + + anotherModule "myOtherModule" {} + + testcomponents.summation "sum" { + input = anotherModule.myOtherModule.output + } + `, + updateModule: func(filename string) string { + return defaultModuleUpdate + }, + updateFile: "module", + }, + { + name: "TestImportModuleInNestedDeclare", + module: ` + declare "test" { + argument "input" { + optional = false + } + + testcomponents.passthrough "pt" { + input = argument.input.value + lag = "1ms" + } + + export "output" { + value = testcomponents.passthrough.pt.output + } + } + `, + config: ` + testcomponents.count "inc" { + frequency = "10ms" + max = 10 + } + + import.file "testImport" { + filename = "module" + } + + declare "yetAgainAnotherModule" { + declare "anotherModule" { + testcomponents.count "inc" { + frequency = "10ms" + max = 10 + } + + testImport.test "myModule" { + input = testcomponents.count.inc.count + } + + export "output" { + value = testImport.test.myModule.output + } + } + anotherModule "myOtherModule" {} + + export "output" { + value = anotherModule.myOtherModule.output + } + } + + yetAgainAnotherModule "default" {} + + testcomponents.summation "sum" { + input = yetAgainAnotherModule.default.output + } + `, + updateModule: func(filename string) string { + return defaultModuleUpdate + }, + updateFile: "module", + }, + { + name: "TestImportModuleWithImportBlock", + module: ` + import.file "otherModule" { + filename = "other_module" + } + declare "anotherModule" { + testcomponents.count "inc" { + frequency = "10ms" + max = 10 + } + + otherModule.test "default" { + input = testcomponents.count.inc.count + } + + export "output" { + value = otherModule.test.default.output + } + } + `, + otherModule: ` + declare "test" { + argument "input" { + optional = false + } + + testcomponents.passthrough "pt" { + input = argument.input.value + lag = "1ms" + } + + export "output" { + value = testcomponents.passthrough.pt.output + } + } + `, + config: ` + import.file "testImport" { + filename = "module" + } + + testImport.anotherModule "myOtherModule" {} + + testcomponents.summation "sum" { + input = testImport.anotherModule.myOtherModule.output + } + `, + updateModule: func(filename string) string { + return defaultModuleUpdate + }, + updateFile: "other_module", + }, + { + name: "TestImportModuleWithNestedDeclareUsingModule", + module: ` + import.file "default" { + filename = "other_module" + } + declare "anotherModule" { + testcomponents.count "inc" { + frequency = "10ms" + max = 10 + } + + declare "blabla" { + argument "input" {} + default.test "default" { + input = argument.input.value + } + + export "output" { + value = default.test.default.output + } + } + + blabla "default" { + input = testcomponents.count.inc.count + } + + export "output" { + value = blabla.default.output + } + } + `, + otherModule: ` + declare "test" { + argument "input" { + optional = false + } + + testcomponents.passthrough "pt" { + input = argument.input.value + lag = "1ms" + } + + export "output" { + value = testcomponents.passthrough.pt.output + } + } + `, + config: ` + import.file "testImport" { + filename = "module" + } + + testImport.anotherModule "myOtherModule" {} + + testcomponents.summation "sum" { + input = testImport.anotherModule.myOtherModule.output + } + `, + }, + { + name: "TestImportModuleWithNestedDeclareDependency", + module: ` + declare "other_test" { + argument "input" { + optional = false + } + + testcomponents.passthrough "pt" { + input = argument.input.value + lag = "1ms" + } + + export "output" { + value = testcomponents.passthrough.pt.output + } + } + + declare "test" { + argument "input" { + optional = false + } + + other_test "default" { + input = argument.input.value + } + + export "output" { + value = other_test.default.output + } + } + `, + config: ` + testcomponents.count "inc" { + frequency = "10ms" + max = 10 + } + + import.file "testImport" { + filename = "module" + } + + declare "anotherModule" { + testcomponents.count "inc" { + frequency = "10ms" + max = 10 + } + + testImport.test "myModule" { + input = testcomponents.count.inc.count + } + + export "output" { + value = testImport.test.myModule.output + } + } + + anotherModule "myOtherModule" {} + + testcomponents.summation "sum" { + input = anotherModule.myOtherModule.output + } + `, + updateModule: func(filename string) string { + return ` + declare "other_test" { + argument "input" { + optional = false + } + export "output" { + value = -10 + } + } + + declare "test" { + argument "input" { + optional = false + } + + other_test "default" { + input = argument.input.value + } + + export "output" { + value = other_test.default.output + } + } + ` + }, + updateFile: "module", + }, + { + name: "TestImportModuleWithMoreNesting", + module: ` + import.file "importOtherTest" { + filename = "other_module" + } + declare "test" { + argument "input" { + optional = false + } + + importOtherTest.other_test "default" { + input = argument.input.value + } + + export "output" { + value = importOtherTest.other_test.default.output + } + } + `, + otherModule: ` + declare "other_test" { + argument "input" { + optional = false + } + + testcomponents.passthrough "pt" { + input = argument.input.value + lag = "1ms" + } + + export "output" { + value = testcomponents.passthrough.pt.output + } + }`, + config: ` + testcomponents.count "inc" { + frequency = "10ms" + max = 10 + } + + import.file "testImport" { + filename = "module" + } + + declare "anotherModule" { + testcomponents.count "inc" { + frequency = "10ms" + max = 10 + } + + testImport.test "myModule" { + input = testcomponents.count.inc.count + } + + export "output" { + value = testImport.test.myModule.output + } + } + + anotherModule "myOtherModule" {} + + testcomponents.summation "sum" { + input = anotherModule.myOtherModule.output + } + `, + updateModule: func(filename string) string { + return ` + declare "other_test" { + argument "input" { + optional = false + } + export "output" { + value = -10 + } + } + ` + }, + updateFile: "other_module", + }, + { + name: "TestImportModuleWithMoreNestingAndMoreNesting", + module: ` + import.file "importOtherTest" { + filename = "other_module" + } + declare "test" { + argument "input" { + optional = false + } + + declare "anotherOne" { + argument "input" { + optional = false + } + importOtherTest.other_test "default" { + input = argument.input.value + } + export "output" { + value = importOtherTest.other_test.default.output + } + } + + anotherOne "default" { + input = argument.input.value + } + + export "output" { + value = anotherOne.default.output + } + } + `, + otherModule: ` + declare "other_test" { + argument "input" { + optional = false + } + + testcomponents.passthrough "pt" { + input = argument.input.value + lag = "1ms" + } + + export "output" { + value = testcomponents.passthrough.pt.output + } + }`, + config: ` + testcomponents.count "inc" { + frequency = "10ms" + max = 10 + } + + import.file "testImport" { + filename = "module" + } + + declare "anotherModule" { + testcomponents.count "inc" { + frequency = "10ms" + max = 10 + } + + testImport.test "myModule" { + input = testcomponents.count.inc.count + } + + export "output" { + value = testImport.test.myModule.output + } + } + + anotherModule "myOtherModule" {} + + testcomponents.summation "sum" { + input = anotherModule.myOtherModule.output + } + `, + updateModule: func(filename string) string { + return ` + declare "other_test" { + argument "input" { + optional = false + } + export "output" { + value = -10 + } + } + ` + }, + updateFile: "other_module", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + filename := "module" + require.NoError(t, os.WriteFile(filename, []byte(tc.module), 0664)) + defer os.Remove(filename) + + otherFilename := "other_module" + if tc.otherModule != "" { + require.NoError(t, os.WriteFile(otherFilename, []byte(tc.otherModule), 0664)) + defer os.Remove(otherFilename) + } + + ctrl := flow.New(testOptions(t)) + f, err := flow.ParseSource(t.Name(), []byte(tc.config)) + require.NoError(t, err) + require.NotNil(t, f) + + err = ctrl.LoadSource(f, nil, nil) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan struct{}) + go func() { + ctrl.Run(ctx) + close(done) + }() + defer func() { + cancel() + <-done + }() + + // Check for initial condition + require.Eventually(t, func() bool { + export := getExport[testcomponents.SummationExports](t, ctrl, "", "testcomponents.summation.sum") + return export.LastAdded == 10 + }, 3*time.Second, 10*time.Millisecond) + + // Update module if needed + if tc.updateModule != nil { + newModule := tc.updateModule(tc.updateFile) + require.NoError(t, os.WriteFile(tc.updateFile, []byte(newModule), 0664)) + + require.Eventually(t, func() bool { + export := getExport[testcomponents.SummationExports](t, ctrl, "", "testcomponents.summation.sum") + return export.LastAdded == -10 + }, 3*time.Second, 10*time.Millisecond) + } + }) + } +} + +func TestImportModuleError(t *testing.T) { + testCases := []struct { + name string + module string + otherModule string + config string + expectedError string + }{ + { + name: "TestImportedModuleTriesAccessingDeclareOnRoot", + module: ` + declare "test" { + argument "input" { + optional = false + } + + cantAccessThis "default" {} + + testcomponents.passthrough "pt" { + input = argument.input.value + lag = "1ms" + } + + export "output" { + value = testcomponents.passthrough.pt.output + } + }`, + config: ` + declare "cantAccessThis" { + export "output" { + value = -1 + } + } + testcomponents.count "inc" { + frequency = "10ms" + max = 10 + } + + import.file "testImport" { + filename = "module" + } + + testImport.test "myModule" { + input = testcomponents.count.inc.count + } + + testcomponents.summation "sum" { + input = testImport.test.myModule.output + } + `, + expectedError: `Unrecognized component name "cantAccessThis"`, + }, // TODO: add more tests + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + filename := "module" + require.NoError(t, os.WriteFile(filename, []byte(tc.module), 0664)) + defer os.Remove(filename) + + otherFilename := "other_module" + if tc.otherModule != "" { + require.NoError(t, os.WriteFile(otherFilename, []byte(tc.otherModule), 0664)) + defer os.Remove(otherFilename) + } + + ctrl := flow.New(testOptions(t)) + f, err := flow.ParseSource(t.Name(), []byte(tc.config)) + require.NoError(t, err) + require.NotNil(t, f) + + err = ctrl.LoadSource(f, nil, nil) + require.ErrorContains(t, err, tc.expectedError) + }) + } +} diff --git a/pkg/flow/module_test.go b/pkg/flow/module_test.go index 4e4ddb9faaa8..2730d77d9932 100644 --- a/pkg/flow/module_test.go +++ b/pkg/flow/module_test.go @@ -144,7 +144,7 @@ func TestArgsNotInModules(t *testing.T) { defer cleanUpController(f) fl, err := ParseSource("test", []byte("argument \"arg\"{}")) require.NoError(t, err) - err = f.LoadSource(fl, nil) + err = f.LoadSource(fl, nil, nil) require.ErrorContains(t, err, "argument blocks only allowed inside a module") } @@ -154,7 +154,7 @@ func TestExportsNotInModules(t *testing.T) { defer cleanUpController(f) fl, err := ParseSource("test", []byte("export \"arg\"{ value = 1}")) require.NoError(t, err) - err = f.LoadSource(fl, nil) + err = f.LoadSource(fl, nil, nil) require.ErrorContains(t, err, "export blocks only allowed inside a module") } @@ -165,7 +165,7 @@ func TestExportsWhenNotUsed(t *testing.T) { fullContent := "test.module \"t1\" { content = \"" + content + "\" }" fl, err := ParseSource("test", []byte(fullContent)) require.NoError(t, err) - err = f.LoadSource(fl, nil) + err = f.LoadSource(fl, nil, nil) require.NoError(t, err) ctx := context.Background() ctx, cnc := context.WithTimeout(ctx, 1*time.Second) @@ -296,7 +296,7 @@ func (t *testModule) Run(ctx context.Context) error { return err } - err = m.LoadConfig([]byte(t.content), t.args) + err = m.LoadConfig([]byte(t.content), t.args, nil) if err != nil { return err } diff --git a/pkg/flow/source.go b/pkg/flow/source.go index acd7d2ce2f58..143ea8998f3e 100644 --- a/pkg/flow/source.go +++ b/pkg/flow/source.go @@ -7,6 +7,7 @@ import ( "strings" "github.com/grafana/agent/pkg/config/encoder" + "github.com/grafana/agent/pkg/flow/internal/controller" "github.com/grafana/river/ast" "github.com/grafana/river/diag" "github.com/grafana/river/parser" @@ -21,6 +22,7 @@ type Source struct { // The Flow controller can interpret them. components []*ast.BlockStmt configBlocks []*ast.BlockStmt + declares []controller.Declare } // ParseSource parses the River file specified by bb into a File. name should be @@ -45,6 +47,7 @@ func ParseSource(name string, bb []byte) (*Source, error) { var ( components []*ast.BlockStmt configs []*ast.BlockStmt + declares []controller.Declare ) for _, stmt := range node.Body { @@ -60,7 +63,9 @@ func ParseSource(name string, bb []byte) (*Source, error) { case *ast.BlockStmt: fullName := strings.Join(stmt.Name, ".") switch fullName { - case "logging", "tracing", "argument", "export": + case "declare": + declares = append(declares, controller.Declare{Block: stmt, Content: string(bb[stmt.LCurlyPos.Position().Offset+1 : stmt.RCurlyPos.Position().Offset-1])}) + case "logging", "tracing", "argument", "export", "import.file", "import.git", "import.http": configs = append(configs, stmt) default: components = append(components, stmt) @@ -79,6 +84,7 @@ func ParseSource(name string, bb []byte) (*Source, error) { return &Source{ components: components, configBlocks: configs, + declares: declares, sourceMap: map[string][]byte{name: bb}, hash: sha256.Sum256(bb), }, nil @@ -120,6 +126,7 @@ func ParseSources(sources map[string][]byte) (*Source, error) { mergedSource.components = append(mergedSource.components, sourceFragment.components...) mergedSource.configBlocks = append(mergedSource.configBlocks, sourceFragment.configBlocks...) + mergedSource.declares = append(mergedSource.declares, sourceFragment.declares...) } mergedSource.hash = [32]byte(hash.Sum(nil)) diff --git a/pkg/flow/source_test.go b/pkg/flow/source_test.go index fa79c8c1e9e1..840642ba203a 100644 --- a/pkg/flow/source_test.go +++ b/pkg/flow/source_test.go @@ -89,7 +89,7 @@ func TestParseSources_DuplicateComponent(t *testing.T) { require.NoError(t, err) ctrl := New(testOptions(t)) defer cleanUpController(ctrl) - err = ctrl.LoadSource(s, nil) + err = ctrl.LoadSource(s, nil, nil) diagErrs, ok := err.(diag.Diagnostics) require.True(t, ok) require.Len(t, diagErrs, 2) @@ -120,7 +120,7 @@ func TestParseSources_UniqueComponent(t *testing.T) { require.NoError(t, err) ctrl := New(testOptions(t)) defer cleanUpController(ctrl) - err = ctrl.LoadSource(s, nil) + err = ctrl.LoadSource(s, nil, nil) require.NoError(t, err) } diff --git a/component/module/git/internal/vcs/auth.go b/pkg/util/git/auth.go similarity index 100% rename from component/module/git/internal/vcs/auth.go rename to pkg/util/git/auth.go diff --git a/component/module/git/internal/vcs/errors.go b/pkg/util/git/errors.go similarity index 100% rename from component/module/git/internal/vcs/errors.go rename to pkg/util/git/errors.go diff --git a/component/module/git/internal/vcs/git.go b/pkg/util/git/git.go similarity index 100% rename from component/module/git/internal/vcs/git.go rename to pkg/util/git/git.go diff --git a/component/module/git/internal/vcs/git_test.go b/pkg/util/git/git_test.go similarity index 97% rename from component/module/git/internal/vcs/git_test.go rename to pkg/util/git/git_test.go index 7680c857db0e..e8c232885f5a 100644 --- a/component/module/git/internal/vcs/git_test.go +++ b/pkg/util/git/git_test.go @@ -6,7 +6,7 @@ import ( "github.com/go-git/go-git/v5" "github.com/go-git/go-git/v5/config" - "github.com/grafana/agent/component/module/git/internal/vcs" + vcs "github.com/grafana/agent/pkg/util/git" "github.com/stretchr/testify/require" )