Skip to content

Commit

Permalink
Allow rules to be loaded to rulers as backup for List rules API HA
Browse files Browse the repository at this point in the history
Signed-off-by: Emmanuel Lodovice <[email protected]>
  • Loading branch information
emanlodovice committed Mar 11, 2024
1 parent 9704cc3 commit a37e449
Show file tree
Hide file tree
Showing 11 changed files with 1,124 additions and 124 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* [FEATURE] Ruler: Add `ruler.concurrent-evals-enabled` flag to enable concurrent evaluation within a single rule group for independent rules. Maximum concurrency can be configured via `ruler.max-concurrent-evals`. #5766
* [FEATURE] Distributor Queryable: Experimental: Add config `zone_results_quorum_metadata`. When querying ingesters using metadata APIs such as label names and values, only results from quorum number of zones will be included and merged. #5779
* [FEATURE] Storage Cache Clients: Add config `set_async_circuit_breaker_config` to utilize the circuit breaker pattern for dynamically thresholding asynchronous set operations. Implemented in both memcached and redis cache clients. #5789
* [FEATURE] Ruler: Add `ruler.api-deduplicate-rules` flag to remove duplicate rule groups from the Prometheus compatible rules API endpoint. Add `ruler.ring.replication-factor` and `ruler.ring.zone-awareness-enabled` flags to configure rule group replication, but only the first ruler in the replicaset evaluates the rule group, the rest will just hold a copy as backup. Add `ruler.api-enable-rules-backup` flag to configure rulers to send the rule group backups stored in the replicaset to handle events when a ruler is down during an API request to list rules. #5782
* [ENHANCEMENT] Store Gateway: Added `-store-gateway.enabled-tenants` and `-store-gateway.disabled-tenants` to explicitly enable or disable store-gateway for specific tenants. #5638
* [ENHANCEMENT] Compactor: Add new compactor metric `cortex_compactor_start_duration_seconds`. #5683
* [ENHANCEMENT] Upgraded Docker base images to `alpine:3.18`. #5684
Expand Down
15 changes: 15 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -4254,6 +4254,21 @@ ring:
# CLI flag: -experimental.ruler.enable-api
[enable_api: <boolean> | default = false]
# Enable rulers to store a copy of rules owned by other rulers with default
# state (state before any evaluation) and send this copy in list API requests as
# backup in case the ruler who owns the rule fails to send its rules. This
# allows the rules API to handle ruler outage by returning rules with default
# state. Ring replication-factor needs to be set to 3 or more for this to be
# useful.
# CLI flag: -ruler.api-enable-rules-backup
[api_enable_rules_backup: <boolean> | default = false]
# Remove duplicate rules in the prometheus rules and alerts API response. If
# there are duplicate rules the rule with the latest evaluation timestamp will
# be kept.
# CLI flag: -ruler.api-deduplicate-rules
[api_deduplicate_rules: <boolean> | default = false]
# Comma separated list of tenants whose rules this ruler can evaluate. If
# specified, only these tenants will be handled by ruler, otherwise this ruler
# can process rules from all tenants. Subject to sharding.
Expand Down
3 changes: 3 additions & 0 deletions pkg/ruler/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,9 @@ func (a *API) PrometheusRules(w http.ResponseWriter, req *http.Request) {

// keep data.groups are in order
sort.Slice(groups, func(i, j int) bool {
if groups[i].File == groups[j].File {
return groups[i].Name < groups[j].Name
}
return groups[i].File < groups[j].File
})

Expand Down
14 changes: 13 additions & 1 deletion pkg/ruler/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ type DefaultMultiTenantManager struct {
notifiers map[string]*rulerNotifier
notifiersDiscoveryMetrics map[string]discovery.DiscovererMetrics

// rules backup
rulesBackupManager *rulesBackupManager

managersTotal prometheus.Gauge
lastReloadSuccessful *prometheus.GaugeVec
lastReloadSuccessfulTimestamp *prometheus.GaugeVec
Expand Down Expand Up @@ -85,6 +88,7 @@ func NewDefaultMultiTenantManager(cfg Config, managerFactory ManagerFactory, eva
mapper: newMapper(cfg.RulePath, logger),
userManagers: map[string]RulesManager{},
userManagerMetrics: userManagerMetrics,
rulesBackupManager: newRulesBackupManager(cfg, logger, reg),
managersTotal: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Namespace: "cortex",
Name: "ruler_managers_total",
Expand Down Expand Up @@ -142,8 +146,12 @@ func (r *DefaultMultiTenantManager) SyncRuleGroups(ctx context.Context, ruleGrou
r.managersTotal.Set(float64(len(r.userManagers)))
}

func (r *DefaultMultiTenantManager) BackUpRuleGroups(ctx context.Context, ruleGroups map[string]rulespb.RuleGroupList) {
r.rulesBackupManager.backUpRuleGroups(ctx, ruleGroups)
}

// syncRulesToManager maps the rule files to disk, detects any changes and will create/update the
// the users Prometheus Rules Manager.
// users Prometheus Rules Manager.
func (r *DefaultMultiTenantManager) syncRulesToManager(ctx context.Context, user string, groups rulespb.RuleGroupList) {
// Map the files to disk and return the file names to be passed to the users manager if they
// have been updated
Expand Down Expand Up @@ -279,6 +287,10 @@ func (r *DefaultMultiTenantManager) GetRules(userID string) []*promRules.Group {
return groups
}

func (r *DefaultMultiTenantManager) GetBackupRules(userID string) []*promRules.Group {
return r.rulesBackupManager.getRuleGroups(userID)
}

func (r *DefaultMultiTenantManager) Stop() {
r.notifiersMtx.Lock()
for _, n := range r.notifiers {
Expand Down
49 changes: 49 additions & 0 deletions pkg/ruler/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,55 @@ func TestSyncRuleGroupsCleanUpPerUserMetrics(t *testing.T) {
require.NotContains(t, mfm["cortex_ruler_config_last_reload_successful"].String(), "value:\""+user+"\"")
}

func TestBackupRules(t *testing.T) {
dir := t.TempDir()
reg := prometheus.NewPedanticRegistry()
evalMetrics := NewRuleEvalMetrics(Config{RulePath: dir, EnableQueryStats: true}, reg)
m, err := NewDefaultMultiTenantManager(Config{RulePath: dir}, factory, evalMetrics, reg, log.NewNopLogger())
require.NoError(t, err)

const user1 = "testUser"
const user2 = "testUser2"

require.Equal(t, 0, len(m.GetBackupRules(user1)))
require.Equal(t, 0, len(m.GetBackupRules(user2)))

userRules := map[string]rulespb.RuleGroupList{
user1: {
&rulespb.RuleGroupDesc{
Name: "group1",
Namespace: "ns",
Interval: 1 * time.Minute,
User: user1,
},
},
user2: {
&rulespb.RuleGroupDesc{
Name: "group2",
Namespace: "ns",
Interval: 1 * time.Minute,
User: user1,
},
},
}
m.BackUpRuleGroups(context.TODO(), userRules)
managerOptions := &promRules.ManagerOptions{}
g1 := promRules.NewGroup(promRules.GroupOptions{
Name: userRules[user1][0].Name,
File: userRules[user1][0].Namespace,
Interval: userRules[user1][0].Interval,
Opts: managerOptions,
})
g2 := promRules.NewGroup(promRules.GroupOptions{
Name: userRules[user2][0].Name,
File: userRules[user2][0].Namespace,
Interval: userRules[user2][0].Interval,
Opts: managerOptions,
})
requireGroupsEqual(t, m.GetBackupRules(user1), []*promRules.Group{g1})
requireGroupsEqual(t, m.GetBackupRules(user2), []*promRules.Group{g2})
}

func getManager(m *DefaultMultiTenantManager, user string) RulesManager {
m.userManagerMtx.Lock()
defer m.userManagerMtx.Unlock()
Expand Down
34 changes: 34 additions & 0 deletions pkg/ruler/merger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package ruler

import (
"time"

promRules "github.com/prometheus/prometheus/rules"
)

// mergeGroupStateDesc removes duplicates from the provided []*GroupStateDesc by keeping the GroupStateDesc with the
// latest information. It uses the EvaluationTimestamp of the GroupStateDesc and the EvaluationTimestamp of the
// ActiveRules in a GroupStateDesc to determine the which GroupStateDesc has the latest information.
func mergeGroupStateDesc(in []*GroupStateDesc) []*GroupStateDesc {
states := make(map[string]*GroupStateDesc)
rgTime := make(map[string]time.Time)
for _, state := range in {
latestTs := state.EvaluationTimestamp
for _, r := range state.ActiveRules {
if latestTs.Before(r.EvaluationTimestamp) {
latestTs = r.EvaluationTimestamp
}
}
key := promRules.GroupKey(state.Group.Namespace, state.Group.Name)
ts, ok := rgTime[key]
if !ok || ts.Before(latestTs) {
states[key] = state
rgTime[key] = latestTs
}
}
groups := make([]*GroupStateDesc, 0, len(states))
for _, state := range states {
groups = append(groups, state)
}
return groups
}
114 changes: 114 additions & 0 deletions pkg/ruler/merger_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package ruler

import (
"reflect"
"slices"
"strings"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/cortexproject/cortex/pkg/ruler/rulespb"
)

func TestMergeGroupStateDesc(t *testing.T) {
curTime := time.Now()
r := rulespb.RuleDesc{
Expr: "1 > 1",
}
g1 := rulespb.RuleGroupDesc{
Name: "g1",
Namespace: "ns1",
}
g2 := rulespb.RuleGroupDesc{
Name: "g2",
Namespace: "ns1",
}
rs1 := RuleStateDesc{
Rule: &r,
EvaluationTimestamp: curTime,
}
rs1NotRun := RuleStateDesc{
Rule: &r,
}
rs2 := RuleStateDesc{
Rule: &r,
EvaluationTimestamp: curTime,
}
rs2NotRun := RuleStateDesc{
Rule: &r,
}
rs3 := RuleStateDesc{
Rule: &r,
EvaluationTimestamp: curTime.Add(10 * time.Second),
}

gs1 := GroupStateDesc{
Group: &g1,
ActiveRules: []*RuleStateDesc{&rs1, &rs2},
EvaluationTimestamp: curTime,
}
gs1NotRun := GroupStateDesc{
Group: &g1,
ActiveRules: []*RuleStateDesc{&rs1NotRun, &rs2NotRun},
}
gs2 := GroupStateDesc{
Group: &g2,
ActiveRules: []*RuleStateDesc{&rs1, &rs2},
EvaluationTimestamp: curTime,
}
gs2NotRun := GroupStateDesc{
Group: &g2,
ActiveRules: []*RuleStateDesc{&rs1NotRun, &rs2NotRun},
}
gs3 := GroupStateDesc{
Group: &g2,
ActiveRules: []*RuleStateDesc{&rs1, &rs3},
EvaluationTimestamp: curTime,
}

type testCase struct {
input []*GroupStateDesc
expectedOutput []*GroupStateDesc
}

testCases := map[string]testCase{
"No duplicate": {
input: []*GroupStateDesc{&gs1, &gs2},
expectedOutput: []*GroupStateDesc{&gs1, &gs2},
},
"No duplicate but not evaluated": {
input: []*GroupStateDesc{&gs1NotRun, &gs2NotRun},
expectedOutput: []*GroupStateDesc{&gs1NotRun, &gs2NotRun},
},
"With exact duplicate": {
input: []*GroupStateDesc{&gs1, &gs2NotRun, &gs1, &gs2NotRun},
expectedOutput: []*GroupStateDesc{&gs1, &gs2NotRun},
},
"With duplicates that are not evaluated": {
input: []*GroupStateDesc{&gs1, &gs2, &gs1NotRun, &gs2NotRun},
expectedOutput: []*GroupStateDesc{&gs1, &gs2},
},
"With duplicate with a new newer rule evaluation": {
input: []*GroupStateDesc{&gs3, &gs1, &gs2, &gs1NotRun},
expectedOutput: []*GroupStateDesc{&gs1, &gs3},
},
}

for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
out := mergeGroupStateDesc(tc.input)
slices.SortFunc(out, func(a, b *GroupStateDesc) int {
fileCompare := strings.Compare(a.Group.Namespace, b.Group.Namespace)
if fileCompare != 0 {
return fileCompare
}
return strings.Compare(a.Group.Name, b.Group.Name)
})
require.Equal(t, len(tc.expectedOutput), len(out))
require.True(t, reflect.DeepEqual(tc.expectedOutput, out))
})
}

}
Loading

0 comments on commit a37e449

Please sign in to comment.