feat/re-allow multiple workers #36134

Status: Open — wants to merge 2 commits into base: `main`
Changes from 1 commit
27 changes: 27 additions & 0 deletions .chloggen/prometheusremotewrite-reallow-multiple-workers.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: prometheusremotewriteexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Re-allows the configuration of multiple workers

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [36134]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
31 changes: 30 additions & 1 deletion exporter/prometheusremotewriteexporter/README.md
@@ -54,7 +54,7 @@ The following settings can be optionally configured:
- `remote_write_queue`: fine tuning for queueing and sending of the outgoing remote writes.
- `enabled`: enable the sending queue (default: `true`)
- `queue_size`: number of OTLP metrics that can be queued. Ignored if `enabled` is `false` (default: `10000`)
- `num_consumers`: minimum number of workers to use to fan out the outgoing requests. (default: `5`)
- `num_consumers`: minimum number of workers used to fan out the outgoing requests (default: `5`, or `1` when the `EnableMultipleWorkers` feature gate is enabled).
- `resource_to_telemetry_conversion`
- `enabled` (default = false): If `enabled` is `true`, all the resource attributes will be converted to metric labels by default.
- `target_info`: customize `target_info` metric
@@ -66,6 +66,8 @@ The following settings can be optionally configured:
- `max_batch_size_bytes` (default = `3000000` -> `~2.861 mb`): Maximum size of a batch of
samples to be sent to the remote write endpoint. If the batch size is larger
than this value, it will be split into multiple batches.
- `max_batch_request_parallelism` (default = `5`): Maximum parallelism allowed for a single request bigger than `max_batch_size_bytes`.
This configuration is used only when feature gate `exporter.prometheusremotewritexporter.EnableMultipleWorkers` is enabled.
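A minimal configuration sketch combining these settings (the endpoint is hypothetical and the values are illustrative, not recommendations):

```yaml
exporters:
  prometheusremotewrite:
    endpoint: "https://prometheus.example.com/api/v1/write"  # hypothetical endpoint
    max_batch_size_bytes: 3000000
    # Only takes effect when the
    # exporter.prometheusremotewritexporter.EnableMultipleWorkers feature gate is enabled.
    max_batch_request_parallelism: 5
    remote_write_queue:
      enabled: true
      queue_size: 10000
      num_consumers: 5
```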

Example:

@@ -101,12 +103,22 @@ Several helper files are leveraged to provide additional capabilities automatically:
- [Retry and timeout settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md), note that the exporter doesn't support `sending_queue` but provides `remote_write_queue`.

### Feature gates

#### RetryOn429

This exporter has feature gate: `exporter.prometheusremotewritexporter.RetryOn429`.
When this feature gate is enabled, the Prometheus remote write exporter retries on HTTP 429 status codes using the provided retry configuration.
It currently doesn't honor the HTTP `Retry-After` header, since the retry library in use doesn't support this feature.

To enable it, run the collector with the feature gate `exporter.prometheusremotewritexporter.RetryOn429` turned on. This can be done by executing it with one additional parameter: `--feature-gates=exporter.prometheusremotewritexporter.RetryOn429`.

#### EnableMultipleWorkersFeatureGate

This exporter has feature gate: `exporter.prometheusremotewritexporter.EnableMultipleWorkers`.

When this feature gate is enabled, `num_consumers` is used as the number of workers handling batches from the queue, and `max_batch_request_parallelism` controls the parallelism for a single batch bigger than `max_batch_size_bytes`.
Enabling this feature gate with `num_consumers` higher than 1 requires the target destination to support ingestion of out-of-order samples. See [Multiple Consumers and OutOfOrder](#multiple-consumers-and-outoforder) for more info.
Review comment (Member):

I think this value needs to default to 1 if this feature gate is enabled. When the feature gate is ultimately removed, I don't think it's appropriate to have a default value that is known to not work with any number of potential receivers, and that would not work with the default configuration of those receivers that are known to be able to support it in some configurations.

Reply (Contributor Author):

I agree that will help with the transition, and that the exporter should have "sane" defaults at all times.
Done, please check.


## Metric names and labels normalization

OpenTelemetry metric names and attributes are normalized to be compliant with Prometheus naming rules. [Details on this normalization process are described in the Prometheus translator module](../../pkg/translator/prometheus/).
Expand Down Expand Up @@ -149,3 +161,20 @@ sum by (namespace) (app_ads_ad_requests_total)
[beta]:https://github.com/open-telemetry/opentelemetry-collector#beta
[contrib]:https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
[core]:https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol

## Multiple Consumers and OutOfOrder

**DISCLAIMER**: This section applies only to Prometheus; other remote write destinations speaking the Prometheus protocol (e.g. Thanos, Grafana Mimir, VictoriaMetrics) may behave differently.

By default, Prometheus expects samples to be ingested sequentially, in temporal order.

When multiple consumers are enabled, the temporal ordering of the samples written to the target destination is not deterministic, and temporal ordering can no longer be guaranteed. For example, one worker may push a sample for `t+30s`, and a second worker may push an additional sample but for `t+15s`.

Vanilla Prometheus configurations will reject these unordered samples and you'll receive "out of order" errors.

Out-of-order support must be enabled in Prometheus when using multiple consumers.
This can be done with the `tsdb.out_of_order_time_window: 10m` setting. Choose a time window large enough to cover the worst-case queue build-up on the sender side.
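On the Prometheus server side, this setting lives under `storage.tsdb` in `prometheus.yml`; a sketch (the 10m window is illustrative — size it to your own worst case):

```yaml
# prometheus.yml (server side)
storage:
  tsdb:
    # Accept samples up to 10 minutes older than the newest sample
    # in each series.
    out_of_order_time_window: 10m
```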

For more info, see:
- https://prometheus.io/docs/prometheus/latest/configuration/configuration/#tsdb
- https://prometheus.io/docs/prometheus/latest/feature_flags/#remote-write-receiver
10 changes: 10 additions & 0 deletions exporter/prometheusremotewriteexporter/config.go
Expand Up @@ -35,6 +35,9 @@ type Config struct {
// maximum size in bytes of time series batch sent to remote storage
MaxBatchSizeBytes int `mapstructure:"max_batch_size_bytes"`

// maximum amount of parallel requests to do when handling large batch request
MaxBatchRequestParallelism *int `mapstructure:"max_batch_request_parallelism"`

// ResourceToTelemetrySettings is the option for converting resource attributes to telemetry attributes.
// "Enabled" - A boolean field to enable/disable this option. Default is `false`.
// If enabled, all the resource attributes will be converted to metric labels by default.
@@ -87,6 +90,13 @@ var _ component.Config = (*Config)(nil)

// Validate checks if the exporter configuration is valid
func (cfg *Config) Validate() error {
if cfg.MaxBatchRequestParallelism != nil && *cfg.MaxBatchRequestParallelism < 1 {
return fmt.Errorf("max_batch_request_parallelism can't be set to below 1")
}
if enableMultipleWorkersFeatureGate.IsEnabled() && cfg.MaxBatchRequestParallelism == nil {
return fmt.Errorf("enabling featuregate `+exporter.prometheusremotewritexporter.EnableMultipleWorkers` requires setting `max_batch_request_parallelism` in the configuration")
Review comment (Contributor):

We don't need to require people to set this.

}

if cfg.RemoteWriteQueue.QueueSize < 0 {
return fmt.Errorf("remote write queue size can't be negative")
}
14 changes: 12 additions & 2 deletions exporter/prometheusremotewriteexporter/config_test.go
@@ -56,8 +56,9 @@ func TestLoadConfig(t *testing.T) {
{
id: component.NewIDWithName(metadata.Type, "2"),
expected: &Config{
MaxBatchSizeBytes: 3000000,
TimeoutSettings: exporterhelper.NewDefaultTimeoutConfig(),
MaxBatchSizeBytes: 3000000,
MaxBatchRequestParallelism: toPtr(10),
TimeoutSettings: exporterhelper.NewDefaultTimeoutConfig(),
BackOffConfig: configretry.BackOffConfig{
Enabled: true,
InitialInterval: 10 * time.Second,
@@ -90,6 +91,10 @@
id: component.NewIDWithName(metadata.Type, "negative_num_consumers"),
errorMessage: "remote write consumer number can't be negative",
},
{
id: component.NewIDWithName(metadata.Type, "less_than_1_max_batch_request_parallelism"),
errorMessage: "max_batch_request_parallelism can't be set to below 1",
},
}

for _, tt := range tests {
@@ -136,3 +141,8 @@ func TestDisabledTargetInfo(t *testing.T) {

assert.False(t, cfg.(*Config).TargetInfo.Enabled)
}

func toPtr[T any](val T) *T {
ret := val
return &ret
}
7 changes: 6 additions & 1 deletion exporter/prometheusremotewriteexporter/exporter.go
@@ -120,13 +120,18 @@ func newPRWExporter(cfg *Config, set exporter.Settings) (*prwExporter, error) {

userAgentHeader := fmt.Sprintf("%s/%s", strings.ReplaceAll(strings.ToLower(set.BuildInfo.Description), " ", "-"), set.BuildInfo.Version)

concurrency := cfg.RemoteWriteQueue.NumConsumers
if enableMultipleWorkersFeatureGate.IsEnabled() || cfg.MaxBatchRequestParallelism != nil {
concurrency = *cfg.MaxBatchRequestParallelism
}
Review comment (Contributor) on lines +123 to +126:

Suggested change
concurrency := cfg.RemoteWriteQueue.NumConsumers
if enableMultipleWorkersFeatureGate.IsEnabled() || cfg.MaxBatchRequestParallelism != nil {
concurrency = *cfg.MaxBatchRequestParallelism
}
concurrency := 5
if !enableMultipleWorkersFeatureGate.IsEnabled() {
concurrency = cfg.RemoteWriteQueue.NumConsumers
}
if cfg.MaxBatchRequestParallelism != nil {
concurrency = *cfg.MaxBatchRequestParallelism
}

We want:

  • A default of 5
  • To always use MaxBatchRequestParallelism if it is set
  • To only use NumConsumers if the feature gate is disabled.

This suggestion accomplishes that.

Reply (Contributor Author):


This way it will be a "change in behavior". There may be some users that set cfg.RemoteWriteQueue.NumConsumers to increase the concurrency; with this suggestion, it will always be set to 5 unless they explicitly set *cfg.MaxBatchRequestParallelism.

Hence why I checked for the featuregate above to make sure *cfg.MaxBatchRequestParallelism was explicitly set by the user, and if not, default to cfg.RemoteWriteQueue.NumConsumers.

I may have misunderstood, but I thought the plan was to keep full backward compatibility.
Are we okay with defaulting to 5, so that users who want increased parallelism need to set *cfg.MaxBatchRequestParallelism?

Reply (Contributor):


It will only be a change in behavior once the feature gate is enabled. It shouldn't be a breaking change in this PR (unless I'm mistaken). The reason we have a feature gate at all is to make the migration from the breaking change easier.

Reply (Contributor Author):


I missed the gate difference. I'll update this over the weekend.


Reply (Contributor):


It would be nice to always use cfg.MaxBatchRequestParallelism if it is set by the user. That way, existing users can migrate from RemoteWriteQueue.NumConsumers to cfg.MaxBatchRequestParallelism right away. If the feature gate is disabled and a user has set NumConsumers to a non-default value, it would be nice to emit a warning instructing them to migrate. WDYT?

Reply (Contributor Author):


Done, please check if you agree.
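The precedence the review converged on can be sketched as a standalone function (a simplified illustration of the PR's intended logic, not the actual exporter code; the function name and the hard-coded default of 5 are this sketch's assumptions):

```go
package main

import "fmt"

// effectiveConcurrency mirrors, in simplified form, the precedence discussed
// above: start from a default of 5; fall back to num_consumers only while the
// EnableMultipleWorkers feature gate is disabled; and let an explicitly set
// max_batch_request_parallelism always win.
func effectiveConcurrency(gateEnabled bool, numConsumers int, maxBatchParallelism *int) int {
	concurrency := 5
	if !gateEnabled {
		concurrency = numConsumers
	}
	if maxBatchParallelism != nil {
		concurrency = *maxBatchParallelism
	}
	return concurrency
}

func main() {
	ten := 10
	fmt.Println(effectiveConcurrency(false, 8, nil)) // gate off: num_consumers wins -> 8
	fmt.Println(effectiveConcurrency(true, 8, nil))  // gate on: default -> 5
	fmt.Println(effectiveConcurrency(true, 8, &ten)) // explicit setting wins -> 10
}
```

This keeps existing `num_consumers` users unaffected until the gate flips, while giving migrated users a single knob.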

prwe := &prwExporter{
endpointURL: endpointURL,
wg: new(sync.WaitGroup),
closeChan: make(chan struct{}),
userAgentHeader: userAgentHeader,
maxBatchSizeBytes: cfg.MaxBatchSizeBytes,
concurrency: cfg.RemoteWriteQueue.NumConsumers,
concurrency: concurrency,
clientSettings: &cfg.ClientConfig,
settings: set.TelemetrySettings,
retrySettings: cfg.BackOffConfig,
31 changes: 23 additions & 8 deletions exporter/prometheusremotewriteexporter/factory.go
@@ -26,6 +26,13 @@ var retryOn429FeatureGate = featuregate.GlobalRegistry().MustRegister(
featuregate.WithRegisterDescription("When enabled, the Prometheus remote write exporter will retry 429 http status code. Requires exporter.prometheusremotewritexporter.metrics.RetryOn429 to be enabled."),
)

var enableMultipleWorkersFeatureGate = featuregate.GlobalRegistry().MustRegister(
"exporter.prometheusremotewritexporter.EnableMultipleWorkers",
featuregate.StageAlpha,
featuregate.WithRegisterDescription("When enabled and settings configured, the Prometheus remote exporter will"+
" spawn multiple workers/goroutines to handle incoming metrics batches concurrently"),
)

// NewFactory creates a new Prometheus Remote Write exporter.
func NewFactory() exporter.Factory {
return exporter.NewFactory(
@@ -42,17 +49,19 @@ func createMetricsExporter(ctx context.Context, set exporter.Settings,
return nil, errors.New("invalid configuration")
}

if !enableMultipleWorkersFeatureGate.IsEnabled() && prwCfg.RemoteWriteQueue.NumConsumers != 5 {
set.Logger.Warn("`remote_write_queue.num_consumers` is deprecated for controlling the single request parallelism. Please configure `max_batch_request_parallelism: X` for the same behavior.")
}

prwe, err := newPRWExporter(prwCfg, set)
if err != nil {
return nil, err
}

// Don't allow users to configure the queue.
// See https://github.com/open-telemetry/opentelemetry-collector/issues/2949.
// Prometheus remote write samples needs to be in chronological
// order for each timeseries. If we shard the incoming metrics
// without considering this limitation, we experience
// "out of order samples" errors.
numConsumers := 1
if enableMultipleWorkersFeatureGate.IsEnabled() {
numConsumers = prwCfg.RemoteWriteQueue.NumConsumers
}
exporter, err := exporterhelper.NewMetrics(
ctx,
set,
@@ -61,7 +70,7 @@
exporterhelper.WithTimeout(prwCfg.TimeoutSettings),
exporterhelper.WithQueue(exporterhelper.QueueConfig{
Enabled: prwCfg.RemoteWriteQueue.Enabled,
NumConsumers: 1,
NumConsumers: numConsumers,
QueueSize: prwCfg.RemoteWriteQueue.QueueSize,
}),
exporterhelper.WithStart(prwe.Start),
@@ -83,10 +92,16 @@ func createDefaultConfig() component.Config {
clientConfig.WriteBufferSize = 512 * 1024
clientConfig.Timeout = exporterhelper.NewDefaultTimeoutConfig().Timeout

numConsumers := 5
if enableMultipleWorkersFeatureGate.IsEnabled() {
numConsumers = 1
}
return &Config{
Namespace: "",
ExternalLabels: map[string]string{},
MaxBatchSizeBytes: 3000000,
// TODO: make this the default once the `exporter.prometheusremotewritexporter.EnableMultipleWorkers` feature gate is removed.
// MaxBatchRequestParallelism: 5,
TimeoutSettings: exporterhelper.NewDefaultTimeoutConfig(),
BackOffConfig: retrySettings,
AddMetricSuffixes: true,
@@ -96,7 +111,7 @@
RemoteWriteQueue: RemoteWriteQueue{
Enabled: true,
QueueSize: 10000,
NumConsumers: 5,
NumConsumers: numConsumers,
},
TargetInfo: &TargetInfo{
Enabled: true,
5 changes: 5 additions & 0 deletions exporter/prometheusremotewriteexporter/testdata/config.yaml
@@ -2,6 +2,7 @@ prometheusremotewrite:

prometheusremotewrite/2:
namespace: "test-space"
max_batch_request_parallelism: 10
retry_on_failure:
enabled: true
initial_interval: 10s
@@ -38,6 +39,10 @@ prometheusremotewrite/negative_num_consumers:
queue_size: 5
num_consumers: -1

prometheusremotewrite/less_than_1_max_batch_request_parallelism:
endpoint: "localhost:8888"
max_batch_request_parallelism: 0

prometheusremotewrite/disabled_target_info:
endpoint: "localhost:8888"
target_info: