Allowing rule backup for rules API HA (#5782)
* Allowing ruler replication to be configurable

Signed-off-by: Emmanuel Lodovice <[email protected]>

* Allow rules to be loaded to rulers as backup for List rules API HA

Signed-off-by: Emmanuel Lodovice <[email protected]>

* Add integration test for rulers API with backup enabled

Signed-off-by: Emmanuel Lodovice <[email protected]>

* Mark the entire feature as experimental and improve variable names

Signed-off-by: Emmanuel Lodovice <[email protected]>

* Rename backUpRuleGroups to setRuleGroups to make the code less confusing

Signed-off-by: Emmanuel Lodovice <[email protected]>

* Remove backup manager lock because it's not needed

Signed-off-by: Emmanuel Lodovice <[email protected]>

* Improve code quality

- Remove duplicate code and use better data structures
- Make backup rule_group label match the prometheus rule_group label
- Skip initialization when feature is not enabled

Signed-off-by: Emmanuel Lodovice <[email protected]>

* Store rulepb.RuleGroupList in rules backup instead of promRules.Group

Signed-off-by: Emmanuel Lodovice <[email protected]>

* Add GetReplicationSetForOperationWithNoQuorum ring method and use it in getShardedRules

Signed-off-by: Emmanuel Lodovice <[email protected]>

* Refactor getLocalRules to make the method shorter

Signed-off-by: Emmanuel Lodovice <[email protected]>

* Add new ring method to get all instances and create a new method in the ruler to get a ReplicationSet without requiring quorum

Signed-off-by: Emmanuel Lodovice <[email protected]>

* Fix flaky test due to sorting issue

Signed-off-by: Emmanuel Lodovice <[email protected]>

---------

Signed-off-by: Emmanuel Lodovice <[email protected]>
emanlodovice authored Apr 19, 2024
1 parent 44a5d25 commit 8f6da89
Showing 18 changed files with 1,720 additions and 145 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -20,6 +20,7 @@
* [FEATURE] Ruler: Add `ruler.concurrent-evals-enabled` flag to enable concurrent evaluation within a single rule group for independent rules. Maximum concurrency can be configured via `ruler.max-concurrent-evals`. #5766
* [FEATURE] Distributor Queryable: Experimental: Add config `zone_results_quorum_metadata`. When querying ingesters using metadata APIs such as label names and values, only results from quorum number of zones will be included and merged. #5779
* [FEATURE] Storage Cache Clients: Add config `set_async_circuit_breaker_config` to utilize the circuit breaker pattern for dynamically thresholding asynchronous set operations. Implemented in both memcached and redis cache clients. #5789
* [FEATURE] Ruler: Add experimental `experimental.ruler.api-deduplicate-rules` flag to remove duplicate rule groups from the Prometheus-compatible rules API endpoint. Add experimental `ruler.ring.replication-factor` and `ruler.ring.zone-awareness-enabled` flags to configure rule group replication; only the first ruler in the replica set evaluates the rule group, while the others just hold a copy as backup. Add experimental `experimental.ruler.api-enable-rules-backup` flag so rulers serve the rule group backups stored in the replica set when a ruler is down during an API request to list rules. #5782
* [ENHANCEMENT] Store Gateway: Added `-store-gateway.enabled-tenants` and `-store-gateway.disabled-tenants` to explicitly enable or disable store-gateway for specific tenants. #5638
* [ENHANCEMENT] Compactor: Add new compactor metric `cortex_compactor_start_duration_seconds`. #5683
* [ENHANCEMENT] Upgraded Docker base images to `alpine:3.18`. #5684
25 changes: 25 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -4245,6 +4245,16 @@ ring:
# CLI flag: -ruler.ring.heartbeat-timeout
[heartbeat_timeout: <duration> | default = 1m]
# EXPERIMENTAL: The replication factor to use when loading rule groups for API
# HA.
# CLI flag: -ruler.ring.replication-factor
[replication_factor: <int> | default = 1]
# EXPERIMENTAL: True to enable zone-awareness and load rule groups across
# different availability zones for API HA.
# CLI flag: -ruler.ring.zone-awareness-enabled
[zone_awareness_enabled: <boolean> | default = false]
# Name of network interface to read address from.
# CLI flag: -ruler.ring.instance-interface-names
[instance_interface_names: <list of string> | default = [eth0 en0]]
@@ -4266,6 +4276,21 @@ ring:
# CLI flag: -experimental.ruler.enable-api
[enable_api: <boolean> | default = false]
# EXPERIMENTAL: Enable rulers to store a copy of rules owned by other rulers
# with default state (state before any evaluation) and send this copy in list
# API requests as backup in case the ruler who owns the rule fails to send its
# rules. This allows the rules API to handle ruler outage by returning rules
# with default state. Ring replication-factor needs to be set to 2 or more for
# this to be useful.
# CLI flag: -experimental.ruler.api-enable-rules-backup
[api_enable_rules_backup: <boolean> | default = false]
# EXPERIMENTAL: Remove duplicate rules in the Prometheus rules and alerts API
# response. If there are duplicate rules, the rule with the latest evaluation
# timestamp will be kept.
# CLI flag: -experimental.ruler.api-deduplicate-rules
[api_deduplicate_rules: <boolean> | default = false]
# Comma separated list of tenants whose rules this ruler can evaluate. If
# specified, only these tenants will be handled by ruler, otherwise this ruler
# can process rules from all tenants. Subject to sharding.
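The three new flags documented above are meant to be used together: replication keeps copies of each rule group on multiple rulers, the backup flag serves those copies during an outage, and deduplication keeps the merged API response clean. Below is a minimal Go sketch of the flag overrides (the integration test further down sets the first two of these); the values are illustrative only.

// Minimal sketch: flag overrides enabling rule group replication plus the two new
// experimental API behaviours documented above. Values are illustrative.
var rulesAPIHAFlags = map[string]string{
	// Replicate each rule group to three rulers; only the first replica evaluates it.
	"-ruler.ring.replication-factor": "3",
	// Let rulers serve their backup (non-evaluated) copies when the owning ruler is down.
	"-experimental.ruler.api-enable-rules-backup": "true",
	// Drop duplicate groups from the Prometheus-compatible rules/alerts response.
	"-experimental.ruler.api-deduplicate-rules": "true",
}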
36 changes: 27 additions & 9 deletions integration/ruler_test.go
@@ -395,6 +395,14 @@ func TestRulerSharding(t *testing.T) {
}

func TestRulerAPISharding(t *testing.T) {
testRulerAPIWithSharding(t, false)
}

func TestRulerAPIShardingWithAPIRulesBackupEnabled(t *testing.T) {
testRulerAPIWithSharding(t, true)
}

func testRulerAPIWithSharding(t *testing.T, enableAPIRulesBackup bool) {
const numRulesGroups = 100

random := rand.New(rand.NewSource(time.Now().UnixNano()))
@@ -444,24 +452,30 @@ func TestRulerAPISharding(t *testing.T) {
require.NoError(t, s.StartAndWaitReady(consul, minio))

// Configure the ruler.
overrides := map[string]string{
// Since we're not going to run any rule, we don't need the
// store-gateway to be configured to a valid address.
"-querier.store-gateway-addresses": "localhost:12345",
// Enable the bucket index so we can skip the initial bucket scan.
"-blocks-storage.bucket-store.bucket-index.enabled": "true",
}
if enableAPIRulesBackup {
overrides["-ruler.ring.replication-factor"] = "3"
overrides["-experimental.ruler.api-enable-rules-backup"] = "true"
}
rulerFlags := mergeFlags(
BlocksStorageFlags(),
RulerFlags(),
RulerShardingFlags(consul.NetworkHTTPEndpoint()),
map[string]string{
// Since we're not going to run any rule, we don't need the
// store-gateway to be configured to a valid address.
"-querier.store-gateway-addresses": "localhost:12345",
// Enable the bucket index so we can skip the initial bucket scan.
"-blocks-storage.bucket-store.bucket-index.enabled": "true",
},
overrides,
)

// Start rulers.
ruler1 := e2ecortex.NewRuler("ruler-1", consul.NetworkHTTPEndpoint(), rulerFlags, "")
ruler2 := e2ecortex.NewRuler("ruler-2", consul.NetworkHTTPEndpoint(), rulerFlags, "")
rulers := e2ecortex.NewCompositeCortexService(ruler1, ruler2)
require.NoError(t, s.StartAndWaitReady(ruler1, ruler2))
ruler3 := e2ecortex.NewRuler("ruler-3", consul.NetworkHTTPEndpoint(), rulerFlags, "")
rulers := e2ecortex.NewCompositeCortexService(ruler1, ruler2, ruler3)
require.NoError(t, s.StartAndWaitReady(ruler1, ruler2, ruler3))

// Upload rule groups to one of the rulers.
c, err := e2ecortex.NewClient("", "", "", ruler1.HTTPEndpoint(), "user-1")
@@ -542,6 +556,10 @@ func TestRulerAPISharding(t *testing.T) {
},
}
// For each test case, fetch the rules with configured filters, and ensure the results match.
if enableAPIRulesBackup {
err := ruler2.Kill() // if api-enable-rules-backup is enabled the APIs should be able to handle a ruler going down
require.NoError(t, err)
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
actualGroups, err := c.GetPrometheusRules(tc.filter)
5 changes: 5 additions & 0 deletions pkg/compactor/shuffle_sharding_grouper_test.go
@@ -766,6 +766,11 @@ func (r *RingMock) GetInstanceDescsForOperation(op ring.Operation) (map[string]r
return args.Get(0).(map[string]ring.InstanceDesc), args.Error(1)
}

func (r *RingMock) GetAllInstanceDescs(op ring.Operation) ([]ring.InstanceDesc, []ring.InstanceDesc, error) {
args := r.Called(op)
return args.Get(0).([]ring.InstanceDesc), make([]ring.InstanceDesc, 0), args.Error(1)
}

func (r *RingMock) GetReplicationSetForOperation(op ring.Operation) (ring.ReplicationSet, error) {
args := r.Called(op)
return args.Get(0).(ring.ReplicationSet), args.Error(1)
25 changes: 25 additions & 0 deletions pkg/ring/ring.go
@@ -49,6 +49,9 @@ type ReadRing interface {
// of unhealthy instances is greater than the tolerated max unavailable.
GetAllHealthy(op Operation) (ReplicationSet, error)

// GetAllInstanceDescs returns a slice of healthy and unhealthy InstanceDesc.
GetAllInstanceDescs(op Operation) ([]InstanceDesc, []InstanceDesc, error)

// GetInstanceDescsForOperation returns map of InstanceDesc with instance ID as the keys.
GetInstanceDescsForOperation(op Operation) (map[string]InstanceDesc, error)

@@ -463,6 +466,28 @@ func (r *Ring) GetAllHealthy(op Operation) (ReplicationSet, error) {
}, nil
}

// GetAllInstanceDescs implements ReadRing.
func (r *Ring) GetAllInstanceDescs(op Operation) ([]InstanceDesc, []InstanceDesc, error) {
r.mtx.RLock()
defer r.mtx.RUnlock()

if r.ringDesc == nil || len(r.ringDesc.Ingesters) == 0 {
return nil, nil, ErrEmptyRing
}
healthyInstances := make([]InstanceDesc, 0, len(r.ringDesc.Ingesters))
unhealthyInstances := make([]InstanceDesc, 0, len(r.ringDesc.Ingesters))
storageLastUpdate := r.KVClient.LastUpdateTime(r.key)
for _, instance := range r.ringDesc.Ingesters {
if r.IsHealthy(&instance, op, storageLastUpdate) {
healthyInstances = append(healthyInstances, instance)
} else {
unhealthyInstances = append(unhealthyInstances, instance)
}
}

return healthyInstances, unhealthyInstances, nil
}

// GetInstanceDescsForOperation implements ReadRing.
func (r *Ring) GetInstanceDescsForOperation(op Operation) (map[string]InstanceDesc, error) {
r.mtx.RLock()
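The commit description also mentions a ruler-side method that builds a replication set without requiring quorum on top of this new ring method. That helper is not shown in the hunks above, so the following is only a speculative Go sketch of how GetAllInstanceDescs might be combined into a quorum-free ReplicationSet; the function name getReplicationSetWithoutQuorum and the MaxErrors choice are assumptions, not code from this PR.

package ruler

import (
	"github.com/cortexproject/cortex/pkg/ring"
)

// getReplicationSetWithoutQuorum is a hypothetical helper: it returns every healthy
// instance for the operation and tolerates as many failures as there are unhealthy
// instances, so a single ruler outage does not fail the whole list-rules request.
func getReplicationSetWithoutQuorum(r ring.ReadRing, op ring.Operation) (ring.ReplicationSet, error) {
	healthy, unhealthy, err := r.GetAllInstanceDescs(op)
	if err != nil {
		return ring.ReplicationSet{}, err
	}
	return ring.ReplicationSet{
		Instances: healthy,
		MaxErrors: len(unhealthy), // assumption: tolerate the currently unhealthy replicas
	}, nil
}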
35 changes: 35 additions & 0 deletions pkg/ring/ring_test.go
@@ -959,6 +959,41 @@ func TestRing_GetInstanceDescsForOperation(t *testing.T) {
}, instanceDescs)
}

func TestRing_GetAllInstanceDescs(t *testing.T) {
now := time.Now().Unix()
twoMinutesAgo := time.Now().Add(-2 * time.Minute).Unix()

ringDesc := &Desc{Ingesters: map[string]InstanceDesc{
"instance-1": {Addr: "127.0.0.1", Tokens: []uint32{1}, State: ACTIVE, Timestamp: now},
"instance-2": {Addr: "127.0.0.2", Tokens: []uint32{2}, State: LEAVING, Timestamp: now}, // not healthy state
"instance-3": {Addr: "127.0.0.3", Tokens: []uint32{3}, State: ACTIVE, Timestamp: twoMinutesAgo}, // heartbeat timed out
}}

ring := Ring{
cfg: Config{HeartbeatTimeout: time.Minute},
ringDesc: ringDesc,
ringTokens: ringDesc.GetTokens(),
ringTokensByZone: ringDesc.getTokensByZone(),
ringInstanceByToken: ringDesc.getTokensInfo(),
ringZones: getZones(ringDesc.getTokensByZone()),
strategy: NewDefaultReplicationStrategy(),
KVClient: &MockClient{},
}

testOp := NewOp([]InstanceState{ACTIVE}, nil)

healthyInstanceDescs, unhealthyInstanceDescs, err := ring.GetAllInstanceDescs(testOp)
require.NoError(t, err)
require.EqualValues(t, []InstanceDesc{
{Addr: "127.0.0.1", Tokens: []uint32{1}, State: ACTIVE, Timestamp: now},
}, healthyInstanceDescs)
sort.Slice(unhealthyInstanceDescs, func(i, j int) bool { return unhealthyInstanceDescs[i].Addr < unhealthyInstanceDescs[j].Addr })
require.EqualValues(t, []InstanceDesc{
{Addr: "127.0.0.2", Tokens: []uint32{2}, State: LEAVING, Timestamp: now},
{Addr: "127.0.0.3", Tokens: []uint32{3}, State: ACTIVE, Timestamp: twoMinutesAgo},
}, unhealthyInstanceDescs)
}

func TestRing_GetReplicationSetForOperation(t *testing.T) {
now := time.Now()
g := NewRandomTokenGenerator()
5 changes: 5 additions & 0 deletions pkg/ring/util_test.go
@@ -36,6 +36,11 @@ func (r *RingMock) GetInstanceDescsForOperation(op Operation) (map[string]Instan
return args.Get(0).(map[string]InstanceDesc), args.Error(1)
}

func (r *RingMock) GetAllInstanceDescs(op Operation) ([]InstanceDesc, []InstanceDesc, error) {
args := r.Called(op)
return args.Get(0).([]InstanceDesc), make([]InstanceDesc, 0), args.Error(1)
}

func (r *RingMock) GetReplicationSetForOperation(op Operation) (ReplicationSet, error) {
args := r.Called(op)
return args.Get(0).(ReplicationSet), args.Error(1)
3 changes: 3 additions & 0 deletions pkg/ruler/api.go
@@ -258,6 +258,9 @@ func (a *API) PrometheusRules(w http.ResponseWriter, req *http.Request) {

// keep data.groups in order
sort.Slice(groups, func(i, j int) bool {
if groups[i].File == groups[j].File {
return groups[i].Name < groups[j].Name
}
return groups[i].File < groups[j].File
})

26 changes: 23 additions & 3 deletions pkg/ruler/manager.go
@@ -44,6 +44,9 @@ type DefaultMultiTenantManager struct {
notifiers map[string]*rulerNotifier
notifiersDiscoveryMetrics map[string]discovery.DiscovererMetrics

// rules backup
rulesBackupManager *rulesBackupManager

managersTotal prometheus.Gauge
lastReloadSuccessful *prometheus.GaugeVec
lastReloadSuccessfulTimestamp *prometheus.GaugeVec
@@ -79,7 +82,7 @@ func NewDefaultMultiTenantManager(cfg Config, managerFactory ManagerFactory, eva
os.Exit(1)
}

return &DefaultMultiTenantManager{
m := &DefaultMultiTenantManager{
cfg: cfg,
notifierCfg: ncfg,
managerFactory: managerFactory,
@@ -112,7 +115,11 @@ func NewDefaultMultiTenantManager(cfg Config, managerFactory ManagerFactory, eva
}, []string{"user"}),
registry: reg,
logger: logger,
}, nil
}
if cfg.APIEnableRulesBackup {
m.rulesBackupManager = newRulesBackupManager(cfg, logger, reg)
}
return m, nil
}

func (r *DefaultMultiTenantManager) SyncRuleGroups(ctx context.Context, ruleGroups map[string]rulespb.RuleGroupList) {
Expand Down Expand Up @@ -161,8 +168,14 @@ func (r *DefaultMultiTenantManager) deleteRuleCache(user string) {
delete(r.ruleCache, user)
}

func (r *DefaultMultiTenantManager) BackUpRuleGroups(ctx context.Context, ruleGroups map[string]rulespb.RuleGroupList) {
if r.rulesBackupManager != nil {
r.rulesBackupManager.setRuleGroups(ctx, ruleGroups)
}
}

// syncRulesToManager maps the rule files to disk, detects any changes and will create/update the
// the users Prometheus Rules Manager.
// users Prometheus Rules Manager.
func (r *DefaultMultiTenantManager) syncRulesToManager(ctx context.Context, user string, groups rulespb.RuleGroupList) {
// Map the files to disk and return the file names to be passed to the users manager if they
// have been updated
@@ -333,6 +346,13 @@ func (r *DefaultMultiTenantManager) GetRules(userID string) []*promRules.Group {
return groups
}

func (r *DefaultMultiTenantManager) GetBackupRules(userID string) rulespb.RuleGroupList {
if r.rulesBackupManager != nil {
return r.rulesBackupManager.getRuleGroups(userID)
}
return nil
}

func (r *DefaultMultiTenantManager) Stop() {
r.notifiersMtx.Lock()
for _, n := range r.notifiers {
41 changes: 41 additions & 0 deletions pkg/ruler/manager_test.go
@@ -253,6 +253,47 @@ func TestSyncRuleGroupsCleanUpPerUserMetrics(t *testing.T) {
require.NotContains(t, mfm["cortex_ruler_config_last_reload_successful"].String(), "value:\""+user+"\"")
}

func TestBackupRules(t *testing.T) {
dir := t.TempDir()
reg := prometheus.NewPedanticRegistry()
evalMetrics := NewRuleEvalMetrics(Config{RulePath: dir, EnableQueryStats: true}, reg)
waitDurations := []time.Duration{
1 * time.Millisecond,
1 * time.Millisecond,
}
ruleManagerFactory := RuleManagerFactory(nil, waitDurations)
m, err := NewDefaultMultiTenantManager(Config{RulePath: dir, APIEnableRulesBackup: true}, ruleManagerFactory, evalMetrics, reg, log.NewNopLogger())
require.NoError(t, err)

const user1 = "testUser"
const user2 = "testUser2"

require.Equal(t, 0, len(m.GetBackupRules(user1)))
require.Equal(t, 0, len(m.GetBackupRules(user2)))

userRules := map[string]rulespb.RuleGroupList{
user1: {
&rulespb.RuleGroupDesc{
Name: "group1",
Namespace: "ns",
Interval: 1 * time.Minute,
User: user1,
},
},
user2: {
&rulespb.RuleGroupDesc{
Name: "group2",
Namespace: "ns",
Interval: 1 * time.Minute,
User: user1,
},
},
}
m.BackUpRuleGroups(context.TODO(), userRules)
require.Equal(t, userRules[user1], m.GetBackupRules(user1))
require.Equal(t, userRules[user2], m.GetBackupRules(user2))
}

func getManager(m *DefaultMultiTenantManager, user string) RulesManager {
m.userManagerMtx.RLock()
defer m.userManagerMtx.RUnlock()
34 changes: 34 additions & 0 deletions pkg/ruler/merger.go
@@ -0,0 +1,34 @@
package ruler

import (
"time"

promRules "github.com/prometheus/prometheus/rules"
)

// mergeGroupStateDesc removes duplicates from the provided []*GroupStateDesc by keeping the GroupStateDesc with the
// latest information. It uses the EvaluationTimestamp of the GroupStateDesc and the EvaluationTimestamp of the
// ActiveRules in a GroupStateDesc to determine which GroupStateDesc has the latest information.
func mergeGroupStateDesc(in []*GroupStateDesc) []*GroupStateDesc {
states := make(map[string]*GroupStateDesc)
rgTime := make(map[string]time.Time)
for _, state := range in {
latestTs := state.EvaluationTimestamp
for _, r := range state.ActiveRules {
if latestTs.Before(r.EvaluationTimestamp) {
latestTs = r.EvaluationTimestamp
}
}
key := promRules.GroupKey(state.Group.Namespace, state.Group.Name)
ts, ok := rgTime[key]
if !ok || ts.Before(latestTs) {
states[key] = state
rgTime[key] = latestTs
}
}
groups := make([]*GroupStateDesc, 0, len(states))
for _, state := range states {
groups = append(groups, state)
}
return groups
}
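For readers unfamiliar with the GroupStateDesc plumbing, here is a hypothetical, test-style Go sketch (not part of the commit) of the deduplication behaviour the comment above describes. It assumes it lives in package ruler so it can reach the unexported function; the field values are illustrative and the pointer type of Group is inferred from the diff.

package ruler

import (
	"fmt"
	"time"

	"github.com/cortexproject/cortex/pkg/ruler/rulespb"
)

// exampleMergeGroupStateDesc shows two states for the same namespace/group being
// collapsed to the one with the newer evaluation timestamp.
func exampleMergeGroupStateDesc() {
	older := &GroupStateDesc{
		Group:               &rulespb.RuleGroupDesc{Namespace: "ns", Name: "group1"},
		EvaluationTimestamp: time.Unix(100, 0),
	}
	newer := &GroupStateDesc{
		Group:               &rulespb.RuleGroupDesc{Namespace: "ns", Name: "group1"},
		EvaluationTimestamp: time.Unix(200, 0),
	}

	merged := mergeGroupStateDesc([]*GroupStateDesc{older, newer})
	// Only the state with the latest evaluation timestamp survives.
	fmt.Println(len(merged), merged[0].EvaluationTimestamp.Unix()) // prints: 1 200
}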