Skip to content

Commit

Permalink
[receiver/kafkareceiver] standardize on topic names for spans, metric…
Browse files Browse the repository at this point in the history
…s, and logs (#30163)

**Description:**
Updates the default topics for metrics and logs consumers to be the same
as the `kafkaexporter` default topics for metrics and logs exporters.

**Link to tracking Issue:**
#27292

**Testing:** Added testing in `factory_test.go` to test that topics are
set correctly for spans, metrics, and logs consumers.

**Documentation:** Updated the comments in `config.go` and `README.md`
to match the updates.
  • Loading branch information
jwafle authored Feb 6, 2024
1 parent 6bb9cb1 commit dcf66ce
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 24 deletions.
31 changes: 31 additions & 0 deletions .chloggen/kafka-receiver-standardize-default-topic.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: kafkareceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: standardizes the default topic name for metrics and logs receivers to the same topic name as the metrics and logs exporters of the kafkaexporter

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [27292]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
If you are using the Kafka receiver in a logs and/or a metrics pipeline
and you are not customizing the name of the topic to read from with the `topic` property,
the receiver will now read from `otlp_logs` or `otlp_metrics` topic instead of `otlp_spans` topic.
To maintain previous behavior, set the `topic` property to `otlp_spans`.
# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
2 changes: 1 addition & 1 deletion receiver/kafkareceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ The following settings can be optionally configured:

- `brokers` (default = localhost:9092): The list of kafka brokers
- `resolve_canonical_bootstrap_servers_only` (default = false): Whether to resolve then reverse-lookup broker IPs during startup
- `topic` (default = otlp_spans): The name of the kafka topic to read from
- `topic` (default = otlp_spans for traces, otlp_metrics for metrics, otlp_logs for logs): The name of the kafka topic to read from
- `encoding` (default = otlp_proto): The encoding of the payload received from kafka. Available encodings:
- `otlp_proto`: the payload is deserialized to `ExportTraceServiceRequest`, `ExportLogsServiceRequest` or `ExportMetricsServiceRequest` respectively.
- `jaeger_proto`: the payload is deserialized to a single Jaeger proto `Span`.
Expand Down
2 changes: 1 addition & 1 deletion receiver/kafkareceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type Config struct {
ResolveCanonicalBootstrapServersOnly bool `mapstructure:"resolve_canonical_bootstrap_servers_only"`
// Kafka protocol version
ProtocolVersion string `mapstructure:"protocol_version"`
// The name of the kafka topic to consume from (default "otlp_spans")
// The name of the kafka topic to consume from (default "otlp_spans" for traces, "otlp_metrics" for metrics, "otlp_logs" for logs)
Topic string `mapstructure:"topic"`
// Encoding of the messages (default "otlp_proto")
Encoding string `mapstructure:"encoding"`
Expand Down
32 changes: 21 additions & 11 deletions receiver/kafkareceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ import (
)

const (
defaultTopic = "otlp_spans"
defaultTracesTopic = "otlp_spans"
defaultMetricsTopic = "otlp_metrics"
defaultLogsTopic = "otlp_logs"
defaultEncoding = "otlp_proto"
defaultBroker = "localhost:9092"
defaultClientID = "otel-collector"
Expand Down Expand Up @@ -94,7 +96,6 @@ func NewFactory(options ...FactoryOption) receiver.Factory {

func createDefaultConfig() component.Config {
return &Config{
Topic: defaultTopic,
Encoding: defaultEncoding,
Brokers: []string{defaultBroker},
ClientID: defaultClientID,
Expand Down Expand Up @@ -137,13 +138,16 @@ func (f *kafkaReceiverFactory) createTracesReceiver(
f.tracesUnmarshalers[encoding] = unmarshal
}

c := cfg.(*Config)
unmarshaler := f.tracesUnmarshalers[c.Encoding]
oCfg := *(cfg.(*Config))
if oCfg.Topic == "" {
oCfg.Topic = defaultTracesTopic
}
unmarshaler := f.tracesUnmarshalers[oCfg.Encoding]
if unmarshaler == nil {
return nil, errUnrecognizedEncoding
}

r, err := newTracesReceiver(*c, set, unmarshaler, nextConsumer)
r, err := newTracesReceiver(oCfg, set, unmarshaler, nextConsumer)
if err != nil {
return nil, err
}
Expand All @@ -160,13 +164,16 @@ func (f *kafkaReceiverFactory) createMetricsReceiver(
f.metricsUnmarshalers[encoding] = unmarshal
}

c := cfg.(*Config)
unmarshaler := f.metricsUnmarshalers[c.Encoding]
oCfg := *(cfg.(*Config))
if oCfg.Topic == "" {
oCfg.Topic = defaultMetricsTopic
}
unmarshaler := f.metricsUnmarshalers[oCfg.Encoding]
if unmarshaler == nil {
return nil, errUnrecognizedEncoding
}

r, err := newMetricsReceiver(*c, set, unmarshaler, nextConsumer)
r, err := newMetricsReceiver(oCfg, set, unmarshaler, nextConsumer)
if err != nil {
return nil, err
}
Expand All @@ -183,13 +190,16 @@ func (f *kafkaReceiverFactory) createLogsReceiver(
f.logsUnmarshalers[encoding] = unmarshaler
}

c := cfg.(*Config)
unmarshaler, err := getLogsUnmarshaler(c.Encoding, f.logsUnmarshalers)
oCfg := *(cfg.(*Config))
if oCfg.Topic == "" {
oCfg.Topic = defaultLogsTopic
}
unmarshaler, err := getLogsUnmarshaler(oCfg.Encoding, f.logsUnmarshalers)
if err != nil {
return nil, err
}

r, err := newLogsReceiver(*c, set, unmarshaler, nextConsumer)
r, err := newLogsReceiver(oCfg, set, unmarshaler, nextConsumer)
if err != nil {
return nil, err
}
Expand Down
36 changes: 25 additions & 11 deletions receiver/kafkareceiver/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ func TestCreateDefaultConfig(t *testing.T) {
assert.NotNil(t, cfg, "failed to create default config")
assert.NoError(t, componenttest.CheckConfigStruct(cfg))
assert.Equal(t, []string{defaultBroker}, cfg.Brokers)
assert.Equal(t, defaultTopic, cfg.Topic)
assert.Equal(t, defaultGroupID, cfg.GroupID)
assert.Equal(t, defaultClientID, cfg.ClientID)
assert.Equal(t, defaultInitialOffset, cfg.InitialOffset)
Expand Down Expand Up @@ -62,12 +61,18 @@ func TestWithTracesUnmarshalers(t *testing.T) {
t.Run("custom_encoding", func(t *testing.T) {
cfg.Encoding = unmarshaler.Encoding()
receiver, err := f.CreateTracesReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, nil)
tracesConsumer, ok := receiver.(*kafkaTracesConsumer)
require.True(t, ok)
require.Equal(t, defaultTracesTopic, tracesConsumer.config.Topic)
require.NoError(t, err)
require.NotNil(t, receiver)
})
t.Run("default_encoding", func(t *testing.T) {
cfg.Encoding = defaultEncoding
receiver, err := f.CreateTracesReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, nil)
tracesConsumer, ok := receiver.(*kafkaTracesConsumer)
require.True(t, ok)
require.Equal(t, defaultTracesTopic, tracesConsumer.config.Topic)
require.NoError(t, err)
assert.NotNil(t, receiver)
})
Expand Down Expand Up @@ -106,12 +111,18 @@ func TestWithMetricsUnmarshalers(t *testing.T) {
t.Run("custom_encoding", func(t *testing.T) {
cfg.Encoding = unmarshaler.Encoding()
receiver, err := f.CreateMetricsReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, nil)
metricsConsumer, ok := receiver.(*kafkaMetricsConsumer)
require.True(t, ok)
require.Equal(t, defaultMetricsTopic, metricsConsumer.config.Topic)
require.NoError(t, err)
require.NotNil(t, receiver)
})
t.Run("default_encoding", func(t *testing.T) {
cfg.Encoding = defaultEncoding
receiver, err := f.CreateMetricsReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, nil)
metricsConsumer, ok := receiver.(*kafkaMetricsConsumer)
require.True(t, ok)
require.Equal(t, defaultMetricsTopic, metricsConsumer.config.Topic)
require.NoError(t, err)
assert.NotNil(t, receiver)
})
Expand Down Expand Up @@ -171,26 +182,29 @@ func TestWithLogsUnmarshalers(t *testing.T) {

t.Run("custom_encoding", func(t *testing.T) {
cfg.Encoding = unmarshaler.Encoding()
exporter, err := f.CreateLogsReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, nil)
receiver, err := f.CreateLogsReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, nil)
logsConsumer, ok := receiver.(*kafkaLogsConsumer)
require.True(t, ok)
require.Equal(t, defaultLogsTopic, logsConsumer.config.Topic)
require.NoError(t, err)
require.NotNil(t, exporter)
require.NotNil(t, receiver)
})
t.Run("default_encoding", func(t *testing.T) {
cfg.Encoding = defaultEncoding
exporter, err := f.CreateLogsReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, nil)
receiver, err := f.CreateLogsReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, nil)
logsConsumer, ok := receiver.(*kafkaLogsConsumer)
require.True(t, ok)
require.Equal(t, defaultLogsTopic, logsConsumer.config.Topic)
require.NoError(t, err)
assert.NotNil(t, exporter)
assert.NotNil(t, receiver)
})
}

type customTracesUnmarshaler struct {
}
type customTracesUnmarshaler struct{}

type customMetricsUnmarshaler struct {
}
type customMetricsUnmarshaler struct{}

type customLogsUnmarshaler struct {
}
type customLogsUnmarshaler struct{}

var _ TracesUnmarshaler = (*customTracesUnmarshaler)(nil)

Expand Down

0 comments on commit dcf66ce

Please sign in to comment.