From dcf66ce0a5a9a9cf804ce045fdef8d1401cc4d7c Mon Sep 17 00:00:00 2001 From: Jared Woelfel <89057381+jwafle@users.noreply.github.com> Date: Tue, 6 Feb 2024 17:21:22 -0600 Subject: [PATCH] [receiver/kafkareceiver] standardize on topic names for spans, metrics, and logs (#30163) **Description:** Updates the default topics for metrics and logs consumers to be the same as the `kafkaexporter` default topics for metrics and logs exporters. **Link to tracking Issue:** https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/27292 **Testing:** Added testing in `factory_test.go` to test that topics are set correctly for spans, metrics, and logs consumers. **Documentation:** Updated the comments in `config.go` and `README.md` to match the updates. --- ...ka-receiver-standardize-default-topic.yaml | 31 ++++++++++++++++ receiver/kafkareceiver/README.md | 2 +- receiver/kafkareceiver/config.go | 2 +- receiver/kafkareceiver/factory.go | 32 +++++++++++------ receiver/kafkareceiver/factory_test.go | 36 +++++++++++++------ 5 files changed, 79 insertions(+), 24 deletions(-) create mode 100644 .chloggen/kafka-receiver-standardize-default-topic.yaml diff --git a/.chloggen/kafka-receiver-standardize-default-topic.yaml b/.chloggen/kafka-receiver-standardize-default-topic.yaml new file mode 100644 index 000000000000..efa2c4e1bbb2 --- /dev/null +++ b/.chloggen/kafka-receiver-standardize-default-topic.yaml @@ -0,0 +1,31 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: breaking + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: kafkareceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: standardizes the default topic name for metrics and logs receivers to the same topic name as the metrics and logs exporters of the kafkaexporter + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [27292] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + If you are using the Kafka receiver in a logs and/or a metrics pipeline + and you are not customizing the name of the topic to read from with the `topic` property, + the receiver will now read from `otlp_logs` or `otlp_metrics` topic instead of `otlp_spans` topic. + To maintain previous behavior, set the `topic` property to `otlp_spans`. + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/receiver/kafkareceiver/README.md b/receiver/kafkareceiver/README.md index 718e957964dc..100e70f57559 100644 --- a/receiver/kafkareceiver/README.md +++ b/receiver/kafkareceiver/README.md @@ -32,7 +32,7 @@ The following settings can be optionally configured: - `brokers` (default = localhost:9092): The list of kafka brokers - `resolve_canonical_bootstrap_servers_only` (default = false): Whether to resolve then reverse-lookup broker IPs during startup -- `topic` (default = otlp_spans): The name of the kafka topic to read from +- `topic` (default = otlp_spans for traces, otlp_metrics for metrics, otlp_logs for logs): The name of the kafka topic to read from - `encoding` (default = otlp_proto): The encoding of the payload received from kafka. Available encodings: - `otlp_proto`: the payload is deserialized to `ExportTraceServiceRequest`, `ExportLogsServiceRequest` or `ExportMetricsServiceRequest` respectively. - `jaeger_proto`: the payload is deserialized to a single Jaeger proto `Span`. diff --git a/receiver/kafkareceiver/config.go b/receiver/kafkareceiver/config.go index fce6c0641254..2f2eff8e6d52 100644 --- a/receiver/kafkareceiver/config.go +++ b/receiver/kafkareceiver/config.go @@ -48,7 +48,7 @@ type Config struct { ResolveCanonicalBootstrapServersOnly bool `mapstructure:"resolve_canonical_bootstrap_servers_only"` // Kafka protocol version ProtocolVersion string `mapstructure:"protocol_version"` - // The name of the kafka topic to consume from (default "otlp_spans") + // The name of the kafka topic to consume from (default "otlp_spans" for traces, "otlp_metrics" for metrics, "otlp_logs" for logs) Topic string `mapstructure:"topic"` // Encoding of the messages (default "otlp_proto") Encoding string `mapstructure:"encoding"` diff --git a/receiver/kafkareceiver/factory.go b/receiver/kafkareceiver/factory.go index 91737264e182..7552f0c4653c 100644 --- a/receiver/kafkareceiver/factory.go +++ b/receiver/kafkareceiver/factory.go @@ -19,7 +19,9 @@ import ( ) const ( - defaultTopic = "otlp_spans" + defaultTracesTopic = "otlp_spans" + defaultMetricsTopic = "otlp_metrics" + defaultLogsTopic = "otlp_logs" defaultEncoding = "otlp_proto" defaultBroker = "localhost:9092" defaultClientID = "otel-collector" @@ -94,7 +96,6 @@ func NewFactory(options ...FactoryOption) receiver.Factory { func createDefaultConfig() component.Config { return &Config{ - Topic: defaultTopic, Encoding: defaultEncoding, Brokers: []string{defaultBroker}, ClientID: defaultClientID, @@ -137,13 +138,16 @@ func (f *kafkaReceiverFactory) createTracesReceiver( f.tracesUnmarshalers[encoding] = unmarshal } - c := cfg.(*Config) - unmarshaler := f.tracesUnmarshalers[c.Encoding] + oCfg := *(cfg.(*Config)) + if oCfg.Topic == "" { + oCfg.Topic = defaultTracesTopic + } + unmarshaler := f.tracesUnmarshalers[oCfg.Encoding] if unmarshaler == nil { return nil, errUnrecognizedEncoding } - r, err := newTracesReceiver(*c, set, unmarshaler, nextConsumer) + r, err := newTracesReceiver(oCfg, set, unmarshaler, nextConsumer) if err != nil { return nil, err } @@ -160,13 +164,16 @@ func (f *kafkaReceiverFactory) createMetricsReceiver( f.metricsUnmarshalers[encoding] = unmarshal } - c := cfg.(*Config) - unmarshaler := f.metricsUnmarshalers[c.Encoding] + oCfg := *(cfg.(*Config)) + if oCfg.Topic == "" { + oCfg.Topic = defaultMetricsTopic + } + unmarshaler := f.metricsUnmarshalers[oCfg.Encoding] if unmarshaler == nil { return nil, errUnrecognizedEncoding } - r, err := newMetricsReceiver(*c, set, unmarshaler, nextConsumer) + r, err := newMetricsReceiver(oCfg, set, unmarshaler, nextConsumer) if err != nil { return nil, err } @@ -183,13 +190,16 @@ func (f *kafkaReceiverFactory) createLogsReceiver( f.logsUnmarshalers[encoding] = unmarshaler } - c := cfg.(*Config) - unmarshaler, err := getLogsUnmarshaler(c.Encoding, f.logsUnmarshalers) + oCfg := *(cfg.(*Config)) + if oCfg.Topic == "" { + oCfg.Topic = defaultLogsTopic + } + unmarshaler, err := getLogsUnmarshaler(oCfg.Encoding, f.logsUnmarshalers) if err != nil { return nil, err } - r, err := newLogsReceiver(*c, set, unmarshaler, nextConsumer) + r, err := newLogsReceiver(oCfg, set, unmarshaler, nextConsumer) if err != nil { return nil, err } diff --git a/receiver/kafkareceiver/factory_test.go b/receiver/kafkareceiver/factory_test.go index 45386eca9657..f409498ad6d6 100644 --- a/receiver/kafkareceiver/factory_test.go +++ b/receiver/kafkareceiver/factory_test.go @@ -23,7 +23,6 @@ func TestCreateDefaultConfig(t *testing.T) { assert.NotNil(t, cfg, "failed to create default config") assert.NoError(t, componenttest.CheckConfigStruct(cfg)) assert.Equal(t, []string{defaultBroker}, cfg.Brokers) - assert.Equal(t, defaultTopic, cfg.Topic) assert.Equal(t, defaultGroupID, cfg.GroupID) assert.Equal(t, defaultClientID, cfg.ClientID) assert.Equal(t, defaultInitialOffset, cfg.InitialOffset) @@ -62,12 +61,18 @@ func TestWithTracesUnmarshalers(t *testing.T) { t.Run("custom_encoding", func(t *testing.T) { cfg.Encoding = unmarshaler.Encoding() receiver, err := f.CreateTracesReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, nil) + tracesConsumer, ok := receiver.(*kafkaTracesConsumer) + require.True(t, ok) + require.Equal(t, defaultTracesTopic, tracesConsumer.config.Topic) require.NoError(t, err) require.NotNil(t, receiver) }) t.Run("default_encoding", func(t *testing.T) { cfg.Encoding = defaultEncoding receiver, err := f.CreateTracesReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, nil) + tracesConsumer, ok := receiver.(*kafkaTracesConsumer) + require.True(t, ok) + require.Equal(t, defaultTracesTopic, tracesConsumer.config.Topic) require.NoError(t, err) assert.NotNil(t, receiver) }) @@ -106,12 +111,18 @@ func TestWithMetricsUnmarshalers(t *testing.T) { t.Run("custom_encoding", func(t *testing.T) { cfg.Encoding = unmarshaler.Encoding() receiver, err := f.CreateMetricsReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, nil) + metricsConsumer, ok := receiver.(*kafkaMetricsConsumer) + require.True(t, ok) + require.Equal(t, defaultMetricsTopic, metricsConsumer.config.Topic) require.NoError(t, err) require.NotNil(t, receiver) }) t.Run("default_encoding", func(t *testing.T) { cfg.Encoding = defaultEncoding receiver, err := f.CreateMetricsReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, nil) + metricsConsumer, ok := receiver.(*kafkaMetricsConsumer) + require.True(t, ok) + require.Equal(t, defaultMetricsTopic, metricsConsumer.config.Topic) require.NoError(t, err) assert.NotNil(t, receiver) }) @@ -171,26 +182,29 @@ func TestWithLogsUnmarshalers(t *testing.T) { t.Run("custom_encoding", func(t *testing.T) { cfg.Encoding = unmarshaler.Encoding() - exporter, err := f.CreateLogsReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, nil) + receiver, err := f.CreateLogsReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, nil) + logsConsumer, ok := receiver.(*kafkaLogsConsumer) + require.True(t, ok) + require.Equal(t, defaultLogsTopic, logsConsumer.config.Topic) require.NoError(t, err) - require.NotNil(t, exporter) + require.NotNil(t, receiver) }) t.Run("default_encoding", func(t *testing.T) { cfg.Encoding = defaultEncoding - exporter, err := f.CreateLogsReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, nil) + receiver, err := f.CreateLogsReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, nil) + logsConsumer, ok := receiver.(*kafkaLogsConsumer) + require.True(t, ok) + require.Equal(t, defaultLogsTopic, logsConsumer.config.Topic) require.NoError(t, err) - assert.NotNil(t, exporter) + assert.NotNil(t, receiver) }) } -type customTracesUnmarshaler struct { -} +type customTracesUnmarshaler struct{} -type customMetricsUnmarshaler struct { -} +type customMetricsUnmarshaler struct{} -type customLogsUnmarshaler struct { -} +type customLogsUnmarshaler struct{} var _ TracesUnmarshaler = (*customTracesUnmarshaler)(nil)