Skip to content

Commit

Permalink
add live debugging support to all otelcol receivers (#1260)
Browse files Browse the repository at this point in the history
  • Loading branch information
wildum authored Jul 11, 2024
1 parent 7e04ec2 commit c159fe5
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 18 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ Main (unreleased)

- Added live debugging support to `otelcol.processor.*` components. (@wildum)

- Added live debugging support to `otelcol.receiver.*` components. (@wildum)

- Added a `namespace` label to probes scraped by the `prometheus.operator.probes` component to align with the upstream Prometheus Operator setup. (@toontijtgat2)

### Bugfixes
Expand Down
1 change: 1 addition & 0 deletions docs/sources/troubleshoot/debug.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ Live debugging is not yet available in all components.
Supported components:
* `prometheus.relabel`
* `otelcol.processor.*`
* `otelcol.receiver.*`
{{< /admonition >}}


Expand Down
35 changes: 30 additions & 5 deletions internal/component/otelcol/receiver/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ import (
"github.com/grafana/alloy/internal/component/common/loki"
"github.com/grafana/alloy/internal/component/otelcol"
"github.com/grafana/alloy/internal/component/otelcol/internal/fanoutconsumer"
"github.com/grafana/alloy/internal/component/otelcol/internal/livedebuggingconsumer"
"github.com/grafana/alloy/internal/featuregate"
"github.com/grafana/alloy/internal/runtime/logging/level"
"github.com/grafana/alloy/internal/service/livedebugging"
loki_translator "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/loki"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/plog"
Expand Down Expand Up @@ -54,17 +56,32 @@ type Component struct {
mut sync.RWMutex
receiver loki.LogsReceiver
logsSink consumer.Logs

liveDebuggingConsumer *livedebuggingconsumer.Consumer
debugDataPublisher livedebugging.DebugDataPublisher

args Arguments
}

var _ component.Component = (*Component)(nil)
// Compile-time assertions that *Component satisfies both the
// component.Component and component.LiveDebugging interfaces.
var (
_ component.Component = (*Component)(nil)
_ component.LiveDebugging = (*Component)(nil)
)

// New creates a new otelcol.receiver.loki component.
func New(o component.Options, c Arguments) (*Component, error) {
debugDataPublisher, err := o.GetServiceData(livedebugging.ServiceName)
if err != nil {
return nil, err
}

// TODO(@tpaschalis) Create a metrics struct to count
// total/successful/errored log entries?
res := &Component{
log: o.Logger,
opts: o,
log: o.Logger,
opts: o,
liveDebuggingConsumer: livedebuggingconsumer.New(debugDataPublisher.(livedebugging.DebugDataPublisher), o.ID),
debugDataPublisher: debugDataPublisher.(livedebugging.DebugDataPublisher),
}

// Create and immediately export the receiver which remains the same for
Expand Down Expand Up @@ -102,8 +119,12 @@ func (c *Component) Update(newConfig component.Arguments) error {
c.mut.Lock()
defer c.mut.Unlock()

cfg := newConfig.(Arguments)
c.logsSink = fanoutconsumer.Logs(cfg.Output.Logs)
c.args = newConfig.(Arguments)
logs := c.args.Output.Logs
if c.debugDataPublisher.IsActive(livedebugging.ComponentID(c.opts.ID)) {
logs = append(logs, c.liveDebuggingConsumer)
}
c.logsSink = fanoutconsumer.Logs(logs)

return nil
}
Expand Down Expand Up @@ -149,3 +170,7 @@ func convertLokiEntryToPlog(lokiEntry loki.Entry) plog.Logs {

return logs
}

// LiveDebugging implements component.LiveDebugging.
//
// It re-applies the currently stored arguments through Update, which rebuilds
// the logs sink and includes the live debugging consumer only when the debug
// data publisher reports this component as active. The integer argument is
// ignored.
func (c *Component) LiveDebugging(_ int) {
	// Update assigns c.args while holding c.mut, so take a snapshot under the
	// read lock instead of reading the field unsynchronized. The lock must be
	// released before calling Update, which acquires the write lock itself.
	c.mut.RLock()
	args := c.args
	c.mut.RUnlock()

	// This method has no error return, so the result of Update cannot be
	// propagated to the caller; discard it explicitly rather than silently.
	_ = c.Update(args)
}
31 changes: 27 additions & 4 deletions internal/component/otelcol/receiver/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ import (
"github.com/grafana/alloy/internal/component/otelcol"
otelcolCfg "github.com/grafana/alloy/internal/component/otelcol/config"
"github.com/grafana/alloy/internal/component/otelcol/internal/fanoutconsumer"
"github.com/grafana/alloy/internal/component/otelcol/internal/livedebuggingconsumer"
"github.com/grafana/alloy/internal/component/otelcol/receiver/prometheus/internal"
"github.com/grafana/alloy/internal/featuregate"
"github.com/grafana/alloy/internal/service/livedebugging"
"github.com/grafana/alloy/internal/util/zapadapter"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
Expand Down Expand Up @@ -65,15 +67,28 @@ type Component struct {
mut sync.RWMutex
cfg Arguments
appendable storage.Appendable

liveDebuggingConsumer *livedebuggingconsumer.Consumer
debugDataPublisher livedebugging.DebugDataPublisher
}

var _ component.Component = (*Component)(nil)
// Compile-time assertions that *Component satisfies both the
// component.Component and component.LiveDebugging interfaces.
var (
_ component.Component = (*Component)(nil)
_ component.LiveDebugging = (*Component)(nil)
)

// New creates a new otelcol.receiver.prometheus component.
func New(o component.Options, c Arguments) (*Component, error) {
debugDataPublisher, err := o.GetServiceData(livedebugging.ServiceName)
if err != nil {
return nil, err
}

res := &Component{
log: o.Logger,
opts: o,
log: o.Logger,
opts: o,
liveDebuggingConsumer: livedebuggingconsumer.New(debugDataPublisher.(livedebugging.DebugDataPublisher), o.ID),
debugDataPublisher: debugDataPublisher.(livedebugging.DebugDataPublisher),
}

if err := res.Update(c); err != nil {
Expand Down Expand Up @@ -146,7 +161,11 @@ func (c *Component) Update(newConfig component.Arguments) error {
Version: build.Version,
},
}
metricsSink := fanoutconsumer.Metrics(cfg.Output.Metrics)
metrics := cfg.Output.Metrics
if c.debugDataPublisher.IsActive(livedebugging.ComponentID(c.opts.ID)) {
metrics = append(metrics, c.liveDebuggingConsumer)
}
metricsSink := fanoutconsumer.Metrics(metrics)

appendable, err := internal.NewAppendable(
metricsSink,
Expand All @@ -169,3 +188,7 @@ func (c *Component) Update(newConfig component.Arguments) error {

return nil
}

// LiveDebugging implements component.LiveDebugging.
//
// It re-runs Update with the stored configuration so that the metrics sink is
// rebuilt; Update appends the live debugging consumer only while the debug data
// publisher reports this component as active. The integer argument is ignored.
func (c *Component) LiveDebugging(_ int) {
	// NOTE(review): c.cfg is read here without holding c.mut — confirm how
	// Update guards this field before relying on concurrent calls.
	current := c.cfg

	// This method has no error return, so a failure from Update cannot be
	// propagated; the result is discarded explicitly.
	_ = c.Update(current)
}
52 changes: 43 additions & 9 deletions internal/component/otelcol/receiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ import (
otelcolCfg "github.com/grafana/alloy/internal/component/otelcol/config"
"github.com/grafana/alloy/internal/component/otelcol/internal/fanoutconsumer"
"github.com/grafana/alloy/internal/component/otelcol/internal/lazycollector"
"github.com/grafana/alloy/internal/component/otelcol/internal/livedebuggingconsumer"
"github.com/grafana/alloy/internal/component/otelcol/internal/scheduler"
"github.com/grafana/alloy/internal/component/otelcol/internal/views"
"github.com/grafana/alloy/internal/service/livedebugging"
"github.com/grafana/alloy/internal/util/zapadapter"
"github.com/prometheus/client_golang/prometheus"
otelcomponent "go.opentelemetry.io/collector/component"
Expand Down Expand Up @@ -59,11 +61,17 @@ type Receiver struct {

sched *scheduler.Scheduler
collector *lazycollector.Collector

liveDebuggingConsumer *livedebuggingconsumer.Consumer
debugDataPublisher livedebugging.DebugDataPublisher

args Arguments
}

// Compile-time assertions that *Receiver satisfies the component.Component,
// component.HealthComponent and component.LiveDebugging interfaces.
var (
_ component.Component = (*Receiver)(nil)
_ component.HealthComponent = (*Receiver)(nil)
_ component.LiveDebugging = (*Receiver)(nil)
)

// New creates a new Alloy component which encapsulates an OpenTelemetry
Expand All @@ -74,6 +82,11 @@ var (
// responsibility of the caller to export values when needed; the Receiver
// component never exports any values.
func New(opts component.Options, f otelreceiver.Factory, args Arguments) (*Receiver, error) {
debugDataPublisher, err := opts.GetServiceData(livedebugging.ServiceName)
if err != nil {
return nil, err
}

ctx, cancel := context.WithCancel(context.Background())

// Create a lazy collector where metrics from the upstream component will be
Expand All @@ -90,6 +103,9 @@ func New(opts component.Options, f otelreceiver.Factory, args Arguments) (*Recei

sched: scheduler.New(opts.Logger),
collector: collector,

liveDebuggingConsumer: livedebuggingconsumer.New(debugDataPublisher.(livedebugging.DebugDataPublisher), opts.ID),
debugDataPublisher: debugDataPublisher.(livedebugging.DebugDataPublisher),
}
if err := r.Update(args); err != nil {
return nil, err
Expand All @@ -107,12 +123,12 @@ func (r *Receiver) Run(ctx context.Context) error {
// configuration for OpenTelemetry Collector receiver configuration and manage
// the underlying OpenTelemetry Collector receiver.
func (r *Receiver) Update(args component.Arguments) error {
rargs := args.(Arguments)
r.args = args.(Arguments)

host := scheduler.NewHost(
r.opts.Logger,
scheduler.WithHostExtensions(rargs.Extensions()),
scheduler.WithHostExporters(rargs.Exporters()),
scheduler.WithHostExtensions(r.args.Extensions()),
scheduler.WithHostExporters(r.args.Exporters()),
)

reg := prometheus.NewRegistry()
Expand All @@ -123,7 +139,7 @@ func (r *Receiver) Update(args component.Arguments) error {
return err
}

debugMetricsCfg := rargs.DebugMetricsConfig()
debugMetricsCfg := r.args.DebugMetricsConfig()
metricOpts := []metric.Option{metric.WithReader(promExporter)}
if debugMetricsCfg.DisableHighCardinalityMetrics {
metricOpts = append(metricOpts, metric.WithView(views.DropHighCardinalityServerAttributes()...))
Expand Down Expand Up @@ -152,19 +168,25 @@ func (r *Receiver) Update(args component.Arguments) error {
},
}

receiverConfig, err := rargs.Convert()
receiverConfig, err := r.args.Convert()
if err != nil {
return err
}

next := rargs.NextConsumers()
next := r.args.NextConsumers()

// Create instances of the receiver from our factory for each of our
// supported telemetry signals.
var components []otelcomponent.Component

liveDebuggingActive := r.debugDataPublisher.IsActive(livedebugging.ComponentID(r.opts.ID))

if len(next.Traces) > 0 {
nextTraces := fanoutconsumer.Traces(next.Traces)
traces := next.Traces
if liveDebuggingActive {
traces = append(traces, r.liveDebuggingConsumer)
}
nextTraces := fanoutconsumer.Traces(traces)
tracesReceiver, err := r.factory.CreateTracesReceiver(r.ctx, settings, receiverConfig, nextTraces)
if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) {
return err
Expand All @@ -174,7 +196,11 @@ func (r *Receiver) Update(args component.Arguments) error {
}

if len(next.Metrics) > 0 {
nextMetrics := fanoutconsumer.Metrics(next.Metrics)
metrics := next.Metrics
if liveDebuggingActive {
metrics = append(metrics, r.liveDebuggingConsumer)
}
nextMetrics := fanoutconsumer.Metrics(metrics)
metricsReceiver, err := r.factory.CreateMetricsReceiver(r.ctx, settings, receiverConfig, nextMetrics)
if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) {
return err
Expand All @@ -184,7 +210,11 @@ func (r *Receiver) Update(args component.Arguments) error {
}

if len(next.Logs) > 0 {
nextLogs := fanoutconsumer.Logs(next.Logs)
logs := next.Logs
if liveDebuggingActive {
logs = append(logs, r.liveDebuggingConsumer)
}
nextLogs := fanoutconsumer.Logs(logs)
logsReceiver, err := r.factory.CreateLogsReceiver(r.ctx, settings, receiverConfig, nextLogs)
if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) {
return err
Expand All @@ -202,3 +232,7 @@ func (r *Receiver) Update(args component.Arguments) error {
func (r *Receiver) CurrentHealth() component.Health {
	// Health is tracked by the scheduler; report its current value unchanged.
	health := r.sched.CurrentHealth()
	return health
}

// LiveDebugging implements component.LiveDebugging.
//
// It re-runs Update with the stored arguments so the underlying receivers are
// recreated; Update appends the live debugging consumer to each signal's next
// consumers only while the debug data publisher reports this component as
// active. The integer argument is ignored.
func (r *Receiver) LiveDebugging(_ int) {
	// This method has no error return, so a failure from Update cannot be
	// propagated to the caller; discard it explicitly rather than silently.
	_ = r.Update(r.args)
}

0 comments on commit c159fe5

Please sign in to comment.