Skip to content

Commit

Permalink
Merge pull request #6104 from yeya24/max-bucket-limit
Browse files Browse the repository at this point in the history
  • Loading branch information
CharlieTLe authored Jul 23, 2024
2 parents 0edfd7d + a88d237 commit b167dd4
Show file tree
Hide file tree
Showing 8 changed files with 219 additions and 9 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
* [FEATURE] Query Frontend: Added a query rejection mechanism to block resource-intensive queries. #6005
* [FEATURE] OTLP: Support ingesting OTLP exponential metrics as native histograms. #6071
* [FEATURE] Ingester: Add `ingester.instance-limits.max-inflight-query-requests` to allow limiting ingester concurrent queries. #6081
* [FEATURE] Distributor: Add `validation.max-native-histogram-buckets` to limit max number of bucket count. Distributor will try to automatically reduce histogram resolution until it is within the bucket limit or resolution cannot be reduced anymore. #6104
* [ENHANCEMENT] rulers: Add support to persist tokens in rulers. #5987
* [ENHANCEMENT] Query Frontend/Querier: Added store gateway postings touched count and touched size in Querier stats and log in Query Frontend. #5892
* [ENHANCEMENT] Query Frontend/Querier: Returns `warnings` on prometheus query responses. #5916
Expand Down
18 changes: 13 additions & 5 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -3159,11 +3159,13 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
# e.g. remote_write.write_relabel_configs.
[metric_relabel_configs: <relabel_config...> | default = []]
# Enables support for exemplars in TSDB and sets the maximum number that will be
# stored. less than zero means disabled. If the value is set to zero, cortex
# will fallback to blocks-storage.tsdb.max-exemplars value.
# CLI flag: -ingester.max-exemplars
[max_exemplars: <int> | default = 0]
# Limit on total number of positive and negative buckets allowed in a single
# native histogram. The resolution of a histogram with more buckets will be
# reduced until the number of buckets is within the limit. If the limit cannot
# be reached, the sample will be discarded. 0 means no limit. Enforced at
# Distributor.
# CLI flag: -validation.max-native-histogram-buckets
[max_native_histogram_buckets: <int> | default = 0]
# The maximum number of active series per user, per ingester. 0 to disable.
# CLI flag: -ingester.max-series-per-user
Expand Down Expand Up @@ -3213,6 +3215,12 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
# CLI flag: -ingester.out-of-order-time-window
[out_of_order_time_window: <duration> | default = 0s]
# Enables support for exemplars in TSDB and sets the maximum number that will be
# stored. less than zero means disabled. If the value is set to zero, cortex
# will fallback to blocks-storage.tsdb.max-exemplars value.
# CLI flag: -ingester.max-exemplars
[max_exemplars: <int> | default = 0]
# Maximum number of chunks that can be fetched in a single query from ingesters
# and long-term storage. This limit is enforced in the querier, ruler and
# store-gateway. 0 to disable.
Expand Down
5 changes: 5 additions & 0 deletions pkg/cortexpb/histograms.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ package cortexpb

import "github.com/prometheus/prometheus/model/histogram"

const (
ExponentialSchemaMax int32 = 8
ExponentialSchemaMin int32 = -4
)

