Skip to content

Commit

Permalink
flow: separate component and controller path IDs into new labels (#6786)
Browse files Browse the repository at this point in the history
Signed-off-by: Paschalis Tsilias <[email protected]>
  • Loading branch information
tpaschalis authored Apr 3, 2024
1 parent d8d8872 commit 1a035e4
Show file tree
Hide file tree
Showing 9 changed files with 86 additions and 45 deletions.
14 changes: 11 additions & 3 deletions internal/flow/internal/controller/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,14 @@ func NewLoader(opts LoaderOptions) *Loader {
reg = opts.ComponentRegistry
)

parent, id := splitPath(globals.ControllerID)

if reg == nil {
reg = NewDefaultComponentRegistry(opts.ComponentGlobals.MinStability)
}

l := &Loader{
log: log.With(globals.Logger, "controller_id", globals.ControllerID),
log: log.With(globals.Logger, "controller_path", parent, "controller_id", id),
tracer: tracing.WrapTracerForLoader(globals.TraceProvider, globals.ControllerID),
globals: globals,
services: services,
Expand All @@ -99,9 +101,9 @@ func NewLoader(opts LoaderOptions) *Loader {
graph: &dag.Graph{},
originalGraph: &dag.Graph{},
cache: newValueCache(),
cm: newControllerMetrics(globals.ControllerID),
cm: newControllerMetrics(parent, id),
}
l.cc = newControllerCollector(l, globals.ControllerID)
l.cc = newControllerCollector(l, parent, id)

if globals.Registerer != nil {
globals.Registerer.MustRegister(l.cc)
Expand Down Expand Up @@ -909,3 +911,9 @@ func (l *Loader) collectCustomComponentReferences(stmts ast.Body, uniqueReferenc
}
}
}

func splitPath(id string) (string, string) {
parent, id := path.Split(id)
parent, _ = strings.CutSuffix(parent, "/")
return "/" + parent, id
}
16 changes: 8 additions & 8 deletions internal/flow/internal/controller/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type controllerMetrics struct {
}

// newControllerMetrics inits the metrics for the components controller
func newControllerMetrics(id string) *controllerMetrics {
func newControllerMetrics(parent, id string) *controllerMetrics {
cm := &controllerMetrics{
slowComponentThreshold: 1 * time.Minute,
}
Expand All @@ -31,14 +31,14 @@ func newControllerMetrics(id string) *controllerMetrics {
cm.controllerEvaluation = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "agent_component_controller_evaluating",
Help: "Tracks if the controller is currently in the middle of a graph evaluation",
ConstLabels: map[string]string{"controller_id": id},
ConstLabels: map[string]string{"controller_path": parent, "controller_id": id},
})

cm.componentEvaluationTime = prometheus.NewHistogram(
prometheus.HistogramOpts{
Name: "agent_component_evaluation_seconds",
Help: "Time spent performing component evaluation",
ConstLabels: map[string]string{"controller_id": id},
ConstLabels: map[string]string{"controller_path": parent, "controller_id": id},
Buckets: evaluationTimesBuckets,
NativeHistogramBucketFactor: 1.1,
NativeHistogramMaxBucketNumber: 100,
Expand All @@ -49,7 +49,7 @@ func newControllerMetrics(id string) *controllerMetrics {
prometheus.HistogramOpts{
Name: "agent_component_dependencies_wait_seconds",
Help: "Time spent by components waiting to be evaluated after their dependency is updated.",
ConstLabels: map[string]string{"controller_id": id},
ConstLabels: map[string]string{"controller_path": parent, "controller_id": id},
Buckets: evaluationTimesBuckets,
NativeHistogramBucketFactor: 1.1,
NativeHistogramMaxBucketNumber: 100,
Expand All @@ -60,13 +60,13 @@ func newControllerMetrics(id string) *controllerMetrics {
cm.evaluationQueueSize = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "agent_component_evaluation_queue_size",
Help: "Tracks the number of components waiting to be evaluated in the worker pool",
ConstLabels: map[string]string{"controller_id": id},
ConstLabels: map[string]string{"controller_path": parent, "controller_id": id},
})

cm.slowComponentEvaluationTime = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "agent_component_evaluation_slow_seconds",
Help: fmt.Sprintf("Number of seconds spent evaluating components that take longer than %v to evaluate", cm.slowComponentThreshold),
ConstLabels: map[string]string{"controller_id": id},
ConstLabels: map[string]string{"controller_path": parent, "controller_id": id},
}, []string{"component_id"})

return cm
Expand Down Expand Up @@ -100,14 +100,14 @@ type controllerCollector struct {
runningComponentsTotal *prometheus.Desc
}

func newControllerCollector(l *Loader, id string) *controllerCollector {
func newControllerCollector(l *Loader, parent, id string) *controllerCollector {
return &controllerCollector{
l: l,
runningComponentsTotal: prometheus.NewDesc(
"agent_component_controller_running_components",
"Total number of running components.",
[]string{"health_type"},
map[string]string{"controller_id": id},
map[string]string{"controller_path": parent, "controller_id": id},
),
}
}
Expand Down
6 changes: 4 additions & 2 deletions internal/flow/internal/controller/node_builtin_component.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,11 +165,13 @@ func NewBuiltinComponentNode(globals ComponentGlobals, reg component.Registratio

func getManagedOptions(globals ComponentGlobals, cn *BuiltinComponentNode) component.Options {
cn.registry = prometheus.NewRegistry()
parent, id := splitPath(cn.globalID)
return component.Options{
ID: cn.globalID,
Logger: log.With(globals.Logger, "component", cn.globalID),
Logger: log.With(globals.Logger, "component_path", parent, "component_id", id),
Registerer: prometheus.WrapRegistererWith(prometheus.Labels{
"component_id": cn.globalID,
"component_path": parent,
"component_id": id,
}, cn.registry),
Tracer: tracing.WrapTracer(globals.TraceProvider, cn.globalID),

Expand Down
25 changes: 25 additions & 0 deletions internal/flow/internal/controller/node_builtin_component_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,28 @@ func TestLocalID(t *testing.T) {
})
require.Equal(t, "/data/local.id", filepath.ToSlash(mo.DataPath))
}

func TestSplitPath(t *testing.T) {
var testcases = []struct {
input string
path string
id string
}{
{"", "/", ""},
{"remotecfg", "/", "remotecfg"},
{"prometheus.remote_write", "/", "prometheus.remote_write"},
{"custom_component.default/prometheus.remote_write", "/custom_component.default", "prometheus.remote_write"},

{"local.file.default", "/", "local.file.default"},
{"a_namespace.a.default/local.file.default", "/a_namespace.a.default", "local.file.default"},
{"a_namespace.a.default/b_namespace.b.default/local.file.default", "/a_namespace.a.default/b_namespace.b.default", "local.file.default"},

{"a_namespace.a.default/b_namespace.b.default/c_namespace.c.default", "/a_namespace.a.default/b_namespace.b.default", "c_namespace.c.default"},
}

for _, tt := range testcases {
path, id := splitPath(tt.input)
require.Equal(t, tt.path, path)
require.Equal(t, tt.id, id)
}
}
6 changes: 4 additions & 2 deletions internal/flow/internal/controller/node_config_import.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,13 @@ func NewImportConfigNode(block *ast.BlockStmt, globals ComponentGlobals, sourceT

func getImportManagedOptions(globals ComponentGlobals, cn *ImportConfigNode) component.Options {
cn.registry = prometheus.NewRegistry()
parent, id := splitPath(cn.globalID)
return component.Options{
ID: cn.globalID,
Logger: log.With(globals.Logger, "config", cn.globalID),
Logger: log.With(globals.Logger, "config_path", parent, "config_id", id),
Registerer: prometheus.WrapRegistererWith(prometheus.Labels{
"config_id": cn.globalID,
"config_path": parent,
"config_id": id,
}, cn.registry),
Tracer: tracing.WrapTracer(globals.TraceProvider, cn.globalID),
DataPath: filepath.Join(globals.DataPath, cn.globalID),
Expand Down
3 changes: 2 additions & 1 deletion internal/flow/internal/controller/node_custom_component.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ func NewCustomComponentNode(globals ComponentGlobals, b *ast.BlockStmt, getConfi

componentName := b.GetBlockName()
importNamespace, customComponentName := ExtractImportAndDeclare(componentName)
parent, node := splitPath(globalID)

cn := &CustomComponentNode{
id: id,
Expand All @@ -115,7 +116,7 @@ func NewCustomComponentNode(globals ComponentGlobals, b *ast.BlockStmt, getConfi
customComponentName: customComponentName,
moduleController: globals.NewModuleController(globalID),
OnBlockNodeUpdate: globals.OnBlockNodeUpdate,
logger: log.With(globals.Logger, "component", globalID),
logger: log.With(globals.Logger, "component_path", parent, "component_id", node),
getConfig: getConfig,

block: b,
Expand Down
2 changes: 1 addition & 1 deletion operations/agent-flow-mixin/alerts/controller.libsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ alert.newGroup(
// Component evaluations are taking too long, which can lead to e.g. stale targets.
alert.newRule(
'SlowComponentEvaluations',
'sum by (cluster, namespace, component_id) (rate(agent_component_evaluation_slow_seconds[10m])) > 0',
'sum by (cluster, namespace, component_path, component_id) (rate(agent_component_evaluation_slow_seconds[10m])) > 0',
'Flow component evaluations are taking too long.',
'15m',
),
Expand Down
4 changes: 2 additions & 2 deletions operations/agent-flow-mixin/dashboards/controller.libsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -264,10 +264,10 @@ local filename = 'agent-flow-controller.json';
panel.withQueries([
panel.newQuery(
expr=|||
sum by (component_id) (rate(agent_component_evaluation_slow_seconds{cluster="$cluster", namespace="$namespace"}[$__rate_interval]))
sum by (component_path, component_id) (rate(agent_component_evaluation_slow_seconds{cluster="$cluster", namespace="$namespace"}[$__rate_interval]))
/ scalar(sum(rate(agent_component_evaluation_seconds_sum{cluster="$cluster", namespace="$namespace"}[$__rate_interval])))
|||,
legendFormat='{{component_id}}',
legendFormat='{{component path}} {{component_id}}',
),
])
),
Expand Down
Loading

0 comments on commit 1a035e4

Please sign in to comment.