Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
Signed-off-by: Emmanuel Lodovice <[email protected]>
  • Loading branch information
emanlodovice committed Mar 5, 2024
1 parent 3624924 commit 7aa272b
Show file tree
Hide file tree
Showing 7 changed files with 774 additions and 57 deletions.
14 changes: 13 additions & 1 deletion pkg/ruler/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ type DefaultMultiTenantManager struct {
notifiers map[string]*rulerNotifier
notifiersDiscoveryMetrics map[string]discovery.DiscovererMetrics

// rules backup
rulesBackupManager *rulesBackupManager

managersTotal prometheus.Gauge
lastReloadSuccessful *prometheus.GaugeVec
lastReloadSuccessfulTimestamp *prometheus.GaugeVec
Expand Down Expand Up @@ -85,6 +88,7 @@ func NewDefaultMultiTenantManager(cfg Config, managerFactory ManagerFactory, eva
mapper: newMapper(cfg.RulePath, logger),
userManagers: map[string]RulesManager{},
userManagerMetrics: userManagerMetrics,
rulesBackupManager: newRulesBackupManager(cfg, logger),
managersTotal: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Namespace: "cortex",
Name: "ruler_managers_total",
Expand Down Expand Up @@ -142,8 +146,12 @@ func (r *DefaultMultiTenantManager) SyncRuleGroups(ctx context.Context, ruleGrou
r.managersTotal.Set(float64(len(r.userManagers)))
}

// BackUpRuleGroups forwards ruleGroups to the rules backup manager's
// backUpRuleGroups method. It only delegates; ctx is passed through unchanged.
func (r *DefaultMultiTenantManager) BackUpRuleGroups(ctx context.Context, ruleGroups map[string]rulespb.RuleGroupList) {
r.rulesBackupManager.backUpRuleGroups(ctx, ruleGroups)
}

// syncRulesToManager maps the rule files to disk, detects any changes and will create/update the
// the users Prometheus Rules Manager.
// users Prometheus Rules Manager.
func (r *DefaultMultiTenantManager) syncRulesToManager(ctx context.Context, user string, groups rulespb.RuleGroupList) {
// Map the files to disk and return the file names to be passed to the users manager if they
// have been updated
Expand Down Expand Up @@ -279,6 +287,10 @@ func (r *DefaultMultiTenantManager) GetRules(userID string) []*promRules.Group {
return groups
}

// GetBackupRules returns the rule groups that the rules backup manager holds
// for userID, exactly as reported by its getRuleGroups method.
func (r *DefaultMultiTenantManager) GetBackupRules(userID string) []*promRules.Group {
return r.rulesBackupManager.getRuleGroups(userID)
}

func (r *DefaultMultiTenantManager) Stop() {
r.notifiersMtx.Lock()
for _, n := range r.notifiers {
Expand Down
43 changes: 43 additions & 0 deletions pkg/ruler/merger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package ruler

import (
"strings"
"time"

"golang.org/x/exp/slices"

promRules "github.com/prometheus/prometheus/rules"
)

// mergeGroupStateDesc collapses group states that share the same namespace and
// group name into a single entry. For each such pair it keeps the state whose
// most recent evaluation timestamp (the later of the group's own timestamp and
// those of its active rules) is the newest; when timestamps tie, the state
// that appears first in `in` is kept. The result is ordered by namespace and
// then by group name.
func mergeGroupStateDesc(in []*GroupStateDesc) []*GroupStateDesc {
	// winner pairs the currently selected state for a key with the timestamp
	// that earned it the slot.
	type winner struct {
		state  *GroupStateDesc
		seenAt time.Time
	}
	winners := make(map[string]winner)

	for _, candidate := range in {
		// The freshest timestamp across the group itself and all its rules.
		newest := candidate.EvaluationTimestamp
		for _, rule := range candidate.ActiveRules {
			if newest.Before(rule.EvaluationTimestamp) {
				newest = rule.EvaluationTimestamp
			}
		}

		id := promRules.GroupKey(candidate.Group.Namespace, candidate.Group.Name)
		if current, found := winners[id]; found && !current.seenAt.Before(newest) {
			// The stored state is at least as recent; keep it.
			continue
		}
		winners[id] = winner{state: candidate, seenAt: newest}
	}

	merged := make([]*GroupStateDesc, 0, len(winners))
	for _, w := range winners {
		merged = append(merged, w.state)
	}

	slices.SortFunc(merged, func(a, b *GroupStateDesc) int {
		// Order by namespace first; fall back to the group name only when the
		// namespaces are equal.
		if byNamespace := strings.Compare(a.Group.Namespace, b.Group.Namespace); byNamespace != 0 {
			return byNamespace
		}
		return strings.Compare(a.Group.Name, b.Group.Name)
	})
	return merged
}
103 changes: 103 additions & 0 deletions pkg/ruler/merger_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package ruler

import (
"github.com/cortexproject/cortex/pkg/ruler/rulespb"
"github.com/stretchr/testify/require"
"reflect"
"testing"
"time"
)

// TestMergeGroupStateDesc checks that mergeGroupStateDesc keeps a single state
// per namespace/group pair, prefers the copy with the most recent evaluation
// timestamp, and returns the survivors sorted by namespace and group name.
func TestMergeGroupStateDesc(t *testing.T) {
	now := time.Now()
	rule := rulespb.RuleDesc{Expr: "1 > 1"}

	// Two groups living in the same namespace.
	groupOne := rulespb.RuleGroupDesc{Name: "g1", Namespace: "ns1"}
	groupTwo := rulespb.RuleGroupDesc{Name: "g2", Namespace: "ns1"}

	// Rule states: evaluated at `now`, never evaluated, or evaluated later.
	ruleA := RuleStateDesc{Rule: &rule, EvaluationTimestamp: now}
	ruleANotRun := RuleStateDesc{Rule: &rule}
	ruleB := RuleStateDesc{Rule: &rule, EvaluationTimestamp: now}
	ruleBNotRun := RuleStateDesc{Rule: &rule}
	ruleLater := RuleStateDesc{Rule: &rule, EvaluationTimestamp: now.Add(10 * time.Second)}

	// Group states built from the pieces above.
	stateOne := GroupStateDesc{
		Group:               &groupOne,
		ActiveRules:         []*RuleStateDesc{&ruleA, &ruleB},
		EvaluationTimestamp: now,
	}
	stateOneNotRun := GroupStateDesc{
		Group:       &groupOne,
		ActiveRules: []*RuleStateDesc{&ruleANotRun, &ruleBNotRun},
	}
	stateTwo := GroupStateDesc{
		Group:               &groupTwo,
		ActiveRules:         []*RuleStateDesc{&ruleA, &ruleB},
		EvaluationTimestamp: now,
	}
	stateTwoNotRun := GroupStateDesc{
		Group:       &groupTwo,
		ActiveRules: []*RuleStateDesc{&ruleANotRun, &ruleBNotRun},
	}
	// Same group as stateTwo, but one of its rules was evaluated more recently.
	stateTwoNewerRule := GroupStateDesc{
		Group:               &groupTwo,
		ActiveRules:         []*RuleStateDesc{&ruleA, &ruleLater},
		EvaluationTimestamp: now,
	}

	scenarios := map[string]struct {
		input          []*GroupStateDesc
		expectedOutput []*GroupStateDesc
	}{
		"No duplicate": {
			input:          []*GroupStateDesc{&stateOne, &stateTwo},
			expectedOutput: []*GroupStateDesc{&stateOne, &stateTwo},
		},
		"No duplicate but not evaluated": {
			input:          []*GroupStateDesc{&stateOneNotRun, &stateTwoNotRun},
			expectedOutput: []*GroupStateDesc{&stateOneNotRun, &stateTwoNotRun},
		},
		"With exact duplicate": {
			input:          []*GroupStateDesc{&stateOne, &stateTwoNotRun, &stateOne, &stateTwoNotRun},
			expectedOutput: []*GroupStateDesc{&stateOne, &stateTwoNotRun},
		},
		"With duplicates that are not evaluated": {
			input:          []*GroupStateDesc{&stateOne, &stateTwo, &stateOneNotRun, &stateTwoNotRun},
			expectedOutput: []*GroupStateDesc{&stateOne, &stateTwo},
		},
		"With duplicate with a new newer rule evaluation": {
			input:          []*GroupStateDesc{&stateTwoNewerRule, &stateOne, &stateTwo, &stateOneNotRun},
			expectedOutput: []*GroupStateDesc{&stateOne, &stateTwoNewerRule},
		},
	}

	for label, scenario := range scenarios {
		t.Run(label, func(t *testing.T) {
			got := mergeGroupStateDesc(scenario.input)
			require.Equal(t, len(scenario.expectedOutput), len(got))
			require.True(t, reflect.DeepEqual(scenario.expectedOutput, got))
		})
	}
}
107 changes: 107 additions & 0 deletions pkg/ruler/rule_backup_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package ruler

import (
"context"
"errors"
"strings"
"sync"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/prometheus/model/rulefmt"
"github.com/prometheus/prometheus/promql/parser"
promRules "github.com/prometheus/prometheus/rules"

"github.com/cortexproject/cortex/pkg/ruler/rulespb"
)

// loader implements the GroupLoader interface, but instead of reading from a
// file when Load is called, it returns the rulefmt.RuleGroup values it has
// stored in memory.
type loader struct {
// ruleGroups holds the stored groups, keyed by the identifier given to Load.
ruleGroups map[string][]rulefmt.RuleGroup
}

// Load returns the rule groups stored under identifier, wrapped in a
// rulefmt.RuleGroups. Nothing is read from disk and the error slice is always
// nil; an unknown identifier yields a RuleGroups with no groups.
func (r *loader) Load(identifier string) (*rulefmt.RuleGroups, []error) {
	stored := new(rulefmt.RuleGroups)
	stored.Groups = r.ruleGroups[identifier]
	return stored, nil
}

// Parse parses query by delegating to parser.ParseExpr and returns its result
// and error unchanged.
func (r *loader) Parse(query string) (parser.Expr, error) {
return parser.ParseExpr(query)
}

// rulesBackupManager keeps an in-memory map of Prometheus rule groups per user
// that serves as a backup copy of those rules.
type rulesBackupManager struct {
// backupRuleGroupsMtx guards backupRuleGroups.
backupRuleGroupsMtx sync.RWMutex
// backupRuleGroups maps a user to the rule groups backed up for that user.
backupRuleGroups map[string][]*promRules.Group
// cfg supplies the external URL, external labels and evaluation interval
// used when converting rule groups.
cfg Config

logger log.Logger
}

// newRulesBackupManager builds a rulesBackupManager that starts with an empty
// backup map and uses the given configuration and logger.
func newRulesBackupManager(cfg Config, logger log.Logger) *rulesBackupManager {
	manager := &rulesBackupManager{cfg: cfg, logger: logger}
	manager.backupRuleGroups = map[string][]*promRules.Group{}
	return manager
}

// backUpRuleGroups replaces the stored backup with Prometheus rule groups
// built from ruleGroups, which maps each user to that user's rule groups.
//
// A user whose groups cannot be converted is logged and left out of the new
// backup. Because the whole map is swapped, a backup previously stored for
// that user is dropped too.
//
// The conversion is done before the lock is taken, so getRuleGroups readers
// are blocked only for the map swap and not while every user's rule groups are
// being converted. The conversion reads only cfg and logger, which are set at
// construction.
//
// NOTE(review): with concurrent callers, the call that finishes converting
// last now wins, rather than the call that acquired the lock last. This looks
// safe if syncs are issued serially; confirm against the caller.
func (r *rulesBackupManager) backUpRuleGroups(_ context.Context, ruleGroups map[string]rulespb.RuleGroupList) {
	backupRuleGroups := make(map[string][]*promRules.Group, len(ruleGroups))
	for user, groups := range ruleGroups {
		g, err := r.ruleGroupListToPromGroups(user, groups)
		if err != nil {
			// TODO: Increment a metric
			level.Error(r.logger).Log("msg", "unable to back up rules", "user", user, "err", err)
			continue
		}
		backupRuleGroups[user] = g
	}

	// Publish the freshly built map in one step under the write lock.
	r.backupRuleGroupsMtx.Lock()
	r.backupRuleGroups = backupRuleGroups
	r.backupRuleGroupsMtx.Unlock()
}

// ruleGroupListToPromGroups converts a rulespb.RuleGroupList into
// []*promRules.Group. It creates a single-use promRules.Manager whose
// GroupLoader serves the formatted groups from memory, then calls LoadGroups
// for every key of the formatted map (treated as namespaces).
//
// Each load error is logged individually; the caller receives only a generic
// error and a nil slice in that case.
func (r *rulesBackupManager) ruleGroupListToPromGroups(user string, ruleGroups rulespb.RuleGroupList) ([]*promRules.Group, error) {
	formatted := ruleGroups.Formatted()

	manager := promRules.NewManager(&promRules.ManagerOptions{
		ExternalURL: r.cfg.ExternalURL.URL,
		GroupLoader: &loader{ruleGroups: formatted},
	})

	// The keys of the formatted map are what the manager is asked to load.
	namespaces := make([]string, 0, len(formatted))
	for ns := range formatted {
		namespaces = append(namespaces, ns)
	}
	level.Info(r.logger).Log("msg", "backup rules for user", "user", user, "namespaces", strings.Join(namespaces, ","))

	loaded, loadErrs := manager.LoadGroups(r.cfg.EvaluationInterval, r.cfg.ExternalLabels, r.cfg.ExternalURL.String(), nil, namespaces...)
	if loadErrs != nil {
		for i := range loadErrs {
			level.Error(r.logger).Log("msg", "loading groups to backup failed", "user", user, "namespaces", namespaces, "err", loadErrs[i])
		}
		return nil, errors.New("error loading rules to backup")
	}

	// Flatten the loaded map into a slice; map iteration order is not fixed.
	result := make([]*promRules.Group, 0, len(loaded))
	for _, group := range loaded {
		result = append(result, group)
	}
	return result, nil
}

// getRuleGroups returns the rule groups currently backed up for userID, or nil
// when nothing is stored for that user. The map is read under the read lock.
func (r *rulesBackupManager) getRuleGroups(userID string) []*promRules.Group {
	r.backupRuleGroupsMtx.RLock()
	defer r.backupRuleGroupsMtx.RUnlock()

	// Indexing a missing key yields a nil slice, which is the "no backup" result.
	return r.backupRuleGroups[userID]
}
Loading

0 comments on commit 7aa272b

Please sign in to comment.