Skip to content

Commit

Permalink
Add cortex_ruler_rule_groups_in_store metric
Browse files Browse the repository at this point in the history
Signed-off-by: Emmanuel Lodovice <[email protected]>
  • Loading branch information
emanlodovice committed Apr 19, 2024
1 parent fe105a9 commit 840cd27
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
* [ENHANCEMENT] Ruler: Improve GetRules response time by refactoring mutexes and introducing a temporary rules cache in `ruler/manager.go`. #5805
* [ENHANCEMENT] Querier: Add context error check when merging slices from ingesters for GetLabel operations. #5837
* [ENHANCEMENT] Ring: Add experimental `-ingester.tokens-generator-strategy=minimize-spread` flag to enable the new minimize spread token generator strategy. #5855
* [ENHANCEMENT] Ruler: Add new ruler metric `cortex_ruler_rule_groups_in_store`. #5869
* [BUGFIX] Distributor: Do not use label with empty values for sharding #5717
* [BUGFIX] Query Frontend: queries with negative offset should check whether it is cacheable or not. #5719
* [BUGFIX] Redis Cache: pass `cache_size` config correctly. #5734
Expand Down
3 changes: 3 additions & 0 deletions integration/ruler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,9 @@ func TestRulerSharding(t *testing.T) {
// between the two rulers.
require.NoError(t, ruler1.WaitSumMetrics(e2e.Less(numRulesGroups), "cortex_prometheus_rule_group_rules"))
require.NoError(t, ruler2.WaitSumMetrics(e2e.Less(numRulesGroups), "cortex_prometheus_rule_group_rules"))
// Even with rules sharded, we expect rulers to have the same cortex_ruler_rule_groups_in_store metric values
require.NoError(t, ruler1.WaitSumMetrics(e2e.Equals(numRulesGroups), "cortex_ruler_rule_groups_in_store"))
require.NoError(t, ruler2.WaitSumMetrics(e2e.Equals(numRulesGroups), "cortex_ruler_rule_groups_in_store"))

// Fetch the rules and ensure they match the configured ones.
actualGroups, err := c.GetPrometheusRules(e2ecortex.DefaultFilter)
Expand Down
41 changes: 41 additions & 0 deletions pkg/ruler/manager_metrics.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package ruler

import (
"sync"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

Expand Down Expand Up @@ -271,3 +273,42 @@ func (m *RuleEvalMetrics) deletePerUserMetrics(userID string) {
m.RulerQuerySeconds.DeleteLabelValues(userID)
}
}

// RuleGroupMetrics tracks, per tenant, the number of rule groups present in
// the rule store, exported via the cortex_ruler_rule_groups_in_store gauge.
type RuleGroupMetrics struct {
	mtx               sync.Mutex // serializes UpdateRuleGroupsInStore calls and guards tenants
	RuleGroupsInStore *prometheus.GaugeVec
	tenants           map[string]struct{}  // tenants reported on the previous update; used to delete stale series
	allowedTenants    *util.AllowedTenants // tenants allowed to be exported; disabled tenants are ignored
}

// NewRuleGroupMetrics registers the cortex_ruler_rule_groups_in_store gauge on
// reg and returns a RuleGroupMetrics that reports only tenants permitted by
// allowedTenants.
func NewRuleGroupMetrics(reg prometheus.Registerer, allowedTenants *util.AllowedTenants) *RuleGroupMetrics {
	gauge := promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
		Name: "cortex_ruler_rule_groups_in_store",
		Help: "The number of rule groups a tenant has in store.",
	}, []string{"user"})
	return &RuleGroupMetrics{
		RuleGroupsInStore: gauge,
		allowedTenants:    allowedTenants,
	}
}

