From 5b1b4d42c63ae90199d5d14144bf846b18f94726 Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Thu, 17 Oct 2024 19:01:10 +0100 Subject: [PATCH] [exporter/elasticsearch] Deprecate retry::max_requests in favor of retry::max_retries (#35571) **Description:** The new retry::max_retries will be exactly retry::max_requests - 1, but it will be much more intuitive to the end user. Deprecate retry::max_requests. **Link to tracking Issue:** Fixes #32344 **Testing:** **Documentation:** --- ...hexporter_deprecate-retry-maxrequests.yaml | 27 ++++++++++++ exporter/elasticsearchexporter/README.md | 3 +- exporter/elasticsearchexporter/bulkindexer.go | 43 ++++++++----------- exporter/elasticsearchexporter/config.go | 24 ++++++++++- exporter/elasticsearchexporter/config_test.go | 14 ++++-- exporter/elasticsearchexporter/esclient.go | 17 +++----- .../elasticsearchexporter/exporter_test.go | 10 ++--- exporter/elasticsearchexporter/factory.go | 8 ++-- .../testdata/config.yaml | 6 +-- 9 files changed, 97 insertions(+), 55 deletions(-) create mode 100644 .chloggen/elasticsearchexporter_deprecate-retry-maxrequests.yaml diff --git a/.chloggen/elasticsearchexporter_deprecate-retry-maxrequests.yaml b/.chloggen/elasticsearchexporter_deprecate-retry-maxrequests.yaml new file mode 100644 index 000000000000..80bb0eac5fb8 --- /dev/null +++ b/.chloggen/elasticsearchexporter_deprecate-retry-maxrequests.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: deprecation + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: elasticsearchexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Deprecate retry::max_requests in favor of retry::max_retries + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [32344] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: retry::max_retries will be exactly retry::max_requests - 1 + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/exporter/elasticsearchexporter/README.md b/exporter/elasticsearchexporter/README.md index 5ec203f13674..b620b81158e9 100644 --- a/exporter/elasticsearchexporter/README.md +++ b/exporter/elasticsearchexporter/README.md @@ -202,7 +202,8 @@ The behaviour of this bulk indexing can be configured with the following setting - `interval` (default=30s): Write buffer flush time limit. - `retry`: Elasticsearch bulk request retry settings - `enabled` (default=true): Enable/Disable request retry on error. Failed requests are retried with exponential backoff. - - `max_requests` (default=3): Number of HTTP request retries. + - `max_requests` (DEPRECATED, use retry::max_retries instead): Number of HTTP request retries including the initial attempt. If used, `retry::max_retries` will be set to `max_requests - 1`. + - `max_retries` (default=2): Number of HTTP request retries. To disable retries, set `retry::enabled` to `false` instead of setting `max_retries` to `0`. - `initial_interval` (default=100ms): Initial waiting time if a HTTP request failed. - `max_interval` (default=1m): Max waiting time if a HTTP request failed. - `retry_on_status` (default=[429]): Status codes that trigger request or document level retries. Request level retry and document level retry status codes are shared and cannot be configured separately. To avoid duplicates, it defaults to `[429]`. diff --git a/exporter/elasticsearchexporter/bulkindexer.go b/exporter/elasticsearchexporter/bulkindexer.go index 471ddc2dc7b9..62bc329a26f8 100644 --- a/exporter/elasticsearchexporter/bulkindexer.go +++ b/exporter/elasticsearchexporter/bulkindexer.go @@ -51,6 +51,8 @@ type bulkIndexerSession interface { Flush(context.Context) error } +const defaultMaxRetries = 2 + func newBulkIndexer(logger *zap.Logger, client *elasticsearch.Client, config *Config) (bulkIndexer, error) { if config.Batcher.Enabled != nil { return newSyncBulkIndexer(logger, client, config), nil @@ -58,20 +60,25 @@ func newBulkIndexer(logger *zap.Logger, client *elasticsearch.Client, config *Co return newAsyncBulkIndexer(logger, client, config) } -func newSyncBulkIndexer(logger *zap.Logger, client *elasticsearch.Client, config *Config) *syncBulkIndexer { - var maxDocRetry int +func bulkIndexerConfig(client *elasticsearch.Client, config *Config) docappender.BulkIndexerConfig { + var maxDocRetries int if config.Retry.Enabled { - // max_requests includes initial attempt - // See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/32344 - maxDocRetry = config.Retry.MaxRequests - 1 + maxDocRetries = defaultMaxRetries + if config.Retry.MaxRetries != 0 { + maxDocRetries = config.Retry.MaxRetries + } } + return docappender.BulkIndexerConfig{ + Client: client, + MaxDocumentRetries: maxDocRetries, + Pipeline: config.Pipeline, + RetryOnDocumentStatus: config.Retry.RetryOnStatus, + } +} + +func newSyncBulkIndexer(logger *zap.Logger, client *elasticsearch.Client, config *Config) *syncBulkIndexer { return &syncBulkIndexer{ - config: docappender.BulkIndexerConfig{ - Client: client, - MaxDocumentRetries: maxDocRetry, - Pipeline: config.Pipeline, - RetryOnDocumentStatus: config.Retry.RetryOnStatus, - }, + config: bulkIndexerConfig(client, config), flushTimeout: config.Timeout, retryConfig: config.Retry, logger: logger, @@ -165,13 +172,6 @@ func newAsyncBulkIndexer(logger *zap.Logger, client *elasticsearch.Client, confi flushBytes = 5e+6 } - var maxDocRetry int - if config.Retry.Enabled { - // max_requests includes initial attempt - // See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/32344 - maxDocRetry = config.Retry.MaxRequests - 1 - } - pool := &asyncBulkIndexer{ wg: sync.WaitGroup{}, items: make(chan docappender.BulkIndexerItem, config.NumWorkers), @@ -180,12 +180,7 @@ func newAsyncBulkIndexer(logger *zap.Logger, client *elasticsearch.Client, confi pool.wg.Add(numWorkers) for i := 0; i < numWorkers; i++ { - bi, err := docappender.NewBulkIndexer(docappender.BulkIndexerConfig{ - Client: client, - MaxDocumentRetries: maxDocRetry, - Pipeline: config.Pipeline, - RetryOnDocumentStatus: config.Retry.RetryOnStatus, - }) + bi, err := docappender.NewBulkIndexer(bulkIndexerConfig(client, config)) if err != nil { return nil, err } diff --git a/exporter/elasticsearchexporter/config.go b/exporter/elasticsearchexporter/config.go index 072bd725c6fe..fe794d6db430 100644 --- a/exporter/elasticsearchexporter/config.go +++ b/exporter/elasticsearchexporter/config.go @@ -169,9 +169,13 @@ type RetrySettings struct { // Enabled allows users to disable retry without having to comment out all settings. Enabled bool `mapstructure:"enabled"` - // MaxRequests configures how often an HTTP request is retried before it is assumed to be failed. + // MaxRequests configures how often an HTTP request is attempted before it is assumed to be failed. + // Deprecated: use MaxRetries instead. MaxRequests int `mapstructure:"max_requests"` + // MaxRetries configures how many times an HTTP request is retried. + MaxRetries int `mapstructure:"max_retries"` + // InitialInterval configures the initial waiting time if a request failed. InitialInterval time.Duration `mapstructure:"initial_interval"` @@ -273,6 +277,17 @@ func (cfg *Config) Validate() error { // TODO support confighttp.ClientConfig.Compression return errors.New("compression is not currently configurable") } + + if cfg.Retry.MaxRequests != 0 && cfg.Retry.MaxRetries != 0 { + return errors.New("must not specify both retry::max_requests and retry::max_retries") + } + if cfg.Retry.MaxRequests < 0 { + return errors.New("retry::max_requests should be non-negative") + } + if cfg.Retry.MaxRetries < 0 { + return errors.New("retry::max_retries should be non-negative") + } + return nil } @@ -355,11 +370,16 @@ func (cfg *Config) MappingMode() MappingMode { return mappingModes[cfg.Mapping.Mode] } -func logConfigDeprecationWarnings(cfg *Config, logger *zap.Logger) { +func handleDeprecatedConfig(cfg *Config, logger *zap.Logger) { if cfg.Mapping.Dedup != nil { logger.Warn("dedup is deprecated, and is always enabled") } if cfg.Mapping.Dedot && cfg.MappingMode() != MappingECS || !cfg.Mapping.Dedot && cfg.MappingMode() == MappingECS { logger.Warn("dedot has been deprecated: in the future, dedotting will always be performed in ECS mode only") } + if cfg.Retry.MaxRequests != 0 { + cfg.Retry.MaxRetries = cfg.Retry.MaxRequests - 1 + // Do not set cfg.Retry.Enabled = false if cfg.Retry.MaxRequest = 1 to avoid breaking change on behavior + logger.Warn("retry::max_requests has been deprecated, and will be removed in a future version. Use retry::max_retries instead.") + } } diff --git a/exporter/elasticsearchexporter/config_test.go b/exporter/elasticsearchexporter/config_test.go index a27a28ccfe6e..9934dbb7365b 100644 --- a/exporter/elasticsearchexporter/config_test.go +++ b/exporter/elasticsearchexporter/config_test.go @@ -94,7 +94,7 @@ func TestConfig(t *testing.T) { }, Retry: RetrySettings{ Enabled: true, - MaxRequests: 5, + MaxRetries: 5, InitialInterval: 100 * time.Millisecond, MaxInterval: 1 * time.Minute, RetryOnStatus: []int{http.StatusTooManyRequests, http.StatusInternalServerError}, @@ -164,7 +164,7 @@ func TestConfig(t *testing.T) { }, Retry: RetrySettings{ Enabled: true, - MaxRequests: 5, + MaxRetries: 5, InitialInterval: 100 * time.Millisecond, MaxInterval: 1 * time.Minute, RetryOnStatus: []int{http.StatusTooManyRequests, http.StatusInternalServerError}, @@ -234,7 +234,7 @@ func TestConfig(t *testing.T) { }, Retry: RetrySettings{ Enabled: true, - MaxRequests: 5, + MaxRetries: 5, InitialInterval: 100 * time.Millisecond, MaxInterval: 1 * time.Minute, RetryOnStatus: []int{http.StatusTooManyRequests, http.StatusInternalServerError}, @@ -391,6 +391,14 @@ func TestConfig_Validate(t *testing.T) { }), err: `compression is not currently configurable`, }, + "both max_retries and max_requests specified": { + config: withDefaultConfig(func(cfg *Config) { + cfg.Endpoints = []string{"http://test:9200"} + cfg.Retry.MaxRetries = 1 + cfg.Retry.MaxRequests = 1 + }), + err: `must not specify both retry::max_requests and retry::max_retries`, + }, } for name, tt := range tests { diff --git a/exporter/elasticsearchexporter/esclient.go b/exporter/elasticsearchexporter/esclient.go index 23c2d48bb9ef..556718242bbf 100644 --- a/exporter/elasticsearchexporter/esclient.go +++ b/exporter/elasticsearchexporter/esclient.go @@ -90,16 +90,6 @@ func newElasticsearchClient( headers := make(http.Header) headers.Set("User-Agent", userAgent) - // maxRetries configures the maximum number of event publishing attempts, - // including the first send and additional retries. - - maxRetries := config.Retry.MaxRequests - 1 - retryDisabled := !config.Retry.Enabled || maxRetries <= 0 - - if retryDisabled { - maxRetries = 0 - } - // endpoints converts Config.Endpoints, Config.CloudID, // and Config.ClientConfig.Endpoint to a list of addresses. endpoints, err := config.endpoints() @@ -113,6 +103,11 @@ func newElasticsearchClient( logResponseBody: config.LogResponseBody, } + maxRetries := defaultMaxRetries + if config.Retry.MaxRetries != 0 { + maxRetries = config.Retry.MaxRetries + } + return elasticsearch.NewClient(elasticsearch.Config{ Transport: httpClient.Transport, @@ -125,7 +120,7 @@ func newElasticsearchClient( // configure retry behavior RetryOnStatus: config.Retry.RetryOnStatus, - DisableRetry: retryDisabled, + DisableRetry: !config.Retry.Enabled, EnableRetryOnTimeout: config.Retry.Enabled, //RetryOnError: retryOnError, // should be used from esclient version 8 onwards MaxRetries: maxRetries, diff --git a/exporter/elasticsearchexporter/exporter_test.go b/exporter/elasticsearchexporter/exporter_test.go index f1b455e41e1e..9a9b86a5be6f 100644 --- a/exporter/elasticsearchexporter/exporter_test.go +++ b/exporter/elasticsearchexporter/exporter_test.go @@ -540,14 +540,10 @@ func TestExporterLogs(t *testing.T) { t.Run("no retry", func(t *testing.T) { configurations := map[string]func(*Config){ - "max_requests limited": func(cfg *Config) { - cfg.Retry.MaxRequests = 1 - cfg.Retry.InitialInterval = 1 * time.Millisecond - cfg.Retry.MaxInterval = 10 * time.Millisecond - }, "retry.enabled is false": func(cfg *Config) { cfg.Retry.Enabled = false - cfg.Retry.MaxRequests = 10 + cfg.Retry.RetryOnStatus = []int{429} + cfg.Retry.MaxRetries = 10 cfg.Retry.InitialInterval = 1 * time.Millisecond cfg.Retry.MaxInterval = 10 * time.Millisecond }, @@ -557,7 +553,7 @@ func TestExporterLogs(t *testing.T) { "fail http request": func(attempts *atomic.Int64) bulkHandler { return func([]itemRequest) ([]itemResponse, error) { attempts.Add(1) - return nil, &httpTestError{message: "oops"} + return nil, &httpTestError{message: "oops", status: 429} } }, "fail item": func(attempts *atomic.Int64) bulkHandler { diff --git a/exporter/elasticsearchexporter/factory.go b/exporter/elasticsearchexporter/factory.go index 3f48ca1e2ec7..61af38d5cee6 100644 --- a/exporter/elasticsearchexporter/factory.go +++ b/exporter/elasticsearchexporter/factory.go @@ -63,7 +63,7 @@ func createDefaultConfig() component.Config { }, Retry: RetrySettings{ Enabled: true, - MaxRequests: 3, + MaxRetries: 0, // default is set in exporter code InitialInterval: 100 * time.Millisecond, MaxInterval: 1 * time.Minute, RetryOnStatus: []int{ @@ -110,7 +110,7 @@ func createLogsExporter( set.Logger.Warn("index option are deprecated and replaced with logs_index and traces_index.") index = cf.Index } - logConfigDeprecationWarnings(cf, set.Logger) + handleDeprecatedConfig(cf, set.Logger) exporter := newExporter(cf, set, index, cf.LogsDynamicIndex.Enabled) @@ -129,7 +129,7 @@ func createMetricsExporter( cfg component.Config, ) (exporter.Metrics, error) { cf := cfg.(*Config) - logConfigDeprecationWarnings(cf, set.Logger) + handleDeprecatedConfig(cf, set.Logger) exporter := newExporter(cf, set, cf.MetricsIndex, cf.MetricsDynamicIndex.Enabled) @@ -147,7 +147,7 @@ func createTracesExporter(ctx context.Context, cfg component.Config, ) (exporter.Traces, error) { cf := cfg.(*Config) - logConfigDeprecationWarnings(cf, set.Logger) + handleDeprecatedConfig(cf, set.Logger) exporter := newExporter(cf, set, cf.TracesIndex, cf.TracesDynamicIndex.Enabled) diff --git a/exporter/elasticsearchexporter/testdata/config.yaml b/exporter/elasticsearchexporter/testdata/config.yaml index 6f614399b579..d76d300a51c1 100644 --- a/exporter/elasticsearchexporter/testdata/config.yaml +++ b/exporter/elasticsearchexporter/testdata/config.yaml @@ -17,7 +17,7 @@ elasticsearch/trace: flush: bytes: 10485760 retry: - max_requests: 5 + max_retries: 5 retry_on_status: - 429 - 500 @@ -38,7 +38,7 @@ elasticsearch/metric: flush: bytes: 10485760 retry: - max_requests: 5 + max_retries: 5 retry_on_status: - 429 - 500 @@ -61,7 +61,7 @@ elasticsearch/log: flush: bytes: 10485760 retry: - max_requests: 5 + max_retries: 5 retry_on_status: - 429 - 500