Skip to content

Commit

Permalink
Add default partition when no matching labelset limits (#6435)
Browse files Browse the repository at this point in the history
* implement default partition for limits per labelset but without any labels

Signed-off-by: Ben Ye <[email protected]>

* unlock properly

Signed-off-by: Ben Ye <[email protected]>

* handle new partition added and address comments

Signed-off-by: Ben Ye <[email protected]>

* changelog

Signed-off-by: Ben Ye <[email protected]>

* update doc

Signed-off-by: Ben Ye <[email protected]>

* update doc

Signed-off-by: Ben Ye <[email protected]>

---------

Signed-off-by: Ben Ye <[email protected]>
  • Loading branch information
yeya24 authored Dec 18, 2024
1 parent e359f50 commit 27d68c2
Show file tree
Hide file tree
Showing 9 changed files with 597 additions and 63 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,12 @@
* [ENHANCEMENT] Add new option `-server.grpc_server-num-stream-workers` to configure the number of worker goroutines that should be used to process incoming streams. #6386
* [ENHANCEMENT] Distributor: Return HTTP 5XX instead of HTTP 4XX when instance limits are hit. #6358
* [ENHANCEMENT] Ingester: Make sure unregistered ingester joining the ring after WAL replay. #6277
* [ENHANCEMENT] Distributor: Add a new `-distributor.num-push-workers` flag to use a goroutine worker pool when sending data from distributor to ingesters. #6406
* [ENHANCEMENT] Ingester: If a limit per label set entry doesn't have any label, use it as the default partition to catch all series that don't match any other label set entries. #6435
* [BUGFIX] Runtime-config: Handle absolute file paths when working directory is not / #6224
* [BUGFIX] Ruler: Allow rule evaluation to complete during shutdown. #6326
* [BUGFIX] Ring: update ring with new ip address when instance is lost, rejoins, but heartbeat is disabled. #6271
* [BUGFIX] Ingester: Fix regression on usage of cortex_ingester_queried_chunks. #6398
* [ENHANCEMENT] Distributor: Add a new `-distributor.num-push-workers` flag to use a goroutine worker pool when sending data from distributor to ingesters. #6406
* [BUGFIX] Ingester: Fix possible race condition when `active series per LabelSet` is configured. #6409

## 1.18.1 2024-10-14
Expand Down
4 changes: 3 additions & 1 deletion docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -5770,7 +5770,9 @@ limits:
# would not enforce any limits.
[max_series: <int> | default = ]
# LabelSet which the limit should be applied.
# LabelSet which the limit should be applied. If no labels are provided, it
# becomes the default partition which matches any series that doesn't match any
# other explicitly defined label sets.
[label_set: <map of string (labelName) to string (labelValue)> | default = []]
```

Expand Down
116 changes: 112 additions & 4 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,22 +366,117 @@ func TestIngesterPerLabelsetLimitExceeded(t *testing.T) {
cortex_ingester_usage_per_labelset{labelset="{comp2=\"compValue2\"}",limit="max_series",user="1"} 3
`), "cortex_ingester_usage_per_labelset", "cortex_ingester_limits_per_labelset"))

// Should remove metrics when the limits is removed
// Add default partition -> no label set configured working as a fallback when a series
// doesn't match any existing label set limit.
emptyLabels := labels.EmptyLabels()
defaultPartitionLimits := validation.LimitsPerLabelSet{LabelSet: emptyLabels,
Limits: validation.LimitsPerLabelSetEntry{
MaxSeries: 2,
},
}
limits.LimitsPerLabelSet = append(limits.LimitsPerLabelSet, defaultPartitionLimits)
b, err = json.Marshal(limits)
require.NoError(t, err)
require.NoError(t, limits.UnmarshalJSON(b))
tenantLimits.setLimits(userID, &limits)

lbls = []string{labels.MetricName, "test_default"}
for i := 0; i < 2; i++ {
_, err = ing.Push(ctx, cortexpb.ToWriteRequest(
[]labels.Labels{labels.FromStrings(append(lbls, "series", strconv.Itoa(i))...)}, samples, nil, nil, cortexpb.API))
require.NoError(t, err)
}

// Max series limit for default partition is 2 so 1 more series will be throttled.
_, err = ing.Push(ctx, cortexpb.ToWriteRequest(
[]labels.Labels{labels.FromStrings(append(lbls, "extraLabel", "extraValueUpdate2")...)}, samples, nil, nil, cortexpb.API))
httpResp, ok = httpgrpc.HTTPResponseFromError(err)
require.True(t, ok, "returned error is not an httpgrpc response")
assert.Equal(t, http.StatusBadRequest, int(httpResp.Code))
require.ErrorContains(t, err, emptyLabels.String())

