Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[exporter/elasticsearch] Deprecate retry::max_requests in favor of retry::max_retries #35571

Merged
merged 9 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: deprecation

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: elasticsearchexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Deprecate retry::max_requests in favor of retry::max_retries

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [32344]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: retry::max_retries will be exactly retry::max_requests - 1

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
3 changes: 2 additions & 1 deletion exporter/elasticsearchexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,8 @@ The behaviour of this bulk indexing can be configured with the following setting
- `interval` (default=30s): Write buffer flush time limit.
- `retry`: Elasticsearch bulk request retry settings
- `enabled` (default=true): Enable/Disable request retry on error. Failed requests are retried with exponential backoff.
- `max_requests` (default=3): Number of HTTP request retries.
- `max_requests` (DEPRECATED, use retry::max_retries instead): Number of HTTP request retries including the initial attempt. If used, `retry::max_retries` will be set to `max_requests - 1`.
- `max_retries` (default=2): Number of HTTP request retries. To disable retries, set `retry::enabled` to `false` instead of setting `max_retries` to `0`.
- `initial_interval` (default=100ms): Initial waiting time if a HTTP request failed.
- `max_interval` (default=1m): Max waiting time if a HTTP request failed.
- `retry_on_status` (default=[429]): Status codes that trigger request or document level retries. Request level retry and document level retry status codes are shared and cannot be configured separately. To avoid duplicates, it defaults to `[429]`.
Expand Down
43 changes: 19 additions & 24 deletions exporter/elasticsearchexporter/bulkindexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,27 +50,34 @@ type bulkIndexerSession interface {
Flush(context.Context) error
}

const defaultMaxRetries = 2
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[to reviewer] moving default out of default config, otherwise we cannot detect max_requests !=0 and max_retries != 0 due to default config being initialized first.


func newBulkIndexer(logger *zap.Logger, client *elasticsearch.Client, config *Config) (bulkIndexer, error) {
if config.Batcher.Enabled != nil {
return newSyncBulkIndexer(logger, client, config), nil
}
return newAsyncBulkIndexer(logger, client, config)
}

