From 4421a21c164ae56fb7a436bbcb7fc079466158b5 Mon Sep 17 00:00:00 2001 From: Dan Jaglowski Date: Mon, 19 Aug 2024 15:25:04 -0400 Subject: [PATCH] Add ingoing and outgoing counts to processorhelper --- .chloggen/processor-helper-metrics.yaml | 33 ++++++++ .../memorylimiter_test.go | 3 +- processor/processorhelper/documentation.md | 48 +++++++++++ .../internal/metadata/generated_telemetry.go | 42 ++++++++++ processor/processorhelper/logs.go | 13 ++- processor/processorhelper/logs_test.go | 81 ++++++++++++++++++ processor/processorhelper/metadata.yaml | 49 +++++++++++ processor/processorhelper/metrics.go | 13 ++- processor/processorhelper/metrics_test.go | 82 +++++++++++++++++++ processor/processorhelper/obsreport.go | 18 ++++ processor/processorhelper/traces.go | 13 ++- processor/processorhelper/traces_test.go | 82 +++++++++++++++++++ 12 files changed, 473 insertions(+), 4 deletions(-) create mode 100644 .chloggen/processor-helper-metrics.yaml diff --git a/.chloggen/processor-helper-metrics.yaml b/.chloggen/processor-helper-metrics.yaml new file mode 100644 index 00000000000..c4cb21131ce --- /dev/null +++ b/.chloggen/processor-helper-metrics.yaml @@ -0,0 +1,33 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: processor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add incoming and outgoing counts for processors using processorhelper. + +# One or more tracking issues or pull requests related to the change +issues: [10910] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + Any processor using the processorhelper package (this is most processors) will automatically report + incoming and outgoing item counts. The new metrics are: + - otelcol_processor_incoming_spans + - otelcol_processor_outgoing_spans + - otelcol_processor_incoming_metric_points + - otelcol_processor_outgoing_metric_points + - otelcol_processor_incoming_log_records + - otelcol_processor_outgoing_log_records + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/processor/memorylimiterprocessor/memorylimiter_test.go b/processor/memorylimiterprocessor/memorylimiter_test.go index 6172fa39889..a09c31e1ff0 100644 --- a/processor/memorylimiterprocessor/memorylimiter_test.go +++ b/processor/memorylimiterprocessor/memorylimiter_test.go @@ -57,7 +57,8 @@ func TestNoDataLoss(t *testing.T) { require.NoError(t, err) processor, err := processorhelper.NewLogsProcessor(context.Background(), processor.Settings{ - ID: component.MustNewID("nop"), + ID: component.MustNewID("nop"), + TelemetrySettings: componenttest.NewNopTelemetrySettings(), }, cfg, exporter, limiter.processLogs, processorhelper.WithStart(limiter.start), diff --git a/processor/processorhelper/documentation.md b/processor/processorhelper/documentation.md index 6091e8385a9..e14bbbd779c 100644 --- a/processor/processorhelper/documentation.md +++ b/processor/processorhelper/documentation.md @@ -54,6 +54,30 @@ Number of spans that were dropped. | ---- | ----------- | ---------- | --------- | | {spans} | Sum | Int | true | +### otelcol_processor_incoming_log_records + +Number of log records passed to the processor. + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| {records} | Sum | Int | true | + +### otelcol_processor_incoming_metric_points + +Number of metric points passed to the processor. + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| {datapoints} | Sum | Int | true | + +### otelcol_processor_incoming_spans + +Number of spans passed to the processor. + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| {spans} | Sum | Int | true | + ### otelcol_processor_inserted_log_records Number of log records that were inserted. @@ -78,6 +102,30 @@ Number of spans that were inserted. | ---- | ----------- | ---------- | --------- | | {spans} | Sum | Int | true | +### otelcol_processor_outgoing_log_records + +Number of log records emitted from the processor. + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| {records} | Sum | Int | true | + +### otelcol_processor_outgoing_metric_points + +Number of metric points emitted from the processor. + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| {datapoints} | Sum | Int | true | + +### otelcol_processor_outgoing_spans + +Number of spans emitted from the processor. + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| {spans} | Sum | Int | true | + ### otelcol_processor_refused_log_records Number of log records that were rejected by the next component in the pipeline. diff --git a/processor/processorhelper/internal/metadata/generated_telemetry.go b/processor/processorhelper/internal/metadata/generated_telemetry.go index 114ba9df7c0..31b15261a2f 100644 --- a/processor/processorhelper/internal/metadata/generated_telemetry.go +++ b/processor/processorhelper/internal/metadata/generated_telemetry.go @@ -35,9 +35,15 @@ type TelemetryBuilder struct { ProcessorDroppedLogRecords metric.Int64Counter ProcessorDroppedMetricPoints metric.Int64Counter ProcessorDroppedSpans metric.Int64Counter + ProcessorIncomingLogRecords metric.Int64Counter + ProcessorIncomingMetricPoints metric.Int64Counter + ProcessorIncomingSpans metric.Int64Counter ProcessorInsertedLogRecords metric.Int64Counter ProcessorInsertedMetricPoints metric.Int64Counter ProcessorInsertedSpans metric.Int64Counter + ProcessorOutgoingLogRecords metric.Int64Counter + ProcessorOutgoingMetricPoints metric.Int64Counter + ProcessorOutgoingSpans metric.Int64Counter ProcessorRefusedLogRecords metric.Int64Counter ProcessorRefusedMetricPoints metric.Int64Counter ProcessorRefusedSpans metric.Int64Counter @@ -92,6 +98,24 @@ func NewTelemetryBuilder(settings component.TelemetrySettings, options ...teleme metric.WithUnit("{spans}"), ) errs = errors.Join(errs, err) + builder.ProcessorIncomingLogRecords, err = builder.meters[configtelemetry.LevelBasic].Int64Counter( + "otelcol_processor_incoming_log_records", + metric.WithDescription("Number of log records passed to the processor."), + metric.WithUnit("{records}"), + ) + errs = errors.Join(errs, err) + builder.ProcessorIncomingMetricPoints, err = builder.meters[configtelemetry.LevelBasic].Int64Counter( + "otelcol_processor_incoming_metric_points", + metric.WithDescription("Number of metric points passed to the processor."), + metric.WithUnit("{datapoints}"), + ) + errs = errors.Join(errs, err) + builder.ProcessorIncomingSpans, err = builder.meters[configtelemetry.LevelBasic].Int64Counter( + "otelcol_processor_incoming_spans", + metric.WithDescription("Number of spans passed to the processor."), + metric.WithUnit("{spans}"), + ) + errs = errors.Join(errs, err) builder.ProcessorInsertedLogRecords, err = builder.meters[configtelemetry.LevelBasic].Int64Counter( "otelcol_processor_inserted_log_records", metric.WithDescription("Number of log records that were inserted."), @@ -110,6 +134,24 @@ func NewTelemetryBuilder(settings component.TelemetrySettings, options ...teleme metric.WithUnit("{spans}"), ) errs = errors.Join(errs, err) + builder.ProcessorOutgoingLogRecords, err = builder.meters[configtelemetry.LevelBasic].Int64Counter( + "otelcol_processor_outgoing_log_records", + metric.WithDescription("Number of log records emitted from the processor."), + metric.WithUnit("{records}"), + ) + errs = errors.Join(errs, err) + builder.ProcessorOutgoingMetricPoints, err = builder.meters[configtelemetry.LevelBasic].Int64Counter( + "otelcol_processor_outgoing_metric_points", + metric.WithDescription("Number of metric points emitted from the processor."), + metric.WithUnit("{datapoints}"), + ) + errs = errors.Join(errs, err) + builder.ProcessorOutgoingSpans, err = builder.meters[configtelemetry.LevelBasic].Int64Counter( + "otelcol_processor_outgoing_spans", + metric.WithDescription("Number of spans emitted from the processor."), + metric.WithUnit("{spans}"), + ) + errs = errors.Join(errs, err) builder.ProcessorRefusedLogRecords, err = builder.meters[configtelemetry.LevelBasic].Int64Counter( "otelcol_processor_refused_log_records", metric.WithDescription("Number of log records that were rejected by the next component in the pipeline."), diff --git a/processor/processorhelper/logs.go b/processor/processorhelper/logs.go index d593a5f74a5..1f3c7ccde09 100644 --- a/processor/processorhelper/logs.go +++ b/processor/processorhelper/logs.go @@ -39,12 +39,21 @@ func NewLogsProcessor( return nil, errors.New("nil logsFunc") } + obs, err := newObsReport(ObsReportSettings{ + ProcessorID: set.ID, + ProcessorCreateSettings: set, + }) + if err != nil { + return nil, err + } + eventOptions := spanAttributes(set.ID) bs := fromOptions(options) logsConsumer, err := consumer.NewLogs(func(ctx context.Context, ld plog.Logs) error { span := trace.SpanFromContext(ctx) span.AddEvent("Start processing.", eventOptions) - var err error + recordsIn := ld.LogRecordCount() + ld, err = logsFunc(ctx, ld) span.AddEvent("End processing.", eventOptions) if err != nil { @@ -53,6 +62,8 @@ func NewLogsProcessor( } return err } + recordsOut := ld.LogRecordCount() + obs.recordInOut(ctx, component.DataTypeLogs, recordsIn, recordsOut) return nextConsumer.ConsumeLogs(ctx, ld) }, bs.consumerOptions...) if err != nil { diff --git a/processor/processorhelper/logs_test.go b/processor/processorhelper/logs_test.go index a958ad33130..e4a35657455 100644 --- a/processor/processorhelper/logs_test.go +++ b/processor/processorhelper/logs_test.go @@ -6,13 +6,20 @@ package processorhelper import ( "context" "errors" + "strings" "testing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/config/configtelemetry" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/pdata/plog" @@ -67,3 +74,77 @@ func newTestLProcessor(retError error) ProcessLogsFunc { return ld, retError } } + +func TestLogsProcessor_RecordInOut(t *testing.T) { + // Regardless of how many logs are ingested, emit just one + mockAggregate := func(_ context.Context, _ plog.Logs) (plog.Logs, error) { + ld := plog.NewLogs() + ld.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty() + return ld, nil + } + + incomingLogs := plog.NewLogs() + incomingLogRecords := incomingLogs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords() + + // Add 3 records to the incoming + incomingLogRecords.AppendEmpty() + incomingLogRecords.AppendEmpty() + incomingLogRecords.AppendEmpty() + + metricReader := sdkmetric.NewManualReader() + set := processortest.NewNopSettings() + set.TelemetrySettings.MetricsLevel = configtelemetry.LevelBasic + set.TelemetrySettings.LeveledMeterProvider = func(level configtelemetry.Level) metric.MeterProvider { + if level >= configtelemetry.LevelBasic { + return sdkmetric.NewMeterProvider(sdkmetric.WithReader(metricReader)) + } + return nil + } + + lp, err := NewLogsProcessor(context.Background(), set, &testLogsCfg, consumertest.NewNop(), mockAggregate) + require.NoError(t, err) + + assert.NoError(t, lp.Start(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, lp.ConsumeLogs(context.Background(), incomingLogs)) + assert.NoError(t, lp.Shutdown(context.Background())) + + ownMetrics := new(metricdata.ResourceMetrics) + require.NoError(t, metricReader.Collect(context.Background(), ownMetrics)) + + require.Len(t, ownMetrics.ScopeMetrics, 1) + require.Len(t, ownMetrics.ScopeMetrics[0].Metrics, 2) + + inMetric := ownMetrics.ScopeMetrics[0].Metrics[0] + outMetric := ownMetrics.ScopeMetrics[0].Metrics[1] + if strings.Contains(inMetric.Name, "outgoing") { + inMetric, outMetric = outMetric, inMetric + } + + metricdatatest.AssertAggregationsEqual(t, metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet(attribute.KeyValue{ + Key: attribute.Key("processor"), + Value: attribute.StringValue(set.ID.String()), + }), + Value: 3, + }, + }, + }, inMetric.Data, metricdatatest.IgnoreTimestamp()) + + metricdatatest.AssertAggregationsEqual(t, metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet(attribute.KeyValue{ + Key: attribute.Key("processor"), + Value: attribute.StringValue(set.ID.String()), + }), + Value: 1, + }, + }, + }, outMetric.Data, metricdatatest.IgnoreTimestamp()) +} diff --git a/processor/processorhelper/metadata.yaml b/processor/processorhelper/metadata.yaml index 95acaf640b8..9f6d1731f85 100644 --- a/processor/processorhelper/metadata.yaml +++ b/processor/processorhelper/metadata.yaml @@ -9,6 +9,55 @@ status: telemetry: metrics: + + processor_incoming_spans: + enabled: true + description: Number of spans passed to the processor. + unit: "{spans}" + sum: + value_type: int + monotonic: true + + processor_outgoing_spans: + enabled: true + description: Number of spans emitted from the processor. + unit: "{spans}" + sum: + value_type: int + monotonic: true + + processor_incoming_metric_points: + enabled: true + description: Number of metric points passed to the processor. + unit: "{datapoints}" + sum: + value_type: int + monotonic: true + + processor_outgoing_metric_points: + enabled: true + description: Number of metric points emitted from the processor. + unit: "{datapoints}" + sum: + value_type: int + monotonic: true + + processor_incoming_log_records: + enabled: true + description: Number of log records passed to the processor. + unit: "{records}" + sum: + value_type: int + monotonic: true + + processor_outgoing_log_records: + enabled: true + description: Number of log records emitted from the processor. + unit: "{records}" + sum: + value_type: int + monotonic: true + processor_accepted_spans: enabled: true description: Number of spans successfully pushed into the next component in the pipeline. diff --git a/processor/processorhelper/metrics.go b/processor/processorhelper/metrics.go index dc3388444c6..1d8771394b0 100644 --- a/processor/processorhelper/metrics.go +++ b/processor/processorhelper/metrics.go @@ -39,12 +39,21 @@ func NewMetricsProcessor( return nil, errors.New("nil metricsFunc") } + obs, err := newObsReport(ObsReportSettings{ + ProcessorID: set.ID, + ProcessorCreateSettings: set, + }) + if err != nil { + return nil, err + } + eventOptions := spanAttributes(set.ID) bs := fromOptions(options) metricsConsumer, err := consumer.NewMetrics(func(ctx context.Context, md pmetric.Metrics) error { span := trace.SpanFromContext(ctx) span.AddEvent("Start processing.", eventOptions) - var err error + pointsIn := md.DataPointCount() + md, err = metricsFunc(ctx, md) span.AddEvent("End processing.", eventOptions) if err != nil { @@ -53,6 +62,8 @@ func NewMetricsProcessor( } return err } + pointsOut := md.DataPointCount() + obs.recordInOut(ctx, component.DataTypeMetrics, pointsIn, pointsOut) return nextConsumer.ConsumeMetrics(ctx, md) }, bs.consumerOptions...) if err != nil { diff --git a/processor/processorhelper/metrics_test.go b/processor/processorhelper/metrics_test.go index e3ac40e26bd..65f812fdb52 100644 --- a/processor/processorhelper/metrics_test.go +++ b/processor/processorhelper/metrics_test.go @@ -6,13 +6,20 @@ package processorhelper import ( "context" "errors" + "strings" "testing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/config/configtelemetry" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/pdata/pmetric" @@ -67,3 +74,78 @@ func newTestMProcessor(retError error) ProcessMetricsFunc { return md, retError } } + +func TestMetricsProcessor_RecordInOut(t *testing.T) { + // Regardless of how many logs are ingested, emit just one + mockAggregate := func(_ context.Context, _ pmetric.Metrics) (pmetric.Metrics, error) { + md := pmetric.NewMetrics() + md.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetEmptySum().DataPoints().AppendEmpty() + return md, nil + } + + incomingMetrics := pmetric.NewMetrics() + dps := incomingMetrics.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetEmptySum().DataPoints() + + // Add 3 data points to the incoming + dps.AppendEmpty() + dps.AppendEmpty() + dps.AppendEmpty() + + metricReader := sdkmetric.NewManualReader() + set := processortest.NewNopSettings() + set.TelemetrySettings.MetricsLevel = configtelemetry.LevelNormal + set.TelemetrySettings.MetricsLevel = configtelemetry.LevelBasic + set.TelemetrySettings.LeveledMeterProvider = func(level configtelemetry.Level) metric.MeterProvider { + if level >= configtelemetry.LevelBasic { + return sdkmetric.NewMeterProvider(sdkmetric.WithReader(metricReader)) + } + return nil + } + + mp, err := NewMetricsProcessor(context.Background(), set, &testMetricsCfg, consumertest.NewNop(), mockAggregate) + require.NoError(t, err) + + assert.NoError(t, mp.Start(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, mp.ConsumeMetrics(context.Background(), incomingMetrics)) + assert.NoError(t, mp.Shutdown(context.Background())) + + ownMetrics := new(metricdata.ResourceMetrics) + require.NoError(t, metricReader.Collect(context.Background(), ownMetrics)) + + require.Len(t, ownMetrics.ScopeMetrics, 1) + require.Len(t, ownMetrics.ScopeMetrics[0].Metrics, 2) + + inMetric := ownMetrics.ScopeMetrics[0].Metrics[0] + outMetric := ownMetrics.ScopeMetrics[0].Metrics[1] + if strings.Contains(inMetric.Name, "outgoing") { + inMetric, outMetric = outMetric, inMetric + } + + metricdatatest.AssertAggregationsEqual(t, metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet(attribute.KeyValue{ + Key: attribute.Key("processor"), + Value: attribute.StringValue(set.ID.String()), + }), + Value: 3, + }, + }, + }, inMetric.Data, metricdatatest.IgnoreTimestamp()) + + metricdatatest.AssertAggregationsEqual(t, metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet(attribute.KeyValue{ + Key: attribute.Key("processor"), + Value: attribute.StringValue(set.ID.String()), + }), + Value: 1, + }, + }, + }, outMetric.Data, metricdatatest.IgnoreTimestamp()) +} diff --git a/processor/processorhelper/obsreport.go b/processor/processorhelper/obsreport.go index 6fef0fd45ec..c69f601cf3d 100644 --- a/processor/processorhelper/obsreport.go +++ b/processor/processorhelper/obsreport.go @@ -60,6 +60,24 @@ func newObsReport(cfg ObsReportSettings) (*ObsReport, error) { }, nil } +func (or *ObsReport) recordInOut(ctx context.Context, dataType component.DataType, incoming, outgoing int) { + var incomingCount, outgoingCount metric.Int64Counter + switch dataType { + case component.DataTypeTraces: + incomingCount = or.telemetryBuilder.ProcessorIncomingSpans + outgoingCount = or.telemetryBuilder.ProcessorOutgoingSpans + case component.DataTypeMetrics: + incomingCount = or.telemetryBuilder.ProcessorIncomingMetricPoints + outgoingCount = or.telemetryBuilder.ProcessorOutgoingMetricPoints + case component.DataTypeLogs: + incomingCount = or.telemetryBuilder.ProcessorIncomingLogRecords + outgoingCount = or.telemetryBuilder.ProcessorOutgoingLogRecords + } + + incomingCount.Add(ctx, int64(incoming), metric.WithAttributes(or.otelAttrs...)) + outgoingCount.Add(ctx, int64(outgoing), metric.WithAttributes(or.otelAttrs...)) +} + func (or *ObsReport) recordData(ctx context.Context, dataType component.DataType, accepted, refused, dropped, inserted int64) { var acceptedCount, refusedCount, droppedCount, insertedCount metric.Int64Counter switch dataType { diff --git a/processor/processorhelper/traces.go b/processor/processorhelper/traces.go index 9f24c2a8b0f..b9988f5db55 100644 --- a/processor/processorhelper/traces.go +++ b/processor/processorhelper/traces.go @@ -39,12 +39,21 @@ func NewTracesProcessor( return nil, errors.New("nil tracesFunc") } + obs, err := newObsReport(ObsReportSettings{ + ProcessorID: set.ID, + ProcessorCreateSettings: set, + }) + if err != nil { + return nil, err + } + eventOptions := spanAttributes(set.ID) bs := fromOptions(options) traceConsumer, err := consumer.NewTraces(func(ctx context.Context, td ptrace.Traces) error { span := trace.SpanFromContext(ctx) span.AddEvent("Start processing.", eventOptions) - var err error + spansIn := td.SpanCount() + td, err = tracesFunc(ctx, td) span.AddEvent("End processing.", eventOptions) if err != nil { @@ -53,6 +62,8 @@ func NewTracesProcessor( } return err } + spansOut := td.SpanCount() + obs.recordInOut(ctx, component.DataTypeTraces, spansIn, spansOut) return nextConsumer.ConsumeTraces(ctx, td) }, bs.consumerOptions...) diff --git a/processor/processorhelper/traces_test.go b/processor/processorhelper/traces_test.go index dd6b5eaa18a..b99e09fd615 100644 --- a/processor/processorhelper/traces_test.go +++ b/processor/processorhelper/traces_test.go @@ -6,13 +6,20 @@ package processorhelper import ( "context" "errors" + "strings" "testing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/config/configtelemetry" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/pdata/ptrace" @@ -67,3 +74,78 @@ func newTestTProcessor(retError error) ProcessTracesFunc { return td, retError } } + +func TestTracesProcessor_RecordInOut(t *testing.T) { + // Regardless of how many logs are ingested, emit just one + mockAggregate := func(_ context.Context, _ ptrace.Traces) (ptrace.Traces, error) { + td := ptrace.NewTraces() + td.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty() + return td, nil + } + + incomingTraces := ptrace.NewTraces() + incomingSpans := incomingTraces.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans() + + // Add 3 records to the incoming + incomingSpans.AppendEmpty() + incomingSpans.AppendEmpty() + incomingSpans.AppendEmpty() + + metricReader := sdkmetric.NewManualReader() + set := processortest.NewNopSettings() + set.TelemetrySettings.MetricsLevel = configtelemetry.LevelNormal + set.TelemetrySettings.MetricsLevel = configtelemetry.LevelBasic + set.TelemetrySettings.LeveledMeterProvider = func(level configtelemetry.Level) metric.MeterProvider { + if level >= configtelemetry.LevelBasic { + return sdkmetric.NewMeterProvider(sdkmetric.WithReader(metricReader)) + } + return nil + } + + tp, err := NewTracesProcessor(context.Background(), set, &testLogsCfg, consumertest.NewNop(), mockAggregate) + require.NoError(t, err) + + assert.NoError(t, tp.Start(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, tp.ConsumeTraces(context.Background(), incomingTraces)) + assert.NoError(t, tp.Shutdown(context.Background())) + + ownMetrics := new(metricdata.ResourceMetrics) + require.NoError(t, metricReader.Collect(context.Background(), ownMetrics)) + + require.Len(t, ownMetrics.ScopeMetrics, 1) + require.Len(t, ownMetrics.ScopeMetrics[0].Metrics, 2) + + inMetric := ownMetrics.ScopeMetrics[0].Metrics[0] + outMetric := ownMetrics.ScopeMetrics[0].Metrics[1] + if strings.Contains(inMetric.Name, "outgoing") { + inMetric, outMetric = outMetric, inMetric + } + + metricdatatest.AssertAggregationsEqual(t, metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet(attribute.KeyValue{ + Key: attribute.Key("processor"), + Value: attribute.StringValue(set.ID.String()), + }), + Value: 3, + }, + }, + }, inMetric.Data, metricdatatest.IgnoreTimestamp()) + + metricdatatest.AssertAggregationsEqual(t, metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet(attribute.KeyValue{ + Key: attribute.Key("processor"), + Value: attribute.StringValue(set.ID.String()), + }), + Value: 1, + }, + }, + }, outMetric.Data, metricdatatest.IgnoreTimestamp()) +}