ing.updateActiveSeries(ctx)
require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(`
# HELP cortex_ingester_limits_per_labelset Limits per user and labelset.
# TYPE cortex_ingester_limits_per_labelset gauge
cortex_ingester_limits_per_labelset{labelset="{__name__=\"metric_name\", comp2=\"compValue2\"}",limit="max_series",user="1"} 3
cortex_ingester_limits_per_labelset{labelset="{comp1=\"compValue1\", comp2=\"compValue2\"}",limit="max_series",user="1"} 2
cortex_ingester_limits_per_labelset{labelset="{comp1=\"compValue1\"}",limit="max_series",user="1"} 10
cortex_ingester_limits_per_labelset{labelset="{comp2=\"compValue2\"}",limit="max_series",user="1"} 10
cortex_ingester_limits_per_labelset{labelset="{label1=\"value1\"}",limit="max_series",user="1"} 3
cortex_ingester_limits_per_labelset{labelset="{label2=\"value2\"}",limit="max_series",user="1"} 2
cortex_ingester_limits_per_labelset{labelset="{}",limit="max_series",user="1"} 2
# HELP cortex_ingester_usage_per_labelset Current usage per user and labelset.
# TYPE cortex_ingester_usage_per_labelset gauge
cortex_ingester_usage_per_labelset{labelset="{__name__=\"metric_name\", comp2=\"compValue2\"}",limit="max_series",user="1"} 3
cortex_ingester_usage_per_labelset{labelset="{label1=\"value1\"}",limit="max_series",user="1"} 3
cortex_ingester_usage_per_labelset{labelset="{label2=\"value2\"}",limit="max_series",user="1"} 2
cortex_ingester_usage_per_labelset{labelset="{comp1=\"compValue1\", comp2=\"compValue2\"}",limit="max_series",user="1"} 2
cortex_ingester_usage_per_labelset{labelset="{comp1=\"compValue1\"}",limit="max_series",user="1"} 7
cortex_ingester_usage_per_labelset{labelset="{comp2=\"compValue2\"}",limit="max_series",user="1"} 3
cortex_ingester_usage_per_labelset{labelset="{}",limit="max_series",user="1"} 2
`), "cortex_ingester_usage_per_labelset", "cortex_ingester_limits_per_labelset"))

// Add a new label set limit.
limits.LimitsPerLabelSet = append(limits.LimitsPerLabelSet,
validation.LimitsPerLabelSet{LabelSet: labels.FromMap(map[string]string{
"series": "0",
}),
Limits: validation.LimitsPerLabelSetEntry{
MaxSeries: 3,
},
},
)
b, err = json.Marshal(limits)
require.NoError(t, err)
require.NoError(t, limits.UnmarshalJSON(b))
tenantLimits.setLimits(userID, &limits)
ing.updateActiveSeries(ctx)
// Default partition usage reduced from 2 to 1 as one series in default partition
// now counted into the new partition.
require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(`
# HELP cortex_ingester_limits_per_labelset Limits per user and labelset.
# TYPE cortex_ingester_limits_per_labelset gauge
cortex_ingester_limits_per_labelset{labelset="{__name__=\"metric_name\", comp2=\"compValue2\"}",limit="max_series",user="1"} 3
cortex_ingester_limits_per_labelset{labelset="{comp1=\"compValue1\", comp2=\"compValue2\"}",limit="max_series",user="1"} 2
cortex_ingester_limits_per_labelset{labelset="{comp1=\"compValue1\"}",limit="max_series",user="1"} 10
cortex_ingester_limits_per_labelset{labelset="{comp2=\"compValue2\"}",limit="max_series",user="1"} 10
cortex_ingester_limits_per_labelset{labelset="{label1=\"value1\"}",limit="max_series",user="1"} 3
cortex_ingester_limits_per_labelset{labelset="{label2=\"value2\"}",limit="max_series",user="1"} 2
cortex_ingester_limits_per_labelset{labelset="{series=\"0\"}",limit="max_series",user="1"} 3
cortex_ingester_limits_per_labelset{labelset="{}",limit="max_series",user="1"} 2
# HELP cortex_ingester_usage_per_labelset Current usage per user and labelset.
# TYPE cortex_ingester_usage_per_labelset gauge
cortex_ingester_usage_per_labelset{labelset="{__name__=\"metric_name\", comp2=\"compValue2\"}",limit="max_series",user="1"} 3
cortex_ingester_usage_per_labelset{labelset="{label1=\"value1\"}",limit="max_series",user="1"} 3
cortex_ingester_usage_per_labelset{labelset="{label2=\"value2\"}",limit="max_series",user="1"} 2
cortex_ingester_usage_per_labelset{labelset="{comp1=\"compValue1\", comp2=\"compValue2\"}",limit="max_series",user="1"} 2
cortex_ingester_usage_per_labelset{labelset="{comp1=\"compValue1\"}",limit="max_series",user="1"} 7
cortex_ingester_usage_per_labelset{labelset="{comp2=\"compValue2\"}",limit="max_series",user="1"} 3
cortex_ingester_usage_per_labelset{labelset="{series=\"0\"}",limit="max_series",user="1"} 1
cortex_ingester_usage_per_labelset{labelset="{}",limit="max_series",user="1"} 1
`), "cortex_ingester_usage_per_labelset", "cortex_ingester_limits_per_labelset"))

// Should remove metrics when the limits are removed, keep default partition limit
limits.LimitsPerLabelSet = limits.LimitsPerLabelSet[:2]
limits.LimitsPerLabelSet = append(limits.LimitsPerLabelSet, defaultPartitionLimits)
b, err = json.Marshal(limits)
require.NoError(t, err)
require.NoError(t, limits.UnmarshalJSON(b))
tenantLimits.setLimits(userID, &limits)
ing.updateActiveSeries(ctx)
// Default partition usage increased from 1 to 10 as some existing partitions got removed.
require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(`
# HELP cortex_ingester_limits_per_labelset Limits per user and labelset.
# TYPE cortex_ingester_limits_per_labelset gauge
cortex_ingester_limits_per_labelset{labelset="{label1=\"value1\"}",limit="max_series",user="1"} 3
cortex_ingester_limits_per_labelset{labelset="{label2=\"value2\"}",limit="max_series",user="1"} 2
cortex_ingester_limits_per_labelset{labelset="{}",limit="max_series",user="1"} 2
# HELP cortex_ingester_usage_per_labelset Current usage per user and labelset.
# TYPE cortex_ingester_usage_per_labelset gauge
cortex_ingester_usage_per_labelset{labelset="{label1=\"value1\"}",limit="max_series",user="1"} 3
cortex_ingester_usage_per_labelset{labelset="{label2=\"value2\"}",limit="max_series",user="1"} 2
cortex_ingester_usage_per_labelset{labelset="{}",limit="max_series",user="1"} 10
`), "cortex_ingester_usage_per_labelset", "cortex_ingester_limits_per_labelset"))

