Skip to content

Commit

Permalink
Add 'pipeline' attribute to processor metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
djaglowski committed Sep 13, 2024
1 parent 3b50b38 commit f9d4226
Show file tree
Hide file tree
Showing 34 changed files with 402 additions and 280 deletions.
6 changes: 3 additions & 3 deletions cmd/mdatagen/templates/component_telemetry_test.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"

"go.opentelemetry.io/collector/component"
{{- if or isConnector isExporter isExtension isProcessor isReceiver }}
"go.opentelemetry.io/collector/config/configtelemetry"
Expand All @@ -28,8 +28,8 @@ type componentTestTelemetry struct {
}

{{- if or isConnector isExporter isExtension isProcessor isReceiver }}
func (tt *componentTestTelemetry) NewSettings() {{ .Status.Class }}.Settings {
settings := {{ .Status.Class }}test.NewNopSettings()
func (tt *componentTestTelemetry) NewSettings({{- if isProcessor -}}dt component.DataType{{- end -}}) {{ .Status.Class }}.Settings {
settings := {{ .Status.Class }}test.NewNopSettings({{- if isProcessor -}}dt{{- end -}})
settings.MeterProvider = tt.meterProvider
settings.LeveledMeterProvider = func(_ configtelemetry.Level) metric.MeterProvider {
return tt.meterProvider
Expand Down
6 changes: 3 additions & 3 deletions cmd/mdatagen/templates/component_test.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func TestComponentLifecycle(t *testing.T) {
switch test.name {
case "logs":
e, ok := c.(exporter.Logs)
require.True(t, ok)
require.True(t, ok)
logs := generateLifecycleTestLogs()
if !e.Capabilities().MutatesData {
logs.MarkReadOnly()
Expand Down Expand Up @@ -204,7 +204,7 @@ func TestComponentLifecycle(t *testing.T) {
for _, test := range tests {
{{- if not .Tests.SkipShutdown }}
t.Run(test.name + "-shutdown", func(t *testing.T) {
c, err := test.createFn(context.Background(), processortest.NewNopSettings(), cfg)
c, err := test.createFn(context.Background(), processortest.NewNopSettings(component.MustNewType(test.name)), cfg)
require.NoError(t, err)
err = c.Shutdown(context.Background())
require.NoError(t, err)
Expand All @@ -213,7 +213,7 @@ func TestComponentLifecycle(t *testing.T) {

{{- if not .Tests.SkipLifecycle }}
t.Run(test.name + "-lifecycle", func(t *testing.T) {
c, err := test.createFn(context.Background(), processortest.NewNopSettings(), cfg)
c, err := test.createFn(context.Background(), processortest.NewNopSettings(component.MustNewType(test.name)), cfg)
require.NoError(t, err)
host := {{ .Tests.Host }}
err = c.Start(context.Background(), host)
Expand Down
16 changes: 11 additions & 5 deletions component/componenttest/obsreporttest.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,13 @@ const (
transportTag = "transport"
exporterTag = "exporter"
processorTag = "processor"
pipelineTag = "pipeline"
)

type TestTelemetry struct {
ts component.TelemetrySettings
id component.ID
extraAttrs []attribute.KeyValue
SpanRecorder *tracetest.SpanRecorder

prometheusChecker *prometheusChecker
Expand Down Expand Up @@ -83,19 +85,22 @@ func (tts *TestTelemetry) CheckExporterMetricGauge(metric string, val int64, ext
// CheckProcessorTraces checks that for the current exported values for trace exporter metrics match given values.
// Note: SetupTelemetry must be called before this function.
func (tts *TestTelemetry) CheckProcessorTraces(acceptedSpans, refusedSpans, droppedSpans int64) error {
return tts.prometheusChecker.checkProcessorTraces(tts.id, acceptedSpans, refusedSpans, droppedSpans)
attrs := attributesForProcessorMetrics(tts.id, tts.extraAttrs)
return tts.prometheusChecker.checkProcessorTraces(attrs, acceptedSpans, refusedSpans, droppedSpans)

Check warning on line 89 in component/componenttest/obsreporttest.go

View check run for this annotation

Codecov / codecov/patch

component/componenttest/obsreporttest.go#L88-L89

Added lines #L88 - L89 were not covered by tests
}

// CheckProcessorMetrics checks that for the current exported values for metrics exporter metrics match given values.
// Note: SetupTelemetry must be called before this function.
func (tts *TestTelemetry) CheckProcessorMetrics(acceptedMetricPoints, refusedMetricPoints, droppedMetricPoints int64) error {
return tts.prometheusChecker.checkProcessorMetrics(tts.id, acceptedMetricPoints, refusedMetricPoints, droppedMetricPoints)
attrs := attributesForProcessorMetrics(tts.id, tts.extraAttrs)
return tts.prometheusChecker.checkProcessorMetrics(attrs, acceptedMetricPoints, refusedMetricPoints, droppedMetricPoints)

Check warning on line 96 in component/componenttest/obsreporttest.go

View check run for this annotation

Codecov / codecov/patch

component/componenttest/obsreporttest.go#L95-L96

Added lines #L95 - L96 were not covered by tests
}

// CheckProcessorLogs checks that for the current exported values for logs exporter metrics match given values.
// Note: SetupTelemetry must be called before this function.
func (tts *TestTelemetry) CheckProcessorLogs(acceptedLogRecords, refusedLogRecords, droppedLogRecords int64) error {
return tts.prometheusChecker.checkProcessorLogs(tts.id, acceptedLogRecords, refusedLogRecords, droppedLogRecords)
attrs := attributesForProcessorMetrics(tts.id, tts.extraAttrs)
return tts.prometheusChecker.checkProcessorLogs(attrs, acceptedLogRecords, refusedLogRecords, droppedLogRecords)

Check warning on line 103 in component/componenttest/obsreporttest.go

View check run for this annotation

Codecov / codecov/patch

component/componenttest/obsreporttest.go#L102-L103

Added lines #L102 - L103 were not covered by tests
}

// CheckReceiverTraces checks that for the current exported values for trace receiver metrics match given values.
Expand Down Expand Up @@ -137,16 +142,17 @@ func (tts *TestTelemetry) TelemetrySettings() component.TelemetrySettings {
return tts.ts
}

// SetupTelemetry sets up the testing environment to check the metrics recorded by receivers, producers, or exporters.
// SetupTelemetry sets up the testing environment to check the metrics recorded by receivers, or exporters.
// The caller must pass the ID of the component being tested. The ID will be used by the CreateSettings and Check methods.
// The caller must defer a call to `Shutdown` on the returned TestTelemetry.
func SetupTelemetry(id component.ID) (TestTelemetry, error) {
func SetupTelemetry(id component.ID, extraAttrs ...attribute.KeyValue) (TestTelemetry, error) {

Check warning on line 148 in component/componenttest/obsreporttest.go

View check run for this annotation

Codecov / codecov/patch

component/componenttest/obsreporttest.go#L148

Added line #L148 was not covered by tests
sr := new(tracetest.SpanRecorder)
tp := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr))

settings := TestTelemetry{
ts: NewNopTelemetrySettings(),
id: id,
extraAttrs: extraAttrs,

Check warning on line 155 in component/componenttest/obsreporttest.go

View check run for this annotation

Codecov / codecov/patch

component/componenttest/obsreporttest.go#L155

Added line #L155 was not covered by tests
SpanRecorder: sr,
}
settings.ts.TracerProvider = tp
Expand Down
25 changes: 12 additions & 13 deletions component/componenttest/otelprometheuschecker.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,24 +48,23 @@ func (pc *prometheusChecker) checkReceiver(receiver component.ID, datatype, prot
pc.checkCounter(fmt.Sprintf("receiver_refused_%s", datatype), droppedMetricPoints, receiverAttrs))
}

func (pc *prometheusChecker) checkProcessorTraces(processor component.ID, accepted, refused, dropped int64) error {
return pc.checkProcessor(processor, "spans", accepted, refused, dropped)
func (pc *prometheusChecker) checkProcessorTraces(attrs []attribute.KeyValue, accepted, refused, dropped int64) error {
return pc.checkProcessor(attrs, "spans", accepted, refused, dropped)
}

func (pc *prometheusChecker) checkProcessorMetrics(processor component.ID, accepted, refused, dropped int64) error {
return pc.checkProcessor(processor, "metric_points", accepted, refused, dropped)
func (pc *prometheusChecker) checkProcessorMetrics(attrs []attribute.KeyValue, accepted, refused, dropped int64) error {
return pc.checkProcessor(attrs, "metric_points", accepted, refused, dropped)
}

func (pc *prometheusChecker) checkProcessorLogs(processor component.ID, accepted, refused, dropped int64) error {
return pc.checkProcessor(processor, "log_records", accepted, refused, dropped)
func (pc *prometheusChecker) checkProcessorLogs(attrs []attribute.KeyValue, accepted, refused, dropped int64) error {
return pc.checkProcessor(attrs, "log_records", accepted, refused, dropped)
}

func (pc *prometheusChecker) checkProcessor(processor component.ID, datatype string, accepted, refused, dropped int64) error {
processorAttrs := attributesForProcessorMetrics(processor)
func (pc *prometheusChecker) checkProcessor(attrs []attribute.KeyValue, datatype string, accepted, refused, dropped int64) error {
return multierr.Combine(
pc.checkCounter(fmt.Sprintf("processor_accepted_%s", datatype), accepted, processorAttrs),
pc.checkCounter(fmt.Sprintf("processor_refused_%s", datatype), refused, processorAttrs),
pc.checkCounter(fmt.Sprintf("processor_dropped_%s", datatype), dropped, processorAttrs),
pc.checkCounter(fmt.Sprintf("processor_accepted_%s", datatype), accepted, attrs),
pc.checkCounter(fmt.Sprintf("processor_refused_%s", datatype), refused, attrs),
pc.checkCounter(fmt.Sprintf("processor_dropped_%s", datatype), dropped, attrs),
)
}

Expand Down Expand Up @@ -190,8 +189,8 @@ func attributesForReceiverMetrics(receiver component.ID, transport string) []att
}
}

func attributesForProcessorMetrics(processor component.ID) []attribute.KeyValue {
return []attribute.KeyValue{attribute.String(processorTag, processor.String())}
func attributesForProcessorMetrics(processor component.ID, extraAttrs []attribute.KeyValue) []attribute.KeyValue {
return append(extraAttrs, attribute.String(processorTag, processor.String()))

Check warning on line 193 in component/componenttest/otelprometheuschecker.go

View check run for this annotation

Codecov / codecov/patch

component/componenttest/otelprometheuschecker.go#L192-L193

Added lines #L192 - L193 were not covered by tests
}

// attributesForExporterMetrics returns the attributes that are needed for the receiver metrics.
Expand Down
15 changes: 12 additions & 3 deletions component/componenttest/otelprometheuschecker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,17 +88,26 @@ func TestPromChecker(t *testing.T) {
)

assert.NoError(t,
pc.checkProcessorTraces(processor, 42, 13, 7),
pc.checkProcessorTraces([]attribute.KeyValue{
attribute.String("processor", processor.String()),
attribute.String("pipeline", "traces/fakePipeline"),
}, 42, 13, 7),
"metrics from Receiver Traces should be valid",
)

assert.NoError(t,
pc.checkProcessorMetrics(processor, 7, 41, 13),
pc.checkProcessorMetrics([]attribute.KeyValue{
attribute.String("processor", processor.String()),
attribute.String("pipeline", "metrics/fakePipeline"),
}, 7, 41, 13),
"metrics from Receiver Metrics should be valid",
)

assert.NoError(t,
pc.checkProcessorLogs(processor, 102, 35, 14),
pc.checkProcessorLogs([]attribute.KeyValue{
attribute.String("processor", processor.String()),
attribute.String("pipeline", "logs/fakePipeline"),
}, 102, 35, 14),
"metrics from Receiver Logs should be valid",
)

Expand Down
24 changes: 12 additions & 12 deletions component/componenttest/testdata/prometheus_response
Original file line number Diff line number Diff line change
Expand Up @@ -18,40 +18,40 @@ otelcol_exporter_send_failed_log_records{exporter="fakeExporter"} 36
otelcol_exporter_sent_log_records{exporter="fakeExporter"} 103
# HELP otelcol_processor_accepted_spans Number of spans successfully pushed into the next component in the pipeline.
# TYPE otelcol_processor_accepted_spans counter
otelcol_processor_accepted_spans{processor="fakeProcessor"} 42
otelcol_processor_accepted_spans{processor="fakeProcessor",pipeline="traces/fakePipeline"} 42
# HELP otelcol_processor_refused_spans Number of spans that were rejected by the next component in the pipeline.
# TYPE otelcol_processor_refused_spans counter
otelcol_processor_refused_spans{processor="fakeProcessor"} 13
otelcol_processor_refused_spans{processor="fakeProcessor",pipeline="traces/fakePipeline"} 13
# HELP otelcol_processor_dropped_spans Number of spans that were dropped.
# TYPE otelcol_processor_dropped_spans counter
otelcol_processor_dropped_spans{processor="fakeProcessor"} 7
otelcol_processor_dropped_spans{processor="fakeProcessor",pipeline="traces/fakePipeline"} 7
# HELP otelcol_processor_inserted_spans Number of spans that were inserted.
# TYPE otelcol_processor_inserted_spans counter
otelcol_processor_inserted_spans{processor="fakeProcessor"} 5
otelcol_processor_inserted_spans{processor="fakeProcessor",pipeline="traces/fakePipeline"} 5
# HELP otelcol_processor_accepted_metric_points Number of metric points successfully pushed into the next component in the pipeline.
# TYPE otelcol_processor_accepted_metric_points counter
otelcol_processor_accepted_metric_points{processor="fakeProcessor"} 7
otelcol_processor_accepted_metric_points{processor="fakeProcessor",pipeline="metrics/fakePipeline"} 7
# HELP otelcol_processor_refused_metric_points Number of metric points that were rejected by the next component in the pipeline.
# TYPE otelcol_processor_refused_metric_points counter
otelcol_processor_refused_metric_points{processor="fakeProcessor"} 41
otelcol_processor_refused_metric_points{processor="fakeProcessor",pipeline="metrics/fakePipeline"} 41
# HELP otelcol_processor_dropped_metric_points Number of metric points that were dropped.
# TYPE otelcol_processor_dropped_metric_points counter
otelcol_processor_dropped_metric_points{processor="fakeProcessor"} 13
otelcol_processor_dropped_metric_points{processor="fakeProcessor",pipeline="metrics/fakePipeline"} 13
# HELP otelcol_processor_inserted_metric_points Number of metric points that were inserted.
# TYPE otelcol_processor_inserted_metric_points counter
otelcol_processor_inserted_metric_points{processor="fakeProcessor"} 4
otelcol_processor_inserted_metric_points{processor="fakeProcessor",pipeline="metrics/fakePipeline"} 4
# HELP otelcol_processor_accepted_log_records Number of log records successfully pushed into the next component in the pipeline.
# TYPE otelcol_processor_accepted_log_records counter
otelcol_processor_accepted_log_records{processor="fakeProcessor"} 102
otelcol_processor_accepted_log_records{processor="fakeProcessor",pipeline="logs/fakePipeline"} 102
# HELP otelcol_processor_refused_log_records Number of log records that were rejected by the next component in the pipeline.
# TYPE otelcol_processor_refused_log_records counter
otelcol_processor_refused_log_records{processor="fakeProcessor"} 35
otelcol_processor_refused_log_records{processor="fakeProcessor",pipeline="logs/fakePipeline"} 35
# HELP otelcol_processor_dropped_log_records Number of log records that were dropped.
# TYPE otelcol_processor_dropped_log_records counter
otelcol_processor_dropped_log_records{processor="fakeProcessor"} 14
otelcol_processor_dropped_log_records{processor="fakeProcessor",pipeline="logs/fakePipeline"} 14
# HELP otelcol_processor_inserted_log_records Number of log records that were inserted.
# TYPE otelcol_processor_inserted_log_records counter
otelcol_processor_inserted_log_records{processor="fakeProcessor"} 3
otelcol_processor_inserted_log_records{processor="fakeProcessor",pipeline="logs/fakePipeline"} 3
# HELP otelcol_receiver_accepted_log_records Number of log records successfully pushed into the pipeline.
# TYPE otelcol_receiver_accepted_log_records counter
otelcol_receiver_accepted_log_records{receiver="fakeReceiver",transport="fakeTransport"} 102
Expand Down
Loading

0 comments on commit f9d4226

Please sign in to comment.