// UpdateRuleGroupsInStore sets the cortex_ruler_rule_groups_in_store gauge for
// every allowed tenant in ruleGroupsCount, and deletes the series of tenants
// that were reported on the previous call but are absent from this one.
func (r *RuleGroupMetrics) UpdateRuleGroupsInStore(ruleGroupsCount map[string]int) {
	r.mtx.Lock()
	defer r.mtx.Unlock()

	seen := make(map[string]struct{}, len(ruleGroupsCount))
	for userID, n := range ruleGroupsCount {
		// Disabled tenants are skipped entirely; their rule groups are not exported.
		if !r.allowedTenants.IsAllowed(userID) {
			continue
		}
		seen[userID] = struct{}{}
		r.RuleGroupsInStore.WithLabelValues(userID).Set(float64(n))
	}

	// Remove series for tenants that disappeared since the last update so the
	// gauge does not keep reporting stale values for them.
	for userID := range r.tenants {
		if _, stillPresent := seen[userID]; !stillPresent {
			r.RuleGroupsInStore.DeleteLabelValues(userID)
		}
	}
	r.tenants = seen
}
38 changes: 38 additions & 0 deletions pkg/ruler/manager_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -595,3 +595,41 @@ func TestRuleEvalMetricsDeletePerUserMetrics(t *testing.T) {
require.Contains(t, mfm[name].String(), "value:\"fake2\"")
}
}

// TestRuleGroupMetrics verifies that UpdateRuleGroupsInStore exports one gauge
// series per allowed tenant, drops series for tenants missing from a later
// update, and removes the metric family entirely when no tenants remain.
func TestRuleGroupMetrics(t *testing.T) {
	reg := prometheus.NewPedanticRegistry()
	metrics := NewRuleGroupMetrics(reg, util.NewAllowedTenants(nil, []string{"fake3"}))

	// First update: fake3 is a disabled tenant, so only fake1 and fake2 are exported.
	metrics.UpdateRuleGroupsInStore(map[string]int{"fake1": 10, "fake2": 20, "fake3": 30})
	families, err := reg.Gather()
	require.NoError(t, err)
	byName, err := util.NewMetricFamilyMap(families)
	require.NoError(t, err)
	series := byName["cortex_ruler_rule_groups_in_store"].Metric
	require.Equal(t, 2, len(series))
	requireMetricEqual(t, series[0], map[string]string{"user": "fake1"}, float64(10))
	requireMetricEqual(t, series[1], map[string]string{"user": "fake2"}, float64(20))

	// Second update: fake1 is gone, so its series must be deleted.
	metrics.UpdateRuleGroupsInStore(map[string]int{"fake2": 30})
	families, err = reg.Gather()
	require.NoError(t, err)
	byName, err = util.NewMetricFamilyMap(families)
	require.NoError(t, err)
	series = byName["cortex_ruler_rule_groups_in_store"].Metric
	require.Equal(t, 1, len(series))
	requireMetricEqual(t, series[0], map[string]string{"user": "fake2"}, float64(30))

	// Final update with no tenants: the whole metric family disappears.
	metrics.UpdateRuleGroupsInStore(map[string]int{})
	families, err = reg.Gather()
	require.NoError(t, err)
	byName, err = util.NewMetricFamilyMap(families)
	require.NoError(t, err)
	require.Nil(t, byName["cortex_ruler_rule_groups_in_store"])
}
15 changes: 15 additions & 0 deletions pkg/ruler/ruler.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ type Ruler struct {
ruleGroupStoreLoadDuration prometheus.Gauge
ruleGroupSyncDuration prometheus.Gauge
rulerGetRulesFailures *prometheus.CounterVec
ruleGroupMetrics *RuleGroupMetrics

allowedTenants *util.AllowedTenants

Expand Down Expand Up @@ -328,6 +329,7 @@ func newRuler(cfg Config, manager MultiTenantManager, reg prometheus.Registerer,
Help: "The total number of failed rules request sent to rulers in getShardedRules.",
}, []string{"ruler"}),
}
ruler.ruleGroupMetrics = NewRuleGroupMetrics(reg, ruler.allowedTenants)