func (h Histogram) IsFloatHistogram() bool {
_, ok := h.GetCount().(*Histogram_CountFloat)
return ok
Expand Down
10 changes: 7 additions & 3 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -579,12 +579,16 @@ func (d *Distributor) validateSeries(ts cortexpb.PreallocTimeseries, userID stri
if len(ts.Histograms) > 0 {
// Only alloc when data present
histograms = make([]cortexpb.Histogram, 0, len(ts.Histograms))
for _, h := range ts.Histograms {
// TODO(yeya24): add other validations for native histogram.
// For example, Prometheus scrape has bucket limit and schema check.
for i, h := range ts.Histograms {
if err := validation.ValidateSampleTimestamp(d.validateMetrics, limits, userID, ts.Labels, h.TimestampMs); err != nil {
return emptyPreallocSeries, err
}
// TODO(yeya24): add max schema validation for native histogram if needed.
convertedHistogram, err := validation.ValidateNativeHistogram(d.validateMetrics, limits, userID, ts.Labels, h)
if err != nil {
return emptyPreallocSeries, err
}
ts.Histograms[i] = convertedHistogram
}
histograms = append(histograms, ts.Histograms...)
}
Expand Down
18 changes: 18 additions & 0 deletions pkg/util/validation/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,24 @@ func newExemplarLabelLengthError(seriesLabels []cortexpb.LabelAdapter, exemplarL
}
}

// histogramBucketLimitExceededError is a ValidationError implementation for samples with native histogram
// exceeding max bucket limit and cannot reduce resolution further to be within the max bucket limit.
type histogramBucketLimitExceededError struct {
series []cortexpb.LabelAdapter
limit int
}

func newHistogramBucketLimitExceededError(series []cortexpb.LabelAdapter, limit int) ValidationError {
return &histogramBucketLimitExceededError{
series: series,
limit: limit,
}
}

func (e *histogramBucketLimitExceededError) Error() string {
return fmt.Sprintf("native histogram bucket count exceeded for metric (limit: %d) metric: %.200q", e.limit, formatLabelSet(e.series))
}

// formatLabelSet formats label adapters as a metric name with labels, while preserving
// label order, and keeping duplicates. If there are multiple "__name__" labels, only
// first one is used as metric name, other ones will be included as regular labels.
Expand Down
11 changes: 10 additions & 1 deletion pkg/util/validation/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ type Limits struct {
EnforceMetricName bool `yaml:"enforce_metric_name" json:"enforce_metric_name"`
IngestionTenantShardSize int `yaml:"ingestion_tenant_shard_size" json:"ingestion_tenant_shard_size"`
MetricRelabelConfigs []*relabel.Config `yaml:"metric_relabel_configs,omitempty" json:"metric_relabel_configs,omitempty" doc:"nocli|description=List of metric relabel configurations. Note that in most situations, it is more effective to use metrics relabeling directly in the Prometheus server, e.g. remote_write.write_relabel_configs."`
MaxExemplars int `yaml:"max_exemplars" json:"max_exemplars"`
MaxNativeHistogramBuckets int `yaml:"max_native_histogram_buckets" json:"max_native_histogram_buckets"`

// Ingester enforced limits.
// Series
Expand All @@ -151,6 +151,8 @@ type Limits struct {
MaxGlobalMetadataPerMetric int `yaml:"max_global_metadata_per_metric" json:"max_global_metadata_per_metric"`
// Out-of-order
OutOfOrderTimeWindow model.Duration `yaml:"out_of_order_time_window" json:"out_of_order_time_window"`
// Exemplars
MaxExemplars int `yaml:"max_exemplars" json:"max_exemplars"`

// Querier enforced limits.
MaxChunksPerQuery int `yaml:"max_fetched_chunks_per_query" json:"max_fetched_chunks_per_query"`
Expand Down Expand Up @@ -232,6 +234,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
f.Var(&l.CreationGracePeriod, "validation.create-grace-period", "Duration which table will be created/deleted before/after it's needed; we won't accept sample from before this time.")
f.BoolVar(&l.EnforceMetricName, "validation.enforce-metric-name", true, "Enforce every sample has a metric name.")
f.BoolVar(&l.EnforceMetadataMetricName, "validation.enforce-metadata-metric-name", true, "Enforce every metadata has a metric name.")
f.IntVar(&l.MaxNativeHistogramBuckets, "validation.max-native-histogram-buckets", 0, "Limit on total number of positive and negative buckets allowed in a single native histogram. The resolution of a histogram with more buckets will be reduced until the number of buckets is within the limit. If the limit cannot be reached, the sample will be discarded. 0 means no limit. Enforced at Distributor.")

f.IntVar(&l.MaxLocalSeriesPerUser, "ingester.max-series-per-user", 5000000, "The maximum number of active series per user, per ingester. 0 to disable.")
f.IntVar(&l.MaxLocalSeriesPerMetric, "ingester.max-series-per-metric", 50000, "The maximum number of active series per metric name, per ingester. 0 to disable.")
Expand Down Expand Up @@ -722,6 +725,12 @@ func (o *Overrides) EnforceMetadataMetricName(userID string) bool {
return o.GetOverridesForUser(userID).EnforceMetadataMetricName
}

// MaxNativeHistogramBuckets returns the maximum total number of positive and negative buckets of a single native histogram
// a user is allowed to store.
func (o *Overrides) MaxNativeHistogramBuckets(userID string) int {
return o.GetOverridesForUser(userID).MaxNativeHistogramBuckets
}

// MaxLocalMetricsWithMetadataPerUser returns the maximum number of metrics with metadata a user is allowed to store in a single ingester.
func (o *Overrides) MaxLocalMetricsWithMetadataPerUser(userID string) int {
return o.GetOverridesForUser(userID).MaxLocalMetricsWithMetadataPerUser
Expand Down
56 changes: 56 additions & 0 deletions pkg/util/validation/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ const (
exemplarLabelsTooLong = "exemplar_labels_too_long"
exemplarTimestampInvalid = "exemplar_timestamp_invalid"

// Native Histogram specific validation reasons
nativeHistogramBucketCountLimitExceeded = "native_histogram_buckets_exceeded"

// RateLimited is one of the values for the reason to discard samples.
// Declared here to avoid duplication in ingester and distributor.
RateLimited = "rate_limited"
Expand Down Expand Up @@ -262,6 +265,59 @@ func ValidateMetadata(validateMetrics *ValidateMetrics, cfg *Limits, userID stri
return nil
}

func ValidateNativeHistogram(validateMetrics *ValidateMetrics, limits *Limits, userID string, ls []cortexpb.LabelAdapter, histogram cortexpb.Histogram) (cortexpb.Histogram, error) {
if limits.MaxNativeHistogramBuckets == 0 {
return histogram, nil
}

var (
exceedLimit bool
)
if histogram.IsFloatHistogram() {
// Initial check to see if the bucket limit is exceeded or not. If not, we can avoid type casting.
exceedLimit = len(histogram.PositiveCounts)+len(histogram.NegativeCounts) > limits.MaxNativeHistogramBuckets
if !exceedLimit {
return histogram, nil
}
// Exceed limit.
if histogram.Schema <= cortexpb.ExponentialSchemaMin {
validateMetrics.DiscardedSamples.WithLabelValues(nativeHistogramBucketCountLimitExceeded, userID).Inc()
return cortexpb.Histogram{}, newHistogramBucketLimitExceededError(ls, limits.MaxNativeHistogramBuckets)
}
fh := cortexpb.FloatHistogramProtoToFloatHistogram(histogram)
for len(fh.PositiveBuckets)+len(fh.NegativeBuckets) > limits.MaxNativeHistogramBuckets {
if fh.Schema <= cortexpb.ExponentialSchemaMin {
validateMetrics.DiscardedSamples.WithLabelValues(nativeHistogramBucketCountLimitExceeded, userID).Inc()
return cortexpb.Histogram{}, newHistogramBucketLimitExceededError(ls, limits.MaxNativeHistogramBuckets)
}
fh = fh.ReduceResolution(fh.Schema - 1)
}
// If resolution reduced, convert new float histogram to protobuf type again.
return cortexpb.FloatHistogramToHistogramProto(histogram.TimestampMs, fh), nil
}

// Initial check to see if bucket limit is exceeded or not. If not, we can avoid type casting.
exceedLimit = len(histogram.PositiveDeltas)+len(histogram.NegativeDeltas) > limits.MaxNativeHistogramBuckets
if !exceedLimit {
return histogram, nil
}
// Exceed limit.
if histogram.Schema <= cortexpb.ExponentialSchemaMin {
validateMetrics.DiscardedSamples.WithLabelValues(nativeHistogramBucketCountLimitExceeded, userID).Inc()
return cortexpb.Histogram{}, newHistogramBucketLimitExceededError(ls, limits.MaxNativeHistogramBuckets)
}
h := cortexpb.HistogramProtoToHistogram(histogram)
for len(h.PositiveBuckets)+len(h.NegativeBuckets) > limits.MaxNativeHistogramBuckets {
if h.Schema <= cortexpb.ExponentialSchemaMin {
validateMetrics.DiscardedSamples.WithLabelValues(nativeHistogramBucketCountLimitExceeded, userID).Inc()
return cortexpb.Histogram{}, newHistogramBucketLimitExceededError(ls, limits.MaxNativeHistogramBuckets)
}
h = h.ReduceResolution(h.Schema - 1)
}
// If resolution reduced, convert new histogram to protobuf type again.
return cortexpb.HistogramToHistogramProto(histogram.TimestampMs, h), nil
}

func DeletePerUserValidationMetrics(validateMetrics *ValidateMetrics, userID string, log log.Logger) {
filter := map[string]string{"user": userID}

Expand Down
109 changes: 109 additions & 0 deletions pkg/util/validation/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/tsdb/tsdbutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/httpgrpc"
Expand Down Expand Up @@ -291,3 +293,110 @@ func TestValidateLabelDuplication(t *testing.T) {
}, "a")
assert.Equal(t, expected, actual)
}

func TestValidateNativeHistogram(t *testing.T) {
userID := "fake"
lbls := cortexpb.FromLabelsToLabelAdapters(labels.FromStrings("foo", "bar"))

// Test histogram has 4 positive buckets and 4 negative buckets so 8 in total. Schema set to 1.
h := tsdbutil.GenerateTestHistogram(0)
fh := tsdbutil.GenerateTestFloatHistogram(0)

histogramWithSchemaMin := tsdbutil.GenerateTestHistogram(0)
histogramWithSchemaMin.Schema = cortexpb.ExponentialSchemaMin
floatHistogramWithSchemaMin := tsdbutil.GenerateTestFloatHistogram(0)
floatHistogramWithSchemaMin.Schema = cortexpb.ExponentialSchemaMin
for _, tc := range []struct {
name string
bucketLimit int
histogram cortexpb.Histogram
expectedHistogram cortexpb.Histogram
expectedErr error
}{
{
name: "no limit, histogram",
histogram: cortexpb.HistogramToHistogramProto(0, h.Copy()),
expectedHistogram: cortexpb.HistogramToHistogramProto(0, h.Copy()),
},
{
name: "no limit, float histogram",
histogram: cortexpb.FloatHistogramToHistogramProto(0, fh.Copy()),
expectedHistogram: cortexpb.FloatHistogramToHistogramProto(0, fh.Copy()),
},
{
name: "within limit, histogram",
bucketLimit: 8,
histogram: cortexpb.HistogramToHistogramProto(0, h.Copy()),
expectedHistogram: cortexpb.HistogramToHistogramProto(0, h.Copy()),
},
{
name: "within limit, float histogram",
bucketLimit: 8,
histogram: cortexpb.FloatHistogramToHistogramProto(0, fh.Copy()),
expectedHistogram: cortexpb.FloatHistogramToHistogramProto(0, fh.Copy()),
},
{
name: "exceed limit and reduce resolution for 1 level, histogram",
bucketLimit: 6,
histogram: cortexpb.HistogramToHistogramProto(0, h.Copy()),
expectedHistogram: cortexpb.HistogramToHistogramProto(0, h.Copy().ReduceResolution(0)),
},
{
name: "exceed limit and reduce resolution for 1 level, float histogram",
bucketLimit: 6,
histogram: cortexpb.FloatHistogramToHistogramProto(0, fh.Copy()),
expectedHistogram: cortexpb.FloatHistogramToHistogramProto(0, fh.Copy().ReduceResolution(0)),
},
{
name: "exceed limit and reduce resolution for 2 levels, histogram",
bucketLimit: 4,
histogram: cortexpb.HistogramToHistogramProto(0, h.Copy()),
expectedHistogram: cortexpb.HistogramToHistogramProto(0, h.Copy().ReduceResolution(-1)),
},
{
name: "exceed limit and reduce resolution for 2 levels, float histogram",
bucketLimit: 4,
histogram: cortexpb.FloatHistogramToHistogramProto(0, fh.Copy()),
expectedHistogram: cortexpb.FloatHistogramToHistogramProto(0, fh.Copy().ReduceResolution(-1)),
},
{
name: "exceed limit but cannot reduce resolution further, histogram",
bucketLimit: 1,
histogram: cortexpb.HistogramToHistogramProto(0, h.Copy()),
expectedErr: newHistogramBucketLimitExceededError(lbls, 1),
},
{
name: "exceed limit but cannot reduce resolution further, float histogram",
bucketLimit: 1,
histogram: cortexpb.FloatHistogramToHistogramProto(0, fh.Copy()),
expectedErr: newHistogramBucketLimitExceededError(lbls, 1),
},
{
name: "exceed limit but cannot reduce resolution further with min schema, histogram",
bucketLimit: 4,
histogram: cortexpb.HistogramToHistogramProto(0, histogramWithSchemaMin.Copy()),
expectedErr: newHistogramBucketLimitExceededError(lbls, 4),
},
{
name: "exceed limit but cannot reduce resolution further with min schema, float histogram",
bucketLimit: 4,
histogram: cortexpb.FloatHistogramToHistogramProto(0, floatHistogramWithSchemaMin.Copy()),
expectedErr: newHistogramBucketLimitExceededError(lbls, 4),
},
} {
t.Run(tc.name, func(t *testing.T) {
reg := prometheus.NewRegistry()
validateMetrics := NewValidateMetrics(reg)
limits := new(Limits)
limits.MaxNativeHistogramBuckets = tc.bucketLimit
actualHistogram, actualErr := ValidateNativeHistogram(validateMetrics, limits, userID, lbls, tc.histogram)
if tc.expectedErr != nil {
require.Equal(t, tc.expectedErr, actualErr)
require.Equal(t, float64(1), testutil.ToFloat64(validateMetrics.DiscardedSamples.WithLabelValues(nativeHistogramBucketCountLimitExceeded, userID)))
} else {
require.NoError(t, actualErr)
require.Equal(t, tc.expectedHistogram, actualHistogram)
}
})
}
}

0 comments on commit b167dd4

Please sign in to comment.