Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[exporter/elasticsearch]: add automatic mapping mode #36110

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/elasticsearchexporter-automatic-mapping-mode.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: elasticsearchexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add support for automatic mapping mode using a client header.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [36092]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
11 changes: 11 additions & 0 deletions exporter/elasticsearchexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,17 @@ behaviours, which may be configured through the following settings:
for ECS mode, and never for other modes): When enabled attributes with `.`
will be split into proper json objects.

It is also possible to configure the mapping mode dynamically by setting the metadata `X-Elastic-Mapping-Mode` using the [open-telemetry client](https://pkg.go.dev/go.opentelemetry.io/collector/client):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What you implemented here is to always check for the header and override the mapping mode config. However, my understanding of the internal spec is that we should expose it as a config value "auto", and when it is set, it will switch mapping mode per request based on header.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for bringing that up; I misunderstood the specs at first. So, the auto-mapping mode should only kick in if it was explicitly allowed in the config. I'll make the changes accordingly.


```go
cl := client.FromContext(ctx)
cl.Metadata = client.NewMetadata(map[string][]string{"X-Elastic-Mapping-Mode": {"ecs"}})
ctx = client.NewContext(ctx, cl)
// Propagate the context down the pipeline
next.ConsumeLogs(ctx, ld)
```

A mapping mode set in the metadata will take precedence over the configuration and will only apply to the current request. If the metadata is not set, the values from the configuration will be used instead.

#### ECS mapping mode

> [!WARNING]
Expand Down
4 changes: 4 additions & 0 deletions exporter/elasticsearchexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ import (
"go.uber.org/zap"
)

// HeaderXElasticMappingMode is the HTTP header (client-metadata) key used to
// specify a mapping mode for a specific request. When present and valid, the
// header value overrides the configured mapping mode for that request only.
const HeaderXElasticMappingMode = "X-Elastic-Mapping-Mode"

// Config defines configuration for Elastic exporter.
type Config struct {
QueueSettings exporterhelper.QueueConfig `mapstructure:"sending_queue"`
Expand Down
77 changes: 60 additions & 17 deletions exporter/elasticsearchexporter/exporter.go
mauri870 marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"sync"
"time"

"go.opentelemetry.io/collector/client"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/pdata/pcommon"
Expand All @@ -32,7 +33,6 @@ type elasticsearchExporter struct {
logstashFormat LogstashFormatSettings
dynamicIndex bool
model mappingModel
otel bool

wg sync.WaitGroup // active sessions
bulkIndexer bulkIndexer
Expand All @@ -49,8 +49,6 @@ func newExporter(
mode: cfg.MappingMode(),
}

otel := model.mode == MappingOTel

userAgent := fmt.Sprintf(
"%s/%s (%s/%s)",
set.BuildInfo.Description,
Expand All @@ -68,7 +66,6 @@ func newExporter(
dynamicIndex: dynamicIndex,
model: model,
logstashFormat: cfg.LogstashFormat,
otel: otel,
}
}

Expand Down Expand Up @@ -115,6 +112,8 @@ func (e *elasticsearchExporter) pushLogsData(ctx context.Context, ld plog.Logs)
}
defer session.End()

otel := e.isOtelMode(ctx)

var errs []error
rls := ld.ResourceLogs()
for i := 0; i < rls.Len(); i++ {
Expand All @@ -126,7 +125,7 @@ func (e *elasticsearchExporter) pushLogsData(ctx context.Context, ld plog.Logs)
scope := ill.Scope()
logs := ill.LogRecords()
for k := 0; k < logs.Len(); k++ {
if err := e.pushLogRecord(ctx, resource, rl.SchemaUrl(), logs.At(k), scope, ill.SchemaUrl(), session); err != nil {
if err := e.pushLogRecord(ctx, resource, rl.SchemaUrl(), logs.At(k), scope, ill.SchemaUrl(), session, otel); err != nil {
if cerr := ctx.Err(); cerr != nil {
return cerr
}
Expand All @@ -151,6 +150,37 @@ func (e *elasticsearchExporter) pushLogsData(ctx context.Context, ld plog.Logs)
return errors.Join(errs...)
}

// modelFromContext returns the model to use for encoding.
// The mapping mode from the client metadata takes precedence over the exporter configuration.
func (e *elasticsearchExporter) modelFromContext(ctx context.Context) mappingModel {
c := client.FromContext(ctx)
model := e.model
values := c.Metadata.Get(HeaderXElasticMappingMode)
if len(values) > 0 && values[0] != "" {
mode, ok := mappingModes[values[0]]
if !ok {
e.Logger.Warn("invalid mapping mode", zap.String("mode", values[0]))
return model
}

model = &encodeModel{
dedot: e.config.Mapping.Dedot,
mode: mode,
}
}

return model
}

// isOtelMode reports whether the effective model for this request — after
// applying any per-request header override via modelFromContext — is the
// OTel mapping mode.
func (e *elasticsearchExporter) isOtelMode(ctx context.Context) bool {
	if em, ok := e.modelFromContext(ctx).(*encodeModel); ok {
		return em.mode == MappingOTel
	}
	// Non-encodeModel implementations are never considered OTel mode.
	return false
}

func (e *elasticsearchExporter) pushLogRecord(
ctx context.Context,
resource pcommon.Resource,
Expand All @@ -159,10 +189,11 @@ func (e *elasticsearchExporter) pushLogRecord(
scope pcommon.InstrumentationScope,
scopeSchemaURL string,
bulkIndexerSession bulkIndexerSession,
otel bool,
) error {
fIndex := e.index
if e.dynamicIndex {
fIndex = routeLogRecord(record.Attributes(), scope.Attributes(), resource.Attributes(), fIndex, e.otel, scope.Name())
fIndex = routeLogRecord(record.Attributes(), scope.Attributes(), resource.Attributes(), fIndex, otel, scope.Name())
}

if e.logstashFormat.Enabled {
Expand All @@ -173,7 +204,9 @@ func (e *elasticsearchExporter) pushLogRecord(
fIndex = formattedIndex
}

document, err := e.model.encodeLog(resource, resourceSchemaURL, record, scope, scopeSchemaURL)
model := e.modelFromContext(ctx)

document, err := model.encodeLog(resource, resourceSchemaURL, record, scope, scopeSchemaURL)
if err != nil {
return fmt.Errorf("failed to encode log event: %w", err)
}
Expand All @@ -197,6 +230,7 @@ func (e *elasticsearchExporter) pushMetricsData(
validationErrs []error // log instead of returning these so that upstream does not retry
errs []error
)
otel := e.isOtelMode(ctx)
resourceMetrics := metrics.ResourceMetrics()
for i := 0; i < resourceMetrics.Len(); i++ {
resourceMetric := resourceMetrics.At(i)
Expand All @@ -212,7 +246,7 @@ func (e *elasticsearchExporter) pushMetricsData(
metric := scopeMetrics.Metrics().At(k)

upsertDataPoint := func(dp dataPoint) error {
fIndex, err := e.getMetricDataPointIndex(resource, scope, dp)
fIndex, err := e.getMetricDataPointIndex(resource, scope, dp, otel)
if err != nil {
return err
}
Expand Down Expand Up @@ -289,13 +323,15 @@ func (e *elasticsearchExporter) pushMetricsData(
e.Logger.Warn("validation errors", zap.Error(errors.Join(validationErrs...)))
}

model := e.modelFromContext(ctx)

for fIndex, docs := range resourceDocs {
for _, doc := range docs {
var (
docBytes []byte
err error
)
docBytes, err = e.model.encodeDocument(doc)
docBytes, err = model.encodeDocument(doc)
if err != nil {
errs = append(errs, err)
continue
Expand Down Expand Up @@ -323,10 +359,11 @@ func (e *elasticsearchExporter) getMetricDataPointIndex(
resource pcommon.Resource,
scope pcommon.InstrumentationScope,
dataPoint dataPoint,
otel bool,
) (string, error) {
fIndex := e.index
if e.dynamicIndex {
fIndex = routeDataPoint(dataPoint.Attributes(), scope.Attributes(), resource.Attributes(), fIndex, e.otel, scope.Name())
fIndex = routeDataPoint(dataPoint.Attributes(), scope.Attributes(), resource.Attributes(), fIndex, otel, scope.Name())
}

if e.logstashFormat.Enabled {
Expand All @@ -352,6 +389,8 @@ func (e *elasticsearchExporter) pushTraceData(
}
defer session.End()

otel := e.isOtelMode(ctx)

var errs []error
resourceSpans := td.ResourceSpans()
for i := 0; i < resourceSpans.Len(); i++ {
Expand All @@ -364,15 +403,15 @@ func (e *elasticsearchExporter) pushTraceData(
spans := scopeSpan.Spans()
for k := 0; k < spans.Len(); k++ {
span := spans.At(k)
if err := e.pushTraceRecord(ctx, resource, il.SchemaUrl(), span, scope, scopeSpan.SchemaUrl(), session); err != nil {
if err := e.pushTraceRecord(ctx, resource, il.SchemaUrl(), span, scope, scopeSpan.SchemaUrl(), session, otel); err != nil {
if cerr := ctx.Err(); cerr != nil {
return cerr
}
errs = append(errs, err)
}
for ii := 0; ii < span.Events().Len(); ii++ {
spanEvent := span.Events().At(ii)
if err := e.pushSpanEvent(ctx, resource, il.SchemaUrl(), span, spanEvent, scope, scopeSpan.SchemaUrl(), session); err != nil {
if err := e.pushSpanEvent(ctx, resource, il.SchemaUrl(), span, spanEvent, scope, scopeSpan.SchemaUrl(), session, otel); err != nil {
errs = append(errs, err)
}
}
Expand All @@ -397,10 +436,11 @@ func (e *elasticsearchExporter) pushTraceRecord(
scope pcommon.InstrumentationScope,
scopeSchemaURL string,
bulkIndexerSession bulkIndexerSession,
otel bool,
) error {
fIndex := e.index
if e.dynamicIndex {
fIndex = routeSpan(span.Attributes(), scope.Attributes(), resource.Attributes(), fIndex, e.otel, span.Name())
fIndex = routeSpan(span.Attributes(), scope.Attributes(), resource.Attributes(), fIndex, otel, span.Name())
}

if e.logstashFormat.Enabled {
Expand All @@ -411,7 +451,8 @@ func (e *elasticsearchExporter) pushTraceRecord(
fIndex = formattedIndex
}

document, err := e.model.encodeSpan(resource, resourceSchemaURL, span, scope, scopeSchemaURL)
model := e.modelFromContext(ctx)
document, err := model.encodeSpan(resource, resourceSchemaURL, span, scope, scopeSchemaURL)
if err != nil {
return fmt.Errorf("failed to encode trace record: %w", err)
}
Expand All @@ -427,10 +468,11 @@ func (e *elasticsearchExporter) pushSpanEvent(
scope pcommon.InstrumentationScope,
scopeSchemaURL string,
bulkIndexerSession bulkIndexerSession,
otel bool,
) error {
fIndex := e.index
if e.dynamicIndex {
fIndex = routeSpanEvent(spanEvent.Attributes(), scope.Attributes(), resource.Attributes(), fIndex, e.otel, scope.Name())
fIndex = routeSpanEvent(spanEvent.Attributes(), scope.Attributes(), resource.Attributes(), fIndex, otel, scope.Name())
}

if e.logstashFormat.Enabled {
Expand All @@ -441,11 +483,12 @@ func (e *elasticsearchExporter) pushSpanEvent(
fIndex = formattedIndex
}

document := e.model.encodeSpanEvent(resource, resourceSchemaURL, span, spanEvent, scope, scopeSchemaURL)
model := e.modelFromContext(ctx)
document := model.encodeSpanEvent(resource, resourceSchemaURL, span, spanEvent, scope, scopeSchemaURL)
if document == nil {
return nil
}
docBytes, err := e.model.encodeDocument(*document)
docBytes, err := model.encodeDocument(*document)
if err != nil {
return err
}
Expand Down
74 changes: 74 additions & 0 deletions exporter/elasticsearchexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tidwall/gjson"
"go.opentelemetry.io/collector/client"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configauth"
Expand Down Expand Up @@ -297,6 +298,74 @@ func TestExporterLogs(t *testing.T) {
<-done
})

t.Run("publish with configured mapping mode header", func(t *testing.T) {
expectedECS := `{"@timestamp":"1970-01-01T00:00:00.000000000Z","agent":{"name":"otlp"},"message":"hello world"}`
expectedOtel := `{"@timestamp":"1970-01-01T00:00:00.000000000Z","body":{"text":"hello world"},"dropped_attributes_count":0,"observed_timestamp":"1970-01-01T00:00:00.000000000Z","resource":{"dropped_attributes_count":0},"scope":{"dropped_attributes_count":0},"severity_number":0}`
tests := []struct {
name string
cfgMode string
header string
expected string
}{
{
name: "mapping mode in header",
cfgMode: "ecs",
header: "otel",
expected: expectedOtel,
},
{
name: "invalid mapping mode in header",
cfgMode: "ecs",
header: "invalid",
expected: expectedECS,
},
{
name: "empty mapping mode in header",
cfgMode: "ecs",
header: "",
expected: expectedECS,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
rec := newBulkRecorder()
server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {
rec.Record(docs)

actual := string(docs[0].Document)
assert.Equal(t, tt.expected, actual)

// Second client call does not include a header so it
// should use the default mapping mode.
actual = string(docs[1].Document)
assert.Equal(t, expectedECS, actual)

return itemsAllOK(docs)
})

exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) {
cfg.Mapping.Mode = tt.cfgMode // this should be overridden by the header
})

logs := newLogsWithAttributes(nil, nil, nil)
logs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Body().SetStr("hello world")

// Set the mapping mode via the header
ctx := context.Background()
cl := client.FromContext(ctx)
cl.Metadata = client.NewMetadata(map[string][]string{HeaderXElasticMappingMode: {tt.header}})
ctx = client.NewContext(ctx, cl)

mustSendLogsCtx(ctx, t, exporter, logs)

// Send logs again without the header this time.
mustSendLogs(t, exporter, logs)
rec.WaitItems(2)
})
}
})

t.Run("publish with dynamic index, prefix_suffix", func(t *testing.T) {

rec := newBulkRecorder()
Expand Down Expand Up @@ -1847,6 +1916,11 @@ func mustSendLogs(t *testing.T, exporter exporter.Logs, logs plog.Logs) {
require.NoError(t, err)
}

// mustSendLogsCtx sends logs through the exporter using the provided context
// (e.g. one carrying client metadata for the mapping-mode header) and fails
// the test immediately on error.
func mustSendLogsCtx(ctx context.Context, t *testing.T, exporter exporter.Logs, logs plog.Logs) {
	// Mark this as a helper so a failure is reported at the caller's line,
	// not here.
	t.Helper()
	require.NoError(t, exporter.ConsumeLogs(ctx, logs))
}

func mustSendMetricSumDataPoints(t *testing.T, exporter exporter.Metrics, dataPoints ...pmetric.NumberDataPoint) {
metrics := pmetric.NewMetrics()
scopeMetrics := metrics.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty()
Expand Down
Loading