if len(cfg.EnabledTenants) > 0 {
level.Info(ruler.logger).Log("msg", "ruler using enabled users", "enabled", strings.Join(cfg.EnabledTenants, ", "))
Expand Down Expand Up @@ -652,7 +654,9 @@ func (r *Ruler) listRulesNoSharding(ctx context.Context) (map[string]rulespb.Rul
if err != nil {
return nil, nil, err
}
ruleGroupCounts := make(map[string]int, len(allRuleGroups))
for userID, groups := range allRuleGroups {
ruleGroupCounts[userID] = len(groups)
disabledRuleGroupsForUser := r.limits.DisabledRuleGroups(userID)
if len(disabledRuleGroupsForUser) == 0 {
continue
Expand All @@ -667,6 +671,7 @@ func (r *Ruler) listRulesNoSharding(ctx context.Context) (map[string]rulespb.Rul
}
allRuleGroups[userID] = filteredGroupsForUser
}
r.ruleGroupMetrics.UpdateRuleGroupsInStore(ruleGroupCounts)
return allRuleGroups, nil, nil
}

Expand All @@ -676,9 +681,11 @@ func (r *Ruler) listRulesShardingDefault(ctx context.Context) (map[string]rulesp
return nil, nil, err
}

ruleGroupCounts := make(map[string]int, len(configs))
ownedConfigs := make(map[string]rulespb.RuleGroupList)
backedUpConfigs := make(map[string]rulespb.RuleGroupList)
for userID, groups := range configs {
ruleGroupCounts[userID] = len(groups)
owned := filterRuleGroups(userID, groups, r.limits.DisabledRuleGroups(userID), r.ring, r.lifecycler.GetInstanceAddr(), r.logger, r.ringCheckErrors)
if len(owned) > 0 {
ownedConfigs[userID] = owned
Expand All @@ -690,6 +697,7 @@ func (r *Ruler) listRulesShardingDefault(ctx context.Context) (map[string]rulesp
}
}
}
r.ruleGroupMetrics.UpdateRuleGroupsInStore(ruleGroupCounts)
return ownedConfigs, backedUpConfigs, nil
}

Expand Down Expand Up @@ -717,6 +725,7 @@ func (r *Ruler) listRulesShuffleSharding(ctx context.Context) (map[string]rulesp
}

if len(userRings) == 0 {
r.ruleGroupMetrics.UpdateRuleGroupsInStore(make(map[string]int))
return nil, nil, nil
}

Expand All @@ -729,6 +738,8 @@ func (r *Ruler) listRulesShuffleSharding(ctx context.Context) (map[string]rulesp
mu := sync.Mutex{}
owned := map[string]rulespb.RuleGroupList{}
backedUp := map[string]rulespb.RuleGroupList{}
gLock := sync.Mutex{}
ruleGroupCounts := make(map[string]int, len(userRings))

concurrency := loadRulesConcurrency
if len(userRings) < concurrency {
Expand All @@ -743,6 +754,9 @@ func (r *Ruler) listRulesShuffleSharding(ctx context.Context) (map[string]rulesp
if err != nil {
return errors.Wrapf(err, "failed to fetch rule groups for user %s", userID)
}
gLock.Lock()
ruleGroupCounts[userID] = len(groups)
gLock.Unlock()

filterOwned := filterRuleGroups(userID, groups, r.limits.DisabledRuleGroups(userID), userRings[userID], r.lifecycler.GetInstanceAddr(), r.logger, r.ringCheckErrors)
var filterBackup []*rulespb.RuleGroupDesc
Expand All @@ -766,6 +780,7 @@ func (r *Ruler) listRulesShuffleSharding(ctx context.Context) (map[string]rulesp
}

err = g.Wait()
r.ruleGroupMetrics.UpdateRuleGroupsInStore(ruleGroupCounts)
return owned, backedUp, err
}

Expand Down

0 comments on commit 840cd27

Please sign in to comment.