func newSyncBulkIndexer(logger *zap.Logger, client *elasticsearch.Client, config *Config) *syncBulkIndexer {
var maxDocRetry int
func bulkIndexerConfig(client *elasticsearch.Client, config *Config) docappender.BulkIndexerConfig {
var maxDocRetries int
if config.Retry.Enabled {
// max_requests includes initial attempt
// See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/32344
maxDocRetry = config.Retry.MaxRequests - 1
maxDocRetries = defaultMaxRetries
if config.Retry.MaxRetries != 0 {
maxDocRetries = config.Retry.MaxRetries
}
}
return docappender.BulkIndexerConfig{
Client: client,
MaxDocumentRetries: maxDocRetries,
Pipeline: config.Pipeline,
RetryOnDocumentStatus: config.Retry.RetryOnStatus,
}
}

func newSyncBulkIndexer(logger *zap.Logger, client *elasticsearch.Client, config *Config) *syncBulkIndexer {
return &syncBulkIndexer{
config: docappender.BulkIndexerConfig{
Client: client,
MaxDocumentRetries: maxDocRetry,
Pipeline: config.Pipeline,
RetryOnDocumentStatus: config.Retry.RetryOnStatus,
},
config: bulkIndexerConfig(client, config),
flushTimeout: config.Timeout,
retryConfig: config.Retry,
logger: logger,
Expand Down Expand Up @@ -164,13 +171,6 @@ func newAsyncBulkIndexer(logger *zap.Logger, client *elasticsearch.Client, confi
flushBytes = 5e+6
}

var maxDocRetry int
if config.Retry.Enabled {
// max_requests includes initial attempt
// See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/32344
maxDocRetry = config.Retry.MaxRequests - 1
}

pool := &asyncBulkIndexer{
wg: sync.WaitGroup{},
items: make(chan docappender.BulkIndexerItem, config.NumWorkers),
Expand All @@ -179,12 +179,7 @@ func newAsyncBulkIndexer(logger *zap.Logger, client *elasticsearch.Client, confi
pool.wg.Add(numWorkers)

for i := 0; i < numWorkers; i++ {
bi, err := docappender.NewBulkIndexer(docappender.BulkIndexerConfig{
Client: client,
MaxDocumentRetries: maxDocRetry,
Pipeline: config.Pipeline,
RetryOnDocumentStatus: config.Retry.RetryOnStatus,
})
bi, err := docappender.NewBulkIndexer(bulkIndexerConfig(client, config))
if err != nil {
return nil, err
}
Expand Down
24 changes: 22 additions & 2 deletions exporter/elasticsearchexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,13 @@ type RetrySettings struct {
// Enabled allows users to disable retry without having to comment out all settings.
Enabled bool `mapstructure:"enabled"`

// MaxRequests configures how often an HTTP request is retried before it is assumed to be failed.
// MaxRequests configures how often an HTTP request is attempted before it is assumed to be failed.
// Deprecated: use MaxRetries instead.
MaxRequests int `mapstructure:"max_requests"`

// MaxRetries configures how many times an HTTP request is retried.
MaxRetries int `mapstructure:"max_retries"`
crobert-1 marked this conversation as resolved.
Show resolved Hide resolved

// InitialInterval configures the initial waiting time if a request failed.
InitialInterval time.Duration `mapstructure:"initial_interval"`

Expand Down Expand Up @@ -269,6 +273,17 @@ func (cfg *Config) Validate() error {
// TODO support confighttp.ClientConfig.Compression
return errors.New("compression is not currently configurable")
}

if cfg.Retry.MaxRequests != 0 && cfg.Retry.MaxRetries != 0 {
return errors.New("must not specify both retry::max_requests and retry::max_retries")
}
if cfg.Retry.MaxRequests < 0 {
return errors.New("retry::max_requests should be non-negative")
}
if cfg.Retry.MaxRetries < 0 {
return errors.New("retry::max_retries should be non-negative")
}

return nil
}

Expand Down Expand Up @@ -351,11 +366,16 @@ func (cfg *Config) MappingMode() MappingMode {
return mappingModes[cfg.Mapping.Mode]
}

func logConfigDeprecationWarnings(cfg *Config, logger *zap.Logger) {
func handleDeprecatedConfig(cfg *Config, logger *zap.Logger) {
if cfg.Mapping.Dedup != nil {
logger.Warn("dedup is deprecated, and is always enabled")
}
if cfg.Mapping.Dedot && cfg.MappingMode() != MappingECS || !cfg.Mapping.Dedot && cfg.MappingMode() == MappingECS {
logger.Warn("dedot has been deprecated: in the future, dedotting will always be performed in ECS mode only")
}
if cfg.Retry.MaxRequests != 0 {
carsonip marked this conversation as resolved.
Show resolved Hide resolved
cfg.Retry.MaxRetries = cfg.Retry.MaxRequests - 1
// Do not set cfg.Retry.Enabled = false if cfg.Retry.MaxRequest = 1 to avoid breaking change on behavior
logger.Warn("retry::max_requests has been deprecated, and will be removed in a future version. Use retry::max_retries instead.")
}
}
14 changes: 11 additions & 3 deletions exporter/elasticsearchexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func TestConfig(t *testing.T) {
},
Retry: RetrySettings{
Enabled: true,
MaxRequests: 5,
MaxRetries: 5,
InitialInterval: 100 * time.Millisecond,
MaxInterval: 1 * time.Minute,
RetryOnStatus: []int{http.StatusTooManyRequests, http.StatusInternalServerError},
Expand Down Expand Up @@ -164,7 +164,7 @@ func TestConfig(t *testing.T) {
},
Retry: RetrySettings{
Enabled: true,
MaxRequests: 5,
MaxRetries: 5,
InitialInterval: 100 * time.Millisecond,
MaxInterval: 1 * time.Minute,
RetryOnStatus: []int{http.StatusTooManyRequests, http.StatusInternalServerError},
Expand Down Expand Up @@ -234,7 +234,7 @@ func TestConfig(t *testing.T) {
},
Retry: RetrySettings{
Enabled: true,
MaxRequests: 5,
MaxRetries: 5,
InitialInterval: 100 * time.Millisecond,
MaxInterval: 1 * time.Minute,
RetryOnStatus: []int{http.StatusTooManyRequests, http.StatusInternalServerError},
Expand Down Expand Up @@ -391,6 +391,14 @@ func TestConfig_Validate(t *testing.T) {
}),
err: `compression is not currently configurable`,
},
"both max_retries and max_requests specified": {
config: withDefaultConfig(func(cfg *Config) {
cfg.Endpoints = []string{"http://test:9200"}
cfg.Retry.MaxRetries = 1
cfg.Retry.MaxRequests = 1
}),
err: `must not specify both retry::max_requests and retry::max_retries`,
},
}

for name, tt := range tests {
Expand Down
17 changes: 6 additions & 11 deletions exporter/elasticsearchexporter/esclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,16 +90,6 @@ func newElasticsearchClient(
headers := make(http.Header)
headers.Set("User-Agent", userAgent)

// maxRetries configures the maximum number of event publishing attempts,
// including the first send and additional retries.

maxRetries := config.Retry.MaxRequests - 1
retryDisabled := !config.Retry.Enabled || maxRetries <= 0

if retryDisabled {
maxRetries = 0
}

// endpoints converts Config.Endpoints, Config.CloudID,
// and Config.ClientConfig.Endpoint to a list of addresses.
endpoints, err := config.endpoints()
Expand All @@ -113,6 +103,11 @@ func newElasticsearchClient(
logResponseBody: config.LogResponseBody,
}

maxRetries := defaultMaxRetries
if config.Retry.MaxRetries != 0 {
maxRetries = config.Retry.MaxRetries
}

return elasticsearch.NewClient(elasticsearch.Config{
Transport: httpClient.Transport,

Expand All @@ -125,7 +120,7 @@ func newElasticsearchClient(

// configure retry behavior
RetryOnStatus: config.Retry.RetryOnStatus,
DisableRetry: retryDisabled,
DisableRetry: !config.Retry.Enabled,
EnableRetryOnTimeout: config.Retry.Enabled,
//RetryOnError: retryOnError, // should be used from esclient version 8 onwards
MaxRetries: maxRetries,
Expand Down
10 changes: 3 additions & 7 deletions exporter/elasticsearchexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,14 +349,10 @@ func TestExporterLogs(t *testing.T) {

t.Run("no retry", func(t *testing.T) {
configurations := map[string]func(*Config){
"max_requests limited": func(cfg *Config) {
andrzej-stencel marked this conversation as resolved.
Show resolved Hide resolved
cfg.Retry.MaxRequests = 1
cfg.Retry.InitialInterval = 1 * time.Millisecond
cfg.Retry.MaxInterval = 10 * time.Millisecond
},
"retry.enabled is false": func(cfg *Config) {
cfg.Retry.Enabled = false
cfg.Retry.MaxRequests = 10
cfg.Retry.RetryOnStatus = []int{429}
cfg.Retry.MaxRetries = 10
cfg.Retry.InitialInterval = 1 * time.Millisecond
cfg.Retry.MaxInterval = 10 * time.Millisecond
},
Expand All @@ -366,7 +362,7 @@ func TestExporterLogs(t *testing.T) {
"fail http request": func(attempts *atomic.Int64) bulkHandler {
return func([]itemRequest) ([]itemResponse, error) {
attempts.Add(1)
return nil, &httpTestError{message: "oops"}
return nil, &httpTestError{message: "oops", status: 429}
}
},
"fail item": func(attempts *atomic.Int64) bulkHandler {
Expand Down
8 changes: 4 additions & 4 deletions exporter/elasticsearchexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func createDefaultConfig() component.Config {
},
Retry: RetrySettings{
Enabled: true,
MaxRequests: 3,
MaxRetries: 0, // default is set in exporter code
InitialInterval: 100 * time.Millisecond,
MaxInterval: 1 * time.Minute,
RetryOnStatus: []int{
Expand Down Expand Up @@ -110,7 +110,7 @@ func createLogsExporter(
set.Logger.Warn("index option are deprecated and replaced with logs_index and traces_index.")
index = cf.Index
}
logConfigDeprecationWarnings(cf, set.Logger)
handleDeprecatedConfig(cf, set.Logger)

exporter := newExporter(cf, set, index, cf.LogsDynamicIndex.Enabled)

Expand All @@ -129,7 +129,7 @@ func createMetricsExporter(
cfg component.Config,
) (exporter.Metrics, error) {
cf := cfg.(*Config)
logConfigDeprecationWarnings(cf, set.Logger)
handleDeprecatedConfig(cf, set.Logger)

exporter := newExporter(cf, set, cf.MetricsIndex, cf.MetricsDynamicIndex.Enabled)

Expand All @@ -147,7 +147,7 @@ func createTracesExporter(ctx context.Context,
cfg component.Config,
) (exporter.Traces, error) {
cf := cfg.(*Config)
logConfigDeprecationWarnings(cf, set.Logger)
handleDeprecatedConfig(cf, set.Logger)

exporter := newExporter(cf, set, cf.TracesIndex, cf.TracesDynamicIndex.Enabled)

Expand Down
6 changes: 3 additions & 3 deletions exporter/elasticsearchexporter/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ elasticsearch/trace:
flush:
bytes: 10485760
retry:
max_requests: 5
max_retries: 5
retry_on_status:
- 429
- 500
Expand All @@ -38,7 +38,7 @@ elasticsearch/metric:
flush:
bytes: 10485760
retry:
max_requests: 5
max_retries: 5
retry_on_status:
- 429
- 500
Expand All @@ -61,7 +61,7 @@ elasticsearch/log:
flush:
bytes: 10485760
retry:
max_requests: 5
max_retries: 5
retry_on_status:
- 429
- 500
Expand Down