Skip to content

Commit

Permalink
Store rulepb.RuleGroupList in rules backup instead of promRules.Group
Browse files Browse the repository at this point in the history
Signed-off-by: Emmanuel Lodovice <[email protected]>
  • Loading branch information
emanlodovice committed Apr 4, 2024
1 parent 8cd7743 commit 1507047
Show file tree
Hide file tree
Showing 6 changed files with 345 additions and 207 deletions.
2 changes: 1 addition & 1 deletion pkg/ruler/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ func (r *DefaultMultiTenantManager) GetRules(userID string) []*promRules.Group {
return groups
}

func (r *DefaultMultiTenantManager) GetBackupRules(userID string) []*promRules.Group {
func (r *DefaultMultiTenantManager) GetBackupRules(userID string) rulespb.RuleGroupList {
if r.rulesBackupManager != nil {
return r.rulesBackupManager.getRuleGroups(userID)
}
Expand Down
24 changes: 8 additions & 16 deletions pkg/ruler/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,12 @@ func TestBackupRules(t *testing.T) {
dir := t.TempDir()
reg := prometheus.NewPedanticRegistry()
evalMetrics := NewRuleEvalMetrics(Config{RulePath: dir, EnableQueryStats: true}, reg)
m, err := NewDefaultMultiTenantManager(Config{RulePath: dir, APIEnableRulesBackup: true}, factory, evalMetrics, reg, log.NewNopLogger())
waitDurations := []time.Duration{
1 * time.Millisecond,
1 * time.Millisecond,
}
ruleManagerFactory := RuleManagerFactory(nil, waitDurations)
m, err := NewDefaultMultiTenantManager(Config{RulePath: dir, APIEnableRulesBackup: true}, ruleManagerFactory, evalMetrics, reg, log.NewNopLogger())
require.NoError(t, err)

const user1 = "testUser"
Expand Down Expand Up @@ -285,21 +290,8 @@ func TestBackupRules(t *testing.T) {
},
}
m.BackUpRuleGroups(context.TODO(), userRules)
managerOptions := &promRules.ManagerOptions{}
g1 := promRules.NewGroup(promRules.GroupOptions{
Name: userRules[user1][0].Name,
File: userRules[user1][0].Namespace,
Interval: userRules[user1][0].Interval,
Opts: managerOptions,
})
g2 := promRules.NewGroup(promRules.GroupOptions{
Name: userRules[user2][0].Name,
File: userRules[user2][0].Namespace,
Interval: userRules[user2][0].Interval,
Opts: managerOptions,
})
requireGroupsEqual(t, m.GetBackupRules(user1), []*promRules.Group{g1})
requireGroupsEqual(t, m.GetBackupRules(user2), []*promRules.Group{g2})
require.Equal(t, userRules[user1], m.GetBackupRules(user1))
require.Equal(t, userRules[user2], m.GetBackupRules(user2))
}

func getManager(m *DefaultMultiTenantManager, user string) RulesManager {
Expand Down
113 changes: 23 additions & 90 deletions pkg/ruler/rule_backup_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,52 +2,31 @@ package ruler

import (
"context"
"errors"
"net/url"
"path/filepath"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/model/rulefmt"
"github.com/prometheus/prometheus/promql/parser"
promRules "github.com/prometheus/prometheus/rules"

"github.com/cortexproject/cortex/pkg/ruler/rulespb"
)

// Implements GroupLoader interface but instead of reading from a file when Load is called, it returns the
// rulefmt.RuleGroup it has stored
type loader struct {
ruleGroups map[string][]rulefmt.RuleGroup
}

func (r *loader) Load(identifier string) (*rulefmt.RuleGroups, []error) {
return &rulefmt.RuleGroups{
Groups: r.ruleGroups[identifier],
}, nil
}

func (r *loader) Parse(query string) (parser.Expr, error) {
return parser.ParseExpr(query)
}

// rulesBackupManager is an in-memory store that holds []promRules.Group of multiple users. It only stores the Groups,
// it doesn't evaluate them.
// rulesBackupManager is an in-memory store that holds the rulespb.RuleGroupList of multiple users. It only stores the
// data; it does not evaluate the rules.
type rulesBackupManager struct {
inMemoryRuleGroupsBackup map[string][]*promRules.Group
inMemoryRuleGroupsBackup map[string]rulespb.RuleGroupList
cfg Config

logger log.Logger

backupRuleGroup *prometheus.GaugeVec
lastBackupReloadSuccessful *prometheus.GaugeVec
backupRuleGroup *prometheus.GaugeVec
}

func newRulesBackupManager(cfg Config, logger log.Logger, reg prometheus.Registerer) *rulesBackupManager {
return &rulesBackupManager{
inMemoryRuleGroupsBackup: make(map[string][]*promRules.Group),
inMemoryRuleGroupsBackup: make(map[string]rulespb.RuleGroupList),
cfg: cfg,
logger: logger,

Expand All @@ -56,85 +35,39 @@ func newRulesBackupManager(cfg Config, logger log.Logger, reg prometheus.Registe
Name: "ruler_backup_rule_group",
Help: "Boolean set to 1 indicating the ruler stores the rule group as backup.",
}, []string{"user", "rule_group"}),
lastBackupReloadSuccessful: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Namespace: "cortex",
Name: "ruler_backup_last_reload_successful",
Help: "Boolean set to 1 whenever the last configuration reload attempt was successful.",
}, []string{"user"}),
}
}

