From 6a2db080549a987d4338a25cb68243d45519bc32 Mon Sep 17 00:00:00 2001 From: Paulin Todev Date: Wed, 15 Jan 2025 10:36:59 +0000 Subject: [PATCH] Disable debug metrics for components inside foreach, and for foreach itself. --- internal/runtime/alloy.go | 11 ++- internal/runtime/foreach_test.go | 80 ++++++++++++++++--- internal/runtime/import_git_test.go | 10 +-- internal/runtime/import_test.go | 9 ++- .../internal/controller/loader_test.go | 6 +- .../controller/node_builtin_component.go | 29 ++++--- .../controller/node_builtin_component_test.go | 4 +- .../controller/node_config_foreach.go | 41 +++++++++- .../controller/node_config_foreach_test.go | 2 +- .../controller/node_custom_component.go | 2 +- .../runtime/internal/testcomponents/pulse.go | 16 +++- .../testdata/foreach_metrics/foreach_1.txtar | 36 +++++++++ .../testdata/foreach_metrics/foreach_2.txtar | 43 ++++++++++ .../testdata/foreach_metrics/foreach_3.txtar | 46 +++++++++++ 14 files changed, 293 insertions(+), 42 deletions(-) create mode 100644 internal/runtime/testdata/foreach_metrics/foreach_1.txtar create mode 100644 internal/runtime/testdata/foreach_metrics/foreach_2.txtar create mode 100644 internal/runtime/testdata/foreach_metrics/foreach_3.txtar diff --git a/internal/runtime/alloy.go b/internal/runtime/alloy.go index d8de9cfffa..4250b74717 100644 --- a/internal/runtime/alloy.go +++ b/internal/runtime/alloy.go @@ -207,17 +207,22 @@ func newController(o controllerOptions) *Runtime { OnExportsChange: o.OnExportsChange, Registerer: o.Reg, ControllerID: o.ControllerID, - NewModuleController: func(id string) controller.ModuleController { + NewModuleController: func(opts controller.ModuleControllerOpts) controller.ModuleController { + reg := o.Reg + if opts.Reg != nil { + reg = opts.Reg + } + return newModuleController(&moduleControllerOptions{ ComponentRegistry: o.ComponentRegistry, ModuleRegistry: o.ModuleRegistry, Logger: log, Tracer: tracer, - Reg: o.Reg, + Reg: reg, DataPath: o.DataPath, MinStability: o.MinStability, EnableCommunityComps: o.EnableCommunityComps, - ID: id, + ID: opts.Id, ServiceMap: serviceMap, WorkerPool: workerPool, }) diff --git a/internal/runtime/foreach_test.go b/internal/runtime/foreach_test.go index 4335afa793..b5a76c18b4 100644 --- a/internal/runtime/foreach_test.go +++ b/internal/runtime/foreach_test.go @@ -4,6 +4,8 @@ import ( "context" "os" "path/filepath" + "strconv" + "strings" "sync" "testing" "time" @@ -11,6 +13,8 @@ import ( "github.com/grafana/alloy/internal/component" "github.com/grafana/alloy/internal/runtime" alloy_runtime "github.com/grafana/alloy/internal/runtime" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" "github.com/stretchr/testify/require" "golang.org/x/tools/txtar" ) @@ -27,20 +31,42 @@ func TestForeach(t *testing.T) { if tc.update != nil { testConfigForEach(t, tc.main, tc.reloadConfig, func() { require.NoError(t, os.WriteFile(tc.update.name, []byte(tc.update.updateConfig), 0664)) - }) + }, nil, nil) } else { - testConfigForEach(t, tc.main, tc.reloadConfig, nil) + testConfigForEach(t, tc.main, tc.reloadConfig, nil, nil, nil) + } + }) + } +} + +func TestForeachMetrics(t *testing.T) { + directory := "./testdata/foreach_metrics" + for _, file := range getTestFiles(directory, t) { + tc := buildTestForEach(t, filepath.Join(directory, file.Name())) + t.Run(tc.description, func(t *testing.T) { + if tc.module != "" { + defer os.Remove("module.alloy") + require.NoError(t, os.WriteFile("module.alloy", []byte(tc.module), 0664)) + } + if tc.update != nil { + testConfigForEach(t, tc.main, tc.reloadConfig, func() { + require.NoError(t, os.WriteFile(tc.update.name, []byte(tc.update.updateConfig), 0664)) + }, tc.expectedMetrics, tc.expectedDurationMetrics) + } else { + testConfigForEach(t, tc.main, tc.reloadConfig, nil, tc.expectedMetrics, tc.expectedDurationMetrics) } }) } } type testForEachFile struct { - description string // description at the top of the txtar file - main string // root config that the controller should load - module string // module imported by the root config - reloadConfig string // root config that the controller should apply on reload - update *updateFile // update can be used to update the content of a file at runtime + description string // description at the top of the txtar file + main string // root config that the controller should load + module string // module imported by the root config + reloadConfig string // root config that the controller should apply on reload + update *updateFile // update can be used to update the content of a file at runtime + expectedMetrics *string // expected prometheus metrics + expectedDurationMetrics *int // expected prometheus duration metrics - check those separately as they vary with each test run } func buildTestForEach(t *testing.T, filename string) testForEachFile { @@ -62,14 +88,22 @@ func buildTestForEach(t *testing.T, filename string) testForEachFile { } case "reload_config.alloy": tc.reloadConfig = string(alloyConfig.Data) + case "expected_metrics.prom": + expectedMetrics := string(alloyConfig.Data) + tc.expectedMetrics = &expectedMetrics + case "expected_duration_metrics.prom": + expectedDurationMetrics, err := strconv.Atoi(strings.TrimSpace(string((alloyConfig.Data)))) + require.NoError(t, err) + tc.expectedDurationMetrics = &expectedDurationMetrics } } return tc } -func testConfigForEach(t *testing.T, config string, reloadConfig string, update func()) { +func testConfigForEach(t *testing.T, config string, reloadConfig string, update func(), expectedMetrics *string, expectedDurationMetrics *int) { defer verifyNoGoroutineLeaks(t) - ctrl, f := setup(t, config) + reg := prometheus.NewRegistry() + ctrl, f := setup(t, config, reg) err := ctrl.LoadSource(f, nil, "") require.NoError(t, err) @@ -92,6 +126,34 @@ func testConfigForEach(t *testing.T, config string, reloadConfig string, update return sum >= 10 }, 3*time.Second, 10*time.Millisecond) + if expectedDurationMetrics != nil { + // These metrics have different values in each run. + // Hence, we can't compare their values from run to run. + // But we can check if the metric exists as a whole, which is good enough. + metricsToCheck := []string{ + "alloy_component_dependencies_wait_seconds", + "alloy_component_evaluation_seconds", + } + + countedMetrics, err := testutil.GatherAndCount(reg, metricsToCheck...) + require.NoError(t, err) + require.Equal(t, *expectedDurationMetrics, countedMetrics) + } + + if expectedMetrics != nil { + // These metrics have fixed values. + // Hence, we can compare their values from run to run. + metricsToCheck := []string{ + "alloy_component_controller_evaluating", + "alloy_component_controller_running_components", + "alloy_component_evaluation_queue_size", + "pulse_count", + } + + err := testutil.GatherAndCompare(reg, strings.NewReader(*expectedMetrics), metricsToCheck...) + require.NoError(t, err) + } + if update != nil { update() diff --git a/internal/runtime/import_git_test.go b/internal/runtime/import_git_test.go index 393e75faa1..4a7ff50d87 100644 --- a/internal/runtime/import_git_test.go +++ b/internal/runtime/import_git_test.go @@ -55,7 +55,7 @@ testImport.add "cc" { runGit(t, testRepo, "commit", "-m \"test\"") defer verifyNoGoroutineLeaks(t) - ctrl, f := setup(t, main) + ctrl, f := setup(t, main, nil) err = ctrl.LoadSource(f, nil, "") require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) @@ -122,7 +122,7 @@ testImport.add "cc" { runGit(t, testRepo, "commit", "-m \"test\"") defer verifyNoGoroutineLeaks(t) - ctrl, f := setup(t, main) + ctrl, f := setup(t, main, nil) err = ctrl.LoadSource(f, nil, "") require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) @@ -205,7 +205,7 @@ testImport.add "cc" { runGit(t, testRepo, "commit", "-m \"test2\"") defer verifyNoGoroutineLeaks(t) - ctrl, f := setup(t, main) + ctrl, f := setup(t, main, nil) err = ctrl.LoadSource(f, nil, "") require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) @@ -269,7 +269,7 @@ testImport.add "cc" { defer verifyNoGoroutineLeaks(t) - ctrl, f := setup(t, main) + ctrl, f := setup(t, main, nil) err = ctrl.LoadSource(f, nil, "") require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) @@ -354,7 +354,7 @@ testImport.add "cc" { runGit(t, testRepo, "commit", "-m \"test\"") defer verifyNoGoroutineLeaks(t) - ctrl, f := setup(t, main) + ctrl, f := setup(t, main, nil) err = ctrl.LoadSource(f, nil, "") expectedErr := vcs.InvalidRevisionError{ Revision: "nonexistent", diff --git a/internal/runtime/import_test.go b/internal/runtime/import_test.go index f6a1ab499b..5cde86c0c6 100644 --- a/internal/runtime/import_test.go +++ b/internal/runtime/import_test.go @@ -16,6 +16,7 @@ import ( "github.com/grafana/alloy/internal/runtime/logging" "github.com/grafana/alloy/internal/service" "github.com/grafana/alloy/internal/util" + "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" "golang.org/x/tools/txtar" @@ -298,7 +299,7 @@ func TestImportError(t *testing.T) { func testConfig(t *testing.T, config string, reloadConfig string, update func()) { defer verifyNoGoroutineLeaks(t) - ctrl, f := setup(t, config) + ctrl, f := setup(t, config, nil) err := ctrl.LoadSource(f, nil, "") require.NoError(t, err) @@ -351,7 +352,7 @@ func testConfig(t *testing.T, config string, reloadConfig string, update func()) func testConfigError(t *testing.T, config string, expectedError string) { defer verifyNoGoroutineLeaks(t) - ctrl, f := setup(t, config) + ctrl, f := setup(t, config, nil) err := ctrl.LoadSource(f, nil, "") require.ErrorContains(t, err, expectedError) ctx, cancel := context.WithCancel(context.Background()) @@ -368,14 +369,14 @@ func testConfigError(t *testing.T, config string, expectedError string) { }() } -func setup(t *testing.T, config string) (*alloy_runtime.Runtime, *alloy_runtime.Source) { +func setup(t *testing.T, config string, reg prometheus.Registerer) (*alloy_runtime.Runtime, *alloy_runtime.Source) { s, err := logging.New(os.Stderr, logging.DefaultOptions) require.NoError(t, err) ctrl := alloy_runtime.New(alloy_runtime.Options{ Logger: s, DataPath: t.TempDir(), MinStability: featuregate.StabilityPublicPreview, - Reg: nil, + Reg: reg, Services: []service.Service{}, }) f, err := alloy_runtime.ParseSource(t.Name(), []byte(config)) diff --git a/internal/runtime/internal/controller/loader_test.go b/internal/runtime/internal/controller/loader_test.go index 2c251d813b..98e1849f36 100644 --- a/internal/runtime/internal/controller/loader_test.go +++ b/internal/runtime/internal/controller/loader_test.go @@ -83,7 +83,7 @@ func TestLoader(t *testing.T) { MinStability: stability, OnBlockNodeUpdate: func(cn controller.BlockNode) { /* no-op */ }, Registerer: prometheus.NewRegistry(), - NewModuleController: func(id string) controller.ModuleController { + NewModuleController: func(opts controller.ModuleControllerOpts) controller.ModuleController { return nil }, }, @@ -382,7 +382,7 @@ func TestLoader_Services(t *testing.T) { MinStability: stability, OnBlockNodeUpdate: func(cn controller.BlockNode) { /* no-op */ }, Registerer: prometheus.NewRegistry(), - NewModuleController: func(id string) controller.ModuleController { + NewModuleController: func(opts controller.ModuleControllerOpts) controller.ModuleController { return nil }, }, @@ -439,7 +439,7 @@ func TestScopeWithFailingComponent(t *testing.T) { MinStability: featuregate.StabilityPublicPreview, OnBlockNodeUpdate: func(cn controller.BlockNode) { /* no-op */ }, Registerer: prometheus.NewRegistry(), - NewModuleController: func(id string) controller.ModuleController { + NewModuleController: func(opts controller.ModuleControllerOpts) controller.ModuleController { return fakeModuleController{} }, }, diff --git a/internal/runtime/internal/controller/node_builtin_component.go b/internal/runtime/internal/controller/node_builtin_component.go index 43e4893a2f..e2123df888 100644 --- a/internal/runtime/internal/controller/node_builtin_component.go +++ b/internal/runtime/internal/controller/node_builtin_component.go @@ -60,20 +60,25 @@ func (id ComponentID) Equals(other ComponentID) bool { // DialFunc is a function to establish a network connection. type DialFunc func(ctx context.Context, network, address string) (net.Conn, error) +type ModuleControllerOpts struct { + Id string + Reg prometheus.Registerer +} + // ComponentGlobals are used by BuiltinComponentNodes to build managed components. All // BuiltinComponentNodes should use the same ComponentGlobals. type ComponentGlobals struct { - Logger *logging.Logger // Logger shared between all managed components. - TraceProvider trace.TracerProvider // Tracer shared between all managed components. - DataPath string // Shared directory where component data may be stored - MinStability featuregate.Stability // Minimum allowed stability level for features - OnBlockNodeUpdate func(cn BlockNode) // Informs controller that we need to reevaluate - OnExportsChange func(exports map[string]any) // Invoked when the managed component updated its exports - Registerer prometheus.Registerer // Registerer for serving Alloy and component metrics - ControllerID string // ID of controller. - NewModuleController func(id string) ModuleController // Func to generate a module controller. - GetServiceData func(name string) (interface{}, error) // Get data for a service. - EnableCommunityComps bool // Enables the use of community components. + Logger *logging.Logger // Logger shared between all managed components. + TraceProvider trace.TracerProvider // Tracer shared between all managed components. + DataPath string // Shared directory where component data may be stored + MinStability featuregate.Stability // Minimum allowed stability level for features + OnBlockNodeUpdate func(cn BlockNode) // Informs controller that we need to reevaluate + OnExportsChange func(exports map[string]any) // Invoked when the managed component updated its exports + Registerer prometheus.Registerer // Registerer for serving Alloy and component metrics + ControllerID string // ID of controller. + NewModuleController func(opts ModuleControllerOpts) ModuleController // Func to generate a module controller. + GetServiceData func(name string) (interface{}, error) // Get data for a service. + EnableCommunityComps bool // Enables the use of community components. } // BuiltinComponentNode is a controller node which manages a builtin component. @@ -146,7 +151,7 @@ func NewBuiltinComponentNode(globals ComponentGlobals, reg component.Registratio componentName: strings.Join(b.Name, "."), reg: reg, exportsType: getExportsType(reg), - moduleController: globals.NewModuleController(globalID), + moduleController: globals.NewModuleController(ModuleControllerOpts{Id: globalID}), OnBlockNodeUpdate: globals.OnBlockNodeUpdate, block: b, diff --git a/internal/runtime/internal/controller/node_builtin_component_test.go b/internal/runtime/internal/controller/node_builtin_component_test.go index c38ee0aac5..d1d9ed3ed4 100644 --- a/internal/runtime/internal/controller/node_builtin_component_test.go +++ b/internal/runtime/internal/controller/node_builtin_component_test.go @@ -13,7 +13,7 @@ func TestGlobalID(t *testing.T) { DataPath: "/data/", MinStability: featuregate.StabilityPublicPreview, ControllerID: "module.file", - NewModuleController: func(id string) ModuleController { + NewModuleController: func(opts ModuleControllerOpts) ModuleController { return nil }, }, &BuiltinComponentNode{ @@ -28,7 +28,7 @@ func TestLocalID(t *testing.T) { DataPath: "/data/", MinStability: featuregate.StabilityPublicPreview, ControllerID: "", - NewModuleController: func(id string) ModuleController { + NewModuleController: func(opts ModuleControllerOpts) ModuleController { return nil }, }, &BuiltinComponentNode{ diff --git a/internal/runtime/internal/controller/node_config_foreach.go b/internal/runtime/internal/controller/node_config_foreach.go index 0400c2c408..1a85074d65 100644 --- a/internal/runtime/internal/controller/node_config_foreach.go +++ b/internal/runtime/internal/controller/node_config_foreach.go @@ -16,6 +16,7 @@ import ( "github.com/grafana/alloy/internal/runtime/logging/level" "github.com/grafana/alloy/syntax/ast" "github.com/grafana/alloy/syntax/vm" + "github.com/prometheus/client_golang/prometheus" ) const templateType = "template" @@ -48,6 +49,9 @@ type ForeachConfigNode struct { block *ast.BlockStmt args Arguments + moduleControllerFactory func(opts ModuleControllerOpts) ModuleController + moduleControllerOpts ModuleControllerOpts + healthMut sync.RWMutex evalHealth component.Health // Health of the last evaluate runHealth component.Health // Health of running the component @@ -69,7 +73,8 @@ func NewForeachConfigNode(block *ast.BlockStmt, globals ComponentGlobals, custom componentName: block.GetBlockName(), id: BlockComponentID(block), logger: log.With(globals.Logger, "component_path", globals.ControllerID, "component_id", nodeID), - moduleController: globals.NewModuleController(globalID), + moduleControllerFactory: globals.NewModuleController, + moduleControllerOpts: ModuleControllerOpts{Id: globalID}, customReg: customReg, forEachChildrenUpdateChan: make(chan struct{}, 1), customComponents: make(map[string]CustomComponent, 0), @@ -94,6 +99,8 @@ func (fn *ForeachConfigNode) Arguments() component.Arguments { } func (fn *ForeachConfigNode) ModuleIDs() []string { + fn.mut.RLock() + defer fn.mut.RUnlock() return fn.moduleController.ModuleIDs() } @@ -113,6 +120,11 @@ func (fn *ForeachConfigNode) ID() ComponentID { type Arguments struct { Collection []any `alloy:"collection,attr"` Var string `alloy:"var,attr"` + + // enable_metrics should be false by default. + // That way users are protected from an explosion of debug metrics + // if there are many items inside "collection". + EnableMetrics bool `alloy:"enable_metrics,attr,optional"` } func (fn *ForeachConfigNode) Evaluate(evalScope *vm.Scope) error { @@ -156,6 +168,14 @@ func (fn *ForeachConfigNode) evaluate(scope *vm.Scope) error { fn.args = args + // By default don't show debug metrics. + if args.EnableMetrics { + fn.moduleControllerOpts.Reg = nil + } else { + fn.moduleControllerOpts.Reg = NoopRegistry{} + } + fn.moduleController = fn.moduleControllerFactory(fn.moduleControllerOpts) + // Loop through the items to create the custom components. // On re-evaluation new components are added and existing ones are updated. newCustomComponentIds := make(map[string]bool, len(args.Collection)) @@ -204,6 +224,8 @@ func (fn *ForeachConfigNode) evaluate(scope *vm.Scope) error { return nil } +// Assumes that a lock is held, +// so that fn.moduleController doesn't change while the function is running. func (fn *ForeachConfigNode) getOrCreateCustomComponent(customComponentID string) (CustomComponent, error) { cc, exists := fn.customComponents[customComponentID] if exists { @@ -366,3 +388,20 @@ func hashObject(obj any) string { return computeHash(fmt.Sprintf("%#v", v)) } } + +type NoopRegistry struct{} + +var _ prometheus.Registerer = NoopRegistry{} + +// MustRegister implements prometheus.Registerer. +func (n NoopRegistry) MustRegister(...prometheus.Collector) {} + +// Register implements prometheus.Registerer. +func (n NoopRegistry) Register(prometheus.Collector) error { + return nil +} + +// Unregister implements prometheus.Registerer. +func (n NoopRegistry) Unregister(prometheus.Collector) bool { + return true +} diff --git a/internal/runtime/internal/controller/node_config_foreach_test.go b/internal/runtime/internal/controller/node_config_foreach_test.go index ff8b10a6bc..9b24c24271 100644 --- a/internal/runtime/internal/controller/node_config_foreach_test.go +++ b/internal/runtime/internal/controller/node_config_foreach_test.go @@ -241,7 +241,7 @@ func getComponentGlobals(t *testing.T) ComponentGlobals { MinStability: featuregate.StabilityGenerallyAvailable, OnBlockNodeUpdate: func(cn BlockNode) { /* no-op */ }, Registerer: prometheus.NewRegistry(), - NewModuleController: func(id string) ModuleController { + NewModuleController: func(opts ModuleControllerOpts) ModuleController { return NewModuleControllerMock() }, } diff --git a/internal/runtime/internal/controller/node_custom_component.go b/internal/runtime/internal/controller/node_custom_component.go index 52fff29bc0..df98803857 100644 --- a/internal/runtime/internal/controller/node_custom_component.go +++ b/internal/runtime/internal/controller/node_custom_component.go @@ -114,7 +114,7 @@ func NewCustomComponentNode(globals ComponentGlobals, b *ast.BlockStmt, getConfi componentName: componentName, importNamespace: importNamespace, customComponentName: customComponentName, - moduleController: globals.NewModuleController(globalID), + moduleController: globals.NewModuleController(ModuleControllerOpts{Id: globalID}), OnBlockNodeUpdate: globals.OnBlockNodeUpdate, logger: log.With(globals.Logger, "component_path", parent, "component_id", node), getConfig: getConfig, diff --git a/internal/runtime/internal/testcomponents/pulse.go b/internal/runtime/internal/testcomponents/pulse.go index 1fab1af057..20876a2761 100644 --- a/internal/runtime/internal/testcomponents/pulse.go +++ b/internal/runtime/internal/testcomponents/pulse.go @@ -10,6 +10,7 @@ import ( "github.com/grafana/alloy/internal/component" "github.com/grafana/alloy/internal/featuregate" "github.com/grafana/alloy/internal/runtime/logging/level" + "github.com/prometheus/client_golang/prometheus" ) // testcomponents.pulse sends the value 1 at the defined frequency for a number of times defined by the max argument. @@ -38,10 +39,22 @@ type Pulse struct { cfgMut sync.Mutex cfg PulseConfig count int + + pulseCount prometheus.Counter } func NewPulse(o component.Options, cfg PulseConfig) (*Pulse, error) { - t := &Pulse{opts: o, log: o.Logger} + t := &Pulse{ + opts: o, + log: o.Logger, + pulseCount: prometheus.NewCounter(prometheus.CounterOpts{Name: "pulse_count"}), + } + + err := o.Registerer.Register(t.pulseCount) + if err != nil { + return nil, err + } + if err := t.Update(cfg); err != nil { return nil, err } @@ -63,6 +76,7 @@ func (p *Pulse) Run(ctx context.Context) error { for _, r := range p.cfg.ForwardTo { r.ReceiveInt(1) } + p.pulseCount.Inc() p.count++ } p.cfgMut.Unlock() diff --git a/internal/runtime/testdata/foreach_metrics/foreach_1.txtar b/internal/runtime/testdata/foreach_metrics/foreach_1.txtar new file mode 100644 index 0000000000..67bd58b01d --- /dev/null +++ b/internal/runtime/testdata/foreach_metrics/foreach_1.txtar @@ -0,0 +1,36 @@ +Test to make sure debug metrics are disabled by default for foreach. +Use a foreach with only one item. The pulse will send "1" to the receiver of the summation component until it reaches 10. + +-- main.alloy -- +foreach "testForeach" { + collection = [10] + var = "num" + + template { + testcomponents.pulse "pt" { + max = num + frequency = "10ms" + forward_to = [testcomponents.summation_receiver.sum.receiver] + } + } +} + +// Similar to testcomponents.summation, but with a "receiver" export +testcomponents.summation_receiver "sum" { +} + +-- expected_metrics.prom -- + +# HELP alloy_component_controller_evaluating Tracks if the controller is currently in the middle of a graph evaluation +# TYPE alloy_component_controller_evaluating gauge +alloy_component_controller_evaluating{controller_id="",controller_path="/"} 0 +# HELP alloy_component_controller_running_components Total number of running components. +# TYPE alloy_component_controller_running_components gauge +alloy_component_controller_running_components{controller_id="",controller_path="/",health_type="healthy"} 2 +# HELP alloy_component_evaluation_queue_size Tracks the number of components waiting to be evaluated in the worker pool +# TYPE alloy_component_evaluation_queue_size gauge +alloy_component_evaluation_queue_size{controller_id="",controller_path="/"} 1 + +-- expected_duration_metrics.prom -- + +2 \ No newline at end of file diff --git a/internal/runtime/testdata/foreach_metrics/foreach_2.txtar b/internal/runtime/testdata/foreach_metrics/foreach_2.txtar new file mode 100644 index 0000000000..b8894cc32c --- /dev/null +++ b/internal/runtime/testdata/foreach_metrics/foreach_2.txtar @@ -0,0 +1,43 @@ +Test to make sure debug metrics for foreach are shown when they are enabled explicitly. +Foreach with only one item. The pulse will send "1" to the receiver of the summation component until it reaches 10. + +-- main.alloy -- +foreach "testForeach" { + collection = [10] + var = "num" + enable_metrics = true + + template { + testcomponents.pulse "pt" { + max = num + frequency = "10ms" + forward_to = [testcomponents.summation_receiver.sum.receiver] + } + } +} + +// Similar to testcomponents.summation, but with a "receiver" export +testcomponents.summation_receiver "sum" { +} + +-- expected_metrics.prom -- + +# HELP alloy_component_controller_evaluating Tracks if the controller is currently in the middle of a graph evaluation +# TYPE alloy_component_controller_evaluating gauge +alloy_component_controller_evaluating{controller_id="",controller_path="/"} 0 +alloy_component_controller_evaluating{controller_id="foreach_10_1",controller_path="/foreach.testForeach"} 0 +# HELP alloy_component_controller_running_components Total number of running components. +# TYPE alloy_component_controller_running_components gauge +alloy_component_controller_running_components{controller_id="",controller_path="/",health_type="healthy"} 2 +alloy_component_controller_running_components{controller_id="foreach_10_1",controller_path="/foreach.testForeach",health_type="healthy"} 1 +# HELP alloy_component_evaluation_queue_size Tracks the number of components waiting to be evaluated in the worker pool +# TYPE alloy_component_evaluation_queue_size gauge +alloy_component_evaluation_queue_size{controller_id="",controller_path="/"} 1 +alloy_component_evaluation_queue_size{controller_id="foreach_10_1",controller_path="/foreach.testForeach"} 0 +# HELP pulse_count +# TYPE pulse_count counter +pulse_count{component_id="testcomponents.pulse.pt",component_path="/foreach.testForeach/foreach_10_1"} 10 + +-- expected_duration_metrics.prom -- + +4 \ No newline at end of file diff --git a/internal/runtime/testdata/foreach_metrics/foreach_3.txtar b/internal/runtime/testdata/foreach_metrics/foreach_3.txtar new file mode 100644 index 0000000000..06809a3dd7 --- /dev/null +++ b/internal/runtime/testdata/foreach_metrics/foreach_3.txtar @@ -0,0 +1,46 @@ +This test uses two different items in the collection. + +-- main.alloy -- +foreach "testForeach" { + collection = [6,4] + var = "num" + enable_metrics = true + + template { + testcomponents.pulse "pt" { + max = num + frequency = "10ms" + forward_to = [testcomponents.summation_receiver.sum.receiver] + } + } +} + +// Similar to testcomponents.summation, but with a "receiver" export +testcomponents.summation_receiver "sum" { +} + +-- expected_metrics.prom -- + +# HELP alloy_component_controller_evaluating Tracks if the controller is currently in the middle of a graph evaluation +# TYPE alloy_component_controller_evaluating gauge +alloy_component_controller_evaluating{controller_id="",controller_path="/"} 0 +alloy_component_controller_evaluating{controller_id="foreach_4_1",controller_path="/foreach.testForeach"} 0 +alloy_component_controller_evaluating{controller_id="foreach_6_1",controller_path="/foreach.testForeach"} 0 +# HELP alloy_component_controller_running_components Total number of running components. +# TYPE alloy_component_controller_running_components gauge +alloy_component_controller_running_components{controller_id="",controller_path="/",health_type="healthy"} 2 +alloy_component_controller_running_components{controller_id="foreach_4_1",controller_path="/foreach.testForeach",health_type="healthy"} 1 +alloy_component_controller_running_components{controller_id="foreach_6_1",controller_path="/foreach.testForeach",health_type="healthy"} 1 +# HELP alloy_component_evaluation_queue_size Tracks the number of components waiting to be evaluated in the worker pool +# TYPE alloy_component_evaluation_queue_size gauge +alloy_component_evaluation_queue_size{controller_id="",controller_path="/"} 1 +alloy_component_evaluation_queue_size{controller_id="foreach_4_1",controller_path="/foreach.testForeach"} 0 +alloy_component_evaluation_queue_size{controller_id="foreach_6_1",controller_path="/foreach.testForeach"} 0 +# HELP pulse_count +# TYPE pulse_count counter +pulse_count{component_id="testcomponents.pulse.pt",component_path="/foreach.testForeach/foreach_4_1"} 4 +pulse_count{component_id="testcomponents.pulse.pt",component_path="/foreach.testForeach/foreach_6_1"} 6 + +-- expected_duration_metrics.prom -- + +6 \ No newline at end of file