Skip to content

Commit

Permalink
Disable debug metrics for components inside foreach, and for foreach …
Browse files Browse the repository at this point in the history
…itself.
  • Loading branch information
ptodev committed Jan 15, 2025
1 parent a50a47d commit 6a2db08
Show file tree
Hide file tree
Showing 14 changed files with 293 additions and 42 deletions.
11 changes: 8 additions & 3 deletions internal/runtime/alloy.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,17 +207,22 @@ func newController(o controllerOptions) *Runtime {
OnExportsChange: o.OnExportsChange,
Registerer: o.Reg,
ControllerID: o.ControllerID,
NewModuleController: func(id string) controller.ModuleController {
NewModuleController: func(opts controller.ModuleControllerOpts) controller.ModuleController {
reg := o.Reg
if opts.Reg != nil {
reg = opts.Reg
}

return newModuleController(&moduleControllerOptions{
ComponentRegistry: o.ComponentRegistry,
ModuleRegistry: o.ModuleRegistry,
Logger: log,
Tracer: tracer,
Reg: o.Reg,
Reg: reg,
DataPath: o.DataPath,
MinStability: o.MinStability,
EnableCommunityComps: o.EnableCommunityComps,
ID: id,
ID: opts.Id,
ServiceMap: serviceMap,
WorkerPool: workerPool,
})
Expand Down
80 changes: 71 additions & 9 deletions internal/runtime/foreach_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,17 @@ import (
"context"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"testing"
"time"

"github.com/grafana/alloy/internal/component"
"github.com/grafana/alloy/internal/runtime"
alloy_runtime "github.com/grafana/alloy/internal/runtime"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/require"
"golang.org/x/tools/txtar"
)
Expand All @@ -27,20 +31,42 @@ func TestForeach(t *testing.T) {
if tc.update != nil {
testConfigForEach(t, tc.main, tc.reloadConfig, func() {
require.NoError(t, os.WriteFile(tc.update.name, []byte(tc.update.updateConfig), 0664))
})
}, nil, nil)
} else {
testConfigForEach(t, tc.main, tc.reloadConfig, nil)
testConfigForEach(t, tc.main, tc.reloadConfig, nil, nil, nil)
}
})
}
}

func TestForeachMetrics(t *testing.T) {
directory := "./testdata/foreach_metrics"
for _, file := range getTestFiles(directory, t) {
tc := buildTestForEach(t, filepath.Join(directory, file.Name()))
t.Run(tc.description, func(t *testing.T) {
if tc.module != "" {
defer os.Remove("module.alloy")
require.NoError(t, os.WriteFile("module.alloy", []byte(tc.module), 0664))
}
if tc.update != nil {
testConfigForEach(t, tc.main, tc.reloadConfig, func() {
require.NoError(t, os.WriteFile(tc.update.name, []byte(tc.update.updateConfig), 0664))
}, tc.expectedMetrics, tc.expectedDurationMetrics)
} else {
testConfigForEach(t, tc.main, tc.reloadConfig, nil, tc.expectedMetrics, tc.expectedDurationMetrics)
}
})
}
}

type testForEachFile struct {
description string // description at the top of the txtar file
main string // root config that the controller should load
module string // module imported by the root config
reloadConfig string // root config that the controller should apply on reload
update *updateFile // update can be used to update the content of a file at runtime
description string // description at the top of the txtar file
main string // root config that the controller should load
module string // module imported by the root config
reloadConfig string // root config that the controller should apply on reload
update *updateFile // update can be used to update the content of a file at runtime
expectedMetrics *string // expected prometheus metrics
expectedDurationMetrics *int // expected prometheus duration metrics - check those separately as they vary with each test run
}

func buildTestForEach(t *testing.T, filename string) testForEachFile {
Expand All @@ -62,14 +88,22 @@ func buildTestForEach(t *testing.T, filename string) testForEachFile {
}
case "reload_config.alloy":
tc.reloadConfig = string(alloyConfig.Data)
case "expected_metrics.prom":
expectedMetrics := string(alloyConfig.Data)
tc.expectedMetrics = &expectedMetrics
case "expected_duration_metrics.prom":
expectedDurationMetrics, err := strconv.Atoi(strings.TrimSpace(string((alloyConfig.Data))))
require.NoError(t, err)
tc.expectedDurationMetrics = &expectedDurationMetrics
}
}
return tc
}

