From 6e4e4e4c1e5f4435d9df4b247e44878eca3c3fac Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Wed, 30 Oct 2024 08:59:34 -0300 Subject: [PATCH] [exporter/elasticsearch]: add automatic mapping mode This adds support for detecting the mapping mode based on a metadata field set by the client, this mapping mode takes precedence over the configuration. --- exporter/elasticsearchexporter/README.md | 11 +++ exporter/elasticsearchexporter/config.go | 4 + exporter/elasticsearchexporter/exporter.go | 39 +++++++-- .../elasticsearchexporter/exporter_test.go | 79 +++++++++++++++++++ 4 files changed, 128 insertions(+), 5 deletions(-) diff --git a/exporter/elasticsearchexporter/README.md b/exporter/elasticsearchexporter/README.md index eadb1e309803..0371eb0f5941 100644 --- a/exporter/elasticsearchexporter/README.md +++ b/exporter/elasticsearchexporter/README.md @@ -175,6 +175,17 @@ behaviours, which may be configured through the following settings: for ECS mode, and never for other modes): When enabled attributes with `.` will be split into proper json objects. +It is also possible to configure the mapping mode dynamically by setting the metadata `X-Elastic-Mapping-Mode` using the [open-telemetry client](https://pkg.go.dev/go.opentelemetry.io/collector/client): + +```go +cl := client.FromContext(ctx) +cl.Metadata = client.NewMetadata(map[string][]string{"X-Elastic-Mapping-Mode": {"ecs"}}) +// Propagate the context down the pipeline +next.ConsumeLogs(ctx, td) +``` + +A mapping mode set in the metadata will take precedence over the configuration and will only apply to the current request. If the metadata is not set, the values from the configuration will be used instead. + #### ECS mapping mode > [!WARNING] diff --git a/exporter/elasticsearchexporter/config.go b/exporter/elasticsearchexporter/config.go index 0835396d928f..35ce6aa7093a 100644 --- a/exporter/elasticsearchexporter/config.go +++ b/exporter/elasticsearchexporter/config.go @@ -20,6 +20,10 @@ import ( "go.uber.org/zap" ) +// HeaderXElasticMappingMode is the HTTP header key used to specify a +// mapping mode for a specific request. +const HeaderXElasticMappingMode = "X-Elastic-Mapping-Mode" + // Config defines configuration for Elastic exporter. type Config struct { QueueSettings exporterhelper.QueueConfig `mapstructure:"sending_queue"` diff --git a/exporter/elasticsearchexporter/exporter.go b/exporter/elasticsearchexporter/exporter.go index ebd3800858a2..f03f60172c3e 100644 --- a/exporter/elasticsearchexporter/exporter.go +++ b/exporter/elasticsearchexporter/exporter.go @@ -12,6 +12,7 @@ import ( "sync" "time" + "go.opentelemetry.io/collector/client" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/pdata/pcommon" @@ -151,6 +152,28 @@ func (e *elasticsearchExporter) pushLogsData(ctx context.Context, ld plog.Logs) return errors.Join(errs...) } +// modelFromContext returns the model to use for encoding. +// The mapping mode from the client metadata takes precedence over the exporter configuration. +func (e *elasticsearchExporter) modelFromContext(ctx context.Context) mappingModel { + c := client.FromContext(ctx) + model := e.model + values := c.Metadata.Get(HeaderXElasticMappingMode) + if len(values) > 0 && values[0] != "" { + mode, ok := mappingModes[values[0]] + if !ok { + e.Logger.Warn("invalid mapping mode", zap.String("mode", values[0])) + return model + } + + model = &encodeModel{ + dedot: e.config.Mapping.Dedot, + mode: mode, + } + } + + return model +} + func (e *elasticsearchExporter) pushLogRecord( ctx context.Context, resource pcommon.Resource, @@ -173,7 +196,9 @@ func (e *elasticsearchExporter) pushLogRecord( fIndex = formattedIndex } - document, err := e.model.encodeLog(resource, resourceSchemaURL, record, scope, scopeSchemaURL) + model := e.modelFromContext(ctx) + + document, err := model.encodeLog(resource, resourceSchemaURL, record, scope, scopeSchemaURL) if err != nil { return fmt.Errorf("failed to encode log event: %w", err) } @@ -289,13 +314,15 @@ func (e *elasticsearchExporter) pushMetricsData( e.Logger.Warn("validation errors", zap.Error(errors.Join(validationErrs...))) } + model := e.modelFromContext(ctx) + for fIndex, docs := range resourceDocs { for _, doc := range docs { var ( docBytes []byte err error ) - docBytes, err = e.model.encodeDocument(doc) + docBytes, err = model.encodeDocument(doc) if err != nil { errs = append(errs, err) continue @@ -411,7 +438,8 @@ func (e *elasticsearchExporter) pushTraceRecord( fIndex = formattedIndex } - document, err := e.model.encodeSpan(resource, resourceSchemaURL, span, scope, scopeSchemaURL) + model := e.modelFromContext(ctx) + document, err := model.encodeSpan(resource, resourceSchemaURL, span, scope, scopeSchemaURL) if err != nil { return fmt.Errorf("failed to encode trace record: %w", err) } @@ -441,11 +469,12 @@ func (e *elasticsearchExporter) pushSpanEvent( fIndex = formattedIndex } - document := e.model.encodeSpanEvent(resource, resourceSchemaURL, span, spanEvent, scope, scopeSchemaURL) + model := e.modelFromContext(ctx) + document := model.encodeSpanEvent(resource, resourceSchemaURL, span, spanEvent, scope, scopeSchemaURL) if document == nil { return nil } - docBytes, err := e.model.encodeDocument(*document) + docBytes, err := model.encodeDocument(*document) if err != nil { return err } diff --git a/exporter/elasticsearchexporter/exporter_test.go b/exporter/elasticsearchexporter/exporter_test.go index 74933eaf40c3..97df068a1822 100644 --- a/exporter/elasticsearchexporter/exporter_test.go +++ b/exporter/elasticsearchexporter/exporter_test.go @@ -19,6 +19,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/tidwall/gjson" + "go.opentelemetry.io/collector/client" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/configauth" @@ -297,6 +298,79 @@ func TestExporterLogs(t *testing.T) { <-done }) + t.Run("publish with configured mapping mode header", func(t *testing.T) { + expectedECS := `{"@timestamp":"1970-01-01T00:00:00.000000000Z","agent":{"name":"otlp"},"application":"myapp","message":"hello world","service":{"name":"myservice"}}` + expectedOtlp := `{"@timestamp":"1970-01-01T00:00:00.000000000Z","Attributes":{"application":"myapp","service":{"name":"myservice"}},"Body":"hello world","Scope":{"name":"","version":""},"SeverityNumber":0,"TraceFlags":0}` + tests := []struct { + name string + cfgMode string + header string + expected string + }{ + { + name: "mapping mode in header", + cfgMode: "otlp", + header: "ecs", + expected: expectedECS, + }, + { + name: "invalid mapping mode in header", + cfgMode: "otlp", + header: "invalid", + expected: expectedOtlp, + }, + { + name: "absent mapping mode in header", + cfgMode: "otlp", + header: "", + expected: expectedOtlp, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + rec := newBulkRecorder() + server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { + rec.Record(docs) + + actual := string(docs[0].Document) + assert.Equal(t, tt.expected, actual) + + // Second document should fallback to the config mapping + actual = string(docs[1].Document) + assert.Equal(t, expectedOtlp, actual) + + return itemsAllOK(docs) + }) + + exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) { + cfg.Mapping.Mode = tt.cfgMode // this should be overridden by the header + }) + + logs := newLogsWithAttributes( + map[string]any{ + "application": "myapp", + "service.name": "myservice", + }, + nil, + nil, + ) + logs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Body().SetStr("hello world") + + // Set the mapping mode via the header + cl := client.FromContext(context.Background()) + cl.Metadata = client.NewMetadata(map[string][]string{HeaderXElasticMappingMode: {tt.header}}) + ctx := client.NewContext(context.Background(), cl) + + mustSendLogsCtx(t, ctx, exporter, logs) + + // Send logs again without the header this time. + mustSendLogs(t, exporter, logs) + rec.WaitItems(1) + }) + } + }) + t.Run("publish with dynamic index, prefix_suffix", func(t *testing.T) { rec := newBulkRecorder() @@ -1865,6 +1939,11 @@ func mustSendLogs(t *testing.T, exporter exporter.Logs, logs plog.Logs) { require.NoError(t, err) } +func mustSendLogsCtx(t *testing.T, ctx context.Context, exporter exporter.Logs, logs plog.Logs) { + err := exporter.ConsumeLogs(ctx, logs) + require.NoError(t, err) +} + func mustSendMetricSumDataPoints(t *testing.T, exporter exporter.Metrics, dataPoints ...pmetric.NumberDataPoint) { metrics := pmetric.NewMetrics() scopeMetrics := metrics.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty()