diff --git a/CHANGELOG.md b/CHANGELOG.md index b992fbf7e2..efa0a139f7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ * [FEATURE] Query Frontend: Added a query rejection mechanism to block resource-intensive queries. #6005 * [FEATURE] OTLP: Support ingesting OTLP exponential metrics as native histograms. #6071 * [FEATURE] Ingester: Add `ingester.instance-limits.max-inflight-query-requests` to allow limiting ingester concurrent queries. #6081 +* [FEATURE] Distributor: Add `validation.max-native-histogram-buckets` to limit max number of bucket count. Distributor will try to automatically reduce histogram resolution until it is within the bucket limit or resolution cannot be reduced anymore. #6104 * [ENHANCEMENT] rulers: Add support to persist tokens in rulers. #5987 * [ENHANCEMENT] Query Frontend/Querier: Added store gateway postings touched count and touched size in Querier stats and log in Query Frontend. #5892 * [ENHANCEMENT] Query Frontend/Querier: Returns `warnings` on prometheus query responses. #5916 diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 06e34d991f..1e730d99e1 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -3159,11 +3159,13 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s # e.g. remote_write.write_relabel_configs. [metric_relabel_configs: | default = []] -# Enables support for exemplars in TSDB and sets the maximum number that will be -# stored. less than zero means disabled. If the value is set to zero, cortex -# will fallback to blocks-storage.tsdb.max-exemplars value. -# CLI flag: -ingester.max-exemplars -[max_exemplars: | default = 0] +# Limit on total number of positive and negative buckets allowed in a single +# native histogram. The resolution of a histogram with more buckets will be +# reduced until the number of buckets is within the limit. If the limit cannot +# be reached, the sample will be discarded. 0 means no limit. Enforced at +# Distributor. +# CLI flag: -validation.max-native-histogram-buckets +[max_native_histogram_buckets: | default = 0] # The maximum number of active series per user, per ingester. 0 to disable. # CLI flag: -ingester.max-series-per-user @@ -3213,6 +3215,12 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s # CLI flag: -ingester.out-of-order-time-window [out_of_order_time_window: | default = 0s] +# Enables support for exemplars in TSDB and sets the maximum number that will be +# stored. less than zero means disabled. If the value is set to zero, cortex +# will fallback to blocks-storage.tsdb.max-exemplars value. +# CLI flag: -ingester.max-exemplars +[max_exemplars: | default = 0] + # Maximum number of chunks that can be fetched in a single query from ingesters # and long-term storage. This limit is enforced in the querier, ruler and # store-gateway. 0 to disable. diff --git a/pkg/cortexpb/histograms.go b/pkg/cortexpb/histograms.go index 2e2afef457..129b25d300 100644 --- a/pkg/cortexpb/histograms.go +++ b/pkg/cortexpb/histograms.go @@ -15,6 +15,11 @@ package cortexpb import "github.com/prometheus/prometheus/model/histogram" +const ( + ExponentialSchemaMax int32 = 8 + ExponentialSchemaMin int32 = -4 +) + func (h Histogram) IsFloatHistogram() bool { _, ok := h.GetCount().(*Histogram_CountFloat) return ok diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 055053b436..664a221b10 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -579,12 +579,16 @@ func (d *Distributor) validateSeries(ts cortexpb.PreallocTimeseries, userID stri if len(ts.Histograms) > 0 { // Only alloc when data present histograms = make([]cortexpb.Histogram, 0, len(ts.Histograms)) - for _, h := range ts.Histograms { - // TODO(yeya24): add other validations for native histogram. - // For example, Prometheus scrape has bucket limit and schema check. + for i, h := range ts.Histograms { if err := validation.ValidateSampleTimestamp(d.validateMetrics, limits, userID, ts.Labels, h.TimestampMs); err != nil { return emptyPreallocSeries, err } + // TODO(yeya24): add max schema validation for native histogram if needed. + convertedHistogram, err := validation.ValidateNativeHistogram(d.validateMetrics, limits, userID, ts.Labels, h) + if err != nil { + return emptyPreallocSeries, err + } + ts.Histograms[i] = convertedHistogram } histograms = append(histograms, ts.Histograms...) } diff --git a/pkg/util/validation/errors.go b/pkg/util/validation/errors.go index 483bf6acc5..29fa69b0c0 100644 --- a/pkg/util/validation/errors.go +++ b/pkg/util/validation/errors.go @@ -225,6 +225,24 @@ func newExemplarLabelLengthError(seriesLabels []cortexpb.LabelAdapter, exemplarL } } +// histogramBucketLimitExceededError is a ValidationError implementation for samples with native histogram +// exceeding max bucket limit and cannot reduce resolution further to be within the max bucket limit. +type histogramBucketLimitExceededError struct { + series []cortexpb.LabelAdapter + limit int +} + +func newHistogramBucketLimitExceededError(series []cortexpb.LabelAdapter, limit int) ValidationError { + return &histogramBucketLimitExceededError{ + series: series, + limit: limit, + } +} + +func (e *histogramBucketLimitExceededError) Error() string { + return fmt.Sprintf("native histogram bucket count exceeded for metric (limit: %d) metric: %.200q", e.limit, formatLabelSet(e.series)) +} + // formatLabelSet formats label adapters as a metric name with labels, while preserving // label order, and keeping duplicates. If there are multiple "__name__" labels, only // first one is used as metric name, other ones will be included as regular labels. diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index 4586a24622..fc9faab078 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -134,7 +134,7 @@ type Limits struct { EnforceMetricName bool `yaml:"enforce_metric_name" json:"enforce_metric_name"` IngestionTenantShardSize int `yaml:"ingestion_tenant_shard_size" json:"ingestion_tenant_shard_size"` MetricRelabelConfigs []*relabel.Config `yaml:"metric_relabel_configs,omitempty" json:"metric_relabel_configs,omitempty" doc:"nocli|description=List of metric relabel configurations. Note that in most situations, it is more effective to use metrics relabeling directly in the Prometheus server, e.g. remote_write.write_relabel_configs."` - MaxExemplars int `yaml:"max_exemplars" json:"max_exemplars"` + MaxNativeHistogramBuckets int `yaml:"max_native_histogram_buckets" json:"max_native_histogram_buckets"` // Ingester enforced limits. // Series @@ -151,6 +151,8 @@ type Limits struct { MaxGlobalMetadataPerMetric int `yaml:"max_global_metadata_per_metric" json:"max_global_metadata_per_metric"` // Out-of-order OutOfOrderTimeWindow model.Duration `yaml:"out_of_order_time_window" json:"out_of_order_time_window"` + // Exemplars + MaxExemplars int `yaml:"max_exemplars" json:"max_exemplars"` // Querier enforced limits. MaxChunksPerQuery int `yaml:"max_fetched_chunks_per_query" json:"max_fetched_chunks_per_query"` @@ -232,6 +234,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.Var(&l.CreationGracePeriod, "validation.create-grace-period", "Duration which table will be created/deleted before/after it's needed; we won't accept sample from before this time.") f.BoolVar(&l.EnforceMetricName, "validation.enforce-metric-name", true, "Enforce every sample has a metric name.") f.BoolVar(&l.EnforceMetadataMetricName, "validation.enforce-metadata-metric-name", true, "Enforce every metadata has a metric name.") + f.IntVar(&l.MaxNativeHistogramBuckets, "validation.max-native-histogram-buckets", 0, "Limit on total number of positive and negative buckets allowed in a single native histogram. The resolution of a histogram with more buckets will be reduced until the number of buckets is within the limit. If the limit cannot be reached, the sample will be discarded. 0 means no limit. Enforced at Distributor.") f.IntVar(&l.MaxLocalSeriesPerUser, "ingester.max-series-per-user", 5000000, "The maximum number of active series per user, per ingester. 0 to disable.") f.IntVar(&l.MaxLocalSeriesPerMetric, "ingester.max-series-per-metric", 50000, "The maximum number of active series per metric name, per ingester. 0 to disable.") @@ -722,6 +725,12 @@ func (o *Overrides) EnforceMetadataMetricName(userID string) bool { return o.GetOverridesForUser(userID).EnforceMetadataMetricName } +// MaxNativeHistogramBuckets returns the maximum total number of positive and negative buckets of a single native histogram +// a user is allowed to store. +func (o *Overrides) MaxNativeHistogramBuckets(userID string) int { + return o.GetOverridesForUser(userID).MaxNativeHistogramBuckets +} + // MaxLocalMetricsWithMetadataPerUser returns the maximum number of metrics with metadata a user is allowed to store in a single ingester. func (o *Overrides) MaxLocalMetricsWithMetadataPerUser(userID string) int { return o.GetOverridesForUser(userID).MaxLocalMetricsWithMetadataPerUser diff --git a/pkg/util/validation/validate.go b/pkg/util/validation/validate.go index f03b8b74b1..46554e2d5b 100644 --- a/pkg/util/validation/validate.go +++ b/pkg/util/validation/validate.go @@ -52,6 +52,9 @@ const ( exemplarLabelsTooLong = "exemplar_labels_too_long" exemplarTimestampInvalid = "exemplar_timestamp_invalid" + // Native Histogram specific validation reasons + nativeHistogramBucketCountLimitExceeded = "native_histogram_buckets_exceeded" + // RateLimited is one of the values for the reason to discard samples. // Declared here to avoid duplication in ingester and distributor. RateLimited = "rate_limited" @@ -262,6 +265,59 @@ func ValidateMetadata(validateMetrics *ValidateMetrics, cfg *Limits, userID stri return nil } +func ValidateNativeHistogram(validateMetrics *ValidateMetrics, limits *Limits, userID string, ls []cortexpb.LabelAdapter, histogram cortexpb.Histogram) (cortexpb.Histogram, error) { + if limits.MaxNativeHistogramBuckets == 0 { + return histogram, nil + } + + var ( + exceedLimit bool + ) + if histogram.IsFloatHistogram() { + // Initial check to see if the bucket limit is exceeded or not. If not, we can avoid type casting. + exceedLimit = len(histogram.PositiveCounts)+len(histogram.NegativeCounts) > limits.MaxNativeHistogramBuckets + if !exceedLimit { + return histogram, nil + } + // Exceed limit. + if histogram.Schema <= cortexpb.ExponentialSchemaMin { + validateMetrics.DiscardedSamples.WithLabelValues(nativeHistogramBucketCountLimitExceeded, userID).Inc() + return cortexpb.Histogram{}, newHistogramBucketLimitExceededError(ls, limits.MaxNativeHistogramBuckets) + } + fh := cortexpb.FloatHistogramProtoToFloatHistogram(histogram) + for len(fh.PositiveBuckets)+len(fh.NegativeBuckets) > limits.MaxNativeHistogramBuckets { + if fh.Schema <= cortexpb.ExponentialSchemaMin { + validateMetrics.DiscardedSamples.WithLabelValues(nativeHistogramBucketCountLimitExceeded, userID).Inc() + return cortexpb.Histogram{}, newHistogramBucketLimitExceededError(ls, limits.MaxNativeHistogramBuckets) + } + fh = fh.ReduceResolution(fh.Schema - 1) + } + // If resolution reduced, convert new float histogram to protobuf type again. + return cortexpb.FloatHistogramToHistogramProto(histogram.TimestampMs, fh), nil + } + + // Initial check to see if bucket limit is exceeded or not. If not, we can avoid type casting. + exceedLimit = len(histogram.PositiveDeltas)+len(histogram.NegativeDeltas) > limits.MaxNativeHistogramBuckets + if !exceedLimit { + return histogram, nil + } + // Exceed limit. + if histogram.Schema <= cortexpb.ExponentialSchemaMin { + validateMetrics.DiscardedSamples.WithLabelValues(nativeHistogramBucketCountLimitExceeded, userID).Inc() + return cortexpb.Histogram{}, newHistogramBucketLimitExceededError(ls, limits.MaxNativeHistogramBuckets) + } + h := cortexpb.HistogramProtoToHistogram(histogram) + for len(h.PositiveBuckets)+len(h.NegativeBuckets) > limits.MaxNativeHistogramBuckets { + if h.Schema <= cortexpb.ExponentialSchemaMin { + validateMetrics.DiscardedSamples.WithLabelValues(nativeHistogramBucketCountLimitExceeded, userID).Inc() + return cortexpb.Histogram{}, newHistogramBucketLimitExceededError(ls, limits.MaxNativeHistogramBuckets) + } + h = h.ReduceResolution(h.Schema - 1) + } + // If resolution reduced, convert new histogram to protobuf type again. + return cortexpb.HistogramToHistogramProto(histogram.TimestampMs, h), nil +} + func DeletePerUserValidationMetrics(validateMetrics *ValidateMetrics, userID string, log log.Logger) { filter := map[string]string{"user": userID} diff --git a/pkg/util/validation/validate_test.go b/pkg/util/validation/validate_test.go index 2ecec1ee72..93051a63b8 100644 --- a/pkg/util/validation/validate_test.go +++ b/pkg/util/validation/validate_test.go @@ -8,6 +8,8 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/tsdb/tsdbutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/weaveworks/common/httpgrpc" @@ -291,3 +293,110 @@ func TestValidateLabelDuplication(t *testing.T) { }, "a") assert.Equal(t, expected, actual) } + +func TestValidateNativeHistogram(t *testing.T) { + userID := "fake" + lbls := cortexpb.FromLabelsToLabelAdapters(labels.FromStrings("foo", "bar")) + + // Test histogram has 4 positive buckets and 4 negative buckets so 8 in total. Schema set to 1. + h := tsdbutil.GenerateTestHistogram(0) + fh := tsdbutil.GenerateTestFloatHistogram(0) + + histogramWithSchemaMin := tsdbutil.GenerateTestHistogram(0) + histogramWithSchemaMin.Schema = cortexpb.ExponentialSchemaMin + floatHistogramWithSchemaMin := tsdbutil.GenerateTestFloatHistogram(0) + floatHistogramWithSchemaMin.Schema = cortexpb.ExponentialSchemaMin + for _, tc := range []struct { + name string + bucketLimit int + histogram cortexpb.Histogram + expectedHistogram cortexpb.Histogram + expectedErr error + }{ + { + name: "no limit, histogram", + histogram: cortexpb.HistogramToHistogramProto(0, h.Copy()), + expectedHistogram: cortexpb.HistogramToHistogramProto(0, h.Copy()), + }, + { + name: "no limit, float histogram", + histogram: cortexpb.FloatHistogramToHistogramProto(0, fh.Copy()), + expectedHistogram: cortexpb.FloatHistogramToHistogramProto(0, fh.Copy()), + }, + { + name: "within limit, histogram", + bucketLimit: 8, + histogram: cortexpb.HistogramToHistogramProto(0, h.Copy()), + expectedHistogram: cortexpb.HistogramToHistogramProto(0, h.Copy()), + }, + { + name: "within limit, float histogram", + bucketLimit: 8, + histogram: cortexpb.FloatHistogramToHistogramProto(0, fh.Copy()), + expectedHistogram: cortexpb.FloatHistogramToHistogramProto(0, fh.Copy()), + }, + { + name: "exceed limit and reduce resolution for 1 level, histogram", + bucketLimit: 6, + histogram: cortexpb.HistogramToHistogramProto(0, h.Copy()), + expectedHistogram: cortexpb.HistogramToHistogramProto(0, h.Copy().ReduceResolution(0)), + }, + { + name: "exceed limit and reduce resolution for 1 level, float histogram", + bucketLimit: 6, + histogram: cortexpb.FloatHistogramToHistogramProto(0, fh.Copy()), + expectedHistogram: cortexpb.FloatHistogramToHistogramProto(0, fh.Copy().ReduceResolution(0)), + }, + { + name: "exceed limit and reduce resolution for 2 levels, histogram", + bucketLimit: 4, + histogram: cortexpb.HistogramToHistogramProto(0, h.Copy()), + expectedHistogram: cortexpb.HistogramToHistogramProto(0, h.Copy().ReduceResolution(-1)), + }, + { + name: "exceed limit and reduce resolution for 2 levels, float histogram", + bucketLimit: 4, + histogram: cortexpb.FloatHistogramToHistogramProto(0, fh.Copy()), + expectedHistogram: cortexpb.FloatHistogramToHistogramProto(0, fh.Copy().ReduceResolution(-1)), + }, + { + name: "exceed limit but cannot reduce resolution further, histogram", + bucketLimit: 1, + histogram: cortexpb.HistogramToHistogramProto(0, h.Copy()), + expectedErr: newHistogramBucketLimitExceededError(lbls, 1), + }, + { + name: "exceed limit but cannot reduce resolution further, float histogram", + bucketLimit: 1, + histogram: cortexpb.FloatHistogramToHistogramProto(0, fh.Copy()), + expectedErr: newHistogramBucketLimitExceededError(lbls, 1), + }, + { + name: "exceed limit but cannot reduce resolution further with min schema, histogram", + bucketLimit: 4, + histogram: cortexpb.HistogramToHistogramProto(0, histogramWithSchemaMin.Copy()), + expectedErr: newHistogramBucketLimitExceededError(lbls, 4), + }, + { + name: "exceed limit but cannot reduce resolution further with min schema, float histogram", + bucketLimit: 4, + histogram: cortexpb.FloatHistogramToHistogramProto(0, floatHistogramWithSchemaMin.Copy()), + expectedErr: newHistogramBucketLimitExceededError(lbls, 4), + }, + } { + t.Run(tc.name, func(t *testing.T) { + reg := prometheus.NewRegistry() + validateMetrics := NewValidateMetrics(reg) + limits := new(Limits) + limits.MaxNativeHistogramBuckets = tc.bucketLimit + actualHistogram, actualErr := ValidateNativeHistogram(validateMetrics, limits, userID, lbls, tc.histogram) + if tc.expectedErr != nil { + require.Equal(t, tc.expectedErr, actualErr) + require.Equal(t, float64(1), testutil.ToFloat64(validateMetrics.DiscardedSamples.WithLabelValues(nativeHistogramBucketCountLimitExceeded, userID))) + } else { + require.NoError(t, actualErr) + require.Equal(t, tc.expectedHistogram, actualHistogram) + } + }) + } +}