diff --git a/CHANGELOG.md b/CHANGELOG.md index 19ed22f15989..4f0fd0abc446 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -29,6 +29,9 @@ Main (unreleased) - Fixed an issue where `loki.process` validation for stage `metric.counter` was allowing invalid combination of configuration options. (@thampiotr) + +- Fixed an issue where a module that was added after initial start and failed to load would, even after the problem was resolved, + permanently fail to load with an `id already exists` error. (@mattdurham) ### Enhancements diff --git a/component/module/module.go b/component/module/module.go index 79d95a002826..53e886721e65 100644 --- a/component/module/module.go +++ b/component/module/module.go @@ -7,6 +7,7 @@ import ( "sync" "time" + "github.com/go-kit/log/level" "github.com/grafana/agent/component" ) @@ -71,7 +72,10 @@ func (c *ModuleComponent) LoadFlowSource(args map[string]any, contentValue strin // RunFlowController runs the flow controller that all module components start. func (c *ModuleComponent) RunFlowController(ctx context.Context) { - c.mod.Run(ctx) + err := c.mod.Run(ctx) + if err != nil { + level.Error(c.opts.Logger).Log("msg", "error running module", "id", c.opts.ID, "err", err) + } } // CurrentHealth contains the implementation details for CurrentHealth in a module component. diff --git a/component/registry.go b/component/registry.go index f544bf8287f7..20a364be81c4 100644 --- a/component/registry.go +++ b/component/registry.go @@ -49,7 +49,7 @@ type Module interface { // // Run blocks until the provided context is canceled. The ID of a module as defined in // ModuleController.NewModule will not be released until Run returns. 
- Run(context.Context) + Run(context.Context) error } // ExportFunc is used for onExport of the Module diff --git a/pkg/flow/componenttest/testfailmodule.go b/pkg/flow/componenttest/testfailmodule.go new file mode 100644 index 000000000000..011659f95564 --- /dev/null +++ b/pkg/flow/componenttest/testfailmodule.go @@ -0,0 +1,67 @@ +package componenttest + +import ( + "context" + "fmt" + + "github.com/grafana/agent/component" + mod "github.com/grafana/agent/component/module" +) + +func init() { + component.Register(component.Registration{ + Name: "test.fail.module", + Args: TestFailArguments{}, + Exports: mod.Exports{}, + + Build: func(opts component.Options, args component.Arguments) (component.Component, error) { + m, err := mod.NewModuleComponent(opts) + if err != nil { + return nil, err + } + if args.(TestFailArguments).Fail { + return nil, fmt.Errorf("module told to fail") + } + err = m.LoadFlowSource(nil, args.(TestFailArguments).Content) + if err != nil { + return nil, err + } + return &TestFailModule{ + mc: m, + content: args.(TestFailArguments).Content, + opts: opts, + fail: args.(TestFailArguments).Fail, + ch: make(chan error), + }, nil + }, + }) +} + +type TestFailArguments struct { + Content string `river:"content,attr"` + Fail bool `river:"fail,attr,optional"` +} + +type TestFailModule struct { + content string + opts component.Options + ch chan error + mc *mod.ModuleComponent + fail bool +} + +func (t *TestFailModule) Run(ctx context.Context) error { + go t.mc.RunFlowController(ctx) + <-ctx.Done() + return nil +} + +func (t *TestFailModule) UpdateContent(content string) error { + t.content = content + err := t.mc.LoadFlowSource(nil, t.content) + return err +} + +func (t *TestFailModule) Update(_ component.Arguments) error { + return nil +} diff --git a/pkg/flow/flow_services_test.go b/pkg/flow/flow_services_test.go index 128b1267e180..7639daea8144 100644 --- a/pkg/flow/flow_services_test.go +++ b/pkg/flow/flow_services_test.go @@ -183,7 +183,7 @@ func 
TestFlow_GetServiceConsumers_Modules(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - componentBuilt := util.NewWaitTrigger() + componentRunning := util.NewWaitTrigger() var ( svc = &testservices.Fake{ @@ -218,9 +218,14 @@ func TestFlow_GetServiceConsumers_Modules(t *testing.T) { Name: "service_consumer", Args: struct{}{}, NeedsServices: []string{"service"}, - Build: func(_ component.Options, _ component.Arguments) (component.Component, error) { - componentBuilt.Trigger() - return &testcomponents.Fake{}, nil + Build: func(o component.Options, _ component.Arguments) (component.Component, error) { + return &testcomponents.Fake{ + RunFunc: func(ctx context.Context) error { + componentRunning.Trigger() + <-ctx.Done() + return nil + }, + }, nil }, }, } @@ -243,7 +248,7 @@ func TestFlow_GetServiceConsumers_Modules(t *testing.T) { require.NoError(t, ctrl.LoadSource(f, nil)) go ctrl.Run(ctx) - require.NoError(t, componentBuilt.Wait(5*time.Second), "Component should have been built") + require.NoError(t, componentRunning.Wait(5*time.Second), "Component should have been built") consumers := ctrl.GetServiceConsumers("service") require.Len(t, consumers, 2, "There should be a consumer for the module loader and the module's component") diff --git a/pkg/flow/internal/controller/loader_test.go b/pkg/flow/internal/controller/loader_test.go index 2351218d642b..c6b491f4c899 100644 --- a/pkg/flow/internal/controller/loader_test.go +++ b/pkg/flow/internal/controller/loader_test.go @@ -6,6 +6,7 @@ import ( "strings" "testing" + "github.com/grafana/agent/component" "github.com/grafana/agent/pkg/flow/internal/controller" "github.com/grafana/agent/pkg/flow/internal/dag" "github.com/grafana/agent/pkg/flow/logging" @@ -209,7 +210,7 @@ func TestScopeWithFailingComponent(t *testing.T) { OnComponentUpdate: func(cn *controller.ComponentNode) { /* no-op */ }, Registerer: prometheus.NewRegistry(), NewModuleController: func(id string, availableServices 
[]string) controller.ModuleController { - return nil + return fakeModuleController{} }, }, } @@ -304,3 +305,16 @@ func requireGraph(t *testing.T, g *dag.Graph, expect graphDefinition) { } require.ElementsMatch(t, expect.OutEdges, actualEdges, "List of edges do not match") } + +type fakeModuleController struct{} + +func (f fakeModuleController) NewModule(id string, export component.ExportFunc) (component.Module, error) { + return nil, nil +} + +func (f fakeModuleController) ModuleIDs() []string { + return nil +} + +func (f fakeModuleController) ClearModuleIDs() { +} diff --git a/pkg/flow/internal/controller/node_component.go b/pkg/flow/internal/controller/node_component.go index cab7109141a0..9481fd1f5b5b 100644 --- a/pkg/flow/internal/controller/node_component.go +++ b/pkg/flow/internal/controller/node_component.go @@ -257,7 +257,6 @@ func (cn *ComponentNode) Evaluate(scope *vm.Scope) error { msg := fmt.Sprintf("component evaluation failed: %s", err) cn.setEvalHealth(component.HealthTypeUnhealthy, msg) } - return err } @@ -303,7 +302,7 @@ func (cn *ComponentNode) evaluate(scope *vm.Scope) error { } // Run runs the managed component in the calling goroutine until ctx is -// canceled. Evaluate must have been called at least once without retuning an +// canceled. Evaluate must have been called at least once without returning an // error before calling Run. 
// // Run will immediately return ErrUnevaluated if Evaluate has never been called diff --git a/pkg/flow/module.go b/pkg/flow/module.go index f0687b8873a7..10aa353ee882 100644 --- a/pkg/flow/module.go +++ b/pkg/flow/module.go @@ -6,6 +6,7 @@ import ( "path" "sync" + "github.com/go-kit/log/level" "github.com/grafana/agent/component" "github.com/grafana/agent/pkg/flow/internal/controller" "github.com/grafana/agent/pkg/flow/internal/worker" @@ -46,9 +47,6 @@ func (m *moduleController) NewModule(id string, export component.ExportFunc) (co if id != "" { fullPath = path.Join(fullPath, id) } - if _, found := m.modules[fullPath]; found { - return nil, fmt.Errorf("id %s already exists", id) - } mod := newModule(&moduleOptions{ ID: fullPath, @@ -57,26 +55,33 @@ func (m *moduleController) NewModule(id string, export component.ExportFunc) (co parent: m, }) - if err := m.o.ModuleRegistry.Register(fullPath, mod); err != nil { - return nil, err - } - - m.modules[fullPath] = struct{}{} return mod, nil } -func (m *moduleController) removeID(id string) { +func (m *moduleController) removeModule(mod *module) { m.mut.Lock() defer m.mut.Unlock() - delete(m.modules, id) - m.o.ModuleRegistry.Unregister(id) + m.o.ModuleRegistry.Unregister(mod.o.ID) + delete(m.modules, mod.o.ID) +} + +func (m *moduleController) addModule(mod *module) error { + m.mut.Lock() + defer m.mut.Unlock() + if err := m.o.ModuleRegistry.Register(mod.o.ID, mod); err != nil { + level.Error(m.o.Logger).Log("msg", "error registering module", "id", mod.o.ID, "err", err) + return err + } + m.modules[mod.o.ID] = struct{}{} + return nil } // ModuleIDs implements [controller.ModuleController]. func (m *moduleController) ModuleIDs() []string { m.mut.RLock() defer m.mut.RUnlock() + return maps.Keys(m.modules) } @@ -86,7 +91,7 @@ type module struct { } type moduleOptions struct { - ID string + ID string // ID is the full name including all parents, "module.file.example.prometheus.remote_write.id". 
export component.ExportFunc parent *moduleController *moduleControllerOptions @@ -135,9 +140,14 @@ func (c *module) LoadConfig(config []byte, args map[string]any) error { // will be run until Run is called. // // Run blocks until the provided context is canceled. -func (c *module) Run(ctx context.Context) { - defer c.o.parent.removeID(c.o.ID) +func (c *module) Run(ctx context.Context) error { + if err := c.o.parent.addModule(c); err != nil { + return err + } + defer c.o.parent.removeModule(c) + c.f.Run(ctx) + return nil } // moduleControllerOptions holds static options for module controller. diff --git a/pkg/flow/module_fail_test.go b/pkg/flow/module_fail_test.go new file mode 100644 index 000000000000..28fb0923a892 --- /dev/null +++ b/pkg/flow/module_fail_test.go @@ -0,0 +1,76 @@ +package flow + +import ( + "context" + "testing" + "time" + + "github.com/grafana/agent/pkg/flow/componenttest" + "github.com/stretchr/testify/require" +) + +func TestIDRemovalIfFailedToLoad(t *testing.T) { + f := New(testOptions(t)) + + fullContent := "test.fail.module \"t1\" { content = \"\" }" + fl, err := ParseSource("test", []byte(fullContent)) + require.NoError(t, err) + err = f.LoadSource(fl, nil) + require.NoError(t, err) + ctx := context.Background() + ctx, cnc := context.WithTimeout(ctx, 600*time.Second) + + go f.Run(ctx) + var t1 *componenttest.TestFailModule + require.Eventually(t, func() bool { + t1 = f.loader.Components()[0].Component().(*componenttest.TestFailModule) + return t1 != nil + }, 10*time.Second, 100*time.Millisecond) + require.Eventually(t, func() bool { + f.loadMut.RLock() + defer f.loadMut.RUnlock() + // This should be one due to t1. + return len(f.modules.List()) == 1 + }, 10*time.Second, 100*time.Millisecond) + badContent := + `test.fail.module "bad" { +content="" +fail=true +}` + err = t1.UpdateContent(badContent) + // Because we have bad content this should fail, but the ids should be removed. 
+ require.Error(t, err) + require.Eventually(t, func() bool { + f.loadMut.RLock() + defer f.loadMut.RUnlock() + // Only one since the bad one never should have been added. + rightLength := len(f.modules.List()) == 1 + _, foundT1 := f.modules.Get("test.fail.module.t1") + return rightLength && foundT1 + }, 10*time.Second, 100*time.Millisecond) + // fail a second time to ensure the once is done again. + err = t1.UpdateContent(badContent) + require.Error(t, err) + + goodContent := + `test.fail.module "good" { +content="" +fail=false +}` + err = t1.UpdateContent(goodContent) + require.NoError(t, err) + require.Eventually(t, func() bool { + f.loadMut.RLock() + defer f.loadMut.RUnlock() + modT1, foundT1 := f.modules.Get("test.fail.module.t1") + modGood, foundGood := f.modules.Get("test.fail.module.t1/test.fail.module.good") + return modT1 != nil && modGood != nil && foundT1 && foundGood + }, 10*time.Second, 100*time.Millisecond) + cnc() + require.Eventually(t, func() bool { + f.loadMut.RLock() + defer f.loadMut.RUnlock() + // All should be cleaned up. + return len(f.modules.List()) == 0 + }, 10*time.Second, 100*time.Millisecond) +} diff --git a/pkg/flow/module_registry.go b/pkg/flow/module_registry.go index 7b88dcaf717b..34e89fb6f4de 100644 --- a/pkg/flow/module_registry.go +++ b/pkg/flow/module_registry.go @@ -16,7 +16,7 @@ func newModuleRegistry() *moduleRegistry { } } -// Get retrives a module by ID. +// Get retrieves a module by ID. 
func (reg *moduleRegistry) Get(id string) (*module, bool) { reg.mut.RLock() defer reg.mut.RUnlock() diff --git a/pkg/flow/module_test.go b/pkg/flow/module_test.go index d877bc4ba51c..0a371bc99a63 100644 --- a/pkg/flow/module_test.go +++ b/pkg/flow/module_test.go @@ -185,44 +185,58 @@ func TestIDList(t *testing.T) { nc := newModuleController(o) require.Len(t, nc.ModuleIDs(), 0) - _, err := nc.NewModule("t1", nil) + mod1, err := nc.NewModule("t1", nil) require.NoError(t, err) - require.Len(t, nc.ModuleIDs(), 1) - - _, err = nc.NewModule("t2", nil) + ctx := context.Background() + ctx, cncl := context.WithCancel(ctx) + go func() { + m1err := mod1.Run(ctx) + require.NoError(t, m1err) + }() + require.Eventually(t, func() bool { + return len(nc.ModuleIDs()) == 1 + }, 1*time.Second, 100*time.Millisecond) + + mod2, err := nc.NewModule("t2", nil) require.NoError(t, err) - require.Len(t, nc.ModuleIDs(), 2) + go func() { + m2err := mod2.Run(ctx) + require.NoError(t, m2err) + }() + require.Eventually(t, func() bool { + return len(nc.ModuleIDs()) == 2 + }, 1*time.Second, 100*time.Millisecond) + // Call cncl which will stop the run methods and remove the ids from the module controller + cncl() + require.Eventually(t, func() bool { + return len(nc.ModuleIDs()) == 0 + }, 1*time.Second, 100*time.Millisecond) } -func TestIDCollision(t *testing.T) { +func TestDuplicateIDList(t *testing.T) { defer verifyNoGoroutineLeaks(t) o := testModuleControllerOptions(t) defer o.WorkerPool.Stop() nc := newModuleController(o) - m, err := nc.NewModule("t1", nil) - require.NoError(t, err) - require.NotNil(t, m) - m, err = nc.NewModule("t1", nil) - require.Error(t, err) - require.Nil(t, m) -} + require.Len(t, nc.ModuleIDs(), 0) -func TestIDRemoval(t *testing.T) { - defer verifyNoGoroutineLeaks(t) - opts := testModuleControllerOptions(t) - defer opts.WorkerPool.Stop() - opts.ID = "test" - nc := newModuleController(opts) - m, err := nc.NewModule("t1", func(exports map[string]any) {}) - require.NoError(t, 
err) - err = m.LoadConfig([]byte(""), nil) + mod1, err := nc.NewModule("t1", nil) require.NoError(t, err) - require.NotNil(t, m) ctx := context.Background() - ctx, cncl := context.WithTimeout(ctx, 1*time.Second) + ctx, cncl := context.WithCancel(ctx) defer cncl() - m.Run(ctx) - require.Len(t, nc.(*moduleController).modules, 0) + go func() { + m1err := mod1.Run(ctx) + require.NoError(t, m1err) + }() + require.Eventually(t, func() bool { + return len(nc.ModuleIDs()) == 1 + }, 5*time.Second, 100*time.Millisecond) + + // This should panic with duplicate registration. + require.PanicsWithError(t, "duplicate metrics collector registration attempted", func() { + _, _ = nc.NewModule("t1", nil) + }) } func testModuleControllerOptions(t *testing.T) *moduleControllerOptions {