Skip to content

Commit

Permalink
[exporter/elasticsearch] remove dedup config (open-telemetry#33776)
Browse files Browse the repository at this point in the history
**Description:**

Remove the `dedup` configuration setting, and always de-duplicate.
Elasticsearch does not permit duplicate keys in JSON objects, and this
configuration is adding more complexity to the code than it's worth.

I've simplified the `internal/objmodel` API slightly, unexporting the
`Sort` methods, which are internally called by the now unconditional
call to `Dedup`.

**Link to tracking Issue:**

Closes
open-telemetry#33773

**Testing:**

Ran the unit tests, which cover deduplication. None of the tests in
package elasticsearchexporter covered `dedup: false`.

**Documentation:** <Describe the documentation added.>

---------

Co-authored-by: Carson Ip <[email protected]>
Co-authored-by: Andrzej Stencel <[email protected]>
  • Loading branch information
3 people authored Jul 16, 2024
1 parent fce2cfe commit e495816
Show file tree
Hide file tree
Showing 16 changed files with 150 additions and 144 deletions.
29 changes: 29 additions & 0 deletions .chloggen/elasticsearch-always-dedup.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: elasticsearchexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Make "dedup" option no-op, always de-duplicate.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [33773]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
Elasticsearch does not permit duplicate keys in JSON objects,
so there is no value in being able to configure deduplication.

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
2 changes: 1 addition & 1 deletion cmd/otelcontribcol/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -727,7 +727,7 @@ require (
github.com/syndtr/gocapability v0.0.0-20200815063812-42c35b437635 // indirect
github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common v1.0.954 // indirect
github.com/tg123/go-htpasswd v1.2.2 // indirect
github.com/tidwall/gjson v1.14.2 // indirect
github.com/tidwall/gjson v1.17.1 // indirect
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.0 // indirect
github.com/tidwall/tinylru v1.1.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions cmd/otelcontribcol/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 3 additions & 4 deletions exporter/elasticsearchexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,9 @@ behaviours, which may be configured through the following settings:
- `raw`: Omit the `Attributes.` string prefixed to field names for log and
span attributes as well as omit the `Events.` string prefixed to
field names for span events.
- `dedup` (default=true; DEPRECATED, in future deduplication will always be enabled):
Try to find and remove duplicate fields/attributes from events before publishing
to Elasticsearch. Some structured logging libraries can produce duplicate fields
(for example zap). Elasticsearch will reject documents that have duplicate fields.
- `dedup` (DEPRECATED). This configuration is deprecated and non-operational,
and will be removed in the future. Object keys are always deduplicated to
avoid Elasticsearch rejecting documents.
- `dedot` (default=true; DEPRECATED, in future dedotting will always be enabled
for ECS mode, and never for other modes): When enabled attributes with `.`
will be split into proper json objects.
Expand Down
14 changes: 7 additions & 7 deletions exporter/elasticsearchexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,12 +160,12 @@ type MappingsSettings struct {
// Mode configures the field mappings.
Mode string `mapstructure:"mode"`

// Try to find and remove duplicate fields
// Dedup is non-operational, and will be removed in the future.
//
// Deprecated: [v0.104.0] deduplication will always be applied in future,
// with no option to disable. Disabling deduplication is not meaningful,
// as Elasticsearch will reject documents with duplicate JSON object keys.
Dedup bool `mapstructure:"dedup"`
// Deprecated: [v0.104.0] deduplication is always enabled, and cannot be
// disabled. Disabling deduplication is not meaningful, as Elasticsearch
// will always reject documents with duplicate JSON object keys.
Dedup *bool `mapstructure:"dedup,omitempty"`

// Deprecated: [v0.104.0] dedotting will always be applied for ECS mode
// in future, and never for other modes. Elasticsearch's "dot_expander"
Expand Down Expand Up @@ -322,8 +322,8 @@ func (cfg *Config) MappingMode() MappingMode {
}

func logConfigDeprecationWarnings(cfg *Config, logger *zap.Logger) {
if !cfg.Mapping.Dedup {
logger.Warn("dedup has been deprecated, and will always be enabled in future")
if cfg.Mapping.Dedup != nil {
logger.Warn("dedup is deprecated, and is always enabled")
}
if cfg.Mapping.Dedot && cfg.MappingMode() != MappingECS || !cfg.Mapping.Dedot && cfg.MappingMode() == MappingECS {
logger.Warn("dedot has been deprecated: in the future, dedotting will always be performed in ECS mode only")
Expand Down
3 changes: 0 additions & 3 deletions exporter/elasticsearchexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ func TestConfig(t *testing.T) {
},
Mapping: MappingsSettings{
Mode: "none",
Dedup: true,
Dedot: true,
},
LogstashFormat: LogstashFormatSettings{
Expand Down Expand Up @@ -162,7 +161,6 @@ func TestConfig(t *testing.T) {
},
Mapping: MappingsSettings{
Mode: "none",
Dedup: true,
Dedot: true,
},
LogstashFormat: LogstashFormatSettings{
Expand Down Expand Up @@ -224,7 +222,6 @@ func TestConfig(t *testing.T) {
},
Mapping: MappingsSettings{
Mode: "none",
Dedup: true,
Dedot: true,
},
LogstashFormat: LogstashFormatSettings{
Expand Down
1 change: 0 additions & 1 deletion exporter/elasticsearchexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ func newExporter(
}

model := &encodeModel{
dedup: cfg.Mapping.Dedup,
dedot: cfg.Mapping.Dedot,
mode: cfg.MappingMode(),
}
Expand Down
1 change: 0 additions & 1 deletion exporter/elasticsearchexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ func createDefaultConfig() component.Config {
},
Mapping: MappingsSettings{
Mode: "none",
Dedup: true,
Dedot: true,
},
LogstashFormat: LogstashFormatSettings{
Expand Down
9 changes: 5 additions & 4 deletions exporter/elasticsearchexporter/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,9 @@ func TestFactory_CreateLogsAndTracesExporterWithDeprecatedIndexOption(t *testing
func TestFactory_DedupDeprecated(t *testing.T) {
factory := NewFactory()
cfg := withDefaultConfig(func(cfg *Config) {
dedup := false
cfg.Endpoint = "http://testing.invalid:9200"
cfg.Mapping.Dedup = false
cfg.Mapping.Dedup = &dedup
cfg.Mapping.Dedot = false // avoid dedot warnings
})

Expand All @@ -123,9 +124,9 @@ func TestFactory_DedupDeprecated(t *testing.T) {

records := logObserver.AllUntimed()
assert.Len(t, records, 3)
assert.Equal(t, "dedup has been deprecated, and will always be enabled in future", records[0].Message)
assert.Equal(t, "dedup has been deprecated, and will always be enabled in future", records[1].Message)
assert.Equal(t, "dedup has been deprecated, and will always be enabled in future", records[2].Message)
assert.Equal(t, "dedup is deprecated, and is always enabled", records[0].Message)
assert.Equal(t, "dedup is deprecated, and is always enabled", records[1].Message)
assert.Equal(t, "dedup is deprecated, and is always enabled", records[2].Message)
}

func TestFactory_DedotDeprecated(t *testing.T) {
Expand Down
3 changes: 3 additions & 0 deletions exporter/elasticsearchexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.104.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.104.0
github.com/stretchr/testify v1.9.0
github.com/tidwall/gjson v1.17.1
go.opentelemetry.io/collector/component v0.104.1-0.20240712081520-6227646b0146
go.opentelemetry.io/collector/config/configauth v0.104.1-0.20240712081520-6227646b0146
go.opentelemetry.io/collector/config/configcompression v1.11.1-0.20240712081520-6227646b0146
Expand Down Expand Up @@ -64,6 +65,8 @@ require (
github.com/prometheus/common v0.55.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
github.com/rs/cors v1.11.0 // indirect
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.0 // indirect
go.elastic.co/apm/module/apmzap/v2 v2.6.0 // indirect
go.elastic.co/apm/v2 v2.6.0 // indirect
go.elastic.co/fastjson v1.3.0 // indirect
Expand Down
6 changes: 6 additions & 0 deletions exporter/elasticsearchexporter/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions exporter/elasticsearchexporter/integrationtest/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 7 additions & 9 deletions exporter/elasticsearchexporter/internal/objmodel/objmodel.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,15 +180,14 @@ func (doc *Document) AddEvents(key string, events ptrace.SpanEventSlice) {
}
}

// Sort sorts all fields in the document by key name.
func (doc *Document) Sort() {
func (doc *Document) sort() {
sort.SliceStable(doc.fields, func(i, j int) bool {
return doc.fields[i].key < doc.fields[j].key
})

for i := range doc.fields {
fld := &doc.fields[i]
fld.value.Sort()
fld.value.sort()
}
}

Expand All @@ -199,7 +198,7 @@ func (doc *Document) Sort() {
func (doc *Document) Dedup() {
// 1. Always ensure the fields are sorted, Dedup support requires
// Fields to be sorted.
doc.Sort()
doc.sort()

// 2. rename fields if a primitive value is overwritten by an object.
// For example the pair (path.x=1, path.x.a="test") becomes:
Expand All @@ -223,7 +222,7 @@ func (doc *Document) Dedup() {
}
}
if renamed {
doc.Sort()
doc.sort()
}

// 3. mark duplicates as 'ignore'
Expand Down Expand Up @@ -423,14 +422,13 @@ func ValueFromAttribute(attr pcommon.Value) Value {
}
}

// Sort recursively sorts all keys in docuemts held by the value.
func (v *Value) Sort() {
func (v *Value) sort() {
switch v.kind {
case KindObject:
v.doc.Sort()
v.doc.sort()
case KindArr:
for i := range v.arr {
v.arr[i].Sort()
v.arr[i].sort()
}
}
}
Expand Down
36 changes: 0 additions & 36 deletions exporter/elasticsearchexporter/internal/objmodel/objmodel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,46 +80,11 @@ func TestObjectModel_CreateMap(t *testing.T) {
for name, test := range tests {
t.Run(name, func(t *testing.T) {
doc := test.build()
doc.Sort()
assert.Equal(t, test.want, doc)
})
}
}

func TestDocument_Sort(t *testing.T) {
tests := map[string]struct {
build func() Document
want Document
}{
"keys are sorted": {
build: func() (doc Document) {
doc.AddInt("z", 26)
doc.AddInt("a", 1)
return doc
},
want: Document{[]field{{"a", IntValue(1)}, {"z", IntValue(26)}}},
},
"sorting is stable": {
build: func() (doc Document) {
doc.AddInt("a", 1)
doc.AddInt("c", 3)
doc.AddInt("a", 2)
return doc
},
want: Document{[]field{{"a", IntValue(1)}, {"a", IntValue(2)}, {"c", IntValue(3)}}},
},
}

for name, test := range tests {
t.Run(name, func(t *testing.T) {
doc := test.build()
doc.Sort()
assert.Equal(t, test.want, doc)
})
}

}

func TestObjectModel_Dedup(t *testing.T) {
tests := map[string]struct {
build func() Document
Expand Down Expand Up @@ -200,7 +165,6 @@ func TestObjectModel_Dedup(t *testing.T) {
for name, test := range tests {
t.Run(name, func(t *testing.T) {
doc := test.build()
doc.Sort()
doc.Dedup()
assert.Equal(t, test.want, doc)
})
Expand Down
20 changes: 3 additions & 17 deletions exporter/elasticsearchexporter/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ type mappingModel interface {
//
// See: https://github.com/open-telemetry/oteps/blob/master/text/logs/0097-log-data-model.md
type encodeModel struct {
dedup bool
dedot bool
mode MappingMode
}
Expand All @@ -95,13 +94,9 @@ func (m *encodeModel) encodeLog(resource pcommon.Resource, record plog.LogRecord
default:
document = m.encodeLogDefaultMode(resource, record, scope)
}
document.Dedup()

var buf bytes.Buffer
if m.dedup {
document.Dedup()
} else if m.dedot {
document.Sort()
}
err := document.Serialize(&buf, m.dedot)
return buf.Bytes(), err
}
Expand Down Expand Up @@ -171,11 +166,7 @@ func (m *encodeModel) encodeLogECSMode(resource pcommon.Resource, record plog.Lo
}

func (m *encodeModel) encodeDocument(document objmodel.Document) ([]byte, error) {
if m.dedup {
document.Dedup()
} else if m.dedot {
document.Sort()
}
document.Dedup()

var buf bytes.Buffer
err := document.Serialize(&buf, m.dedot)
Expand Down Expand Up @@ -225,12 +216,7 @@ func (m *encodeModel) encodeSpan(resource pcommon.Resource, span ptrace.Span, sc
m.encodeEvents(&document, span.Events())
document.AddInt("Duration", durationAsMicroseconds(span.StartTimestamp().AsTime(), span.EndTimestamp().AsTime())) // unit is microseconds
document.AddAttributes("Scope", scopeToAttributes(scope))

if m.dedup {
document.Dedup()
} else if m.dedot {
document.Sort()
}
document.Dedup()

var buf bytes.Buffer
err := document.Serialize(&buf, m.dedot)
Expand Down
Loading

0 comments on commit e495816

Please sign in to comment.