Skip to content

Commit

Permalink
component: include running module IDs in Info (#4321)
Browse files Browse the repository at this point in the history
This allows clients to query for running modules. This introduces an
internal API for module controllers for lower-level information, leaving
the API exposed to components unchanged.
  • Loading branch information
rfratto authored Jun 30, 2023
1 parent 48c1133 commit a2226c1
Show file tree
Hide file tree
Showing 8 changed files with 93 additions and 35 deletions.
6 changes: 6 additions & 0 deletions component/component_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ type Info struct {
// evaluated yet.
Component Component

// ModuleIDs includes the list of current module IDs that the component is
// running. Module IDs are always globally unique.
//
// The sort order of the list is not guaranteed.
ModuleIDs []string

ID ID // ID of the component.
Label string // Component label. Not set for singleton components.

Expand Down
3 changes: 1 addition & 2 deletions pkg/flow/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ import (
"sync"

"github.com/go-kit/log/level"
"github.com/grafana/agent/component"
"github.com/grafana/agent/pkg/cluster"
"github.com/grafana/agent/pkg/flow/internal/controller"
"github.com/grafana/agent/pkg/flow/logging"
Expand Down Expand Up @@ -176,7 +175,7 @@ func New(o Options) *Flow {
HTTPListenAddr: o.HTTPListenAddr,
DialFunc: dialFunc,
ControllerID: o.ControllerID,
NewModuleController: func(id string) component.ModuleController {
NewModuleController: func(id string) controller.ModuleController {
return newModuleController(&moduleControllerOptions{
Logger: log,
Tracer: tracer,
Expand Down
1 change: 1 addition & 0 deletions pkg/flow/flow_components.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ func (f *Flow) getComponentDetail(cn *controller.ComponentNode, graph *dag.Graph

return &component.Info{
Component: cn.Component(),
ModuleIDs: cn.ModuleIDs(),

ID: component.ID{
ModuleID: f.opts.ControllerID,
Expand Down
64 changes: 37 additions & 27 deletions pkg/flow/internal/controller/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,18 +65,18 @@ type DialFunc func(ctx context.Context, network, address string) (net.Conn, erro
// ComponentGlobals are used by ComponentNodes to build managed components. All
// ComponentNodes should use the same ComponentGlobals.
type ComponentGlobals struct {
Logger *logging.Logger // Logger shared between all managed components.
TraceProvider trace.TracerProvider // Tracer shared between all managed components.
Clusterer *cluster.Clusterer // Clusterer shared between all managed components.
DataPath string // Shared directory where component data may be stored
OnComponentUpdate func(cn *ComponentNode) // Informs controller that we need to reevaluate
OnExportsChange func(exports map[string]any) // Invoked when the managed component updated its exports
Registerer prometheus.Registerer // Registerer for serving agent and component metrics
HTTPPathPrefix string // HTTP prefix for components.
HTTPListenAddr string // Base address for server
DialFunc DialFunc // Function to connect to HTTPListenAddr.
ControllerID string // ID of controller.
NewModuleController func(id string) component.ModuleController // Func to generate a module controller.
Logger *logging.Logger // Logger shared between all managed components.
TraceProvider trace.TracerProvider // Tracer shared between all managed components.
Clusterer *cluster.Clusterer // Clusterer shared between all managed components.
DataPath string // Shared directory where component data may be stored
OnComponentUpdate func(cn *ComponentNode) // Informs controller that we need to reevaluate
OnExportsChange func(exports map[string]any) // Invoked when the managed component updated its exports
Registerer prometheus.Registerer // Registerer for serving agent and component metrics
HTTPPathPrefix string // HTTP prefix for components.
HTTPListenAddr string // Base address for server
DialFunc DialFunc // Function to connect to HTTPListenAddr.
ControllerID string // ID of controller.
NewModuleController func(id string) ModuleController // Func to generate a module controller.
}

// ComponentNode is a controller node which manages a user-defined component.
Expand All @@ -86,13 +86,15 @@ type ComponentGlobals struct {
// from a River block.
type ComponentNode struct {
id ComponentID
globalID string
label string
componentName string
nodeID string // Cached from id.String() to avoid allocating new strings every time NodeID is called.
reg component.Registration
managedOpts component.Options
registry *prometheus.Registry
exportsType reflect.Type
moduleController ModuleController
OnComponentUpdate func(cn *ComponentNode) // Informs controller that we need to reevaluate

mut sync.RWMutex
Expand Down Expand Up @@ -139,13 +141,25 @@ func NewComponentNode(globals ComponentGlobals, b *ast.BlockStmt) *ComponentNode
UpdateTime: time.Now(),
}

// We need to generate a globally unique component ID to give to the
// component and for use with telemetry data which doesn't support
// reconstructing the global ID. For everything else (HTTP, data), we can
// just use the controller-local ID as those values are guaranteed to be
// globally unique.
globalID := nodeID
if globals.ControllerID != "" {
globalID = path.Join(globals.ControllerID, nodeID)
}

cn := &ComponentNode{
id: id,
globalID: globalID,
label: b.Label,
nodeID: nodeID,
componentName: strings.Join(b.Name, "."),
reg: reg,
exportsType: getExportsType(reg),
moduleController: globals.NewModuleController(globalID),
OnComponentUpdate: globals.OnComponentUpdate,

block: b,
Expand All @@ -170,24 +184,14 @@ func getManagedOptions(globals ComponentGlobals, cn *ComponentNode) component.Op
prefix = "/" + prefix
}

// We need to generate a globally unique component ID to give to the
// component and for use with telemetry data which doesn't support
// reconstructing the global ID. For everything else (HTTP, data), we can
// just use the controller-local ID as those values are guaranteed to be
// globally unique.
globalID := cn.nodeID
if globals.ControllerID != "" {
globalID = path.Join(globals.ControllerID, cn.nodeID)
}

cn.registry = prometheus.NewRegistry()
return component.Options{
ID: globalID,
Logger: log.With(globals.Logger, "component", globalID),
ID: cn.globalID,
Logger: log.With(globals.Logger, "component", cn.globalID),
Registerer: prometheus.WrapRegistererWith(prometheus.Labels{
"component_id": globalID,
"component_id": cn.globalID,
}, cn.registry),
Tracer: tracing.WrapTracer(globals.TraceProvider, globalID),
Tracer: tracing.WrapTracer(globals.TraceProvider, cn.globalID),
Clusterer: globals.Clusterer,

DataPath: filepath.Join(globals.DataPath, cn.nodeID),
Expand All @@ -196,7 +200,7 @@ func getManagedOptions(globals ComponentGlobals, cn *ComponentNode) component.Op
HTTPPath: path.Join(prefix, cn.nodeID) + "/",

OnStateChange: cn.setExports,
ModuleController: globals.NewModuleController(globalID),
ModuleController: cn.moduleController,
}
}

Expand Down Expand Up @@ -514,3 +518,9 @@ func (cn *ComponentNode) HTTPHandler() http.Handler {
}
return handler.Handler()
}

// ModuleIDs returns the current list of modules that this component is
// managing.
func (cn *ComponentNode) ModuleIDs() []string {
return cn.moduleController.ModuleIDs()
}
5 changes: 2 additions & 3 deletions pkg/flow/internal/controller/loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"strings"
"testing"

"github.com/grafana/agent/component"
"github.com/grafana/agent/pkg/cluster"
"github.com/grafana/agent/pkg/flow/internal/controller"
"github.com/grafana/agent/pkg/flow/internal/dag"
Expand Down Expand Up @@ -74,7 +73,7 @@ func TestLoader(t *testing.T) {
DataPath: t.TempDir(),
OnComponentUpdate: func(cn *controller.ComponentNode) { /* no-op */ },
Registerer: prometheus.NewRegistry(),
NewModuleController: func(id string) component.ModuleController {
NewModuleController: func(id string) controller.ModuleController {
return nil
},
}
Expand Down Expand Up @@ -220,7 +219,7 @@ func TestScopeWithFailingComponent(t *testing.T) {
OnComponentUpdate: func(cn *controller.ComponentNode) { /* no-op */ },
Registerer: prometheus.NewRegistry(),
Clusterer: noOpClusterer(),
NewModuleController: func(id string) component.ModuleController {
NewModuleController: func(id string) controller.ModuleController {
return nil
},
}
Expand Down
12 changes: 12 additions & 0 deletions pkg/flow/internal/controller/module.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package controller

import "github.com/grafana/agent/component"

// ModuleController is a lower-level interface for module controllers which
// allows probing for the list of managed modules.
type ModuleController interface {
component.ModuleController

// ModuleIDs returns the list of managed modules in unspecified order.
ModuleIDs() []string
}
13 changes: 10 additions & 3 deletions pkg/flow/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@ import (
"github.com/grafana/agent/pkg/flow/tracing"
"github.com/grafana/agent/web/api"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/exp/maps"
)

type moduleController struct {
mut sync.Mutex
mut sync.RWMutex
o *moduleControllerOptions
ids map[string]struct{}
}
Expand All @@ -28,7 +29,7 @@ var (
)

// newModuleController is the entrypoint into creating module instances.
func newModuleController(o *moduleControllerOptions) component.ModuleController {
func newModuleController(o *moduleControllerOptions) controller.ModuleController {
return &moduleController{
o: o,
ids: map[string]struct{}{},
Expand Down Expand Up @@ -63,6 +64,13 @@ func (m *moduleController) removeID(id string) {
delete(m.ids, id)
}

// ModuleIDs implements [controller.ModuleController].
func (m *moduleController) ModuleIDs() []string {
m.mut.RLock()
defer m.mut.RUnlock()
return maps.Keys(m.ids)
}

type module struct {
mut sync.Mutex
f *Flow
Expand Down Expand Up @@ -146,7 +154,6 @@ func (c *module) ComponentHandler() (_ http.Handler) {

// moduleControllerOptions holds static options for module controller.
type moduleControllerOptions struct {

// Logger to use for controller logs and components. A no-op logger will be
// created if this is nil.
Logger *logging.Logger
Expand Down
24 changes: 24 additions & 0 deletions pkg/flow/module_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,29 @@ func TestExportsWhenNotUsed(t *testing.T) {
}
}

func TestIDList(t *testing.T) {
nc := newModuleController(&moduleControllerOptions{
Logger: nil,
Tracer: nil,
Clusterer: nil,
Reg: nil,
DataPath: "",
HTTPListenAddr: "",
HTTPPath: "",
DialFunc: nil,
ID: "test",
})
require.Len(t, nc.ModuleIDs(), 0)

_, err := nc.NewModule("t1", nil)
require.NoError(t, err)
require.Len(t, nc.ModuleIDs(), 1)

_, err = nc.NewModule("t2", nil)
require.NoError(t, err)
require.Len(t, nc.ModuleIDs(), 2)
}

func TestIDCollision(t *testing.T) {
nc := newModuleController(&moduleControllerOptions{
Logger: nil,
Expand Down Expand Up @@ -253,6 +276,7 @@ func (t *testModule) Run(ctx context.Context) error {
if err != nil {
return err
}

err = m.LoadConfig([]byte(t.content), t.args)
if err != nil {
return err
Expand Down

0 comments on commit a2226c1

Please sign in to comment.