Skip to content

Commit

Permalink
[exporter/elasticsearch] Deprecate retry::max_requests in favor of re…
Browse files Browse the repository at this point in the history
…try::max_retries (#35571)

**Description:** <Describe what has changed.>
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->
The new retry::max_retries will be exactly retry::max_requests - 1, which
will be much more intuitive to the end user. Deprecate
retry::max_requests.

**Link to tracking Issue:** <Issue number if applicable>
Fixes #32344

**Testing:** <Describe what testing was performed and which tests were
added.>

**Documentation:** <Describe the documentation added.>
  • Loading branch information
carsonip authored Oct 17, 2024
1 parent 7d02d77 commit 5b1b4d4
Show file tree
Hide file tree
Showing 9 changed files with 97 additions and 55 deletions.
27 changes: 27 additions & 0 deletions .chloggen/elasticsearchexporter_deprecate-retry-maxrequests.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: deprecation

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: elasticsearchexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Deprecate retry::max_requests in favor of retry::max_retries

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [32344]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: retry::max_retries will be exactly retry::max_requests - 1

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
3 changes: 2 additions & 1 deletion exporter/elasticsearchexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,8 @@ The behaviour of this bulk indexing can be configured with the following setting
- `interval` (default=30s): Write buffer flush time limit.
- `retry`: Elasticsearch bulk request retry settings
- `enabled` (default=true): Enable/Disable request retry on error. Failed requests are retried with exponential backoff.
- `max_requests` (default=3): Number of HTTP request retries.
- `max_requests` (DEPRECATED, use retry::max_retries instead): Number of HTTP request attempts, including the initial attempt. If used, `retry::max_retries` will be set to `max_requests - 1`.
- `max_retries` (default=2): Number of HTTP request retries. To disable retries, set `retry::enabled` to `false` instead of setting `max_retries` to `0`.
- `initial_interval` (default=100ms): Initial waiting time if an HTTP request failed.
- `max_interval` (default=1m): Max waiting time if an HTTP request failed.
- `retry_on_status` (default=[429]): Status codes that trigger request or document level retries. Request level retry and document level retry status codes are shared and cannot be configured separately. To avoid duplicates, it defaults to `[429]`.
Expand Down
43 changes: 19 additions & 24 deletions exporter/elasticsearchexporter/bulkindexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,27 +51,34 @@ type bulkIndexerSession interface {
Flush(context.Context) error
}

const defaultMaxRetries = 2

func newBulkIndexer(logger *zap.Logger, client *elasticsearch.Client, config *Config) (bulkIndexer, error) {
if config.Batcher.Enabled != nil {
return newSyncBulkIndexer(logger, client, config), nil
}
return newAsyncBulkIndexer(logger, client, config)
}

func newSyncBulkIndexer(logger *zap.Logger, client *elasticsearch.Client, config *Config) *syncBulkIndexer {
var maxDocRetry int
func bulkIndexerConfig(client *elasticsearch.Client, config *Config) docappender.BulkIndexerConfig {
var maxDocRetries int
if config.Retry.Enabled {
// max_requests includes initial attempt
// See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/32344
maxDocRetry = config.Retry.MaxRequests - 1
maxDocRetries = defaultMaxRetries
if config.Retry.MaxRetries != 0 {
maxDocRetries = config.Retry.MaxRetries
}
}
return docappender.BulkIndexerConfig{
Client: client,
MaxDocumentRetries: maxDocRetries,
Pipeline: config.Pipeline,
RetryOnDocumentStatus: config.Retry.RetryOnStatus,
}
}

func newSyncBulkIndexer(logger *zap.Logger, client *elasticsearch.Client, config *Config) *syncBulkIndexer {
return &syncBulkIndexer{
config: docappender.BulkIndexerConfig{
Client: client,
MaxDocumentRetries: maxDocRetry,
Pipeline: config.Pipeline,
RetryOnDocumentStatus: config.Retry.RetryOnStatus,
},
config: bulkIndexerConfig(client, config),
flushTimeout: config.Timeout,
retryConfig: config.Retry,
logger: logger,
Expand Down Expand Up @@ -165,13 +172,6 @@ func newAsyncBulkIndexer(logger *zap.Logger, client *elasticsearch.Client, confi
flushBytes = 5e+6
}

var maxDocRetry int
if config.Retry.Enabled {
// max_requests includes initial attempt
// See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/32344
maxDocRetry = config.Retry.MaxRequests - 1
}

pool := &asyncBulkIndexer{
wg: sync.WaitGroup{},
items: make(chan docappender.BulkIndexerItem, config.NumWorkers),
Expand All @@ -180,12 +180,7 @@ func newAsyncBulkIndexer(logger *zap.Logger, client *elasticsearch.Client, confi
pool.wg.Add(numWorkers)

for i := 0; i < numWorkers; i++ {
bi, err := docappender.NewBulkIndexer(docappender.BulkIndexerConfig{
Client: client,
MaxDocumentRetries: maxDocRetry,
Pipeline: config.Pipeline,
RetryOnDocumentStatus: config.Retry.RetryOnStatus,
})
bi, err := docappender.NewBulkIndexer(bulkIndexerConfig(client, config))
if err != nil {
return nil, err
}
Expand Down
24 changes: 22 additions & 2 deletions exporter/elasticsearchexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,13 @@ type RetrySettings struct {
// Enabled allows users to disable retry without having to comment out all settings.
Enabled bool `mapstructure:"enabled"`

// MaxRequests configures how often an HTTP request is retried before it is assumed to be failed.
// MaxRequests configures how often an HTTP request is attempted before it is assumed to be failed.
// Deprecated: use MaxRetries instead.
MaxRequests int `mapstructure:"max_requests"`

// MaxRetries configures how many times an HTTP request is retried.
MaxRetries int `mapstructure:"max_retries"`

// InitialInterval configures the initial waiting time if a request failed.
InitialInterval time.Duration `mapstructure:"initial_interval"`

Expand Down Expand Up @@ -273,6 +277,17 @@ func (cfg *Config) Validate() error {
// TODO support confighttp.ClientConfig.Compression
return errors.New("compression is not currently configurable")
}

if cfg.Retry.MaxRequests != 0 && cfg.Retry.MaxRetries != 0 {
return errors.New("must not specify both retry::max_requests and retry::max_retries")
}
if cfg.Retry.MaxRequests < 0 {
return errors.New("retry::max_requests should be non-negative")
}
if cfg.Retry.MaxRetries < 0 {
return errors.New("retry::max_retries should be non-negative")
}

return nil
}

Expand Down Expand Up @@ -355,11 +370,16 @@ func (cfg *Config) MappingMode() MappingMode {
return mappingModes[cfg.Mapping.Mode]
}

func logConfigDeprecationWarnings(cfg *Config, logger *zap.Logger) {
func handleDeprecatedConfig(cfg *Config, logger *zap.Logger) {
if cfg.Mapping.Dedup != nil {
logger.Warn("dedup is deprecated, and is always enabled")
}
if cfg.Mapping.Dedot && cfg.MappingMode() != MappingECS || !cfg.Mapping.Dedot && cfg.MappingMode() == MappingECS {
logger.Warn("dedot has been deprecated: in the future, dedotting will always be performed in ECS mode only")
}
if cfg.Retry.MaxRequests != 0 {
cfg.Retry.MaxRetries = cfg.Retry.MaxRequests - 1
// Do not set cfg.Retry.Enabled = false if cfg.Retry.MaxRequests = 1, to avoid a breaking behavior change
logger.Warn("retry::max_requests has been deprecated, and will be removed in a future version. Use retry::max_retries instead.")
}
}
14 changes: 11 additions & 3 deletions exporter/elasticsearchexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func TestConfig(t *testing.T) {
},
Retry: RetrySettings{
Enabled: true,
MaxRequests: 5,
MaxRetries: 5,
InitialInterval: 100 * time.Millisecond,
MaxInterval: 1 * time.Minute,
RetryOnStatus: []int{http.StatusTooManyRequests, http.StatusInternalServerError},
Expand Down Expand Up @@ -164,7 +164,7 @@ func TestConfig(t *testing.T) {
},
Retry: RetrySettings{
Enabled: true,
MaxRequests: 5,
MaxRetries: 5,
InitialInterval: 100 * time.Millisecond,
MaxInterval: 1 * time.Minute,
RetryOnStatus: []int{http.StatusTooManyRequests, http.StatusInternalServerError},
Expand Down Expand Up @@ -234,7 +234,7 @@ func TestConfig(t *testing.T) {
},
Retry: RetrySettings{
Enabled: true,
MaxRequests: 5,
MaxRetries: 5,
InitialInterval: 100 * time.Millisecond,
MaxInterval: 1 * time.Minute,
RetryOnStatus: []int{http.StatusTooManyRequests, http.StatusInternalServerError},
Expand Down Expand Up @@ -391,6 +391,14 @@ func TestConfig_Validate(t *testing.T) {
}),
err: `compression is not currently configurable`,
},
"both max_retries and max_requests specified": {
config: withDefaultConfig(func(cfg *Config) {
cfg.Endpoints = []string{"http://test:9200"}
cfg.Retry.MaxRetries = 1
cfg.Retry.MaxRequests = 1
}),
err: `must not specify both retry::max_requests and retry::max_retries`,
},
}

for name, tt := range tests {
Expand Down
17 changes: 6 additions & 11 deletions exporter/elasticsearchexporter/esclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,16 +90,6 @@ func newElasticsearchClient(
headers := make(http.Header)
headers.Set("User-Agent", userAgent)

// maxRetries configures the maximum number of event publishing attempts,
// including the first send and additional retries.

maxRetries := config.Retry.MaxRequests - 1
retryDisabled := !config.Retry.Enabled || maxRetries <= 0

if retryDisabled {
maxRetries = 0
}

// endpoints converts Config.Endpoints, Config.CloudID,
// and Config.ClientConfig.Endpoint to a list of addresses.
endpoints, err := config.endpoints()
Expand All @@ -113,6 +103,11 @@ func newElasticsearchClient(
logResponseBody: config.LogResponseBody,
}

maxRetries := defaultMaxRetries
if config.Retry.MaxRetries != 0 {
maxRetries = config.Retry.MaxRetries
}

return elasticsearch.NewClient(elasticsearch.Config{
Transport: httpClient.Transport,

Expand All @@ -125,7 +120,7 @@ func newElasticsearchClient(

// configure retry behavior
RetryOnStatus: config.Retry.RetryOnStatus,
DisableRetry: retryDisabled,
DisableRetry: !config.Retry.Enabled,
EnableRetryOnTimeout: config.Retry.Enabled,
//RetryOnError: retryOnError, // should be used from esclient version 8 onwards
MaxRetries: maxRetries,
Expand Down
10 changes: 3 additions & 7 deletions exporter/elasticsearchexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -540,14 +540,10 @@ func TestExporterLogs(t *testing.T) {

t.Run("no retry", func(t *testing.T) {
configurations := map[string]func(*Config){
"max_requests limited": func(cfg *Config) {
cfg.Retry.MaxRequests = 1
cfg.Retry.InitialInterval = 1 * time.Millisecond
cfg.Retry.MaxInterval = 10 * time.Millisecond
},
"retry.enabled is false": func(cfg *Config) {
cfg.Retry.Enabled = false
cfg.Retry.MaxRequests = 10
cfg.Retry.RetryOnStatus = []int{429}
cfg.Retry.MaxRetries = 10
cfg.Retry.InitialInterval = 1 * time.Millisecond
cfg.Retry.MaxInterval = 10 * time.Millisecond
},
Expand All @@ -557,7 +553,7 @@ func TestExporterLogs(t *testing.T) {
"fail http request": func(attempts *atomic.Int64) bulkHandler {
return func([]itemRequest) ([]itemResponse, error) {
attempts.Add(1)
return nil, &httpTestError{message: "oops"}
return nil, &httpTestError{message: "oops", status: 429}
}
},
"fail item": func(attempts *atomic.Int64) bulkHandler {
Expand Down
8 changes: 4 additions & 4 deletions exporter/elasticsearchexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func createDefaultConfig() component.Config {
},
Retry: RetrySettings{
Enabled: true,
MaxRequests: 3,
MaxRetries: 0, // default is set in exporter code
InitialInterval: 100 * time.Millisecond,
MaxInterval: 1 * time.Minute,
RetryOnStatus: []int{
Expand Down Expand Up @@ -110,7 +110,7 @@ func createLogsExporter(
set.Logger.Warn("index option are deprecated and replaced with logs_index and traces_index.")
index = cf.Index
}
logConfigDeprecationWarnings(cf, set.Logger)
handleDeprecatedConfig(cf, set.Logger)

exporter := newExporter(cf, set, index, cf.LogsDynamicIndex.Enabled)

Expand All @@ -129,7 +129,7 @@ func createMetricsExporter(
cfg component.Config,
) (exporter.Metrics, error) {
cf := cfg.(*Config)
logConfigDeprecationWarnings(cf, set.Logger)
handleDeprecatedConfig(cf, set.Logger)

exporter := newExporter(cf, set, cf.MetricsIndex, cf.MetricsDynamicIndex.Enabled)

Expand All @@ -147,7 +147,7 @@ func createTracesExporter(ctx context.Context,
cfg component.Config,
) (exporter.Traces, error) {
cf := cfg.(*Config)
logConfigDeprecationWarnings(cf, set.Logger)
handleDeprecatedConfig(cf, set.Logger)

exporter := newExporter(cf, set, cf.TracesIndex, cf.TracesDynamicIndex.Enabled)

Expand Down
6 changes: 3 additions & 3 deletions exporter/elasticsearchexporter/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ elasticsearch/trace:
flush:
bytes: 10485760
retry:
max_requests: 5
max_retries: 5
retry_on_status:
- 429
- 500
Expand All @@ -38,7 +38,7 @@ elasticsearch/metric:
flush:
bytes: 10485760
retry:
max_requests: 5
max_retries: 5
retry_on_status:
- 429
- 500
Expand All @@ -61,7 +61,7 @@ elasticsearch/log:
flush:
bytes: 10485760
retry:
max_requests: 5
max_retries: 5
retry_on_status:
- 429
- 500
Expand Down

0 comments on commit 5b1b4d4

Please sign in to comment.