Skip to content

Commit

Permalink
[exporter/elasticsearch] Make OTel mapping mode send to data streams …
Browse files Browse the repository at this point in the history
…only (#35839)

<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->
#### Description
Make OTel mapping mode use RequireDataStream in docappender, which means
it will only send to data streams. This prevents auto creating regular
indices in OTel mapping mode due to a race condition in Elasticsearch
where otel-data index templates are not ready.

<!-- Issue number (e.g. #1234) or full URL to issue, if applicable. -->
#### Link to tracking issue

<!--Describe what testing was performed and which tests were added.-->
#### Testing

<!--Describe the documentation added.-->
#### Documentation

<!--Please delete paragraphs that you did not use before submitting.-->
  • Loading branch information
carsonip authored Oct 18, 2024
1 parent 17afe14 commit 1fab9bb
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 0 deletions.
27 changes: 27 additions & 0 deletions .chloggen/elasticsearchexporter_otel-mode-require-data-stream.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: elasticsearchexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Make OTel mapping mode send to data streams only

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [35839]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: This prevents auto creating regular indices in OTel mapping mode due to a race condition in Elasticsearch where otel-data index templates are not ready.

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
1 change: 1 addition & 0 deletions exporter/elasticsearchexporter/bulkindexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func bulkIndexerConfig(client *elasticsearch.Client, config *Config) docappender
MaxDocumentRetries: maxDocRetries,
Pipeline: config.Pipeline,
RetryOnDocumentStatus: config.Retry.RetryOnStatus,
RequireDataStream: config.MappingMode() == MappingOTel,
}
}

Expand Down
55 changes: 55 additions & 0 deletions exporter/elasticsearchexporter/bulkindexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,61 @@ func TestAsyncBulkIndexer_flush(t *testing.T) {
}
}

func TestAsyncBulkIndexer_requireDataStream(t *testing.T) {
tests := []struct {
name string
config Config
wantRequireDataStream bool
}{
{
name: "ecs",
config: Config{
NumWorkers: 1,
Mapping: MappingsSettings{Mode: MappingECS.String()},
},
wantRequireDataStream: false,
},
{
name: "otel",
config: Config{
NumWorkers: 1,
Mapping: MappingsSettings{Mode: MappingOTel.String()},
},
wantRequireDataStream: true,
},
}

for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
requireDataStreamCh := make(chan bool, 1)
client, err := elasticsearch.NewClient(elasticsearch.Config{Transport: &mockTransport{
RoundTripFunc: func(r *http.Request) (*http.Response, error) {
if r.URL.Path == "/_bulk" {
requireDataStreamCh <- r.URL.Query().Get("require_data_stream") == "true"
}
return &http.Response{
Header: http.Header{"X-Elastic-Product": []string{"Elasticsearch"}},
Body: io.NopCloser(strings.NewReader(successResp)),
}, nil
},
}})
require.NoError(t, err)

bulkIndexer, err := newAsyncBulkIndexer(zap.NewNop(), client, &tt.config)
require.NoError(t, err)
session, err := bulkIndexer.StartSession(context.Background())
require.NoError(t, err)

assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`), nil))
assert.NoError(t, bulkIndexer.Close(context.Background()))

assert.Equal(t, tt.wantRequireDataStream, <-requireDataStreamCh)
})
}
}

func TestAsyncBulkIndexer_flush_error(t *testing.T) {
tests := []struct {
name string
Expand Down

0 comments on commit 1fab9bb

Please sign in to comment.