Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[receiver/pulsarreceiver] - Add encoding extension #29322

Closed
Closed
27 changes: 27 additions & 0 deletions .chloggen/add-encoding-extension-pulsarreceiver.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: pulsarreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add newly added encoding extension to pulsarreceiver

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [29322]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
19 changes: 16 additions & 3 deletions cmd/configschema/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight v0.92.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/ecsutil v0.92.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/k8s v0.92.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.92.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.92.1-0.20240112172857-83d463ceba06 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/docker v0.92.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter v0.92.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig v0.92.0 // indirect
Expand All @@ -82,14 +82,14 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.92.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/resourcetotelemetry v0.92.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza v0.92.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.92.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.92.1-0.20240112172857-83d463ceba06 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/loki v0.92.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/opencensus v0.92.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus v0.92.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheusremotewrite v0.92.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/signalfx v0.92.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/skywalking v0.92.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin v0.92.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin v0.92.1-0.20240112172857-83d463ceba06 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/processor/attributesprocessor v0.92.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/processor/cumulativetodeltaprocessor v0.92.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/processor/datadogprocessor v0.92.0 // indirect
Expand Down Expand Up @@ -495,6 +495,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/extension/awsproxy v0.92.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/extension/basicauthextension v0.92.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/extension/bearertokenauthextension v0.92.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding v0.92.1-0.20240112172857-83d463ceba06 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/extension/headerssetterextension v0.92.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/extension/jaegerremotesampling v0.92.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer v0.92.0 // indirect
Expand Down Expand Up @@ -1160,3 +1161,15 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/testbed/mockda
replace github.com/open-telemetry/opentelemetry-collector-contrib/testbed => ../../testbed

replace github.com/open-telemetry/opentelemetry-collector-contrib/exporter/syslogexporter => ../../exporter/syslogexporter

replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding => ../../extension/encoding

replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/textencodingextension => ../../extension/encoding/textencodingextension

replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/jsonlogencodingextension => ../../extension/encoding/jsonlogencodingextension

replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/zipkinencodingextension => ../../extension/encoding/zipkinencodingextension

replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/otlpencodingextension => ../../extension/encoding/otlpencodingextension

replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/jaegerencodingextension => ../../extension/encoding/jaegerencodingextension
18 changes: 9 additions & 9 deletions cmd/otelcontribcol/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,11 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/extension/awsproxy v0.92.0
github.com/open-telemetry/opentelemetry-collector-contrib/extension/basicauthextension v0.92.0
github.com/open-telemetry/opentelemetry-collector-contrib/extension/bearertokenauthextension v0.92.0
github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/jaegerencodingextension v0.92.0
github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/jsonlogencodingextension v0.92.0
github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/otlpencodingextension v0.92.0
github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/textencodingextension v0.92.0
github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/zipkinencodingextension v0.92.0
github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/jaegerencodingextension v0.92.1-0.20240112172857-83d463ceba06
github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/jsonlogencodingextension v0.92.1-0.20240112172857-83d463ceba06
github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/otlpencodingextension v0.92.1-0.20240112172857-83d463ceba06
github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/textencodingextension v0.92.1-0.20240112172857-83d463ceba06
github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/zipkinencodingextension v0.92.1-0.20240112172857-83d463ceba06
github.com/open-telemetry/opentelemetry-collector-contrib/extension/headerssetterextension v0.92.0
github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckextension v0.92.0
github.com/open-telemetry/opentelemetry-collector-contrib/extension/httpforwarder v0.92.0
Expand All @@ -81,7 +81,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/extension/sigv4authextension v0.92.0
github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage v0.92.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.92.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.92.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.92.1-0.20240112172857-83d463ceba06
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza v0.92.0
github.com/open-telemetry/opentelemetry-collector-contrib/processor/attributesprocessor v0.92.0
github.com/open-telemetry/opentelemetry-collector-contrib/processor/cumulativetodeltaprocessor v0.92.0
Expand Down Expand Up @@ -529,7 +529,7 @@ require (
github.com/nginxinc/nginx-prometheus-exporter v0.8.1-0.20201110005315-f5a5f8086c19 // indirect
github.com/oklog/ulid/v2 v2.1.0 // indirect
github.com/open-telemetry/opamp-go v0.10.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding v0.92.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding v0.92.1-0.20240112172857-83d463ceba06 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer v0.92.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/awsutil v0.92.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight v0.92.0 // indirect
Expand All @@ -556,14 +556,14 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.92.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/resourcetotelemetry v0.92.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/azure v0.92.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.92.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.92.1-0.20240112172857-83d463ceba06 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/loki v0.92.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/opencensus v0.92.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus v0.92.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheusremotewrite v0.92.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/signalfx v0.92.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/skywalking v0.92.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin v0.92.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin v0.92.1-0.20240112172857-83d463ceba06 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/winperfcounters v0.92.0 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.1.0-rc5 // indirect
Expand Down
19 changes: 16 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,7 @@ require (
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect
github.com/nginxinc/nginx-prometheus-exporter v0.8.1-0.20201110005315-f5a5f8086c19 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding v0.92.1-0.20240112172857-83d463ceba06 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer v0.92.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/awsutil v0.92.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight v0.92.0 // indirect
Expand All @@ -511,7 +512,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/xray v0.92.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/collectd v0.92.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.92.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.92.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.92.1-0.20240112172857-83d463ceba06 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/datadog v0.92.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/docker v0.92.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter v0.92.0 // indirect
Expand All @@ -529,14 +530,14 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/resourcetotelemetry v0.92.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza v0.92.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/azure v0.92.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.92.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.92.1-0.20240112172857-83d463ceba06 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/loki v0.92.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/opencensus v0.92.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus v0.92.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheusremotewrite v0.92.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/signalfx v0.92.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/skywalking v0.92.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin v0.92.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin v0.92.1-0.20240112172857-83d463ceba06 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/winperfcounters v0.92.0 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.1.0-rc5 // indirect
Expand Down Expand Up @@ -1162,3 +1163,15 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/testbed/mockda
replace github.com/open-telemetry/opentelemetry-collector-contrib/testbed => ./testbed

replace github.com/open-telemetry/opentelemetry-collector-contrib/exporter/syslogexporter => ./exporter/syslogexporter

replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding => ./extension/encoding

replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/textencodingextension => ./extension/encoding/textencodingextension

replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/jsonlogencodingextension => ./extension/encoding/jsonlogencodingextension

replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/zipkinencodingextension => ./extension/encoding/zipkinencodingextension

replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/jaegerencodingextension => ./extension/encoding/jaegerencodingextension

replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/otlpencodingextension => ./extension/encoding/otlpencodingextension
2 changes: 1 addition & 1 deletion receiver/pulsarreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type Config struct {
// The Subscription that receiver will be consuming messages from (default "otlp_subscription")
Subscription string `mapstructure:"subscription"`
// Encoding of the messages (default "otlp_proto")
Encoding string `mapstructure:"encoding"`
EncodingID *component.ID `mapstructure:"encoding"`
VihasMakwana marked this conversation as resolved.
Show resolved Hide resolved
// Name specifies the consumer name.
ConsumerName string `mapstructure:"consumer_name"`
// Set the path to the trusted TLS certificate file
Expand Down
1 change: 0 additions & 1 deletion receiver/pulsarreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ func TestLoadConfig(t *testing.T) {
Endpoint: "pulsar://localhost:6500",
ConsumerName: "otel-collector",
Subscription: "otel-collector",
Encoding: defaultEncoding,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the default behavior if encoding_extension isn't set? I believe it should be OTLP encoding

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe so.
Will change it to OTLP

TLSTrustCertsFilePath: "ca.pem",
Authentication: Authentication{TLS: &TLS{CertFile: "cert.pem", KeyFile: "key.pem"}},
},
Expand Down
66 changes: 10 additions & 56 deletions receiver/pulsarreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,63 +23,18 @@ const (
defaultServiceURL = "pulsar://localhost:6650"
)

// FactoryOption applies changes to PulsarExporterFactory.
type FactoryOption func(factory *pulsarReceiverFactory)

// withTracesUnmarshalers adds Unmarshalers.
func withTracesUnmarshalers(tracesUnmarshalers ...TracesUnmarshaler) FactoryOption {
return func(factory *pulsarReceiverFactory) {
for _, unmarshaler := range tracesUnmarshalers {
factory.tracesUnmarshalers[unmarshaler.Encoding()] = unmarshaler
}
}
}

// withMetricsUnmarshalers adds MetricsUnmarshalers.
func withMetricsUnmarshalers(metricsUnmarshalers ...MetricsUnmarshaler) FactoryOption {
return func(factory *pulsarReceiverFactory) {
for _, unmarshaler := range metricsUnmarshalers {
factory.metricsUnmarshalers[unmarshaler.Encoding()] = unmarshaler
}
}
}

// withLogsUnmarshalers adds LogsUnmarshalers.
func withLogsUnmarshalers(logsUnmarshalers ...LogsUnmarshaler) FactoryOption {
return func(factory *pulsarReceiverFactory) {
for _, unmarshaler := range logsUnmarshalers {
factory.logsUnmarshalers[unmarshaler.Encoding()] = unmarshaler
}
}
}

// NewFactory creates Pulsar receiver factory.
func NewFactory(options ...FactoryOption) receiver.Factory {

f := &pulsarReceiverFactory{
tracesUnmarshalers: defaultTracesUnmarshalers(),
metricsUnmarshalers: defaultMetricsUnmarshalers(),
logsUnmarshalers: defaultLogsUnmarshalers(),
}
for _, o := range options {
o(f)
}
func NewFactory() receiver.Factory {
return receiver.NewFactory(
metadata.Type,
createDefaultConfig,
receiver.WithTraces(f.createTracesReceiver, metadata.TracesStability),
receiver.WithMetrics(f.createMetricsReceiver, metadata.MetricsStability),
receiver.WithLogs(f.createLogsReceiver, metadata.LogsStability),
receiver.WithTraces(createTracesReceiver, metadata.TracesStability),
receiver.WithMetrics(createMetricsReceiver, metadata.MetricsStability),
receiver.WithLogs(createLogsReceiver, metadata.LogsStability),
)
}

type pulsarReceiverFactory struct {
tracesUnmarshalers map[string]TracesUnmarshaler
metricsUnmarshalers map[string]MetricsUnmarshaler
logsUnmarshalers map[string]LogsUnmarshaler
}

func (f *pulsarReceiverFactory) createTracesReceiver(
func createTracesReceiver(
_ context.Context,
set receiver.CreateSettings,
cfg component.Config,
Expand All @@ -89,14 +44,14 @@ func (f *pulsarReceiverFactory) createTracesReceiver(
if len(c.Topic) == 0 {
c.Topic = defaultTraceTopic
}
r, err := newTracesReceiver(c, set, f.tracesUnmarshalers, nextConsumer)
r, err := newTracesReceiver(c, set, nextConsumer)
if err != nil {
return nil, err
}
return r, nil
}

func (f *pulsarReceiverFactory) createMetricsReceiver(
func createMetricsReceiver(
_ context.Context,
set receiver.CreateSettings,
cfg component.Config,
Expand All @@ -106,14 +61,14 @@ func (f *pulsarReceiverFactory) createMetricsReceiver(
if len(c.Topic) == 0 {
c.Topic = defaultMeticsTopic
}
r, err := newMetricsReceiver(c, set, f.metricsUnmarshalers, nextConsumer)
r, err := newMetricsReceiver(c, set, nextConsumer)
if err != nil {
return nil, err
}
return r, nil
}

func (f *pulsarReceiverFactory) createLogsReceiver(
func createLogsReceiver(
_ context.Context,
set receiver.CreateSettings,
cfg component.Config,
Expand All @@ -123,7 +78,7 @@ func (f *pulsarReceiverFactory) createLogsReceiver(
if len(c.Topic) == 0 {
c.Topic = defaultLogsTopic
}
r, err := newLogsReceiver(c, set, f.logsUnmarshalers, nextConsumer)
r, err := newLogsReceiver(c, set, nextConsumer)
if err != nil {
return nil, err
}
Expand All @@ -132,7 +87,6 @@ func (f *pulsarReceiverFactory) createLogsReceiver(

func createDefaultConfig() component.Config {
return &Config{
Encoding: defaultEncoding,
ConsumerName: defaultConsumerName,
Subscription: defaultSubscription,
Endpoint: defaultServiceURL,
Expand Down
Loading
Loading