// setRuleGroups updates the map[string][]*promRules.Group that the rulesBackupManager stores in memory.
// setRuleGroups updates the map[string]rulespb.RuleGroupList that the rulesBackupManager stores in memory.
func (r *rulesBackupManager) setRuleGroups(_ context.Context, ruleGroups map[string]rulespb.RuleGroupList) {
backupRuleGroups := make(map[string][]*promRules.Group)
for user, groups := range ruleGroups {
promGroups, err := r.ruleGroupListToPromGroups(user, groups)
if err != nil {
r.lastBackupReloadSuccessful.WithLabelValues(user).Set(0)
level.Error(r.logger).Log("msg", "unable to back up rules", "user", user, "err", err)
continue
}
backupRuleGroups[user] = promGroups
r.lastBackupReloadSuccessful.WithLabelValues(user).Set(1)
}
r.updateMetrics(backupRuleGroups)
r.inMemoryRuleGroupsBackup = backupRuleGroups
}

// ruleGroupListToPromGroups converts rulespb.RuleGroupList to []*promRules.Group by creating a single use
// promRules.Manager and calling its LoadGroups method.
func (r *rulesBackupManager) ruleGroupListToPromGroups(user string, ruleGroups rulespb.RuleGroupList) ([]*promRules.Group, error) {
rgs := ruleGroups.Formatted()

loader := &loader{
ruleGroups: rgs,
}
promManager := promRules.NewManager(&promRules.ManagerOptions{
ExternalURL: r.cfg.ExternalURL.URL,
GroupLoader: loader,
})

namespaces := make([]string, 0, len(rgs))
for k := range rgs {
namespaces = append(namespaces, k)
}
loadedGroups, errs := promManager.LoadGroups(r.cfg.EvaluationInterval, r.cfg.ExternalLabels, r.cfg.ExternalURL.String(), nil, namespaces...)
if errs != nil {
for _, e := range errs {
level.Error(r.logger).Log("msg", "loading groups to backup failed", "user", user, "namespaces", namespaces, "err", e)
}
return nil, errors.New("error loading rules to backup")
}

groups := make([]*promRules.Group, 0, len(loadedGroups))
for _, g := range loadedGroups {
groups = append(groups, g)
}
return groups, nil
r.updateMetrics(ruleGroups)
r.inMemoryRuleGroupsBackup = ruleGroups
}

