Skip to content

Commit

Permalink
[exporter/elasticsearch] Add mapping.mode: raw configuration option (
Browse files Browse the repository at this point in the history
…#29619)

**Description:** 

This PR adds a new configuration option, `mapping.mode: raw`, to the
Elasticsearch exporter. When set, the Elasticsearch exporter will not
prefix log or span attributes with `Attributes.` when forming the
Elasticsearch document field names for these fields. Additionally, the
exporter will also not prefix span events with `Events.*` with forming
the Elasticsearch document field names for these fields.

**Link to tracking Issue:** Resolves
#26647

**Testing:** 

Besides adding/updating relevant unit tests in this PR, I also tested
the changes in this PR against a local Elasticsearch cluster, using the
following collector configurations:

1. Without the new `mapping.mode: raw` setting.
   ```yaml
   receivers:
     tcplog:
       listen_address: "0.0.0.0:54545"
   
   processors:
     attributes:
       actions:
         - action: insert
           key: first_attribute
           value: one
         - action: insert
           key: second_attribute
           value: two
   
   exporters:
     debug:
       verbosity: detailed
     elasticsearch:
       endpoints: [ "https://localhost:9200" ]
       user: elastic
       password: XXXXXXXX
       logs_index: otel-logs
       tls:
         insecure_skip_verify: true
       flush:
         interval: 1s
   
   service:
     pipelines:
       logs:
         receivers: [tcplog]
         processors: [attributes]
         exporters: [debug,elasticsearch]
   ```

   _Resulting document in Elasticsearch:_
   ```json
   {
     "_index": "otel-logs",
     "_id": "l1E5J4wBD9bb2EmZJuDR",
     "_score": 1,
     "_source": {
       "@timestamp": "1970-01-01T00:00:00.000000000Z",
       "Attributes": {
         "first_attribute": "one",
         "second_attribute": "two"
       },
       "Body": "bar",
       "Scope": {
         "name": "",
         "version": ""
       },
       "SeverityNumber": 0,
       "TraceFlags": 0
     }
   }
   ```

2. With the new `mapping.mode: raw` setting.
   ```yaml
   receivers:
     tcplog:
       listen_address: "0.0.0.0:54545"
   
   processors:
     attributes:
       actions:
         - action: insert
           key: first_attribute
           value: one
         - action: insert
           key: second_attribute
           value: two
   
   exporters:
     debug:
       verbosity: detailed
     elasticsearch:
       endpoints: [ "https://localhost:9200" ]
       user: elastic
       password: XXXXXXXX
       logs_index: otel-logs
       tls:
         insecure_skip_verify: true
       flush:
         interval: 1s
       mapping:
         mode: raw
   
   service:
     pipelines:
       logs:
         receivers: [tcplog]
         processors: [attributes]
         exporters: [debug,elasticsearch]
   ```

   _Resulting document in Elasticsearch:_
   ```json
   {
     "_index": "otel-logs",
     "_id": "jlE4J4wBD9bb2EmZp-Cd",
     "_score": 1,
     "_source": {
       "@timestamp": "1970-01-01T00:00:00.000000000Z",
       "Body": "foo bar baz",
       "Scope": {
         "name": "",
         "version": ""
       },
       "SeverityNumber": 0,
       "TraceFlags": 0,
       "first_attribute": "one",
       "second_attribute": "two"
     }
   }      
   ```

**Documentation:** Documented the new configuration option in the
Elasticsearch exporter's `README.md`.

---------

Co-authored-by: Andrzej Stencel <[email protected]>
  • Loading branch information
ycombinator and andrzej-stencel authored Feb 6, 2024
1 parent b90d987 commit aa36c85
Show file tree
Hide file tree
Showing 11 changed files with 203 additions and 7 deletions.
33 changes: 33 additions & 0 deletions .chloggen/exp-es-omit-attributes-prefix.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: elasticsearchexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Add `mapping.mode: raw` configuration option"

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [26647]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
Setting `mapping.mode: raw` in the Elasticsearch exporter's configuration
will result logs and traces being indexed into Elasticsearch with their
attribute fields directly at the root level of the document instead inside an
`Attributes` object. Similarly, this setting will also result in traces being
indexed into Elasticsearch with their event fields directly at the root level
of the document instead of inside an `Events` object.
# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
3 changes: 3 additions & 0 deletions exporter/elasticsearchexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ This exporter supports sending OpenTelemetry logs to [Elasticsearch](https://www
- `ecs`: Try to map fields defined in the
[OpenTelemetry Semantic Conventions](https://github.com/open-telemetry/semantic-conventions)
to [Elastic Common Schema (ECS)](https://www.elastic.co/guide/en/ecs/current/index.html).
- `raw`: Omit the `Attributes.` string prefixed to field names for log and
span attributes as well as omit the `Events.` string prefixed to
field names for span events.
- `fields` (optional): Configure additional fields mappings.
- `file` (optional): Read additional field mappings from the provided YAML file.
- `dedup` (default=true): Try to find and remove duplicate fields/attributes
Expand Down
11 changes: 11 additions & 0 deletions exporter/elasticsearchexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ type MappingMode int
const (
MappingNone MappingMode = iota
MappingECS
MappingRaw
)

var (
Expand All @@ -188,6 +189,8 @@ func (m MappingMode) String() string {
return ""
case MappingECS:
return "ecs"
case MappingRaw:
return "raw"
default:
return ""
}
Expand All @@ -198,6 +201,7 @@ var mappingModes = func() map[string]MappingMode {
for _, m := range []MappingMode{
MappingNone,
MappingECS,
MappingRaw,
} {
table[strings.ToLower(m.String())] = m
}
Expand Down Expand Up @@ -231,3 +235,10 @@ func (cfg *Config) Validate() error {

return nil
}

// MappingMode returns the mapping.mode defined in the given cfg
// object. This method must be called after cfg.Validate() has been
// called without returning an error.
func (cfg *Config) MappingMode() MappingMode {
return mappingModes[cfg.Mapping.Mode]
}
9 changes: 9 additions & 0 deletions exporter/elasticsearchexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ func TestLoadConfig(t *testing.T) {
defaultLogstashFormatCfg.(*Config).Endpoints = []string{"http://localhost:9200"}
defaultLogstashFormatCfg.(*Config).LogstashFormat.Enabled = true

defaultRawCfg := createDefaultConfig()
defaultRawCfg.(*Config).Endpoints = []string{"http://localhost:9200"}
defaultRawCfg.(*Config).Mapping.Mode = "raw"

tests := []struct {
configFile string
id component.ID
Expand Down Expand Up @@ -200,6 +204,11 @@ func TestLoadConfig(t *testing.T) {
configFile: "config.yaml",
expected: defaultLogstashFormatCfg,
},
{
id: component.NewIDWithName(metadata.Type, "raw"),
configFile: "config.yaml",
expected: defaultRawCfg,
},
}

for _, tt := range tests {
Expand Down
6 changes: 5 additions & 1 deletion exporter/elasticsearchexporter/logs_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,11 @@ func newLogsExporter(logger *zap.Logger, cfg *Config) (*elasticsearchLogsExporte
maxAttempts = cfg.Retry.MaxRequests
}

model := &encodeModel{dedup: cfg.Mapping.Dedup, dedot: cfg.Mapping.Dedot}
model := &encodeModel{
dedup: cfg.Mapping.Dedup,
dedot: cfg.Mapping.Dedot,
mode: cfg.MappingMode(),
}

indexStr := cfg.LogsIndex
if cfg.Index != "" {
Expand Down
2 changes: 1 addition & 1 deletion exporter/elasticsearchexporter/logs_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func TestExporter_New(t *testing.T) {
cfg.Mapping.Dedot = false
cfg.Mapping.Dedup = true
}),
want: successWithInternalModel(&encodeModel{dedot: false, dedup: true}),
want: successWithInternalModel(&encodeModel{dedot: false, dedup: true, mode: MappingECS}),
},
}

Expand Down
23 changes: 20 additions & 3 deletions exporter/elasticsearchexporter/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type mappingModel interface {
type encodeModel struct {
dedup bool
dedot bool
mode MappingMode
}

const (
Expand All @@ -47,7 +48,7 @@ func (m *encodeModel) encodeLog(resource pcommon.Resource, record plog.LogRecord
document.AddString("SeverityText", record.SeverityText())
document.AddInt("SeverityNumber", int64(record.SeverityNumber()))
document.AddAttribute("Body", record.Body())
document.AddAttributes("Attributes", record.Attributes())
m.encodeAttributes(&document, record.Attributes())
document.AddAttributes("Resource", resource.Attributes())
document.AddAttributes("Scope", scopeToAttributes(scope))

Expand All @@ -74,9 +75,9 @@ func (m *encodeModel) encodeSpan(resource pcommon.Resource, span ptrace.Span, sc
document.AddInt("TraceStatus", int64(span.Status().Code()))
document.AddString("TraceStatusDescription", span.Status().Message())
document.AddString("Link", spanLinksToString(span.Links()))
document.AddAttributes("Attributes", span.Attributes())
m.encodeAttributes(&document, span.Attributes())
document.AddAttributes("Resource", resource.Attributes())
document.AddEvents("Events", span.Events())
m.encodeEvents(&document, span.Events())
document.AddInt("Duration", durationAsMicroseconds(span.StartTimestamp().AsTime(), span.EndTimestamp().AsTime())) // unit is microseconds
document.AddAttributes("Scope", scopeToAttributes(scope))

Expand All @@ -91,6 +92,22 @@ func (m *encodeModel) encodeSpan(resource pcommon.Resource, span ptrace.Span, sc
return buf.Bytes(), err
}

func (m *encodeModel) encodeAttributes(document *objmodel.Document, attributes pcommon.Map) {
key := "Attributes"
if m.mode == MappingRaw {
key = ""
}
document.AddAttributes(key, attributes)
}

func (m *encodeModel) encodeEvents(document *objmodel.Document, events ptrace.SpanEventSlice) {
key := "Events"
if m.mode == MappingRaw {
key = ""
}
document.AddEvents(key, events)
}

func spanLinksToString(spanLinkSlice ptrace.SpanLinkSlice) string {
linkArray := make([]map[string]any, 0, spanLinkSlice.Len())
for i := 0; i < spanLinkSlice.Len(); i++ {
Expand Down
111 changes: 111 additions & 0 deletions exporter/elasticsearchexporter/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,17 @@
package elasticsearchexporter

import (
"fmt"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
semconv "go.opentelemetry.io/collector/semconv/v1.18.0"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/objmodel"
)

var expectedSpanBody = `{"@timestamp":"2023-04-19T03:04:05.000000006Z","Attributes.service.instance.id":"23","Duration":1000000,"EndTimestamp":"2023-04-19T03:04:06.000000006Z","Events.fooEvent.evnetMockBar":"bar","Events.fooEvent.evnetMockFoo":"foo","Events.fooEvent.time":"2023-04-19T03:04:05.000000006Z","Kind":"SPAN_KIND_CLIENT","Link":"[{\"attribute\":{},\"spanID\":\"\",\"traceID\":\"01020304050607080807060504030200\"}]","Name":"client span","Resource.cloud.platform":"aws_elastic_beanstalk","Resource.cloud.provider":"aws","Resource.deployment.environment":"BETA","Resource.service.instance.id":"23","Resource.service.name":"some-service","Resource.service.version":"env-version-1234","Scope.lib-foo":"lib-bar","Scope.name":"io.opentelemetry.rabbitmq-2.7","Scope.version":"1.30.0-alpha","SpanId":"1920212223242526","TraceId":"01020304050607080807060504030201","TraceStatus":2,"TraceStatusDescription":"Test"}`
Expand Down Expand Up @@ -63,3 +67,110 @@ func mockResourceSpans() ptrace.Traces {
event.Attributes().PutStr("evnetMockBar", "bar")
return traces
}

func TestEncodeAttributes(t *testing.T) {
t.Parallel()

attributes := pcommon.NewMap()
err := attributes.FromRaw(map[string]any{
"s": "baz",
"o": map[string]any{
"sub_i": 19,
},
})
require.NoError(t, err)

tests := map[string]struct {
mappingMode MappingMode
want func() objmodel.Document
}{
"raw": {
mappingMode: MappingRaw,
want: func() objmodel.Document {
return objmodel.DocumentFromAttributes(attributes)
},
},
"none": {
mappingMode: MappingNone,
want: func() objmodel.Document {
doc := objmodel.Document{}
doc.AddAttributes("Attributes", attributes)
return doc
},
},
"ecs": {
mappingMode: MappingECS,
want: func() objmodel.Document {
doc := objmodel.Document{}
doc.AddAttributes("Attributes", attributes)
return doc
},
},
}

for name, test := range tests {
t.Run(name, func(t *testing.T) {
m := encodeModel{
mode: test.mappingMode,
}

doc := objmodel.Document{}
m.encodeAttributes(&doc, attributes)
require.Equal(t, test.want(), doc)
})
}
}

func TestEncodeEvents(t *testing.T) {
t.Parallel()

events := ptrace.NewSpanEventSlice()
events.EnsureCapacity(4)
for i := 0; i < 4; i++ {
event := events.AppendEmpty()
event.SetTimestamp(pcommon.NewTimestampFromTime(time.Now().Add(time.Duration(i) * time.Minute)))
event.SetName(fmt.Sprintf("event_%d", i))
}

tests := map[string]struct {
mappingMode MappingMode
want func() objmodel.Document
}{
"raw": {
mappingMode: MappingRaw,
want: func() objmodel.Document {
doc := objmodel.Document{}
doc.AddEvents("", events)
return doc
},
},
"none": {
mappingMode: MappingNone,
want: func() objmodel.Document {
doc := objmodel.Document{}
doc.AddEvents("Events", events)
return doc
},
},
"ecs": {
mappingMode: MappingECS,
want: func() objmodel.Document {
doc := objmodel.Document{}
doc.AddEvents("Events", events)
return doc
},
},
}

for name, test := range tests {
t.Run(name, func(t *testing.T) {
m := encodeModel{
mode: test.mappingMode,
}

doc := objmodel.Document{}
m.encodeEvents(&doc, events)
require.Equal(t, test.want(), doc)
})
}
}
4 changes: 4 additions & 0 deletions exporter/elasticsearchexporter/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,7 @@ elasticsearch/logstash_format:
endpoints: [http://localhost:9200]
logstash_format:
enabled: true
elasticsearch/raw:
endpoints: [http://localhost:9200]
mapping:
mode: raw
6 changes: 5 additions & 1 deletion exporter/elasticsearchexporter/trace_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,11 @@ func newTracesExporter(logger *zap.Logger, cfg *Config) (*elasticsearchTracesExp
maxAttempts = cfg.Retry.MaxRequests
}

model := &encodeModel{dedup: cfg.Mapping.Dedup, dedot: cfg.Mapping.Dedot}
model := &encodeModel{
dedup: cfg.Mapping.Dedup,
dedot: cfg.Mapping.Dedot,
mode: cfg.MappingMode(),
}

return &elasticsearchTracesExporter{
logger: logger,
Expand Down
2 changes: 1 addition & 1 deletion exporter/elasticsearchexporter/traces_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func TestTracesExporter_New(t *testing.T) {
cfg.Mapping.Dedot = false
cfg.Mapping.Dedup = true
}),
want: successWithInternalModel(&encodeModel{dedot: false, dedup: true}),
want: successWithInternalModel(&encodeModel{dedot: false, dedup: true, mode: MappingECS}),
},
}

Expand Down

0 comments on commit aa36c85

Please sign in to comment.