From 014235e3db626f11798fab9edca5fc69b1aba0af Mon Sep 17 00:00:00 2001 From: Emmanuel Lodovice Date: Tue, 27 Feb 2024 13:41:13 -0800 Subject: [PATCH] WIP Signed-off-by: Emmanuel Lodovice --- pkg/ruler/manager.go | 14 ++- pkg/ruler/merger.go | 43 +++++++ pkg/ruler/rule_backup_manager.go | 107 ++++++++++++++++ pkg/ruler/ruler.go | 205 +++++++++++++++++++++++++------ pkg/ruler/ruler_test.go | 145 +++++++++++++++++++--- 5 files changed, 457 insertions(+), 57 deletions(-) create mode 100644 pkg/ruler/merger.go create mode 100644 pkg/ruler/rule_backup_manager.go diff --git a/pkg/ruler/manager.go b/pkg/ruler/manager.go index 50dc6aebe18..82b13980d0a 100644 --- a/pkg/ruler/manager.go +++ b/pkg/ruler/manager.go @@ -44,6 +44,9 @@ type DefaultMultiTenantManager struct { notifiers map[string]*rulerNotifier notifiersDiscoveryMetrics map[string]discovery.DiscovererMetrics + // rules backup + rulesBackupManager *rulesBackupManager + managersTotal prometheus.Gauge lastReloadSuccessful *prometheus.GaugeVec lastReloadSuccessfulTimestamp *prometheus.GaugeVec @@ -85,6 +88,7 @@ func NewDefaultMultiTenantManager(cfg Config, managerFactory ManagerFactory, eva mapper: newMapper(cfg.RulePath, logger), userManagers: map[string]RulesManager{}, userManagerMetrics: userManagerMetrics, + rulesBackupManager: newRulesBackupManager(cfg, logger), managersTotal: promauto.With(reg).NewGauge(prometheus.GaugeOpts{ Namespace: "cortex", Name: "ruler_managers_total", @@ -142,8 +146,12 @@ func (r *DefaultMultiTenantManager) SyncRuleGroups(ctx context.Context, ruleGrou r.managersTotal.Set(float64(len(r.userManagers))) } +func (r *DefaultMultiTenantManager) BackUpRuleGroups(ctx context.Context, ruleGroups map[string]rulespb.RuleGroupList) { + r.rulesBackupManager.backUpRuleGroups(ctx, ruleGroups) +} + // syncRulesToManager maps the rule files to disk, detects any changes and will create/update the -// the users Prometheus Rules Manager. +// users Prometheus Rules Manager. 
func (r *DefaultMultiTenantManager) syncRulesToManager(ctx context.Context, user string, groups rulespb.RuleGroupList) { // Map the files to disk and return the file names to be passed to the users manager if they // have been updated @@ -279,6 +287,10 @@ func (r *DefaultMultiTenantManager) GetRules(userID string) []*promRules.Group { return groups } +func (r *DefaultMultiTenantManager) GetBackupRules(userID string) []*promRules.Group { + return r.rulesBackupManager.getRuleGroups(userID) +} + func (r *DefaultMultiTenantManager) Stop() { r.notifiersMtx.Lock() for _, n := range r.notifiers { diff --git a/pkg/ruler/merger.go b/pkg/ruler/merger.go new file mode 100644 index 00000000000..dfa72a8375f --- /dev/null +++ b/pkg/ruler/merger.go @@ -0,0 +1,43 @@ +package ruler + +import ( + "strings" + "time" + + "golang.org/x/exp/slices" + + promRules "github.com/prometheus/prometheus/rules" +) + +func mergeGroupStateDesc(in []*GroupStateDesc) []*GroupStateDesc { + states := make(map[string]*GroupStateDesc) + rgTime := make(map[string]time.Time) + for _, state := range in { + latestTs := state.EvaluationTimestamp + for _, r := range state.ActiveRules { + if latestTs.Before(r.EvaluationTimestamp) { + latestTs = r.EvaluationTimestamp + } + } + key := promRules.GroupKey(state.Group.Namespace, state.Group.Name) + ts, ok := rgTime[key] + if !ok || ts.Before(latestTs) { + states[key] = state + rgTime[key] = latestTs + } + } + groups := make([]*GroupStateDesc, 0, len(states)) + for _, state := range states { + groups = append(groups, state) + } + slices.SortFunc(groups, func(a, b *GroupStateDesc) int { + fileCompare := strings.Compare(a.Group.Namespace, b.Group.Namespace) + + // If the namespace is the same, check the group name + if fileCompare != 0 { + return fileCompare + } + return strings.Compare(a.Group.Name, b.Group.Name) + }) + return groups +} diff --git a/pkg/ruler/rule_backup_manager.go b/pkg/ruler/rule_backup_manager.go new file mode 100644 index 00000000000..f6db3e1969d --- /dev/null +++ b/pkg/ruler/rule_backup_manager.go @@ -0,0 +1,107 @@ +package ruler + +import ( + "context" + "errors" + "strings" + "sync" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/prometheus/prometheus/model/rulefmt" + "github.com/prometheus/prometheus/promql/parser" + promRules "github.com/prometheus/prometheus/rules" + + "github.com/cortexproject/cortex/pkg/ruler/rulespb" +) + +// Implements GroupLoader interface but instead of reading from a file when Load is called, it returns the +// rulefmt.RuleGroup it has stored +type loader struct { + ruleGroups map[string][]rulefmt.RuleGroup +} + +func (r *loader) Load(identifier string) (*rulefmt.RuleGroups, []error) { + return &rulefmt.RuleGroups{ + Groups: r.ruleGroups[identifier], + }, nil +} + +func (r *loader) Parse(query string) (parser.Expr, error) { + return parser.ParseExpr(query) +} + +type rulesBackupManager struct { + backupRuleGroupsMtx sync.RWMutex + backupRuleGroups map[string][]*promRules.Group + cfg Config + + logger log.Logger +} + +func newRulesBackupManager(cfg Config, logger log.Logger) *rulesBackupManager { + return &rulesBackupManager{ + backupRuleGroups: make(map[string][]*promRules.Group), + cfg: cfg, + logger: logger, + } +} + +func (r *rulesBackupManager) backUpRuleGroups(_ context.Context, ruleGroups map[string]rulespb.RuleGroupList) { + r.backupRuleGroupsMtx.Lock() + defer r.backupRuleGroupsMtx.Unlock() + backupRuleGroups := make(map[string][]*promRules.Group) + for user, groups := range ruleGroups { + g, err := 
r.ruleGroupListToPromGroups(user, groups) + if err != nil { + // TODO: Increment a metric + level.Error(r.logger).Log("msg", "unable to back up rules", "user", user, "err", err) + continue + } + backupRuleGroups[user] = g + } + r.backupRuleGroups = backupRuleGroups +} + +// ruleGroupListToPromGroups converts rulespb.RuleGroupList to []*promRules.Group by creating a single use +// promRules.Manager and calling its LoadGroups method. +func (r *rulesBackupManager) ruleGroupListToPromGroups(user string, ruleGroups rulespb.RuleGroupList) ([]*promRules.Group, error) { + rgs := ruleGroups.Formatted() + + loader := &loader{ + ruleGroups: rgs, + } + promManager := promRules.NewManager(&promRules.ManagerOptions{ + ExternalURL: r.cfg.ExternalURL.URL, + GroupLoader: loader, + }) + + namespaces := make([]string, 0, len(rgs)) + for k := range rgs { + namespaces = append(namespaces, k) + } + level.Info(r.logger).Log("msg", "backup rules for user", "user", user, "namespaces", strings.Join(namespaces, ",")) + loadedGroups, errs := promManager.LoadGroups(r.cfg.EvaluationInterval, r.cfg.ExternalLabels, r.cfg.ExternalURL.String(), nil, namespaces...) + if errs != nil { + for _, e := range errs { + level.Error(r.logger).Log("msg", "loading groups to backup failed", "user", user, "namespaces", namespaces, "err", e) + } + return nil, errors.New("error loading rules to backup") + } + + groups := make([]*promRules.Group, 0, len(loadedGroups)) + for _, g := range loadedGroups { + groups = append(groups, g) + } + return groups, nil +} + +func (r *rulesBackupManager) getRuleGroups(userID string) []*promRules.Group { + var result []*promRules.Group + r.backupRuleGroupsMtx.RLock() + defer r.backupRuleGroupsMtx.RUnlock() + if groups, exists := r.backupRuleGroups[userID]; exists { + result = groups + } + return result +} diff --git a/pkg/ruler/ruler.go b/pkg/ruler/ruler.go index 42941bf59c3..3d8d716c85f 100644 --- a/pkg/ruler/ruler.go +++ b/pkg/ruler/ruler.go @@ -129,7 +129,8 @@ type Config struct { Ring RingConfig `yaml:"ring"` FlushCheckPeriod time.Duration `yaml:"flush_period"` - EnableAPI bool `yaml:"enable_api"` + EnableAPI bool `yaml:"enable_api"` + APIEnableRulesBackup bool `yaml:"api_enable_rules_backup"` EnabledTenants flagext.StringSliceCSV `yaml:"enabled_tenants"` DisabledTenants flagext.StringSliceCSV `yaml:"disabled_tenants"` @@ -213,8 +214,12 @@ type MultiTenantManager interface { // SyncRuleGroups is used to sync the Manager with rules from the RuleStore. // If existing user is missing in the ruleGroups map, its ruler manager will be stopped. SyncRuleGroups(ctx context.Context, ruleGroups map[string]rulespb.RuleGroupList) + // BackUpRuleGroups is used to store backups of rule groups owned by a different ruler instance. + BackUpRuleGroups(ctx context.Context, ruleGroups map[string]rulespb.RuleGroupList) // GetRules fetches rules for a particular tenant (userID). GetRules(userID string) []*promRules.Group + // GetBackupRules fetches rules for a particular tenant (userID) that the ruler stores for backup purposes + GetBackupRules(userID string) []*promRules.Group // Stop stops all Manager components. 
 	Stop()
 	// ValidateRuleGroup validates a rulegroup
@@ -465,6 +470,7 @@ func instanceOwnsRuleGroup(r ring.ReadRing, g *rulespb.RuleGroupDesc, disabledRu
 		return false, errors.Wrap(err, "error reading ring to verify rule group ownership")
 	}
 
+	// Even if the replication factor is greater than 1, only the first ruler in the replica set evaluates the rule group.
 	ownsRuleGroup := rlrs.Instances[0].Addr == instanceAddr
 	if ownsRuleGroup && ruleGroupDisabled(g, disabledRuleGroups) {
 		return false, &DisabledRuleGroupErr{Message: fmt.Sprintf("rule group %s, namespace %s, user %s is disabled", g.Name, g.Namespace, g.User)}
@@ -473,6 +479,29 @@ func instanceOwnsRuleGroup(r ring.ReadRing, g *rulespb.RuleGroupDesc, disabledRu
 	return ownsRuleGroup, nil
 }
 
+func instanceBacksUpRuleGroup(r ring.ReadRing, g *rulespb.RuleGroupDesc, disabledRuleGroups validation.DisabledRuleGroups, instanceAddr string) (bool, error) {
+	hash := tokenForGroup(g)
+
+	rlrs, err := r.Get(hash, RingOp, nil, nil, nil)
+	if err != nil {
+		return false, errors.Wrap(err, "error reading ring to verify rule group backup ownership")
+	}
+
+	var backupRuleGroup bool
+	// Only the second through the last replicas are used as backups.
+	for i := 1; i < len(rlrs.Instances); i++ {
+		if rlrs.Instances[i].Addr == instanceAddr {
+			backupRuleGroup = true
+			break
+		}
+	}
+
+	if backupRuleGroup && ruleGroupDisabled(g, disabledRuleGroups) {
+		return false, &DisabledRuleGroupErr{Message: fmt.Sprintf("rule group %s, namespace %s, user %s is disabled", g.Name, g.Namespace, g.User)}
+	}
+	return backupRuleGroup, nil
+}
+
 func (r *Ruler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
 	if r.cfg.EnableSharding {
 		r.ring.ServeHTTP(w, req)
@@ -541,16 +570,20 @@ func (r *Ruler) syncRules(ctx context.Context, reason string) {
 		r.ruleGroupSyncDuration.Set(ruleGroupSyncDuration)
 	}()
 
-	loadedConfigs, err := r.loadRuleGroups(ctx)
+	loadedConfigs, backupConfigs, err := r.loadRuleGroups(ctx)
 	if err != nil {
 		return
 	}
 
 	// This will also delete local group files for users that are no longer in 'configs' map.
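+	// When cfg.APIEnableRulesBackup is set, the backupConfigs loaded above (rule groups this ruler only
+	// backs up because another ruler in the replica set evaluates them) are handed to the manager below,
+	// so the Rules API can still serve them if the owning ruler becomes unreachable.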
r.manager.SyncRuleGroups(ctx, loadedConfigs) + + if r.cfg.APIEnableRulesBackup { + r.manager.BackUpRuleGroups(ctx, backupConfigs) + } } -func (r *Ruler) loadRuleGroups(ctx context.Context) (map[string]rulespb.RuleGroupList, error) { +func (r *Ruler) loadRuleGroups(ctx context.Context) (map[string]rulespb.RuleGroupList, map[string]rulespb.RuleGroupList, error) { timer := prometheus.NewTimer(nil) defer func() { @@ -558,51 +591,65 @@ func (r *Ruler) loadRuleGroups(ctx context.Context) (map[string]rulespb.RuleGrou r.ruleGroupStoreLoadDuration.Set(storeLoadSeconds) }() - configs, err := r.listRules(ctx) + ownedConfigs, backupConfigs, err := r.listRules(ctx) if err != nil { level.Error(r.logger).Log("msg", "unable to list rules", "err", err) - return nil, err + return nil, nil, err } - loadedConfigs, err := r.store.LoadRuleGroups(ctx, configs) + loadedOwnedConfigs, err := r.store.LoadRuleGroups(ctx, ownedConfigs) if err != nil { - level.Warn(r.logger).Log("msg", "failed to load some rules owned by this ruler", "count", len(configs)-len(loadedConfigs), "err", err) + level.Warn(r.logger).Log("msg", "failed to load some rules owned by this ruler", "count", len(ownedConfigs)-len(loadedOwnedConfigs), "err", err) } - return loadedConfigs, nil + if r.cfg.APIEnableRulesBackup { + loadedBackupConfigs, err := r.store.LoadRuleGroups(ctx, backupConfigs) + if err != nil { + level.Warn(r.logger).Log("msg", "failed to load some rules backed up by this ruler", "count", len(backupConfigs)-len(loadedBackupConfigs), "err", err) + } + return loadedOwnedConfigs, loadedBackupConfigs, nil + } + return loadedOwnedConfigs, nil, nil } -func (r *Ruler) listRules(ctx context.Context) (result map[string]rulespb.RuleGroupList, err error) { +func (r *Ruler) listRules(ctx context.Context) (owned map[string]rulespb.RuleGroupList, backedUp map[string]rulespb.RuleGroupList, err error) { switch { case !r.cfg.EnableSharding: - result, err = r.listRulesNoSharding(ctx) + owned, backedUp, err = r.listRulesNoSharding(ctx) case r.cfg.ShardingStrategy == util.ShardingStrategyDefault: - result, err = r.listRulesShardingDefault(ctx) + owned, backedUp, err = r.listRulesShardingDefault(ctx) case r.cfg.ShardingStrategy == util.ShardingStrategyShuffle: - result, err = r.listRulesShuffleSharding(ctx) + owned, backedUp, err = r.listRulesShuffleSharding(ctx) default: - return nil, errors.New("invalid sharding configuration") + return nil, nil, errors.New("invalid sharding configuration") } if err != nil { return } - for userID := range result { + for userID := range owned { + if !r.allowedTenants.IsAllowed(userID) { + level.Debug(r.logger).Log("msg", "ignoring rule groups for user, not allowed", "user", userID) + delete(owned, userID) + } + } + + for userID := range backedUp { if !r.allowedTenants.IsAllowed(userID) { level.Debug(r.logger).Log("msg", "ignoring rule groups for user, not allowed", "user", userID) - delete(result, userID) + delete(backedUp, userID) } } return } -func (r *Ruler) listRulesNoSharding(ctx context.Context) (map[string]rulespb.RuleGroupList, error) { +func (r *Ruler) listRulesNoSharding(ctx context.Context) (map[string]rulespb.RuleGroupList, map[string]rulespb.RuleGroupList, error) { allRuleGroups, err := r.store.ListAllRuleGroups(ctx) if err != nil { - return nil, err + return nil, nil, err } for userID, groups := range allRuleGroups { disabledRuleGroupsForUser := r.limits.DisabledRuleGroups(userID) @@ -619,29 +666,36 @@ func (r *Ruler) listRulesNoSharding(ctx context.Context) (map[string]rulespb.Rul } 
allRuleGroups[userID] = filteredGroupsForUser } - return allRuleGroups, nil + return allRuleGroups, nil, nil } -func (r *Ruler) listRulesShardingDefault(ctx context.Context) (map[string]rulespb.RuleGroupList, error) { +func (r *Ruler) listRulesShardingDefault(ctx context.Context) (map[string]rulespb.RuleGroupList, map[string]rulespb.RuleGroupList, error) { configs, err := r.store.ListAllRuleGroups(ctx) if err != nil { - return nil, err + return nil, nil, err } - filteredConfigs := make(map[string]rulespb.RuleGroupList) + ownedConfigs := make(map[string]rulespb.RuleGroupList) + backedUpConfigs := make(map[string]rulespb.RuleGroupList) for userID, groups := range configs { - filtered := filterRuleGroups(userID, groups, r.limits.DisabledRuleGroups(userID), r.ring, r.lifecycler.GetInstanceAddr(), r.logger, r.ringCheckErrors) - if len(filtered) > 0 { - filteredConfigs[userID] = filtered + owned := filterRuleGroups(userID, groups, r.limits.DisabledRuleGroups(userID), r.ring, r.lifecycler.GetInstanceAddr(), r.logger, r.ringCheckErrors) + if len(owned) > 0 { + ownedConfigs[userID] = owned + } + if r.cfg.APIEnableRulesBackup { + backup := filterBackupRuleGroups(userID, groups, r.limits.DisabledRuleGroups(userID), r.ring, r.lifecycler.GetInstanceAddr(), r.logger, r.ringCheckErrors) + if len(backup) > 0 { + backedUpConfigs[userID] = backup + } } } - return filteredConfigs, nil + return ownedConfigs, backedUpConfigs, nil } -func (r *Ruler) listRulesShuffleSharding(ctx context.Context) (map[string]rulespb.RuleGroupList, error) { +func (r *Ruler) listRulesShuffleSharding(ctx context.Context) (map[string]rulespb.RuleGroupList, map[string]rulespb.RuleGroupList, error) { users, err := r.store.ListAllUsers(ctx) if err != nil { - return nil, errors.Wrap(err, "unable to list users of ruler") + return nil, nil, errors.Wrap(err, "unable to list users of ruler") } // Only users in userRings will be used in the to load the rules. 
@@ -662,7 +716,7 @@ func (r *Ruler) listRulesShuffleSharding(ctx context.Context) (map[string]rulesp } if len(userRings) == 0 { - return nil, nil + return nil, nil, nil } userCh := make(chan string, len(userRings)) @@ -672,7 +726,8 @@ func (r *Ruler) listRulesShuffleSharding(ctx context.Context) (map[string]rulesp close(userCh) mu := sync.Mutex{} - result := map[string]rulespb.RuleGroupList{} + owned := map[string]rulespb.RuleGroupList{} + backedUp := map[string]rulespb.RuleGroupList{} concurrency := loadRulesConcurrency if len(userRings) < concurrency { @@ -688,13 +743,21 @@ func (r *Ruler) listRulesShuffleSharding(ctx context.Context) (map[string]rulesp return errors.Wrapf(err, "failed to fetch rule groups for user %s", userID) } - filtered := filterRuleGroups(userID, groups, r.limits.DisabledRuleGroups(userID), userRings[userID], r.lifecycler.GetInstanceAddr(), r.logger, r.ringCheckErrors) - if len(filtered) == 0 { + filterOwned := filterRuleGroups(userID, groups, r.limits.DisabledRuleGroups(userID), userRings[userID], r.lifecycler.GetInstanceAddr(), r.logger, r.ringCheckErrors) + var filterBackup []*rulespb.RuleGroupDesc + if r.cfg.APIEnableRulesBackup { + filterBackup = filterBackupRuleGroups(userID, groups, r.limits.DisabledRuleGroups(userID), userRings[userID], r.lifecycler.GetInstanceAddr(), r.logger, r.ringCheckErrors) + } + if len(filterOwned) == 0 && len(filterBackup) == 0 { continue } - mu.Lock() - result[userID] = filtered + if len(filterOwned) > 0 { + owned[userID] = filterOwned + } + if len(filterBackup) > 0 { + backedUp[userID] = filterBackup + } mu.Unlock() } return nil @@ -702,7 +765,7 @@ func (r *Ruler) listRulesShuffleSharding(ctx context.Context) (map[string]rulesp } err = g.Wait() - return result, err + return owned, backedUp, err } // filterRuleGroups returns map of rule groups that given instance "owns" based on supplied ring. @@ -738,6 +801,38 @@ func filterRuleGroups(userID string, ruleGroups []*rulespb.RuleGroupDesc, disabl return result } +// filterBackupRuleGroups returns map of rule groups that given instance backs up based on supplied ring. +// This function only uses User, Namespace, and Name fields of individual RuleGroups. +// +// Reason why this function is not a method on Ruler is to make sure we don't accidentally use r.ring, +// but only ring passed as parameter. 
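+//
+// For example, with a replication factor of 3 the ring returns three replicas for a group: the first
+// replica evaluates it (see instanceOwnsRuleGroup) and the remaining two only back it up, so this filter
+// returns the group on those two backup instances.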
+func filterBackupRuleGroups(userID string, ruleGroups []*rulespb.RuleGroupDesc, disabledRuleGroups validation.DisabledRuleGroups, ring ring.ReadRing, instanceAddr string, log log.Logger, ringCheckErrors prometheus.Counter) []*rulespb.RuleGroupDesc { + var result []*rulespb.RuleGroupDesc + for _, g := range ruleGroups { + backup, err := instanceBacksUpRuleGroup(ring, g, disabledRuleGroups, instanceAddr) + if err != nil { + switch e := err.(type) { + case *DisabledRuleGroupErr: + level.Info(log).Log("msg", e.Message) + continue + default: + ringCheckErrors.Inc() + level.Error(log).Log("msg", "failed to check if the ruler replica backs up the rule group", "user", userID, "namespace", g.Namespace, "group", g.Name, "err", err) + continue + } + } + + if backup { + level.Debug(log).Log("msg", "rule group backed up", "user", g.User, "namespace", g.Namespace, "name", g.Name) + result = append(result, g) + } else { + level.Debug(log).Log("msg", "rule group not backed up, ignoring", "user", g.User, "namespace", g.Namespace, "name", g.Name) + } + } + + return result +} + // GetRules retrieves the running rules from this ruler and all running rulers in the ring if // sharding is enabled func (r *Ruler) GetRules(ctx context.Context, rulesRequest RulesRequest) ([]*GroupStateDesc, error) { @@ -756,6 +851,11 @@ func (r *Ruler) GetRules(ctx context.Context, rulesRequest RulesRequest) ([]*Gro func (r *Ruler) getLocalRules(userID string, rulesRequest RulesRequest) ([]*GroupStateDesc, error) { groups := r.manager.GetRules(userID) + if r.cfg.APIEnableRulesBackup { + backupGroups := r.manager.GetBackupRules(userID) + groups = append(groups, backupGroups...) + } + groupDescs := make([]*GroupStateDesc, 0, len(groups)) prefix := filepath.Join(r.cfg.RulePath, userID) + "/" @@ -899,12 +999,19 @@ func (r *Ruler) getShardedRules(ctx context.Context, userID string, rulesRequest } var ( - mergedMx sync.Mutex - merged []*GroupStateDesc + mtx sync.Mutex + merged []*GroupStateDesc + errs []error ) + failedZones := make(map[string]interface{}) - // Concurrently fetch rules from all rulers. Since rules are not replicated, - // we need all requests to succeed. + zoneByAddress := make(map[string]string) + if r.cfg.APIEnableRulesBackup { + for _, ruler := range rulers.Instances { + zoneByAddress[ruler.Addr] = ruler.Zone + } + } + // Concurrently fetch rules from all rulers. jobs := concurrency.CreateJobsFromStrings(rulers.GetAddresses()) err = concurrency.ForEach(ctx, jobs, len(jobs), func(ctx context.Context, job interface{}) error { addr := job.(string) @@ -920,17 +1027,35 @@ func (r *Ruler) getShardedRules(ctx context.Context, userID string, rulesRequest Files: rulesRequest.GetFiles(), Type: rulesRequest.GetType(), }) + if err != nil { + level.Error(r.logger).Log("msg", "unable to retrieve rules from ruler", "addr", addr, "err", err) + // If APIEnableRulesBackup is enabled and there are enough rulers replicating the rules, we should + // be able to handle failures. + if r.cfg.APIEnableRulesBackup && len(jobs) >= r.cfg.Ring.ReplicationFactor { + mtx.Lock() + failedZones[zoneByAddress[addr]] = nil + errs = append(errs, err) + failed := (rulers.MaxUnavailableZones > 0 && len(failedZones) > rulers.MaxUnavailableZones) || (rulers.MaxUnavailableZones <= 0 && len(errs) > rulers.MaxErrors) + mtx.Unlock() + if !failed { + return nil + } + } return errors.Wrapf(err, "unable to retrieve rules from ruler %s", addr) } - mergedMx.Lock() + mtx.Lock() merged = append(merged, newGrps.Groups...) 
- mergedMx.Unlock() + mtx.Unlock() return nil }) + if err == nil && r.cfg.APIEnableRulesBackup { + merged = mergeGroupStateDesc(merged) + } + return merged, err } diff --git a/pkg/ruler/ruler_test.go b/pkg/ruler/ruler_test.go index 589a9d162fa..8ad0b8b9276 100644 --- a/pkg/ruler/ruler_test.go +++ b/pkg/ruler/ruler_test.go @@ -338,13 +338,16 @@ func TestGetRules(t *testing.T) { type rulesMap map[string][]*rulespb.RuleDesc type testCase struct { - sharding bool - shardingStrategy string - shuffleShardSize int - rulesRequest RulesRequest - expectedCount map[string]int - rulerStateMap map[string]ring.InstanceState - expectedError error + sharding bool + shardingStrategy string + shuffleShardSize int + rulesRequest RulesRequest + expectedCount map[string]int + expectedClientCallCount int + rulerStateMap map[string]ring.InstanceState + expectedError error + enableAPIRulesBackup bool + enableZoneAwareReplication bool } ruleMap := rulesMap{ @@ -456,6 +459,12 @@ func TestGetRules(t *testing.T) { "ruler3": ring.ACTIVE, } + rulerStateMapTwoPending := map[string]ring.InstanceState{ + "ruler1": ring.PENDING, + "ruler2": ring.PENDING, + "ruler3": ring.ACTIVE, + } + expectedRules := expectedRulesMap{ "ruler1": map[string]rulespb.RuleGroupList{ "user1": { @@ -501,6 +510,7 @@ func TestGetRules(t *testing.T) { "user2": 4, "user3": 2, }, + expectedClientCallCount: len(expectedRules), }, "Default Sharding with No Filter": { sharding: true, @@ -511,6 +521,19 @@ func TestGetRules(t *testing.T) { "user2": 9, "user3": 3, }, + expectedClientCallCount: len(expectedRules), + }, + "Default Sharding with No Filter but with API Rules backup enabled": { + sharding: true, + shardingStrategy: util.ShardingStrategyDefault, + rulerStateMap: rulerStateMapAllActive, + expectedCount: map[string]int{ + "user1": 5, + "user2": 9, + "user3": 3, + }, + enableAPIRulesBackup: true, + expectedClientCallCount: len(expectedRules), }, "Shuffle Sharding and ShardSize = 2 with Rule Type Filter": { sharding: true, @@ -525,6 +548,7 @@ func TestGetRules(t *testing.T) { "user2": 5, "user3": 1, }, + expectedClientCallCount: 2, }, "Shuffle Sharding and ShardSize = 2 and Rule Group Name Filter": { sharding: true, @@ -539,6 +563,7 @@ func TestGetRules(t *testing.T) { "user2": 1, "user3": 2, }, + expectedClientCallCount: 2, }, "Shuffle Sharding and ShardSize = 2 and Rule Group Name and Rule Type Filter": { sharding: true, @@ -554,6 +579,7 @@ func TestGetRules(t *testing.T) { "user2": 2, "user3": 1, }, + expectedClientCallCount: 2, }, "Shuffle Sharding and ShardSize = 2 with Rule Type and Namespace Filters": { sharding: true, @@ -569,6 +595,7 @@ func TestGetRules(t *testing.T) { "user2": 0, "user3": 1, }, + expectedClientCallCount: 2, }, "Shuffle Sharding and ShardSize = 2 with Rule Type Filter and one ruler is in LEAVING state": { sharding: true, @@ -583,6 +610,7 @@ func TestGetRules(t *testing.T) { "user2": 5, "user3": 1, }, + expectedClientCallCount: 2, }, "Shuffle Sharding and ShardSize = 2 with Rule Type Filter and one ruler is in Pending state": { sharding: true, @@ -592,7 +620,52 @@ func TestGetRules(t *testing.T) { rulesRequest: RulesRequest{ Type: recordingRuleFilter, }, - expectedError: ring.ErrTooManyUnhealthyInstances, + expectedError: ring.ErrTooManyUnhealthyInstances, + expectedClientCallCount: 0, + }, + "Shuffle Sharding and ShardSize = 3 with API Rules backup enabled": { + sharding: true, + shuffleShardSize: 3, + shardingStrategy: util.ShardingStrategyShuffle, + rulerStateMap: rulerStateMapAllActive, + enableAPIRulesBackup: true, + 
rulesRequest: RulesRequest{ + Type: recordingRuleFilter, + }, + expectedCount: map[string]int{ + "user1": 3, + "user2": 5, + "user3": 1, + }, + expectedClientCallCount: 3, + }, + "Shuffle Sharding and ShardSize = 3 with API Rules backup enabled and one ruler is in Pending state": { + sharding: true, + shuffleShardSize: 3, + shardingStrategy: util.ShardingStrategyShuffle, + rulerStateMap: rulerStateMapOnePending, + enableAPIRulesBackup: true, + rulesRequest: RulesRequest{ + Type: recordingRuleFilter, + }, + expectedCount: map[string]int{ + "user1": 3, + "user2": 5, + "user3": 1, + }, + expectedClientCallCount: 2, // one of the ruler is pending, so we don't expect that ruler to be called + }, + "Shuffle Sharding and ShardSize = 3 with API Rules backup enabled and two ruler is in Pending state": { + sharding: true, + shuffleShardSize: 3, + shardingStrategy: util.ShardingStrategyShuffle, + rulerStateMap: rulerStateMapTwoPending, + enableAPIRulesBackup: true, + rulesRequest: RulesRequest{ + Type: recordingRuleFilter, + }, + expectedError: ring.ErrTooManyUnhealthyInstances, + expectedClientCallCount: 0, }, } @@ -611,6 +684,7 @@ func TestGetRules(t *testing.T) { cfg.ShardingStrategy = tc.shardingStrategy cfg.EnableSharding = tc.sharding + cfg.APIEnableRulesBackup = tc.enableAPIRulesBackup cfg.Ring = RingConfig{ InstanceID: id, @@ -620,6 +694,10 @@ func TestGetRules(t *testing.T) { }, ReplicationFactor: 1, } + if tc.enableAPIRulesBackup { + cfg.Ring.ReplicationFactor = 3 + cfg.Ring.ZoneAwarenessEnabled = tc.enableZoneAwareReplication + } r, _ := buildRuler(t, cfg, nil, store, rulerAddrMap) r.limits = ruleLimits{evalDelay: 0, tenantShard: tc.shuffleShardSize} @@ -685,9 +763,9 @@ func TestGetRules(t *testing.T) { mockPoolClient := r.clientsPool.(*mockRulerClientsPool) if tc.shardingStrategy == util.ShardingStrategyShuffle { - require.Equal(t, int32(tc.shuffleShardSize), mockPoolClient.numberOfCalls.Load()) + require.Equal(t, int32(tc.expectedClientCallCount), mockPoolClient.numberOfCalls.Load()) } else { - require.Equal(t, int32(len(rulerAddrMap)), mockPoolClient.numberOfCalls.Load()) + require.Equal(t, int32(tc.expectedClientCallCount), mockPoolClient.numberOfCalls.Load()) } mockPoolClient.numberOfCalls.Store(0) } @@ -696,13 +774,21 @@ func TestGetRules(t *testing.T) { totalLoadedRules := 0 totalConfiguredRules := 0 + ruleBackupCount := make(map[string]int) forEachRuler(func(rID string, r *Ruler) { - localRules, err := r.listRules(context.Background()) + localRules, localBackupRules, err := r.listRules(context.Background()) require.NoError(t, err) for _, rules := range localRules { totalLoadedRules += len(rules) } + for user, rules := range localBackupRules { + for _, rule := range rules { + key := user + rule.Namespace + rule.Name + c := ruleBackupCount[key] + ruleBackupCount[key] = c + 1 + } + } totalConfiguredRules += len(allRulesByRuler[rID]) }) @@ -713,6 +799,33 @@ func TestGetRules(t *testing.T) { numberOfRulers := len(rulerAddrMap) require.Equal(t, totalConfiguredRules*numberOfRulers, totalLoadedRules) } + if tc.enableAPIRulesBackup && tc.sharding && tc.expectedError == nil { + // all rules should be backed up + require.Equal(t, totalConfiguredRules, len(ruleBackupCount)) + var hasUnhealthyRuler bool + for _, state := range tc.rulerStateMap { + if state != ring.ACTIVE && state != ring.LEAVING { + hasUnhealthyRuler = true + break + } + } + minBackupCount := 100 + for _, v := range ruleBackupCount { + if minBackupCount > v { + minBackupCount = v + } + } + if !hasUnhealthyRuler { + // with 
replication factor set to 3, 2 rulers backup a rule + require.Equal(t, 2, minBackupCount) + } else { + require.Equal(t, 1, minBackupCount) + } + + } else { + // If APIEnableRulesBackup is disabled, rulers should not back up any rules + require.Equal(t, 0, len(ruleBackupCount)) + } }) } } @@ -1175,7 +1288,7 @@ func TestSharding(t *testing.T) { } // Always add ruler1 to expected rulers, even if there is no ring (no sharding). - loadedRules1, err := r1.listRules(context.Background()) + loadedRules1, _, err := r1.listRules(context.Background()) require.NoError(t, err) expected := expectedRulesMap{ @@ -1185,7 +1298,7 @@ func TestSharding(t *testing.T) { addToExpected := func(id string, r *Ruler) { // Only expect rules from other rulers when using ring, and they are present in the ring. if r != nil && rulerRing != nil && rulerRing.HasInstance(id) { - loaded, err := r.listRules(context.Background()) + loaded, _, err := r.listRules(context.Background()) require.NoError(t, err) // Normalize nil map to empty one. if loaded == nil { @@ -1290,7 +1403,7 @@ func Test_LoadPartialGroups(t *testing.T) { len(r1.manager.GetRules(user3)) > 0 }) - returned, err := r1.listRules(context.Background()) + returned, _, err := r1.listRules(context.Background()) require.NoError(t, err) require.Equal(t, returned, allRules) require.Equal(t, 2, len(manager.userManagers)) @@ -1835,7 +1948,7 @@ func TestRulerDisablesRuleGroups(t *testing.T) { } actualRules := map[string]rulespb.RuleGroupList{} - loadedRules, err := r1.listRules(context.Background()) + loadedRules, _, err := r1.listRules(context.Background()) require.NoError(t, err) for k, v := range loadedRules { if len(v) > 0 { @@ -1846,7 +1959,7 @@ func TestRulerDisablesRuleGroups(t *testing.T) { fetchRules := func(id string, r *Ruler) { // Only expect rules from other rulers when using ring, and they are present in the ring. if r != nil && rulerRing != nil && rulerRing.HasInstance(id) { - loaded, err := r.listRules(context.Background()) + loaded, _, err := r.listRules(context.Background()) require.NoError(t, err) // Normalize nil map to empty one.