Skip to content

Commit

Permalink
Add ingoing and outgoing counts to processorhelper
Browse files Browse the repository at this point in the history
  • Loading branch information
djaglowski committed Aug 19, 2024
1 parent d2ed276 commit f574d98
Show file tree
Hide file tree
Showing 11 changed files with 466 additions and 3 deletions.
33 changes: 33 additions & 0 deletions .chloggen/processor-helper-metrics.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: processor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add incoming and outgoing counts for processors using processorhelper.

# One or more tracking issues or pull requests related to the change
issues: [10910]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
Any processor using the processorhelper package (this is most processors) will automatically report
incoming and outgoing item counts. The new metrics are:
- otelcol_processor_incoming_spans
- otelcol_processor_outgoing_spans
- otelcol_processor_incoming_metric_points
- otelcol_processor_outgoing_metric_points
- otelcol_processor_incoming_log_records
- otelcol_processor_outgoing_log_records
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
48 changes: 48 additions & 0 deletions processor/processorhelper/documentation.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,30 @@ Number of spans that were dropped.
| ---- | ----------- | ---------- | --------- |
| {spans} | Sum | Int | true |

### otelcol_processor_incoming_log_records

Number of log records passed to the processor.

| Unit | Metric Type | Value Type | Monotonic |
| ---- | ----------- | ---------- | --------- |
| {records} | Sum | Int | true |

### otelcol_processor_incoming_metric_points

Number of metric points passed to the processor.

| Unit | Metric Type | Value Type | Monotonic |
| ---- | ----------- | ---------- | --------- |
| {datapoints} | Sum | Int | true |

### otelcol_processor_incoming_spans

Number of spans passed to the processor.

| Unit | Metric Type | Value Type | Monotonic |
| ---- | ----------- | ---------- | --------- |
| {spans} | Sum | Int | true |

### otelcol_processor_inserted_log_records

Number of log records that were inserted.
Expand All @@ -78,6 +102,30 @@ Number of spans that were inserted.
| ---- | ----------- | ---------- | --------- |
| {spans} | Sum | Int | true |

### otelcol_processor_outgoing_log_records

Number of log records emitted from the processor.

| Unit | Metric Type | Value Type | Monotonic |
| ---- | ----------- | ---------- | --------- |
| {records} | Sum | Int | true |

### otelcol_processor_outgoing_metric_points

Number of metric points emitted from the processor.

| Unit | Metric Type | Value Type | Monotonic |
| ---- | ----------- | ---------- | --------- |
| {datapoints} | Sum | Int | true |

### otelcol_processor_outgoing_spans

Number of spans emitted from the processor.

| Unit | Metric Type | Value Type | Monotonic |
| ---- | ----------- | ---------- | --------- |
| {spans} | Sum | Int | true |

### otelcol_processor_refused_log_records

Number of log records that were rejected by the next component in the pipeline.
Expand Down
42 changes: 42 additions & 0 deletions processor/processorhelper/internal/metadata/generated_telemetry.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 17 additions & 1 deletion processor/processorhelper/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"errors"

"go.opentelemetry.io/otel/metric/noop"
"go.opentelemetry.io/otel/trace"

"go.opentelemetry.io/collector/component"
Expand Down Expand Up @@ -39,12 +40,25 @@ func NewLogsProcessor(
return nil, errors.New("nil logsFunc")
}

if set.MeterProvider == nil {
set.MeterProvider = noop.NewMeterProvider()

Check warning on line 44 in processor/processorhelper/logs.go

View check run for this annotation

Codecov / codecov/patch

processor/processorhelper/logs.go#L44

Added line #L44 was not covered by tests
}

obs, err := newObsReport(ObsReportSettings{
ProcessorID: set.ID,
ProcessorCreateSettings: set,
})
if err != nil {
return nil, err

Check warning on line 52 in processor/processorhelper/logs.go

View check run for this annotation

Codecov / codecov/patch

processor/processorhelper/logs.go#L52

Added line #L52 was not covered by tests
}

