Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[chore] cleaning up test to use mdatagen test harness #11098

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 30 additions & 51 deletions processor/processorhelper/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,15 @@ package processorhelper
import (
"context"
"errors"
"strings"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/pdata/plog"
Expand Down Expand Up @@ -91,60 +86,44 @@ func TestLogsProcessor_RecordInOut(t *testing.T) {
incomingLogRecords.AppendEmpty()
incomingLogRecords.AppendEmpty()

metricReader := sdkmetric.NewManualReader()
set := processortest.NewNopSettings()
set.TelemetrySettings.MetricsLevel = configtelemetry.LevelBasic
set.TelemetrySettings.LeveledMeterProvider = func(level configtelemetry.Level) metric.MeterProvider {
if level >= configtelemetry.LevelBasic {
return sdkmetric.NewMeterProvider(sdkmetric.WithReader(metricReader))
}
return nil
}

lp, err := NewLogsProcessor(context.Background(), set, &testLogsCfg, consumertest.NewNop(), mockAggregate)
testTelemetry := setupTestTelemetry()
lp, err := NewLogsProcessor(context.Background(), testTelemetry.NewSettings(), &testLogsCfg, consumertest.NewNop(), mockAggregate)
require.NoError(t, err)

assert.NoError(t, lp.Start(context.Background(), componenttest.NewNopHost()))
assert.NoError(t, lp.ConsumeLogs(context.Background(), incomingLogs))
assert.NoError(t, lp.Shutdown(context.Background()))

ownMetrics := new(metricdata.ResourceMetrics)
require.NoError(t, metricReader.Collect(context.Background(), ownMetrics))

require.Len(t, ownMetrics.ScopeMetrics, 1)
require.Len(t, ownMetrics.ScopeMetrics[0].Metrics, 2)

inMetric := ownMetrics.ScopeMetrics[0].Metrics[0]
outMetric := ownMetrics.ScopeMetrics[0].Metrics[1]
if strings.Contains(inMetric.Name, "outgoing") {
inMetric, outMetric = outMetric, inMetric
}

metricdatatest.AssertAggregationsEqual(t, metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Attributes: attribute.NewSet(attribute.KeyValue{
Key: attribute.Key("processor"),
Value: attribute.StringValue(set.ID.String()),
}),
Value: 3,
testTelemetry.assertMetrics(t, []metricdata.Metrics{
{
Name: "otelcol_processor_incoming_log_records",
Description: "Number of log records passed to the processor.",
Unit: "{records}",
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Value: 3,
Attributes: attribute.NewSet(attribute.String("processor", "processorhelper")),
},
},
},
},
}, inMetric.Data, metricdatatest.IgnoreTimestamp())

metricdatatest.AssertAggregationsEqual(t, metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Attributes: attribute.NewSet(attribute.KeyValue{
Key: attribute.Key("processor"),
Value: attribute.StringValue(set.ID.String()),
}),
Value: 1,
{
Name: "otelcol_processor_outgoing_log_records",
Description: "Number of log records emitted from the processor.",
Unit: "{records}",
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Value: 1,
Attributes: attribute.NewSet(attribute.String("processor", "processorhelper")),
},
},
},
},
}, outMetric.Data, metricdatatest.IgnoreTimestamp())
})
}
89 changes: 34 additions & 55 deletions processor/processorhelper/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,15 @@ package processorhelper
import (
"context"
"errors"
"strings"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/pdata/pmetric"
Expand Down Expand Up @@ -76,76 +71,60 @@ func newTestMProcessor(retError error) ProcessMetricsFunc {
}

func TestMetricsProcessor_RecordInOut(t *testing.T) {
// Regardless of how many data points are ingested, emit just one
// Regardless of how many data points are ingested, emit 3
mockAggregate := func(_ context.Context, _ pmetric.Metrics) (pmetric.Metrics, error) {
md := pmetric.NewMetrics()
md.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetEmptySum().DataPoints().AppendEmpty()
md.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetEmptySum().DataPoints().AppendEmpty()
md.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetEmptySum().DataPoints().AppendEmpty()
return md, nil
}

incomingMetrics := pmetric.NewMetrics()
dps := incomingMetrics.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetEmptySum().DataPoints()

// Add 3 data points to the incoming
dps.AppendEmpty()
// Add 2 data points to the incoming
dps.AppendEmpty()
dps.AppendEmpty()

metricReader := sdkmetric.NewManualReader()
set := processortest.NewNopSettings()
set.TelemetrySettings.MetricsLevel = configtelemetry.LevelNormal
set.TelemetrySettings.MetricsLevel = configtelemetry.LevelBasic
set.TelemetrySettings.LeveledMeterProvider = func(level configtelemetry.Level) metric.MeterProvider {
if level >= configtelemetry.LevelBasic {
return sdkmetric.NewMeterProvider(sdkmetric.WithReader(metricReader))
}
return nil
}

mp, err := NewMetricsProcessor(context.Background(), set, &testMetricsCfg, consumertest.NewNop(), mockAggregate)
testTelemetry := setupTestTelemetry()
mp, err := NewMetricsProcessor(context.Background(), testTelemetry.NewSettings(), &testMetricsCfg, consumertest.NewNop(), mockAggregate)
require.NoError(t, err)

assert.NoError(t, mp.Start(context.Background(), componenttest.NewNopHost()))
assert.NoError(t, mp.ConsumeMetrics(context.Background(), incomingMetrics))
assert.NoError(t, mp.Shutdown(context.Background()))

ownMetrics := new(metricdata.ResourceMetrics)
require.NoError(t, metricReader.Collect(context.Background(), ownMetrics))

require.Len(t, ownMetrics.ScopeMetrics, 1)
require.Len(t, ownMetrics.ScopeMetrics[0].Metrics, 2)

inMetric := ownMetrics.ScopeMetrics[0].Metrics[0]
outMetric := ownMetrics.ScopeMetrics[0].Metrics[1]
if strings.Contains(inMetric.Name, "outgoing") {
inMetric, outMetric = outMetric, inMetric
}

metricdatatest.AssertAggregationsEqual(t, metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Attributes: attribute.NewSet(attribute.KeyValue{
Key: attribute.Key("processor"),
Value: attribute.StringValue(set.ID.String()),
}),
Value: 3,
testTelemetry.assertMetrics(t, []metricdata.Metrics{
{
Name: "otelcol_processor_incoming_metric_points",
Description: "Number of metric points passed to the processor.",
Unit: "{datapoints}",
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Value: 2,
Attributes: attribute.NewSet(attribute.String("processor", "processorhelper")),
},
},
},
},
}, inMetric.Data, metricdatatest.IgnoreTimestamp())

