diff --git a/component/component_provider.go b/component/component_provider.go index 7ede7df6b763..7ea9edd8a89f 100644 --- a/component/component_provider.go +++ b/component/component_provider.go @@ -54,6 +54,12 @@ type Info struct { // evaluated yet. Component Component + // ModuleIDs includes the list of current module IDs that the component is + // running. Module IDs are always globally unique. + // + // The sort order of the list is not guaranteed. + ModuleIDs []string + ID ID // ID of the component. Label string // Component label. Not set for singleton components. diff --git a/pkg/flow/flow.go b/pkg/flow/flow.go index 5469b321df5a..238da4e11b22 100644 --- a/pkg/flow/flow.go +++ b/pkg/flow/flow.go @@ -51,7 +51,6 @@ import ( "sync" "github.com/go-kit/log/level" - "github.com/grafana/agent/component" "github.com/grafana/agent/pkg/cluster" "github.com/grafana/agent/pkg/flow/internal/controller" "github.com/grafana/agent/pkg/flow/logging" @@ -176,7 +175,7 @@ func New(o Options) *Flow { HTTPListenAddr: o.HTTPListenAddr, DialFunc: dialFunc, ControllerID: o.ControllerID, - NewModuleController: func(id string) component.ModuleController { + NewModuleController: func(id string) controller.ModuleController { return newModuleController(&moduleControllerOptions{ Logger: log, Tracer: tracer, diff --git a/pkg/flow/flow_components.go b/pkg/flow/flow_components.go index 28e3de1043a7..45e97d153459 100644 --- a/pkg/flow/flow_components.go +++ b/pkg/flow/flow_components.go @@ -95,6 +95,7 @@ func (f *Flow) getComponentDetail(cn *controller.ComponentNode, graph *dag.Graph return &component.Info{ Component: cn.Component(), + ModuleIDs: cn.ModuleIDs(), ID: component.ID{ ModuleID: f.opts.ControllerID, diff --git a/pkg/flow/internal/controller/component.go b/pkg/flow/internal/controller/component.go index 888ad5cab1e5..5abf33f8224a 100644 --- a/pkg/flow/internal/controller/component.go +++ b/pkg/flow/internal/controller/component.go @@ -65,18 +65,18 @@ type DialFunc func(ctx context.Context, network, address string) (net.Conn, erro // ComponentGlobals are used by ComponentNodes to build managed components. All // ComponentNodes should use the same ComponentGlobals. type ComponentGlobals struct { - Logger *logging.Logger // Logger shared between all managed components. - TraceProvider trace.TracerProvider // Tracer shared between all managed components. - Clusterer *cluster.Clusterer // Clusterer shared between all managed components. - DataPath string // Shared directory where component data may be stored - OnComponentUpdate func(cn *ComponentNode) // Informs controller that we need to reevaluate - OnExportsChange func(exports map[string]any) // Invoked when the managed component updated its exports - Registerer prometheus.Registerer // Registerer for serving agent and component metrics - HTTPPathPrefix string // HTTP prefix for components. - HTTPListenAddr string // Base address for server - DialFunc DialFunc // Function to connect to HTTPListenAddr. - ControllerID string // ID of controller. - NewModuleController func(id string) component.ModuleController // Func to generate a module controller. + Logger *logging.Logger // Logger shared between all managed components. + TraceProvider trace.TracerProvider // Tracer shared between all managed components. + Clusterer *cluster.Clusterer // Clusterer shared between all managed components. + DataPath string // Shared directory where component data may be stored + OnComponentUpdate func(cn *ComponentNode) // Informs controller that we need to reevaluate + OnExportsChange func(exports map[string]any) // Invoked when the managed component updated its exports + Registerer prometheus.Registerer // Registerer for serving agent and component metrics + HTTPPathPrefix string // HTTP prefix for components. + HTTPListenAddr string // Base address for server + DialFunc DialFunc // Function to connect to HTTPListenAddr. + ControllerID string // ID of controller. + NewModuleController func(id string) ModuleController // Func to generate a module controller. } // ComponentNode is a controller node which manages a user-defined component. @@ -86,6 +86,7 @@ type ComponentGlobals struct { // from a River block. type ComponentNode struct { id ComponentID + globalID string label string componentName string nodeID string // Cached from id.String() to avoid allocating new strings every time NodeID is called. @@ -93,6 +94,7 @@ type ComponentNode struct { managedOpts component.Options registry *prometheus.Registry exportsType reflect.Type + moduleController ModuleController OnComponentUpdate func(cn *ComponentNode) // Informs controller that we need to reevaluate mut sync.RWMutex @@ -139,13 +141,25 @@ func NewComponentNode(globals ComponentGlobals, b *ast.BlockStmt) *ComponentNode UpdateTime: time.Now(), } + // We need to generate a globally unique component ID to give to the + // component and for use with telemetry data which doesn't support + // reconstructing the global ID. For everything else (HTTP, data), we can + // just use the controller-local ID as those values are guaranteed to be + // globally unique. + globalID := nodeID + if globals.ControllerID != "" { + globalID = path.Join(globals.ControllerID, nodeID) + } + cn := &ComponentNode{ id: id, + globalID: globalID, label: b.Label, nodeID: nodeID, componentName: strings.Join(b.Name, "."), reg: reg, exportsType: getExportsType(reg), + moduleController: globals.NewModuleController(globalID), OnComponentUpdate: globals.OnComponentUpdate, block: b, @@ -170,24 +184,14 @@ func getManagedOptions(globals ComponentGlobals, cn *ComponentNode) component.Op prefix = "/" + prefix } - // We need to generate a globally unique component ID to give to the - // component and for use with telemetry data which doesn't support - // reconstructing the global ID. For everything else (HTTP, data), we can - // just use the controller-local ID as those values are guaranteed to be - // globally unique. - globalID := cn.nodeID - if globals.ControllerID != "" { - globalID = path.Join(globals.ControllerID, cn.nodeID) - } - cn.registry = prometheus.NewRegistry() return component.Options{ - ID: globalID, - Logger: log.With(globals.Logger, "component", globalID), + ID: cn.globalID, + Logger: log.With(globals.Logger, "component", cn.globalID), Registerer: prometheus.WrapRegistererWith(prometheus.Labels{ - "component_id": globalID, + "component_id": cn.globalID, }, cn.registry), - Tracer: tracing.WrapTracer(globals.TraceProvider, globalID), + Tracer: tracing.WrapTracer(globals.TraceProvider, cn.globalID), Clusterer: globals.Clusterer, DataPath: filepath.Join(globals.DataPath, cn.nodeID), @@ -196,7 +200,7 @@ func getManagedOptions(globals ComponentGlobals, cn *ComponentNode) component.Op HTTPPath: path.Join(prefix, cn.nodeID) + "/", OnStateChange: cn.setExports, - ModuleController: globals.NewModuleController(globalID), + ModuleController: cn.moduleController, } } @@ -514,3 +518,9 @@ func (cn *ComponentNode) HTTPHandler() http.Handler { } return handler.Handler() } + +// ModuleIDs returns the current list of modules that this component is +// managing. +func (cn *ComponentNode) ModuleIDs() []string { + return cn.moduleController.ModuleIDs() +} diff --git a/pkg/flow/internal/controller/loader_test.go b/pkg/flow/internal/controller/loader_test.go index 0ead40964f8e..2aa8c3592b59 100644 --- a/pkg/flow/internal/controller/loader_test.go +++ b/pkg/flow/internal/controller/loader_test.go @@ -6,7 +6,6 @@ import ( "strings" "testing" - "github.com/grafana/agent/component" "github.com/grafana/agent/pkg/cluster" "github.com/grafana/agent/pkg/flow/internal/controller" "github.com/grafana/agent/pkg/flow/internal/dag" @@ -74,7 +73,7 @@ func TestLoader(t *testing.T) { DataPath: t.TempDir(), OnComponentUpdate: func(cn *controller.ComponentNode) { /* no-op */ }, Registerer: prometheus.NewRegistry(), - NewModuleController: func(id string) component.ModuleController { + NewModuleController: func(id string) controller.ModuleController { return nil }, } @@ -220,7 +219,7 @@ func TestScopeWithFailingComponent(t *testing.T) { OnComponentUpdate: func(cn *controller.ComponentNode) { /* no-op */ }, Registerer: prometheus.NewRegistry(), Clusterer: noOpClusterer(), - NewModuleController: func(id string) component.ModuleController { + NewModuleController: func(id string) controller.ModuleController { return nil }, } diff --git a/pkg/flow/internal/controller/module.go b/pkg/flow/internal/controller/module.go new file mode 100644 index 000000000000..a672763cf79a --- /dev/null +++ b/pkg/flow/internal/controller/module.go @@ -0,0 +1,12 @@ +package controller + +import "github.com/grafana/agent/component" + +// ModuleController is a lower-level interface for module controllers which +// allows probing for the list of managed modules. +type ModuleController interface { + component.ModuleController + + // ModuleIDs returns the list of managed modules in unspecified order. + ModuleIDs() []string +} diff --git a/pkg/flow/module.go b/pkg/flow/module.go index 72ff77952b04..7d3e8f1e1a26 100644 --- a/pkg/flow/module.go +++ b/pkg/flow/module.go @@ -15,10 +15,11 @@ import ( "github.com/grafana/agent/pkg/flow/tracing" "github.com/grafana/agent/web/api" "github.com/prometheus/client_golang/prometheus" + "golang.org/x/exp/maps" ) type moduleController struct { - mut sync.Mutex + mut sync.RWMutex o *moduleControllerOptions ids map[string]struct{} } @@ -28,7 +29,7 @@ var ( ) // newModuleController is the entrypoint into creating module instances. -func newModuleController(o *moduleControllerOptions) component.ModuleController { +func newModuleController(o *moduleControllerOptions) controller.ModuleController { return &moduleController{ o: o, ids: map[string]struct{}{}, @@ -63,6 +64,13 @@ func (m *moduleController) removeID(id string) { delete(m.ids, id) } +// ModuleIDs implements [controller.ModuleController]. +func (m *moduleController) ModuleIDs() []string { + m.mut.RLock() + defer m.mut.RUnlock() + return maps.Keys(m.ids) +} + type module struct { mut sync.Mutex f *Flow @@ -146,7 +154,6 @@ func (c *module) ComponentHandler() (_ http.Handler) { // moduleControllerOptions holds static options for module controller. type moduleControllerOptions struct { - // Logger to use for controller logs and components. A no-op logger will be // created if this is nil. Logger *logging.Logger diff --git a/pkg/flow/module_test.go b/pkg/flow/module_test.go index 68d9dc2d705b..e8e444c54cf1 100644 --- a/pkg/flow/module_test.go +++ b/pkg/flow/module_test.go @@ -160,6 +160,29 @@ func TestExportsWhenNotUsed(t *testing.T) { } } +func TestIDList(t *testing.T) { + nc := newModuleController(&moduleControllerOptions{ + Logger: nil, + Tracer: nil, + Clusterer: nil, + Reg: nil, + DataPath: "", + HTTPListenAddr: "", + HTTPPath: "", + DialFunc: nil, + ID: "test", + }) + require.Len(t, nc.ModuleIDs(), 0) + + _, err := nc.NewModule("t1", nil) + require.NoError(t, err) + require.Len(t, nc.ModuleIDs(), 1) + + _, err = nc.NewModule("t2", nil) + require.NoError(t, err) + require.Len(t, nc.ModuleIDs(), 2) +} + func TestIDCollision(t *testing.T) { nc := newModuleController(&moduleControllerOptions{ Logger: nil, @@ -253,6 +276,7 @@ func (t *testModule) Run(ctx context.Context) error { if err != nil { return err } + err = m.LoadConfig([]byte(t.content), t.args) if err != nil { return err