Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(concurrentbatchprocessor): remove opencensus dependencies #122

Merged
merged 3 commits into from
Jan 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,7 @@ var _ consumer.Metrics = (*batchProcessor)(nil)
var _ consumer.Logs = (*batchProcessor)(nil)

// newBatchProcessor returns a new batch processor component.
func newBatchProcessor(set processor.CreateSettings, cfg *Config, batchFunc func() batch, useOtel bool) (*batchProcessor, error) {

func newBatchProcessor(set processor.CreateSettings, cfg *Config, batchFunc func() batch) (*batchProcessor, error) {
// use lower-case, to be consistent with http/2 headers.
mks := make([]string, len(cfg.MetadataKeys))
for i, k := range cfg.MetadataKeys {
Expand Down Expand Up @@ -190,7 +189,7 @@ func newBatchProcessor(set processor.CreateSettings, cfg *Config, batchFunc func
}
}

bpt, err := newBatchProcessorTelemetry(set, bp.batcher.currentMetadataCardinality, useOtel)
bpt, err := newBatchProcessorTelemetry(set, bp.batcher.currentMetadataCardinality)
if err != nil {
return nil, fmt.Errorf("error creating batch processor telemetry: %w", err)
}
Expand Down Expand Up @@ -616,18 +615,18 @@ func (bp *batchProcessor) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
}

// newBatchTracesProcessor creates a new batch processor that batches traces by size or with timeout
func newBatchTracesProcessor(set processor.CreateSettings, next consumer.Traces, cfg *Config, useOtel bool) (*batchProcessor, error) {
return newBatchProcessor(set, cfg, func() batch { return newBatchTraces(next) }, useOtel)
func newBatchTracesProcessor(set processor.CreateSettings, next consumer.Traces, cfg *Config) (*batchProcessor, error) {
return newBatchProcessor(set, cfg, func() batch { return newBatchTraces(next) })
}

// newBatchMetricsProcessor creates a new batch processor that batches metrics by size or with timeout
func newBatchMetricsProcessor(set processor.CreateSettings, next consumer.Metrics, cfg *Config, useOtel bool) (*batchProcessor, error) {
return newBatchProcessor(set, cfg, func() batch { return newBatchMetrics(next) }, useOtel)
func newBatchMetricsProcessor(set processor.CreateSettings, next consumer.Metrics, cfg *Config) (*batchProcessor, error) {
return newBatchProcessor(set, cfg, func() batch { return newBatchMetrics(next) })
}

// newBatchLogsProcessor creates a new batch processor that batches logs by size or with timeout
func newBatchLogsProcessor(set processor.CreateSettings, next consumer.Logs, cfg *Config, useOtel bool) (*batchProcessor, error) {
return newBatchProcessor(set, cfg, func() batch { return newBatchLogs(next) }, useOtel)
func newBatchLogsProcessor(set processor.CreateSettings, next consumer.Logs, cfg *Config) (*batchProcessor, error) {
return newBatchProcessor(set, cfg, func() batch { return newBatchLogs(next) })
}

func recoverError(retErr *error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func TestBatchProcessorSpansPanicRecover(t *testing.T) {
cfg.Timeout = 10 * time.Second
creationSet := processortest.NewNopCreateSettings()
creationSet.MetricsLevel = configtelemetry.LevelDetailed
bp, err := newBatchTracesProcessor(creationSet, &panicConsumer{}, cfg, true)
bp, err := newBatchTracesProcessor(creationSet, &panicConsumer{}, cfg)

require.NoError(t, err)
require.NoError(t, bp.Start(context.Background(), componenttest.NewNopHost()))
Expand Down Expand Up @@ -149,7 +149,7 @@ func TestBatchProcessorMetricsPanicRecover(t *testing.T) {
cfg.Timeout = 10 * time.Second
creationSet := processortest.NewNopCreateSettings()
creationSet.MetricsLevel = configtelemetry.LevelDetailed
bp, err := newBatchMetricsProcessor(creationSet, &panicConsumer{}, cfg, true)
bp, err := newBatchMetricsProcessor(creationSet, &panicConsumer{}, cfg)

require.NoError(t, err)
require.NoError(t, bp.Start(context.Background(), componenttest.NewNopHost()))
Expand Down Expand Up @@ -183,7 +183,7 @@ func TestBatchProcessorLogsPanicRecover(t *testing.T) {
cfg.Timeout = 10 * time.Second
creationSet := processortest.NewNopCreateSettings()
creationSet.MetricsLevel = configtelemetry.LevelDetailed
bp, err := newBatchLogsProcessor(creationSet, &panicConsumer{}, cfg, true)
bp, err := newBatchLogsProcessor(creationSet, &panicConsumer{}, cfg)

require.NoError(t, err)
require.NoError(t, bp.Start(context.Background(), componenttest.NewNopHost()))
Expand Down Expand Up @@ -288,7 +288,7 @@ func TestBatchProcessorCancelContext(t *testing.T) {
sem: semaphore.NewWeighted(int64(cfg.MaxInFlightSizeMiB << 20)),
szr: &ptrace.ProtoMarshaler{},
}
bp, err := newBatchTracesProcessor(creationSet, bc, cfg, true)
bp, err := newBatchTracesProcessor(creationSet, bc, cfg)
require.NoError(t, err)
require.NoError(t, bp.Start(context.Background(), componenttest.NewNopHost()))

Expand Down Expand Up @@ -506,7 +506,7 @@ func TestBatchProcessorSpansDelivered(t *testing.T) {
cfg.Timeout = 10 * time.Second
creationSet := processortest.NewNopCreateSettings()
creationSet.MetricsLevel = configtelemetry.LevelDetailed
batcher, err := newBatchTracesProcessor(creationSet, sink, cfg, true)
batcher, err := newBatchTracesProcessor(creationSet, sink, cfg)
require.NoError(t, err)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))

Expand Down Expand Up @@ -562,7 +562,7 @@ func TestBatchProcessorSpansDeliveredEnforceBatchSize(t *testing.T) {
cfg.Timeout = 2 * time.Second
creationSet := processortest.NewNopCreateSettings()
creationSet.MetricsLevel = configtelemetry.LevelDetailed
batcher, err := newBatchTracesProcessor(creationSet, sink, cfg, true)
batcher, err := newBatchTracesProcessor(creationSet, sink, cfg)
require.NoError(t, err)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))

Expand Down Expand Up @@ -606,7 +606,7 @@ func TestBatchProcessorSentBySize(t *testing.T) {
telemetryTest(t, testBatchProcessorSentBySize)
}

func testBatchProcessorSentBySize(t *testing.T, tel testTelemetry, useOtel bool) {
func testBatchProcessorSentBySize(t *testing.T, tel testTelemetry) {
sizer := &ptrace.ProtoMarshaler{}
sink := new(consumertest.TracesSink)
cfg := createDefaultConfig().(*Config)
Expand All @@ -615,7 +615,7 @@ func testBatchProcessorSentBySize(t *testing.T, tel testTelemetry, useOtel bool)
cfg.Timeout = 5 * time.Second
creationSet := tel.NewProcessorCreateSettings()
creationSet.MetricsLevel = configtelemetry.LevelDetailed
batcher, err := newBatchTracesProcessor(creationSet, sink, cfg, useOtel)
batcher, err := newBatchTracesProcessor(creationSet, sink, cfg)
require.NoError(t, err)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))

Expand Down Expand Up @@ -667,7 +667,7 @@ func TestBatchProcessorSentBySizeWithMaxSize(t *testing.T) {
telemetryTest(t, testBatchProcessorSentBySizeWithMaxSize)
}

func testBatchProcessorSentBySizeWithMaxSize(t *testing.T, tel testTelemetry, useOtel bool) {
func testBatchProcessorSentBySizeWithMaxSize(t *testing.T, tel testTelemetry) {
sink := new(consumertest.TracesSink)
cfg := createDefaultConfig().(*Config)
sendBatchSize := 20
Expand All @@ -677,7 +677,7 @@ func testBatchProcessorSentBySizeWithMaxSize(t *testing.T, tel testTelemetry, us
cfg.Timeout = 5 * time.Second
creationSet := tel.NewProcessorCreateSettings()
creationSet.MetricsLevel = configtelemetry.LevelDetailed
batcher, err := newBatchTracesProcessor(creationSet, sink, cfg, useOtel)
batcher, err := newBatchTracesProcessor(creationSet, sink, cfg)
require.NoError(t, err)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))

Expand Down Expand Up @@ -725,7 +725,7 @@ func TestBatchProcessorSentByTimeout(t *testing.T) {

creationSet := processortest.NewNopCreateSettings()
creationSet.MetricsLevel = configtelemetry.LevelDetailed
batcher, err := newBatchTracesProcessor(creationSet, sink, cfg, true)
batcher, err := newBatchTracesProcessor(creationSet, sink, cfg)
require.NoError(t, err)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))

Expand Down Expand Up @@ -770,7 +770,7 @@ func TestBatchProcessorTraceSendWhenClosing(t *testing.T) {

creationSet := processortest.NewNopCreateSettings()
creationSet.MetricsLevel = configtelemetry.LevelDetailed
batcher, err := newBatchTracesProcessor(creationSet, sink, &cfg, true)
batcher, err := newBatchTracesProcessor(creationSet, sink, &cfg)
require.NoError(t, err)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))

Expand Down Expand Up @@ -808,7 +808,7 @@ func TestBatchMetricProcessor_ReceivingData(t *testing.T) {

creationSet := processortest.NewNopCreateSettings()
creationSet.MetricsLevel = configtelemetry.LevelDetailed
batcher, err := newBatchMetricsProcessor(creationSet, sink, &cfg, true)
batcher, err := newBatchMetricsProcessor(creationSet, sink, &cfg)
require.NoError(t, err)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))

Expand Down Expand Up @@ -857,7 +857,7 @@ func TestBatchMetricProcessorBatchSize(t *testing.T) {
telemetryTest(t, testBatchMetricProcessorBatchSize)
}

func testBatchMetricProcessorBatchSize(t *testing.T, tel testTelemetry, useOtel bool) {
func testBatchMetricProcessorBatchSize(t *testing.T, tel testTelemetry) {
sizer := &pmetric.ProtoMarshaler{}

// Instantiate the batch processor with low config values to test data
Expand All @@ -876,7 +876,7 @@ func testBatchMetricProcessorBatchSize(t *testing.T, tel testTelemetry, useOtel

creationSet := tel.NewProcessorCreateSettings()
creationSet.MetricsLevel = configtelemetry.LevelDetailed
batcher, err := newBatchMetricsProcessor(creationSet, sink, &cfg, useOtel)
batcher, err := newBatchMetricsProcessor(creationSet, sink, &cfg)
require.NoError(t, err)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))

Expand Down Expand Up @@ -947,7 +947,7 @@ func TestBatchMetricsProcessor_Timeout(t *testing.T) {

creationSet := processortest.NewNopCreateSettings()
creationSet.MetricsLevel = configtelemetry.LevelDetailed
batcher, err := newBatchMetricsProcessor(creationSet, sink, &cfg, true)
batcher, err := newBatchMetricsProcessor(creationSet, sink, &cfg)
require.NoError(t, err)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))

Expand Down Expand Up @@ -993,7 +993,7 @@ func TestBatchMetricProcessor_Shutdown(t *testing.T) {

creationSet := processortest.NewNopCreateSettings()
creationSet.MetricsLevel = configtelemetry.LevelDetailed
batcher, err := newBatchMetricsProcessor(creationSet, sink, &cfg, true)
batcher, err := newBatchMetricsProcessor(creationSet, sink, &cfg)
require.NoError(t, err)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))

Expand Down Expand Up @@ -1100,7 +1100,7 @@ func runMetricsProcessorBenchmark(b *testing.B, cfg Config) {
creationSet := processortest.NewNopCreateSettings()
creationSet.MetricsLevel = configtelemetry.LevelDetailed
metricsPerRequest := 1000
batcher, err := newBatchMetricsProcessor(creationSet, sink, &cfg, true)
batcher, err := newBatchMetricsProcessor(creationSet, sink, &cfg)
require.NoError(b, err)
require.NoError(b, batcher.Start(ctx, componenttest.NewNopHost()))

Expand Down Expand Up @@ -1148,7 +1148,7 @@ func TestBatchLogProcessor_ReceivingData(t *testing.T) {

creationSet := processortest.NewNopCreateSettings()
creationSet.MetricsLevel = configtelemetry.LevelDetailed
batcher, err := newBatchLogsProcessor(creationSet, sink, &cfg, true)
batcher, err := newBatchLogsProcessor(creationSet, sink, &cfg)
require.NoError(t, err)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))

Expand Down Expand Up @@ -1197,7 +1197,7 @@ func TestBatchLogProcessor_BatchSize(t *testing.T) {
telemetryTest(t, testBatchLogProcessorBatchSize)
}

func testBatchLogProcessorBatchSize(t *testing.T, tel testTelemetry, useOtel bool) {
func testBatchLogProcessorBatchSize(t *testing.T, tel testTelemetry) {
sizer := &plog.ProtoMarshaler{}

// Instantiate the batch processor with low config values to test data
Expand All @@ -1214,7 +1214,7 @@ func testBatchLogProcessorBatchSize(t *testing.T, tel testTelemetry, useOtel boo

creationSet := tel.NewProcessorCreateSettings()
creationSet.MetricsLevel = configtelemetry.LevelDetailed
batcher, err := newBatchLogsProcessor(creationSet, sink, &cfg, useOtel)
batcher, err := newBatchLogsProcessor(creationSet, sink, &cfg)
require.NoError(t, err)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))

Expand Down Expand Up @@ -1265,7 +1265,7 @@ func TestBatchLogsProcessor_Timeout(t *testing.T) {

creationSet := processortest.NewNopCreateSettings()
creationSet.MetricsLevel = configtelemetry.LevelDetailed
batcher, err := newBatchLogsProcessor(creationSet, sink, &cfg, true)
batcher, err := newBatchLogsProcessor(creationSet, sink, &cfg)
require.NoError(t, err)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))

Expand Down Expand Up @@ -1311,7 +1311,7 @@ func TestBatchLogProcessor_Shutdown(t *testing.T) {

creationSet := processortest.NewNopCreateSettings()
creationSet.MetricsLevel = configtelemetry.LevelDetailed
batcher, err := newBatchLogsProcessor(creationSet, sink, &cfg, true)
batcher, err := newBatchLogsProcessor(creationSet, sink, &cfg)
require.NoError(t, err)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))

Expand Down Expand Up @@ -1427,7 +1427,7 @@ func TestBatchProcessorSpansBatchedByMetadata(t *testing.T) {
cfg.MetadataKeys = []string{"token1", "token2"}
creationSet := processortest.NewNopCreateSettings()
creationSet.MetricsLevel = configtelemetry.LevelDetailed
batcher, err := newBatchTracesProcessor(creationSet, sink, cfg, true)
batcher, err := newBatchTracesProcessor(creationSet, sink, cfg)
require.NoError(t, err)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))

Expand Down Expand Up @@ -1527,7 +1527,7 @@ func TestBatchProcessorMetadataCardinalityLimit(t *testing.T) {
cfg.MetadataCardinalityLimit = cardLimit
cfg.Timeout = 1 * time.Second
creationSet := processortest.NewNopCreateSettings()
batcher, err := newBatchTracesProcessor(creationSet, sink, cfg, true)
batcher, err := newBatchTracesProcessor(creationSet, sink, cfg)
require.NoError(t, err)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))

