diff --git a/.chloggen/add-encoding-extension-pulsarreceiver.yaml b/.chloggen/add-encoding-extension-pulsarreceiver.yaml new file mode 100644 index 000000000000..9b9e4f9473db --- /dev/null +++ b/.chloggen/add-encoding-extension-pulsarreceiver.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: breaking + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: pulsarreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add newly added encoding extension to pulsarreceiver + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [29322] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/cmd/configschema/go.mod b/cmd/configschema/go.mod index 000b221b0ff1..982b18d8673e 100644 --- a/cmd/configschema/go.mod +++ b/cmd/configschema/go.mod @@ -69,7 +69,7 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight v0.92.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/ecsutil v0.92.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/k8s v0.92.0 // indirect - github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.92.0 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.92.1-0.20240112172857-83d463ceba06 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/internal/docker v0.92.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter v0.92.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig v0.92.0 // indirect @@ -82,14 +82,14 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.92.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/resourcetotelemetry v0.92.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza v0.92.0 // indirect - github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.92.0 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.92.1-0.20240112172857-83d463ceba06 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/loki v0.92.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/opencensus v0.92.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus v0.92.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheusremotewrite v0.92.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/signalfx v0.92.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/skywalking v0.92.0 // indirect - github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin v0.92.0 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin v0.92.1-0.20240112172857-83d463ceba06 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/processor/attributesprocessor v0.92.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/processor/cumulativetodeltaprocessor v0.92.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/processor/datadogprocessor v0.92.0 // indirect @@ -495,6 +495,7 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/extension/awsproxy v0.92.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/extension/basicauthextension v0.92.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/extension/bearertokenauthextension v0.92.0 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding v0.92.1-0.20240112172857-83d463ceba06 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/extension/headerssetterextension v0.92.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/extension/jaegerremotesampling v0.92.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer v0.92.0 // indirect @@ -1160,3 +1161,15 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/testbed/mockda replace github.com/open-telemetry/opentelemetry-collector-contrib/testbed => ../../testbed replace github.com/open-telemetry/opentelemetry-collector-contrib/exporter/syslogexporter => ../../exporter/syslogexporter + +replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding => ../../extension/encoding + +replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/textencodingextension => ../../extension/encoding/textencodingextension + +replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/jsonlogencodingextension => ../../extension/encoding/jsonlogencodingextension + +replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/zipkinencodingextension => ../../extension/encoding/zipkinencodingextension + +replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/otlpencodingextension => ../../extension/encoding/otlpencodingextension + +replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/jaegerencodingextension => ../../extension/encoding/jaegerencodingextension diff --git a/cmd/otelcontribcol/go.mod b/cmd/otelcontribcol/go.mod index 6957f4e084b8..2e8e351a2c52 100644 --- a/cmd/otelcontribcol/go.mod +++ b/cmd/otelcontribcol/go.mod @@ -59,11 +59,11 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/extension/awsproxy v0.92.0 github.com/open-telemetry/opentelemetry-collector-contrib/extension/basicauthextension v0.92.0 github.com/open-telemetry/opentelemetry-collector-contrib/extension/bearertokenauthextension v0.92.0 - github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/jaegerencodingextension v0.92.0 - github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/jsonlogencodingextension v0.92.0 - github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/otlpencodingextension v0.92.0 - github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/textencodingextension v0.92.0 - github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/zipkinencodingextension v0.92.0 + github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/jaegerencodingextension v0.92.1-0.20240112172857-83d463ceba06 + github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/jsonlogencodingextension v0.92.1-0.20240112172857-83d463ceba06 + github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/otlpencodingextension v0.92.1-0.20240112172857-83d463ceba06 + github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/textencodingextension v0.92.1-0.20240112172857-83d463ceba06 + github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/zipkinencodingextension v0.92.1-0.20240112172857-83d463ceba06 github.com/open-telemetry/opentelemetry-collector-contrib/extension/headerssetterextension v0.92.0 github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckextension v0.92.0 github.com/open-telemetry/opentelemetry-collector-contrib/extension/httpforwarder v0.92.0 @@ -81,7 +81,7 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/extension/sigv4authextension v0.92.0 github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage v0.92.0 github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.92.0 - github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.92.0 + github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.92.1-0.20240112172857-83d463ceba06 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza v0.92.0 github.com/open-telemetry/opentelemetry-collector-contrib/processor/attributesprocessor v0.92.0 github.com/open-telemetry/opentelemetry-collector-contrib/processor/cumulativetodeltaprocessor v0.92.0 @@ -529,7 +529,7 @@ require ( github.com/nginxinc/nginx-prometheus-exporter v0.8.1-0.20201110005315-f5a5f8086c19 // indirect github.com/oklog/ulid/v2 v2.1.0 // indirect github.com/open-telemetry/opamp-go v0.10.0 // indirect - github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding v0.92.0 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding v0.92.1-0.20240112172857-83d463ceba06 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer v0.92.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/awsutil v0.92.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight v0.92.0 // indirect @@ -556,14 +556,14 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.92.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/resourcetotelemetry v0.92.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/azure v0.92.0 // indirect - github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.92.0 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.92.1-0.20240112172857-83d463ceba06 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/loki v0.92.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/opencensus v0.92.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus v0.92.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheusremotewrite v0.92.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/signalfx v0.92.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/skywalking v0.92.0 // indirect - github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin v0.92.0 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin v0.92.1-0.20240112172857-83d463ceba06 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/winperfcounters v0.92.0 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.1.0-rc5 // indirect diff --git a/go.mod b/go.mod index 22ca73520fbb..1fd9fb5470da 100644 --- a/go.mod +++ b/go.mod @@ -500,6 +500,7 @@ require ( github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect github.com/nginxinc/nginx-prometheus-exporter v0.8.1-0.20201110005315-f5a5f8086c19 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding v0.92.1-0.20240112172857-83d463ceba06 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer v0.92.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/awsutil v0.92.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight v0.92.0 // indirect @@ -511,7 +512,7 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/xray v0.92.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/internal/collectd v0.92.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.92.0 // indirect - github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.92.0 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.92.1-0.20240112172857-83d463ceba06 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/internal/datadog v0.92.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/internal/docker v0.92.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter v0.92.0 // indirect @@ -529,14 +530,14 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/pkg/resourcetotelemetry v0.92.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza v0.92.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/azure v0.92.0 // indirect - github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.92.0 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.92.1-0.20240112172857-83d463ceba06 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/loki v0.92.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/opencensus v0.92.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus v0.92.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheusremotewrite v0.92.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/signalfx v0.92.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/skywalking v0.92.0 // indirect - github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin v0.92.0 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin v0.92.1-0.20240112172857-83d463ceba06 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/winperfcounters v0.92.0 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.1.0-rc5 // indirect @@ -1162,3 +1163,15 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/testbed/mockda replace github.com/open-telemetry/opentelemetry-collector-contrib/testbed => ./testbed replace github.com/open-telemetry/opentelemetry-collector-contrib/exporter/syslogexporter => ./exporter/syslogexporter + +replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding => ./extension/encoding + +replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/textencodingextension => ./extension/encoding/textencodingextension + +replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/jsonlogencodingextension => ./extension/encoding/jsonlogencodingextension + +replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/zipkinencodingextension => ./extension/encoding/zipkinencodingextension + +replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/jaegerencodingextension => ./extension/encoding/jaegerencodingextension + +replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/otlpencodingextension => ./extension/encoding/otlpencodingextension diff --git a/receiver/pulsarreceiver/config.go b/receiver/pulsarreceiver/config.go index 20b799e0cd3c..1984f72b126d 100644 --- a/receiver/pulsarreceiver/config.go +++ b/receiver/pulsarreceiver/config.go @@ -20,7 +20,7 @@ type Config struct { // The Subscription that receiver will be consuming messages from (default "otlp_subscription") Subscription string `mapstructure:"subscription"` // Encoding of the messages (default "otlp_proto") - Encoding string `mapstructure:"encoding"` + EncodingExtension *component.ID `mapstructure:"encoding_extension"` // Name specifies the consumer name. ConsumerName string `mapstructure:"consumer_name"` // Set the path to the trusted TLS certificate file diff --git a/receiver/pulsarreceiver/config_test.go b/receiver/pulsarreceiver/config_test.go index dfa0ea121b2c..6fbd70ff4004 100644 --- a/receiver/pulsarreceiver/config_test.go +++ b/receiver/pulsarreceiver/config_test.go @@ -30,7 +30,6 @@ func TestLoadConfig(t *testing.T) { Endpoint: "pulsar://localhost:6500", ConsumerName: "otel-collector", Subscription: "otel-collector", - Encoding: defaultEncoding, TLSTrustCertsFilePath: "ca.pem", Authentication: Authentication{TLS: &TLS{CertFile: "cert.pem", KeyFile: "key.pem"}}, }, diff --git a/receiver/pulsarreceiver/factory.go b/receiver/pulsarreceiver/factory.go index e084d6473a31..470aebc64385 100644 --- a/receiver/pulsarreceiver/factory.go +++ b/receiver/pulsarreceiver/factory.go @@ -23,63 +23,18 @@ const ( defaultServiceURL = "pulsar://localhost:6650" ) -// FactoryOption applies changes to PulsarExporterFactory. -type FactoryOption func(factory *pulsarReceiverFactory) - -// withTracesUnmarshalers adds Unmarshalers. -func withTracesUnmarshalers(tracesUnmarshalers ...TracesUnmarshaler) FactoryOption { - return func(factory *pulsarReceiverFactory) { - for _, unmarshaler := range tracesUnmarshalers { - factory.tracesUnmarshalers[unmarshaler.Encoding()] = unmarshaler - } - } -} - -// withMetricsUnmarshalers adds MetricsUnmarshalers. -func withMetricsUnmarshalers(metricsUnmarshalers ...MetricsUnmarshaler) FactoryOption { - return func(factory *pulsarReceiverFactory) { - for _, unmarshaler := range metricsUnmarshalers { - factory.metricsUnmarshalers[unmarshaler.Encoding()] = unmarshaler - } - } -} - -// withLogsUnmarshalers adds LogsUnmarshalers. -func withLogsUnmarshalers(logsUnmarshalers ...LogsUnmarshaler) FactoryOption { - return func(factory *pulsarReceiverFactory) { - for _, unmarshaler := range logsUnmarshalers { - factory.logsUnmarshalers[unmarshaler.Encoding()] = unmarshaler - } - } -} - // NewFactory creates Pulsar receiver factory. -func NewFactory(options ...FactoryOption) receiver.Factory { - - f := &pulsarReceiverFactory{ - tracesUnmarshalers: defaultTracesUnmarshalers(), - metricsUnmarshalers: defaultMetricsUnmarshalers(), - logsUnmarshalers: defaultLogsUnmarshalers(), - } - for _, o := range options { - o(f) - } +func NewFactory() receiver.Factory { return receiver.NewFactory( metadata.Type, createDefaultConfig, - receiver.WithTraces(f.createTracesReceiver, metadata.TracesStability), - receiver.WithMetrics(f.createMetricsReceiver, metadata.MetricsStability), - receiver.WithLogs(f.createLogsReceiver, metadata.LogsStability), + receiver.WithTraces(createTracesReceiver, metadata.TracesStability), + receiver.WithMetrics(createMetricsReceiver, metadata.MetricsStability), + receiver.WithLogs(createLogsReceiver, metadata.LogsStability), ) } -type pulsarReceiverFactory struct { - tracesUnmarshalers map[string]TracesUnmarshaler - metricsUnmarshalers map[string]MetricsUnmarshaler - logsUnmarshalers map[string]LogsUnmarshaler -} - -func (f *pulsarReceiverFactory) createTracesReceiver( +func createTracesReceiver( _ context.Context, set receiver.CreateSettings, cfg component.Config, @@ -89,14 +44,14 @@ func (f *pulsarReceiverFactory) createTracesReceiver( if len(c.Topic) == 0 { c.Topic = defaultTraceTopic } - r, err := newTracesReceiver(c, set, f.tracesUnmarshalers, nextConsumer) + r, err := newTracesReceiver(c, set, nextConsumer) if err != nil { return nil, err } return r, nil } -func (f *pulsarReceiverFactory) createMetricsReceiver( +func createMetricsReceiver( _ context.Context, set receiver.CreateSettings, cfg component.Config, @@ -106,14 +61,14 @@ func (f *pulsarReceiverFactory) createMetricsReceiver( if len(c.Topic) == 0 { c.Topic = defaultMeticsTopic } - r, err := newMetricsReceiver(c, set, f.metricsUnmarshalers, nextConsumer) + r, err := newMetricsReceiver(c, set, nextConsumer) if err != nil { return nil, err } return r, nil } -func (f *pulsarReceiverFactory) createLogsReceiver( +func createLogsReceiver( _ context.Context, set receiver.CreateSettings, cfg component.Config, @@ -123,7 +78,7 @@ func (f *pulsarReceiverFactory) createLogsReceiver( if len(c.Topic) == 0 { c.Topic = defaultLogsTopic } - r, err := newLogsReceiver(c, set, f.logsUnmarshalers, nextConsumer) + r, err := newLogsReceiver(c, set, nextConsumer) if err != nil { return nil, err } @@ -132,7 +87,6 @@ func (f *pulsarReceiverFactory) createLogsReceiver( func createDefaultConfig() component.Config { return &Config{ - Encoding: defaultEncoding, ConsumerName: defaultConsumerName, Subscription: defaultSubscription, Endpoint: defaultServiceURL, diff --git a/receiver/pulsarreceiver/factory_test.go b/receiver/pulsarreceiver/factory_test.go index cffcd4840d87..2d42001bf439 100644 --- a/receiver/pulsarreceiver/factory_test.go +++ b/receiver/pulsarreceiver/factory_test.go @@ -9,9 +9,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/pdata/plog" - "go.opentelemetry.io/collector/pdata/pmetric" - "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/receiver/receivertest" ) @@ -19,7 +16,6 @@ func TestCreateDefaultConfig(t *testing.T) { cfg := createDefaultConfig() assert.Equal(t, &Config{ Topic: "", - Encoding: defaultEncoding, ConsumerName: defaultConsumerName, Subscription: defaultSubscription, Endpoint: defaultServiceURL, @@ -27,182 +23,27 @@ func TestCreateDefaultConfig(t *testing.T) { }, cfg) } -// trace -func TestCreateTracesReceiver_err_addr(t *testing.T) { - cfg := createDefaultConfig().(*Config) - cfg.Endpoint = "invalid:6650" - - f := pulsarReceiverFactory{tracesUnmarshalers: defaultTracesUnmarshalers()} - r, err := f.createTracesReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, nil) - require.Error(t, err) - assert.Nil(t, r) -} - -func TestCreateTracesReceiver_err_marshallers(t *testing.T) { - cfg := createDefaultConfig().(*Config) - cfg.Endpoint = defaultServiceURL - - f := pulsarReceiverFactory{tracesUnmarshalers: make(map[string]TracesUnmarshaler)} - r, err := f.createTracesReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, nil) - require.Error(t, err) - assert.Nil(t, r) -} - func Test_CreateTraceReceiver(t *testing.T) { cfg := createDefaultConfig().(*Config) - f := pulsarReceiverFactory{tracesUnmarshalers: defaultTracesUnmarshalers()} - recv, err := f.createTracesReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, nil) + + recv, err := createTracesReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, nil) require.NoError(t, err) assert.NotNil(t, recv) } -func TestWithTracesUnmarshalers(t *testing.T) { - unmarshaler := &customTracesUnmarshaler{} - f := NewFactory(withTracesUnmarshalers(unmarshaler)) - cfg := createDefaultConfig().(*Config) - - t.Run("custom_encoding", func(t *testing.T) { - cfg.Encoding = unmarshaler.Encoding() - receiver, err := f.CreateTracesReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, nil) - require.NoError(t, err) - require.NotNil(t, receiver) - }) - t.Run("default_encoding", func(t *testing.T) { - cfg.Encoding = defaultEncoding - receiver, err := f.CreateTracesReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, nil) - require.NoError(t, err) - assert.NotNil(t, receiver) - }) -} - -// metrics -func TestCreateMetricsReceiver_err_addr(t *testing.T) { - cfg := createDefaultConfig().(*Config) - cfg.Endpoint = "invalid:6650" - - f := pulsarReceiverFactory{metricsUnmarshalers: defaultMetricsUnmarshalers()} - r, err := f.createMetricsReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, nil) - require.Error(t, err) - assert.Nil(t, r) -} - -func TestCreateMetricsReceiver_err_marshallers(t *testing.T) { - cfg := createDefaultConfig().(*Config) - cfg.Endpoint = defaultServiceURL - - f := pulsarReceiverFactory{metricsUnmarshalers: make(map[string]MetricsUnmarshaler)} - r, err := f.createMetricsReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, nil) - require.Error(t, err) - assert.Nil(t, r) -} - func Test_CreateMetricsReceiver(t *testing.T) { cfg := createDefaultConfig().(*Config) - f := pulsarReceiverFactory{metricsUnmarshalers: defaultMetricsUnmarshalers()} - recv, err := f.createMetricsReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, nil) + recv, err := createMetricsReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, nil) require.NoError(t, err) assert.NotNil(t, recv) } -func TestWithMetricsUnmarshalers(t *testing.T) { - unmarshaler := &customMetricsUnmarshaler{} - f := NewFactory(withMetricsUnmarshalers(unmarshaler)) - cfg := createDefaultConfig().(*Config) - - t.Run("custom_encoding", func(t *testing.T) { - cfg.Encoding = unmarshaler.Encoding() - receiver, err := f.CreateMetricsReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, nil) - require.NoError(t, err) - require.NotNil(t, receiver) - }) - t.Run("default_encoding", func(t *testing.T) { - cfg.Encoding = defaultEncoding - receiver, err := f.CreateMetricsReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, nil) - require.NoError(t, err) - assert.NotNil(t, receiver) - }) -} - -// logs -func TestCreateLogsReceiver_err_addr(t *testing.T) { - cfg := createDefaultConfig().(*Config) - cfg.Endpoint = "invalid:6650" - - f := pulsarReceiverFactory{logsUnmarshalers: defaultLogsUnmarshalers()} - r, err := f.createLogsReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, nil) - require.Error(t, err) - assert.Nil(t, r) -} - -func TestCreateLogsReceiver_err_marshallers(t *testing.T) { - cfg := createDefaultConfig().(*Config) - cfg.Endpoint = defaultServiceURL - - f := pulsarReceiverFactory{logsUnmarshalers: make(map[string]LogsUnmarshaler)} - r, err := f.createLogsReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, nil) - require.Error(t, err) - assert.Nil(t, r) -} - func Test_CreateLogsReceiver(t *testing.T) { cfg := createDefaultConfig().(*Config) - cfg.Endpoint = defaultServiceURL - f := pulsarReceiverFactory{logsUnmarshalers: defaultLogsUnmarshalers()} - recv, err := f.createLogsReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, nil) + cfg.Endpoint = defaultServiceURL + recv, err := createLogsReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, nil) require.NoError(t, err) assert.NotNil(t, recv) } - -func TestWithLogsUnmarshalers(t *testing.T) { - unmarshaler := &customLogsUnmarshaler{} - f := NewFactory(withLogsUnmarshalers(unmarshaler)) - cfg := createDefaultConfig().(*Config) - - t.Run("custom_encoding", func(t *testing.T) { - cfg.Encoding = unmarshaler.Encoding() - exporter, err := f.CreateLogsReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, nil) - require.NoError(t, err) - require.NotNil(t, exporter) - }) - t.Run("default_encoding", func(t *testing.T) { - cfg.Encoding = defaultEncoding - exporter, err := f.CreateLogsReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, nil) - require.NoError(t, err) - assert.NotNil(t, exporter) - }) -} - -type customTracesUnmarshaler struct { -} - -type customMetricsUnmarshaler struct { -} - -type customLogsUnmarshaler struct { -} - -func (c customTracesUnmarshaler) Unmarshal([]byte) (ptrace.Traces, error) { - panic("implement me") -} - -func (c customTracesUnmarshaler) Encoding() string { - return "custom" -} - -func (c customMetricsUnmarshaler) Unmarshal([]byte) (pmetric.Metrics, error) { - panic("implement me") -} - -func (c customMetricsUnmarshaler) Encoding() string { - return "custom" -} - -func (c customLogsUnmarshaler) Unmarshal([]byte) (plog.Logs, error) { - panic("implement me") -} - -func (c customLogsUnmarshaler) Encoding() string { - return "custom" -} diff --git a/receiver/pulsarreceiver/go.mod b/receiver/pulsarreceiver/go.mod index 468630836d66..e67eb90900ed 100644 --- a/receiver/pulsarreceiver/go.mod +++ b/receiver/pulsarreceiver/go.mod @@ -4,20 +4,19 @@ go 1.20 require ( github.com/apache/pulsar-client-go v0.8.1 - github.com/apache/thrift v0.19.0 - github.com/gogo/protobuf v1.3.2 - github.com/jaegertracing/jaeger v1.53.0 - github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.92.0 - github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin v0.92.0 - github.com/openzipkin/zipkin-go v0.4.2 + github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding v0.92.1-0.20240112172857-83d463ceba06 + github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/jaegerencodingextension v0.92.1-0.20240112172857-83d463ceba06 + github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/jsonlogencodingextension v0.92.1-0.20240112172857-83d463ceba06 + github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/otlpencodingextension v0.92.1-0.20240112172857-83d463ceba06 + github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/textencodingextension v0.92.1-0.20240112172857-83d463ceba06 + github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/zipkinencodingextension v0.92.1-0.20240112172857-83d463ceba06 github.com/stretchr/testify v1.8.4 go.opentelemetry.io/collector/component v0.92.1-0.20240112172857-83d463ceba06 go.opentelemetry.io/collector/config/configopaque v0.92.1-0.20240112172857-83d463ceba06 go.opentelemetry.io/collector/confmap v0.92.1-0.20240112172857-83d463ceba06 go.opentelemetry.io/collector/consumer v0.92.1-0.20240112172857-83d463ceba06 - go.opentelemetry.io/collector/pdata v1.0.2-0.20240112172857-83d463ceba06 + go.opentelemetry.io/collector/extension v0.92.1-0.20240112172857-83d463ceba06 go.opentelemetry.io/collector/receiver v0.92.1-0.20240112172857-83d463ceba06 - go.opentelemetry.io/collector/semconv v0.92.1-0.20240112172857-83d463ceba06 go.opentelemetry.io/otel/metric v1.21.0 go.opentelemetry.io/otel/trace v1.21.0 go.uber.org/zap v1.26.0 @@ -29,6 +28,7 @@ require ( github.com/AthenZ/athenz v1.10.39 // indirect github.com/DataDog/zstd v1.5.0 // indirect github.com/apache/pulsar-client-go/oauth2 v0.0.0-20220120090717-25e59572242e // indirect + github.com/apache/thrift v0.19.0 // indirect github.com/ardielle/ardielle-go v1.5.2 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect @@ -41,12 +41,14 @@ require ( github.com/go-logr/logr v1.3.0 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 // indirect + github.com/gogo/protobuf v1.3.2 // indirect github.com/golang-jwt/jwt v3.2.2+incompatible // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.3 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c // indirect github.com/hashicorp/go-version v1.6.0 // indirect + github.com/jaegertracing/jaeger v1.53.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/keybase/go-keychain v0.0.0-20190712205309-48d3d31d256d // indirect github.com/klauspost/compress v1.17.4 // indirect @@ -62,7 +64,11 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/mtibben/percent v0.2.1 // indirect - github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.92.0 // indirect + github.com/nxadm/tail v1.4.8 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.92.1-0.20240112172857-83d463ceba06 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.92.1-0.20240112172857-83d463ceba06 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin v0.92.1-0.20240112172857-83d463ceba06 // indirect + github.com/openzipkin/zipkin-go v0.4.2 // indirect github.com/pierrec/lz4 v2.6.1+incompatible // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect @@ -77,6 +83,8 @@ require ( go.opentelemetry.io/collector v0.92.1-0.20240112172857-83d463ceba06 // indirect go.opentelemetry.io/collector/config/configtelemetry v0.92.1-0.20240112172857-83d463ceba06 // indirect go.opentelemetry.io/collector/featuregate v1.0.2-0.20240112172857-83d463ceba06 // indirect + go.opentelemetry.io/collector/pdata v1.0.2-0.20240112172857-83d463ceba06 // indirect + go.opentelemetry.io/collector/semconv v0.92.1-0.20240112172857-83d463ceba06 // indirect go.opentelemetry.io/otel v1.21.0 // indirect go.opentelemetry.io/otel/exporters/prometheus v0.44.1-0.20231201153405-6027c1ae76f2 // indirect go.opentelemetry.io/otel/sdk v1.21.0 // indirect @@ -114,3 +122,15 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../pkg/pdatatest replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../pkg/golden + +replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding => ../../extension/encoding + +replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/jaegerencodingextension => ../../extension/encoding/jaegerencodingextension + +replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/otlpencodingextension => ../../extension/encoding/otlpencodingextension + +replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/zipkinencodingextension => ../../extension/encoding/zipkinencodingextension + +replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/jsonlogencodingextension => ../../extension/encoding/jsonlogencodingextension + +replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/textencodingextension => ../../extension/encoding/textencodingextension diff --git a/receiver/pulsarreceiver/go.sum b/receiver/pulsarreceiver/go.sum index f096e6b78e4e..fc2920743611 100644 --- a/receiver/pulsarreceiver/go.sum +++ b/receiver/pulsarreceiver/go.sum @@ -327,8 +327,9 @@ github.com/mtibben/percent v0.2.1/go.mod h1:KG9uO+SZkUp+VkRHsCdYQV3XSZrrSpR3O9ib github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= -github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= +github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= +github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk= github.com/onsi/ginkgo v1.14.0/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY= @@ -459,6 +460,8 @@ go.opentelemetry.io/collector/confmap v0.92.1-0.20240112172857-83d463ceba06 h1:i go.opentelemetry.io/collector/confmap v0.92.1-0.20240112172857-83d463ceba06/go.mod h1:CmqTszB2uwiJ9ieEqISdecuoVuyt3jMnJ/9kD53GYHs= go.opentelemetry.io/collector/consumer v0.92.1-0.20240112172857-83d463ceba06 h1:zf0Zy0DwiN9rBWyhg/WWrTJar3CULGziEUG6YTkCXOE= go.opentelemetry.io/collector/consumer v0.92.1-0.20240112172857-83d463ceba06/go.mod h1:KhNeje82anY3f123+R/RKtc2Q8/rdnYYi4tG2WSnxqk= +go.opentelemetry.io/collector/extension v0.92.1-0.20240112172857-83d463ceba06 h1:Hbzik+qRPGEkT0MTMIu9HHokPUnQvh44yWyIVNLDpQ8= +go.opentelemetry.io/collector/extension v0.92.1-0.20240112172857-83d463ceba06/go.mod h1:gCukg3D3VIzfp8NXAhuaPadDQhe2Hg3YGtsBuOr06+Q= go.opentelemetry.io/collector/featuregate v1.0.2-0.20240112172857-83d463ceba06 h1:LN4c/jJkIAOJYFRTapRigYDp3k+4BllH4Pqr7TT5b2w= go.opentelemetry.io/collector/featuregate v1.0.2-0.20240112172857-83d463ceba06/go.mod h1:QQXjP4etmJQhkQ20j4P/rapWuItYxoFozg/iIwuKnYg= go.opentelemetry.io/collector/pdata v1.0.2-0.20240112172857-83d463ceba06 h1:v2SnEUqor+9FuJK3pPRR6jd342lzzmicuafWCrfvQZ8= diff --git a/receiver/pulsarreceiver/jaeger_unmarshaler.go b/receiver/pulsarreceiver/jaeger_unmarshaler.go deleted file mode 100644 index 28791ddb600e..000000000000 --- a/receiver/pulsarreceiver/jaeger_unmarshaler.go +++ /dev/null @@ -1,59 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package pulsarreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/pulsarreceiver" - -import ( - "bytes" - - "github.com/gogo/protobuf/jsonpb" - jaegerproto "github.com/jaegertracing/jaeger/model" - "go.opentelemetry.io/collector/pdata/ptrace" - - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger" -) - -// copy from kafka receiver -type jaegerProtoSpanUnmarshaler struct { -} - -var _ TracesUnmarshaler = (*jaegerProtoSpanUnmarshaler)(nil) - -func (j jaegerProtoSpanUnmarshaler) Unmarshal(bytes []byte) (ptrace.Traces, error) { - span := &jaegerproto.Span{} - err := span.Unmarshal(bytes) - if err != nil { - return ptrace.NewTraces(), err - } - return jaegerSpanToTraces(span) -} - -func (j jaegerProtoSpanUnmarshaler) Encoding() string { - return "jaeger_proto" -} - -type jaegerJSONSpanUnmarshaler struct { -} - -var _ TracesUnmarshaler = (*jaegerJSONSpanUnmarshaler)(nil) - -func (j jaegerJSONSpanUnmarshaler) Unmarshal(data []byte) (ptrace.Traces, error) { - span := &jaegerproto.Span{} - err := jsonpb.Unmarshal(bytes.NewReader(data), span) - if err != nil { - return ptrace.NewTraces(), err - } - return jaegerSpanToTraces(span) -} - -func (j jaegerJSONSpanUnmarshaler) Encoding() string { - return "jaeger_json" -} - -func jaegerSpanToTraces(span *jaegerproto.Span) (ptrace.Traces, error) { - batch := jaegerproto.Batch{ - Spans: []*jaegerproto.Span{span}, - Process: span.Process, - } - return jaeger.ProtoToTraces([]*jaegerproto.Batch{&batch}) -} diff --git a/receiver/pulsarreceiver/jaeger_unmarshaler_test.go b/receiver/pulsarreceiver/jaeger_unmarshaler_test.go deleted file mode 100644 index 70849f70ba59..000000000000 --- a/receiver/pulsarreceiver/jaeger_unmarshaler_test.go +++ /dev/null @@ -1,76 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package pulsarreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/pulsarreceiver" - -import ( - "bytes" - "testing" - - "github.com/gogo/protobuf/jsonpb" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/pdata/pcommon" - "go.opentelemetry.io/collector/pdata/ptrace" - - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger" -) - -// copy from kafka receiver -func TestUnmarshalJaeger(t *testing.T) { - td := ptrace.NewTraces() - span := td.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty() - span.SetName("foo") - span.SetStartTimestamp(pcommon.Timestamp(10)) - span.SetEndTimestamp(pcommon.Timestamp(20)) - span.SetTraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}) - span.SetSpanID([8]byte{1, 2, 3, 4, 5, 6, 7, 8}) - batches, err := jaeger.ProtoFromTraces(td) - require.NoError(t, err) - - protoBytes, err := batches[0].Spans[0].Marshal() - require.NoError(t, err) - - jsonMarshaler := &jsonpb.Marshaler{} - jsonBytes := new(bytes.Buffer) - require.NoError(t, jsonMarshaler.Marshal(jsonBytes, batches[0].Spans[0])) - - tests := []struct { - unmarshaler TracesUnmarshaler - encoding string - bytes []byte - }{ - { - unmarshaler: jaegerProtoSpanUnmarshaler{}, - encoding: "jaeger_proto", - bytes: protoBytes, - }, - { - unmarshaler: jaegerJSONSpanUnmarshaler{}, - encoding: "jaeger_json", - bytes: jsonBytes.Bytes(), - }, - } - for _, test := range tests { - t.Run(test.encoding, func(t *testing.T) { - got, err := test.unmarshaler.Unmarshal(test.bytes) - require.NoError(t, err) - assert.Equal(t, td, got) - assert.Equal(t, test.encoding, test.unmarshaler.Encoding()) - }) - } -} - -func TestUnmarshalJaegerProto_error(t *testing.T) { - p := jaegerProtoSpanUnmarshaler{} - got, err := p.Unmarshal([]byte("+$%")) - assert.Equal(t, ptrace.NewTraces(), got) - assert.Error(t, err) -} - -func TestUnmarshalJaegerJSON_error(t *testing.T) { - p := jaegerJSONSpanUnmarshaler{} - got, err := p.Unmarshal([]byte("+$%")) - assert.Equal(t, ptrace.NewTraces(), got) - assert.Error(t, err) -} diff --git a/receiver/pulsarreceiver/pdata_unmarshaler.go b/receiver/pulsarreceiver/pdata_unmarshaler.go deleted file mode 100644 index 776e250499ff..000000000000 --- a/receiver/pulsarreceiver/pdata_unmarshaler.go +++ /dev/null @@ -1,71 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package pulsarreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/pulsarreceiver" - -import ( - "go.opentelemetry.io/collector/pdata/plog" - "go.opentelemetry.io/collector/pdata/pmetric" - "go.opentelemetry.io/collector/pdata/ptrace" -) - -// copy from kafka receiver -type pdataLogsUnmarshaler struct { - plog.Unmarshaler - encoding string -} - -func (p pdataLogsUnmarshaler) Unmarshal(buf []byte) (plog.Logs, error) { - return p.Unmarshaler.UnmarshalLogs(buf) -} - -func (p pdataLogsUnmarshaler) Encoding() string { - return p.encoding -} - -func newPdataLogsUnmarshaler(unmarshaler plog.Unmarshaler, encoding string) LogsUnmarshaler { - return pdataLogsUnmarshaler{ - Unmarshaler: unmarshaler, - encoding: encoding, - } -} - -type pdataTracesUnmarshaler struct { - ptrace.Unmarshaler - encoding string -} - -func (p pdataTracesUnmarshaler) Unmarshal(buf []byte) (ptrace.Traces, error) { - return p.Unmarshaler.UnmarshalTraces(buf) -} - -func (p pdataTracesUnmarshaler) Encoding() string { - return p.encoding -} - -func newPdataTracesUnmarshaler(unmarshaler ptrace.Unmarshaler, encoding string) TracesUnmarshaler { - return pdataTracesUnmarshaler{ - Unmarshaler: unmarshaler, - encoding: encoding, - } -} - -type pdataMetricsUnmarshaler struct { - pmetric.Unmarshaler - encoding string -} - -func (p pdataMetricsUnmarshaler) Unmarshal(buf []byte) (pmetric.Metrics, error) { - return p.Unmarshaler.UnmarshalMetrics(buf) -} - -func (p pdataMetricsUnmarshaler) Encoding() string { - return p.encoding -} - -func newPdataMetricsUnmarshaler(unmarshaler pmetric.Unmarshaler, encoding string) MetricsUnmarshaler { - return pdataMetricsUnmarshaler{ - Unmarshaler: unmarshaler, - encoding: encoding, - } -} diff --git a/receiver/pulsarreceiver/pdata_unmarshaler_test.go b/receiver/pulsarreceiver/pdata_unmarshaler_test.go deleted file mode 100644 index ba18e2869b0f..000000000000 --- a/receiver/pulsarreceiver/pdata_unmarshaler_test.go +++ /dev/null @@ -1,29 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package pulsarreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/pulsarreceiver" - -import ( - "testing" - - "github.com/stretchr/testify/assert" - "go.opentelemetry.io/collector/pdata/plog" - "go.opentelemetry.io/collector/pdata/pmetric" - "go.opentelemetry.io/collector/pdata/ptrace" -) - -// copy from kafka receiver -func TestNewPdataTracesUnmarshaler(t *testing.T) { - um := newPdataTracesUnmarshaler(&ptrace.ProtoUnmarshaler{}, "test") - assert.Equal(t, "test", um.Encoding()) -} - -func TestNewPdataMetricsUnmarshaler(t *testing.T) { - um := newPdataMetricsUnmarshaler(&pmetric.ProtoUnmarshaler{}, "test") - assert.Equal(t, "test", um.Encoding()) -} - -func TestNewPdataLogsUnmarshaler(t *testing.T) { - um := newPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, "test") - assert.Equal(t, "test", um.Encoding()) -} diff --git a/receiver/pulsarreceiver/pulsar_receiver.go b/receiver/pulsarreceiver/pulsar_receiver.go index c4b08991ce30..3264cd8dd2a7 100644 --- a/receiver/pulsarreceiver/pulsar_receiver.go +++ b/receiver/pulsarreceiver/pulsar_receiver.go @@ -6,9 +6,12 @@ package pulsarreceiver // import "github.com/open-telemetry/opentelemetry-collec import ( "context" "errors" + "fmt" "strings" "time" + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding" + "github.com/apache/pulsar-client-go/pulsar" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" @@ -16,7 +19,7 @@ import ( "go.uber.org/zap" ) -var errUnrecognizedEncoding = errors.New("unrecognized encoding") +var errIncorrectEncoding = "encoding %s doesn't implement %s" const alreadyClosedError = "AlreadyClosedError" @@ -26,17 +29,13 @@ type pulsarTracesConsumer struct { client pulsar.Client cancel context.CancelFunc consumer pulsar.Consumer - unmarshaler TracesUnmarshaler + unmarshaler encoding.TracesUnmarshalerExtension settings receiver.CreateSettings consumerOptions pulsar.ConsumerOptions + encoding *component.ID } -func newTracesReceiver(config Config, set receiver.CreateSettings, unmarshalers map[string]TracesUnmarshaler, nextConsumer consumer.Traces) (*pulsarTracesConsumer, error) { - unmarshaler := unmarshalers[config.Encoding] - if nil == unmarshaler { - return nil, errUnrecognizedEncoding - } - +func newTracesReceiver(config Config, set receiver.CreateSettings, nextConsumer consumer.Traces) (*pulsarTracesConsumer, error) { options := config.clientOptions() client, err := pulsar.NewClient(options) if err != nil { @@ -51,17 +50,27 @@ func newTracesReceiver(config Config, set receiver.CreateSettings, unmarshalers return &pulsarTracesConsumer{ tracesConsumer: nextConsumer, topic: config.Topic, - unmarshaler: unmarshaler, + encoding: config.EncodingID, settings: set, client: client, consumerOptions: consumerOptions, }, nil } -func (c *pulsarTracesConsumer) Start(context.Context, component.Host) error { +func (c *pulsarTracesConsumer) Start(_ context.Context, host component.Host) error { ctx, cancel := context.WithCancel(context.Background()) c.cancel = cancel + extension, ok := host.GetExtensions()[*c.encoding] + if !ok { + return fmt.Errorf("extension '%s' not found", c.encoding) + } + unmarshaler, ok := extension.(encoding.TracesUnmarshalerExtension) + if !ok { + return fmt.Errorf(errIncorrectEncoding, c.encoding, "TracesUnmarshaler") + } + c.unmarshaler = unmarshaler + _consumer, err := c.client.Subscribe(c.consumerOptions) if err == nil { c.consumer = _consumer @@ -94,7 +103,7 @@ func consumerTracesLoop(ctx context.Context, c *pulsarTracesConsumer) error { continue } - traces, err := unmarshaler.Unmarshal(message.Payload()) + traces, err := unmarshaler.UnmarshalTraces(message.Payload()) if err != nil { c.settings.Logger.Error("failed to unmarshaler traces message", zap.Error(err)) c.consumer.Ack(message) @@ -120,21 +129,17 @@ func (c *pulsarTracesConsumer) Shutdown(context.Context) error { type pulsarMetricsConsumer struct { metricsConsumer consumer.Metrics - unmarshaler MetricsUnmarshaler + unmarshaler encoding.MetricsUnmarshalerExtension topic string client pulsar.Client consumer pulsar.Consumer cancel context.CancelFunc settings receiver.CreateSettings consumerOptions pulsar.ConsumerOptions + encoding *component.ID } -func newMetricsReceiver(config Config, set receiver.CreateSettings, unmarshalers map[string]MetricsUnmarshaler, nextConsumer consumer.Metrics) (*pulsarMetricsConsumer, error) { - unmarshaler := unmarshalers[config.Encoding] - if nil == unmarshaler { - return nil, errUnrecognizedEncoding - } - +func newMetricsReceiver(config Config, set receiver.CreateSettings, nextConsumer consumer.Metrics) (*pulsarMetricsConsumer, error) { options := config.clientOptions() client, err := pulsar.NewClient(options) if err != nil { @@ -149,16 +154,26 @@ func newMetricsReceiver(config Config, set receiver.CreateSettings, unmarshalers return &pulsarMetricsConsumer{ metricsConsumer: nextConsumer, topic: config.Topic, - unmarshaler: unmarshaler, + encoding: config.EncodingID, settings: set, client: client, consumerOptions: consumerOptions, }, nil } -func (c *pulsarMetricsConsumer) Start(context.Context, component.Host) error { +func (c *pulsarMetricsConsumer) Start(_ context.Context, host component.Host) error { ctx, cancel := context.WithCancel(context.Background()) c.cancel = cancel + extension, ok := host.GetExtensions()[*c.encoding] + if !ok { + return fmt.Errorf("extension '%s' not found", c.encoding) + } + unmarshaler, ok := extension.(encoding.MetricsUnmarshalerExtension) + if !ok { + return fmt.Errorf(errIncorrectEncoding, c.encoding, "MetricsUnmarshaler") + + } + c.unmarshaler = unmarshaler _consumer, err := c.client.Subscribe(c.consumerOptions) if err == nil { @@ -194,7 +209,7 @@ func consumeMetricsLoop(ctx context.Context, c *pulsarMetricsConsumer) error { continue } - metrics, err := unmarshaler.Unmarshal(message.Payload()) + metrics, err := unmarshaler.UnmarshalMetrics(message.Payload()) if err != nil { c.settings.Logger.Error("failed to unmarshaler metrics message", zap.Error(err)) c.consumer.Ack(message) @@ -221,21 +236,17 @@ func (c *pulsarMetricsConsumer) Shutdown(context.Context) error { type pulsarLogsConsumer struct { logsConsumer consumer.Logs - unmarshaler LogsUnmarshaler + unmarshaler encoding.LogsUnmarshalerExtension topic string client pulsar.Client consumer pulsar.Consumer cancel context.CancelFunc settings receiver.CreateSettings consumerOptions pulsar.ConsumerOptions + encoding *component.ID } -func newLogsReceiver(config Config, set receiver.CreateSettings, unmarshalers map[string]LogsUnmarshaler, nextConsumer consumer.Logs) (*pulsarLogsConsumer, error) { - unmarshaler := unmarshalers[config.Encoding] - if nil == unmarshaler { - return nil, errUnrecognizedEncoding - } - +func newLogsReceiver(config Config, set receiver.CreateSettings, nextConsumer consumer.Logs) (*pulsarLogsConsumer, error) { options := config.clientOptions() client, err := pulsar.NewClient(options) if err != nil { @@ -250,18 +261,27 @@ func newLogsReceiver(config Config, set receiver.CreateSettings, unmarshalers ma return &pulsarLogsConsumer{ logsConsumer: nextConsumer, topic: config.Topic, - cancel: nil, - unmarshaler: unmarshaler, + encoding: config.EncodingID, settings: set, client: client, consumerOptions: consumerOptions, }, nil } -func (c *pulsarLogsConsumer) Start(context.Context, component.Host) error { +func (c *pulsarLogsConsumer) Start(_ context.Context, host component.Host) error { ctx, cancel := context.WithCancel(context.Background()) c.cancel = cancel + extension, ok := host.GetExtensions()[*c.encoding] + if !ok { + return fmt.Errorf("extension '%s' not found", c.encoding) + } + unmarshaler, ok := extension.(encoding.LogsUnmarshalerExtension) + if !ok { + return fmt.Errorf(errIncorrectEncoding, c.encoding, "LogsUnmarshaler") + } + c.unmarshaler = unmarshaler + _consumer, err := c.client.Subscribe(c.consumerOptions) if err == nil { c.consumer = _consumer @@ -294,7 +314,7 @@ func consumeLogsLoop(ctx context.Context, c *pulsarLogsConsumer) error { continue } - logs, err := unmarshaler.Unmarshal(message.Payload()) + logs, err := unmarshaler.UnmarshalLogs(message.Payload()) if err != nil { c.settings.Logger.Error("failed to unmarshaler logs message", zap.Error(err)) c.consumer.Ack(message) diff --git a/receiver/pulsarreceiver/pulsar_receiver_test.go b/receiver/pulsarreceiver/pulsar_receiver_test.go index a221c368c7ba..392b7ca42806 100644 --- a/receiver/pulsarreceiver/pulsar_receiver_test.go +++ b/receiver/pulsarreceiver/pulsar_receiver_test.go @@ -4,17 +4,409 @@ package pulsarreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/pulsarreceiver" import ( + "context" + "errors" "testing" + "time" + "github.com/apache/pulsar-client-go/pulsar" + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/jaegerencodingextension" + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/jsonlogencodingextension" + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/otlpencodingextension" + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/textencodingextension" + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/zipkinencodingextension" "github.com/stretchr/testify/assert" - "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/extension" + "go.opentelemetry.io/collector/extension/extensiontest" "go.opentelemetry.io/collector/receiver/receivertest" ) -func Test_newTracesReceiver_err(t *testing.T) { - c := Config{ - Encoding: defaultEncoding, +type mockHost struct { + component.Host + ext map[component.ID]component.Component +} + +func (mh mockHost) GetExtensions() map[component.ID]component.Component { + return mh.ext +} + +type mockPulsarConsumer struct { + messageChan chan pulsar.ConsumerMessage +} + +func (c mockPulsarConsumer) Subscription() string { + panic("implement me") +} + +func (c mockPulsarConsumer) Unsubscribe() error { + panic("implement me") +} + +func (c mockPulsarConsumer) Receive(_ context.Context) (pulsar.Message, error) { + msg, ok := <-c.messageChan + if !ok { + return nil, errors.New(alreadyClosedError) + } + return msg, nil +} + +func (c mockPulsarConsumer) Chan() <-chan pulsar.ConsumerMessage { + return c.messageChan +} + +func (c mockPulsarConsumer) Ack(_ pulsar.Message) { + // no-op +} + +func (c mockPulsarConsumer) ReconsumeLater(_ pulsar.Message, _ time.Duration) { + panic("implement me") +} + +func (c mockPulsarConsumer) AckID(_ pulsar.MessageID) { + panic("implement me") +} + +func (c mockPulsarConsumer) Nack(_ pulsar.Message) { + // no-op +} + +func (c mockPulsarConsumer) NackID(_ pulsar.MessageID) { + panic("implement me") +} + +func (c mockPulsarConsumer) Close() {} + +func (c mockPulsarConsumer) Seek(_ pulsar.MessageID) error { + panic("implement me") +} + +func (c mockPulsarConsumer) SeekByTime(_ time.Time) error { + panic("implement me") +} + +func (c mockPulsarConsumer) Name() string { + panic("implement me") +} + +type mockPulsarMessage struct { + payload []byte +} + +func (msg mockPulsarMessage) Topic() string { + panic("implement me") +} + +func (msg mockPulsarMessage) Properties() map[string]string { + panic("implement me") +} + +func (msg mockPulsarMessage) Payload() []byte { + return msg.payload +} + +func (msg mockPulsarMessage) ID() pulsar.MessageID { + panic("implement me") +} + +func (msg mockPulsarMessage) PublishTime() time.Time { + panic("implement me") +} + +func (msg mockPulsarMessage) EventTime() time.Time { + panic("implement me") +} + +func (msg mockPulsarMessage) Key() string { + panic("implement me") +} + +func (msg mockPulsarMessage) OrderingKey() string { + panic("implement me") +} + +func (msg mockPulsarMessage) RedeliveryCount() uint32 { + panic("implement me") +} + +func (msg mockPulsarMessage) IsReplicated() bool { + panic("implement me") +} + +func (msg mockPulsarMessage) GetReplicatedFrom() string { + panic("implement me") +} + +func (msg mockPulsarMessage) GetSchemaValue(_ interface{}) error { + panic("implement me") +} + +func (msg mockPulsarMessage) ProducerName() string { + panic("implement me") +} + +func (msg mockPulsarMessage) GetEncryptionContext() *pulsar.EncryptionContext { + panic("implement me") +} + +type mockPulsarClient struct{} + +func (_ mockPulsarClient) CreateProducer(pulsar.ProducerOptions) (pulsar.Producer, error) { + panic("implement me") +} + +func (_ mockPulsarClient) Subscribe(pulsar.ConsumerOptions) (pulsar.Consumer, error) { + return mockPulsarConsumer{}, nil +} + +func (_ mockPulsarClient) CreateReader(pulsar.ReaderOptions) (pulsar.Reader, error) { + panic("implement me") +} + +func (_ mockPulsarClient) TopicPartitions(topic string) ([]string, error) { + panic("implement me") +} + +func (_ mockPulsarClient) Close() {} + +func TestTracesReceiverCreate(t *testing.T) { + factory := NewFactory() + _, err := factory.CreateTracesReceiver(context.Background(), receivertest.NewNopCreateSettings(), factory.CreateDefaultConfig(), nil) + assert.Nil(t, err) +} + +func TestTracesReceiverEncoding(t *testing.T) { + tests := []struct { + name string + id component.ID + getExtension func() (extension.Extension, error) + expectedErr string + }{ + { + name: "otlp", + id: component.NewIDWithName(component.DataTypeTraces, "otlp"), + getExtension: func() (extension.Extension, error) { + factory := otlpencodingextension.NewFactory() + return factory.CreateExtension(context.Background(), extensiontest.NewNopCreateSettings(), factory.CreateDefaultConfig()) + }, + }, + { + name: "jaeger", + id: component.NewIDWithName(component.DataTypeTraces, "otlp"), + getExtension: func() (extension.Extension, error) { + factory := jaegerencodingextension.NewFactory() + return factory.CreateExtension(context.Background(), extensiontest.NewNopCreateSettings(), factory.CreateDefaultConfig()) + }, + }, + { + name: "zipkin", + id: component.NewIDWithName(component.DataTypeTraces, "zipkin"), + getExtension: func() (extension.Extension, error) { + factory := zipkinencodingextension.NewFactory() + return factory.CreateExtension(context.Background(), extensiontest.NewNopCreateSettings(), factory.CreateDefaultConfig()) + }, + }, + { + name: "jsonlog", + id: component.NewIDWithName(component.DataTypeTraces, "jsonlog"), + getExtension: func() (extension.Extension, error) { + factory := jsonlogencodingextension.NewFactory() + return factory.CreateExtension(context.Background(), extensiontest.NewNopCreateSettings(), factory.CreateDefaultConfig()) + }, + expectedErr: "doesn't implement TracesUnmarshaler", + }, + { + name: "text", + id: component.NewIDWithName(component.DataTypeTraces, "text"), + getExtension: func() (extension.Extension, error) { + factory := textencodingextension.NewFactory() + return factory.CreateExtension(context.Background(), extensiontest.NewNopCreateSettings(), factory.CreateDefaultConfig()) + }, + expectedErr: "doesn't implement TracesUnmarshaler", + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + ext, err := test.getExtension() + assert.Nil(t, err) + host := mockHost{ + ext: map[component.ID]component.Component{ + test.id: ext, + }, + } + receiver := &pulsarTracesConsumer{ + tracesConsumer: nil, + encoding: &test.id, + settings: receivertest.NewNopCreateSettings(), + client: mockPulsarClient{}, + } + if test.expectedErr != "" { + assert.ErrorContains(t, receiver.Start(context.Background(), host), test.expectedErr) + } else { + assert.NoError(t, receiver.Start(context.Background(), host)) + } + }) + } +} + +func TestMetricsReceiverCreate(t *testing.T) { + factory := NewFactory() + _, err := factory.CreateMetricsReceiver(context.Background(), receivertest.NewNopCreateSettings(), factory.CreateDefaultConfig(), nil) + assert.Nil(t, err) +} + +func TestMetricsReceiverEncoding(t *testing.T) { + tests := []struct { + name string + id component.ID + getExtension func() (extension.Extension, error) + expectedErr string + }{ + { + name: "otlp", + id: component.NewIDWithName(component.DataTypeTraces, "otlp"), + getExtension: func() (extension.Extension, error) { + factory := otlpencodingextension.NewFactory() + return factory.CreateExtension(context.Background(), extensiontest.NewNopCreateSettings(), factory.CreateDefaultConfig()) + }, + }, + { + name: "jaeger", + id: component.NewIDWithName(component.DataTypeTraces, "otlp"), + getExtension: func() (extension.Extension, error) { + factory := jaegerencodingextension.NewFactory() + return factory.CreateExtension(context.Background(), extensiontest.NewNopCreateSettings(), factory.CreateDefaultConfig()) + }, + expectedErr: "doesn't implement MetricsUnmarshaler", + }, + { + name: "zipkin", + id: component.NewIDWithName(component.DataTypeTraces, "zipkin"), + getExtension: func() (extension.Extension, error) { + factory := zipkinencodingextension.NewFactory() + return factory.CreateExtension(context.Background(), extensiontest.NewNopCreateSettings(), factory.CreateDefaultConfig()) + }, + expectedErr: "doesn't implement MetricsUnmarshaler", + }, + { + name: "jsonlog", + id: component.NewIDWithName(component.DataTypeTraces, "jsonlog"), + getExtension: func() (extension.Extension, error) { + factory := jsonlogencodingextension.NewFactory() + return factory.CreateExtension(context.Background(), extensiontest.NewNopCreateSettings(), factory.CreateDefaultConfig()) + }, + expectedErr: "doesn't implement MetricsUnmarshaler", + }, + { + name: "text", + id: component.NewIDWithName(component.DataTypeTraces, "text"), + getExtension: func() (extension.Extension, error) { + factory := textencodingextension.NewFactory() + return factory.CreateExtension(context.Background(), extensiontest.NewNopCreateSettings(), factory.CreateDefaultConfig()) + }, + expectedErr: "doesn't implement MetricsUnmarshaler", + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + ext, err := test.getExtension() + assert.Nil(t, err) + host := mockHost{ + ext: map[component.ID]component.Component{ + test.id: ext, + }, + } + receiver := &pulsarMetricsConsumer{ + metricsConsumer: nil, + encoding: &test.id, + settings: receivertest.NewNopCreateSettings(), + client: mockPulsarClient{}, + } + if test.expectedErr != "" { + assert.ErrorContains(t, receiver.Start(context.Background(), host), test.expectedErr) + } else { + assert.NoError(t, receiver.Start(context.Background(), host)) + } + }) + } +} + +func TestLogsReceiverCreate(t *testing.T) { + factory := NewFactory() + _, err := factory.CreateLogsReceiver(context.Background(), receivertest.NewNopCreateSettings(), factory.CreateDefaultConfig(), nil) + assert.Nil(t, err) +} + +func TestLogsReceiverEncoding(t *testing.T) { + tests := []struct { + name string + id component.ID + getExtension func() (extension.Extension, error) + expectedErr string + }{ + { + name: "otlp", + id: component.NewIDWithName(component.DataTypeTraces, "otlp"), + getExtension: func() (extension.Extension, error) { + factory := otlpencodingextension.NewFactory() + return factory.CreateExtension(context.Background(), extensiontest.NewNopCreateSettings(), factory.CreateDefaultConfig()) + }, + }, + { + name: "jaeger", + id: component.NewIDWithName(component.DataTypeTraces, "otlp"), + getExtension: func() (extension.Extension, error) { + factory := jaegerencodingextension.NewFactory() + return factory.CreateExtension(context.Background(), extensiontest.NewNopCreateSettings(), factory.CreateDefaultConfig()) + }, + expectedErr: "doesn't implement LogsUnmarshaler", + }, + { + name: "zipkin", + id: component.NewIDWithName(component.DataTypeTraces, "zipkin"), + getExtension: func() (extension.Extension, error) { + factory := zipkinencodingextension.NewFactory() + return factory.CreateExtension(context.Background(), extensiontest.NewNopCreateSettings(), factory.CreateDefaultConfig()) + }, + expectedErr: "doesn't implement LogsUnmarshaler", + }, + { + name: "jsonlog", + id: component.NewIDWithName(component.DataTypeTraces, "jsonlog"), + getExtension: func() (extension.Extension, error) { + factory := jsonlogencodingextension.NewFactory() + return factory.CreateExtension(context.Background(), extensiontest.NewNopCreateSettings(), factory.CreateDefaultConfig()) + }, + }, + { + name: "text", + id: component.NewIDWithName(component.DataTypeTraces, "text"), + getExtension: func() (extension.Extension, error) { + factory := textencodingextension.NewFactory() + return factory.CreateExtension(context.Background(), extensiontest.NewNopCreateSettings(), factory.CreateDefaultConfig()) + }, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + ext, err := test.getExtension() + assert.Nil(t, err) + host := mockHost{ + ext: map[component.ID]component.Component{ + test.id: ext, + }, + } + receiver := &pulsarLogsConsumer{ + logsConsumer: nil, + encoding: &test.id, + settings: receivertest.NewNopCreateSettings(), + client: mockPulsarClient{}, + } + if test.expectedErr != "" { + assert.ErrorContains(t, receiver.Start(context.Background(), host), test.expectedErr) + } else { + assert.NoError(t, receiver.Start(context.Background(), host)) + } + }) } - _, err := newTracesReceiver(c, receivertest.NewNopCreateSettings(), defaultTracesUnmarshalers(), consumertest.NewNop()) - assert.Error(t, err) } diff --git a/receiver/pulsarreceiver/unmarshaler.go b/receiver/pulsarreceiver/unmarshaler.go deleted file mode 100644 index 12ae7178b52f..000000000000 --- a/receiver/pulsarreceiver/unmarshaler.go +++ /dev/null @@ -1,74 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package pulsarreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/pulsarreceiver" - -import ( - "go.opentelemetry.io/collector/pdata/plog" - "go.opentelemetry.io/collector/pdata/pmetric" - "go.opentelemetry.io/collector/pdata/ptrace" - - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin/zipkinv1" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin/zipkinv2" -) - -// copy from kafka receiver - -// TracesUnmarshaler deserializes the message body. -type TracesUnmarshaler interface { - // Unmarshal deserializes the message body into tracesConsumer. - Unmarshal([]byte) (ptrace.Traces, error) - - // Encoding of the serialized messages. - Encoding() string -} - -// MetricsUnmarshaler deserializes the message body -type MetricsUnmarshaler interface { - // Unmarshal deserializes the message body into tracesConsumer - Unmarshal([]byte) (pmetric.Metrics, error) - - // Encoding of the serialized messages - Encoding() string -} - -// LogsUnmarshaler deserializes the message body. -type LogsUnmarshaler interface { - // Unmarshal deserializes the message body into tracesConsumer. - Unmarshal([]byte) (plog.Logs, error) - - // Encoding of the serialized messages. - Encoding() string -} - -// defaultTracesUnmarshalers returns map of supported encodings with TracesUnmarshaler. -func defaultTracesUnmarshalers() map[string]TracesUnmarshaler { - otlpPb := newPdataTracesUnmarshaler(&ptrace.ProtoUnmarshaler{}, defaultEncoding) - jaegerProto := jaegerProtoSpanUnmarshaler{} - jaegerJSON := jaegerJSONSpanUnmarshaler{} - zipkinProto := newPdataTracesUnmarshaler(zipkinv2.NewProtobufTracesUnmarshaler(false, false), "zipkin_proto") - zipkinJSON := newPdataTracesUnmarshaler(zipkinv2.NewJSONTracesUnmarshaler(false), "zipkin_json") - zipkinThrift := newPdataTracesUnmarshaler(zipkinv1.NewThriftTracesUnmarshaler(), "zipkin_thrift") - return map[string]TracesUnmarshaler{ - otlpPb.Encoding(): otlpPb, - jaegerProto.Encoding(): jaegerProto, - jaegerJSON.Encoding(): jaegerJSON, - zipkinProto.Encoding(): zipkinProto, - zipkinJSON.Encoding(): zipkinJSON, - zipkinThrift.Encoding(): zipkinThrift, - } -} - -func defaultMetricsUnmarshalers() map[string]MetricsUnmarshaler { - otlpPb := newPdataMetricsUnmarshaler(&pmetric.ProtoUnmarshaler{}, defaultEncoding) - return map[string]MetricsUnmarshaler{ - otlpPb.Encoding(): otlpPb, - } -} - -func defaultLogsUnmarshalers() map[string]LogsUnmarshaler { - otlpPb := newPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding) - return map[string]LogsUnmarshaler{ - otlpPb.Encoding(): otlpPb, - } -} diff --git a/receiver/pulsarreceiver/unmarshaler_test.go b/receiver/pulsarreceiver/unmarshaler_test.go deleted file mode 100644 index eacdc46eef0b..000000000000 --- a/receiver/pulsarreceiver/unmarshaler_test.go +++ /dev/null @@ -1,62 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package pulsarreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/pulsarreceiver" - -import ( - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -// copy from kafka receiver -func TestDefaultTracesUnMarshaler(t *testing.T) { - expectedEncodings := []string{ - "otlp_proto", - "jaeger_proto", - "jaeger_json", - "zipkin_proto", - "zipkin_json", - "zipkin_thrift", - } - marshalers := defaultTracesUnmarshalers() - assert.Equal(t, len(expectedEncodings), len(marshalers)) - for _, e := range expectedEncodings { - t.Run(e, func(t *testing.T) { - m, ok := marshalers[e] - require.True(t, ok) - assert.NotNil(t, m) - }) - } -} - -func TestDefaultMetricsUnMarshaler(t *testing.T) { - expectedEncodings := []string{ - "otlp_proto", - } - marshalers := defaultMetricsUnmarshalers() - assert.Equal(t, len(expectedEncodings), len(marshalers)) - for _, e := range expectedEncodings { - t.Run(e, func(t *testing.T) { - m, ok := marshalers[e] - require.True(t, ok) - assert.NotNil(t, m) - }) - } -} - -func TestDefaultLogsUnMarshaler(t *testing.T) { - expectedEncodings := []string{ - "otlp_proto", - } - marshalers := defaultLogsUnmarshalers() - assert.Equal(t, len(expectedEncodings), len(marshalers)) - for _, e := range expectedEncodings { - t.Run(e, func(t *testing.T) { - m, ok := marshalers[e] - require.True(t, ok) - assert.NotNil(t, m) - }) - } -} diff --git a/receiver/pulsarreceiver/zipkin_unmarshaler.go b/receiver/pulsarreceiver/zipkin_unmarshaler.go deleted file mode 100644 index 3a82b75cf028..000000000000 --- a/receiver/pulsarreceiver/zipkin_unmarshaler.go +++ /dev/null @@ -1,28 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package pulsarreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/pulsarreceiver" - -import ( - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin/zipkinv1" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin/zipkinv2" -) - -// copy from kafka receiver -const ( - zipkinProtobufEncoding = "zipkin_proto" - zipkinJSONEncoding = "zipkin_json" - zipkinThriftEncoding = "zipkin_thrift" -) - -func newZipkinProtobufUnmarshaler() TracesUnmarshaler { - return newPdataTracesUnmarshaler(zipkinv2.NewProtobufTracesUnmarshaler(false, false), zipkinProtobufEncoding) -} - -func newZipkinJSONUnmarshaler() TracesUnmarshaler { - return newPdataTracesUnmarshaler(zipkinv2.NewJSONTracesUnmarshaler(false), zipkinJSONEncoding) -} - -func newZipkinThriftUnmarshaler() TracesUnmarshaler { - return newPdataTracesUnmarshaler(zipkinv1.NewThriftTracesUnmarshaler(), zipkinThriftEncoding) -} diff --git a/receiver/pulsarreceiver/zipkin_unmarshaler_test.go b/receiver/pulsarreceiver/zipkin_unmarshaler_test.go deleted file mode 100644 index 1e2765511c2e..000000000000 --- a/receiver/pulsarreceiver/zipkin_unmarshaler_test.go +++ /dev/null @@ -1,109 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package pulsarreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/pulsarreceiver" - -import ( - "context" - "testing" - - "github.com/apache/thrift/lib/go/thrift" - "github.com/jaegertracing/jaeger/thrift-gen/zipkincore" - "github.com/openzipkin/zipkin-go/proto/zipkin_proto3" - zipkinreporter "github.com/openzipkin/zipkin-go/reporter" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/pdata/pcommon" - "go.opentelemetry.io/collector/pdata/ptrace" - conventions "go.opentelemetry.io/collector/semconv/v1.6.1" - - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin/zipkinv2" -) - -// copy from kafka receiver -var v2FromTranslator zipkinv2.FromTranslator - -func TestUnmarshalZipkin(t *testing.T) { - td := ptrace.NewTraces() - rs := td.ResourceSpans().AppendEmpty() - rs.Resource().Attributes().PutStr(conventions.AttributeServiceName, "my_service") - span := rs.ScopeSpans().AppendEmpty().Spans().AppendEmpty() - span.SetName("foo") - span.SetStartTimestamp(pcommon.Timestamp(1597759000)) - span.SetEndTimestamp(pcommon.Timestamp(1597769000)) - span.SetTraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}) - span.SetSpanID([8]byte{1, 2, 3, 4, 5, 6, 7, 8}) - span.SetParentSpanID([8]byte{0, 0, 0, 0, 0, 0, 0, 0}) - spans, err := v2FromTranslator.FromTraces(td) - require.NoError(t, err) - - serializer := zipkinreporter.JSONSerializer{} - jsonBytes, err := serializer.Serialize(spans) - require.NoError(t, err) - - tSpan := &zipkincore.Span{Name: "foo"} - thriftTransport := thrift.NewTMemoryBuffer() - protocolTransport := thrift.NewTBinaryProtocolConf(thriftTransport, nil) - require.NoError(t, protocolTransport.WriteListBegin(context.Background(), thrift.STRUCT, 1)) - err = tSpan.Write(context.Background(), protocolTransport) - require.NoError(t, err) - require.NoError(t, protocolTransport.WriteListEnd(context.Background())) - - tdThrift, err := newZipkinThriftUnmarshaler().Unmarshal(thriftTransport.Buffer.Bytes()) - require.NoError(t, err) - - protoBytes, err := new(zipkin_proto3.SpanSerializer).Serialize(spans) - require.NoError(t, err) - - tests := []struct { - unmarshaler TracesUnmarshaler - encoding string - bytes []byte - expected ptrace.Traces - }{ - { - unmarshaler: newZipkinProtobufUnmarshaler(), - encoding: "zipkin_proto", - bytes: protoBytes, - expected: td, - }, - { - unmarshaler: newZipkinJSONUnmarshaler(), - encoding: "zipkin_json", - bytes: jsonBytes, - expected: td, - }, - { - unmarshaler: newZipkinThriftUnmarshaler(), - encoding: "zipkin_thrift", - bytes: thriftTransport.Buffer.Bytes(), - expected: tdThrift, - }, - } - for _, test := range tests { - t.Run(test.encoding, func(t *testing.T) { - traces, err := test.unmarshaler.Unmarshal(test.bytes) - require.NoError(t, err) - assert.Equal(t, test.expected, traces) - assert.Equal(t, test.encoding, test.unmarshaler.Encoding()) - }) - } -} - -func TestUnmarshalZipkinThrift_error(t *testing.T) { - p := newZipkinThriftUnmarshaler() - _, err := p.Unmarshal([]byte("+$%")) - assert.Error(t, err) -} - -func TestUnmarshalZipkinJSON_error(t *testing.T) { - p := newZipkinJSONUnmarshaler() - _, err := p.Unmarshal([]byte("+$%")) - assert.Error(t, err) -} - -func TestUnmarshalZipkinProto_error(t *testing.T) { - p := newZipkinProtobufUnmarshaler() - _, err := p.Unmarshal([]byte("+$%")) - assert.Error(t, err) -}