From 5a1bc69b71b8287267d4a2af4d0bc86bef3c6816 Mon Sep 17 00:00:00 2001 From: Emmanuel Lodovice Date: Tue, 27 Feb 2024 13:41:13 -0800 Subject: [PATCH] WIP Signed-off-by: Emmanuel Lodovice --- pkg/ruler/manager.go | 14 +- pkg/ruler/manager_test.go | 49 +++ pkg/ruler/merger.go | 43 +++ pkg/ruler/merger_test.go | 103 +++++++ pkg/ruler/rule_backup_manager.go | 107 +++++++ pkg/ruler/rule_backup_manager_test.go | 143 +++++++++ pkg/ruler/ruler.go | 205 ++++++++++--- pkg/ruler/ruler_test.go | 414 +++++++++++++++++++++----- 8 files changed, 957 insertions(+), 121 deletions(-) create mode 100644 pkg/ruler/merger.go create mode 100644 pkg/ruler/merger_test.go create mode 100644 pkg/ruler/rule_backup_manager.go create mode 100644 pkg/ruler/rule_backup_manager_test.go diff --git a/pkg/ruler/manager.go b/pkg/ruler/manager.go index 50dc6aebe18..82b13980d0a 100644 --- a/pkg/ruler/manager.go +++ b/pkg/ruler/manager.go @@ -44,6 +44,9 @@ type DefaultMultiTenantManager struct { notifiers map[string]*rulerNotifier notifiersDiscoveryMetrics map[string]discovery.DiscovererMetrics + // rules backup + rulesBackupManager *rulesBackupManager + managersTotal prometheus.Gauge lastReloadSuccessful *prometheus.GaugeVec lastReloadSuccessfulTimestamp *prometheus.GaugeVec @@ -85,6 +88,7 @@ func NewDefaultMultiTenantManager(cfg Config, managerFactory ManagerFactory, eva mapper: newMapper(cfg.RulePath, logger), userManagers: map[string]RulesManager{}, userManagerMetrics: userManagerMetrics, + rulesBackupManager: newRulesBackupManager(cfg, logger), managersTotal: promauto.With(reg).NewGauge(prometheus.GaugeOpts{ Namespace: "cortex", Name: "ruler_managers_total", @@ -142,8 +146,12 @@ func (r *DefaultMultiTenantManager) SyncRuleGroups(ctx context.Context, ruleGrou r.managersTotal.Set(float64(len(r.userManagers))) } +func (r *DefaultMultiTenantManager) BackUpRuleGroups(ctx context.Context, ruleGroups map[string]rulespb.RuleGroupList) { + r.rulesBackupManager.backUpRuleGroups(ctx, ruleGroups) +} + // syncRulesToManager maps the rule files to disk, detects any changes and will create/update the -// the users Prometheus Rules Manager. +// users Prometheus Rules Manager. 
func (r *DefaultMultiTenantManager) syncRulesToManager(ctx context.Context, user string, groups rulespb.RuleGroupList) { // Map the files to disk and return the file names to be passed to the users manager if they // have been updated @@ -279,6 +287,10 @@ func (r *DefaultMultiTenantManager) GetRules(userID string) []*promRules.Group { return groups } +func (r *DefaultMultiTenantManager) GetBackupRules(userID string) []*promRules.Group { + return r.rulesBackupManager.getRuleGroups(userID) +} + func (r *DefaultMultiTenantManager) Stop() { r.notifiersMtx.Lock() for _, n := range r.notifiers { diff --git a/pkg/ruler/manager_test.go b/pkg/ruler/manager_test.go index c76888fc801..76c9a7a5923 100644 --- a/pkg/ruler/manager_test.go +++ b/pkg/ruler/manager_test.go @@ -138,6 +138,55 @@ func TestSyncRuleGroupsCleanUpPerUserMetrics(t *testing.T) { require.NotContains(t, mfm["cortex_ruler_config_last_reload_successful"].String(), "value:\""+user+"\"") } +func TestBackupRules(t *testing.T) { + dir := t.TempDir() + reg := prometheus.NewPedanticRegistry() + evalMetrics := NewRuleEvalMetrics(Config{RulePath: dir, EnableQueryStats: true}, reg) + m, err := NewDefaultMultiTenantManager(Config{RulePath: dir}, factory, evalMetrics, reg, log.NewNopLogger()) + require.NoError(t, err) + + const user1 = "testUser" + const user2 = "testUser2" + + require.Equal(t, 0, len(m.GetBackupRules(user1))) + require.Equal(t, 0, len(m.GetBackupRules(user2))) + + userRules := map[string]rulespb.RuleGroupList{ + user1: { + &rulespb.RuleGroupDesc{ + Name: "group1", + Namespace: "ns", + Interval: 1 * time.Minute, + User: user1, + }, + }, + user2: { + &rulespb.RuleGroupDesc{ + Name: "group2", + Namespace: "ns", + Interval: 1 * time.Minute, + User: user1, + }, + }, + } + m.BackUpRuleGroups(nil, userRules) + managerOptions := &promRules.ManagerOptions{} + g1 := promRules.NewGroup(promRules.GroupOptions{ + Name: userRules[user1][0].Name, + File: userRules[user1][0].Namespace, + Interval: userRules[user1][0].Interval, + Opts: managerOptions, + }) + g2 := promRules.NewGroup(promRules.GroupOptions{ + Name: userRules[user2][0].Name, + File: userRules[user2][0].Namespace, + Interval: userRules[user2][0].Interval, + Opts: managerOptions, + }) + requireGroupsEqual(t, m.GetBackupRules(user1), []*promRules.Group{g1}) + requireGroupsEqual(t, m.GetBackupRules(user2), []*promRules.Group{g2}) +} + func getManager(m *DefaultMultiTenantManager, user string) RulesManager { m.userManagerMtx.Lock() defer m.userManagerMtx.Unlock() diff --git a/pkg/ruler/merger.go b/pkg/ruler/merger.go new file mode 100644 index 00000000000..dfa72a8375f --- /dev/null +++ b/pkg/ruler/merger.go @@ -0,0 +1,43 @@ +package ruler + +import ( + "strings" + "time" + + "golang.org/x/exp/slices" + + promRules "github.com/prometheus/prometheus/rules" +) + +func mergeGroupStateDesc(in []*GroupStateDesc) []*GroupStateDesc { + states := make(map[string]*GroupStateDesc) + rgTime := make(map[string]time.Time) + for _, state := range in { + latestTs := state.EvaluationTimestamp + for _, r := range state.ActiveRules { + if latestTs.Before(r.EvaluationTimestamp) { + latestTs = r.EvaluationTimestamp + } + } + key := promRules.GroupKey(state.Group.Namespace, state.Group.Name) + ts, ok := rgTime[key] + if !ok || ts.Before(latestTs) { + states[key] = state + rgTime[key] = latestTs + } + } + groups := make([]*GroupStateDesc, 0, len(states)) + for _, state := range states { + groups = append(groups, state) + } + slices.SortFunc(groups, func(a, b *GroupStateDesc) int { + fileCompare := 
strings.Compare(a.Group.Namespace, b.Group.Namespace) + + // If the namespace is the same, check the group name + if fileCompare != 0 { + return fileCompare + } + return strings.Compare(a.Group.Name, b.Group.Name) + }) + return groups +} diff --git a/pkg/ruler/merger_test.go b/pkg/ruler/merger_test.go new file mode 100644 index 00000000000..552a89551d6 --- /dev/null +++ b/pkg/ruler/merger_test.go @@ -0,0 +1,103 @@ +package ruler + +import ( + "github.com/cortexproject/cortex/pkg/ruler/rulespb" + "github.com/stretchr/testify/require" + "reflect" + "testing" + "time" +) + +func TestMergeGroupStateDesc(t *testing.T) { + curTime := time.Now() + r := rulespb.RuleDesc{ + Expr: "1 > 1", + } + g1 := rulespb.RuleGroupDesc{ + Name: "g1", + Namespace: "ns1", + } + g2 := rulespb.RuleGroupDesc{ + Name: "g2", + Namespace: "ns1", + } + rs1 := RuleStateDesc{ + Rule: &r, + EvaluationTimestamp: curTime, + } + rs1NotRun := RuleStateDesc{ + Rule: &r, + } + rs2 := RuleStateDesc{ + Rule: &r, + EvaluationTimestamp: curTime, + } + rs2NotRun := RuleStateDesc{ + Rule: &r, + } + rs3 := RuleStateDesc{ + Rule: &r, + EvaluationTimestamp: curTime.Add(10 * time.Second), + } + + gs1 := GroupStateDesc{ + Group: &g1, + ActiveRules: []*RuleStateDesc{&rs1, &rs2}, + EvaluationTimestamp: curTime, + } + gs1NotRun := GroupStateDesc{ + Group: &g1, + ActiveRules: []*RuleStateDesc{&rs1NotRun, &rs2NotRun}, + } + gs2 := GroupStateDesc{ + Group: &g2, + ActiveRules: []*RuleStateDesc{&rs1, &rs2}, + EvaluationTimestamp: curTime, + } + gs2NotRun := GroupStateDesc{ + Group: &g2, + ActiveRules: []*RuleStateDesc{&rs1NotRun, &rs2NotRun}, + } + gs3 := GroupStateDesc{ + Group: &g2, + ActiveRules: []*RuleStateDesc{&rs1, &rs3}, + EvaluationTimestamp: curTime, + } + + type testCase struct { + input []*GroupStateDesc + expectedOutput []*GroupStateDesc + } + + testCases := map[string]testCase{ + "No duplicate": { + input: []*GroupStateDesc{&gs1, &gs2}, + expectedOutput: []*GroupStateDesc{&gs1, &gs2}, + }, + "No duplicate but not evaluated": { + input: []*GroupStateDesc{&gs1NotRun, &gs2NotRun}, + expectedOutput: []*GroupStateDesc{&gs1NotRun, &gs2NotRun}, + }, + "With exact duplicate": { + input: []*GroupStateDesc{&gs1, &gs2NotRun, &gs1, &gs2NotRun}, + expectedOutput: []*GroupStateDesc{&gs1, &gs2NotRun}, + }, + "With duplicates that are not evaluated": { + input: []*GroupStateDesc{&gs1, &gs2, &gs1NotRun, &gs2NotRun}, + expectedOutput: []*GroupStateDesc{&gs1, &gs2}, + }, + "With duplicate with a new newer rule evaluation": { + input: []*GroupStateDesc{&gs3, &gs1, &gs2, &gs1NotRun}, + expectedOutput: []*GroupStateDesc{&gs1, &gs3}, + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + out := mergeGroupStateDesc(tc.input) + require.Equal(t, len(tc.expectedOutput), len(out)) + require.True(t, reflect.DeepEqual(tc.expectedOutput, out)) + }) + } + +} diff --git a/pkg/ruler/rule_backup_manager.go b/pkg/ruler/rule_backup_manager.go new file mode 100644 index 00000000000..f6db3e1969d --- /dev/null +++ b/pkg/ruler/rule_backup_manager.go @@ -0,0 +1,107 @@ +package ruler + +import ( + "context" + "errors" + "strings" + "sync" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/prometheus/prometheus/model/rulefmt" + "github.com/prometheus/prometheus/promql/parser" + promRules "github.com/prometheus/prometheus/rules" + + "github.com/cortexproject/cortex/pkg/ruler/rulespb" +) + +// Implements GroupLoader interface but instead of reading from a file when Load is called, it returns the +// rulefmt.RuleGroup it has 
stored +type loader struct { + ruleGroups map[string][]rulefmt.RuleGroup +} + +func (r *loader) Load(identifier string) (*rulefmt.RuleGroups, []error) { + return &rulefmt.RuleGroups{ + Groups: r.ruleGroups[identifier], + }, nil +} + +func (r *loader) Parse(query string) (parser.Expr, error) { + return parser.ParseExpr(query) +} + +type rulesBackupManager struct { + backupRuleGroupsMtx sync.RWMutex + backupRuleGroups map[string][]*promRules.Group + cfg Config + + logger log.Logger +} + +func newRulesBackupManager(cfg Config, logger log.Logger) *rulesBackupManager { + return &rulesBackupManager{ + backupRuleGroups: make(map[string][]*promRules.Group), + cfg: cfg, + logger: logger, + } +} + +func (r *rulesBackupManager) backUpRuleGroups(_ context.Context, ruleGroups map[string]rulespb.RuleGroupList) { + r.backupRuleGroupsMtx.Lock() + defer r.backupRuleGroupsMtx.Unlock() + backupRuleGroups := make(map[string][]*promRules.Group) + for user, groups := range ruleGroups { + g, err := r.ruleGroupListToPromGroups(user, groups) + if err != nil { + // TODO: Increment a metric + level.Error(r.logger).Log("msg", "unable to back up rules", "user", user, "err", err) + continue + } + backupRuleGroups[user] = g + } + r.backupRuleGroups = backupRuleGroups +} + +// ruleGroupListToPromGroups converts rulespb.RuleGroupList to []*promRules.Group by creating a single use +// promRules.Manager and calling its LoadGroups method. +func (r *rulesBackupManager) ruleGroupListToPromGroups(user string, ruleGroups rulespb.RuleGroupList) ([]*promRules.Group, error) { + rgs := ruleGroups.Formatted() + + loader := &loader{ + ruleGroups: rgs, + } + promManager := promRules.NewManager(&promRules.ManagerOptions{ + ExternalURL: r.cfg.ExternalURL.URL, + GroupLoader: loader, + }) + + namespaces := make([]string, 0, len(rgs)) + for k := range rgs { + namespaces = append(namespaces, k) + } + level.Info(r.logger).Log("msg", "backup rules for user", "user", user, "namespaces", strings.Join(namespaces, ",")) + loadedGroups, errs := promManager.LoadGroups(r.cfg.EvaluationInterval, r.cfg.ExternalLabels, r.cfg.ExternalURL.String(), nil, namespaces...) 
+ if errs != nil { + for _, e := range errs { + level.Error(r.logger).Log("msg", "loading groups to backup failed", "user", user, "namespaces", namespaces, "err", e) + } + return nil, errors.New("error loading rules to backup") + } + + groups := make([]*promRules.Group, 0, len(loadedGroups)) + for _, g := range loadedGroups { + groups = append(groups, g) + } + return groups, nil +} + +func (r *rulesBackupManager) getRuleGroups(userID string) []*promRules.Group { + var result []*promRules.Group + r.backupRuleGroupsMtx.RLock() + defer r.backupRuleGroupsMtx.RUnlock() + if groups, exists := r.backupRuleGroups[userID]; exists { + result = groups + } + return result +} diff --git a/pkg/ruler/rule_backup_manager_test.go b/pkg/ruler/rule_backup_manager_test.go new file mode 100644 index 00000000000..c65dd524c22 --- /dev/null +++ b/pkg/ruler/rule_backup_manager_test.go @@ -0,0 +1,143 @@ +package ruler + +import ( + "strings" + "testing" + + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/promql/parser" + promRules "github.com/prometheus/prometheus/rules" + "github.com/stretchr/testify/require" + "golang.org/x/exp/slices" + + "github.com/cortexproject/cortex/pkg/ruler/rulespb" + "github.com/cortexproject/cortex/pkg/util/log" +) + +func TestBackUpRuleGroups(t *testing.T) { + r := rulespb.RuleDesc{ + Expr: "1 > bool 1", + Record: "test", + } + parsedExpr, _ := parser.ParseExpr(r.Expr) + g1 := rulespb.RuleGroupDesc{ + Name: "g1", + Namespace: "ns1", + Rules: []*rulespb.RuleDesc{&r}, + } + g2 := rulespb.RuleGroupDesc{ + Name: "g2", + Namespace: "ns1", + Rules: []*rulespb.RuleDesc{&r}, + } + g3 := rulespb.RuleGroupDesc{ + Name: "g3", + Namespace: "ns2", + Rules: []*rulespb.RuleDesc{&r}, + } + + rInvalid := rulespb.RuleDesc{ + Expr: "1 > 1", // invalid expression + } + gInvalid := rulespb.RuleGroupDesc{ + Name: "g1", + Namespace: "ns1", + Rules: []*rulespb.RuleDesc{&rInvalid}, + } + cfg := defaultRulerConfig(t) + managerOptions := &promRules.ManagerOptions{} + manager := newRulesBackupManager(cfg, log.Logger) + g1Option := promRules.GroupOptions{ + Name: g1.Name, + File: g1.Namespace, + Interval: cfg.EvaluationInterval, + Rules: []promRules.Rule{ + promRules.NewRecordingRule(r.Record, parsedExpr, labels.Labels{}), + }, + } + g2Option := promRules.GroupOptions{ + Name: g2.Name, + File: g2.Namespace, + Interval: cfg.EvaluationInterval, + Rules: []promRules.Rule{ + promRules.NewRecordingRule(r.Record, parsedExpr, labels.Labels{}), + }, + } + g3Option := promRules.GroupOptions{ + Name: g3.Name, + File: g3.Namespace, + Interval: cfg.EvaluationInterval, + Rules: []promRules.Rule{ + promRules.NewRecordingRule(r.Record, parsedExpr, labels.Labels{}), + }, + } + + type testCase struct { + input map[string]rulespb.RuleGroupList + expectedOutput map[string][]*promRules.GroupOptions + } + + testCases := map[string]testCase{ + "Empty input": { + input: make(map[string]rulespb.RuleGroupList), + expectedOutput: make(map[string][]*promRules.GroupOptions), + }, + "With invalid rules": { + input: map[string]rulespb.RuleGroupList{ + "user1": {&gInvalid}, + }, + expectedOutput: make(map[string][]*promRules.GroupOptions), + }, + "With partial invalid rules": { + input: map[string]rulespb.RuleGroupList{ + "user1": {&gInvalid, &g3}, + "user2": {&g1, &g2}, + }, + expectedOutput: map[string][]*promRules.GroupOptions{ + "user2": {&g1Option, &g2Option}, + }, + }, + "With groups from multiple users": { + input: map[string]rulespb.RuleGroupList{ + "user1": {&g1, &g2, &g3}, + "user2": {&g1, &g2}, + 
}, + expectedOutput: map[string][]*promRules.GroupOptions{ + "user1": {&g1Option, &g2Option, &g3Option}, + "user2": {&g1Option, &g2Option}, + }, + }, + } + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + manager.backUpRuleGroups(nil, tc.input) + require.Equal(t, len(tc.expectedOutput), len(manager.backupRuleGroups)) + for user, expectedGroupOptions := range tc.expectedOutput { + loadedGroups := manager.getRuleGroups(user) + expectedGroups := make([]*promRules.Group, 0, len(expectedGroupOptions)) + for _, o := range expectedGroupOptions { + o.Opts = managerOptions + expectedGroups = append(expectedGroups, promRules.NewGroup(*o)) + } + requireGroupsEqual(t, expectedGroups, loadedGroups) + } + }) + } +} + +func requireGroupsEqual(t *testing.T, a []*promRules.Group, b []*promRules.Group) { + require.Equal(t, len(a), len(b)) + sortFunc := func(g1, g2 *promRules.Group) int { + fileCompare := strings.Compare(g1.File(), g2.File()) + if fileCompare != 0 { + return fileCompare + } + return strings.Compare(g1.Name(), g2.Name()) + } + slices.SortFunc(a, sortFunc) + slices.SortFunc(b, sortFunc) + for i, gA := range a { + gB := b[i] + require.True(t, gA.Equals(gB), "group1", gA.Name(), "group2", gB.Name()) + } +} diff --git a/pkg/ruler/ruler.go b/pkg/ruler/ruler.go index 42941bf59c3..3d8d716c85f 100644 --- a/pkg/ruler/ruler.go +++ b/pkg/ruler/ruler.go @@ -129,7 +129,8 @@ type Config struct { Ring RingConfig `yaml:"ring"` FlushCheckPeriod time.Duration `yaml:"flush_period"` - EnableAPI bool `yaml:"enable_api"` + EnableAPI bool `yaml:"enable_api"` + APIEnableRulesBackup bool `yaml:"api_enable_rules_backup"` EnabledTenants flagext.StringSliceCSV `yaml:"enabled_tenants"` DisabledTenants flagext.StringSliceCSV `yaml:"disabled_tenants"` @@ -213,8 +214,12 @@ type MultiTenantManager interface { // SyncRuleGroups is used to sync the Manager with rules from the RuleStore. // If existing user is missing in the ruleGroups map, its ruler manager will be stopped. SyncRuleGroups(ctx context.Context, ruleGroups map[string]rulespb.RuleGroupList) + // BackUpRuleGroups is used to store backups of rule groups owned by a different ruler instance. + BackUpRuleGroups(ctx context.Context, ruleGroups map[string]rulespb.RuleGroupList) // GetRules fetches rules for a particular tenant (userID). GetRules(userID string) []*promRules.Group + // GetBackupRules fetches rules for a particular tenant (userID) that the ruler stores for backup purposes + GetBackupRules(userID string) []*promRules.Group // Stop stops all Manager components. 
Stop() // ValidateRuleGroup validates a rulegroup @@ -465,6 +470,7 @@ func instanceOwnsRuleGroup(r ring.ReadRing, g *rulespb.RuleGroupDesc, disabledRu return false, errors.Wrap(err, "error reading ring to verify rule group ownership") } + // Even if the replication factor is set to a number bigger than 1, only the first ruler evaluates the rule group ownsRuleGroup := rlrs.Instances[0].Addr == instanceAddr if ownsRuleGroup && ruleGroupDisabled(g, disabledRuleGroups) { return false, &DisabledRuleGroupErr{Message: fmt.Sprintf("rule group %s, namespace %s, user %s is disabled", g.Name, g.Namespace, g.User)} @@ -473,6 +479,29 @@ func instanceOwnsRuleGroup(r ring.ReadRing, g *rulespb.RuleGroupDesc, disabledRu return ownsRuleGroup, nil } +func instanceBacksUpRuleGroup(r ring.ReadRing, g *rulespb.RuleGroupDesc, disabledRuleGroups validation.DisabledRuleGroups, instanceAddr string) (bool, error) { + hash := tokenForGroup(g) + + rlrs, err := r.Get(hash, RingOp, nil, nil, nil) + if err != nil { + return false, errors.Wrap(err, "error reading ring to verify rule group backup ownership") + } + + var backupRuleGroup bool + // Only the second up to the last replica is used a backup + for i := 1; i < len(rlrs.Instances); i++ { + if rlrs.Instances[i].Addr == instanceAddr { + backupRuleGroup = true + break + } + } + + if backupRuleGroup && ruleGroupDisabled(g, disabledRuleGroups) { + return false, &DisabledRuleGroupErr{Message: fmt.Sprintf("rule group %s, namespace %s, user %s is disabled", g.Name, g.Namespace, g.User)} + } + return backupRuleGroup, nil +} + func (r *Ruler) ServeHTTP(w http.ResponseWriter, req *http.Request) { if r.cfg.EnableSharding { r.ring.ServeHTTP(w, req) @@ -541,16 +570,20 @@ func (r *Ruler) syncRules(ctx context.Context, reason string) { r.ruleGroupSyncDuration.Set(ruleGroupSyncDuration) }() - loadedConfigs, err := r.loadRuleGroups(ctx) + loadedConfigs, backupConfigs, err := r.loadRuleGroups(ctx) if err != nil { return } // This will also delete local group files for users that are no longer in 'configs' map. 
r.manager.SyncRuleGroups(ctx, loadedConfigs) + + if r.cfg.APIEnableRulesBackup { + r.manager.BackUpRuleGroups(ctx, backupConfigs) + } } -func (r *Ruler) loadRuleGroups(ctx context.Context) (map[string]rulespb.RuleGroupList, error) { +func (r *Ruler) loadRuleGroups(ctx context.Context) (map[string]rulespb.RuleGroupList, map[string]rulespb.RuleGroupList, error) { timer := prometheus.NewTimer(nil) defer func() { @@ -558,51 +591,65 @@ func (r *Ruler) loadRuleGroups(ctx context.Context) (map[string]rulespb.RuleGrou r.ruleGroupStoreLoadDuration.Set(storeLoadSeconds) }() - configs, err := r.listRules(ctx) + ownedConfigs, backupConfigs, err := r.listRules(ctx) if err != nil { level.Error(r.logger).Log("msg", "unable to list rules", "err", err) - return nil, err + return nil, nil, err } - loadedConfigs, err := r.store.LoadRuleGroups(ctx, configs) + loadedOwnedConfigs, err := r.store.LoadRuleGroups(ctx, ownedConfigs) if err != nil { - level.Warn(r.logger).Log("msg", "failed to load some rules owned by this ruler", "count", len(configs)-len(loadedConfigs), "err", err) + level.Warn(r.logger).Log("msg", "failed to load some rules owned by this ruler", "count", len(ownedConfigs)-len(loadedOwnedConfigs), "err", err) } - return loadedConfigs, nil + if r.cfg.APIEnableRulesBackup { + loadedBackupConfigs, err := r.store.LoadRuleGroups(ctx, backupConfigs) + if err != nil { + level.Warn(r.logger).Log("msg", "failed to load some rules backed up by this ruler", "count", len(backupConfigs)-len(loadedBackupConfigs), "err", err) + } + return loadedOwnedConfigs, loadedBackupConfigs, nil + } + return loadedOwnedConfigs, nil, nil } -func (r *Ruler) listRules(ctx context.Context) (result map[string]rulespb.RuleGroupList, err error) { +func (r *Ruler) listRules(ctx context.Context) (owned map[string]rulespb.RuleGroupList, backedUp map[string]rulespb.RuleGroupList, err error) { switch { case !r.cfg.EnableSharding: - result, err = r.listRulesNoSharding(ctx) + owned, backedUp, err = r.listRulesNoSharding(ctx) case r.cfg.ShardingStrategy == util.ShardingStrategyDefault: - result, err = r.listRulesShardingDefault(ctx) + owned, backedUp, err = r.listRulesShardingDefault(ctx) case r.cfg.ShardingStrategy == util.ShardingStrategyShuffle: - result, err = r.listRulesShuffleSharding(ctx) + owned, backedUp, err = r.listRulesShuffleSharding(ctx) default: - return nil, errors.New("invalid sharding configuration") + return nil, nil, errors.New("invalid sharding configuration") } if err != nil { return } - for userID := range result { + for userID := range owned { + if !r.allowedTenants.IsAllowed(userID) { + level.Debug(r.logger).Log("msg", "ignoring rule groups for user, not allowed", "user", userID) + delete(owned, userID) + } + } + + for userID := range backedUp { if !r.allowedTenants.IsAllowed(userID) { level.Debug(r.logger).Log("msg", "ignoring rule groups for user, not allowed", "user", userID) - delete(result, userID) + delete(backedUp, userID) } } return } -func (r *Ruler) listRulesNoSharding(ctx context.Context) (map[string]rulespb.RuleGroupList, error) { +func (r *Ruler) listRulesNoSharding(ctx context.Context) (map[string]rulespb.RuleGroupList, map[string]rulespb.RuleGroupList, error) { allRuleGroups, err := r.store.ListAllRuleGroups(ctx) if err != nil { - return nil, err + return nil, nil, err } for userID, groups := range allRuleGroups { disabledRuleGroupsForUser := r.limits.DisabledRuleGroups(userID) @@ -619,29 +666,36 @@ func (r *Ruler) listRulesNoSharding(ctx context.Context) (map[string]rulespb.Rul } 
allRuleGroups[userID] = filteredGroupsForUser } - return allRuleGroups, nil + return allRuleGroups, nil, nil } -func (r *Ruler) listRulesShardingDefault(ctx context.Context) (map[string]rulespb.RuleGroupList, error) { +func (r *Ruler) listRulesShardingDefault(ctx context.Context) (map[string]rulespb.RuleGroupList, map[string]rulespb.RuleGroupList, error) { configs, err := r.store.ListAllRuleGroups(ctx) if err != nil { - return nil, err + return nil, nil, err } - filteredConfigs := make(map[string]rulespb.RuleGroupList) + ownedConfigs := make(map[string]rulespb.RuleGroupList) + backedUpConfigs := make(map[string]rulespb.RuleGroupList) for userID, groups := range configs { - filtered := filterRuleGroups(userID, groups, r.limits.DisabledRuleGroups(userID), r.ring, r.lifecycler.GetInstanceAddr(), r.logger, r.ringCheckErrors) - if len(filtered) > 0 { - filteredConfigs[userID] = filtered + owned := filterRuleGroups(userID, groups, r.limits.DisabledRuleGroups(userID), r.ring, r.lifecycler.GetInstanceAddr(), r.logger, r.ringCheckErrors) + if len(owned) > 0 { + ownedConfigs[userID] = owned + } + if r.cfg.APIEnableRulesBackup { + backup := filterBackupRuleGroups(userID, groups, r.limits.DisabledRuleGroups(userID), r.ring, r.lifecycler.GetInstanceAddr(), r.logger, r.ringCheckErrors) + if len(backup) > 0 { + backedUpConfigs[userID] = backup + } } } - return filteredConfigs, nil + return ownedConfigs, backedUpConfigs, nil } -func (r *Ruler) listRulesShuffleSharding(ctx context.Context) (map[string]rulespb.RuleGroupList, error) { +func (r *Ruler) listRulesShuffleSharding(ctx context.Context) (map[string]rulespb.RuleGroupList, map[string]rulespb.RuleGroupList, error) { users, err := r.store.ListAllUsers(ctx) if err != nil { - return nil, errors.Wrap(err, "unable to list users of ruler") + return nil, nil, errors.Wrap(err, "unable to list users of ruler") } // Only users in userRings will be used in the to load the rules. 
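A note on the ownership split the ruler.go changes above introduce: instanceOwnsRuleGroup keeps treating the first instance returned by the ring lookup as the only ruler that evaluates a group, while the new instanceBacksUpRuleGroup treats every later instance in the replication set as holding a non-evaluating backup copy. The following is a minimal, self-contained sketch of that split; instanceDesc, replicationSet, and the addresses are hypothetical stand-ins, not the real ring package types.

package main

import "fmt"

// instanceDesc and replicationSet stand in for the ring types used by the
// patch; they carry only what the ownership check needs.
type instanceDesc struct {
	Addr string
}

type replicationSet struct {
	// Instances come back ordered from the ring lookup: index 0 is the
	// evaluating (owning) ruler, the rest only keep a backup copy.
	Instances []instanceDesc
}

// ownsGroup mirrors the idea behind instanceOwnsRuleGroup: only the first
// replica evaluates the rule group.
func ownsGroup(rs replicationSet, instanceAddr string) bool {
	return len(rs.Instances) > 0 && rs.Instances[0].Addr == instanceAddr
}

// backsUpGroup mirrors the idea behind instanceBacksUpRuleGroup: any replica
// after the first stores the group for the rules API but does not evaluate it.
func backsUpGroup(rs replicationSet, instanceAddr string) bool {
	for i := 1; i < len(rs.Instances); i++ {
		if rs.Instances[i].Addr == instanceAddr {
			return true
		}
	}
	return false
}

func main() {
	rs := replicationSet{Instances: []instanceDesc{
		{Addr: "ruler-1:9095"}, {Addr: "ruler-2:9095"}, {Addr: "ruler-3:9095"},
	}}
	for _, addr := range []string{"ruler-1:9095", "ruler-2:9095", "ruler-3:9095"} {
		fmt.Printf("%s owns=%v backsUp=%v\n", addr, ownsGroup(rs, addr), backsUpGroup(rs, addr))
	}
}

With a replication factor of 3, this is why each group ends up on one evaluating ruler and two backup rulers, which the tests later assert.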
@@ -662,7 +716,7 @@ func (r *Ruler) listRulesShuffleSharding(ctx context.Context) (map[string]rulesp } if len(userRings) == 0 { - return nil, nil + return nil, nil, nil } userCh := make(chan string, len(userRings)) @@ -672,7 +726,8 @@ func (r *Ruler) listRulesShuffleSharding(ctx context.Context) (map[string]rulesp close(userCh) mu := sync.Mutex{} - result := map[string]rulespb.RuleGroupList{} + owned := map[string]rulespb.RuleGroupList{} + backedUp := map[string]rulespb.RuleGroupList{} concurrency := loadRulesConcurrency if len(userRings) < concurrency { @@ -688,13 +743,21 @@ func (r *Ruler) listRulesShuffleSharding(ctx context.Context) (map[string]rulesp return errors.Wrapf(err, "failed to fetch rule groups for user %s", userID) } - filtered := filterRuleGroups(userID, groups, r.limits.DisabledRuleGroups(userID), userRings[userID], r.lifecycler.GetInstanceAddr(), r.logger, r.ringCheckErrors) - if len(filtered) == 0 { + filterOwned := filterRuleGroups(userID, groups, r.limits.DisabledRuleGroups(userID), userRings[userID], r.lifecycler.GetInstanceAddr(), r.logger, r.ringCheckErrors) + var filterBackup []*rulespb.RuleGroupDesc + if r.cfg.APIEnableRulesBackup { + filterBackup = filterBackupRuleGroups(userID, groups, r.limits.DisabledRuleGroups(userID), userRings[userID], r.lifecycler.GetInstanceAddr(), r.logger, r.ringCheckErrors) + } + if len(filterOwned) == 0 && len(filterBackup) == 0 { continue } - mu.Lock() - result[userID] = filtered + if len(filterOwned) > 0 { + owned[userID] = filterOwned + } + if len(filterBackup) > 0 { + backedUp[userID] = filterBackup + } mu.Unlock() } return nil @@ -702,7 +765,7 @@ func (r *Ruler) listRulesShuffleSharding(ctx context.Context) (map[string]rulesp } err = g.Wait() - return result, err + return owned, backedUp, err } // filterRuleGroups returns map of rule groups that given instance "owns" based on supplied ring. @@ -738,6 +801,38 @@ func filterRuleGroups(userID string, ruleGroups []*rulespb.RuleGroupDesc, disabl return result } +// filterBackupRuleGroups returns map of rule groups that given instance backs up based on supplied ring. +// This function only uses User, Namespace, and Name fields of individual RuleGroups. +// +// Reason why this function is not a method on Ruler is to make sure we don't accidentally use r.ring, +// but only ring passed as parameter. 
+func filterBackupRuleGroups(userID string, ruleGroups []*rulespb.RuleGroupDesc, disabledRuleGroups validation.DisabledRuleGroups, ring ring.ReadRing, instanceAddr string, log log.Logger, ringCheckErrors prometheus.Counter) []*rulespb.RuleGroupDesc { + var result []*rulespb.RuleGroupDesc + for _, g := range ruleGroups { + backup, err := instanceBacksUpRuleGroup(ring, g, disabledRuleGroups, instanceAddr) + if err != nil { + switch e := err.(type) { + case *DisabledRuleGroupErr: + level.Info(log).Log("msg", e.Message) + continue + default: + ringCheckErrors.Inc() + level.Error(log).Log("msg", "failed to check if the ruler replica backs up the rule group", "user", userID, "namespace", g.Namespace, "group", g.Name, "err", err) + continue + } + } + + if backup { + level.Debug(log).Log("msg", "rule group backed up", "user", g.User, "namespace", g.Namespace, "name", g.Name) + result = append(result, g) + } else { + level.Debug(log).Log("msg", "rule group not backed up, ignoring", "user", g.User, "namespace", g.Namespace, "name", g.Name) + } + } + + return result +} + // GetRules retrieves the running rules from this ruler and all running rulers in the ring if // sharding is enabled func (r *Ruler) GetRules(ctx context.Context, rulesRequest RulesRequest) ([]*GroupStateDesc, error) { @@ -756,6 +851,11 @@ func (r *Ruler) GetRules(ctx context.Context, rulesRequest RulesRequest) ([]*Gro func (r *Ruler) getLocalRules(userID string, rulesRequest RulesRequest) ([]*GroupStateDesc, error) { groups := r.manager.GetRules(userID) + if r.cfg.APIEnableRulesBackup { + backupGroups := r.manager.GetBackupRules(userID) + groups = append(groups, backupGroups...) + } + groupDescs := make([]*GroupStateDesc, 0, len(groups)) prefix := filepath.Join(r.cfg.RulePath, userID) + "/" @@ -899,12 +999,19 @@ func (r *Ruler) getShardedRules(ctx context.Context, userID string, rulesRequest } var ( - mergedMx sync.Mutex - merged []*GroupStateDesc + mtx sync.Mutex + merged []*GroupStateDesc + errs []error ) + failedZones := make(map[string]interface{}) - // Concurrently fetch rules from all rulers. Since rules are not replicated, - // we need all requests to succeed. + zoneByAddress := make(map[string]string) + if r.cfg.APIEnableRulesBackup { + for _, ruler := range rulers.Instances { + zoneByAddress[ruler.Addr] = ruler.Zone + } + } + // Concurrently fetch rules from all rulers. jobs := concurrency.CreateJobsFromStrings(rulers.GetAddresses()) err = concurrency.ForEach(ctx, jobs, len(jobs), func(ctx context.Context, job interface{}) error { addr := job.(string) @@ -920,17 +1027,35 @@ func (r *Ruler) getShardedRules(ctx context.Context, userID string, rulesRequest Files: rulesRequest.GetFiles(), Type: rulesRequest.GetType(), }) + if err != nil { + level.Error(r.logger).Log("msg", "unable to retrieve rules from ruler", "addr", addr, "err", err) + // If APIEnableRulesBackup is enabled and there are enough rulers replicating the rules, we should + // be able to handle failures. + if r.cfg.APIEnableRulesBackup && len(jobs) >= r.cfg.Ring.ReplicationFactor { + mtx.Lock() + failedZones[zoneByAddress[addr]] = nil + errs = append(errs, err) + failed := (rulers.MaxUnavailableZones > 0 && len(failedZones) > rulers.MaxUnavailableZones) || (rulers.MaxUnavailableZones <= 0 && len(errs) > rulers.MaxErrors) + mtx.Unlock() + if !failed { + return nil + } + } return errors.Wrapf(err, "unable to retrieve rules from ruler %s", addr) } - mergedMx.Lock() + mtx.Lock() merged = append(merged, newGrps.Groups...) 
- mergedMx.Unlock() + mtx.Unlock() return nil }) + if err == nil && r.cfg.APIEnableRulesBackup { + merged = mergeGroupStateDesc(merged) + } + return merged, err } diff --git a/pkg/ruler/ruler_test.go b/pkg/ruler/ruler_test.go index 589a9d162fa..ff51ce7b8f3 100644 --- a/pkg/ruler/ruler_test.go +++ b/pkg/ruler/ruler_test.go @@ -338,13 +338,17 @@ func TestGetRules(t *testing.T) { type rulesMap map[string][]*rulespb.RuleDesc type testCase struct { - sharding bool - shardingStrategy string - shuffleShardSize int - rulesRequest RulesRequest - expectedCount map[string]int - rulerStateMap map[string]ring.InstanceState - expectedError error + sharding bool + shardingStrategy string + shuffleShardSize int + rulesRequest RulesRequest + expectedCount map[string]int + expectedClientCallCount int + rulerStateMap map[string]ring.InstanceState + rulerAZMap map[string]string + expectedError error + enableAPIRulesBackup bool + enableZoneAwareReplication bool } ruleMap := rulesMap{ @@ -456,6 +460,18 @@ func TestGetRules(t *testing.T) { "ruler3": ring.ACTIVE, } + rulerStateMapTwoPending := map[string]ring.InstanceState{ + "ruler1": ring.PENDING, + "ruler2": ring.PENDING, + "ruler3": ring.ACTIVE, + } + + rulerAZEvenSpread := map[string]string{ + "ruler1": "a", + "ruler2": "b", + "ruler3": "c", + } + expectedRules := expectedRulesMap{ "ruler1": map[string]rulespb.RuleGroupList{ "user1": { @@ -501,6 +517,7 @@ func TestGetRules(t *testing.T) { "user2": 4, "user3": 2, }, + expectedClientCallCount: len(expectedRules), }, "Default Sharding with No Filter": { sharding: true, @@ -511,6 +528,19 @@ func TestGetRules(t *testing.T) { "user2": 9, "user3": 3, }, + expectedClientCallCount: len(expectedRules), + }, + "Default Sharding with No Filter but with API Rules backup enabled": { + sharding: true, + shardingStrategy: util.ShardingStrategyDefault, + rulerStateMap: rulerStateMapAllActive, + expectedCount: map[string]int{ + "user1": 5, + "user2": 9, + "user3": 3, + }, + enableAPIRulesBackup: true, + expectedClientCallCount: len(expectedRules), }, "Shuffle Sharding and ShardSize = 2 with Rule Type Filter": { sharding: true, @@ -525,6 +555,7 @@ func TestGetRules(t *testing.T) { "user2": 5, "user3": 1, }, + expectedClientCallCount: 2, }, "Shuffle Sharding and ShardSize = 2 and Rule Group Name Filter": { sharding: true, @@ -539,6 +570,7 @@ func TestGetRules(t *testing.T) { "user2": 1, "user3": 2, }, + expectedClientCallCount: 2, }, "Shuffle Sharding and ShardSize = 2 and Rule Group Name and Rule Type Filter": { sharding: true, @@ -554,6 +586,7 @@ func TestGetRules(t *testing.T) { "user2": 2, "user3": 1, }, + expectedClientCallCount: 2, }, "Shuffle Sharding and ShardSize = 2 with Rule Type and Namespace Filters": { sharding: true, @@ -569,6 +602,7 @@ func TestGetRules(t *testing.T) { "user2": 0, "user3": 1, }, + expectedClientCallCount: 2, }, "Shuffle Sharding and ShardSize = 2 with Rule Type Filter and one ruler is in LEAVING state": { sharding: true, @@ -583,6 +617,7 @@ func TestGetRules(t *testing.T) { "user2": 5, "user3": 1, }, + expectedClientCallCount: 2, }, "Shuffle Sharding and ShardSize = 2 with Rule Type Filter and one ruler is in Pending state": { sharding: true, @@ -592,6 +627,99 @@ func TestGetRules(t *testing.T) { rulesRequest: RulesRequest{ Type: recordingRuleFilter, }, + expectedError: ring.ErrTooManyUnhealthyInstances, + expectedClientCallCount: 0, + }, + "Shuffle Sharding and ShardSize = 3 with API Rules backup enabled": { + sharding: true, + shuffleShardSize: 3, + shardingStrategy: 
util.ShardingStrategyShuffle, + rulerStateMap: rulerStateMapAllActive, + enableAPIRulesBackup: true, + rulesRequest: RulesRequest{ + Type: recordingRuleFilter, + }, + expectedCount: map[string]int{ + "user1": 3, + "user2": 5, + "user3": 1, + }, + expectedClientCallCount: 3, + }, + "Shuffle Sharding and ShardSize = 3 with API Rules backup enabled and one ruler is in Pending state": { + sharding: true, + shuffleShardSize: 3, + shardingStrategy: util.ShardingStrategyShuffle, + rulerStateMap: rulerStateMapOnePending, + enableAPIRulesBackup: true, + rulesRequest: RulesRequest{ + Type: recordingRuleFilter, + }, + expectedCount: map[string]int{ + "user1": 3, + "user2": 5, + "user3": 1, + }, + expectedClientCallCount: 2, // one of the ruler is pending, so we don't expect that ruler to be called + }, + "Shuffle Sharding and ShardSize = 3 with API Rules backup enabled and two ruler is in Pending state": { + sharding: true, + shuffleShardSize: 3, + shardingStrategy: util.ShardingStrategyShuffle, + rulerStateMap: rulerStateMapTwoPending, + enableAPIRulesBackup: true, + rulesRequest: RulesRequest{ + Type: recordingRuleFilter, + }, + expectedError: ring.ErrTooManyUnhealthyInstances, + }, + "Shuffle Sharding and ShardSize = 3 and AZ replication with API Rules backup enabled": { + sharding: true, + shuffleShardSize: 3, + shardingStrategy: util.ShardingStrategyShuffle, + enableZoneAwareReplication: true, + rulerStateMap: rulerStateMapAllActive, + rulerAZMap: rulerAZEvenSpread, + enableAPIRulesBackup: true, + rulesRequest: RulesRequest{ + Type: recordingRuleFilter, + }, + expectedCount: map[string]int{ + "user1": 3, + "user2": 5, + "user3": 1, + }, + expectedClientCallCount: 3, + }, + "Shuffle Sharding and ShardSize = 3 and AZ replication with API Rules backup enabled and one ruler in pending state": { + sharding: true, + shuffleShardSize: 3, + shardingStrategy: util.ShardingStrategyShuffle, + enableZoneAwareReplication: true, + rulerStateMap: rulerStateMapOnePending, + rulerAZMap: rulerAZEvenSpread, + enableAPIRulesBackup: true, + rulesRequest: RulesRequest{ + Type: recordingRuleFilter, + }, + expectedCount: map[string]int{ + "user1": 3, + "user2": 5, + "user3": 1, + }, + expectedClientCallCount: 2, // one of the ruler is pending, so we don't expect that ruler to be called + }, + "Shuffle Sharding and ShardSize = 3 and AZ replication with API Rules backup enabled and two ruler in pending state": { + sharding: true, + shuffleShardSize: 3, + shardingStrategy: util.ShardingStrategyShuffle, + enableZoneAwareReplication: true, + rulerStateMap: rulerStateMapTwoPending, + rulerAZMap: rulerAZEvenSpread, + enableAPIRulesBackup: true, + rulesRequest: RulesRequest{ + Type: recordingRuleFilter, + }, expectedError: ring.ErrTooManyUnhealthyInstances, }, } @@ -611,6 +739,7 @@ func TestGetRules(t *testing.T) { cfg.ShardingStrategy = tc.shardingStrategy cfg.EnableSharding = tc.sharding + cfg.APIEnableRulesBackup = tc.enableAPIRulesBackup cfg.Ring = RingConfig{ InstanceID: id, @@ -620,6 +749,13 @@ func TestGetRules(t *testing.T) { }, ReplicationFactor: 1, } + if tc.enableAPIRulesBackup { + cfg.Ring.ReplicationFactor = 3 + cfg.Ring.ZoneAwarenessEnabled = tc.enableZoneAwareReplication + } + if tc.enableZoneAwareReplication { + cfg.Ring.InstanceZone = tc.rulerAZMap[id] + } r, _ := buildRuler(t, cfg, nil, store, rulerAddrMap) r.limits = ruleLimits{evalDelay: 0, tenantShard: tc.shuffleShardSize} @@ -647,7 +783,7 @@ func TestGetRules(t *testing.T) { d = ring.NewDesc() } for rID, tokens := range allTokensByRuler { - 
d.AddIngester(rID, rulerAddrMap[rID].lifecycler.GetInstanceAddr(), "", tokens, tc.rulerStateMap[rID], time.Now()) + d.AddIngester(rID, rulerAddrMap[rID].lifecycler.GetInstanceAddr(), rulerAddrMap[rID].lifecycler.GetInstanceZone(), tokens, ring.ACTIVE, time.Now()) } return d, true, nil }) @@ -666,6 +802,24 @@ func TestGetRules(t *testing.T) { forEachRuler(func(_ string, r *Ruler) { r.syncRules(context.Background(), rulerSyncReasonInitial) }) + + if tc.sharding { + // update the State of the rulers in the ring based on tc.rulerStateMap + err := kvStore.CAS(context.Background(), ringKey, func(in interface{}) (out interface{}, retry bool, err error) { + d, _ := in.(*ring.Desc) + if d == nil { + d = ring.NewDesc() + } + for rID, tokens := range allTokensByRuler { + d.AddIngester(rID, rulerAddrMap[rID].lifecycler.GetInstanceAddr(), rulerAddrMap[rID].lifecycler.GetInstanceZone(), tokens, tc.rulerStateMap[rID], time.Now()) + } + return d, true, nil + }) + require.NoError(t, err) + // Wait a bit to make sure ruler's ring is updated. + time.Sleep(100 * time.Millisecond) + } + for u := range allRulesByUser { ctx := user.InjectOrgID(context.Background(), u) forEachRuler(func(_ string, r *Ruler) { @@ -685,9 +839,9 @@ func TestGetRules(t *testing.T) { mockPoolClient := r.clientsPool.(*mockRulerClientsPool) if tc.shardingStrategy == util.ShardingStrategyShuffle { - require.Equal(t, int32(tc.shuffleShardSize), mockPoolClient.numberOfCalls.Load()) + require.Equal(t, int32(tc.expectedClientCallCount), mockPoolClient.numberOfCalls.Load()) } else { - require.Equal(t, int32(len(rulerAddrMap)), mockPoolClient.numberOfCalls.Load()) + require.Equal(t, int32(tc.expectedClientCallCount), mockPoolClient.numberOfCalls.Load()) } mockPoolClient.numberOfCalls.Store(0) } @@ -696,13 +850,21 @@ func TestGetRules(t *testing.T) { totalLoadedRules := 0 totalConfiguredRules := 0 + ruleBackupCount := make(map[string]int) forEachRuler(func(rID string, r *Ruler) { - localRules, err := r.listRules(context.Background()) + localRules, localBackupRules, err := r.listRules(context.Background()) require.NoError(t, err) for _, rules := range localRules { totalLoadedRules += len(rules) } + for user, rules := range localBackupRules { + for _, rule := range rules { + key := user + rule.Namespace + rule.Name + c := ruleBackupCount[key] + ruleBackupCount[key] = c + 1 + } + } totalConfiguredRules += len(allRulesByRuler[rID]) }) @@ -713,6 +875,28 @@ func TestGetRules(t *testing.T) { numberOfRulers := len(rulerAddrMap) require.Equal(t, totalConfiguredRules*numberOfRulers, totalLoadedRules) } + if tc.enableAPIRulesBackup && tc.sharding && tc.expectedError == nil { + // all rules should be backed up + require.Equal(t, totalConfiguredRules, len(ruleBackupCount)) + var hasUnhealthyRuler bool + for _, state := range tc.rulerStateMap { + if state != ring.ACTIVE && state != ring.LEAVING { + hasUnhealthyRuler = true + break + } + } + for _, v := range ruleBackupCount { + if !hasUnhealthyRuler { + // with replication factor set to 3, each rule is backed up by 2 rulers + require.Equal(t, 2, v) + } else { + require.GreaterOrEqual(t, v, 1) + } + } + } else { + // If APIEnableRulesBackup is disabled, rulers should not back up any rules + require.Equal(t, 0, len(ruleBackupCount)) + } }) } } @@ -746,14 +930,16 @@ func TestSharding(t *testing.T) { type expectedRulesMap map[string]map[string]rulespb.RuleGroupList type testCase struct { - sharding bool - shardingStrategy string - shuffleShardSize int - setupRing func(*ring.Desc) - enabledUsers []string - 
disabledUsers []string - - expectedRules expectedRulesMap + sharding bool + shardingStrategy string + enableAPIRulesBackup bool + replicationFactor int + shuffleShardSize int + setupRing func(*ring.Desc) + enabledUsers []string + disabledUsers []string + expectedRules expectedRulesMap + expectedBackupRules expectedRulesMap } const ( @@ -775,21 +961,24 @@ func TestSharding(t *testing.T) { testCases := map[string]testCase{ "no sharding": { - sharding: false, - expectedRules: expectedRulesMap{ruler1: allRules}, + sharding: false, + replicationFactor: 1, + expectedRules: expectedRulesMap{ruler1: allRules}, }, "no sharding, single user allowed": { - sharding: false, - enabledUsers: []string{user1}, + sharding: false, + replicationFactor: 1, + enabledUsers: []string{user1}, expectedRules: expectedRulesMap{ruler1: map[string]rulespb.RuleGroupList{ user1: {user1Group1, user1Group2}, }}, }, "no sharding, single user disabled": { - sharding: false, - disabledUsers: []string{user1}, + sharding: false, + replicationFactor: 1, + disabledUsers: []string{user1}, expectedRules: expectedRulesMap{ruler1: map[string]rulespb.RuleGroupList{ user2: {user2Group1}, user3: {user3Group1}, @@ -797,8 +986,9 @@ func TestSharding(t *testing.T) { }, "default sharding, single ruler": { - sharding: true, - shardingStrategy: util.ShardingStrategyDefault, + sharding: true, + replicationFactor: 1, + shardingStrategy: util.ShardingStrategyDefault, setupRing: func(desc *ring.Desc) { desc.AddIngester(ruler1, ruler1Addr, "", []uint32{0}, ring.ACTIVE, time.Now()) }, @@ -806,9 +996,10 @@ func TestSharding(t *testing.T) { }, "default sharding, single ruler, single enabled user": { - sharding: true, - shardingStrategy: util.ShardingStrategyDefault, - enabledUsers: []string{user1}, + sharding: true, + replicationFactor: 1, + shardingStrategy: util.ShardingStrategyDefault, + enabledUsers: []string{user1}, setupRing: func(desc *ring.Desc) { desc.AddIngester(ruler1, ruler1Addr, "", []uint32{0}, ring.ACTIVE, time.Now()) }, @@ -818,9 +1009,10 @@ func TestSharding(t *testing.T) { }, "default sharding, single ruler, single disabled user": { - sharding: true, - shardingStrategy: util.ShardingStrategyDefault, - disabledUsers: []string{user1}, + sharding: true, + replicationFactor: 1, + shardingStrategy: util.ShardingStrategyDefault, + disabledUsers: []string{user1}, setupRing: func(desc *ring.Desc) { desc.AddIngester(ruler1, ruler1Addr, "", []uint32{0}, ring.ACTIVE, time.Now()) }, @@ -831,8 +1023,9 @@ func TestSharding(t *testing.T) { }, "default sharding, multiple ACTIVE rulers": { - sharding: true, - shardingStrategy: util.ShardingStrategyDefault, + sharding: true, + replicationFactor: 1, + shardingStrategy: util.ShardingStrategyDefault, setupRing: func(desc *ring.Desc) { desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{user1Group1Token + 1, user2Group1Token + 1}), ring.ACTIVE, time.Now()) desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{user1Group2Token + 1, user3Group1Token + 1}), ring.ACTIVE, time.Now()) @@ -852,9 +1045,10 @@ func TestSharding(t *testing.T) { }, "default sharding, multiple ACTIVE rulers, single enabled user": { - sharding: true, - shardingStrategy: util.ShardingStrategyDefault, - enabledUsers: []string{user1}, + sharding: true, + replicationFactor: 1, + shardingStrategy: util.ShardingStrategyDefault, + enabledUsers: []string{user1}, setupRing: func(desc *ring.Desc) { desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{user1Group1Token + 1, user2Group1Token + 1}), ring.ACTIVE, time.Now()) 
desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{user1Group2Token + 1, user3Group1Token + 1}), ring.ACTIVE, time.Now()) @@ -872,9 +1066,10 @@ func TestSharding(t *testing.T) { }, "default sharding, multiple ACTIVE rulers, single disabled user": { - sharding: true, - shardingStrategy: util.ShardingStrategyDefault, - disabledUsers: []string{user1}, + sharding: true, + replicationFactor: 1, + shardingStrategy: util.ShardingStrategyDefault, + disabledUsers: []string{user1}, setupRing: func(desc *ring.Desc) { desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{user1Group1Token + 1, user2Group1Token + 1}), ring.ACTIVE, time.Now()) desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{user1Group2Token + 1, user3Group1Token + 1}), ring.ACTIVE, time.Now()) @@ -892,8 +1087,9 @@ func TestSharding(t *testing.T) { }, "default sharding, unhealthy ACTIVE ruler": { - sharding: true, - shardingStrategy: util.ShardingStrategyDefault, + sharding: true, + replicationFactor: 1, + shardingStrategy: util.ShardingStrategyDefault, setupRing: func(desc *ring.Desc) { desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{user1Group1Token + 1, user2Group1Token + 1}), ring.ACTIVE, time.Now()) @@ -916,8 +1112,9 @@ func TestSharding(t *testing.T) { }, "default sharding, LEAVING ruler": { - sharding: true, - shardingStrategy: util.ShardingStrategyDefault, + sharding: true, + replicationFactor: 1, + shardingStrategy: util.ShardingStrategyDefault, setupRing: func(desc *ring.Desc) { desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{user1Group1Token + 1, user2Group1Token + 1}), ring.LEAVING, time.Now()) @@ -932,8 +1129,9 @@ func TestSharding(t *testing.T) { }, "default sharding, JOINING ruler": { - sharding: true, - shardingStrategy: util.ShardingStrategyDefault, + sharding: true, + replicationFactor: 1, + shardingStrategy: util.ShardingStrategyDefault, setupRing: func(desc *ring.Desc) { desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{user1Group1Token + 1, user2Group1Token + 1}), ring.JOINING, time.Now()) @@ -948,8 +1146,9 @@ func TestSharding(t *testing.T) { }, "shuffle sharding, single ruler": { - sharding: true, - shardingStrategy: util.ShardingStrategyShuffle, + sharding: true, + replicationFactor: 1, + shardingStrategy: util.ShardingStrategyShuffle, setupRing: func(desc *ring.Desc) { desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{0}), ring.ACTIVE, time.Now()) @@ -961,9 +1160,10 @@ func TestSharding(t *testing.T) { }, "shuffle sharding, multiple rulers, shard size 1": { - sharding: true, - shardingStrategy: util.ShardingStrategyShuffle, - shuffleShardSize: 1, + sharding: true, + replicationFactor: 1, + shardingStrategy: util.ShardingStrategyShuffle, + shuffleShardSize: 1, setupRing: func(desc *ring.Desc) { desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1, userToken(user2, 0) + 1, userToken(user3, 0) + 1}), ring.ACTIVE, time.Now()) @@ -978,9 +1178,10 @@ func TestSharding(t *testing.T) { // Same test as previous one, but with shard size=2. Second ruler gets all the rules. "shuffle sharding, two rulers, shard size 2": { - sharding: true, - shardingStrategy: util.ShardingStrategyShuffle, - shuffleShardSize: 2, + sharding: true, + replicationFactor: 1, + shardingStrategy: util.ShardingStrategyShuffle, + shuffleShardSize: 2, setupRing: func(desc *ring.Desc) { // Exact same tokens setup as previous test. 
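For readers following the token arithmetic in these TestSharding cases: a rule group lands on whichever ruler owns the first ring token greater than the group's hash, so registering a ruler with user1Group1Token + 1 deliberately makes it the owner of user1Group1. Below is a rough, self-contained illustration of that lookup rule; owner, the token values, and the ruler names are hypothetical simplifications that ignore replication, zones, and instance states.

package main

import (
	"fmt"
	"sort"
)

// owner returns the address that owns the first token strictly greater than
// hash, wrapping around the ring; a simplified stand-in for the real lookup.
func owner(tokens []uint32, owners map[uint32]string, hash uint32) string {
	sort.Slice(tokens, func(i, j int) bool { return tokens[i] < tokens[j] })
	for _, t := range tokens {
		if t > hash {
			return owners[t]
		}
	}
	return owners[tokens[0]] // wrap around to the smallest token
}

func main() {
	groupHash := uint32(1000) // stands in for tokenForGroup(user1Group1)
	owners := map[uint32]string{
		groupHash + 1: "ruler1", // token = group hash + 1, so ruler1 owns the group
		5000:          "ruler2",
	}
	fmt.Println(owner([]uint32{groupHash + 1, 5000}, owners, groupHash)) // ruler1
}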
@@ -995,9 +1196,10 @@ func TestSharding(t *testing.T) { }, "shuffle sharding, two rulers, shard size 1, distributed users": { - sharding: true, - shardingStrategy: util.ShardingStrategyShuffle, - shuffleShardSize: 1, + sharding: true, + replicationFactor: 1, + shardingStrategy: util.ShardingStrategyShuffle, + shuffleShardSize: 1, setupRing: func(desc *ring.Desc) { desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1}), ring.ACTIVE, time.Now()) @@ -1015,9 +1217,10 @@ func TestSharding(t *testing.T) { }, }, "shuffle sharding, three rulers, shard size 2": { - sharding: true, - shardingStrategy: util.ShardingStrategyShuffle, - shuffleShardSize: 2, + sharding: true, + replicationFactor: 1, + shardingStrategy: util.ShardingStrategyShuffle, + shuffleShardSize: 2, setupRing: func(desc *ring.Desc) { desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1, user1Group1Token + 1}), ring.ACTIVE, time.Now()) @@ -1039,9 +1242,10 @@ func TestSharding(t *testing.T) { }, }, "shuffle sharding, three rulers, shard size 2, ruler2 has no users": { - sharding: true, - shardingStrategy: util.ShardingStrategyShuffle, - shuffleShardSize: 2, + sharding: true, + replicationFactor: 1, + shardingStrategy: util.ShardingStrategyShuffle, + shuffleShardSize: 2, setupRing: func(desc *ring.Desc) { desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1, userToken(user2, 1) + 1, user1Group1Token + 1, user1Group2Token + 1}), ring.ACTIVE, time.Now()) @@ -1062,10 +1266,11 @@ func TestSharding(t *testing.T) { }, "shuffle sharding, three rulers, shard size 2, single enabled user": { - sharding: true, - shardingStrategy: util.ShardingStrategyShuffle, - shuffleShardSize: 2, - enabledUsers: []string{user1}, + sharding: true, + replicationFactor: 1, + shardingStrategy: util.ShardingStrategyShuffle, + shuffleShardSize: 2, + enabledUsers: []string{user1}, setupRing: func(desc *ring.Desc) { desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1, user1Group1Token + 1}), ring.ACTIVE, time.Now()) @@ -1085,10 +1290,11 @@ func TestSharding(t *testing.T) { }, "shuffle sharding, three rulers, shard size 2, single disabled user": { - sharding: true, - shardingStrategy: util.ShardingStrategyShuffle, - shuffleShardSize: 2, - disabledUsers: []string{user1}, + sharding: true, + replicationFactor: 1, + shardingStrategy: util.ShardingStrategyShuffle, + shuffleShardSize: 2, + disabledUsers: []string{user1}, setupRing: func(desc *ring.Desc) { desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1, user1Group1Token + 1}), ring.ACTIVE, time.Now()) @@ -1105,6 +1311,40 @@ func TestSharding(t *testing.T) { }, }, }, + + "shuffle sharding, three rulers, shard size 2, enable api backup": { + sharding: true, + replicationFactor: 2, + shardingStrategy: util.ShardingStrategyShuffle, + enableAPIRulesBackup: true, + shuffleShardSize: 2, + enabledUsers: []string{user1}, + + setupRing: func(desc *ring.Desc) { + desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1, user1Group1Token + 1}), ring.ACTIVE, time.Now()) + desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{userToken(user1, 1) + 1, user1Group2Token + 1, userToken(user2, 1) + 1, userToken(user3, 1) + 1}), ring.ACTIVE, time.Now()) + desc.AddIngester(ruler3, ruler3Addr, "", sortTokens([]uint32{userToken(user2, 0) + 1, userToken(user3, 0) + 1, user2Group1Token + 1, user3Group1Token + 1}), ring.ACTIVE, time.Now()) + }, + + 
expectedRules: expectedRulesMap{ + ruler1: map[string]rulespb.RuleGroupList{ + user1: {user1Group1}, + }, + ruler2: map[string]rulespb.RuleGroupList{ + user1: {user1Group2}, + }, + ruler3: map[string]rulespb.RuleGroupList{}, + }, + expectedBackupRules: expectedRulesMap{ + ruler1: map[string]rulespb.RuleGroupList{ + user1: {user1Group2}, + }, + ruler2: map[string]rulespb.RuleGroupList{ + user1: {user1Group1}, + }, + ruler3: map[string]rulespb.RuleGroupList{}, + }, + }, } for name, tc := range testCases { @@ -1115,8 +1355,9 @@ func TestSharding(t *testing.T) { setupRuler := func(id string, host string, port int, forceRing *ring.Ring) *Ruler { store := newMockRuleStore(allRules, nil) cfg := Config{ - EnableSharding: tc.sharding, - ShardingStrategy: tc.shardingStrategy, + EnableSharding: tc.sharding, + APIEnableRulesBackup: tc.enableAPIRulesBackup, + ShardingStrategy: tc.shardingStrategy, Ring: RingConfig{ InstanceID: id, InstanceAddr: host, @@ -1125,7 +1366,7 @@ func TestSharding(t *testing.T) { Mock: kvStore, }, HeartbeatTimeout: 1 * time.Minute, - ReplicationFactor: 1, + ReplicationFactor: tc.replicationFactor, }, FlushCheckPeriod: 0, EnabledTenants: tc.enabledUsers, @@ -1175,23 +1416,28 @@ func TestSharding(t *testing.T) { } // Always add ruler1 to expected rulers, even if there is no ring (no sharding). - loadedRules1, err := r1.listRules(context.Background()) + loadedRules1, backupRules1, err := r1.listRules(context.Background()) require.NoError(t, err) expected := expectedRulesMap{ ruler1: loadedRules1, } + expectedBackup := expectedRulesMap{ + ruler1: backupRules1, + } + addToExpected := func(id string, r *Ruler) { // Only expect rules from other rulers when using ring, and they are present in the ring. if r != nil && rulerRing != nil && rulerRing.HasInstance(id) { - loaded, err := r.listRules(context.Background()) + loaded, backup, err := r.listRules(context.Background()) require.NoError(t, err) // Normalize nil map to empty one. if loaded == nil { loaded = map[string]rulespb.RuleGroupList{} } expected[id] = loaded + expectedBackup[id] = backup } } @@ -1199,6 +1445,14 @@ func TestSharding(t *testing.T) { addToExpected(ruler3, r3) require.Equal(t, tc.expectedRules, expected) + + if !tc.enableAPIRulesBackup { + require.Equal(t, 0, len(expectedBackup[ruler1])) + require.Equal(t, 0, len(expectedBackup[ruler2])) + require.Equal(t, 0, len(expectedBackup[ruler3])) + } else { + require.Equal(t, tc.expectedBackupRules, expectedBackup) + } }) } } @@ -1290,7 +1544,7 @@ func Test_LoadPartialGroups(t *testing.T) { len(r1.manager.GetRules(user3)) > 0 }) - returned, err := r1.listRules(context.Background()) + returned, _, err := r1.listRules(context.Background()) require.NoError(t, err) require.Equal(t, returned, allRules) require.Equal(t, 2, len(manager.userManagers)) @@ -1835,7 +2089,7 @@ func TestRulerDisablesRuleGroups(t *testing.T) { } actualRules := map[string]rulespb.RuleGroupList{} - loadedRules, err := r1.listRules(context.Background()) + loadedRules, _, err := r1.listRules(context.Background()) require.NoError(t, err) for k, v := range loadedRules { if len(v) > 0 { @@ -1846,7 +2100,7 @@ func TestRulerDisablesRuleGroups(t *testing.T) { fetchRules := func(id string, r *Ruler) { // Only expect rules from other rulers when using ring, and they are present in the ring. if r != nil && rulerRing != nil && rulerRing.HasInstance(id) { - loaded, err := r.listRules(context.Background()) + loaded, _, err := r.listRules(context.Background()) require.NoError(t, err) // Normalize nil map to empty one.
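Taken together with the getShardedRules changes, the rules API can now receive the same group from both its evaluating ruler and its backup rulers, and mergeGroupStateDesc resolves those duplicates by keeping, per namespace/group key, the copy with the newest evaluation timestamp. The sketch below shows just that merge rule with a simplified groupState stand-in for the generated GroupStateDesc type; it is an illustrative assumption, not the patch's code.

package main

import (
	"fmt"
	"time"
)

// groupState is a simplified stand-in for GroupStateDesc, carrying only what
// the duplicate-resolution rule needs.
type groupState struct {
	Namespace           string
	Name                string
	EvaluationTimestamp time.Time
}

// mergeByNewestEvaluation keeps one entry per namespace/name pair, preferring
// the copy with the most recent evaluation timestamp, as the merger does.
func mergeByNewestEvaluation(in []groupState) map[string]groupState {
	out := make(map[string]groupState)
	for _, g := range in {
		key := g.Namespace + ";" + g.Name
		if cur, ok := out[key]; !ok || cur.EvaluationTimestamp.Before(g.EvaluationTimestamp) {
			out[key] = g
		}
	}
	return out
}

func main() {
	now := time.Now()
	replicas := []groupState{
		{Namespace: "ns1", Name: "g1", EvaluationTimestamp: now}, // from the evaluating ruler
		{Namespace: "ns1", Name: "g1"},                           // never-evaluated copy from a backup ruler
		{Namespace: "ns1", Name: "g2", EvaluationTimestamp: now.Add(10 * time.Second)},
	}
	for key, g := range mergeByNewestEvaluation(replicas) {
		fmt.Println(key, g.EvaluationTimestamp)
	}
}

The real merger also considers per-rule evaluation timestamps and sorts the result by namespace and group name, as merger.go above shows.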