From c159fe51179d44acb604cac2b844857603399985 Mon Sep 17 00:00:00 2001 From: William Dumont Date: Thu, 11 Jul 2024 17:16:39 +0200 Subject: [PATCH] add live debugging support to all otelcol receivers (#1260) --- CHANGELOG.md | 2 + docs/sources/troubleshoot/debug.md | 1 + .../component/otelcol/receiver/loki/loki.go | 35 +++++++++++-- .../otelcol/receiver/prometheus/prometheus.go | 31 +++++++++-- .../component/otelcol/receiver/receiver.go | 52 +++++++++++++++---- 5 files changed, 103 insertions(+), 18 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 41f05343d5..4ac253174f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,8 @@ Main (unreleased) - Added live debugging support to `otelcol.processor.*` components. (@wildum) +- Added live debugging support to `otelcol.receiver.*` components. (@wildum) + - Added a `namespace` label to probes scraped by the `prometheus.operator.probes` component to align with the upstream Prometheus Operator setup. (@toontijtgat2) ### Bugfixes diff --git a/docs/sources/troubleshoot/debug.md b/docs/sources/troubleshoot/debug.md index 7232ec86c5..dba4ee2650 100644 --- a/docs/sources/troubleshoot/debug.md +++ b/docs/sources/troubleshoot/debug.md @@ -106,6 +106,7 @@ Live debugging is not yet available in all components. Supported components: * `prometheus.relabel` * `otelcol.processor.*` +* `otelcol.receiver.*` {{< /admonition >}} diff --git a/internal/component/otelcol/receiver/loki/loki.go b/internal/component/otelcol/receiver/loki/loki.go index a839d9dd8d..c75f462d4b 100644 --- a/internal/component/otelcol/receiver/loki/loki.go +++ b/internal/component/otelcol/receiver/loki/loki.go @@ -12,8 +12,10 @@ import ( "github.com/grafana/alloy/internal/component/common/loki" "github.com/grafana/alloy/internal/component/otelcol" "github.com/grafana/alloy/internal/component/otelcol/internal/fanoutconsumer" + "github.com/grafana/alloy/internal/component/otelcol/internal/livedebuggingconsumer" "github.com/grafana/alloy/internal/featuregate" "github.com/grafana/alloy/internal/runtime/logging/level" + "github.com/grafana/alloy/internal/service/livedebugging" loki_translator "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/loki" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/plog" @@ -54,17 +56,32 @@ type Component struct { mut sync.RWMutex receiver loki.LogsReceiver logsSink consumer.Logs + + liveDebuggingConsumer *livedebuggingconsumer.Consumer + debugDataPublisher livedebugging.DebugDataPublisher + + args Arguments } -var _ component.Component = (*Component)(nil) +var ( + _ component.Component = (*Component)(nil) + _ component.LiveDebugging = (*Component)(nil) +) // New creates a new otelcol.receiver.loki component. func New(o component.Options, c Arguments) (*Component, error) { + debugDataPublisher, err := o.GetServiceData(livedebugging.ServiceName) + if err != nil { + return nil, err + } + // TODO(@tpaschalis) Create a metrics struct to count // total/successful/errored log entries? res := &Component{ - log: o.Logger, - opts: o, + log: o.Logger, + opts: o, + liveDebuggingConsumer: livedebuggingconsumer.New(debugDataPublisher.(livedebugging.DebugDataPublisher), o.ID), + debugDataPublisher: debugDataPublisher.(livedebugging.DebugDataPublisher), } // Create and immediately export the receiver which remains the same for @@ -102,8 +119,12 @@ func (c *Component) Update(newConfig component.Arguments) error { c.mut.Lock() defer c.mut.Unlock() - cfg := newConfig.(Arguments) - c.logsSink = fanoutconsumer.Logs(cfg.Output.Logs) + c.args = newConfig.(Arguments) + logs := c.args.Output.Logs + if c.debugDataPublisher.IsActive(livedebugging.ComponentID(c.opts.ID)) { + logs = append(logs, c.liveDebuggingConsumer) + } + c.logsSink = fanoutconsumer.Logs(logs) return nil } @@ -149,3 +170,7 @@ func convertLokiEntryToPlog(lokiEntry loki.Entry) plog.Logs { return logs } + +func (c *Component) LiveDebugging(_ int) { + c.Update(c.args) +} diff --git a/internal/component/otelcol/receiver/prometheus/prometheus.go b/internal/component/otelcol/receiver/prometheus/prometheus.go index 327ffde3a4..a4fc0564c9 100644 --- a/internal/component/otelcol/receiver/prometheus/prometheus.go +++ b/internal/component/otelcol/receiver/prometheus/prometheus.go @@ -14,8 +14,10 @@ import ( "github.com/grafana/alloy/internal/component/otelcol" otelcolCfg "github.com/grafana/alloy/internal/component/otelcol/config" "github.com/grafana/alloy/internal/component/otelcol/internal/fanoutconsumer" + "github.com/grafana/alloy/internal/component/otelcol/internal/livedebuggingconsumer" "github.com/grafana/alloy/internal/component/otelcol/receiver/prometheus/internal" "github.com/grafana/alloy/internal/featuregate" + "github.com/grafana/alloy/internal/service/livedebugging" "github.com/grafana/alloy/internal/util/zapadapter" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" @@ -65,15 +67,28 @@ type Component struct { mut sync.RWMutex cfg Arguments appendable storage.Appendable + + liveDebuggingConsumer *livedebuggingconsumer.Consumer + debugDataPublisher livedebugging.DebugDataPublisher } -var _ component.Component = (*Component)(nil) +var ( + _ component.Component = (*Component)(nil) + _ component.LiveDebugging = (*Component)(nil) +) // New creates a new otelcol.receiver.prometheus component. func New(o component.Options, c Arguments) (*Component, error) { + debugDataPublisher, err := o.GetServiceData(livedebugging.ServiceName) + if err != nil { + return nil, err + } + res := &Component{ - log: o.Logger, - opts: o, + log: o.Logger, + opts: o, + liveDebuggingConsumer: livedebuggingconsumer.New(debugDataPublisher.(livedebugging.DebugDataPublisher), o.ID), + debugDataPublisher: debugDataPublisher.(livedebugging.DebugDataPublisher), } if err := res.Update(c); err != nil { @@ -146,7 +161,11 @@ func (c *Component) Update(newConfig component.Arguments) error { Version: build.Version, }, } - metricsSink := fanoutconsumer.Metrics(cfg.Output.Metrics) + metrics := cfg.Output.Metrics + if c.debugDataPublisher.IsActive(livedebugging.ComponentID(c.opts.ID)) { + metrics = append(metrics, c.liveDebuggingConsumer) + } + metricsSink := fanoutconsumer.Metrics(metrics) appendable, err := internal.NewAppendable( metricsSink, @@ -169,3 +188,7 @@ func (c *Component) Update(newConfig component.Arguments) error { return nil } + +func (c *Component) LiveDebugging(_ int) { + c.Update(c.cfg) +} diff --git a/internal/component/otelcol/receiver/receiver.go b/internal/component/otelcol/receiver/receiver.go index 9f7e9e1cba..4ace133e51 100644 --- a/internal/component/otelcol/receiver/receiver.go +++ b/internal/component/otelcol/receiver/receiver.go @@ -13,8 +13,10 @@ import ( otelcolCfg "github.com/grafana/alloy/internal/component/otelcol/config" "github.com/grafana/alloy/internal/component/otelcol/internal/fanoutconsumer" "github.com/grafana/alloy/internal/component/otelcol/internal/lazycollector" + "github.com/grafana/alloy/internal/component/otelcol/internal/livedebuggingconsumer" "github.com/grafana/alloy/internal/component/otelcol/internal/scheduler" "github.com/grafana/alloy/internal/component/otelcol/internal/views" + "github.com/grafana/alloy/internal/service/livedebugging" "github.com/grafana/alloy/internal/util/zapadapter" "github.com/prometheus/client_golang/prometheus" otelcomponent "go.opentelemetry.io/collector/component" @@ -59,11 +61,17 @@ type Receiver struct { sched *scheduler.Scheduler collector *lazycollector.Collector + + liveDebuggingConsumer *livedebuggingconsumer.Consumer + debugDataPublisher livedebugging.DebugDataPublisher + + args Arguments } var ( _ component.Component = (*Receiver)(nil) _ component.HealthComponent = (*Receiver)(nil) + _ component.LiveDebugging = (*Receiver)(nil) ) // New creates a new Alloy component which encapsulates an OpenTelemetry @@ -74,6 +82,11 @@ var ( // responsibility of the caller to export values when needed; the Receiver // component never exports any values. func New(opts component.Options, f otelreceiver.Factory, args Arguments) (*Receiver, error) { + debugDataPublisher, err := opts.GetServiceData(livedebugging.ServiceName) + if err != nil { + return nil, err + } + ctx, cancel := context.WithCancel(context.Background()) // Create a lazy collector where metrics from the upstream component will be @@ -90,6 +103,9 @@ func New(opts component.Options, f otelreceiver.Factory, args Arguments) (*Recei sched: scheduler.New(opts.Logger), collector: collector, + + liveDebuggingConsumer: livedebuggingconsumer.New(debugDataPublisher.(livedebugging.DebugDataPublisher), opts.ID), + debugDataPublisher: debugDataPublisher.(livedebugging.DebugDataPublisher), } if err := r.Update(args); err != nil { return nil, err @@ -107,12 +123,12 @@ func (r *Receiver) Run(ctx context.Context) error { // configuration for OpenTelemetry Collector receiver configuration and manage // the underlying OpenTelemetry Collector receiver. func (r *Receiver) Update(args component.Arguments) error { - rargs := args.(Arguments) + r.args = args.(Arguments) host := scheduler.NewHost( r.opts.Logger, - scheduler.WithHostExtensions(rargs.Extensions()), - scheduler.WithHostExporters(rargs.Exporters()), + scheduler.WithHostExtensions(r.args.Extensions()), + scheduler.WithHostExporters(r.args.Exporters()), ) reg := prometheus.NewRegistry() @@ -123,7 +139,7 @@ func (r *Receiver) Update(args component.Arguments) error { return err } - debugMetricsCfg := rargs.DebugMetricsConfig() + debugMetricsCfg := r.args.DebugMetricsConfig() metricOpts := []metric.Option{metric.WithReader(promExporter)} if debugMetricsCfg.DisableHighCardinalityMetrics { metricOpts = append(metricOpts, metric.WithView(views.DropHighCardinalityServerAttributes()...)) @@ -152,19 +168,25 @@ func (r *Receiver) Update(args component.Arguments) error { }, } - receiverConfig, err := rargs.Convert() + receiverConfig, err := r.args.Convert() if err != nil { return err } - next := rargs.NextConsumers() + next := r.args.NextConsumers() // Create instances of the receiver from our factory for each of our // supported telemetry signals. var components []otelcomponent.Component + liveDebuggingActive := r.debugDataPublisher.IsActive(livedebugging.ComponentID(r.opts.ID)) + if len(next.Traces) > 0 { - nextTraces := fanoutconsumer.Traces(next.Traces) + traces := next.Traces + if liveDebuggingActive { + traces = append(traces, r.liveDebuggingConsumer) + } + nextTraces := fanoutconsumer.Traces(traces) tracesReceiver, err := r.factory.CreateTracesReceiver(r.ctx, settings, receiverConfig, nextTraces) if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) { return err @@ -174,7 +196,11 @@ func (r *Receiver) Update(args component.Arguments) error { } if len(next.Metrics) > 0 { - nextMetrics := fanoutconsumer.Metrics(next.Metrics) + metrics := next.Metrics + if liveDebuggingActive { + metrics = append(metrics, r.liveDebuggingConsumer) + } + nextMetrics := fanoutconsumer.Metrics(metrics) metricsReceiver, err := r.factory.CreateMetricsReceiver(r.ctx, settings, receiverConfig, nextMetrics) if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) { return err @@ -184,7 +210,11 @@ func (r *Receiver) Update(args component.Arguments) error { } if len(next.Logs) > 0 { - nextLogs := fanoutconsumer.Logs(next.Logs) + logs := next.Logs + if liveDebuggingActive { + logs = append(logs, r.liveDebuggingConsumer) + } + nextLogs := fanoutconsumer.Logs(logs) logsReceiver, err := r.factory.CreateLogsReceiver(r.ctx, settings, receiverConfig, nextLogs) if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) { return err @@ -202,3 +232,7 @@ func (r *Receiver) Update(args component.Arguments) error { func (r *Receiver) CurrentHealth() component.Health { return r.sched.CurrentHealth() } + +func (p *Receiver) LiveDebugging(_ int) { + p.Update(p.args) +}