From c9aabb7e4cd09b8482763ecca72b55152302ac2c Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Wed, 30 Oct 2024 08:59:34 -0300 Subject: [PATCH 1/5] [exporter/elasticsearch]: add automatic mapping mode This adds support for detecting the mapping mode based on a metadata field set by the client, this mapping mode takes precedence over the configuration. --- exporter/elasticsearchexporter/README.md | 11 +++ exporter/elasticsearchexporter/config.go | 4 + exporter/elasticsearchexporter/exporter.go | 39 +++++++-- .../elasticsearchexporter/exporter_test.go | 79 +++++++++++++++++++ 4 files changed, 128 insertions(+), 5 deletions(-) diff --git a/exporter/elasticsearchexporter/README.md b/exporter/elasticsearchexporter/README.md index 53a093109557..9f7c69c19f06 100644 --- a/exporter/elasticsearchexporter/README.md +++ b/exporter/elasticsearchexporter/README.md @@ -175,6 +175,17 @@ behaviours, which may be configured through the following settings: for ECS mode, and never for other modes): When enabled attributes with `.` will be split into proper json objects. +It is also possible to configure the mapping mode dynamically by setting the metadata `X-Elastic-Mapping-Mode` using the [open-telemetry client](https://pkg.go.dev/go.opentelemetry.io/collector/client): + +```go +cl := client.FromContext(ctx) +cl.Metadata = client.NewMetadata(map[string][]string{"X-Elastic-Mapping-Mode": {"ecs"}}) +// Propagate the context down the pipeline +next.ConsumeLogs(ctx, td) +``` + +A mapping mode set in the metadata will take precedence over the configuration and will only apply to the current request. If the metadata is not set, the values from the configuration will be used instead. + #### ECS mapping mode > [!WARNING] diff --git a/exporter/elasticsearchexporter/config.go b/exporter/elasticsearchexporter/config.go index 0835396d928f..35ce6aa7093a 100644 --- a/exporter/elasticsearchexporter/config.go +++ b/exporter/elasticsearchexporter/config.go @@ -20,6 +20,10 @@ import ( "go.uber.org/zap" ) +// HeaderXElasticMappingMode is the HTTP header key used to specify a +// mapping mode for a specific request. +const HeaderXElasticMappingMode = "X-Elastic-Mapping-Mode" + // Config defines configuration for Elastic exporter. type Config struct { QueueSettings exporterhelper.QueueConfig `mapstructure:"sending_queue"` diff --git a/exporter/elasticsearchexporter/exporter.go b/exporter/elasticsearchexporter/exporter.go index ebd3800858a2..f03f60172c3e 100644 --- a/exporter/elasticsearchexporter/exporter.go +++ b/exporter/elasticsearchexporter/exporter.go @@ -12,6 +12,7 @@ import ( "sync" "time" + "go.opentelemetry.io/collector/client" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/pdata/pcommon" @@ -151,6 +152,28 @@ func (e *elasticsearchExporter) pushLogsData(ctx context.Context, ld plog.Logs) return errors.Join(errs...) } +// modelFromContext returns the model to use for encoding. +// The mapping mode from the client metadata takes precedence over the exporter configuration. +func (e *elasticsearchExporter) modelFromContext(ctx context.Context) mappingModel { + c := client.FromContext(ctx) + model := e.model + values := c.Metadata.Get(HeaderXElasticMappingMode) + if len(values) > 0 && values[0] != "" { + mode, ok := mappingModes[values[0]] + if !ok { + e.Logger.Warn("invalid mapping mode", zap.String("mode", values[0])) + return model + } + + model = &encodeModel{ + dedot: e.config.Mapping.Dedot, + mode: mode, + } + } + + return model +} + func (e *elasticsearchExporter) pushLogRecord( ctx context.Context, resource pcommon.Resource, @@ -173,7 +196,9 @@ func (e *elasticsearchExporter) pushLogRecord( fIndex = formattedIndex } - document, err := e.model.encodeLog(resource, resourceSchemaURL, record, scope, scopeSchemaURL) + model := e.modelFromContext(ctx) + + document, err := model.encodeLog(resource, resourceSchemaURL, record, scope, scopeSchemaURL) if err != nil { return fmt.Errorf("failed to encode log event: %w", err) } @@ -289,13 +314,15 @@ func (e *elasticsearchExporter) pushMetricsData( e.Logger.Warn("validation errors", zap.Error(errors.Join(validationErrs...))) } + model := e.modelFromContext(ctx) + for fIndex, docs := range resourceDocs { for _, doc := range docs { var ( docBytes []byte err error ) - docBytes, err = e.model.encodeDocument(doc) + docBytes, err = model.encodeDocument(doc) if err != nil { errs = append(errs, err) continue @@ -411,7 +438,8 @@ func (e *elasticsearchExporter) pushTraceRecord( fIndex = formattedIndex } - document, err := e.model.encodeSpan(resource, resourceSchemaURL, span, scope, scopeSchemaURL) + model := e.modelFromContext(ctx) + document, err := model.encodeSpan(resource, resourceSchemaURL, span, scope, scopeSchemaURL) if err != nil { return fmt.Errorf("failed to encode trace record: %w", err) } @@ -441,11 +469,12 @@ func (e *elasticsearchExporter) pushSpanEvent( fIndex = formattedIndex } - document := e.model.encodeSpanEvent(resource, resourceSchemaURL, span, spanEvent, scope, scopeSchemaURL) + model := e.modelFromContext(ctx) + document := model.encodeSpanEvent(resource, resourceSchemaURL, span, spanEvent, scope, scopeSchemaURL) if document == nil { return nil } - docBytes, err := e.model.encodeDocument(*document) + docBytes, err := model.encodeDocument(*document) if err != nil { return err } diff --git a/exporter/elasticsearchexporter/exporter_test.go b/exporter/elasticsearchexporter/exporter_test.go index ab158dfc414b..245490e3428a 100644 --- a/exporter/elasticsearchexporter/exporter_test.go +++ b/exporter/elasticsearchexporter/exporter_test.go @@ -19,6 +19,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/tidwall/gjson" + "go.opentelemetry.io/collector/client" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/configauth" @@ -297,6 +298,79 @@ func TestExporterLogs(t *testing.T) { <-done }) + t.Run("publish with configured mapping mode header", func(t *testing.T) { + expectedECS := `{"@timestamp":"1970-01-01T00:00:00.000000000Z","agent":{"name":"otlp"},"application":"myapp","message":"hello world","service":{"name":"myservice"}}` + expectedOtlp := `{"@timestamp":"1970-01-01T00:00:00.000000000Z","Attributes":{"application":"myapp","service":{"name":"myservice"}},"Body":"hello world","Scope":{"name":"","version":""},"SeverityNumber":0,"TraceFlags":0}` + tests := []struct { + name string + cfgMode string + header string + expected string + }{ + { + name: "mapping mode in header", + cfgMode: "otlp", + header: "ecs", + expected: expectedECS, + }, + { + name: "invalid mapping mode in header", + cfgMode: "otlp", + header: "invalid", + expected: expectedOtlp, + }, + { + name: "absent mapping mode in header", + cfgMode: "otlp", + header: "", + expected: expectedOtlp, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + rec := newBulkRecorder() + server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { + rec.Record(docs) + + actual := string(docs[0].Document) + assert.Equal(t, tt.expected, actual) + + // Second document should fallback to the config mapping + actual = string(docs[1].Document) + assert.Equal(t, expectedOtlp, actual) + + return itemsAllOK(docs) + }) + + exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) { + cfg.Mapping.Mode = tt.cfgMode // this should be overridden by the header + }) + + logs := newLogsWithAttributes( + map[string]any{ + "application": "myapp", + "service.name": "myservice", + }, + nil, + nil, + ) + logs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Body().SetStr("hello world") + + // Set the mapping mode via the header + cl := client.FromContext(context.Background()) + cl.Metadata = client.NewMetadata(map[string][]string{HeaderXElasticMappingMode: {tt.header}}) + ctx := client.NewContext(context.Background(), cl) + + mustSendLogsCtx(t, ctx, exporter, logs) + + // Send logs again without the header this time. + mustSendLogs(t, exporter, logs) + rec.WaitItems(1) + }) + } + }) + t.Run("publish with dynamic index, prefix_suffix", func(t *testing.T) { rec := newBulkRecorder() @@ -1847,6 +1921,11 @@ func mustSendLogs(t *testing.T, exporter exporter.Logs, logs plog.Logs) { require.NoError(t, err) } +func mustSendLogsCtx(t *testing.T, ctx context.Context, exporter exporter.Logs, logs plog.Logs) { + err := exporter.ConsumeLogs(ctx, logs) + require.NoError(t, err) +} + func mustSendMetricSumDataPoints(t *testing.T, exporter exporter.Metrics, dataPoints ...pmetric.NumberDataPoint) { metrics := pmetric.NewMetrics() scopeMetrics := metrics.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty() From 45edc7e4552a5d8ba7828722333d02bed19c89c0 Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Thu, 7 Nov 2024 10:12:50 -0300 Subject: [PATCH 2/5] fixes for MappingOTel --- exporter/elasticsearchexporter/exporter.go | 38 +++++++++++++------ .../elasticsearchexporter/exporter_test.go | 32 ++++++++-------- 2 files changed, 43 insertions(+), 27 deletions(-) diff --git a/exporter/elasticsearchexporter/exporter.go b/exporter/elasticsearchexporter/exporter.go index f03f60172c3e..f19d8d4d5e9f 100644 --- a/exporter/elasticsearchexporter/exporter.go +++ b/exporter/elasticsearchexporter/exporter.go @@ -33,7 +33,6 @@ type elasticsearchExporter struct { logstashFormat LogstashFormatSettings dynamicIndex bool model mappingModel - otel bool wg sync.WaitGroup // active sessions bulkIndexer bulkIndexer @@ -50,8 +49,6 @@ func newExporter( mode: cfg.MappingMode(), } - otel := model.mode == MappingOTel - userAgent := fmt.Sprintf( "%s/%s (%s/%s)", set.BuildInfo.Description, @@ -69,7 +66,6 @@ func newExporter( dynamicIndex: dynamicIndex, model: model, logstashFormat: cfg.LogstashFormat, - otel: otel, } } @@ -116,6 +112,8 @@ func (e *elasticsearchExporter) pushLogsData(ctx context.Context, ld plog.Logs) } defer session.End() + otel := e.isOtelMode(ctx) + var errs []error rls := ld.ResourceLogs() for i := 0; i < rls.Len(); i++ { @@ -127,7 +125,7 @@ func (e *elasticsearchExporter) pushLogsData(ctx context.Context, ld plog.Logs) scope := ill.Scope() logs := ill.LogRecords() for k := 0; k < logs.Len(); k++ { - if err := e.pushLogRecord(ctx, resource, rl.SchemaUrl(), logs.At(k), scope, ill.SchemaUrl(), session); err != nil { + if err := e.pushLogRecord(ctx, resource, rl.SchemaUrl(), logs.At(k), scope, ill.SchemaUrl(), session, otel); err != nil { if cerr := ctx.Err(); cerr != nil { return cerr } @@ -174,6 +172,15 @@ func (e *elasticsearchExporter) modelFromContext(ctx context.Context) mappingMod return model } +// isOtelMode returns true if the exporter is in OTel mode. +func (e *elasticsearchExporter) isOtelMode(ctx context.Context) bool { + m, ok := e.modelFromContext(ctx).(*encodeModel) + if !ok { + return false + } + return m.mode == MappingOTel +} + func (e *elasticsearchExporter) pushLogRecord( ctx context.Context, resource pcommon.Resource, @@ -182,10 +189,11 @@ func (e *elasticsearchExporter) pushLogRecord( scope pcommon.InstrumentationScope, scopeSchemaURL string, bulkIndexerSession bulkIndexerSession, + otel bool, ) error { fIndex := e.index if e.dynamicIndex { - fIndex = routeLogRecord(record.Attributes(), scope.Attributes(), resource.Attributes(), fIndex, e.otel, scope.Name()) + fIndex = routeLogRecord(record.Attributes(), scope.Attributes(), resource.Attributes(), fIndex, otel, scope.Name()) } if e.logstashFormat.Enabled { @@ -222,6 +230,7 @@ func (e *elasticsearchExporter) pushMetricsData( validationErrs []error // log instead of returning these so that upstream does not retry errs []error ) + otel := e.isOtelMode(ctx) resourceMetrics := metrics.ResourceMetrics() for i := 0; i < resourceMetrics.Len(); i++ { resourceMetric := resourceMetrics.At(i) @@ -237,7 +246,7 @@ func (e *elasticsearchExporter) pushMetricsData( metric := scopeMetrics.Metrics().At(k) upsertDataPoint := func(dp dataPoint) error { - fIndex, err := e.getMetricDataPointIndex(resource, scope, dp) + fIndex, err := e.getMetricDataPointIndex(resource, scope, dp, otel) if err != nil { return err } @@ -350,10 +359,11 @@ func (e *elasticsearchExporter) getMetricDataPointIndex( resource pcommon.Resource, scope pcommon.InstrumentationScope, dataPoint dataPoint, + otel bool, ) (string, error) { fIndex := e.index if e.dynamicIndex { - fIndex = routeDataPoint(dataPoint.Attributes(), scope.Attributes(), resource.Attributes(), fIndex, e.otel, scope.Name()) + fIndex = routeDataPoint(dataPoint.Attributes(), scope.Attributes(), resource.Attributes(), fIndex, otel, scope.Name()) } if e.logstashFormat.Enabled { @@ -379,6 +389,8 @@ func (e *elasticsearchExporter) pushTraceData( } defer session.End() + otel := e.isOtelMode(ctx) + var errs []error resourceSpans := td.ResourceSpans() for i := 0; i < resourceSpans.Len(); i++ { @@ -391,7 +403,7 @@ func (e *elasticsearchExporter) pushTraceData( spans := scopeSpan.Spans() for k := 0; k < spans.Len(); k++ { span := spans.At(k) - if err := e.pushTraceRecord(ctx, resource, il.SchemaUrl(), span, scope, scopeSpan.SchemaUrl(), session); err != nil { + if err := e.pushTraceRecord(ctx, resource, il.SchemaUrl(), span, scope, scopeSpan.SchemaUrl(), session, otel); err != nil { if cerr := ctx.Err(); cerr != nil { return cerr } @@ -399,7 +411,7 @@ func (e *elasticsearchExporter) pushTraceData( } for ii := 0; ii < span.Events().Len(); ii++ { spanEvent := span.Events().At(ii) - if err := e.pushSpanEvent(ctx, resource, il.SchemaUrl(), span, spanEvent, scope, scopeSpan.SchemaUrl(), session); err != nil { + if err := e.pushSpanEvent(ctx, resource, il.SchemaUrl(), span, spanEvent, scope, scopeSpan.SchemaUrl(), session, otel); err != nil { errs = append(errs, err) } } @@ -424,10 +436,11 @@ func (e *elasticsearchExporter) pushTraceRecord( scope pcommon.InstrumentationScope, scopeSchemaURL string, bulkIndexerSession bulkIndexerSession, + otel bool, ) error { fIndex := e.index if e.dynamicIndex { - fIndex = routeSpan(span.Attributes(), scope.Attributes(), resource.Attributes(), fIndex, e.otel, span.Name()) + fIndex = routeSpan(span.Attributes(), scope.Attributes(), resource.Attributes(), fIndex, otel, span.Name()) } if e.logstashFormat.Enabled { @@ -455,10 +468,11 @@ func (e *elasticsearchExporter) pushSpanEvent( scope pcommon.InstrumentationScope, scopeSchemaURL string, bulkIndexerSession bulkIndexerSession, + otel bool, ) error { fIndex := e.index if e.dynamicIndex { - fIndex = routeSpanEvent(spanEvent.Attributes(), scope.Attributes(), resource.Attributes(), fIndex, e.otel, scope.Name()) + fIndex = routeSpanEvent(spanEvent.Attributes(), scope.Attributes(), resource.Attributes(), fIndex, otel, scope.Name()) } if e.logstashFormat.Enabled { diff --git a/exporter/elasticsearchexporter/exporter_test.go b/exporter/elasticsearchexporter/exporter_test.go index 245490e3428a..c19f0730b93f 100644 --- a/exporter/elasticsearchexporter/exporter_test.go +++ b/exporter/elasticsearchexporter/exporter_test.go @@ -300,7 +300,7 @@ func TestExporterLogs(t *testing.T) { t.Run("publish with configured mapping mode header", func(t *testing.T) { expectedECS := `{"@timestamp":"1970-01-01T00:00:00.000000000Z","agent":{"name":"otlp"},"application":"myapp","message":"hello world","service":{"name":"myservice"}}` - expectedOtlp := `{"@timestamp":"1970-01-01T00:00:00.000000000Z","Attributes":{"application":"myapp","service":{"name":"myservice"}},"Body":"hello world","Scope":{"name":"","version":""},"SeverityNumber":0,"TraceFlags":0}` + expectedOtel := `{"@timestamp":"1970-01-01T00:00:00.000000000Z","attributes":{"application":"myapp","service.name":"myservice"},"body":{"text":"hello world"},"dropped_attributes_count":0,"observed_timestamp":"1970-01-01T00:00:00.000000000Z","resource":{"dropped_attributes_count":0},"scope":{"dropped_attributes_count":0},"severity_number":0}` tests := []struct { name string cfgMode string @@ -309,21 +309,21 @@ func TestExporterLogs(t *testing.T) { }{ { name: "mapping mode in header", - cfgMode: "otlp", - header: "ecs", - expected: expectedECS, + cfgMode: "ecs", + header: "otel", + expected: expectedOtel, }, { name: "invalid mapping mode in header", - cfgMode: "otlp", + cfgMode: "ecs", header: "invalid", - expected: expectedOtlp, + expected: expectedECS, }, { - name: "absent mapping mode in header", - cfgMode: "otlp", + name: "empty mapping mode in header", + cfgMode: "ecs", header: "", - expected: expectedOtlp, + expected: expectedECS, }, } @@ -336,9 +336,10 @@ func TestExporterLogs(t *testing.T) { actual := string(docs[0].Document) assert.Equal(t, tt.expected, actual) - // Second document should fallback to the config mapping + // Second client call does not include a header so it + // should use the default mapping mode. actual = string(docs[1].Document) - assert.Equal(t, expectedOtlp, actual) + assert.Equal(t, expectedECS, actual) return itemsAllOK(docs) }) @@ -358,15 +359,16 @@ func TestExporterLogs(t *testing.T) { logs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Body().SetStr("hello world") // Set the mapping mode via the header - cl := client.FromContext(context.Background()) + ctx := context.Background() + cl := client.FromContext(ctx) cl.Metadata = client.NewMetadata(map[string][]string{HeaderXElasticMappingMode: {tt.header}}) - ctx := client.NewContext(context.Background(), cl) + ctx = client.NewContext(ctx, cl) mustSendLogsCtx(t, ctx, exporter, logs) - // Send logs again without the header this time. + // // Send logs again without the header this time. mustSendLogs(t, exporter, logs) - rec.WaitItems(1) + rec.WaitItems(2) }) } }) From 61de5b77bb5fca8ccef33b1f35e2a13302fc7a96 Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Mon, 11 Nov 2024 08:57:35 -0300 Subject: [PATCH 3/5] add changelog entry --- ...searchexporter-automatic-mapping-mode.yaml | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) create mode 100644 .chloggen/elasticsearchexporter-automatic-mapping-mode.yaml diff --git a/.chloggen/elasticsearchexporter-automatic-mapping-mode.yaml b/.chloggen/elasticsearchexporter-automatic-mapping-mode.yaml new file mode 100644 index 000000000000..e2df370e698f --- /dev/null +++ b/.chloggen/elasticsearchexporter-automatic-mapping-mode.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: elasticsearchexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add support for automatic mapping mode using a client header. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [36092] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] From 3af8b62387e70a921bc72f2889c87a01d05c77fe Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Mon, 11 Nov 2024 11:44:46 -0300 Subject: [PATCH 4/5] go mod tidy --- exporter/elasticsearchexporter/go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exporter/elasticsearchexporter/go.mod b/exporter/elasticsearchexporter/go.mod index 26585e60c7a9..07bc27515034 100644 --- a/exporter/elasticsearchexporter/go.mod +++ b/exporter/elasticsearchexporter/go.mod @@ -14,6 +14,7 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.113.0 github.com/stretchr/testify v1.9.0 github.com/tidwall/gjson v1.18.0 + go.opentelemetry.io/collector/client v1.19.0 go.opentelemetry.io/collector/component v0.113.0 go.opentelemetry.io/collector/config/configauth v0.113.0 go.opentelemetry.io/collector/config/configcompression v1.19.0 @@ -63,7 +64,6 @@ require ( go.elastic.co/apm/module/apmzap/v2 v2.6.0 // indirect go.elastic.co/apm/v2 v2.6.0 // indirect go.elastic.co/fastjson v1.3.0 // indirect - go.opentelemetry.io/collector/client v1.19.0 // indirect go.opentelemetry.io/collector/config/configretry v1.19.0 // indirect go.opentelemetry.io/collector/config/configtelemetry v0.113.0 // indirect go.opentelemetry.io/collector/config/configtls v1.19.0 // indirect From 58fb2a0d821447583c74db78d165bb62d0f3ef80 Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Tue, 12 Nov 2024 09:32:37 -0300 Subject: [PATCH 5/5] linter fix for ctx parameter --- exporter/elasticsearchexporter/exporter_test.go | 17 +++++------------ 1 file changed, 5 insertions(+), 12 deletions(-) diff --git a/exporter/elasticsearchexporter/exporter_test.go b/exporter/elasticsearchexporter/exporter_test.go index c19f0730b93f..5541ca03710c 100644 --- a/exporter/elasticsearchexporter/exporter_test.go +++ b/exporter/elasticsearchexporter/exporter_test.go @@ -299,8 +299,8 @@ func TestExporterLogs(t *testing.T) { }) t.Run("publish with configured mapping mode header", func(t *testing.T) { - expectedECS := `{"@timestamp":"1970-01-01T00:00:00.000000000Z","agent":{"name":"otlp"},"application":"myapp","message":"hello world","service":{"name":"myservice"}}` - expectedOtel := `{"@timestamp":"1970-01-01T00:00:00.000000000Z","attributes":{"application":"myapp","service.name":"myservice"},"body":{"text":"hello world"},"dropped_attributes_count":0,"observed_timestamp":"1970-01-01T00:00:00.000000000Z","resource":{"dropped_attributes_count":0},"scope":{"dropped_attributes_count":0},"severity_number":0}` + expectedECS := `{"@timestamp":"1970-01-01T00:00:00.000000000Z","agent":{"name":"otlp"},"message":"hello world"}` + expectedOtel := `{"@timestamp":"1970-01-01T00:00:00.000000000Z","body":{"text":"hello world"},"dropped_attributes_count":0,"observed_timestamp":"1970-01-01T00:00:00.000000000Z","resource":{"dropped_attributes_count":0},"scope":{"dropped_attributes_count":0},"severity_number":0}` tests := []struct { name string cfgMode string @@ -348,14 +348,7 @@ func TestExporterLogs(t *testing.T) { cfg.Mapping.Mode = tt.cfgMode // this should be overridden by the header }) - logs := newLogsWithAttributes( - map[string]any{ - "application": "myapp", - "service.name": "myservice", - }, - nil, - nil, - ) + logs := newLogsWithAttributes(nil, nil, nil) logs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Body().SetStr("hello world") // Set the mapping mode via the header @@ -364,7 +357,7 @@ func TestExporterLogs(t *testing.T) { cl.Metadata = client.NewMetadata(map[string][]string{HeaderXElasticMappingMode: {tt.header}}) ctx = client.NewContext(ctx, cl) - mustSendLogsCtx(t, ctx, exporter, logs) + mustSendLogsCtx(ctx, t, exporter, logs) // // Send logs again without the header this time. mustSendLogs(t, exporter, logs) @@ -1923,7 +1916,7 @@ func mustSendLogs(t *testing.T, exporter exporter.Logs, logs plog.Logs) { require.NoError(t, err) } -func mustSendLogsCtx(t *testing.T, ctx context.Context, exporter exporter.Logs, logs plog.Logs) { +func mustSendLogsCtx(ctx context.Context, t *testing.T, exporter exporter.Logs, logs plog.Logs) { err := exporter.ConsumeLogs(ctx, logs) require.NoError(t, err) }