Add default partition when no matching labelset limits #6435

Merged · 6 commits · Dec 18, 2024
Changes from 1 commit
76 changes: 72 additions & 4 deletions pkg/ingester/ingester_test.go
@@ -366,22 +366,77 @@ func TestIngesterPerLabelsetLimitExceeded(t *testing.T) {
cortex_ingester_usage_per_labelset{labelset="{comp2=\"compValue2\"}",limit="max_series",user="1"} 3
`), "cortex_ingester_usage_per_labelset", "cortex_ingester_limits_per_labelset"))

// Should remove metrics when the limit is removed
// Add a default partition (no label set configured) that acts as a fallback when a series
// doesn't match any existing label set limit.
emptyLabels := labels.EmptyLabels()
defaultPartitionLimits := validation.LimitsPerLabelSet{LabelSet: emptyLabels,
Limits: validation.LimitsPerLabelSetEntry{
MaxSeries: 2,
},
}
limits.LimitsPerLabelSet = append(limits.LimitsPerLabelSet, defaultPartitionLimits)
b, err = json.Marshal(limits)
require.NoError(t, err)
require.NoError(t, limits.UnmarshalJSON(b))
tenantLimits.setLimits(userID, &limits)

lbls = []string{labels.MetricName, "test_default"}
for i := 0; i < 2; i++ {
_, err = ing.Push(ctx, cortexpb.ToWriteRequest(
[]labels.Labels{labels.FromStrings(append(lbls, "series", strconv.Itoa(i))...)}, samples, nil, nil, cortexpb.API))
require.NoError(t, err)
}

// The max series limit for the default partition is 2, so one more series will be throttled.
_, err = ing.Push(ctx, cortexpb.ToWriteRequest(
[]labels.Labels{labels.FromStrings(append(lbls, "extraLabel", "extraValueUpdate2")...)}, samples, nil, nil, cortexpb.API))
httpResp, ok = httpgrpc.HTTPResponseFromError(err)
require.True(t, ok, "returned error is not an httpgrpc response")
assert.Equal(t, http.StatusBadRequest, int(httpResp.Code))
require.ErrorContains(t, err, emptyLabels.String())

ing.updateActiveSeries(ctx)
require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(`
# HELP cortex_ingester_limits_per_labelset Limits per user and labelset.
# TYPE cortex_ingester_limits_per_labelset gauge
cortex_ingester_limits_per_labelset{labelset="{__name__=\"metric_name\", comp2=\"compValue2\"}",limit="max_series",user="1"} 3
cortex_ingester_limits_per_labelset{labelset="{comp1=\"compValue1\", comp2=\"compValue2\"}",limit="max_series",user="1"} 2
cortex_ingester_limits_per_labelset{labelset="{comp1=\"compValue1\"}",limit="max_series",user="1"} 10
cortex_ingester_limits_per_labelset{labelset="{comp2=\"compValue2\"}",limit="max_series",user="1"} 10
cortex_ingester_limits_per_labelset{labelset="{label1=\"value1\"}",limit="max_series",user="1"} 3
cortex_ingester_limits_per_labelset{labelset="{label2=\"value2\"}",limit="max_series",user="1"} 2
cortex_ingester_limits_per_labelset{labelset="{}",limit="max_series",user="1"} 2
# HELP cortex_ingester_usage_per_labelset Current usage per user and labelset.
# TYPE cortex_ingester_usage_per_labelset gauge
cortex_ingester_usage_per_labelset{labelset="{__name__=\"metric_name\", comp2=\"compValue2\"}",limit="max_series",user="1"} 3
cortex_ingester_usage_per_labelset{labelset="{label1=\"value1\"}",limit="max_series",user="1"} 3
cortex_ingester_usage_per_labelset{labelset="{label2=\"value2\"}",limit="max_series",user="1"} 2
cortex_ingester_usage_per_labelset{labelset="{comp1=\"compValue1\", comp2=\"compValue2\"}",limit="max_series",user="1"} 2
cortex_ingester_usage_per_labelset{labelset="{comp1=\"compValue1\"}",limit="max_series",user="1"} 7
cortex_ingester_usage_per_labelset{labelset="{comp2=\"compValue2\"}",limit="max_series",user="1"} 3
cortex_ingester_usage_per_labelset{labelset="{}",limit="max_series",user="1"} 2
`), "cortex_ingester_usage_per_labelset", "cortex_ingester_limits_per_labelset"))

// Should remove metrics when the limit is removed, but keep the default partition limit
limits.LimitsPerLabelSet = limits.LimitsPerLabelSet[:2]
limits.LimitsPerLabelSet = append(limits.LimitsPerLabelSet, defaultPartitionLimits)
b, err = json.Marshal(limits)
require.NoError(t, err)
require.NoError(t, limits.UnmarshalJSON(b))
tenantLimits.setLimits(userID, &limits)
ing.updateActiveSeries(ctx)
// Default partition usage increases from 2 to 10 because series from the removed partitions now fall back to it.
require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(`
# HELP cortex_ingester_limits_per_labelset Limits per user and labelset.
# TYPE cortex_ingester_limits_per_labelset gauge
cortex_ingester_limits_per_labelset{labelset="{label1=\"value1\"}",limit="max_series",user="1"} 3
cortex_ingester_limits_per_labelset{labelset="{label2=\"value2\"}",limit="max_series",user="1"} 2
cortex_ingester_limits_per_labelset{labelset="{}",limit="max_series",user="1"} 2
# HELP cortex_ingester_usage_per_labelset Current usage per user and labelset.
# TYPE cortex_ingester_usage_per_labelset gauge
cortex_ingester_usage_per_labelset{labelset="{label1=\"value1\"}",limit="max_series",user="1"} 3
cortex_ingester_usage_per_labelset{labelset="{label2=\"value2\"}",limit="max_series",user="1"} 2
cortex_ingester_usage_per_labelset{labelset="{}",limit="max_series",user="1"} 10
`), "cortex_ingester_usage_per_labelset", "cortex_ingester_limits_per_labelset"))

// Should persist between restarts
@@ -396,10 +451,12 @@ func TestIngesterPerLabelsetLimitExceeded(t *testing.T) {
# TYPE cortex_ingester_limits_per_labelset gauge
cortex_ingester_limits_per_labelset{labelset="{label1=\"value1\"}",limit="max_series",user="1"} 3
cortex_ingester_limits_per_labelset{labelset="{label2=\"value2\"}",limit="max_series",user="1"} 2
cortex_ingester_limits_per_labelset{labelset="{}",limit="max_series",user="1"} 2
# HELP cortex_ingester_usage_per_labelset Current usage per user and labelset.
# TYPE cortex_ingester_usage_per_labelset gauge
cortex_ingester_usage_per_labelset{labelset="{label1=\"value1\"}",limit="max_series",user="1"} 3
cortex_ingester_usage_per_labelset{labelset="{label2=\"value2\"}",limit="max_series",user="1"} 2
cortex_ingester_usage_per_labelset{labelset="{}",limit="max_series",user="1"} 10
`), "cortex_ingester_usage_per_labelset", "cortex_ingester_limits_per_labelset"))
services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck

@@ -420,6 +477,13 @@ func TestPushRace(t *testing.T) {
MaxSeries: 10e10,
},
},
{
// Default partition.
LabelSet: labels.EmptyLabels(),
Limits: validation.LimitsPerLabelSetEntry{
MaxSeries: 10e10,
},
},
}

dir := t.TempDir()
@@ -451,6 +515,10 @@ func TestPushRace(t *testing.T) {
defer wg.Done()
_, err := ing.Push(ctx, cortexpb.ToWriteRequest([]labels.Labels{labels.FromStrings(labels.MetricName, "foo", "userId", userID, "k", strconv.Itoa(k))}, []cortexpb.Sample{sample1}, nil, nil, cortexpb.API))
require.NoError(t, err)

// This series falls into the default partition.
_, err = ing.Push(ctx, cortexpb.ToWriteRequest([]labels.Labels{labels.FromStrings(labels.MetricName, "bar", "userId", userID, "k", strconv.Itoa(k))}, []cortexpb.Sample{sample1}, nil, nil, cortexpb.API))
require.NoError(t, err)
}()
}
}
@@ -472,13 +540,13 @@ func TestPushRace(t *testing.T) {
err = ir.Series(p.At(), &builder, nil)
require.NoError(t, err)
lbls := builder.Labels()
require.Equal(t, "foo", lbls.Get(labels.MetricName))
require.True(t, lbls.Get(labels.MetricName) == "foo" || lbls.Get(labels.MetricName) == "bar")
require.Equal(t, "1", lbls.Get("userId"))
require.NotEmpty(t, lbls.Get("k"))
builder.Reset()
}
require.Equal(t, numberOfSeries, total)
require.Equal(t, uint64(numberOfSeries), db.Head().NumSeries())
require.Equal(t, 2*numberOfSeries, total)
require.Equal(t, uint64(2*numberOfSeries), db.Head().NumSeries())
}

func TestIngesterUserLimitExceeded(t *testing.T) {
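A note on the configuration exercised above: the default partition is just a LimitsPerLabelSet entry whose label set is empty. A minimal sketch of such a configuration, using the types that appear in this diff (the MaxSeries values are illustrative):

limitsPerLabelSet := []validation.LimitsPerLabelSet{
	{
		// Explicit partition: only series carrying comp1="compValue1" count here.
		LabelSet: labels.FromStrings("comp1", "compValue1"),
		Limits:   validation.LimitsPerLabelSetEntry{MaxSeries: 10},
	},
	{
		// Default partition: the empty label set catches every series that
		// does not match any other configured label set.
		LabelSet: labels.EmptyLabels(),
		Limits:   validation.LimitsPerLabelSetEntry{MaxSeries: 2},
	},
}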
28 changes: 6 additions & 22 deletions pkg/ingester/limiter.go
@@ -107,14 +107,15 @@ func (l *Limiter) AssertMaxMetricsWithMetadataPerUser(userID string, metrics int

// AssertMaxSeriesPerLabelSet checks that the max series per label set limit has not
// been reached for any label set matching the given series, and returns an error if it has.
func (l *Limiter) AssertMaxSeriesPerLabelSet(userID string, metric labels.Labels, f func(validation.LimitsPerLabelSet) (int, error)) error {
m := l.limitsPerLabelSets(userID, metric)
for _, limit := range m {
func (l *Limiter) AssertMaxSeriesPerLabelSet(userID string, metric labels.Labels, f func(allLimits []validation.LimitsPerLabelSet, limit validation.LimitsPerLabelSet) (int, error)) error {
limits := l.limits.LimitsPerLabelSet(userID)
matchedLimits := validation.LimitsPerLabelSetsForSeries(limits, metric)
for _, limit := range matchedLimits {
maxSeriesFunc := func(string) int {
return limit.Limits.MaxSeries
}
local := l.maxByLocalAndGlobal(userID, maxSeriesFunc, maxSeriesFunc)
if u, err := f(limit); err != nil {
if u, err := f(limits, limit); err != nil {
return err
} else if u >= local {
return errMaxSeriesPerLabelSetLimitExceeded{
@@ -191,24 +192,7 @@ func (l *Limiter) formatMaxSeriesPerLabelSetError(err errMaxSeriesPerLabelSetLimitExceeded)

func (l *Limiter) limitsPerLabelSets(userID string, metric labels.Labels) []validation.LimitsPerLabelSet {
m := l.limits.LimitsPerLabelSet(userID)

// returning early to not have any overhead
if len(m) == 0 {
return nil
}

r := make([]validation.LimitsPerLabelSet, 0, len(m))
outer:
for _, lbls := range m {
for _, lbl := range lbls.LabelSet {
// We did not find some of the labels on the set
if v := metric.Get(lbl.Name); v != lbl.Value {
continue outer
}
}
r = append(r, lbls)
}
return r
return validation.LimitsPerLabelSetsForSeries(m, metric)
}

func (l *Limiter) maxSeriesPerMetric(userID string) int {
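The matching logic that used to live in limitsPerLabelSets now sits in validation.LimitsPerLabelSetsForSeries, whose body is not part of this diff. A plausible sketch of its semantics, inferred from the tests above; the function name is real, but this body is an assumption rather than the PR's actual implementation:

func limitsPerLabelSetsForSeries(all []validation.LimitsPerLabelSet, metric labels.Labels) []validation.LimitsPerLabelSet {
	matched := make([]validation.LimitsPerLabelSet, 0, len(all))
	var defaults []validation.LimitsPerLabelSet
outer:
	for _, l := range all {
		if l.LabelSet.Len() == 0 {
			// Remember the default partition; it applies only as a fallback.
			defaults = append(defaults, l)
			continue
		}
		for _, lbl := range l.LabelSet {
			if metric.Get(lbl.Name) != lbl.Value {
				// The series lacks one of the set's labels.
				continue outer
			}
		}
		matched = append(matched, l)
	}
	if len(matched) == 0 {
		// No explicit label set matched: fall back to the default partition.
		matched = append(matched, defaults...)
	}
	return matched
}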
2 changes: 1 addition & 1 deletion pkg/ingester/limiter_test.go
@@ -500,7 +500,7 @@ func TestLimiter_AssertMaxSeriesPerLabelSet(t *testing.T) {
require.NoError(t, err)

limiter := NewLimiter(limits, ring, util.ShardingStrategyDefault, testData.shardByAllLabels, testData.ringReplicationFactor, false, "")
actual := limiter.AssertMaxSeriesPerLabelSet("test", labels.FromStrings("foo", "bar"), func(set validation.LimitsPerLabelSet) (int, error) {
actual := limiter.AssertMaxSeriesPerLabelSet("test", labels.FromStrings("foo", "bar"), func(limits []validation.LimitsPerLabelSet, limit validation.LimitsPerLabelSet) (int, error) {
return testData.series, nil
})

138 changes: 105 additions & 33 deletions pkg/ingester/user_state.go
@@ -6,6 +6,7 @@ import (

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/index"
"github.com/segmentio/fasthash/fnv1a"

@@ -114,59 +115,113 @@ func newLabelSetCounter(limiter *Limiter) *labelSetCounter {
}

func (m *labelSetCounter) canAddSeriesForLabelSet(ctx context.Context, u *userTSDB, metric labels.Labels) error {
return m.limiter.AssertMaxSeriesPerLabelSet(u.userID, metric, func(set validation.LimitsPerLabelSet) (int, error) {
s := m.shards[util.HashFP(model.Fingerprint(set.Hash))%numMetricCounterShards]
return m.limiter.AssertMaxSeriesPerLabelSet(u.userID, metric, func(allLimits []validation.LimitsPerLabelSet, limit validation.LimitsPerLabelSet) (int, error) {
s := m.shards[util.HashFP(model.Fingerprint(limit.Hash))%numMetricCounterShards]
s.RLock()
if r, ok := s.valuesCounter[set.Hash]; ok {
if r, ok := s.valuesCounter[limit.Hash]; ok {
defer s.RUnlock()
return r.count, nil
}
s.RUnlock()

// We don't track this label set yet, so we need to backfill it.
return m.backFillLimit(ctx, u, set, s)
return m.backFillLimit(ctx, u, false, allLimits, limit, s)
})
}

func (m *labelSetCounter) backFillLimit(ctx context.Context, u *userTSDB, limit validation.LimitsPerLabelSet, s *labelSetCounterShard) (int, error) {
func (m *labelSetCounter) backFillLimit(ctx context.Context, u *userTSDB, forceBackfill bool, allLimits []validation.LimitsPerLabelSet, limit validation.LimitsPerLabelSet, s *labelSetCounterShard) (int, error) {
s.Lock()
// If not force backfill, use existing counter value.
if !forceBackfill {
if r, ok := s.valuesCounter[limit.Hash]; ok {
s.Unlock()
return r.count, nil
}
}

ir, err := u.db.Head().Index()
if err != nil {
return 0, err
}

defer ir.Close()

s.Lock()
defer s.Unlock()
if r, ok := s.valuesCounter[limit.Hash]; !ok {
postings := make([]index.Postings, 0, len(limit.LabelSet))
for _, lbl := range limit.LabelSet {
p, err := ir.Postings(ctx, lbl.Name, lbl.Value)
if err != nil {
return 0, err
}
postings = append(postings, p)
}
totalCount, err := getCardinalityForLimitsPerLabelSet(ctx, ir, allLimits, limit)
if err != nil {
return 0, err
}

p := index.Intersect(postings...)
s.valuesCounter[limit.Hash] = &labelSetCounterEntry{
count: totalCount,
labels: limit.LabelSet,
}
s.Unlock()
return totalCount, nil
}

totalCount := 0
for p.Next() {
totalCount++
func getCardinalityForLimitsPerLabelSet(ctx context.Context, ir tsdb.IndexReader, allLimits []validation.LimitsPerLabelSet, limit validation.LimitsPerLabelSet) (int, error) {
// Easy path with explicit labels.
if limit.LabelSet.Len() > 0 {
Member: Should we document in the label set config that an empty labelset is the "default" and will match all other series?

Contributor (Author): Updated in the latest commit.

p, err := getPostingForLabels(ctx, ir, limit.LabelSet)
if err != nil {
return 0, err
}
return getPostingCardinality(p)
}

if p.Err() != nil {
return 0, p.Err()
// The default partition needs the cardinality of all series that don't belong to any other partition.
postings := make([]index.Postings, 0, len(allLimits)-1)
for _, l := range allLimits {
if l.Hash == limit.Hash {
continue
}
p, err := getPostingForLabels(ctx, ir, l.LabelSet)
if err != nil {
return 0, err
}
postings = append(postings, p)
}
mergedCardinality, err := getPostingCardinality(index.Merge(ctx, postings...))
if err != nil {
return 0, err
}

name, value := index.AllPostingsKey()
// Don't expand all postings; count their cardinality directly instead.
allPostings, err := ir.Postings(ctx, name, value)
if err != nil {
return 0, err
}
allCardinality, err := getPostingCardinality(allPostings)
if err != nil {
return 0, err
}
return allCardinality - mergedCardinality, nil
}
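To make the subtraction in getCardinalityForLimitsPerLabelSet concrete: suppose the head holds 12 series in total, and merging the postings of every non-default partition covers 10 distinct series (index.Merge deduplicates series that fall into several partitions). The default partition's cardinality is then 12 - 10 = 2, computed without ever enumerating which series fell through.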

s.valuesCounter[limit.Hash] = &labelSetCounterEntry{
count: totalCount,
labels: limit.LabelSet,
func getPostingForLabels(ctx context.Context, ir tsdb.IndexReader, lbls labels.Labels) (index.Postings, error) {
postings := make([]index.Postings, 0, len(lbls))
for _, lbl := range lbls {
p, err := ir.Postings(ctx, lbl.Name, lbl.Value)
if err != nil {
return nil, err
}
return totalCount, nil
} else {
return r.count, nil
postings = append(postings, p)
}

return index.Intersect(postings...), nil
}

func getPostingCardinality(p index.Postings) (int, error) {
totalCount := 0
for p.Next() {
totalCount++
}

if p.Err() != nil {
return 0, p.Err()
}
return totalCount, nil
}
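Taken together, the two helpers compose into a small cardinality query. A minimal usage sketch against Prometheus's tsdb.IndexReader; the label values are made up:

// Count head series matching {job="api", env="prod"}.
func countSeries(ctx context.Context, ir tsdb.IndexReader) (int, error) {
	p, err := getPostingForLabels(ctx, ir, labels.FromStrings("job", "api", "env", "prod"))
	if err != nil {
		return 0, err
	}
	return getPostingCardinality(p)
}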

func (m *labelSetCounter) increaseSeriesLabelSet(u *userTSDB, metric labels.Labels) {
@@ -200,10 +255,13 @@ func (m *labelSetCounter) decreaseSeriesLabelSet(u *userTSDB, metric labels.Labels) {

func (m *labelSetCounter) UpdateMetric(ctx context.Context, u *userTSDB, metrics *ingesterMetrics) error {
currentLbsLimitHash := map[uint64]validation.LimitsPerLabelSet{}
for _, l := range m.limiter.limits.LimitsPerLabelSet(u.userID) {
limits := m.limiter.limits.LimitsPerLabelSet(u.userID)
for _, l := range limits {
currentLbsLimitHash[l.Hash] = l
}

nonDefaultPartitionRemoved := false
var defaultPartitionHash uint64
for i := 0; i < numMetricCounterShards; i++ {
s := m.shards[i]
s.RLock()
@@ -215,17 +273,31 @@ func (m *labelSetCounter) UpdateMetric(ctx context.Context, u *userTSDB, metrics *ingesterMetrics) error {
metrics.limitsPerLabelSet.DeleteLabelValues(u.userID, "max_series", lbls)
continue
}
metrics.usagePerLabelSet.WithLabelValues(u.userID, "max_series", lbls).Set(float64(entry.count))
metrics.limitsPerLabelSet.WithLabelValues(u.userID, "max_series", lbls).Set(float64(currentLbsLimitHash[h].Limits.MaxSeries))
delete(currentLbsLimitHash, h)
// Defer deleting the default partition from the current limits map: if
// another label set is removed, the default partition must be backfilled again.
if entry.labels.Len() > 0 {
metrics.usagePerLabelSet.WithLabelValues(u.userID, "max_series", lbls).Set(float64(entry.count))
metrics.limitsPerLabelSet.WithLabelValues(u.userID, "max_series", lbls).Set(float64(currentLbsLimitHash[h].Limits.MaxSeries))
delete(currentLbsLimitHash, h)
nonDefaultPartitionRemoved = true
} else {
defaultPartitionHash = h
}
}
s.RUnlock()
}

// No partitions with configured label sets were removed, so there is no need to backfill the default partition.
if !nonDefaultPartitionRemoved {
delete(currentLbsLimitHash, defaultPartitionHash)
}

// Backfill all limits that are not being tracked yet
for _, l := range currentLbsLimitHash {
s := m.shards[util.HashFP(model.Fingerprint(l.Hash))%numMetricCounterShards]
count, err := m.backFillLimit(ctx, u, l, s)
// Force the backfill so the counter for the default partition is refreshed
// when other limits have been removed.
count, err := m.backFillLimit(ctx, u, true, limits, l, s)
if err != nil {
return err
}
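As a closing illustration of the counting strategy the backfill relies on, here is a self-contained toy that reproduces the subtraction on hand-built postings lists. It uses only Prometheus's tsdb/index and storage packages; the series refs and totals are invented:

package main

import (
	"context"
	"fmt"

	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/tsdb/index"
)

func main() {
	// Series matched by two explicit partitions; ref 3 falls into both.
	partitionA := index.NewListPostings([]storage.SeriesRef{1, 2, 3})
	partitionB := index.NewListPostings([]storage.SeriesRef{3, 4})
	merged := index.Merge(context.Background(), partitionA, partitionB)

	mergedCount := 0
	for merged.Next() {
		mergedCount++ // deduplicated: refs 1, 2, 3 and 4 each count once
	}
	if err := merged.Err(); err != nil {
		panic(err)
	}

	allSeries := 6 // pretend the head holds 6 series in total
	// Everything not covered by an explicit partition belongs to the default one.
	fmt.Println("default partition cardinality:", allSeries-mergedCount) // prints 2
}

The same arithmetic explains the forced backfill in UpdateMetric: removing an explicit partition shrinks the merged set, so the default partition's cached count goes stale and must be recomputed.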