metricdatatest.AssertAggregationsEqual(t, metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Attributes: attribute.NewSet(attribute.KeyValue{
Key: attribute.Key("processor"),
Value: attribute.StringValue(set.ID.String()),
}),
Value: 1,
{
Name: "otelcol_processor_outgoing_metric_points",
Description: "Number of metric points emitted from the processor.",
Unit: "{datapoints}",
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Value: 3,
Attributes: attribute.NewSet(attribute.String("processor", "processorhelper")),
},
},
},
},
}, outMetric.Data, metricdatatest.IgnoreTimestamp())
})
}
85 changes: 32 additions & 53 deletions processor/processorhelper/traces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,15 @@ package processorhelper
import (
"context"
"errors"
"strings"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/pdata/ptrace"
Expand Down Expand Up @@ -86,66 +81,50 @@ func TestTracesProcessor_RecordInOut(t *testing.T) {
incomingTraces := ptrace.NewTraces()
incomingSpans := incomingTraces.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans()

// Add 3 records to the incoming
// Add 4 records to the incoming
incomingSpans.AppendEmpty()
incomingSpans.AppendEmpty()
incomingSpans.AppendEmpty()
incomingSpans.AppendEmpty()

metricReader := sdkmetric.NewManualReader()
set := processortest.NewNopSettings()
set.TelemetrySettings.MetricsLevel = configtelemetry.LevelNormal
set.TelemetrySettings.MetricsLevel = configtelemetry.LevelBasic
set.TelemetrySettings.LeveledMeterProvider = func(level configtelemetry.Level) metric.MeterProvider {
if level >= configtelemetry.LevelBasic {
return sdkmetric.NewMeterProvider(sdkmetric.WithReader(metricReader))
}
return nil
}

tp, err := NewTracesProcessor(context.Background(), set, &testLogsCfg, consumertest.NewNop(), mockAggregate)
testTelemetry := setupTestTelemetry()
tp, err := NewTracesProcessor(context.Background(), testTelemetry.NewSettings(), &testLogsCfg, consumertest.NewNop(), mockAggregate)
require.NoError(t, err)

assert.NoError(t, tp.Start(context.Background(), componenttest.NewNopHost()))
assert.NoError(t, tp.ConsumeTraces(context.Background(), incomingTraces))
assert.NoError(t, tp.Shutdown(context.Background()))

ownMetrics := new(metricdata.ResourceMetrics)
require.NoError(t, metricReader.Collect(context.Background(), ownMetrics))

require.Len(t, ownMetrics.ScopeMetrics, 1)
require.Len(t, ownMetrics.ScopeMetrics[0].Metrics, 2)

inMetric := ownMetrics.ScopeMetrics[0].Metrics[0]
outMetric := ownMetrics.ScopeMetrics[0].Metrics[1]
if strings.Contains(inMetric.Name, "outgoing") {
inMetric, outMetric = outMetric, inMetric
}

metricdatatest.AssertAggregationsEqual(t, metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Attributes: attribute.NewSet(attribute.KeyValue{
Key: attribute.Key("processor"),
Value: attribute.StringValue(set.ID.String()),
}),
Value: 3,
testTelemetry.assertMetrics(t, []metricdata.Metrics{
{
Name: "otelcol_processor_incoming_spans",
Description: "Number of spans passed to the processor.",
Unit: "{spans}",
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Value: 4,
Attributes: attribute.NewSet(attribute.String("processor", "processorhelper")),
},
},
},
},
}, inMetric.Data, metricdatatest.IgnoreTimestamp())

metricdatatest.AssertAggregationsEqual(t, metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Attributes: attribute.NewSet(attribute.KeyValue{
Key: attribute.Key("processor"),
Value: attribute.StringValue(set.ID.String()),
}),
Value: 1,
{
Name: "otelcol_processor_outgoing_spans",
Description: "Number of spans emitted from the processor.",
Unit: "{spans}",
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Value: 1,
Attributes: attribute.NewSet(attribute.String("processor", "processorhelper")),
},
},
},
},
}, outMetric.Data, metricdatatest.IgnoreTimestamp())
})
}
Loading