Skip to content

Commit

Permalink
[exporter/elasticsearch]: add automatic mapping mode
Browse files Browse the repository at this point in the history
This adds support for detecting the mapping mode based on a metadata field
set by the client, this mapping mode takes precedence over the
configuration.
  • Loading branch information
mauri870 committed Oct 31, 2024
1 parent c4e91e1 commit 6e4e4e4
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 5 deletions.
11 changes: 11 additions & 0 deletions exporter/elasticsearchexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,17 @@ behaviours, which may be configured through the following settings:
for ECS mode, and never for other modes): When enabled attributes with `.`
will be split into proper json objects.

It is also possible to configure the mapping mode dynamically by setting the metadata `X-Elastic-Mapping-Mode` using the [open-telemetry client](https://pkg.go.dev/go.opentelemetry.io/collector/client):

```go
cl := client.FromContext(ctx)
cl.Metadata = client.NewMetadata(map[string][]string{"X-Elastic-Mapping-Mode": {"ecs"}})
// Propagate the context down the pipeline
next.ConsumeLogs(ctx, td)
```

A mapping mode set in the metadata will take precedence over the configuration and will only apply to the current request. If the metadata is not set, the values from the configuration will be used instead.

#### ECS mapping mode

> [!WARNING]
Expand Down
4 changes: 4 additions & 0 deletions exporter/elasticsearchexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ import (
"go.uber.org/zap"
)

// HeaderXElasticMappingMode is the HTTP header key used to specify a
// mapping mode for a specific request.
const HeaderXElasticMappingMode = "X-Elastic-Mapping-Mode"

// Config defines configuration for Elastic exporter.
type Config struct {
QueueSettings exporterhelper.QueueConfig `mapstructure:"sending_queue"`
Expand Down
39 changes: 34 additions & 5 deletions exporter/elasticsearchexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"sync"
"time"

"go.opentelemetry.io/collector/client"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/pdata/pcommon"
Expand Down Expand Up @@ -151,6 +152,28 @@ func (e *elasticsearchExporter) pushLogsData(ctx context.Context, ld plog.Logs)
return errors.Join(errs...)
}

// modelFromContext returns the model to use for encoding.
// The mapping mode from the client metadata takes precedence over the exporter configuration.
func (e *elasticsearchExporter) modelFromContext(ctx context.Context) mappingModel {
c := client.FromContext(ctx)
model := e.model
values := c.Metadata.Get(HeaderXElasticMappingMode)
if len(values) > 0 && values[0] != "" {
mode, ok := mappingModes[values[0]]
if !ok {
e.Logger.Warn("invalid mapping mode", zap.String("mode", values[0]))
return model
}

model = &encodeModel{
dedot: e.config.Mapping.Dedot,
mode: mode,
}
}

return model
}

func (e *elasticsearchExporter) pushLogRecord(
ctx context.Context,
resource pcommon.Resource,
Expand All @@ -173,7 +196,9 @@ func (e *elasticsearchExporter) pushLogRecord(
fIndex = formattedIndex
}

document, err := e.model.encodeLog(resource, resourceSchemaURL, record, scope, scopeSchemaURL)
model := e.modelFromContext(ctx)

document, err := model.encodeLog(resource, resourceSchemaURL, record, scope, scopeSchemaURL)
if err != nil {
return fmt.Errorf("failed to encode log event: %w", err)
}
Expand Down Expand Up @@ -289,13 +314,15 @@ func (e *elasticsearchExporter) pushMetricsData(
e.Logger.Warn("validation errors", zap.Error(errors.Join(validationErrs...)))
}

model := e.modelFromContext(ctx)

for fIndex, docs := range resourceDocs {
for _, doc := range docs {
var (
docBytes []byte
err error
)
docBytes, err = e.model.encodeDocument(doc)
docBytes, err = model.encodeDocument(doc)
if err != nil {
errs = append(errs, err)
continue
Expand Down Expand Up @@ -411,7 +438,8 @@ func (e *elasticsearchExporter) pushTraceRecord(
fIndex = formattedIndex
}

document, err := e.model.encodeSpan(resource, resourceSchemaURL, span, scope, scopeSchemaURL)
model := e.modelFromContext(ctx)
document, err := model.encodeSpan(resource, resourceSchemaURL, span, scope, scopeSchemaURL)
if err != nil {
return fmt.Errorf("failed to encode trace record: %w", err)
}
Expand Down Expand Up @@ -441,11 +469,12 @@ func (e *elasticsearchExporter) pushSpanEvent(
fIndex = formattedIndex
}

document := e.model.encodeSpanEvent(resource, resourceSchemaURL, span, spanEvent, scope, scopeSchemaURL)
model := e.modelFromContext(ctx)
document := model.encodeSpanEvent(resource, resourceSchemaURL, span, spanEvent, scope, scopeSchemaURL)
if document == nil {
return nil
}
docBytes, err := e.model.encodeDocument(*document)
docBytes, err := model.encodeDocument(*document)
if err != nil {
return err
}
Expand Down
79 changes: 79 additions & 0 deletions exporter/elasticsearchexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tidwall/gjson"
"go.opentelemetry.io/collector/client"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configauth"
Expand Down Expand Up @@ -297,6 +298,79 @@ func TestExporterLogs(t *testing.T) {
<-done
})

t.Run("publish with configured mapping mode header", func(t *testing.T) {
expectedECS := `{"@timestamp":"1970-01-01T00:00:00.000000000Z","agent":{"name":"otlp"},"application":"myapp","message":"hello world","service":{"name":"myservice"}}`
expectedOtlp := `{"@timestamp":"1970-01-01T00:00:00.000000000Z","Attributes":{"application":"myapp","service":{"name":"myservice"}},"Body":"hello world","Scope":{"name":"","version":""},"SeverityNumber":0,"TraceFlags":0}`
tests := []struct {
name string
cfgMode string
header string
expected string
}{
{
name: "mapping mode in header",
cfgMode: "otlp",
header: "ecs",
expected: expectedECS,
},
{
name: "invalid mapping mode in header",
cfgMode: "otlp",
header: "invalid",
expected: expectedOtlp,
},
{
name: "absent mapping mode in header",
cfgMode: "otlp",
header: "",
expected: expectedOtlp,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
rec := newBulkRecorder()
server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {
rec.Record(docs)

actual := string(docs[0].Document)
assert.Equal(t, tt.expected, actual)

// Second document should fallback to the config mapping
actual = string(docs[1].Document)
assert.Equal(t, expectedOtlp, actual)

return itemsAllOK(docs)
})

exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) {
cfg.Mapping.Mode = tt.cfgMode // this should be overridden by the header
})

logs := newLogsWithAttributes(
map[string]any{
"application": "myapp",
"service.name": "myservice",
},
nil,
nil,
)
logs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Body().SetStr("hello world")

// Set the mapping mode via the header
cl := client.FromContext(context.Background())
cl.Metadata = client.NewMetadata(map[string][]string{HeaderXElasticMappingMode: {tt.header}})
ctx := client.NewContext(context.Background(), cl)

mustSendLogsCtx(t, ctx, exporter, logs)

// Send logs again without the header this time.
mustSendLogs(t, exporter, logs)
rec.WaitItems(1)
})
}
})

t.Run("publish with dynamic index, prefix_suffix", func(t *testing.T) {

rec := newBulkRecorder()
Expand Down Expand Up @@ -1865,6 +1939,11 @@ func mustSendLogs(t *testing.T, exporter exporter.Logs, logs plog.Logs) {
require.NoError(t, err)
}

func mustSendLogsCtx(t *testing.T, ctx context.Context, exporter exporter.Logs, logs plog.Logs) {
err := exporter.ConsumeLogs(ctx, logs)
require.NoError(t, err)
}

func mustSendMetricSumDataPoints(t *testing.T, exporter exporter.Metrics, dataPoints ...pmetric.NumberDataPoint) {
metrics := pmetric.NewMetrics()
scopeMetrics := metrics.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty()
Expand Down

0 comments on commit 6e4e4e4

Please sign in to comment.