diff --git a/CHANGELOG.md b/CHANGELOG.md
index 15bbbd8351..b9d7c2e01e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -19,6 +19,7 @@
* [FEATURE] Ruler: Add `ruler.concurrent-evals-enabled` flag to enable concurrent evaluation within a single rule group for independent rules. Maximum concurrency can be configured via `ruler.max-concurrent-evals`. #5766
* [FEATURE] Distributor Queryable: Experimental: Add config `zone_results_quorum_metadata`. When querying ingesters using metadata APIs such as label names and values, only results from quorum number of zones will be included and merged. #5779
* [FEATURE] Storage Cache Clients: Add config `set_async_circuit_breaker_config` to utilize the circuit breaker pattern for dynamically thresholding asynchronous set operations. Implemented in both memcached and redis cache clients. #5789
+* [FEATURE] Ruler: Add experimental `experimental.ruler.api-deduplicate-rules` flag to remove duplicate rule groups from the Prometheus-compatible rules API endpoint. Add experimental `ruler.ring.replication-factor` and `ruler.ring.zone-awareness-enabled` flags to configure rule group replication; only the first ruler in the replica set evaluates the rule group, while the rest hold a copy as a backup. Add experimental `experimental.ruler.api-enable-rules-backup` flag so rulers can serve the rule group backups stored in the replica set when a ruler is down during an API request to list rules. #5782
* [ENHANCEMENT] Store Gateway: Added `-store-gateway.enabled-tenants` and `-store-gateway.disabled-tenants` to explicitly enable or disable store-gateway for specific tenants. #5638
* [ENHANCEMENT] Compactor: Add new compactor metric `cortex_compactor_start_duration_seconds`. #5683
* [ENHANCEMENT] Upgraded Docker base images to `alpine:3.18`. #5684
diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md
index f28da2b8cf..39a19734af 100644
--- a/docs/configuration/config-file-reference.md
+++ b/docs/configuration/config-file-reference.md
@@ -4233,6 +4233,16 @@ ring:
  # CLI flag: -ruler.ring.heartbeat-timeout
  [heartbeat_timeout: <duration> | default = 1m]

+  # EXPERIMENTAL: The replication factor to use when loading rule groups for API
+  # HA.
+  # CLI flag: -ruler.ring.replication-factor
+  [replication_factor: <int> | default = 1]
+
+  # EXPERIMENTAL: True to enable zone-awareness and load rule groups across
+  # different availability zones for API HA.
+  # CLI flag: -ruler.ring.zone-awareness-enabled
+  [zone_awareness_enabled: <boolean> | default = false]
+
  # Name of network interface to read address from.
  # CLI flag: -ruler.ring.instance-interface-names
  [instance_interface_names: <list of strings> | default = [eth0 en0]]
@@ -4254,6 +4264,21 @@
# CLI flag: -experimental.ruler.enable-api
[enable_api: <boolean> | default = false]

+# EXPERIMENTAL: Enable rulers to store a copy of rules owned by other rulers
+# with default state (state before any evaluation) and send this copy in list
+# API requests as backup in case the ruler who owns the rule fails to send its
+# rules. This allows the rules API to handle ruler outage by returning rules
+# with default state. Ring replication-factor needs to be set to 2 or more for
+# this to be useful.
+# CLI flag: -experimental.ruler.api-enable-rules-backup
+[api_enable_rules_backup: <boolean> | default = false]
+
+# EXPERIMENTAL: Remove duplicate rules in the prometheus rules and alerts API
+# response. If there are duplicate rules the rule with the latest evaluation
+# timestamp will be kept.
+# CLI flag: -experimental.ruler.api-deduplicate-rules
+[api_deduplicate_rules: <boolean> | default = false]
+
# Comma separated list of tenants whose rules this ruler can evaluate. If
# specified, only these tenants will be handled by ruler, otherwise this ruler
# can process rules from all tenants. Subject to sharding.
diff --git a/integration/ruler_test.go b/integration/ruler_test.go
index 8cc8da26ad..81e2440c76 100644
--- a/integration/ruler_test.go
+++ b/integration/ruler_test.go
@@ -395,6 +395,14 @@ func TestRulerSharding(t *testing.T) {
}

func TestRulerAPISharding(t *testing.T) {
+	testRulerAPIWithSharding(t, false)
+}
+
+func TestRulerAPIShardingWithAPIRulesBackupEnabled(t *testing.T) {
+	testRulerAPIWithSharding(t, true)
+}
+
+func testRulerAPIWithSharding(t *testing.T, enableAPIRulesBackup bool) {
	const numRulesGroups = 100

	random := rand.New(rand.NewSource(time.Now().UnixNano()))
@@ -444,24 +452,30 @@ func TestRulerAPISharding(t *testing.T) {
	require.NoError(t, s.StartAndWaitReady(consul, minio))

	// Configure the ruler.
+	overrides := map[string]string{
+		// Since we're not going to run any rule, we don't need the
+		// store-gateway to be configured to a valid address.
+		"-querier.store-gateway-addresses": "localhost:12345",
+		// Enable the bucket index so we can skip the initial bucket scan.
+		"-blocks-storage.bucket-store.bucket-index.enabled": "true",
+	}
+	if enableAPIRulesBackup {
+		overrides["-ruler.ring.replication-factor"] = "3"
+		overrides["-experimental.ruler.api-enable-rules-backup"] = "true"
+	}
	rulerFlags := mergeFlags(
		BlocksStorageFlags(),
		RulerFlags(),
		RulerShardingFlags(consul.NetworkHTTPEndpoint()),
-		map[string]string{
-			// Since we're not going to run any rule, we don't need the
-			// store-gateway to be configured to a valid address.
-			"-querier.store-gateway-addresses": "localhost:12345",
-			// Enable the bucket index so we can skip the initial bucket scan.
-			"-blocks-storage.bucket-store.bucket-index.enabled": "true",
-		},
+		overrides,
	)

	// Start rulers.
	ruler1 := e2ecortex.NewRuler("ruler-1", consul.NetworkHTTPEndpoint(), rulerFlags, "")
	ruler2 := e2ecortex.NewRuler("ruler-2", consul.NetworkHTTPEndpoint(), rulerFlags, "")
-	rulers := e2ecortex.NewCompositeCortexService(ruler1, ruler2)
-	require.NoError(t, s.StartAndWaitReady(ruler1, ruler2))
+	ruler3 := e2ecortex.NewRuler("ruler-3", consul.NetworkHTTPEndpoint(), rulerFlags, "")
+	rulers := e2ecortex.NewCompositeCortexService(ruler1, ruler2, ruler3)
+	require.NoError(t, s.StartAndWaitReady(ruler1, ruler2, ruler3))

	// Upload rule groups to one of the rulers.
	c, err := e2ecortex.NewClient("", "", "", ruler1.HTTPEndpoint(), "user-1")
@@ -542,6 +556,10 @@ func TestRulerAPISharding(t *testing.T) {
		},
	}
	// For each test case, fetch the rules with configured filters, and ensure the results match.
+ if enableAPIRulesBackup { + err := ruler2.Kill() // if api-enable-rules-backup is enabled the APIs should be able to handle a ruler going down + require.NoError(t, err) + } for name, tc := range testCases { t.Run(name, func(t *testing.T) { actualGroups, err := c.GetPrometheusRules(tc.filter) diff --git a/pkg/compactor/shuffle_sharding_grouper_test.go b/pkg/compactor/shuffle_sharding_grouper_test.go index e237a43a92..ab19ece87b 100644 --- a/pkg/compactor/shuffle_sharding_grouper_test.go +++ b/pkg/compactor/shuffle_sharding_grouper_test.go @@ -766,6 +766,11 @@ func (r *RingMock) GetInstanceDescsForOperation(op ring.Operation) (map[string]r return args.Get(0).(map[string]ring.InstanceDesc), args.Error(1) } +func (r *RingMock) GetAllInstanceDescs(op ring.Operation) ([]ring.InstanceDesc, []ring.InstanceDesc, error) { + args := r.Called(op) + return args.Get(0).([]ring.InstanceDesc), make([]ring.InstanceDesc, 0), args.Error(1) +} + func (r *RingMock) GetReplicationSetForOperation(op ring.Operation) (ring.ReplicationSet, error) { args := r.Called(op) return args.Get(0).(ring.ReplicationSet), args.Error(1) diff --git a/pkg/ring/ring.go b/pkg/ring/ring.go index aad67332ef..bc588ba3cf 100644 --- a/pkg/ring/ring.go +++ b/pkg/ring/ring.go @@ -50,6 +50,9 @@ type ReadRing interface { // of unhealthy instances is greater than the tolerated max unavailable. GetAllHealthy(op Operation) (ReplicationSet, error) + // GetAllInstanceDescs returns a slice of healthy and unhealthy InstanceDesc. + GetAllInstanceDescs(op Operation) ([]InstanceDesc, []InstanceDesc, error) + // GetInstanceDescsForOperation returns map of InstanceDesc with instance ID as the keys. GetInstanceDescsForOperation(op Operation) (map[string]InstanceDesc, error) @@ -464,6 +467,28 @@ func (r *Ring) GetAllHealthy(op Operation) (ReplicationSet, error) { }, nil } +// GetAllInstanceDescs implements ReadRing. +func (r *Ring) GetAllInstanceDescs(op Operation) ([]InstanceDesc, []InstanceDesc, error) { + r.mtx.RLock() + defer r.mtx.RUnlock() + + if r.ringDesc == nil || len(r.ringDesc.Ingesters) == 0 { + return nil, nil, ErrEmptyRing + } + healthyInstances := make([]InstanceDesc, 0, len(r.ringDesc.Ingesters)) + unhealthyInstances := make([]InstanceDesc, 0, len(r.ringDesc.Ingesters)) + storageLastUpdate := r.KVClient.LastUpdateTime(r.key) + for _, instance := range r.ringDesc.Ingesters { + if r.IsHealthy(&instance, op, storageLastUpdate) { + healthyInstances = append(healthyInstances, instance) + } else { + unhealthyInstances = append(unhealthyInstances, instance) + } + } + + return healthyInstances, unhealthyInstances, nil +} + // GetInstanceDescsForOperation implements ReadRing. 
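The new `ReadRing.GetAllInstanceDescs` contract above returns the healthy and unhealthy members separately, without enforcing quorum. A minimal sketch of how a caller can consume it to track which zones contain a failed ruler (`failedZonesOf` is a hypothetical helper; `GetReplicationSetForListRule` later in this patch does the same bookkeeping inline, and the existing `GetInstanceDescsForOperation` continues below):

```go
import "github.com/cortexproject/cortex/pkg/ring"

// failedZonesOf is a hypothetical helper built on the new GetAllInstanceDescs:
// it splits the ring members by health for the given operation and returns the
// set of zones that contain at least one unhealthy instance.
func failedZonesOf(r ring.ReadRing, op ring.Operation) (map[string]struct{}, error) {
	_, unhealthy, err := r.GetAllInstanceDescs(op)
	if err != nil {
		return nil, err
	}
	zones := make(map[string]struct{}, len(unhealthy))
	for _, inst := range unhealthy {
		zones[inst.Zone] = struct{}{}
	}
	return zones, nil
}
```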
func (r *Ring) GetInstanceDescsForOperation(op Operation) (map[string]InstanceDesc, error) { r.mtx.RLock() diff --git a/pkg/ring/ring_test.go b/pkg/ring/ring_test.go index cf8582d644..82bb096d02 100644 --- a/pkg/ring/ring_test.go +++ b/pkg/ring/ring_test.go @@ -959,6 +959,41 @@ func TestRing_GetInstanceDescsForOperation(t *testing.T) { }, instanceDescs) } +func TestRing_GetAllInstanceDescs(t *testing.T) { + now := time.Now().Unix() + twoMinutesAgo := time.Now().Add(-2 * time.Minute).Unix() + + ringDesc := &Desc{Ingesters: map[string]InstanceDesc{ + "instance-1": {Addr: "127.0.0.1", Tokens: []uint32{1}, State: ACTIVE, Timestamp: now}, + "instance-2": {Addr: "127.0.0.2", Tokens: []uint32{2}, State: LEAVING, Timestamp: now}, // not healthy state + "instance-3": {Addr: "127.0.0.3", Tokens: []uint32{3}, State: ACTIVE, Timestamp: twoMinutesAgo}, // heartbeat timed out + }} + + ring := Ring{ + cfg: Config{HeartbeatTimeout: time.Minute}, + ringDesc: ringDesc, + ringTokens: ringDesc.GetTokens(), + ringTokensByZone: ringDesc.getTokensByZone(), + ringInstanceByToken: ringDesc.getTokensInfo(), + ringZones: getZones(ringDesc.getTokensByZone()), + strategy: NewDefaultReplicationStrategy(), + KVClient: &MockClient{}, + } + + testOp := NewOp([]InstanceState{ACTIVE}, nil) + + healthyInstanceDescs, unhealthyInstanceDescs, err := ring.GetAllInstanceDescs(testOp) + require.NoError(t, err) + require.EqualValues(t, []InstanceDesc{ + {Addr: "127.0.0.1", Tokens: []uint32{1}, State: ACTIVE, Timestamp: now}, + }, healthyInstanceDescs) + sort.Slice(unhealthyInstanceDescs, func(i, j int) bool { return unhealthyInstanceDescs[i].Addr < unhealthyInstanceDescs[j].Addr }) + require.EqualValues(t, []InstanceDesc{ + {Addr: "127.0.0.2", Tokens: []uint32{2}, State: LEAVING, Timestamp: now}, + {Addr: "127.0.0.3", Tokens: []uint32{3}, State: ACTIVE, Timestamp: twoMinutesAgo}, + }, unhealthyInstanceDescs) +} + func TestRing_GetReplicationSetForOperation(t *testing.T) { now := time.Now() g := NewRandomTokenGenerator() diff --git a/pkg/ring/util_test.go b/pkg/ring/util_test.go index 0264f54f86..29fac1f2bf 100644 --- a/pkg/ring/util_test.go +++ b/pkg/ring/util_test.go @@ -36,6 +36,11 @@ func (r *RingMock) GetInstanceDescsForOperation(op Operation) (map[string]Instan return args.Get(0).(map[string]InstanceDesc), args.Error(1) } +func (r *RingMock) GetAllInstanceDescs(op Operation) ([]InstanceDesc, []InstanceDesc, error) { + args := r.Called(op) + return args.Get(0).([]InstanceDesc), make([]InstanceDesc, 0), args.Error(1) +} + func (r *RingMock) GetReplicationSetForOperation(op Operation) (ReplicationSet, error) { args := r.Called(op) return args.Get(0).(ReplicationSet), args.Error(1) diff --git a/pkg/ruler/api.go b/pkg/ruler/api.go index bcb3df70dd..225f65683f 100644 --- a/pkg/ruler/api.go +++ b/pkg/ruler/api.go @@ -258,6 +258,9 @@ func (a *API) PrometheusRules(w http.ResponseWriter, req *http.Request) { // keep data.groups are in order sort.Slice(groups, func(i, j int) bool { + if groups[i].File == groups[j].File { + return groups[i].Name < groups[j].Name + } return groups[i].File < groups[j].File }) diff --git a/pkg/ruler/manager.go b/pkg/ruler/manager.go index 95bf7c43cc..57d2d2907f 100644 --- a/pkg/ruler/manager.go +++ b/pkg/ruler/manager.go @@ -44,6 +44,9 @@ type DefaultMultiTenantManager struct { notifiers map[string]*rulerNotifier notifiersDiscoveryMetrics map[string]discovery.DiscovererMetrics + // rules backup + rulesBackupManager *rulesBackupManager + managersTotal prometheus.Gauge lastReloadSuccessful *prometheus.GaugeVec 
lastReloadSuccessfulTimestamp *prometheus.GaugeVec @@ -79,7 +82,7 @@ func NewDefaultMultiTenantManager(cfg Config, managerFactory ManagerFactory, eva os.Exit(1) } - return &DefaultMultiTenantManager{ + m := &DefaultMultiTenantManager{ cfg: cfg, notifierCfg: ncfg, managerFactory: managerFactory, @@ -112,7 +115,11 @@ func NewDefaultMultiTenantManager(cfg Config, managerFactory ManagerFactory, eva }, []string{"user"}), registry: reg, logger: logger, - }, nil + } + if cfg.APIEnableRulesBackup { + m.rulesBackupManager = newRulesBackupManager(cfg, logger, reg) + } + return m, nil } func (r *DefaultMultiTenantManager) SyncRuleGroups(ctx context.Context, ruleGroups map[string]rulespb.RuleGroupList) { @@ -161,8 +168,14 @@ func (r *DefaultMultiTenantManager) deleteRuleCache(user string) { delete(r.ruleCache, user) } +func (r *DefaultMultiTenantManager) BackUpRuleGroups(ctx context.Context, ruleGroups map[string]rulespb.RuleGroupList) { + if r.rulesBackupManager != nil { + r.rulesBackupManager.setRuleGroups(ctx, ruleGroups) + } +} + // syncRulesToManager maps the rule files to disk, detects any changes and will create/update the -// the users Prometheus Rules Manager. +// users Prometheus Rules Manager. func (r *DefaultMultiTenantManager) syncRulesToManager(ctx context.Context, user string, groups rulespb.RuleGroupList) { // Map the files to disk and return the file names to be passed to the users manager if they // have been updated @@ -333,6 +346,13 @@ func (r *DefaultMultiTenantManager) GetRules(userID string) []*promRules.Group { return groups } +func (r *DefaultMultiTenantManager) GetBackupRules(userID string) rulespb.RuleGroupList { + if r.rulesBackupManager != nil { + return r.rulesBackupManager.getRuleGroups(userID) + } + return nil +} + func (r *DefaultMultiTenantManager) Stop() { r.notifiersMtx.Lock() for _, n := range r.notifiers { diff --git a/pkg/ruler/manager_test.go b/pkg/ruler/manager_test.go index ec95205a14..d88d47e661 100644 --- a/pkg/ruler/manager_test.go +++ b/pkg/ruler/manager_test.go @@ -253,6 +253,47 @@ func TestSyncRuleGroupsCleanUpPerUserMetrics(t *testing.T) { require.NotContains(t, mfm["cortex_ruler_config_last_reload_successful"].String(), "value:\""+user+"\"") } +func TestBackupRules(t *testing.T) { + dir := t.TempDir() + reg := prometheus.NewPedanticRegistry() + evalMetrics := NewRuleEvalMetrics(Config{RulePath: dir, EnableQueryStats: true}, reg) + waitDurations := []time.Duration{ + 1 * time.Millisecond, + 1 * time.Millisecond, + } + ruleManagerFactory := RuleManagerFactory(nil, waitDurations) + m, err := NewDefaultMultiTenantManager(Config{RulePath: dir, APIEnableRulesBackup: true}, ruleManagerFactory, evalMetrics, reg, log.NewNopLogger()) + require.NoError(t, err) + + const user1 = "testUser" + const user2 = "testUser2" + + require.Equal(t, 0, len(m.GetBackupRules(user1))) + require.Equal(t, 0, len(m.GetBackupRules(user2))) + + userRules := map[string]rulespb.RuleGroupList{ + user1: { + &rulespb.RuleGroupDesc{ + Name: "group1", + Namespace: "ns", + Interval: 1 * time.Minute, + User: user1, + }, + }, + user2: { + &rulespb.RuleGroupDesc{ + Name: "group2", + Namespace: "ns", + Interval: 1 * time.Minute, + User: user1, + }, + }, + } + m.BackUpRuleGroups(context.TODO(), userRules) + require.Equal(t, userRules[user1], m.GetBackupRules(user1)) + require.Equal(t, userRules[user2], m.GetBackupRules(user2)) +} + func getManager(m *DefaultMultiTenantManager, user string) RulesManager { m.userManagerMtx.RLock() defer m.userManagerMtx.RUnlock() diff --git a/pkg/ruler/merger.go 
b/pkg/ruler/merger.go
new file mode 100644
index 0000000000..7ae7e69317
--- /dev/null
+++ b/pkg/ruler/merger.go
@@ -0,0 +1,34 @@
+package ruler
+
+import (
+	"time"
+
+	promRules "github.com/prometheus/prometheus/rules"
+)
+
+// mergeGroupStateDesc removes duplicates from the provided []*GroupStateDesc by keeping the GroupStateDesc with the
+// latest information. It uses the EvaluationTimestamp of the GroupStateDesc and the EvaluationTimestamp of the
+// ActiveRules in a GroupStateDesc to determine which GroupStateDesc has the latest information.
+func mergeGroupStateDesc(in []*GroupStateDesc) []*GroupStateDesc {
+	states := make(map[string]*GroupStateDesc)
+	rgTime := make(map[string]time.Time)
+	for _, state := range in {
+		latestTs := state.EvaluationTimestamp
+		for _, r := range state.ActiveRules {
+			if latestTs.Before(r.EvaluationTimestamp) {
+				latestTs = r.EvaluationTimestamp
+			}
+		}
+		key := promRules.GroupKey(state.Group.Namespace, state.Group.Name)
+		ts, ok := rgTime[key]
+		if !ok || ts.Before(latestTs) {
+			states[key] = state
+			rgTime[key] = latestTs
+		}
+	}
+	groups := make([]*GroupStateDesc, 0, len(states))
+	for _, state := range states {
+		groups = append(groups, state)
+	}
+	return groups
+}
diff --git a/pkg/ruler/merger_test.go b/pkg/ruler/merger_test.go
new file mode 100644
index 0000000000..d4bae6d008
--- /dev/null
+++ b/pkg/ruler/merger_test.go
@@ -0,0 +1,114 @@
+package ruler
+
+import (
+	"reflect"
+	"slices"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+
+	"github.com/cortexproject/cortex/pkg/ruler/rulespb"
+)
+
+func TestMergeGroupStateDesc(t *testing.T) {
+	curTime := time.Now()
+	r := rulespb.RuleDesc{
+		Expr: "1 > 1",
+	}
+	g1 := rulespb.RuleGroupDesc{
+		Name:      "g1",
+		Namespace: "ns1",
+	}
+	g2 := rulespb.RuleGroupDesc{
+		Name:      "g2",
+		Namespace: "ns1",
+	}
+	rs1 := RuleStateDesc{
+		Rule:                &r,
+		EvaluationTimestamp: curTime,
+	}
+	rs1NotRun := RuleStateDesc{
+		Rule: &r,
+	}
+	rs2 := RuleStateDesc{
+		Rule:                &r,
+		EvaluationTimestamp: curTime,
+	}
+	rs2NotRun := RuleStateDesc{
+		Rule: &r,
+	}
+	rs3 := RuleStateDesc{
+		Rule:                &r,
+		EvaluationTimestamp: curTime.Add(10 * time.Second),
+	}
+
+	gs1 := GroupStateDesc{
+		Group:               &g1,
+		ActiveRules:         []*RuleStateDesc{&rs1, &rs2},
+		EvaluationTimestamp: curTime,
+	}
+	gs1NotRun := GroupStateDesc{
+		Group:       &g1,
+		ActiveRules: []*RuleStateDesc{&rs1NotRun, &rs2NotRun},
+	}
+	gs2 := GroupStateDesc{
+		Group:               &g2,
+		ActiveRules:         []*RuleStateDesc{&rs1, &rs2},
+		EvaluationTimestamp: curTime,
+	}
+	gs2NotRun := GroupStateDesc{
+		Group:       &g2,
+		ActiveRules: []*RuleStateDesc{&rs1NotRun, &rs2NotRun},
+	}
+	gs3 := GroupStateDesc{
+		Group:               &g2,
+		ActiveRules:         []*RuleStateDesc{&rs1, &rs3},
+		EvaluationTimestamp: curTime,
+	}
+
+	type testCase struct {
+		input          []*GroupStateDesc
+		expectedOutput []*GroupStateDesc
+	}
+
+	testCases := map[string]testCase{
+		"No duplicate": {
+			input:          []*GroupStateDesc{&gs1, &gs2},
+			expectedOutput: []*GroupStateDesc{&gs1, &gs2},
+		},
+		"No duplicate but not evaluated": {
+			input:          []*GroupStateDesc{&gs1NotRun, &gs2NotRun},
+			expectedOutput: []*GroupStateDesc{&gs1NotRun, &gs2NotRun},
+		},
+		"With exact duplicate": {
+			input:          []*GroupStateDesc{&gs1, &gs2NotRun, &gs1, &gs2NotRun},
+			expectedOutput: []*GroupStateDesc{&gs1, &gs2NotRun},
+		},
+		"With duplicates that are not evaluated": {
+			input:          []*GroupStateDesc{&gs1, &gs2, &gs1NotRun, &gs2NotRun},
+			expectedOutput: []*GroupStateDesc{&gs1, &gs2},
+		},
+		"With duplicate with a newer rule evaluation": {
+			input:
[]*GroupStateDesc{&gs3, &gs1, &gs2, &gs1NotRun}, + expectedOutput: []*GroupStateDesc{&gs1, &gs3}, + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + out := mergeGroupStateDesc(tc.input) + slices.SortFunc(out, func(a, b *GroupStateDesc) int { + fileCompare := strings.Compare(a.Group.Namespace, b.Group.Namespace) + if fileCompare != 0 { + return fileCompare + } + return strings.Compare(a.Group.Name, b.Group.Name) + }) + require.Equal(t, len(tc.expectedOutput), len(out)) + require.True(t, reflect.DeepEqual(tc.expectedOutput, out)) + }) + } + +} diff --git a/pkg/ruler/rule_backup_manager.go b/pkg/ruler/rule_backup_manager.go new file mode 100644 index 0000000000..ce4cef2c9b --- /dev/null +++ b/pkg/ruler/rule_backup_manager.go @@ -0,0 +1,96 @@ +package ruler + +import ( + "context" + "net/url" + "path/filepath" + + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + promRules "github.com/prometheus/prometheus/rules" + + "github.com/cortexproject/cortex/pkg/ruler/rulespb" +) + +// rulesBackupManager is an in-memory store that holds rulespb.RuleGroupList of multiple users. It only stores the +// data, it DOESN'T evaluate. +type rulesBackupManager struct { + inMemoryRuleGroupsBackup map[string]rulespb.RuleGroupList + cfg Config + + logger log.Logger + + backupRuleGroup *prometheus.GaugeVec +} + +func newRulesBackupManager(cfg Config, logger log.Logger, reg prometheus.Registerer) *rulesBackupManager { + return &rulesBackupManager{ + inMemoryRuleGroupsBackup: make(map[string]rulespb.RuleGroupList), + cfg: cfg, + logger: logger, + + backupRuleGroup: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "cortex", + Name: "ruler_backup_rule_group", + Help: "Boolean set to 1 indicating the ruler stores the rule group as backup.", + }, []string{"user", "rule_group"}), + } +} + +// setRuleGroups updates the map[string]rulespb.RuleGroupList that the rulesBackupManager stores in memory. +func (r *rulesBackupManager) setRuleGroups(_ context.Context, ruleGroups map[string]rulespb.RuleGroupList) { + r.updateMetrics(ruleGroups) + r.inMemoryRuleGroupsBackup = ruleGroups +} + +// getRuleGroups returns the rulespb.RuleGroupList that rulesBackupManager stores for a given user +func (r *rulesBackupManager) getRuleGroups(userID string) rulespb.RuleGroupList { + var result rulespb.RuleGroupList + if groups, exists := r.inMemoryRuleGroupsBackup[userID]; exists { + result = groups + } + return result +} + +// updateMetrics updates the ruler_backup_rule_group metric by adding new groups that were backed up and removing +// those that are removed from the backup. 
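Before the `updateMetrics` body described by the comment above, a brief aside on merger.go: a hedged illustration of the merge semantics (it assumes the `GroupStateDesc` and `rulespb` types from this patch; `exampleMerge`, `stale`, and `fresh` are hypothetical names):

```go
// exampleMerge is a hypothetical illustration of mergeGroupStateDesc: two
// state descriptors share the group key "ns1;g1", and the copy whose most
// recent evaluation is newer wins, so only `fresh` survives the merge.
func exampleMerge() []*GroupStateDesc {
	stale := &GroupStateDesc{
		Group:               &rulespb.RuleGroupDesc{Namespace: "ns1", Name: "g1"},
		EvaluationTimestamp: time.Now().Add(-time.Minute),
	}
	fresh := &GroupStateDesc{
		Group:               &rulespb.RuleGroupDesc{Namespace: "ns1", Name: "g1"},
		EvaluationTimestamp: time.Now(),
	}
	return mergeGroupStateDesc([]*GroupStateDesc{stale, fresh}) // == {fresh}
}
```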
+func (r *rulesBackupManager) updateMetrics(newBackupGroups map[string]rulespb.RuleGroupList) { + for user, groups := range newBackupGroups { + keptGroups := make(map[string]struct{}) + for _, g := range groups { + fullFileName := r.getFilePathForGroup(g, user) + key := promRules.GroupKey(fullFileName, g.GetName()) + r.backupRuleGroup.WithLabelValues(user, key).Set(1) + keptGroups[key] = struct{}{} + } + oldGroups := r.inMemoryRuleGroupsBackup[user] + for _, g := range oldGroups { + fullFileName := r.getFilePathForGroup(g, user) + key := promRules.GroupKey(fullFileName, g.GetName()) + if _, exists := keptGroups[key]; !exists { + r.backupRuleGroup.DeleteLabelValues(user, key) + } + } + } + + for user, groups := range r.inMemoryRuleGroupsBackup { + if _, exists := newBackupGroups[user]; exists { + continue + } + for _, g := range groups { + fullFileName := r.getFilePathForGroup(g, user) + key := promRules.GroupKey(fullFileName, g.GetName()) + r.backupRuleGroup.DeleteLabelValues(user, key) + } + } +} + +// getFilePathForGroup returns the supposed file path of the group if it was being evaluated. +// This is based on how mapper.go generates file paths. This can be used to generate value similar to the one returned +// by prometheus Group.File() method. +func (r *rulesBackupManager) getFilePathForGroup(g *rulespb.RuleGroupDesc, user string) string { + dirPath := filepath.Join(r.cfg.RulePath, user) + encodedFileName := url.PathEscape(g.GetNamespace()) + return filepath.Join(dirPath, encodedFileName) +} diff --git a/pkg/ruler/rule_backup_manager_test.go b/pkg/ruler/rule_backup_manager_test.go new file mode 100644 index 0000000000..224b164be3 --- /dev/null +++ b/pkg/ruler/rule_backup_manager_test.go @@ -0,0 +1,150 @@ +package ruler + +import ( + "context" + "net/url" + "path/filepath" + "testing" + + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" + io_prometheus_client "github.com/prometheus/client_model/go" + promRules "github.com/prometheus/prometheus/rules" + "github.com/stretchr/testify/require" + + "github.com/cortexproject/cortex/pkg/ruler/rulespb" + "github.com/cortexproject/cortex/pkg/util" +) + +func TestBackUpRuleGroups(t *testing.T) { + r := rulespb.RuleDesc{ + Expr: "1 > bool 1", + Record: "test", + } + g1 := rulespb.RuleGroupDesc{ + Name: "g1", + Namespace: "ns1", + Rules: []*rulespb.RuleDesc{&r}, + } + g2 := rulespb.RuleGroupDesc{ + Name: "g2", + Namespace: "ns1", + Rules: []*rulespb.RuleDesc{&r}, + } + g3 := rulespb.RuleGroupDesc{ + Name: "g3", + Namespace: "ns2", + Rules: []*rulespb.RuleDesc{&r}, + } + + cfg := defaultRulerConfig(t) + manager := newRulesBackupManager(cfg, log.NewNopLogger(), nil) + + type testCase struct { + input map[string]rulespb.RuleGroupList + } + + testCases := map[string]testCase{ + "Empty input": { + input: make(map[string]rulespb.RuleGroupList), + }, + "With groups from single users": { + input: map[string]rulespb.RuleGroupList{ + "user2": {&g1, &g2}, + }, + }, + "With groups from multiple users": { + input: map[string]rulespb.RuleGroupList{ + "user1": {&g1, &g3}, + "user2": {&g1, &g2}, + }, + }, + } + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + manager.setRuleGroups(context.TODO(), tc.input) + require.Equal(t, len(tc.input), len(manager.inMemoryRuleGroupsBackup)) + for user, groups := range tc.input { + loadedGroups := manager.getRuleGroups(user) + require.Equal(t, groups, loadedGroups) + } + }) + } +} + +func TestBackUpRuleGroupsMetrics(t *testing.T) { + r := rulespb.RuleDesc{ + Expr: "1 > bool 1", + 
Record: "test", + } + g1 := rulespb.RuleGroupDesc{ + Name: "g1", + Namespace: "ns1", + Rules: []*rulespb.RuleDesc{&r}, + } + g2 := rulespb.RuleGroupDesc{ + Name: "g2", + Namespace: "ns1", + Rules: []*rulespb.RuleDesc{&r}, + } + g2Updated := rulespb.RuleGroupDesc{ + Name: "g2", + Namespace: "ns1", + Rules: []*rulespb.RuleDesc{&r, &r}, + } + + cfg := defaultRulerConfig(t) + reg := prometheus.NewPedanticRegistry() + manager := newRulesBackupManager(cfg, log.NewNopLogger(), reg) + + getGroupName := func(g *rulespb.RuleGroupDesc, user string) string { + dirPath := filepath.Join(cfg.RulePath, user) + encodedFileName := url.PathEscape(g.Namespace) + path := filepath.Join(dirPath, encodedFileName) + return promRules.GroupKey(path, g.Name) + } + + manager.setRuleGroups(context.TODO(), map[string]rulespb.RuleGroupList{ + "user1": {&g1, &g2}, + "user2": {&g1}, + }) + gm, err := reg.Gather() + require.NoError(t, err) + mfm, err := util.NewMetricFamilyMap(gm) + require.NoError(t, err) + require.Equal(t, 3, len(mfm["cortex_ruler_backup_rule_group"].Metric)) + requireMetricEqual(t, mfm["cortex_ruler_backup_rule_group"].Metric[0], map[string]string{ + "user": "user1", + "rule_group": getGroupName(&g1, "user1"), + }, float64(1)) + requireMetricEqual(t, mfm["cortex_ruler_backup_rule_group"].Metric[1], map[string]string{ + "user": "user1", + "rule_group": getGroupName(&g2, "user1"), + }, float64(1)) + requireMetricEqual(t, mfm["cortex_ruler_backup_rule_group"].Metric[2], map[string]string{ + "user": "user2", + "rule_group": getGroupName(&g1, "user2"), + }, float64(1)) + + manager.setRuleGroups(context.TODO(), map[string]rulespb.RuleGroupList{ + "user1": {&g2Updated}, + }) + gm, err = reg.Gather() + require.NoError(t, err) + mfm, err = util.NewMetricFamilyMap(gm) + require.NoError(t, err) + require.Equal(t, 1, len(mfm["cortex_ruler_backup_rule_group"].Metric)) + requireMetricEqual(t, mfm["cortex_ruler_backup_rule_group"].Metric[0], map[string]string{ + "user": "user1", + "rule_group": getGroupName(&g2, "user1"), + }, float64(1)) +} + +func requireMetricEqual(t *testing.T, m *io_prometheus_client.Metric, labels map[string]string, value float64) { + l := m.GetLabel() + require.Equal(t, len(labels), len(l)) + for _, pair := range l { + require.Equal(t, labels[*pair.Name], *pair.Value) + } + require.Equal(t, value, *m.Gauge.Value) +} diff --git a/pkg/ruler/ruler.go b/pkg/ruler/ruler.go index ba3c698e36..23de3ab908 100644 --- a/pkg/ruler/ruler.go +++ b/pkg/ruler/ruler.go @@ -20,6 +20,7 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/rulefmt" "github.com/prometheus/prometheus/notifier" + "github.com/prometheus/prometheus/promql/parser" promRules "github.com/prometheus/prometheus/rules" "github.com/prometheus/prometheus/util/strutil" "github.com/weaveworks/common/user" @@ -127,7 +128,9 @@ type Config struct { Ring RingConfig `yaml:"ring"` FlushCheckPeriod time.Duration `yaml:"flush_period"` - EnableAPI bool `yaml:"enable_api"` + EnableAPI bool `yaml:"enable_api"` + APIEnableRulesBackup bool `yaml:"api_enable_rules_backup"` + APIDeduplicateRules bool `yaml:"api_deduplicate_rules"` EnabledTenants flagext.StringSliceCSV `yaml:"enabled_tenants"` DisabledTenants flagext.StringSliceCSV `yaml:"disabled_tenants"` @@ -195,6 +198,8 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.FlushCheckPeriod, "ruler.flush-period", 1*time.Minute, "Period with which to attempt to flush rule groups.") f.StringVar(&cfg.RulePath, "ruler.rule-path", "/rules", "file 
path to store temporary rule files for the prometheus rule managers") f.BoolVar(&cfg.EnableAPI, "experimental.ruler.enable-api", false, "Enable the ruler api") + f.BoolVar(&cfg.APIEnableRulesBackup, "experimental.ruler.api-enable-rules-backup", false, "EXPERIMENTAL: Enable rulers to store a copy of rules owned by other rulers with default state (state before any evaluation) and send this copy in list API requests as backup in case the ruler who owns the rule fails to send its rules. This allows the rules API to handle ruler outage by returning rules with default state. Ring replication-factor needs to be set to 2 or more for this to be useful.") + f.BoolVar(&cfg.APIDeduplicateRules, "experimental.ruler.api-deduplicate-rules", false, "EXPERIMENTAL: Remove duplicate rules in the prometheus rules and alerts API response. If there are duplicate rules the rule with the latest evaluation timestamp will be kept.") f.DurationVar(&cfg.OutageTolerance, "ruler.for-outage-tolerance", time.Hour, `Max time to tolerate outage for restoring "for" state of alert.`) f.DurationVar(&cfg.ForGracePeriod, "ruler.for-grace-period", 10*time.Minute, `Minimum duration between alert and restored "for" state. This is maintained only for alerts with configured "for" time greater than grace period.`) f.DurationVar(&cfg.ResendDelay, "ruler.resend-delay", time.Minute, `Minimum amount of time to wait before resending an alert to Alertmanager.`) @@ -215,8 +220,12 @@ type MultiTenantManager interface { // SyncRuleGroups is used to sync the Manager with rules from the RuleStore. // If existing user is missing in the ruleGroups map, its ruler manager will be stopped. SyncRuleGroups(ctx context.Context, ruleGroups map[string]rulespb.RuleGroupList) + // BackUpRuleGroups is used to store backups of rule groups owned by a different ruler instance. + BackUpRuleGroups(ctx context.Context, ruleGroups map[string]rulespb.RuleGroupList) // GetRules fetches rules for a particular tenant (userID). GetRules(userID string) []*promRules.Group + // GetBackupRules fetches rules for a particular tenant (userID) that the ruler stores for backup purposes + GetBackupRules(userID string) rulespb.RuleGroupList // Stop stops all Manager components. 
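A hedged sketch of how the two new `MultiTenantManager` methods pair up (the interface continues with `Stop()` right after; `syncAndQuery`, `userID`, and the input maps are hypothetical, and the real wiring lives in `Ruler.syncRules` later in this patch):

```go
// syncAndQuery is a hypothetical sketch pairing the new interface methods:
// owned groups are synced for evaluation, while backup groups are only stored
// so the rules API can fall back to their default (never evaluated) state.
func syncAndQuery(ctx context.Context, mgr MultiTenantManager, owned, backup map[string]rulespb.RuleGroupList, userID string) ([]*promRules.Group, rulespb.RuleGroupList) {
	mgr.SyncRuleGroups(ctx, owned)    // evaluated by this ruler's Prometheus managers
	mgr.BackUpRuleGroups(ctx, backup) // held in memory only, never evaluated
	return mgr.GetRules(userID), mgr.GetBackupRules(userID)
}
```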
	Stop()

	// ValidateRuleGroup validates a rulegroup
@@ -270,6 +279,7 @@ type Ruler struct {
	rulerSync                  *prometheus.CounterVec
	ruleGroupStoreLoadDuration prometheus.Gauge
	ruleGroupSyncDuration      prometheus.Gauge
+	rulerGetRulesFailures      *prometheus.CounterVec

	allowedTenants *util.AllowedTenants

@@ -312,6 +322,11 @@ func newRuler(cfg Config, manager MultiTenantManager, reg prometheus.Registerer,
			Name: "cortex_ruler_rule_group_sync_duration_seconds",
			Help: "The duration in seconds required to sync and load rule groups from storage.",
		}),
+
+		rulerGetRulesFailures: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
+			Name: "cortex_ruler_get_rules_failure_total",
+			Help: "The total number of failed rules requests sent to rulers in getShardedRules.",
+		}, []string{"ruler"}),
	}

	if len(cfg.EnabledTenants) > 0 {
@@ -458,7 +473,7 @@ func tokenForGroup(g *rulespb.RuleGroupDesc) uint32 {
	return ringHasher.Sum32()
}

-func instanceOwnsRuleGroup(r ring.ReadRing, g *rulespb.RuleGroupDesc, disabledRuleGroups validation.DisabledRuleGroups, instanceAddr string) (bool, error) {
+func instanceOwnsRuleGroup(r ring.ReadRing, g *rulespb.RuleGroupDesc, disabledRuleGroups validation.DisabledRuleGroups, instanceAddr string, forBackup bool) (bool, error) {

	hash := tokenForGroup(g)

@@ -467,7 +482,20 @@ func instanceOwnsRuleGroup(r ring.ReadRing, g *rulespb.RuleGroupDesc, disabledRu
		return false, errors.Wrap(err, "error reading ring to verify rule group ownership")
	}

-	ownsRuleGroup := rlrs.Instances[0].Addr == instanceAddr
+	var ownsRuleGroup bool
+	if forBackup {
+		// Only the second through the last replicas are used as backups
+		for i := 1; i < len(rlrs.Instances); i++ {
+			if rlrs.Instances[i].Addr == instanceAddr {
+				ownsRuleGroup = true
+				break
+			}
+		}
+	} else {
+		// Even if the replication factor is set to a number bigger than 1, only the first ruler evaluates the rule group
+		ownsRuleGroup = rlrs.Instances[0].Addr == instanceAddr
+	}
+
	if ownsRuleGroup && ruleGroupDisabled(g, disabledRuleGroups) {
		return false, &DisabledRuleGroupErr{Message: fmt.Sprintf("rule group %s, namespace %s, user %s is disabled", g.Name, g.Namespace, g.User)}
	}
@@ -543,16 +571,20 @@ func (r *Ruler) syncRules(ctx context.Context, reason string) {
		r.ruleGroupSyncDuration.Set(ruleGroupSyncDuration)
	}()

-	loadedConfigs, err := r.loadRuleGroups(ctx)
+	loadedConfigs, backupConfigs, err := r.loadRuleGroups(ctx)
	if err != nil {
		return
	}

	// This will also delete local group files for users that are no longer in 'configs' map.
r.manager.SyncRuleGroups(ctx, loadedConfigs) + + if r.cfg.APIEnableRulesBackup { + r.manager.BackUpRuleGroups(ctx, backupConfigs) + } } -func (r *Ruler) loadRuleGroups(ctx context.Context) (map[string]rulespb.RuleGroupList, error) { +func (r *Ruler) loadRuleGroups(ctx context.Context) (map[string]rulespb.RuleGroupList, map[string]rulespb.RuleGroupList, error) { timer := prometheus.NewTimer(nil) defer func() { @@ -560,51 +592,65 @@ func (r *Ruler) loadRuleGroups(ctx context.Context) (map[string]rulespb.RuleGrou r.ruleGroupStoreLoadDuration.Set(storeLoadSeconds) }() - configs, err := r.listRules(ctx) + ownedConfigs, backupConfigs, err := r.listRules(ctx) if err != nil { level.Error(r.logger).Log("msg", "unable to list rules", "err", err) - return nil, err + return nil, nil, err } - loadedConfigs, err := r.store.LoadRuleGroups(ctx, configs) + loadedOwnedConfigs, err := r.store.LoadRuleGroups(ctx, ownedConfigs) if err != nil { - level.Warn(r.logger).Log("msg", "failed to load some rules owned by this ruler", "count", len(configs)-len(loadedConfigs), "err", err) + level.Warn(r.logger).Log("msg", "failed to load some rules owned by this ruler", "count", len(ownedConfigs)-len(loadedOwnedConfigs), "err", err) } - return loadedConfigs, nil + if r.cfg.APIEnableRulesBackup { + loadedBackupConfigs, err := r.store.LoadRuleGroups(ctx, backupConfigs) + if err != nil { + level.Warn(r.logger).Log("msg", "failed to load some rules backed up by this ruler", "count", len(backupConfigs)-len(loadedBackupConfigs), "err", err) + } + return loadedOwnedConfigs, loadedBackupConfigs, nil + } + return loadedOwnedConfigs, nil, nil } -func (r *Ruler) listRules(ctx context.Context) (result map[string]rulespb.RuleGroupList, err error) { +func (r *Ruler) listRules(ctx context.Context) (owned map[string]rulespb.RuleGroupList, backedUp map[string]rulespb.RuleGroupList, err error) { switch { case !r.cfg.EnableSharding: - result, err = r.listRulesNoSharding(ctx) + owned, backedUp, err = r.listRulesNoSharding(ctx) case r.cfg.ShardingStrategy == util.ShardingStrategyDefault: - result, err = r.listRulesShardingDefault(ctx) + owned, backedUp, err = r.listRulesShardingDefault(ctx) case r.cfg.ShardingStrategy == util.ShardingStrategyShuffle: - result, err = r.listRulesShuffleSharding(ctx) + owned, backedUp, err = r.listRulesShuffleSharding(ctx) default: - return nil, errors.New("invalid sharding configuration") + return nil, nil, errors.New("invalid sharding configuration") } if err != nil { return } - for userID := range result { + for userID := range owned { + if !r.allowedTenants.IsAllowed(userID) { + level.Debug(r.logger).Log("msg", "ignoring rule groups for user, not allowed", "user", userID) + delete(owned, userID) + } + } + + for userID := range backedUp { if !r.allowedTenants.IsAllowed(userID) { level.Debug(r.logger).Log("msg", "ignoring rule groups for user, not allowed", "user", userID) - delete(result, userID) + delete(backedUp, userID) } } return } -func (r *Ruler) listRulesNoSharding(ctx context.Context) (map[string]rulespb.RuleGroupList, error) { +func (r *Ruler) listRulesNoSharding(ctx context.Context) (map[string]rulespb.RuleGroupList, map[string]rulespb.RuleGroupList, error) { allRuleGroups, err := r.store.ListAllRuleGroups(ctx) if err != nil { - return nil, err + return nil, nil, err } for userID, groups := range allRuleGroups { disabledRuleGroupsForUser := r.limits.DisabledRuleGroups(userID) @@ -621,29 +667,36 @@ func (r *Ruler) listRulesNoSharding(ctx context.Context) (map[string]rulespb.Rul } 
	allRuleGroups[userID] = filteredGroupsForUser
	}
-	return allRuleGroups, nil
+	return allRuleGroups, nil, nil
}

-func (r *Ruler) listRulesShardingDefault(ctx context.Context) (map[string]rulespb.RuleGroupList, error) {
+func (r *Ruler) listRulesShardingDefault(ctx context.Context) (map[string]rulespb.RuleGroupList, map[string]rulespb.RuleGroupList, error) {
	configs, err := r.store.ListAllRuleGroups(ctx)
	if err != nil {
-		return nil, err
+		return nil, nil, err
	}

-	filteredConfigs := make(map[string]rulespb.RuleGroupList)
+	ownedConfigs := make(map[string]rulespb.RuleGroupList)
+	backedUpConfigs := make(map[string]rulespb.RuleGroupList)
	for userID, groups := range configs {
-		filtered := filterRuleGroups(userID, groups, r.limits.DisabledRuleGroups(userID), r.ring, r.lifecycler.GetInstanceAddr(), r.logger, r.ringCheckErrors)
-		if len(filtered) > 0 {
-			filteredConfigs[userID] = filtered
+		owned := filterRuleGroups(userID, groups, r.limits.DisabledRuleGroups(userID), r.ring, r.lifecycler.GetInstanceAddr(), r.logger, r.ringCheckErrors)
+		if len(owned) > 0 {
+			ownedConfigs[userID] = owned
+		}
+		if r.cfg.APIEnableRulesBackup {
+			backup := filterBackupRuleGroups(userID, groups, r.limits.DisabledRuleGroups(userID), r.ring, r.lifecycler.GetInstanceAddr(), r.logger, r.ringCheckErrors)
+			if len(backup) > 0 {
+				backedUpConfigs[userID] = backup
+			}
		}
	}
-	return filteredConfigs, nil
+	return ownedConfigs, backedUpConfigs, nil
}

-func (r *Ruler) listRulesShuffleSharding(ctx context.Context) (map[string]rulespb.RuleGroupList, error) {
+func (r *Ruler) listRulesShuffleSharding(ctx context.Context) (map[string]rulespb.RuleGroupList, map[string]rulespb.RuleGroupList, error) {
	users, err := r.store.ListAllUsers(ctx)
	if err != nil {
-		return nil, errors.Wrap(err, "unable to list users of ruler")
+		return nil, nil, errors.Wrap(err, "unable to list users of ruler")
	}

	// Only users in userRings will be used to load the rules.
@@ -664,7 +717,7 @@ func (r *Ruler) listRulesShuffleSharding(ctx context.Context) (map[string]rulesp
	}

	if len(userRings) == 0 {
-		return nil, nil
+		return nil, nil, nil
	}

	userCh := make(chan string, len(userRings))
@@ -674,7 +727,8 @@ func (r *Ruler) listRulesShuffleSharding(ctx context.Context) (map[string]rulesp
	close(userCh)

	mu := sync.Mutex{}
-	result := map[string]rulespb.RuleGroupList{}
+	owned := map[string]rulespb.RuleGroupList{}
+	backedUp := map[string]rulespb.RuleGroupList{}

	concurrency := loadRulesConcurrency
	if len(userRings) < concurrency {
@@ -690,13 +744,21 @@ func (r *Ruler) listRulesShuffleSharding(ctx context.Context) (map[string]rulesp
				return errors.Wrapf(err, "failed to fetch rule groups for user %s", userID)
			}

-			filtered := filterRuleGroups(userID, groups, r.limits.DisabledRuleGroups(userID), userRings[userID], r.lifecycler.GetInstanceAddr(), r.logger, r.ringCheckErrors)
-			if len(filtered) == 0 {
+			filterOwned := filterRuleGroups(userID, groups, r.limits.DisabledRuleGroups(userID), userRings[userID], r.lifecycler.GetInstanceAddr(), r.logger, r.ringCheckErrors)
+			var filterBackup []*rulespb.RuleGroupDesc
+			if r.cfg.APIEnableRulesBackup {
+				filterBackup = filterBackupRuleGroups(userID, groups, r.limits.DisabledRuleGroups(userID), userRings[userID], r.lifecycler.GetInstanceAddr(), r.logger, r.ringCheckErrors)
+			}
+			if len(filterOwned) == 0 && len(filterBackup) == 0 {
				continue
			}

			mu.Lock()
-			result[userID] = filtered
+			if len(filterOwned) > 0 {
+				owned[userID] = filterOwned
+			}
+			if len(filterBackup) > 0 {
+				backedUp[userID] = filterBackup
+			}
			mu.Unlock()
		}
		return nil
@@ -704,7 +766,7 @@ func (r *Ruler) listRulesShuffleSharding(ctx context.Context) (map[string]rulesp
	}

	err = g.Wait()
-	return result, err
+	return owned, backedUp, err
}

// filterRuleGroups returns map of rule groups that given instance "owns" based on supplied ring.
@@ -716,7 +778,7 @@ func filterRuleGroups(userID string, ruleGroups []*rulespb.RuleGroupDesc, disabl
	// Prune the rule group to only contain rules that this ruler is responsible for, based on ring.
	var result []*rulespb.RuleGroupDesc
	for _, g := range ruleGroups {
-		owned, err := instanceOwnsRuleGroup(ring, g, disabledRuleGroups, instanceAddr)
+		owned, err := instanceOwnsRuleGroup(ring, g, disabledRuleGroups, instanceAddr, false)
		if err != nil {
			switch e := err.(type) {
			case *DisabledRuleGroupErr:
@@ -740,6 +802,38 @@ func filterRuleGroups(userID string, ruleGroups []*rulespb.RuleGroupDesc, disabl
	return result
}

+// filterBackupRuleGroups returns the rule groups that the given instance backs up based on the supplied ring.
+// This function only uses User, Namespace, and Name fields of individual RuleGroups.
+//
+// Reason why this function is not a method on Ruler is to make sure we don't accidentally use r.ring,
+// but only ring passed as parameter.
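The function described by the comment above follows; first, a toy condensation of the ownership rule it relies on (`ownsForBackup` is a hypothetical helper restating the `forBackup` branch of `instanceOwnsRuleGroup` earlier in this patch):

```go
// ownsForBackup is a hypothetical condensed form of the forBackup branch of
// instanceOwnsRuleGroup: with RF=3 the ring yields three replicas per rule
// group; replicas[0] evaluates it, replicas[1:] only hold backup copies.
func ownsForBackup(replicas []ring.InstanceDesc, instanceAddr string) bool {
	for i := 1; i < len(replicas); i++ {
		if replicas[i].Addr == instanceAddr {
			return true
		}
	}
	return false
}
```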
+func filterBackupRuleGroups(userID string, ruleGroups []*rulespb.RuleGroupDesc, disabledRuleGroups validation.DisabledRuleGroups, ring ring.ReadRing, instanceAddr string, log log.Logger, ringCheckErrors prometheus.Counter) []*rulespb.RuleGroupDesc { + var result []*rulespb.RuleGroupDesc + for _, g := range ruleGroups { + backup, err := instanceOwnsRuleGroup(ring, g, disabledRuleGroups, instanceAddr, true) + if err != nil { + switch e := err.(type) { + case *DisabledRuleGroupErr: + level.Info(log).Log("msg", e.Message) + continue + default: + ringCheckErrors.Inc() + level.Error(log).Log("msg", "failed to check if the ruler replica backs up the rule group", "user", userID, "namespace", g.Namespace, "group", g.Name, "err", err) + continue + } + } + + if backup { + level.Debug(log).Log("msg", "rule group backed up", "user", g.User, "namespace", g.Namespace, "name", g.Name) + result = append(result, g) + } else { + level.Debug(log).Log("msg", "rule group not backed up, ignoring", "user", g.User, "namespace", g.Namespace, "name", g.Name) + } + } + + return result +} + // GetRules retrieves the running rules from this ruler and all running rulers in the ring if // sharding is enabled func (r *Ruler) GetRules(ctx context.Context, rulesRequest RulesRequest) ([]*GroupStateDesc, error) { @@ -752,10 +846,10 @@ func (r *Ruler) GetRules(ctx context.Context, rulesRequest RulesRequest) ([]*Gro return r.getShardedRules(ctx, userID, rulesRequest) } - return r.getLocalRules(userID, rulesRequest) + return r.getLocalRules(userID, rulesRequest, false) } -func (r *Ruler) getLocalRules(userID string, rulesRequest RulesRequest) ([]*GroupStateDesc, error) { +func (r *Ruler) getLocalRules(userID string, rulesRequest RulesRequest, includeBackups bool) ([]*GroupStateDesc, error) { groups := r.manager.GetRules(userID) groupDescs := make([]*GroupStateDesc, 0, len(groups)) @@ -881,6 +975,123 @@ func (r *Ruler) getLocalRules(userID string, rulesRequest RulesRequest) ([]*Grou } } + if !includeBackups { + return groupDescs, nil + } + + backupGroups := r.manager.GetBackupRules(userID) + backupGroupDescs, err := r.ruleGroupListToGroupStateDesc(userID, backupGroups, groupListFilter{ + ruleNameSet, + ruleGroupNameSet, + fileSet, + returnAlerts, + returnRecording, + }) + if err != nil { + return nil, err + } + + return append(groupDescs, backupGroupDescs...), nil +} + +type groupListFilter struct { + ruleNameSet map[string]struct{} + ruleGroupNameSet map[string]struct{} + fileSet map[string]struct{} + returnAlerts bool + returnRecording bool +} + +// ruleGroupListToGroupStateDesc converts rulespb.RuleGroupList to []*GroupStateDesc while accepting filters to control what goes to the +// resulting []*GroupStateDesc +func (r *Ruler) ruleGroupListToGroupStateDesc(userID string, backupGroups rulespb.RuleGroupList, filters groupListFilter) ([]*GroupStateDesc, error) { + groupDescs := make([]*GroupStateDesc, 0, len(backupGroups)) + for _, group := range backupGroups { + if len(filters.fileSet) > 0 { + if _, OK := filters.fileSet[group.GetNamespace()]; !OK { + continue + } + } + + if len(filters.ruleGroupNameSet) > 0 { + if _, OK := filters.ruleGroupNameSet[group.GetName()]; !OK { + continue + } + } + interval := r.cfg.EvaluationInterval + if group.Interval != 0 { + interval = group.Interval + } + + groupDesc := &GroupStateDesc{ + Group: &rulespb.RuleGroupDesc{ + Name: group.GetName(), + Namespace: group.GetNamespace(), + Interval: interval, + User: userID, + Limit: group.Limit, + }, + // We are keeping default value for 
EvaluationTimestamp and EvaluationDuration since the backup is not evaluating + } + for _, r := range group.GetRules() { + name := r.GetRecord() + isAlertingRule := false + if name == "" { + name = r.GetAlert() + isAlertingRule = true + } + if len(filters.ruleNameSet) > 0 { + if _, OK := filters.ruleNameSet[name]; !OK { + continue + } + } + + var ruleDesc *RuleStateDesc + query, err := parser.ParseExpr(r.GetExpr()) + if err != nil { + return nil, errors.Errorf("failed to parse rule query '%v'", r.GetExpr()) + } + if isAlertingRule { + if !filters.returnAlerts { + continue + } + alerts := []*AlertStateDesc{} // backup rules are not evaluated so there will be no active alerts + ruleDesc = &RuleStateDesc{ + Rule: &rulespb.RuleDesc{ + Expr: query.String(), + Alert: name, + For: r.GetFor(), + KeepFiringFor: r.GetKeepFiringFor(), + Labels: r.Labels, + Annotations: r.Annotations, + }, + State: promRules.StateInactive.String(), // backup rules are not evaluated so they are inactive + Health: string(promRules.HealthUnknown), + Alerts: alerts, + EvaluationTimestamp: time.Time{}, + EvaluationDuration: time.Duration(0), + } + } else { + if !filters.returnRecording { + continue + } + ruleDesc = &RuleStateDesc{ + Rule: &rulespb.RuleDesc{ + Record: name, + Expr: query.String(), + Labels: r.Labels, + }, + Health: string(promRules.HealthUnknown), + EvaluationTimestamp: time.Time{}, + EvaluationDuration: time.Duration(0), + } + } + groupDesc.ActiveRules = append(groupDesc.ActiveRules, ruleDesc) + } + if len(groupDesc.ActiveRules) > 0 { + groupDescs = append(groupDescs, groupDesc) + } + } return groupDescs, nil } @@ -891,7 +1102,7 @@ func (r *Ruler) getShardedRules(ctx context.Context, userID string, rulesRequest ring = r.ring.ShuffleShard(userID, shardSize) } - rulers, err := ring.GetReplicationSetForOperation(ListRuleRingOp) + rulers, failedZones, err := GetReplicationSetForListRule(ring, &r.cfg.Ring) if err != nil { return nil, err } @@ -902,12 +1113,18 @@ func (r *Ruler) getShardedRules(ctx context.Context, userID string, rulesRequest } var ( - mergedMx sync.Mutex + mtx sync.Mutex merged []*GroupStateDesc + errCount int ) - // Concurrently fetch rules from all rulers. Since rules are not replicated, - // we need all requests to succeed. + zoneByAddress := make(map[string]string) + if r.cfg.APIEnableRulesBackup { + for _, ruler := range rulers.Instances { + zoneByAddress[ruler.Addr] = ruler.Zone + } + } + // Concurrently fetch rules from all rulers. jobs := concurrency.CreateJobsFromStrings(rulers.GetAddresses()) err = concurrency.ForEach(ctx, jobs, len(jobs), func(ctx context.Context, job interface{}) error { addr := job.(string) @@ -923,28 +1140,48 @@ func (r *Ruler) getShardedRules(ctx context.Context, userID string, rulesRequest Files: rulesRequest.GetFiles(), Type: rulesRequest.GetType(), }) + if err != nil { + level.Error(r.logger).Log("msg", "unable to retrieve rules from ruler", "addr", addr, "err", err) + r.rulerGetRulesFailures.WithLabelValues(addr).Inc() + // If APIEnableRulesBackup is enabled and there are enough rulers replicating the rules, we should + // be able to handle failures. 
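A hedged restatement of the failure-tolerance rule coded in the hunk that follows (`withinTolerance` is a hypothetical helper, equivalent to the negation of the `failed` expression below):

```go
// withinTolerance reports whether getShardedRules can still succeed: with
// zone-awareness the request survives while failed zones stay within
// MaxUnavailableZones; otherwise, while individual errors stay within MaxErrors.
func withinTolerance(rs ring.ReplicationSet, failedZones map[string]struct{}, errCount int) bool {
	if rs.MaxUnavailableZones > 0 {
		return len(failedZones) <= rs.MaxUnavailableZones
	}
	return errCount <= rs.MaxErrors
}
```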
+ if r.cfg.APIEnableRulesBackup && len(jobs) >= r.cfg.Ring.ReplicationFactor { + mtx.Lock() + failedZones[zoneByAddress[addr]] = struct{}{} + errCount += 1 + failed := (rulers.MaxUnavailableZones > 0 && len(failedZones) > rulers.MaxUnavailableZones) || (rulers.MaxUnavailableZones <= 0 && errCount > rulers.MaxErrors) + mtx.Unlock() + if !failed { + return nil + } + } return errors.Wrapf(err, "unable to retrieve rules from ruler %s", addr) } - mergedMx.Lock() + mtx.Lock() merged = append(merged, newGrps.Groups...) - mergedMx.Unlock() + mtx.Unlock() return nil }) + if err == nil && (r.cfg.APIEnableRulesBackup || r.cfg.APIDeduplicateRules) { + merged = mergeGroupStateDesc(merged) + } + return merged, err } // Rules implements the rules service func (r *Ruler) Rules(ctx context.Context, in *RulesRequest) (*RulesResponse, error) { userID, err := tenant.TenantID(ctx) + if err != nil { return nil, fmt.Errorf("no user id found in context") } - groupDescs, err := r.getLocalRules(userID, *in) + groupDescs, err := r.getLocalRules(userID, *in, r.cfg.APIEnableRulesBackup) if err != nil { return nil, err } diff --git a/pkg/ruler/ruler_ring.go b/pkg/ruler/ruler_ring.go index a2078d56db..7ab8f4c030 100644 --- a/pkg/ruler/ruler_ring.go +++ b/pkg/ruler/ruler_ring.go @@ -11,6 +11,7 @@ import ( "github.com/cortexproject/cortex/pkg/ring" "github.com/cortexproject/cortex/pkg/ring/kv" "github.com/cortexproject/cortex/pkg/util/flagext" + utilmath "github.com/cortexproject/cortex/pkg/util/math" ) const ( @@ -38,15 +39,18 @@ var ListRuleRingOp = ring.NewOp([]ring.InstanceState{ring.ACTIVE, ring.LEAVING}, // is used to strip down the config to the minimum, and avoid confusion // to the user. type RingConfig struct { - KVStore kv.Config `yaml:"kvstore"` - HeartbeatPeriod time.Duration `yaml:"heartbeat_period"` - HeartbeatTimeout time.Duration `yaml:"heartbeat_timeout"` + KVStore kv.Config `yaml:"kvstore"` + HeartbeatPeriod time.Duration `yaml:"heartbeat_period"` + HeartbeatTimeout time.Duration `yaml:"heartbeat_timeout"` + ReplicationFactor int `yaml:"replication_factor"` + ZoneAwarenessEnabled bool `yaml:"zone_awareness_enabled"` // Instance details InstanceID string `yaml:"instance_id" doc:"hidden"` InstanceInterfaceNames []string `yaml:"instance_interface_names"` InstancePort int `yaml:"instance_port" doc:"hidden"` InstanceAddr string `yaml:"instance_addr" doc:"hidden"` + InstanceZone string `yaml:"instance_availability_zone" doc:"hidden"` NumTokens int `yaml:"num_tokens"` FinalSleep time.Duration `yaml:"final_sleep"` @@ -70,6 +74,8 @@ func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.HeartbeatPeriod, "ruler.ring.heartbeat-period", 5*time.Second, "Period at which to heartbeat to the ring. 0 = disabled.") f.DurationVar(&cfg.HeartbeatTimeout, "ruler.ring.heartbeat-timeout", time.Minute, "The heartbeat timeout after which rulers are considered unhealthy within the ring. 0 = never (timeout disabled).") f.DurationVar(&cfg.FinalSleep, "ruler.ring.final-sleep", 0*time.Second, "The sleep seconds when ruler is shutting down. 
Need to be close to or larger than KV Store information propagation delay")
+	f.IntVar(&cfg.ReplicationFactor, "ruler.ring.replication-factor", 1, "EXPERIMENTAL: The replication factor to use when loading rule groups for API HA.")
+	f.BoolVar(&cfg.ZoneAwarenessEnabled, "ruler.ring.zone-awareness-enabled", false, "EXPERIMENTAL: True to enable zone-awareness and load rule groups across different availability zones for API HA.")

	// Instance flags
	cfg.InstanceInterfaceNames = []string{"eth0", "en0"}
@@ -77,6 +83,7 @@ func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet) {
	f.StringVar(&cfg.InstanceAddr, "ruler.ring.instance-addr", "", "IP address to advertise in the ring.")
	f.IntVar(&cfg.InstancePort, "ruler.ring.instance-port", 0, "Port to advertise in the ring (defaults to server.grpc-listen-port).")
	f.StringVar(&cfg.InstanceID, "ruler.ring.instance-id", hostname, "Instance ID to register in the ring.")
+	f.StringVar(&cfg.InstanceZone, "ruler.ring.instance-availability-zone", "", "The availability zone where this instance is running. Required if zone-awareness is enabled.")
	f.IntVar(&cfg.NumTokens, "ruler.ring.num-tokens", 128, "Number of tokens for each ruler.")
}

@@ -93,6 +100,7 @@ func (cfg *RingConfig) ToLifecyclerConfig(logger log.Logger) (ring.BasicLifecycl
	return ring.BasicLifecyclerConfig{
		ID:                  cfg.InstanceID,
		Addr:                fmt.Sprintf("%s:%d", instanceAddr, instancePort),
+		Zone:                cfg.InstanceZone,
		HeartbeatPeriod:     cfg.HeartbeatPeriod,
		TokensObservePeriod: 0,
		NumTokens:           cfg.NumTokens,
@@ -107,9 +115,60 @@ func (cfg *RingConfig) ToRingConfig() ring.Config {
	rc.KVStore = cfg.KVStore
	rc.HeartbeatTimeout = cfg.HeartbeatTimeout
	rc.SubringCacheDisabled = true
+	rc.ZoneAwarenessEnabled = cfg.ZoneAwarenessEnabled

-	// Each rule group is loaded to *exactly* one ruler.
-	rc.ReplicationFactor = 1
+	// Each rule group is evaluated by *exactly* one ruler, but it can be loaded by multiple rulers for API HA
+	rc.ReplicationFactor = cfg.ReplicationFactor

	return rc
}
+
+// GetReplicationSetForListRule is similar to ring.GetReplicationSetForOperation but does NOT require quorum. Because
+// it does not require quorum, it returns the healthy instances in an AZ that also has failed instances, unlike
+// GetReplicationSetForOperation. This is important for the ruler because healthy instances in an AZ with a failed
+// instance could be evaluating some rule groups.
+func GetReplicationSetForListRule(r ring.ReadRing, cfg *RingConfig) (ring.ReplicationSet, map[string]struct{}, error) {
+	healthy, unhealthy, err := r.GetAllInstanceDescs(ListRuleRingOp)
+	if err != nil {
+		return ring.ReplicationSet{}, make(map[string]struct{}), err
+	}
+	ringZones := make(map[string]struct{})
+	zoneFailures := make(map[string]struct{})
+	for _, instance := range healthy {
+		ringZones[instance.Zone] = struct{}{}
+	}
+	for _, instance := range unhealthy {
+		ringZones[instance.Zone] = struct{}{}
+		zoneFailures[instance.Zone] = struct{}{}
+	}
+	// Max errors and max unavailable zones are mutually exclusive. We initialise both
+	// to 0, and then we update them whether zone-awareness is enabled or not.
+	maxErrors := 0
+	maxUnavailableZones := 0
+	if cfg.ZoneAwarenessEnabled {
+		numReplicatedZones := utilmath.Min(len(ringZones), r.ReplicationFactor())
+		// Given that quorum is not required, we only need at least one of the zones to be healthy to succeed. But we
+		// also need to handle the case when RF < number of zones.
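Before the zone-aware branch resumes below, a worked example (hypothetical numbers) of the non-zone-aware arithmetic that appears further down in this function:

```go
// exampleMaxErrors walks the non-zone-aware branch of
// GetReplicationSetForListRule with 5 ring members, RF=3, and 1 unhealthy
// member. Quorum is not required, so one live replica per rule group suffices.
func exampleMaxErrors() int {
	healthy, unhealthy, rf := 4, 1, 3
	numRequired := healthy + unhealthy // 5 ring members in total
	if numRequired < rf {
		numRequired = rf
	}
	numRequired -= rf - 1        // 3: up to rf-1 replicas of any group may be down
	return healthy - numRequired // maxErrors = 1 more ruler may fail mid-request
}
```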
+ maxUnavailableZones = numReplicatedZones - 1 + if len(zoneFailures) > maxUnavailableZones { + return ring.ReplicationSet{}, zoneFailures, ring.ErrTooManyUnhealthyInstances + } + } else { + numRequired := len(healthy) + len(unhealthy) + if numRequired < r.ReplicationFactor() { + numRequired = r.ReplicationFactor() + } + // quorum is not required so 1 replica is enough to handle the request + numRequired -= r.ReplicationFactor() - 1 + if len(healthy) < numRequired { + return ring.ReplicationSet{}, zoneFailures, ring.ErrTooManyUnhealthyInstances + } + + maxErrors = len(healthy) - numRequired + } + return ring.ReplicationSet{ + Instances: healthy, + MaxErrors: maxErrors, + MaxUnavailableZones: maxUnavailableZones, + }, zoneFailures, nil +} diff --git a/pkg/ruler/ruler_ring_test.go b/pkg/ruler/ruler_ring_test.go new file mode 100644 index 0000000000..f9bfaeb03e --- /dev/null +++ b/pkg/ruler/ruler_ring_test.go @@ -0,0 +1,239 @@ +package ruler + +import ( + "context" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/cortexproject/cortex/pkg/ring" + "github.com/cortexproject/cortex/pkg/ring/kv" + "github.com/cortexproject/cortex/pkg/ring/kv/consul" + "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/services" +) + +func TestGetReplicationSetForListRule(t *testing.T) { + now := time.Now() + + g := ring.NewRandomTokenGenerator() + + tests := map[string]struct { + ringInstances map[string]ring.InstanceDesc + ringHeartbeatTimeout time.Duration + ringReplicationFactor int + enableAZReplication bool + expectedErr error + expectedSet []string + expectedMaxError int + expectedMaxUnavailableZones int + expectedFailedZones map[string]struct{} + }{ + "should return error on empty ring": { + ringInstances: nil, + ringHeartbeatTimeout: time.Minute, + ringReplicationFactor: 1, + expectedErr: ring.ErrEmptyRing, + }, + "should succeed on all healthy instances and RF=1": { + ringInstances: map[string]ring.InstanceDesc{ + "instance-1": {Addr: "127.0.0.1", State: ring.ACTIVE, Timestamp: now.Unix(), Tokens: g.GenerateTokens(ring.NewDesc(), "instance-1", "", 128, true)}, + "instance-2": {Addr: "127.0.0.2", State: ring.ACTIVE, Timestamp: now.Add(-10 * time.Second).Unix(), Tokens: g.GenerateTokens(ring.NewDesc(), "instance-2", "", 128, true)}, + "instance-3": {Addr: "127.0.0.3", State: ring.ACTIVE, Timestamp: now.Add(-20 * time.Second).Unix(), Tokens: g.GenerateTokens(ring.NewDesc(), "instance-3", "", 128, true)}, + }, + ringHeartbeatTimeout: time.Minute, + ringReplicationFactor: 1, + expectedSet: []string{"127.0.0.1", "127.0.0.2", "127.0.0.3"}, + }, + "should fail on 1 unhealthy instance and RF=1": { + ringInstances: map[string]ring.InstanceDesc{ + "instance-1": {Addr: "127.0.0.1", State: ring.ACTIVE, Timestamp: now.Unix(), Tokens: g.GenerateTokens(ring.NewDesc(), "instance-1", "", 128, true)}, + "instance-2": {Addr: "127.0.0.2", State: ring.ACTIVE, Timestamp: now.Add(-10 * time.Second).Unix(), Tokens: g.GenerateTokens(ring.NewDesc(), "instance-2", "", 128, true)}, + "instance-3": {Addr: "127.0.0.3", State: ring.ACTIVE, Timestamp: now.Add(-20 * time.Second).Unix(), Tokens: g.GenerateTokens(ring.NewDesc(), "instance-3", "", 128, true)}, + "instance-4": {Addr: "127.0.0.4", State: ring.ACTIVE, Timestamp: now.Add(-2 * time.Minute).Unix(), Tokens: g.GenerateTokens(ring.NewDesc(), "instance-4", "", 128, true)}, + }, + ringHeartbeatTimeout: time.Minute, + ringReplicationFactor: 1, + expectedErr: 
ring.ErrTooManyUnhealthyInstances, + }, + "should succeed on 1 unhealthy instance and RF=3": { + ringInstances: map[string]ring.InstanceDesc{ + "instance-1": {Addr: "127.0.0.1", State: ring.ACTIVE, Timestamp: now.Unix(), Tokens: g.GenerateTokens(ring.NewDesc(), "instance-1", "", 128, true)}, + "instance-2": {Addr: "127.0.0.2", State: ring.ACTIVE, Timestamp: now.Add(-10 * time.Second).Unix(), Tokens: g.GenerateTokens(ring.NewDesc(), "instance-2", "", 128, true)}, + "instance-3": {Addr: "127.0.0.3", State: ring.ACTIVE, Timestamp: now.Add(-20 * time.Second).Unix(), Tokens: g.GenerateTokens(ring.NewDesc(), "instance-3", "", 128, true)}, + "instance-4": {Addr: "127.0.0.4", State: ring.ACTIVE, Timestamp: now.Add(-2 * time.Minute).Unix(), Tokens: g.GenerateTokens(ring.NewDesc(), "instance-4", "", 128, true)}, + }, + ringHeartbeatTimeout: time.Minute, + ringReplicationFactor: 3, + expectedSet: []string{"127.0.0.1", "127.0.0.2", "127.0.0.3"}, + expectedMaxError: 1, + }, + "should succeed on 2 unhealthy instances and RF=3": { + ringInstances: map[string]ring.InstanceDesc{ + "instance-1": {Addr: "127.0.0.1", State: ring.ACTIVE, Timestamp: now.Unix(), Tokens: g.GenerateTokens(ring.NewDesc(), "instance-1", "", 128, true)}, + "instance-2": {Addr: "127.0.0.2", State: ring.ACTIVE, Timestamp: now.Add(-10 * time.Second).Unix(), Tokens: g.GenerateTokens(ring.NewDesc(), "instance-2", "", 128, true)}, + "instance-3": {Addr: "127.0.0.3", State: ring.ACTIVE, Timestamp: now.Add(-20 * time.Second).Unix(), Tokens: g.GenerateTokens(ring.NewDesc(), "instance-3", "", 128, true)}, + "instance-4": {Addr: "127.0.0.4", State: ring.ACTIVE, Timestamp: now.Add(-2 * time.Minute).Unix(), Tokens: g.GenerateTokens(ring.NewDesc(), "instance-4", "", 128, true)}, + "instance-5": {Addr: "127.0.0.5", State: ring.ACTIVE, Timestamp: now.Add(-2 * time.Minute).Unix(), Tokens: g.GenerateTokens(ring.NewDesc(), "instance-5", "", 128, true)}, + }, + ringHeartbeatTimeout: time.Minute, + ringReplicationFactor: 3, + expectedSet: []string{"127.0.0.1", "127.0.0.2", "127.0.0.3"}, + expectedMaxError: 0, + }, + "should fail on 3 unhealthy instances and RF=3": { + ringInstances: map[string]ring.InstanceDesc{ + "instance-1": {Addr: "127.0.0.1", State: ring.ACTIVE, Timestamp: now.Unix(), Tokens: g.GenerateTokens(ring.NewDesc(), "instance-1", "", 128, true)}, + "instance-2": {Addr: "127.0.0.2", State: ring.ACTIVE, Timestamp: now.Add(-10 * time.Second).Unix(), Tokens: g.GenerateTokens(ring.NewDesc(), "instance-2", "", 128, true)}, + "instance-3": {Addr: "127.0.0.3", State: ring.ACTIVE, Timestamp: now.Add(-2 * time.Minute).Unix(), Tokens: g.GenerateTokens(ring.NewDesc(), "instance-3", "", 128, true)}, + "instance-4": {Addr: "127.0.0.4", State: ring.ACTIVE, Timestamp: now.Add(-2 * time.Minute).Unix(), Tokens: g.GenerateTokens(ring.NewDesc(), "instance-4", "", 128, true)}, + "instance-5": {Addr: "127.0.0.5", State: ring.ACTIVE, Timestamp: now.Add(-2 * time.Minute).Unix(), Tokens: g.GenerateTokens(ring.NewDesc(), "instance-5", "", 128, true)}, + }, + ringHeartbeatTimeout: time.Minute, + ringReplicationFactor: 3, + expectedErr: ring.ErrTooManyUnhealthyInstances, + }, + "should succeed on 0 unhealthy instances and RF=3 zone replication enabled": { + ringInstances: map[string]ring.InstanceDesc{ + "instance-1": {Addr: "127.0.0.1", State: ring.ACTIVE, Timestamp: now.Unix(), Tokens: g.GenerateTokens(ring.NewDesc(), "instance-1", "z1", 128, true), Zone: "z1"}, + "instance-2": {Addr: "127.0.0.2", State: ring.ACTIVE, Timestamp: now.Add(-10 * time.Second).Unix(),
Tokens: g.GenerateTokens(ring.NewDesc(), "instance-2", "z2", 128, true), Zone: "z2"}, + "instance-3": {Addr: "127.0.0.3", State: ring.ACTIVE, Timestamp: now.Add(-20 * time.Second).Unix(), Tokens: g.GenerateTokens(ring.NewDesc(), "instance-3", "z3", 128, true), Zone: "z3"}, + "instance-4": {Addr: "127.0.0.4", State: ring.ACTIVE, Timestamp: now.Add(-30 * time.Second).Unix(), Tokens: g.GenerateTokens(ring.NewDesc(), "instance-4", "z1", 128, true), Zone: "z1"}, + }, + ringHeartbeatTimeout: time.Minute, + ringReplicationFactor: 3, + expectedSet: []string{"127.0.0.1", "127.0.0.2", "127.0.0.3", "127.0.0.4"}, + enableAZReplication: true, + expectedFailedZones: make(map[string]struct{}), + expectedMaxUnavailableZones: 2, + }, + "should succeed on 3 unhealthy instances in 2 zones and RF=3 zone replication enabled": { + ringInstances: map[string]ring.InstanceDesc{ + "instance-1": {Addr: "127.0.0.1", State: ring.PENDING, Timestamp: now.Unix(), Tokens: g.GenerateTokens(ring.NewDesc(), "instance-1", "z1", 128, true), Zone: "z1"}, + "instance-2": {Addr: "127.0.0.2", State: ring.ACTIVE, Timestamp: now.Add(-10 * time.Second).Unix(), Tokens: g.GenerateTokens(ring.NewDesc(), "instance-2", "z2", 128, true), Zone: "z2"}, + "instance-3": {Addr: "127.0.0.3", State: ring.ACTIVE, Timestamp: now.Add(-20 * time.Second).Unix(), Tokens: g.GenerateTokens(ring.NewDesc(), "instance-3", "z3", 128, true), Zone: "z3"}, + "instance-4": {Addr: "127.0.0.4", State: ring.ACTIVE, Timestamp: now.Add(-2 * time.Minute).Unix(), Tokens: g.GenerateTokens(ring.NewDesc(), "instance-4", "z1", 128, true), Zone: "z1"}, + "instance-5": {Addr: "127.0.0.5", State: ring.PENDING, Timestamp: now.Add(-10 * time.Second).Unix(), Tokens: g.GenerateTokens(ring.NewDesc(), "instance-5", "z2", 128, true), Zone: "z2"}, + }, + ringHeartbeatTimeout: time.Minute, + ringReplicationFactor: 3, + expectedSet: []string{"127.0.0.2", "127.0.0.3"}, + enableAZReplication: true, + expectedFailedZones: map[string]struct{}{ + "z1": {}, + "z2": {}, + }, + expectedMaxUnavailableZones: 2, + }, + "should succeed on 1 unhealthy instance in 1 zone and RF=3 zone replication enabled": { + ringInstances: map[string]ring.InstanceDesc{ + "instance-1": {Addr: "127.0.0.1", State: ring.ACTIVE, Timestamp: now.Unix(), Tokens: g.GenerateTokens(ring.NewDesc(), "instance-1", "z1", 128, true), Zone: "z1"}, + "instance-2": {Addr: "127.0.0.2", State: ring.ACTIVE, Timestamp: now.Add(-10 * time.Second).Unix(), Tokens: g.GenerateTokens(ring.NewDesc(), "instance-2", "z2", 128, true), Zone: "z2"}, + "instance-3": {Addr: "127.0.0.3", State: ring.ACTIVE, Timestamp: now.Add(-20 * time.Second).Unix(), Tokens: g.GenerateTokens(ring.NewDesc(), "instance-3", "z3", 128, true), Zone: "z3"}, + "instance-4": {Addr: "127.0.0.4", State: ring.PENDING, Timestamp: now.Add(-30 * time.Second).Unix(), Tokens: g.GenerateTokens(ring.NewDesc(), "instance-4", "z1", 128, true), Zone: "z1"}, + }, + ringHeartbeatTimeout: time.Minute, + ringReplicationFactor: 3, + expectedSet: []string{"127.0.0.1", "127.0.0.2", "127.0.0.3"}, + enableAZReplication: true, + expectedFailedZones: map[string]struct{}{ + "z1": {}, + }, + expectedMaxUnavailableZones: 2, + }, + "should fail on 3 unhealthy instances in 3 zones and RF=3 zone replication enabled": { + ringInstances: map[string]ring.InstanceDesc{ + "instance-1": {Addr: "127.0.0.1", State: ring.ACTIVE, Timestamp: now.Unix(), Tokens: g.GenerateTokens(ring.NewDesc(), "instance-1", "z1", 128, true), Zone: "z1"}, + "instance-2": {Addr: "127.0.0.2", State: ring.ACTIVE, Timestamp:
now.Add(-10 * time.Second).Unix(), Tokens: g.GenerateTokens(ring.NewDesc(), "instance-2", "z2", 128, true), Zone: "z2"}, + "instance-3": {Addr: "127.0.0.3", State: ring.ACTIVE, Timestamp: now.Add(-20 * time.Second).Unix(), Tokens: g.GenerateTokens(ring.NewDesc(), "instance-3", "z3", 128, true), Zone: "z3"}, + "instance-4": {Addr: "127.0.0.4", State: ring.PENDING, Timestamp: now.Add(-30 * time.Second).Unix(), Tokens: g.GenerateTokens(ring.NewDesc(), "instance-4", "z1", 128, true), Zone: "z1"}, + "instance-5": {Addr: "127.0.0.5", State: ring.ACTIVE, Timestamp: now.Add(-2 * time.Minute).Unix(), Tokens: g.GenerateTokens(ring.NewDesc(), "instance-5", "z2", 128, true), Zone: "z2"}, + "instance-6": {Addr: "127.0.0.6", State: ring.ACTIVE, Timestamp: now.Add(-2 * time.Minute).Unix(), Tokens: g.GenerateTokens(ring.NewDesc(), "instance-6", "z3", 128, true), Zone: "z3"}, + }, + ringHeartbeatTimeout: time.Minute, + ringReplicationFactor: 3, + enableAZReplication: true, + expectedErr: ring.ErrTooManyUnhealthyInstances, + expectedFailedZones: map[string]struct{}{ + "z1": {}, + "z2": {}, + "z3": {}, + }, + }, + "should fail on 2 unhealthy instances in 2 zones and RF=2 zone replication enabled": { + ringInstances: map[string]ring.InstanceDesc{ + "instance-1": {Addr: "127.0.0.1", State: ring.ACTIVE, Timestamp: now.Unix(), Tokens: g.GenerateTokens(ring.NewDesc(), "instance-1", "z1", 128, true), Zone: "z1"}, + "instance-2": {Addr: "127.0.0.2", State: ring.ACTIVE, Timestamp: now.Add(-10 * time.Second).Unix(), Tokens: g.GenerateTokens(ring.NewDesc(), "instance-2", "z2", 128, true), Zone: "z2"}, + "instance-3": {Addr: "127.0.0.3", State: ring.ACTIVE, Timestamp: now.Add(-20 * time.Second).Unix(), Tokens: g.GenerateTokens(ring.NewDesc(), "instance-3", "z3", 128, true), Zone: "z3"}, + "instance-4": {Addr: "127.0.0.4", State: ring.PENDING, Timestamp: now.Add(-30 * time.Second).Unix(), Tokens: g.GenerateTokens(ring.NewDesc(), "instance-4", "z1", 128, true), Zone: "z1"}, + "instance-5": {Addr: "127.0.0.5", State: ring.ACTIVE, Timestamp: now.Add(-2 * time.Minute).Unix(), Tokens: g.GenerateTokens(ring.NewDesc(), "instance-5", "z2", 128, true), Zone: "z2"}, + }, + ringHeartbeatTimeout: time.Minute, + ringReplicationFactor: 2, + enableAZReplication: true, + expectedErr: ring.ErrTooManyUnhealthyInstances, + expectedFailedZones: map[string]struct{}{ + "z1": {}, + "z2": {}, + }, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + kvStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil) + t.Cleanup(func() { assert.NoError(t, closer.Close()) }) + + store := newMockRuleStore(nil, nil) + cfg := Config{ + EnableSharding: true, + ShardingStrategy: util.ShardingStrategyDefault, + Ring: RingConfig{ + InstanceID: "instance-1", + InstanceAddr: "127.0.0.1", + InstancePort: 9999, + KVStore: kv.Config{ + Mock: kvStore, + }, + HeartbeatTimeout: 1 * time.Minute, + ReplicationFactor: testData.ringReplicationFactor, + ZoneAwarenessEnabled: testData.enableAZReplication, + }, + FlushCheckPeriod: 0, + } + + r, _ := buildRuler(t, cfg, nil, store, nil) + r.limits = ruleLimits{evalDelay: 0} + + rulerRing := r.ring + // We start ruler's ring, but nothing else (not even lifecycler). 
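+ // Instance states are seeded directly into the KV store below via CAS, so no lifecycler heartbeats are needed for these assertions.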
+ require.NoError(t, services.StartAndAwaitRunning(context.Background(), rulerRing)) + t.Cleanup(rulerRing.StopAsync) + + err := kvStore.CAS(context.Background(), ringKey, func(in interface{}) (out interface{}, retry bool, err error) { + d, _ := in.(*ring.Desc) + if d == nil { + d = ring.NewDesc() + } + d.Ingesters = testData.ringInstances + return d, true, nil + }) + require.NoError(t, err) + // Wait a bit to make sure ruler's ring is updated. + time.Sleep(100 * time.Millisecond) + + set, failedZones, err := GetReplicationSetForListRule(rulerRing, &cfg.Ring) + require.Equal(t, testData.expectedErr, err) + assert.ElementsMatch(t, testData.expectedSet, set.GetAddresses()) + require.Equal(t, testData.expectedMaxError, set.MaxErrors) + require.Equal(t, testData.expectedMaxUnavailableZones, set.MaxUnavailableZones) + if testData.enableAZReplication { + require.Equal(t, len(testData.expectedFailedZones), len(failedZones)) + require.EqualValues(t, testData.expectedFailedZones, failedZones) + } + }) + } +} diff --git a/pkg/ruler/ruler_test.go b/pkg/ruler/ruler_test.go index f7187df1ec..22d52aa649 100644 --- a/pkg/ruler/ruler_test.go +++ b/pkg/ruler/ruler_test.go @@ -75,6 +75,7 @@ func defaultRulerConfig(t testing.TB) Config { cfg.Ring.InstanceAddr = "localhost" cfg.Ring.InstanceID = "localhost" cfg.Ring.FinalSleep = 0 + cfg.Ring.ReplicationFactor = 1 cfg.EnableQueryStats = false return cfg @@ -339,13 +340,17 @@ func TestGetRules(t *testing.T) { type rulesMap map[string][]*rulespb.RuleDesc type testCase struct { - sharding bool - shardingStrategy string - shuffleShardSize int - rulesRequest RulesRequest - expectedCount map[string]int - rulerStateMap map[string]ring.InstanceState - expectedError error + sharding bool + shardingStrategy string + shuffleShardSize int + rulesRequest RulesRequest + expectedCount map[string]int + expectedClientCallCount int + rulerStateMap map[string]ring.InstanceState + rulerAZMap map[string]string + expectedError error + enableAPIRulesBackup bool + enableZoneAwareReplication bool } ruleMap := rulesMap{ @@ -457,6 +462,18 @@ func TestGetRules(t *testing.T) { "ruler3": ring.ACTIVE, } + rulerStateMapTwoPending := map[string]ring.InstanceState{ + "ruler1": ring.PENDING, + "ruler2": ring.PENDING, + "ruler3": ring.ACTIVE, + } + + rulerAZEvenSpread := map[string]string{ + "ruler1": "a", + "ruler2": "b", + "ruler3": "c", + } + expectedRules := expectedRulesMap{ "ruler1": map[string]rulespb.RuleGroupList{ "user1": { @@ -502,6 +519,7 @@ func TestGetRules(t *testing.T) { "user2": 4, "user3": 2, }, + expectedClientCallCount: len(expectedRules), }, "Default Sharding with No Filter": { sharding: true, @@ -512,6 +530,19 @@ func TestGetRules(t *testing.T) { "user2": 9, "user3": 3, }, + expectedClientCallCount: len(expectedRules), + }, + "Default Sharding with No Filter but with API Rules backup enabled": { + sharding: true, + shardingStrategy: util.ShardingStrategyDefault, + rulerStateMap: rulerStateMapAllActive, + expectedCount: map[string]int{ + "user1": 5, + "user2": 9, + "user3": 3, + }, + enableAPIRulesBackup: true, + expectedClientCallCount: len(expectedRules), }, "Shuffle Sharding and ShardSize = 2 with Rule Type Filter": { sharding: true, @@ -526,6 +557,7 @@ func TestGetRules(t *testing.T) { "user2": 5, "user3": 1, }, + expectedClientCallCount: 2, }, "Shuffle Sharding and ShardSize = 2 and Rule Group Name Filter": { sharding: true, @@ -540,6 +572,7 @@ func TestGetRules(t *testing.T) { "user2": 1, "user3": 2, }, + expectedClientCallCount: 2, }, "Shuffle Sharding and ShardSize = 
2 and Rule Group Name and Rule Type Filter": { sharding: true, @@ -555,6 +588,7 @@ func TestGetRules(t *testing.T) { "user2": 2, "user3": 1, }, + expectedClientCallCount: 2, }, "Shuffle Sharding and ShardSize = 2 with Rule Type and Namespace Filters": { sharding: true, @@ -570,6 +604,7 @@ func TestGetRules(t *testing.T) { "user2": 0, "user3": 1, }, + expectedClientCallCount: 2, }, "Shuffle Sharding and ShardSize = 2 with Rule Type Filter and one ruler is in LEAVING state": { sharding: true, @@ -584,6 +619,7 @@ func TestGetRules(t *testing.T) { "user2": 5, "user3": 1, }, + expectedClientCallCount: 2, }, "Shuffle Sharding and ShardSize = 2 with Rule Type Filter and one ruler is in Pending state": { sharding: true, @@ -593,6 +629,99 @@ func TestGetRules(t *testing.T) { rulesRequest: RulesRequest{ Type: recordingRuleFilter, }, + expectedError: ring.ErrTooManyUnhealthyInstances, + expectedClientCallCount: 0, + }, + "Shuffle Sharding and ShardSize = 3 with API Rules backup enabled": { + sharding: true, + shuffleShardSize: 3, + shardingStrategy: util.ShardingStrategyShuffle, + rulerStateMap: rulerStateMapAllActive, + enableAPIRulesBackup: true, + rulesRequest: RulesRequest{ + Type: recordingRuleFilter, + }, + expectedCount: map[string]int{ + "user1": 3, + "user2": 5, + "user3": 1, + }, + expectedClientCallCount: 3, + }, + "Shuffle Sharding and ShardSize = 3 with API Rules backup enabled and one ruler is in Pending state": { + sharding: true, + shuffleShardSize: 3, + shardingStrategy: util.ShardingStrategyShuffle, + rulerStateMap: rulerStateMapOnePending, + enableAPIRulesBackup: true, + rulesRequest: RulesRequest{ + Type: recordingRuleFilter, + }, + expectedCount: map[string]int{ + "user1": 3, + "user2": 5, + "user3": 1, + }, + expectedClientCallCount: 2, // one of the rulers is pending, so we don't expect that ruler to be called + }, + "Shuffle Sharding and ShardSize = 3 with API Rules backup enabled and two rulers are in Pending state": { + sharding: true, + shuffleShardSize: 3, + shardingStrategy: util.ShardingStrategyShuffle, + rulerStateMap: rulerStateMapTwoPending, + enableAPIRulesBackup: true, + rulesRequest: RulesRequest{ + Type: recordingRuleFilter, + }, + expectedError: ring.ErrTooManyUnhealthyInstances, + }, + "Shuffle Sharding and ShardSize = 3 and AZ replication with API Rules backup enabled": { + sharding: true, + shuffleShardSize: 3, + shardingStrategy: util.ShardingStrategyShuffle, + enableZoneAwareReplication: true, + rulerStateMap: rulerStateMapAllActive, + rulerAZMap: rulerAZEvenSpread, + enableAPIRulesBackup: true, + rulesRequest: RulesRequest{ + Type: recordingRuleFilter, + }, + expectedCount: map[string]int{ + "user1": 3, + "user2": 5, + "user3": 1, + }, + expectedClientCallCount: 3, + }, + "Shuffle Sharding and ShardSize = 3 and AZ replication with API Rules backup enabled and one ruler in pending state": { + sharding: true, + shuffleShardSize: 3, + shardingStrategy: util.ShardingStrategyShuffle, + enableZoneAwareReplication: true, + rulerStateMap: rulerStateMapOnePending, + rulerAZMap: rulerAZEvenSpread, + enableAPIRulesBackup: true, + rulesRequest: RulesRequest{ + Type: recordingRuleFilter, + }, + expectedCount: map[string]int{ + "user1": 3, + "user2": 5, + "user3": 1, + }, + expectedClientCallCount: 2, // one of the rulers is pending, so we don't expect that ruler to be called + }, + "Shuffle Sharding and ShardSize = 3 and AZ replication with API Rules backup enabled and two rulers in pending state": { + sharding: true, + shuffleShardSize: 3, + shardingStrategy:
util.ShardingStrategyShuffle, + enableZoneAwareReplication: true, + rulerStateMap: rulerStateMapTwoPending, + rulerAZMap: rulerAZEvenSpread, + enableAPIRulesBackup: true, + rulesRequest: RulesRequest{ + Type: recordingRuleFilter, + }, expectedError: ring.ErrTooManyUnhealthyInstances, }, } @@ -612,6 +741,7 @@ func TestGetRules(t *testing.T) { cfg.ShardingStrategy = tc.shardingStrategy cfg.EnableSharding = tc.sharding + cfg.APIEnableRulesBackup = tc.enableAPIRulesBackup cfg.Ring = RingConfig{ InstanceID: id, @@ -619,6 +749,14 @@ func TestGetRules(t *testing.T) { KVStore: kv.Config{ Mock: kvStore, }, + ReplicationFactor: 1, + } + if tc.enableAPIRulesBackup { + cfg.Ring.ReplicationFactor = 3 + cfg.Ring.ZoneAwarenessEnabled = tc.enableZoneAwareReplication + } + if tc.enableZoneAwareReplication { + cfg.Ring.InstanceZone = tc.rulerAZMap[id] } r, _ := buildRuler(t, cfg, nil, store, rulerAddrMap) @@ -647,7 +785,7 @@ func TestGetRules(t *testing.T) { d = ring.NewDesc() } for rID, tokens := range allTokensByRuler { - d.AddIngester(rID, rulerAddrMap[rID].lifecycler.GetInstanceAddr(), "", tokens, tc.rulerStateMap[rID], time.Now()) + d.AddIngester(rID, rulerAddrMap[rID].lifecycler.GetInstanceAddr(), rulerAddrMap[rID].lifecycler.GetInstanceZone(), tokens, ring.ACTIVE, time.Now()) } return d, true, nil }) @@ -666,6 +804,24 @@ func TestGetRules(t *testing.T) { forEachRuler(func(_ string, r *Ruler) { r.syncRules(context.Background(), rulerSyncReasonInitial) }) + + if tc.sharding { + // update the State of the rulers in the ring based on tc.rulerStateMap + err := kvStore.CAS(context.Background(), ringKey, func(in interface{}) (out interface{}, retry bool, err error) { + d, _ := in.(*ring.Desc) + if d == nil { + d = ring.NewDesc() + } + for rID, tokens := range allTokensByRuler { + d.AddIngester(rID, rulerAddrMap[rID].lifecycler.GetInstanceAddr(), rulerAddrMap[rID].lifecycler.GetInstanceZone(), tokens, tc.rulerStateMap[rID], time.Now()) + } + return d, true, nil + }) + require.NoError(t, err) + // Wait a bit to make sure ruler's ring is updated. 
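+ // The ring client picks up KV store changes asynchronously, so the short sleep below lets it observe the updated instance states before the rules are queried.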
+ time.Sleep(100 * time.Millisecond) + } + for u := range allRulesByUser { ctx := user.InjectOrgID(context.Background(), u) forEachRuler(func(_ string, r *Ruler) { @@ -685,9 +841,9 @@ func TestGetRules(t *testing.T) { mockPoolClient := r.clientsPool.(*mockRulerClientsPool) if tc.shardingStrategy == util.ShardingStrategyShuffle { - require.Equal(t, int32(tc.shuffleShardSize), mockPoolClient.numberOfCalls.Load()) + require.Equal(t, int32(tc.expectedClientCallCount), mockPoolClient.numberOfCalls.Load()) } else { - require.Equal(t, int32(len(rulerAddrMap)), mockPoolClient.numberOfCalls.Load()) + require.Equal(t, int32(tc.expectedClientCallCount), mockPoolClient.numberOfCalls.Load()) } mockPoolClient.numberOfCalls.Store(0) } @@ -696,13 +852,21 @@ func TestGetRules(t *testing.T) { totalLoadedRules := 0 totalConfiguredRules := 0 + ruleBackupCount := make(map[string]int) forEachRuler(func(rID string, r *Ruler) { - localRules, err := r.listRules(context.Background()) + localRules, localBackupRules, err := r.listRules(context.Background()) require.NoError(t, err) for _, rules := range localRules { totalLoadedRules += len(rules) } + for user, rules := range localBackupRules { + for _, rule := range rules { + key := user + rule.Namespace + rule.Name + c := ruleBackupCount[key] + ruleBackupCount[key] = c + 1 + } + } totalConfiguredRules += len(allRulesByRuler[rID]) }) @@ -713,10 +877,241 @@ func TestGetRules(t *testing.T) { numberOfRulers := len(rulerAddrMap) require.Equal(t, totalConfiguredRules*numberOfRulers, totalLoadedRules) } + if tc.enableAPIRulesBackup && tc.sharding && tc.expectedError == nil { + // all rules should be backed up + require.Equal(t, totalConfiguredRules, len(ruleBackupCount)) + var hasUnhealthyRuler bool + for _, state := range tc.rulerStateMap { + if state != ring.ACTIVE && state != ring.LEAVING { + hasUnhealthyRuler = true + break + } + } + for _, v := range ruleBackupCount { + if !hasUnhealthyRuler { + // with replication factor set to 3, each rule is backed up by 2 rulers + require.Equal(t, 2, v) + } else { + require.GreaterOrEqual(t, v, 1) + } + } + } else { + // If APIEnableRulesBackup is disabled, rulers should not back up any rules + require.Equal(t, 0, len(ruleBackupCount)) + } }) } } +func TestGetRulesFromBackup(t *testing.T) { + // ruler ID -> (user ID -> list of groups). 
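+ // With replication factor 3 and one ruler per zone, each rule group below is evaluated by one ruler and held as a default-state backup by the other two.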
+ type expectedRulesMap map[string]map[string]rulespb.RuleGroupList + + rule := []*rulespb.RuleDesc{ + { + Record: "rtest_user1_1", + Expr: "sum(rate(node_cpu_seconds_total[3h:10m]))", + }, + { + Alert: "atest_user1_1", + Expr: "sum(rate(node_cpu_seconds_total[3h:10m]))", + }, + { + Record: "rtest_user1_2", + Expr: "sum(rate(node_cpu_seconds_total[3h:10m]))", + Labels: []cortexpb.LabelAdapter{ + {Name: "key", Value: "val"}, + }, + }, + { + Alert: "atest_user1_2", + Expr: "sum(rate(node_cpu_seconds_total[3h:10m]))", + Labels: []cortexpb.LabelAdapter{ + {Name: "key", Value: "val"}, + }, + Annotations: []cortexpb.LabelAdapter{ + {Name: "aKey", Value: "aVal"}, + }, + For: 10 * time.Second, + KeepFiringFor: 20 * time.Second, + }, + } + + tenantId := "user1" + + rulerStateMapOnePending := map[string]ring.InstanceState{ + "ruler1": ring.ACTIVE, + "ruler2": ring.PENDING, + "ruler3": ring.ACTIVE, + } + + rulerAZEvenSpread := map[string]string{ + "ruler1": "a", + "ruler2": "b", + "ruler3": "c", + } + + expectedRules := expectedRulesMap{ + "ruler1": map[string]rulespb.RuleGroupList{ + tenantId: { + &rulespb.RuleGroupDesc{User: "user1", Namespace: "namespace", Name: "l1", Interval: 10 * time.Minute, Limit: 10, Rules: rule}, + &rulespb.RuleGroupDesc{User: "user1", Namespace: "namespace", Name: "l2", Interval: 0, Rules: rule}, + }, + }, + "ruler2": map[string]rulespb.RuleGroupList{ + tenantId: { + &rulespb.RuleGroupDesc{User: "user1", Namespace: "namespace", Name: "b1", Interval: 10 * time.Minute, Limit: 10, Rules: rule}, + &rulespb.RuleGroupDesc{User: "user1", Namespace: "namespace", Name: "b2", Interval: 0, Rules: rule}, + }, + }, + "ruler3": map[string]rulespb.RuleGroupList{ + tenantId: { + &rulespb.RuleGroupDesc{User: "user1", Namespace: "namespace2", Name: "b3", Interval: 0, Rules: rule}, + }, + }, + } + + kvStore, cleanUp := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil) + t.Cleanup(func() { assert.NoError(t, cleanUp.Close()) }) + allRulesByUser := map[string]rulespb.RuleGroupList{} + allTokensByRuler := map[string][]uint32{} + rulerAddrMap := map[string]*Ruler{} + + createRuler := func(id string) *Ruler { + store := newMockRuleStore(allRulesByUser, nil) + cfg := defaultRulerConfig(t) + + cfg.ShardingStrategy = util.ShardingStrategyShuffle + cfg.EnableSharding = true + cfg.APIEnableRulesBackup = true + cfg.EvaluationInterval = 5 * time.Minute + + cfg.Ring = RingConfig{ + InstanceID: id, + InstanceAddr: id, + KVStore: kv.Config{ + Mock: kvStore, + }, + ReplicationFactor: 3, + ZoneAwarenessEnabled: true, + InstanceZone: rulerAZEvenSpread[id], + } + + r, _ := buildRuler(t, cfg, nil, store, rulerAddrMap) + r.limits = ruleLimits{evalDelay: 0, tenantShard: 3} + rulerAddrMap[id] = r + if r.ring != nil { + require.NoError(t, services.StartAndAwaitRunning(context.Background(), r.ring)) + t.Cleanup(r.ring.StopAsync) + } + return r + } + + for rID, r := range expectedRules { + createRuler(rID) + for u, rules := range r { + allRulesByUser[u] = append(allRulesByUser[u], rules...) 
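+ // generateTokenForGroups (a test helper) is assumed to derive one ring token per rule group, offset by 1, so group ownership in the test ring stays deterministic.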
+ allTokensByRuler[rID] = generateTokenForGroups(rules, 1) + } + } + + err := kvStore.CAS(context.Background(), ringKey, func(in interface{}) (out interface{}, retry bool, err error) { + d, _ := in.(*ring.Desc) + if d == nil { + d = ring.NewDesc() + } + for rID, tokens := range allTokensByRuler { + d.AddIngester(rID, rulerAddrMap[rID].lifecycler.GetInstanceAddr(), rulerAddrMap[rID].lifecycler.GetInstanceZone(), tokens, ring.ACTIVE, time.Now()) + } + return d, true, nil + }) + require.NoError(t, err) + // Wait a bit to make sure ruler's ring is updated. + time.Sleep(100 * time.Millisecond) + + forEachRuler := func(f func(rID string, r *Ruler)) { + for rID, r := range rulerAddrMap { + f(rID, r) + } + } + + // Sync Rules + forEachRuler(func(_ string, r *Ruler) { + r.syncRules(context.Background(), rulerSyncReasonInitial) + }) + + // Update the state of the rulers in the ring based on rulerStateMapOnePending + err = kvStore.CAS(context.Background(), ringKey, func(in interface{}) (out interface{}, retry bool, err error) { + d, _ := in.(*ring.Desc) + if d == nil { + d = ring.NewDesc() + } + for rID, tokens := range allTokensByRuler { + d.AddIngester(rID, rulerAddrMap[rID].lifecycler.GetInstanceAddr(), rulerAddrMap[rID].lifecycler.GetInstanceZone(), tokens, rulerStateMapOnePending[rID], time.Now()) + } + return d, true, nil + }) + require.NoError(t, err) + // Wait a bit to make sure ruler's ring is updated. + time.Sleep(100 * time.Millisecond) + + requireGroupStateEqual := func(a *GroupStateDesc, b *GroupStateDesc) { + require.Equal(t, a.Group.Interval, b.Group.Interval) + require.Equal(t, a.Group.User, b.Group.User) + require.Equal(t, a.Group.Limit, b.Group.Limit) + require.Equal(t, a.EvaluationTimestamp, b.EvaluationTimestamp) + require.Equal(t, a.EvaluationDuration, b.EvaluationDuration) + require.Equal(t, len(a.ActiveRules), len(b.ActiveRules)) + for i, aRule := range a.ActiveRules { + bRule := b.ActiveRules[i] + require.Equal(t, aRule.EvaluationTimestamp, bRule.EvaluationTimestamp) + require.Equal(t, aRule.EvaluationDuration, bRule.EvaluationDuration) + require.Equal(t, aRule.Health, bRule.Health) + require.Equal(t, aRule.LastError, bRule.LastError) + require.Equal(t, aRule.Rule.Expr, bRule.Rule.Expr) + require.Equal(t, len(aRule.Rule.Labels), len(bRule.Rule.Labels)) + require.Equal(t, fmt.Sprintf("%+v", aRule.Rule.Labels), fmt.Sprintf("%+v", bRule.Rule.Labels)) + if aRule.Rule.Alert != "" { + require.Equal(t, fmt.Sprintf("%+v", aRule.Rule.Annotations), fmt.Sprintf("%+v", bRule.Rule.Annotations)) + require.Equal(t, aRule.Rule.Alert, bRule.Rule.Alert) + require.Equal(t, aRule.Rule.For, bRule.Rule.For) + require.Equal(t, aRule.Rule.KeepFiringFor, bRule.Rule.KeepFiringFor) + require.Equal(t, aRule.State, bRule.State) + require.Equal(t, aRule.Alerts, bRule.Alerts) + } else { + require.Equal(t, aRule.Rule.Record, bRule.Rule.Record) + } + } + } + ctx := user.InjectOrgID(context.Background(), tenantId) + ruleStateDescriptions, err := rulerAddrMap["ruler1"].GetRules(ctx, RulesRequest{}) + require.NoError(t, err) + require.Equal(t, 5, len(ruleStateDescriptions)) + stateByKey := map[string]*GroupStateDesc{} + for _, state := range ruleStateDescriptions { + stateByKey[state.Group.Namespace+";"+state.Group.Name] = state + } + // Rule groups whose names start with b come from the backup and those that start with l are evaluating; the details of + the group other than the Name should equal those of the corresponding l group, as the config is the same.
This test + confirms that the way we convert rulespb.RuleGroupList to GroupStateDesc is consistent with how we convert + promRules.Group to GroupStateDesc + requireGroupStateEqual(stateByKey["namespace;l1"], stateByKey["namespace;b1"]) + requireGroupStateEqual(stateByKey["namespace;l2"], stateByKey["namespace;b2"]) + + // Validate backup rules respect the filters + ruleStateDescriptions, err = rulerAddrMap["ruler1"].GetRules(ctx, RulesRequest{ + RuleNames: []string{"rtest_user1_1", "atest_user1_1"}, + Files: []string{"namespace"}, + RuleGroupNames: []string{"b1"}, + Type: recordingRuleFilter, + }) + require.NoError(t, err) + require.Equal(t, 1, len(ruleStateDescriptions)) + require.Equal(t, "b1", ruleStateDescriptions[0].Group.Name) + require.Equal(t, 1, len(ruleStateDescriptions[0].ActiveRules)) + require.Equal(t, "rtest_user1_1", ruleStateDescriptions[0].ActiveRules[0].Rule.Record) +} + func TestSharding(t *testing.T) { const ( user1 = "user1" @@ -746,14 +1141,16 @@ func TestSharding(t *testing.T) { type expectedRulesMap map[string]map[string]rulespb.RuleGroupList type testCase struct { - sharding bool - shardingStrategy string - shuffleShardSize int - setupRing func(*ring.Desc) - enabledUsers []string - disabledUsers []string - - expectedRules expectedRulesMap + sharding bool + shardingStrategy string + enableAPIRulesBackup bool + replicationFactor int + shuffleShardSize int + setupRing func(*ring.Desc) + enabledUsers []string + disabledUsers []string + expectedRules expectedRulesMap + expectedBackupRules expectedRulesMap } const ( @@ -775,21 +1172,24 @@ func TestSharding(t *testing.T) { testCases := map[string]testCase{ "no sharding": { - sharding: false, - expectedRules: expectedRulesMap{ruler1: allRules}, + sharding: false, + replicationFactor: 1, + expectedRules: expectedRulesMap{ruler1: allRules}, }, "no sharding, single user allowed": { - sharding: false, - enabledUsers: []string{user1}, + sharding: false, + replicationFactor: 1, + enabledUsers: []string{user1}, expectedRules: expectedRulesMap{ruler1: map[string]rulespb.RuleGroupList{ user1: {user1Group1, user1Group2}, }}, }, "no sharding, single user disabled": { - sharding: false, - disabledUsers: []string{user1}, + sharding: false, + replicationFactor: 1, + disabledUsers: []string{user1}, expectedRules: expectedRulesMap{ruler1: map[string]rulespb.RuleGroupList{ user2: {user2Group1}, user3: {user3Group1}, @@ -797,8 +1197,9 @@ func TestSharding(t *testing.T) { }, "default sharding, single ruler": { - sharding: true, - shardingStrategy: util.ShardingStrategyDefault, + sharding: true, + replicationFactor: 1, + shardingStrategy: util.ShardingStrategyDefault, setupRing: func(desc *ring.Desc) { desc.AddIngester(ruler1, ruler1Addr, "", []uint32{0}, ring.ACTIVE, time.Now()) }, @@ -806,9 +1207,10 @@ func TestSharding(t *testing.T) { }, "default sharding, single ruler, single enabled user": { - sharding: true, - shardingStrategy: util.ShardingStrategyDefault, - enabledUsers: []string{user1}, + sharding: true, + replicationFactor: 1, + shardingStrategy: util.ShardingStrategyDefault, + enabledUsers: []string{user1}, setupRing: func(desc *ring.Desc) { desc.AddIngester(ruler1, ruler1Addr, "", []uint32{0}, ring.ACTIVE, time.Now()) }, @@ -818,9 +1220,10 @@ func TestSharding(t *testing.T) { }, "default sharding, single ruler, single disabled user": { - sharding: true, - shardingStrategy: util.ShardingStrategyDefault, - disabledUsers: []string{user1}, + sharding: true, + replicationFactor: 1, + shardingStrategy: util.ShardingStrategyDefault, +
disabledUsers: []string{user1}, setupRing: func(desc *ring.Desc) { desc.AddIngester(ruler1, ruler1Addr, "", []uint32{0}, ring.ACTIVE, time.Now()) }, @@ -831,8 +1234,9 @@ func TestSharding(t *testing.T) { }, "default sharding, multiple ACTIVE rulers": { - sharding: true, - shardingStrategy: util.ShardingStrategyDefault, + sharding: true, + replicationFactor: 1, + shardingStrategy: util.ShardingStrategyDefault, setupRing: func(desc *ring.Desc) { desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{user1Group1Token + 1, user2Group1Token + 1}), ring.ACTIVE, time.Now()) desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{user1Group2Token + 1, user3Group1Token + 1}), ring.ACTIVE, time.Now()) @@ -852,9 +1256,10 @@ func TestSharding(t *testing.T) { }, "default sharding, multiple ACTIVE rulers, single enabled user": { - sharding: true, - shardingStrategy: util.ShardingStrategyDefault, - enabledUsers: []string{user1}, + sharding: true, + replicationFactor: 1, + shardingStrategy: util.ShardingStrategyDefault, + enabledUsers: []string{user1}, setupRing: func(desc *ring.Desc) { desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{user1Group1Token + 1, user2Group1Token + 1}), ring.ACTIVE, time.Now()) desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{user1Group2Token + 1, user3Group1Token + 1}), ring.ACTIVE, time.Now()) @@ -872,9 +1277,10 @@ func TestSharding(t *testing.T) { }, "default sharding, multiple ACTIVE rulers, single disabled user": { - sharding: true, - shardingStrategy: util.ShardingStrategyDefault, - disabledUsers: []string{user1}, + sharding: true, + replicationFactor: 1, + shardingStrategy: util.ShardingStrategyDefault, + disabledUsers: []string{user1}, setupRing: func(desc *ring.Desc) { desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{user1Group1Token + 1, user2Group1Token + 1}), ring.ACTIVE, time.Now()) desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{user1Group2Token + 1, user3Group1Token + 1}), ring.ACTIVE, time.Now()) @@ -892,8 +1298,9 @@ func TestSharding(t *testing.T) { }, "default sharding, unhealthy ACTIVE ruler": { - sharding: true, - shardingStrategy: util.ShardingStrategyDefault, + sharding: true, + replicationFactor: 1, + shardingStrategy: util.ShardingStrategyDefault, setupRing: func(desc *ring.Desc) { desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{user1Group1Token + 1, user2Group1Token + 1}), ring.ACTIVE, time.Now()) @@ -916,8 +1323,9 @@ func TestSharding(t *testing.T) { }, "default sharding, LEAVING ruler": { - sharding: true, - shardingStrategy: util.ShardingStrategyDefault, + sharding: true, + replicationFactor: 1, + shardingStrategy: util.ShardingStrategyDefault, setupRing: func(desc *ring.Desc) { desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{user1Group1Token + 1, user2Group1Token + 1}), ring.LEAVING, time.Now()) @@ -932,8 +1340,9 @@ func TestSharding(t *testing.T) { }, "default sharding, JOINING ruler": { - sharding: true, - shardingStrategy: util.ShardingStrategyDefault, + sharding: true, + replicationFactor: 1, + shardingStrategy: util.ShardingStrategyDefault, setupRing: func(desc *ring.Desc) { desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{user1Group1Token + 1, user2Group1Token + 1}), ring.JOINING, time.Now()) @@ -948,8 +1357,9 @@ func TestSharding(t *testing.T) { }, "shuffle sharding, single ruler": { - sharding: true, - shardingStrategy: util.ShardingStrategyShuffle, + sharding: true, + replicationFactor: 1, + shardingStrategy: util.ShardingStrategyShuffle, 
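+ // The explicit tokens registered in setupRing pin each user's groups to a known ruler, keeping the expected ownership deterministic.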
setupRing: func(desc *ring.Desc) { desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{0}), ring.ACTIVE, time.Now()) @@ -961,9 +1371,10 @@ func TestSharding(t *testing.T) { }, "shuffle sharding, multiple rulers, shard size 1": { - sharding: true, - shardingStrategy: util.ShardingStrategyShuffle, - shuffleShardSize: 1, + sharding: true, + replicationFactor: 1, + shardingStrategy: util.ShardingStrategyShuffle, + shuffleShardSize: 1, setupRing: func(desc *ring.Desc) { desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1, userToken(user2, 0) + 1, userToken(user3, 0) + 1}), ring.ACTIVE, time.Now()) @@ -978,9 +1389,10 @@ func TestSharding(t *testing.T) { // Same test as previous one, but with shard size=2. Second ruler gets all the rules. "shuffle sharding, two rulers, shard size 2": { - sharding: true, - shardingStrategy: util.ShardingStrategyShuffle, - shuffleShardSize: 2, + sharding: true, + replicationFactor: 1, + shardingStrategy: util.ShardingStrategyShuffle, + shuffleShardSize: 2, setupRing: func(desc *ring.Desc) { // Exact same tokens setup as previous test. @@ -995,9 +1407,10 @@ func TestSharding(t *testing.T) { }, "shuffle sharding, two rulers, shard size 1, distributed users": { - sharding: true, - shardingStrategy: util.ShardingStrategyShuffle, - shuffleShardSize: 1, + sharding: true, + replicationFactor: 1, + shardingStrategy: util.ShardingStrategyShuffle, + shuffleShardSize: 1, setupRing: func(desc *ring.Desc) { desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1}), ring.ACTIVE, time.Now()) @@ -1015,9 +1428,10 @@ func TestSharding(t *testing.T) { }, }, "shuffle sharding, three rulers, shard size 2": { - sharding: true, - shardingStrategy: util.ShardingStrategyShuffle, - shuffleShardSize: 2, + sharding: true, + replicationFactor: 1, + shardingStrategy: util.ShardingStrategyShuffle, + shuffleShardSize: 2, setupRing: func(desc *ring.Desc) { desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1, user1Group1Token + 1}), ring.ACTIVE, time.Now()) @@ -1039,9 +1453,10 @@ func TestSharding(t *testing.T) { }, }, "shuffle sharding, three rulers, shard size 2, ruler2 has no users": { - sharding: true, - shardingStrategy: util.ShardingStrategyShuffle, - shuffleShardSize: 2, + sharding: true, + replicationFactor: 1, + shardingStrategy: util.ShardingStrategyShuffle, + shuffleShardSize: 2, setupRing: func(desc *ring.Desc) { desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1, userToken(user2, 1) + 1, user1Group1Token + 1, user1Group2Token + 1}), ring.ACTIVE, time.Now()) @@ -1062,10 +1477,11 @@ func TestSharding(t *testing.T) { }, "shuffle sharding, three rulers, shard size 2, single enabled user": { - sharding: true, - shardingStrategy: util.ShardingStrategyShuffle, - shuffleShardSize: 2, - enabledUsers: []string{user1}, + sharding: true, + replicationFactor: 1, + shardingStrategy: util.ShardingStrategyShuffle, + shuffleShardSize: 2, + enabledUsers: []string{user1}, setupRing: func(desc *ring.Desc) { desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1, user1Group1Token + 1}), ring.ACTIVE, time.Now()) @@ -1085,10 +1501,11 @@ func TestSharding(t *testing.T) { }, "shuffle sharding, three rulers, shard size 2, single disabled user": { - sharding: true, - shardingStrategy: util.ShardingStrategyShuffle, - shuffleShardSize: 2, - disabledUsers: []string{user1}, + sharding: true, + replicationFactor: 1, + shardingStrategy: 
util.ShardingStrategyShuffle, + shuffleShardSize: 2, + disabledUsers: []string{user1}, setupRing: func(desc *ring.Desc) { desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1, user1Group1Token + 1}), ring.ACTIVE, time.Now()) @@ -1105,6 +1522,40 @@ func TestSharding(t *testing.T) { }, }, }, + + "shuffle sharding, three rulers, shard size 2, enable api backup": { + sharding: true, + replicationFactor: 2, + shardingStrategy: util.ShardingStrategyShuffle, + enableAPIRulesBackup: true, + shuffleShardSize: 2, + enabledUsers: []string{user1}, + + setupRing: func(desc *ring.Desc) { + desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1, user1Group1Token + 1}), ring.ACTIVE, time.Now()) + desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{userToken(user1, 1) + 1, user1Group2Token + 1, userToken(user2, 1) + 1, userToken(user3, 1) + 1}), ring.ACTIVE, time.Now()) + desc.AddIngester(ruler3, ruler3Addr, "", sortTokens([]uint32{userToken(user2, 0) + 1, userToken(user3, 0) + 1, user2Group1Token + 1, user3Group1Token + 1}), ring.ACTIVE, time.Now()) + }, + + expectedRules: expectedRulesMap{ + ruler1: map[string]rulespb.RuleGroupList{ + user1: {user1Group1}, + }, + ruler2: map[string]rulespb.RuleGroupList{ + user1: {user1Group2}, + }, + ruler3: map[string]rulespb.RuleGroupList{}, + }, + expectedBackupRules: expectedRulesMap{ + ruler1: map[string]rulespb.RuleGroupList{ + user1: {user1Group2}, + }, + ruler2: map[string]rulespb.RuleGroupList{ + user1: {user1Group1}, + }, + ruler3: map[string]rulespb.RuleGroupList{}, + }, + }, } for name, tc := range testCases { @@ -1115,8 +1566,9 @@ func TestSharding(t *testing.T) { setupRuler := func(id string, host string, port int, forceRing *ring.Ring) *Ruler { store := newMockRuleStore(allRules, nil) cfg := Config{ - EnableSharding: tc.sharding, - ShardingStrategy: tc.shardingStrategy, + EnableSharding: tc.sharding, + APIEnableRulesBackup: tc.enableAPIRulesBackup, + ShardingStrategy: tc.shardingStrategy, Ring: RingConfig{ InstanceID: id, InstanceAddr: host, @@ -1124,7 +1576,8 @@ func TestSharding(t *testing.T) { KVStore: kv.Config{ Mock: kvStore, }, - HeartbeatTimeout: 1 * time.Minute, + HeartbeatTimeout: 1 * time.Minute, + ReplicationFactor: tc.replicationFactor, }, FlushCheckPeriod: 0, EnabledTenants: tc.enabledUsers, @@ -1174,23 +1627,28 @@ func TestSharding(t *testing.T) { } // Always add ruler1 to expected rulers, even if there is no ring (no sharding). - loadedRules1, err := r1.listRules(context.Background()) + loadedRules1, backupRules1, err := r1.listRules(context.Background()) require.NoError(t, err) expected := expectedRulesMap{ ruler1: loadedRules1, } + expectedBackup := expectedRulesMap{ + ruler1: backupRules1, + } + addToExpected := func(id string, r *Ruler) { // Only expect rules from other rulers when using ring, and they are present in the ring. if r != nil && rulerRing != nil && rulerRing.HasInstance(id) { - loaded, err := r.listRules(context.Background()) + loaded, backup, err := r.listRules(context.Background()) require.NoError(t, err) // Normalize nil map to empty one. 
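+ // listRules may return a nil map when a ruler owns no groups; normalizing avoids nil-vs-empty mismatches in the require.Equal checks below.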
if loaded == nil { loaded = map[string]rulespb.RuleGroupList{} } expected[id] = loaded + expectedBackup[id] = backup } } @@ -1198,6 +1656,14 @@ func TestSharding(t *testing.T) { addToExpected(ruler3, r3) require.Equal(t, tc.expectedRules, expected) + + if !tc.enableAPIRulesBackup { + require.Equal(t, 0, len(expectedBackup[ruler1])) + require.Equal(t, 0, len(expectedBackup[ruler2])) + require.Equal(t, 0, len(expectedBackup[ruler3])) + } else { + require.Equal(t, tc.expectedBackupRules, expectedBackup) + } }) } } @@ -1261,7 +1727,8 @@ func Test_LoadPartialGroups(t *testing.T) { KVStore: kv.Config{ Mock: kvStore, }, - HeartbeatTimeout: 1 * time.Minute, + HeartbeatTimeout: 1 * time.Minute, + ReplicationFactor: 1, }, FlushCheckPeriod: 0, } @@ -1288,7 +1755,7 @@ func Test_LoadPartialGroups(t *testing.T) { len(r1.manager.GetRules(user3)) > 0 }) - returned, err := r1.listRules(context.Background()) + returned, _, err := r1.listRules(context.Background()) require.NoError(t, err) require.Equal(t, returned, allRules) require.Equal(t, 2, len(manager.userManagers)) @@ -1784,7 +2251,8 @@ func TestRulerDisablesRuleGroups(t *testing.T) { KVStore: kv.Config{ Mock: kvStore, }, - HeartbeatTimeout: 1 * time.Minute, + HeartbeatTimeout: 1 * time.Minute, + ReplicationFactor: 1, }, FlushCheckPeriod: 0, } @@ -1832,7 +2300,7 @@ func TestRulerDisablesRuleGroups(t *testing.T) { } actualRules := map[string]rulespb.RuleGroupList{} - loadedRules, err := r1.listRules(context.Background()) + loadedRules, _, err := r1.listRules(context.Background()) require.NoError(t, err) for k, v := range loadedRules { if len(v) > 0 { @@ -1843,7 +2311,7 @@ func TestRulerDisablesRuleGroups(t *testing.T) { fetchRules := func(id string, r *Ruler) { // Only expect rules from other rulers when using ring, and they are present in the ring. if r != nil && rulerRing != nil && rulerRing.HasInstance(id) { - loaded, err := r.listRules(context.Background()) + loaded, _, err := r.listRules(context.Background()) require.NoError(t, err) // Normalize nil map to empty one.