eventOptions := spanAttributes(set.ID)
bs := fromOptions(options)
logsConsumer, err := consumer.NewLogs(func(ctx context.Context, ld plog.Logs) error {
span := trace.SpanFromContext(ctx)
span.AddEvent("Start processing.", eventOptions)
var err error
recordsIn := ld.LogRecordCount()

ld, err = logsFunc(ctx, ld)
span.AddEvent("End processing.", eventOptions)
if err != nil {
Expand All @@ -53,6 +67,8 @@ func NewLogsProcessor(
}
return err
}
recordsOut := ld.LogRecordCount()
obs.recordInOut(ctx, component.DataTypeLogs, recordsIn, recordsOut)
return nextConsumer.ConsumeLogs(ctx, ld)
}, bs.consumerOptions...)
if err != nil {
Expand Down
75 changes: 75 additions & 0 deletions processor/processorhelper/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,19 @@ package processorhelper
import (
"context"
"errors"
"strings"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/attribute"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/pdata/plog"
Expand Down Expand Up @@ -67,3 +73,72 @@ func newTestLProcessor(retError error) ProcessLogsFunc {
return ld, retError
}
}

func TestLogsProcessor_RecordInOut(t *testing.T) {
// Regardless of how many logs are ingested, emit just one
mockAggregate := func(_ context.Context, _ plog.Logs) (plog.Logs, error) {
ld := plog.NewLogs()
ld.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()
return ld, nil
}

incomingLogs := plog.NewLogs()
incomingLogRecords := incomingLogs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords()

// Add 3 records to the incoming
incomingLogRecords.AppendEmpty()
incomingLogRecords.AppendEmpty()
incomingLogRecords.AppendEmpty()

metricReader := sdkmetric.NewManualReader()
set := processortest.NewNopSettings()
set.TelemetrySettings.MetricsLevel = configtelemetry.LevelNormal
set.TelemetrySettings.MeterProvider = sdkmetric.NewMeterProvider(sdkmetric.WithReader(metricReader))

lp, err := NewLogsProcessor(context.Background(), set, &testLogsCfg, consumertest.NewNop(), mockAggregate)
require.NoError(t, err)

assert.NoError(t, lp.Start(context.Background(), componenttest.NewNopHost()))
assert.NoError(t, lp.ConsumeLogs(context.Background(), incomingLogs))
assert.NoError(t, lp.Shutdown(context.Background()))

ownMetrics := new(metricdata.ResourceMetrics)
require.NoError(t, metricReader.Collect(context.Background(), ownMetrics))

require.Len(t, ownMetrics.ScopeMetrics, 1)
require.Len(t, ownMetrics.ScopeMetrics[0].Metrics, 2)

inMetric := ownMetrics.ScopeMetrics[0].Metrics[0]
outMetric := ownMetrics.ScopeMetrics[0].Metrics[1]
if strings.Contains(inMetric.Name, "outgoing") {
inMetric, outMetric = outMetric, inMetric
}

metricdatatest.AssertAggregationsEqual(t, metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Attributes: attribute.NewSet(attribute.KeyValue{
Key: attribute.Key("processor"),
Value: attribute.StringValue(set.ID.String()),
}),
Value: 3,
},
},
}, inMetric.Data, metricdatatest.IgnoreTimestamp())

metricdatatest.AssertAggregationsEqual(t, metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Attributes: attribute.NewSet(attribute.KeyValue{
Key: attribute.Key("processor"),
Value: attribute.StringValue(set.ID.String()),
}),
Value: 1,
},
},
}, outMetric.Data, metricdatatest.IgnoreTimestamp())
}
49 changes: 49 additions & 0 deletions processor/processorhelper/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,55 @@ status:

telemetry:
metrics:

processor_incoming_spans:
enabled: true
description: Number of spans passed to the processor.
unit: "{spans}"
sum:
value_type: int
monotonic: true

processor_outgoing_spans:
enabled: true
description: Number of spans emitted from the processor.
unit: "{spans}"
sum:
value_type: int
monotonic: true

processor_incoming_metric_points:
enabled: true
description: Number of metric points passed to the processor.
unit: "{datapoints}"
sum:
value_type: int
monotonic: true

processor_outgoing_metric_points:
enabled: true
description: Number of metric points emitted from the processor.
unit: "{datapoints}"
sum:
value_type: int
monotonic: true

processor_incoming_log_records:
enabled: true
description: Number of log records passed to the processor.
unit: "{records}"
sum:
value_type: int
monotonic: true

processor_outgoing_log_records:
enabled: true
description: Number of log records emitted from the processor.
unit: "{records}"
sum:
value_type: int
monotonic: true

processor_accepted_spans:
enabled: true
description: Number of spans successfully pushed into the next component in the pipeline.
Expand Down
18 changes: 17 additions & 1 deletion processor/processorhelper/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"errors"

"go.opentelemetry.io/otel/metric/noop"
"go.opentelemetry.io/otel/trace"

"go.opentelemetry.io/collector/component"
Expand Down Expand Up @@ -39,12 +40,25 @@ func NewMetricsProcessor(
return nil, errors.New("nil metricsFunc")
}

if set.MeterProvider == nil {
set.MeterProvider = noop.NewMeterProvider()

Check warning on line 44 in processor/processorhelper/metrics.go

View check run for this annotation

Codecov / codecov/patch

processor/processorhelper/metrics.go#L44

Added line #L44 was not covered by tests
}

obs, err := newObsReport(ObsReportSettings{
ProcessorID: set.ID,
ProcessorCreateSettings: set,
})
if err != nil {
return nil, err

Check warning on line 52 in processor/processorhelper/metrics.go

View check run for this annotation

Codecov / codecov/patch

processor/processorhelper/metrics.go#L52

Added line #L52 was not covered by tests
}

eventOptions := spanAttributes(set.ID)
bs := fromOptions(options)
metricsConsumer, err := consumer.NewMetrics(func(ctx context.Context, md pmetric.Metrics) error {
span := trace.SpanFromContext(ctx)
span.AddEvent("Start processing.", eventOptions)
var err error
pointsIn := md.DataPointCount()

md, err = metricsFunc(ctx, md)
span.AddEvent("End processing.", eventOptions)
if err != nil {
Expand All @@ -53,6 +67,8 @@ func NewMetricsProcessor(
}
return err
}
pointsOut := md.DataPointCount()
obs.recordInOut(ctx, component.DataTypeMetrics, pointsIn, pointsOut)
return nextConsumer.ConsumeMetrics(ctx, md)
}, bs.consumerOptions...)
if err != nil {
Expand Down
Loading

0 comments on commit f574d98

Please sign in to comment.