Expand Down Expand Up @@ -1581,7 +1581,7 @@ func TestBatchZeroConfig(t *testing.T) {
sink := new(consumertest.LogsSink)
creationSet := processortest.NewNopCreateSettings()
creationSet.MetricsLevel = configtelemetry.LevelDetailed
batcher, err := newBatchLogsProcessor(creationSet, sink, &cfg, true)
batcher, err := newBatchLogsProcessor(creationSet, sink, &cfg)
require.NoError(t, err)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))

Expand Down Expand Up @@ -1622,7 +1622,7 @@ func TestBatchSplitOnly(t *testing.T) {
sink := new(consumertest.LogsSink)
creationSet := processortest.NewNopCreateSettings()
creationSet.MetricsLevel = configtelemetry.LevelDetailed
batcher, err := newBatchLogsProcessor(creationSet, sink, &cfg, true)
batcher, err := newBatchLogsProcessor(creationSet, sink, &cfg)
require.NoError(t, err)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))

Expand Down
10 changes: 3 additions & 7 deletions collector/processor/concurrentbatchprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,6 @@ const (
defaultMetadataCardinalityLimit = 1000
)

// This featuregate might already be registered, but is access is internal to the collector repo.
// In the meantime this batchprocessor will only support otel for metrics (no support for opencensus).
var UseOtelForInternalMetricsfeatureGate = true

