Skip to content

Commit

Permalink
[exporter/kafka] Implement partitioning by resource attributes for lo…
Browse files Browse the repository at this point in the history
…gs (#33230)
  • Loading branch information
SHaaD94 authored Aug 4, 2024
1 parent 2027679 commit 3f1c619
Show file tree
Hide file tree
Showing 12 changed files with 271 additions and 261 deletions.
27 changes: 27 additions & 0 deletions .chloggen/kafka-partition-logs-by-resources.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: kafkaexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: add the ability to partition logs by resource attributes.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [33229]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user, api]
1 change: 1 addition & 0 deletions exporter/kafkaexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ The following settings can be optionally configured:
- `raw`: if the log record body is a byte array, it is sent as is. Otherwise, it is serialized to JSON. Resource and record attributes are discarded.
- `partition_traces_by_id` (default = false): configures the exporter to include the trace ID as the message key in trace messages sent to kafka. *Please note:* this setting does not have any effect on Jaeger encoding exporters since Jaeger exporters include trace ID as the message key by default.
- `partition_metrics_by_resource_attributes` (default = false): configures the exporter to include the hash of sorted resource attributes as the message partitioning key in metric messages sent to kafka.
- `partition_logs_by_resource_attributes` (default = false): configures the exporter to include the hash of sorted resource attributes as the message partitioning key in log messages sent to kafka.
- `auth`
- `plain_text`
- `username`: The username to use.
Expand Down
2 changes: 2 additions & 0 deletions exporter/kafkaexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ type Config struct {

PartitionMetricsByResourceAttributes bool `mapstructure:"partition_metrics_by_resource_attributes"`

PartitionLogsByResourceAttributes bool `mapstructure:"partition_logs_by_resource_attributes"`

// Metadata is the namespace for metadata management properties used by the
// Client, and shared by the Producer/Consumer.
Metadata Metadata `mapstructure:"metadata"`
Expand Down
3 changes: 3 additions & 0 deletions exporter/kafkaexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func TestLoadConfig(t *testing.T) {
Encoding: "otlp_proto",
PartitionTracesByID: true,
PartitionMetricsByResourceAttributes: true,
PartitionLogsByResourceAttributes: true,
Brokers: []string{"foo:123", "bar:456"},
ClientID: "test_client_id",
Authentication: kafka.Authentication{
Expand Down Expand Up @@ -114,6 +115,7 @@ func TestLoadConfig(t *testing.T) {
Encoding: "otlp_proto",
PartitionTracesByID: true,
PartitionMetricsByResourceAttributes: true,
PartitionLogsByResourceAttributes: true,
Brokers: []string{"foo:123", "bar:456"},
ClientID: "test_client_id",
Authentication: kafka.Authentication{
Expand Down Expand Up @@ -168,6 +170,7 @@ func TestLoadConfig(t *testing.T) {
Encoding: "otlp_proto",
PartitionTracesByID: true,
PartitionMetricsByResourceAttributes: true,
PartitionLogsByResourceAttributes: true,
Brokers: []string{"foo:123", "bar:456"},
ClientID: "test_client_id",
ResolveCanonicalBootstrapServersOnly: true,
Expand Down
45 changes: 7 additions & 38 deletions exporter/kafkaexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,45 +40,16 @@ const (
defaultFluxMaxMessages = 0
// partitioning metrics by resource attributes is disabled by default
defaultPartitionMetricsByResourceAttributesEnabled = false
// partitioning logs by resource attributes is disabled by default
defaultPartitionLogsByResourceAttributesEnabled = false
)

// FactoryOption applies changes to kafkaExporterFactory.
type FactoryOption func(factory *kafkaExporterFactory)

// withTracesMarshalers returns a FactoryOption that registers the given
// trace marshalers on the factory, keyed by their encoding name.
func withTracesMarshalers(tracesMarshalers ...TracesMarshaler) FactoryOption {
	return func(factory *kafkaExporterFactory) {
		for i := range tracesMarshalers {
			m := tracesMarshalers[i]
			factory.tracesMarshalers[m.Encoding()] = m
		}
	}
}

// withMetricsMarshalers returns a FactoryOption that registers the given
// metric marshalers on the factory, keyed by their encoding name.
func withMetricsMarshalers(metricMarshalers ...MetricsMarshaler) FactoryOption {
	return func(factory *kafkaExporterFactory) {
		for i := range metricMarshalers {
			m := metricMarshalers[i]
			factory.metricsMarshalers[m.Encoding()] = m
		}
	}
}

// withLogsMarshalers returns a FactoryOption that registers the given
// log marshalers on the factory, keyed by their encoding name.
func withLogsMarshalers(logsMarshalers ...LogsMarshaler) FactoryOption {
	return func(factory *kafkaExporterFactory) {
		for i := range logsMarshalers {
			m := logsMarshalers[i]
			factory.logsMarshalers[m.Encoding()] = m
		}
	}
}

// NewFactory creates Kafka exporter factory.
func NewFactory(options ...FactoryOption) exporter.Factory {
f := &kafkaExporterFactory{
tracesMarshalers: tracesMarshalers(),
metricsMarshalers: metricsMarshalers(),
logsMarshalers: logsMarshalers(),
}
f := &kafkaExporterFactory{}
for _, o := range options {
o(f)
}
Expand All @@ -102,6 +73,7 @@ func createDefaultConfig() component.Config {
Topic: "",
Encoding: defaultEncoding,
PartitionMetricsByResourceAttributes: defaultPartitionMetricsByResourceAttributesEnabled,
PartitionLogsByResourceAttributes: defaultPartitionLogsByResourceAttributesEnabled,
Metadata: Metadata{
Full: defaultMetadataFull,
Retry: MetadataRetry{
Expand All @@ -119,9 +91,6 @@ func createDefaultConfig() component.Config {
}

type kafkaExporterFactory struct {
tracesMarshalers map[string]TracesMarshaler
metricsMarshalers map[string]MetricsMarshaler
logsMarshalers map[string]LogsMarshaler
}

func (f *kafkaExporterFactory) createTracesExporter(
Expand All @@ -136,7 +105,7 @@ func (f *kafkaExporterFactory) createTracesExporter(
if oCfg.Encoding == "otlp_json" {
set.Logger.Info("otlp_json is considered experimental and should not be used in a production environment")
}
exp, err := newTracesExporter(oCfg, set, f.tracesMarshalers)
exp, err := newTracesExporter(oCfg, set)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -167,7 +136,7 @@ func (f *kafkaExporterFactory) createMetricsExporter(
if oCfg.Encoding == "otlp_json" {
set.Logger.Info("otlp_json is considered experimental and should not be used in a production environment")
}
exp, err := newMetricsExporter(oCfg, set, f.metricsMarshalers)
exp, err := newMetricsExporter(oCfg, set)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -198,7 +167,7 @@ func (f *kafkaExporterFactory) createLogsExporter(
if oCfg.Encoding == "otlp_json" {
set.Logger.Info("otlp_json is considered experimental and should not be used in a production environment")
}
exp, err := newLogsExporter(oCfg, set, f.logsMarshalers)
exp, err := newLogsExporter(oCfg, set)
if err != nil {
return nil, err
}
Expand Down
72 changes: 3 additions & 69 deletions exporter/kafkaexporter/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,45 +5,15 @@ package kafkaexporter

import (
"context"
"errors"
"net"
"testing"

"github.com/IBM/sarama"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
)

// data constrains the telemetry payload types a mockMarshaler can carry,
// letting one generic mock stand in for trace, log, and metric marshalers.
type data interface {
	ptrace.Traces | plog.Logs | pmetric.Metrics
}

// mockMarshaler is a configurable test double for the marshaler interfaces.
// If consume is nil, Marshal reports a "not implemented" error.
type mockMarshaler[Data data] struct {
	consume  func(d Data, topic string) ([]*sarama.ProducerMessage, error)
	encoding string
}

// Encoding reports the encoding name this mock registers under.
func (mm *mockMarshaler[Data]) Encoding() string { return mm.encoding }

// Marshal delegates to the configured consume callback; it fails when no
// callback has been provided.
func (mm *mockMarshaler[Data]) Marshal(d Data, topic string) ([]*sarama.ProducerMessage, error) {
	if mm.consume == nil {
		return nil, errors.New("not implemented")
	}
	return mm.consume(d, topic)
}

// newMockMarshaler builds a mockMarshaler that reports the given encoding
// and has no consume callback configured.
func newMockMarshaler[Data data](encoding string) *mockMarshaler[Data] {
	return &mockMarshaler[Data]{encoding: encoding}
}

// applyConfigOption is used to modify values of the
// the default exporter config to make it easier to
// use the return in a test table set up
Expand Down Expand Up @@ -100,26 +70,14 @@ func TestCreateMetricExporter(t *testing.T) {
marshalers: nil,
err: nil,
},
{
name: "custom_encoding",
conf: applyConfigOption(func(conf *Config) {
// Disabling broker check to ensure encoding work
conf.Metadata.Full = false
conf.Encoding = "custom"
}),
marshalers: []MetricsMarshaler{
newMockMarshaler[pmetric.Metrics]("custom"),
},
err: nil,
},
}

for _, tc := range tests {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()

f := NewFactory(withMetricsMarshalers(tc.marshalers...))
f := NewFactory()
exporter, err := f.CreateMetricsExporter(
context.Background(),
exportertest.NewNopSettings(),
Expand Down Expand Up @@ -177,26 +135,14 @@ func TestCreateLogExporter(t *testing.T) {
marshalers: nil,
err: nil,
},
{
name: "custom_encoding",
conf: applyConfigOption(func(conf *Config) {
// Disabling broker check to ensure encoding work
conf.Metadata.Full = false
conf.Encoding = "custom"
}),
marshalers: []LogsMarshaler{
newMockMarshaler[plog.Logs]("custom"),
},
err: nil,
},
}

for _, tc := range tests {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()

f := NewFactory(withLogsMarshalers(tc.marshalers...))
f := NewFactory()
exporter, err := f.CreateLogsExporter(
context.Background(),
exportertest.NewNopSettings(),
Expand Down Expand Up @@ -254,26 +200,14 @@ func TestCreateTraceExporter(t *testing.T) {
marshalers: nil,
err: nil,
},
{
name: "custom_encoding",
conf: applyConfigOption(func(conf *Config) {
// Disabling broker check to ensure encoding work
conf.Metadata.Full = false
conf.Encoding = "custom"
}),
marshalers: []TracesMarshaler{
newMockMarshaler[ptrace.Traces]("custom"),
},
err: nil,
},
}

for _, tc := range tests {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()

f := NewFactory(withTracesMarshalers(tc.marshalers...))
f := NewFactory()
exporter, err := f.CreateTracesExporter(
context.Background(),
exportertest.NewNopSettings(),
Expand Down
34 changes: 14 additions & 20 deletions exporter/kafkaexporter/kafka_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,16 +204,15 @@ func newSaramaProducer(config Config) (sarama.SyncProducer, error) {
return producer, nil
}

func newMetricsExporter(config Config, set exporter.Settings, marshalers map[string]MetricsMarshaler) (*kafkaMetricsProducer, error) {
marshaler := marshalers[config.Encoding]
func newMetricsExporter(config Config, set exporter.Settings) (*kafkaMetricsProducer, error) {
marshaler, err := createMetricMarshaler(config)
if err != nil {
return nil, err
}

if marshaler == nil {
return nil, errUnrecognizedEncoding
}
if config.PartitionMetricsByResourceAttributes {
if keyableMarshaler, ok := marshaler.(KeyableMetricsMarshaler); ok {
keyableMarshaler.Key()
}
}

return &kafkaMetricsProducer{
cfg: config,
Expand All @@ -224,15 +223,10 @@ func newMetricsExporter(config Config, set exporter.Settings, marshalers map[str
}

// newTracesExporter creates Kafka exporter.
func newTracesExporter(config Config, set exporter.Settings, marshalers map[string]TracesMarshaler) (*kafkaTracesProducer, error) {
marshaler := marshalers[config.Encoding]
if marshaler == nil {
return nil, errUnrecognizedEncoding
}
if config.PartitionTracesByID {
if keyableMarshaler, ok := marshaler.(KeyableTracesMarshaler); ok {
keyableMarshaler.Key()
}
func newTracesExporter(config Config, set exporter.Settings) (*kafkaTracesProducer, error) {
marshaler, err := createTracesMarshaler(config)
if err != nil {
return nil, err
}

return &kafkaTracesProducer{
Expand All @@ -242,10 +236,10 @@ func newTracesExporter(config Config, set exporter.Settings, marshalers map[stri
}, nil
}

func newLogsExporter(config Config, set exporter.Settings, marshalers map[string]LogsMarshaler) (*kafkaLogsProducer, error) {
marshaler := marshalers[config.Encoding]
if marshaler == nil {
return nil, errUnrecognizedEncoding
func newLogsExporter(config Config, set exporter.Settings) (*kafkaLogsProducer, error) {
marshaler, err := createLogMarshaler(config)
if err != nil {
return nil, err
}

return &kafkaLogsProducer{
Expand Down
Loading

0 comments on commit 3f1c619

Please sign in to comment.