// Should persist between restarts
Expand All @@ -396,10 +491,12 @@ func TestIngesterPerLabelsetLimitExceeded(t *testing.T) {
# TYPE cortex_ingester_limits_per_labelset gauge
cortex_ingester_limits_per_labelset{labelset="{label1=\"value1\"}",limit="max_series",user="1"} 3
cortex_ingester_limits_per_labelset{labelset="{label2=\"value2\"}",limit="max_series",user="1"} 2
cortex_ingester_limits_per_labelset{labelset="{}",limit="max_series",user="1"} 2
# HELP cortex_ingester_usage_per_labelset Current usage per user and labelset.
# TYPE cortex_ingester_usage_per_labelset gauge
cortex_ingester_usage_per_labelset{labelset="{label1=\"value1\"}",limit="max_series",user="1"} 3
cortex_ingester_usage_per_labelset{labelset="{label2=\"value2\"}",limit="max_series",user="1"} 2
cortex_ingester_usage_per_labelset{labelset="{}",limit="max_series",user="1"} 10
`), "cortex_ingester_usage_per_labelset", "cortex_ingester_limits_per_labelset"))
services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck

Expand All @@ -420,6 +517,13 @@ func TestPushRace(t *testing.T) {
MaxSeries: 10e10,
},
},
{
// Default partition.
LabelSet: labels.EmptyLabels(),
Limits: validation.LimitsPerLabelSetEntry{
MaxSeries: 10e10,
},
},
}

dir := t.TempDir()
Expand Down Expand Up @@ -451,6 +555,10 @@ func TestPushRace(t *testing.T) {
defer wg.Done()
_, err := ing.Push(ctx, cortexpb.ToWriteRequest([]labels.Labels{labels.FromStrings(labels.MetricName, "foo", "userId", userID, "k", strconv.Itoa(k))}, []cortexpb.Sample{sample1}, nil, nil, cortexpb.API))
require.NoError(t, err)

// Go to default partition.
_, err = ing.Push(ctx, cortexpb.ToWriteRequest([]labels.Labels{labels.FromStrings(labels.MetricName, "bar", "userId", userID, "k", strconv.Itoa(k))}, []cortexpb.Sample{sample1}, nil, nil, cortexpb.API))
require.NoError(t, err)
}()
}
}
Expand All @@ -472,13 +580,13 @@ func TestPushRace(t *testing.T) {
err = ir.Series(p.At(), &builder, nil)
require.NoError(t, err)
lbls := builder.Labels()
require.Equal(t, "foo", lbls.Get(labels.MetricName))
require.True(t, lbls.Get(labels.MetricName) == "foo" || lbls.Get(labels.MetricName) == "bar")
require.Equal(t, "1", lbls.Get("userId"))
require.NotEmpty(t, lbls.Get("k"))
builder.Reset()
}
require.Equal(t, numberOfSeries, total)
require.Equal(t, uint64(numberOfSeries), db.Head().NumSeries())
require.Equal(t, 2*numberOfSeries, total)
require.Equal(t, uint64(2*numberOfSeries), db.Head().NumSeries())
}

func TestIngesterUserLimitExceeded(t *testing.T) {
Expand Down
28 changes: 6 additions & 22 deletions pkg/ingester/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,14 +107,15 @@ func (l *Limiter) AssertMaxMetricsWithMetadataPerUser(userID string, metrics int

// AssertMaxSeriesPerLabelSet checks, for every label set limit matching the given
// series, that the current usage returned by f has not reached the max series
// limit, and returns an error if it has.
func (l *Limiter) AssertMaxSeriesPerLabelSet(userID string, metric labels.Labels, f func(validation.LimitsPerLabelSet) (int, error)) error {
m := l.limitsPerLabelSets(userID, metric)
for _, limit := range m {
func (l *Limiter) AssertMaxSeriesPerLabelSet(userID string, metric labels.Labels, f func(allLimits []validation.LimitsPerLabelSet, limit validation.LimitsPerLabelSet) (int, error)) error {
limits := l.limits.LimitsPerLabelSet(userID)
matchedLimits := validation.LimitsPerLabelSetsForSeries(limits, metric)
for _, limit := range matchedLimits {
maxSeriesFunc := func(string) int {
return limit.Limits.MaxSeries
}
local := l.maxByLocalAndGlobal(userID, maxSeriesFunc, maxSeriesFunc)
if u, err := f(limit); err != nil {
if u, err := f(limits, limit); err != nil {
return err
} else if u >= local {
return errMaxSeriesPerLabelSetLimitExceeded{
Expand Down Expand Up @@ -191,24 +192,7 @@ func (l *Limiter) formatMaxSeriesPerLabelSetError(err errMaxSeriesPerLabelSetLim

func (l *Limiter) limitsPerLabelSets(userID string, metric labels.Labels) []validation.LimitsPerLabelSet {
m := l.limits.LimitsPerLabelSet(userID)

// returning early to not have any overhead
if len(m) == 0 {
return nil
}

r := make([]validation.LimitsPerLabelSet, 0, len(m))
outer:
for _, lbls := range m {
for _, lbl := range lbls.LabelSet {
// We did not find some of the labels on the set
if v := metric.Get(lbl.Name); v != lbl.Value {
continue outer
}
}
r = append(r, lbls)
}
return r
return validation.LimitsPerLabelSetsForSeries(m, metric)
}

func (l *Limiter) maxSeriesPerMetric(userID string) int {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ingester/limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,7 @@ func TestLimiter_AssertMaxSeriesPerLabelSet(t *testing.T) {
require.NoError(t, err)

limiter := NewLimiter(limits, ring, util.ShardingStrategyDefault, testData.shardByAllLabels, testData.ringReplicationFactor, false, "")
actual := limiter.AssertMaxSeriesPerLabelSet("test", labels.FromStrings("foo", "bar"), func(set validation.LimitsPerLabelSet) (int, error) {
actual := limiter.AssertMaxSeriesPerLabelSet("test", labels.FromStrings("foo", "bar"), func(limits []validation.LimitsPerLabelSet, limit validation.LimitsPerLabelSet) (int, error) {
return testData.series, nil
})

Expand Down
Loading

0 comments on commit 27d68c2

Please sign in to comment.