// NewFactory returns a new factory for the Batch processor.
func NewFactory() processor.Factory {
return processor.NewFactory(
Expand All @@ -57,7 +53,7 @@ func createTraces(
cfg component.Config,
nextConsumer consumer.Traces,
) (processor.Traces, error) {
return newBatchTracesProcessor(set, nextConsumer, cfg.(*Config), UseOtelForInternalMetricsfeatureGate)
return newBatchTracesProcessor(set, nextConsumer, cfg.(*Config))
}

func createMetrics(
Expand All @@ -66,7 +62,7 @@ func createMetrics(
cfg component.Config,
nextConsumer consumer.Metrics,
) (processor.Metrics, error) {
return newBatchMetricsProcessor(set, nextConsumer, cfg.(*Config), UseOtelForInternalMetricsfeatureGate)
return newBatchMetricsProcessor(set, nextConsumer, cfg.(*Config))
}

func createLogs(
Expand All @@ -75,5 +71,5 @@ func createLogs(
cfg component.Config,
nextConsumer consumer.Logs,
) (processor.Logs, error) {
return newBatchLogsProcessor(set, nextConsumer, cfg.(*Config), UseOtelForInternalMetricsfeatureGate)
return newBatchLogsProcessor(set, nextConsumer, cfg.(*Config))
}
44 changes: 19 additions & 25 deletions collector/processor/concurrentbatchprocessor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,18 @@ module github.com/open-telemetry/otel-arrow/collector/processor/concurrentbatchp
go 1.21

require (
contrib.go.opencensus.io/exporter/prometheus v0.4.2
github.com/prometheus/client_golang v1.18.0
github.com/prometheus/client_golang v1.17.0
github.com/prometheus/client_model v0.5.0
github.com/prometheus/common v0.45.0
github.com/stretchr/testify v1.8.4
go.opencensus.io v0.24.0
go.opentelemetry.io/collector v0.92.0
go.opentelemetry.io/collector/component v0.92.0
go.opentelemetry.io/collector/config/configtelemetry v0.92.0
go.opentelemetry.io/collector/confmap v0.92.0
go.opentelemetry.io/collector/consumer v0.92.0
go.opentelemetry.io/collector/exporter v0.92.0
go.opentelemetry.io/collector/pdata v1.0.1
go.opentelemetry.io/collector/processor v0.92.0
go.opentelemetry.io/collector v0.90.1
go.opentelemetry.io/collector/component v0.90.1
go.opentelemetry.io/collector/config/configtelemetry v0.90.1
go.opentelemetry.io/collector/confmap v0.90.1
go.opentelemetry.io/collector/consumer v0.90.0
go.opentelemetry.io/collector/exporter v0.90.0
go.opentelemetry.io/collector/pdata v1.0.0
go.opentelemetry.io/collector/processor v0.90.0
go.opentelemetry.io/otel v1.21.0
go.opentelemetry.io/otel/exporters/prometheus v0.44.1-0.20231201153405-6027c1ae76f2
go.opentelemetry.io/otel/metric v1.21.0
Expand All @@ -33,12 +31,9 @@ require (
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-kit/log v0.2.1 // indirect
github.com/go-logfmt/logfmt v0.5.1 // indirect
github.com/go-logr/logr v1.3.0 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/hashicorp/go-version v1.6.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
Expand All @@ -52,18 +47,17 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
github.com/prometheus/statsd_exporter v0.22.7 // indirect
github.com/prometheus/procfs v0.11.1 // indirect
github.com/rogpeppe/go-internal v1.11.0 // indirect
go.opentelemetry.io/collector/config/configretry v0.92.0 // indirect
go.opentelemetry.io/collector/extension v0.92.0 // indirect
go.opentelemetry.io/collector/featuregate v1.0.1 // indirect
golang.org/x/net v0.20.0 // indirect
golang.org/x/sys v0.16.0 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/collector/extension v0.90.0 // indirect
go.opentelemetry.io/collector/featuregate v1.0.0 // indirect
go.uber.org/goleak v1.3.0 // indirect
golang.org/x/net v0.18.0 // indirect
golang.org/x/sys v0.14.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231127180814-3a041ad873d4 // indirect
google.golang.org/grpc v1.60.1 // indirect
google.golang.org/protobuf v1.32.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231120223509-83a465c0220f // indirect
google.golang.org/grpc v1.59.0 // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Loading
Loading