// getRuleGroups returns the []*promRules.Group that rulesBackupManager stores for a given user
func (r *rulesBackupManager) getRuleGroups(userID string) []*promRules.Group {
var result []*promRules.Group
// getRuleGroups returns the rulespb.RuleGroupList that rulesBackupManager stores for a given user
func (r *rulesBackupManager) getRuleGroups(userID string) rulespb.RuleGroupList {
var result rulespb.RuleGroupList
if groups, exists := r.inMemoryRuleGroupsBackup[userID]; exists {
result = groups
}
return result
}

func (r *rulesBackupManager) updateMetrics(newBackupGroups map[string][]*promRules.Group) {
// updateMetrics updates the ruler_backup_rule_group metric by adding new groups that were backed up and removing
// those that were removed from the backup.
func (r *rulesBackupManager) updateMetrics(newBackupGroups map[string]rulespb.RuleGroupList) {
for user, groups := range newBackupGroups {
keptGroups := make(map[string][]interface{})
keptGroups := make(map[string]struct{})
for _, g := range groups {
fullFileName := r.getFilePathForGroup(g, user)
key := promRules.GroupKey(fullFileName, g.Name())
key := promRules.GroupKey(fullFileName, g.GetName())
r.backupRuleGroup.WithLabelValues(user, key).Set(1)
keptGroups[key] = nil
keptGroups[key] = struct{}{}
}
oldGroups := r.inMemoryRuleGroupsBackup[user]
for _, g := range oldGroups {
fullFileName := r.getFilePathForGroup(g, user)
key := promRules.GroupKey(fullFileName, g.Name())
key := promRules.GroupKey(fullFileName, g.GetName())
if _, exists := keptGroups[key]; !exists {
r.backupRuleGroup.DeleteLabelValues(user, key)
}
Expand All @@ -147,17 +80,17 @@ func (r *rulesBackupManager) updateMetrics(newBackupGroups map[string][]*promRul
}
for _, g := range groups {
fullFileName := r.getFilePathForGroup(g, user)
key := promRules.GroupKey(fullFileName, g.Name())
key := promRules.GroupKey(fullFileName, g.GetName())
r.backupRuleGroup.DeleteLabelValues(user, key)
}
r.lastBackupReloadSuccessful.DeleteLabelValues(user)
}
}

// getFilePathForGroup returns the supposed file path of the group if it was being evaluated.
// This is based on how mapper.go generates file paths.
func (r *rulesBackupManager) getFilePathForGroup(g *promRules.Group, user string) string {
// This is based on how mapper.go generates file paths. It can be used to generate a value similar to the one returned
// by the Prometheus Group.File() method.
func (r *rulesBackupManager) getFilePathForGroup(g *rulespb.RuleGroupDesc, user string) string {
dirPath := filepath.Join(r.cfg.RulePath, user)
encodedFileName := url.PathEscape(g.File())
encodedFileName := url.PathEscape(g.GetNamespace())
return filepath.Join(dirPath, encodedFileName)
}
101 changes: 7 additions & 94 deletions pkg/ruler/rule_backup_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,13 @@ import (
"context"
"net/url"
"path/filepath"
"strings"
"testing"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
io_prometheus_client "github.com/prometheus/client_model/go"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql/parser"
promRules "github.com/prometheus/prometheus/rules"
"github.com/stretchr/testify/require"
"golang.org/x/exp/slices"

"github.com/cortexproject/cortex/pkg/ruler/rulespb"
"github.com/cortexproject/cortex/pkg/util"
Expand All @@ -25,7 +21,6 @@ func TestBackUpRuleGroups(t *testing.T) {
Expr: "1 > bool 1",
Record: "test",
}
parsedExpr, _ := parser.ParseExpr(r.Expr)
g1 := rulespb.RuleGroupDesc{
Name: "g1",
Namespace: "ns1",
Expand All @@ -42,90 +37,36 @@ func TestBackUpRuleGroups(t *testing.T) {
Rules: []*rulespb.RuleDesc{&r},
}

rInvalid := rulespb.RuleDesc{
Expr: "1 > 1", // invalid expression
}
gInvalid := rulespb.RuleGroupDesc{
Name: "g1",
Namespace: "ns1",
Rules: []*rulespb.RuleDesc{&rInvalid},
}
cfg := defaultRulerConfig(t)
managerOptions := &promRules.ManagerOptions{}
manager := newRulesBackupManager(cfg, log.NewNopLogger(), nil)
g1Option := promRules.GroupOptions{
Name: g1.Name,
File: g1.Namespace,
Interval: cfg.EvaluationInterval,
Rules: []promRules.Rule{
promRules.NewRecordingRule(r.Record, parsedExpr, labels.Labels{}),
},
}
g2Option := promRules.GroupOptions{
Name: g2.Name,
File: g2.Namespace,
Interval: cfg.EvaluationInterval,
Rules: []promRules.Rule{
promRules.NewRecordingRule(r.Record, parsedExpr, labels.Labels{}),
},
}
g3Option := promRules.GroupOptions{
Name: g3.Name,
File: g3.Namespace,
Interval: cfg.EvaluationInterval,
Rules: []promRules.Rule{
promRules.NewRecordingRule(r.Record, parsedExpr, labels.Labels{}),
},
}

type testCase struct {
input map[string]rulespb.RuleGroupList
expectedOutput map[string][]*promRules.GroupOptions
input map[string]rulespb.RuleGroupList
}

testCases := map[string]testCase{
"Empty input": {
input: make(map[string]rulespb.RuleGroupList),
expectedOutput: make(map[string][]*promRules.GroupOptions),
},
"With invalid rules": {
input: map[string]rulespb.RuleGroupList{
"user1": {&gInvalid},
},
expectedOutput: make(map[string][]*promRules.GroupOptions),
input: make(map[string]rulespb.RuleGroupList),
},
"With partial invalid rules": {
"With groups from single users": {
input: map[string]rulespb.RuleGroupList{
"user1": {&gInvalid, &g3},
"user2": {&g1, &g2},
},
expectedOutput: map[string][]*promRules.GroupOptions{
"user2": {&g1Option, &g2Option},
},
},
"With groups from multiple users": {
input: map[string]rulespb.RuleGroupList{
"user1": {&g1, &g2, &g3},
"user1": {&g1, &g3},
"user2": {&g1, &g2},
},
expectedOutput: map[string][]*promRules.GroupOptions{
"user1": {&g1Option, &g2Option, &g3Option},
"user2": {&g1Option, &g2Option},
},
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
manager.setRuleGroups(context.TODO(), tc.input)
require.Equal(t, len(tc.expectedOutput), len(manager.inMemoryRuleGroupsBackup))
for user, expectedGroupOptions := range tc.expectedOutput {
require.Equal(t, len(tc.input), len(manager.inMemoryRuleGroupsBackup))
for user, groups := range tc.input {
loadedGroups := manager.getRuleGroups(user)
expectedGroups := make([]*promRules.Group, 0, len(expectedGroupOptions))
for _, o := range expectedGroupOptions {
o.Opts = managerOptions
expectedGroups = append(expectedGroups, promRules.NewGroup(*o))
}
requireGroupsEqual(t, expectedGroups, loadedGroups)
require.Equal(t, groups, loadedGroups)
}
})
}
Expand Down Expand Up @@ -171,13 +112,6 @@ func TestBackUpRuleGroupsMetrics(t *testing.T) {
require.NoError(t, err)
mfm, err := util.NewMetricFamilyMap(gm)
require.NoError(t, err)
require.Equal(t, 2, len(mfm["cortex_ruler_backup_last_reload_successful"].Metric))
requireMetricEqual(t, mfm["cortex_ruler_backup_last_reload_successful"].Metric[0], map[string]string{
"user": "user1",
}, float64(1))
requireMetricEqual(t, mfm["cortex_ruler_backup_last_reload_successful"].Metric[1], map[string]string{
"user": "user2",
}, float64(1))
require.Equal(t, 3, len(mfm["cortex_ruler_backup_rule_group"].Metric))
requireMetricEqual(t, mfm["cortex_ruler_backup_rule_group"].Metric[0], map[string]string{
"user": "user1",
Expand All @@ -199,34 +133,13 @@ func TestBackUpRuleGroupsMetrics(t *testing.T) {
require.NoError(t, err)
mfm, err = util.NewMetricFamilyMap(gm)
require.NoError(t, err)
require.Equal(t, 1, len(mfm["cortex_ruler_backup_last_reload_successful"].Metric))
requireMetricEqual(t, mfm["cortex_ruler_backup_last_reload_successful"].Metric[0], map[string]string{
"user": "user1",
}, float64(1))
require.Equal(t, 1, len(mfm["cortex_ruler_backup_rule_group"].Metric))
requireMetricEqual(t, mfm["cortex_ruler_backup_rule_group"].Metric[0], map[string]string{
"user": "user1",
"rule_group": getGroupName(&g2, "user1"),
}, float64(1))
}

func requireGroupsEqual(t *testing.T, a []*promRules.Group, b []*promRules.Group) {
require.Equal(t, len(a), len(b))
sortFunc := func(g1, g2 *promRules.Group) int {
fileCompare := strings.Compare(g1.File(), g2.File())
if fileCompare != 0 {
return fileCompare
}
return strings.Compare(g1.Name(), g2.Name())
}
slices.SortFunc(a, sortFunc)
slices.SortFunc(b, sortFunc)
for i, gA := range a {
gB := b[i]
require.True(t, gA.Equals(gB), "group1", gA.Name(), "group2", gB.Name())
}
}

func requireMetricEqual(t *testing.T, m *io_prometheus_client.Metric, labels map[string]string, value float64) {
l := m.GetLabel()
require.Equal(t, len(labels), len(l))
Expand Down
Loading

0 comments on commit 1507047

Please sign in to comment.