func testConfigForEach(t *testing.T, config string, reloadConfig string, update func()) {
func testConfigForEach(t *testing.T, config string, reloadConfig string, update func(), expectedMetrics *string, expectedDurationMetrics *int) {
defer verifyNoGoroutineLeaks(t)
ctrl, f := setup(t, config)
reg := prometheus.NewRegistry()
ctrl, f := setup(t, config, reg)

err := ctrl.LoadSource(f, nil, "")
require.NoError(t, err)
Expand All @@ -92,6 +126,34 @@ func testConfigForEach(t *testing.T, config string, reloadConfig string, update
return sum >= 10
}, 3*time.Second, 10*time.Millisecond)

if expectedDurationMetrics != nil {
// These metrics have different values in each run.
// Hence, we can't compare their values from run to run.
// But we can check if the metric exists as a whole, which is good enough.
metricsToCheck := []string{
"alloy_component_dependencies_wait_seconds",
"alloy_component_evaluation_seconds",
}

countedMetrics, err := testutil.GatherAndCount(reg, metricsToCheck...)
require.NoError(t, err)
require.Equal(t, *expectedDurationMetrics, countedMetrics)
}

if expectedMetrics != nil {
// These metrics have fixed values.
// Hence, we can compare their values from run to run.
metricsToCheck := []string{
"alloy_component_controller_evaluating",
"alloy_component_controller_running_components",
"alloy_component_evaluation_queue_size",
"pulse_count",
}

err := testutil.GatherAndCompare(reg, strings.NewReader(*expectedMetrics), metricsToCheck...)
require.NoError(t, err)
}

if update != nil {
update()

Expand Down
10 changes: 5 additions & 5 deletions internal/runtime/import_git_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ testImport.add "cc" {
runGit(t, testRepo, "commit", "-m \"test\"")

defer verifyNoGoroutineLeaks(t)
ctrl, f := setup(t, main)
ctrl, f := setup(t, main, nil)
err = ctrl.LoadSource(f, nil, "")
require.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -122,7 +122,7 @@ testImport.add "cc" {
runGit(t, testRepo, "commit", "-m \"test\"")

defer verifyNoGoroutineLeaks(t)
ctrl, f := setup(t, main)
ctrl, f := setup(t, main, nil)
err = ctrl.LoadSource(f, nil, "")
require.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -205,7 +205,7 @@ testImport.add "cc" {
runGit(t, testRepo, "commit", "-m \"test2\"")

defer verifyNoGoroutineLeaks(t)
ctrl, f := setup(t, main)
ctrl, f := setup(t, main, nil)
err = ctrl.LoadSource(f, nil, "")
require.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -269,7 +269,7 @@ testImport.add "cc" {

defer verifyNoGoroutineLeaks(t)

ctrl, f := setup(t, main)
ctrl, f := setup(t, main, nil)
err = ctrl.LoadSource(f, nil, "")
require.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -354,7 +354,7 @@ testImport.add "cc" {
runGit(t, testRepo, "commit", "-m \"test\"")

defer verifyNoGoroutineLeaks(t)
ctrl, f := setup(t, main)
ctrl, f := setup(t, main, nil)
err = ctrl.LoadSource(f, nil, "")
expectedErr := vcs.InvalidRevisionError{
Revision: "nonexistent",
Expand Down
9 changes: 5 additions & 4 deletions internal/runtime/import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/grafana/alloy/internal/runtime/logging"
"github.com/grafana/alloy/internal/service"
"github.com/grafana/alloy/internal/util"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"golang.org/x/tools/txtar"

Expand Down Expand Up @@ -298,7 +299,7 @@ func TestImportError(t *testing.T) {

func testConfig(t *testing.T, config string, reloadConfig string, update func()) {
defer verifyNoGoroutineLeaks(t)
ctrl, f := setup(t, config)
ctrl, f := setup(t, config, nil)

err := ctrl.LoadSource(f, nil, "")
require.NoError(t, err)
Expand Down Expand Up @@ -351,7 +352,7 @@ func testConfig(t *testing.T, config string, reloadConfig string, update func())

func testConfigError(t *testing.T, config string, expectedError string) {
defer verifyNoGoroutineLeaks(t)
ctrl, f := setup(t, config)
ctrl, f := setup(t, config, nil)
err := ctrl.LoadSource(f, nil, "")
require.ErrorContains(t, err, expectedError)
ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -368,14 +369,14 @@ func testConfigError(t *testing.T, config string, expectedError string) {
}()
}

func setup(t *testing.T, config string) (*alloy_runtime.Runtime, *alloy_runtime.Source) {
func setup(t *testing.T, config string, reg prometheus.Registerer) (*alloy_runtime.Runtime, *alloy_runtime.Source) {
s, err := logging.New(os.Stderr, logging.DefaultOptions)
require.NoError(t, err)
ctrl := alloy_runtime.New(alloy_runtime.Options{
Logger: s,
DataPath: t.TempDir(),
MinStability: featuregate.StabilityPublicPreview,
Reg: nil,
Reg: reg,
Services: []service.Service{},
})
f, err := alloy_runtime.ParseSource(t.Name(), []byte(config))
Expand Down
6 changes: 3 additions & 3 deletions internal/runtime/internal/controller/loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func TestLoader(t *testing.T) {
MinStability: stability,
OnBlockNodeUpdate: func(cn controller.BlockNode) { /* no-op */ },
Registerer: prometheus.NewRegistry(),
NewModuleController: func(id string) controller.ModuleController {
NewModuleController: func(opts controller.ModuleControllerOpts) controller.ModuleController {
return nil
},
},
Expand Down Expand Up @@ -382,7 +382,7 @@ func TestLoader_Services(t *testing.T) {
MinStability: stability,
OnBlockNodeUpdate: func(cn controller.BlockNode) { /* no-op */ },
Registerer: prometheus.NewRegistry(),
NewModuleController: func(id string) controller.ModuleController {
NewModuleController: func(opts controller.ModuleControllerOpts) controller.ModuleController {
return nil
},
},
Expand Down Expand Up @@ -439,7 +439,7 @@ func TestScopeWithFailingComponent(t *testing.T) {
MinStability: featuregate.StabilityPublicPreview,
OnBlockNodeUpdate: func(cn controller.BlockNode) { /* no-op */ },
Registerer: prometheus.NewRegistry(),
NewModuleController: func(id string) controller.ModuleController {
NewModuleController: func(opts controller.ModuleControllerOpts) controller.ModuleController {
return fakeModuleController{}
},
},
Expand Down
29 changes: 17 additions & 12 deletions internal/runtime/internal/controller/node_builtin_component.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,20 +60,25 @@ func (id ComponentID) Equals(other ComponentID) bool {
// DialFunc is a function to establish a network connection.
type DialFunc func(ctx context.Context, network, address string) (net.Conn, error)

type ModuleControllerOpts struct {
Id string
Reg prometheus.Registerer
}

// ComponentGlobals are used by BuiltinComponentNodes to build managed components. All
// BuiltinComponentNodes should use the same ComponentGlobals.
type ComponentGlobals struct {
Logger *logging.Logger // Logger shared between all managed components.
TraceProvider trace.TracerProvider // Tracer shared between all managed components.
DataPath string // Shared directory where component data may be stored
MinStability featuregate.Stability // Minimum allowed stability level for features
OnBlockNodeUpdate func(cn BlockNode) // Informs controller that we need to reevaluate
OnExportsChange func(exports map[string]any) // Invoked when the managed component updated its exports
Registerer prometheus.Registerer // Registerer for serving Alloy and component metrics
ControllerID string // ID of controller.
NewModuleController func(id string) ModuleController // Func to generate a module controller.
GetServiceData func(name string) (interface{}, error) // Get data for a service.
EnableCommunityComps bool // Enables the use of community components.
Logger *logging.Logger // Logger shared between all managed components.
TraceProvider trace.TracerProvider // Tracer shared between all managed components.
DataPath string // Shared directory where component data may be stored
MinStability featuregate.Stability // Minimum allowed stability level for features
OnBlockNodeUpdate func(cn BlockNode) // Informs controller that we need to reevaluate
OnExportsChange func(exports map[string]any) // Invoked when the managed component updated its exports
Registerer prometheus.Registerer // Registerer for serving Alloy and component metrics
ControllerID string // ID of controller.
NewModuleController func(opts ModuleControllerOpts) ModuleController // Func to generate a module controller.
GetServiceData func(name string) (interface{}, error) // Get data for a service.
EnableCommunityComps bool // Enables the use of community components.
}

// BuiltinComponentNode is a controller node which manages a builtin component.
Expand Down Expand Up @@ -146,7 +151,7 @@ func NewBuiltinComponentNode(globals ComponentGlobals, reg component.Registratio
componentName: strings.Join(b.Name, "."),
reg: reg,
exportsType: getExportsType(reg),
moduleController: globals.NewModuleController(globalID),
moduleController: globals.NewModuleController(ModuleControllerOpts{Id: globalID}),
OnBlockNodeUpdate: globals.OnBlockNodeUpdate,

block: b,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func TestGlobalID(t *testing.T) {
DataPath: "/data/",
MinStability: featuregate.StabilityPublicPreview,
ControllerID: "module.file",
NewModuleController: func(id string) ModuleController {
NewModuleController: func(opts ModuleControllerOpts) ModuleController {
return nil
},
}, &BuiltinComponentNode{
Expand All @@ -28,7 +28,7 @@ func TestLocalID(t *testing.T) {
DataPath: "/data/",
MinStability: featuregate.StabilityPublicPreview,
ControllerID: "",
NewModuleController: func(id string) ModuleController {
NewModuleController: func(opts ModuleControllerOpts) ModuleController {
return nil
},
}, &BuiltinComponentNode{
Expand Down
Loading

0 comments on commit 6a2db08

Please sign in to comment.