Allowing rule backup for rules API HA #5782

Merged · 12 commits · Apr 19, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -19,6 +19,7 @@
* [FEATURE] Ruler: Add `ruler.concurrent-evals-enabled` flag to enable concurrent evaluation within a single rule group for independent rules. Maximum concurrency can be configured via `ruler.max-concurrent-evals`. #5766
* [FEATURE] Distributor Queryable: Experimental: Add config `zone_results_quorum_metadata`. When querying ingesters using metadata APIs such as label names and values, only results from quorum number of zones will be included and merged. #5779
* [FEATURE] Storage Cache Clients: Add config `set_async_circuit_breaker_config` to utilize the circuit breaker pattern for dynamically thresholding asynchronous set operations. Implemented in both memcached and redis cache clients. #5789
* [FEATURE] Ruler: Add experimental `experimental.ruler.api-deduplicate-rules` flag to remove duplicate rule groups from the Prometheus-compatible rules API endpoint. Add experimental `ruler.ring.replication-factor` and `ruler.ring.zone-awareness-enabled` flags to configure rule group replication; only the first ruler in the replica set evaluates the rule group, while the others hold a copy as a backup. Add experimental `experimental.ruler.api-enable-rules-backup` flag to make rulers return the rule group backups stored in the replica set, so the rules API can handle a ruler being down during a list-rules request. #5782
* [ENHANCEMENT] Store Gateway: Added `-store-gateway.enabled-tenants` and `-store-gateway.disabled-tenants` to explicitly enable or disable store-gateway for specific tenants. #5638
* [ENHANCEMENT] Compactor: Add new compactor metric `cortex_compactor_start_duration_seconds`. #5683
* [ENHANCEMENT] Upgraded Docker base images to `alpine:3.18`. #5684
25 changes: 25 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -4233,6 +4233,16 @@ ring:
# CLI flag: -ruler.ring.heartbeat-timeout
[heartbeat_timeout: <duration> | default = 1m]

# EXPERIMENTAL: The replication factor to use when loading rule groups for API
# HA.
# CLI flag: -ruler.ring.replication-factor
[replication_factor: <int> | default = 1]

# EXPERIMENTAL: True to enable zone-awareness and load rule groups across
# different availability zones for API HA.
# CLI flag: -ruler.ring.zone-awareness-enabled
[zone_awareness_enabled: <boolean> | default = false]

# Name of network interface to read address from.
# CLI flag: -ruler.ring.instance-interface-names
[instance_interface_names: <list of string> | default = [eth0 en0]]
@@ -4254,6 +4264,21 @@ ring:
# CLI flag: -experimental.ruler.enable-api
[enable_api: <boolean> | default = false]

# EXPERIMENTAL: Enable rulers to store a copy of rules owned by other rulers
# with default state (state before any evaluation) and send this copy in list
# API requests as backup in case the ruler who owns the rule fails to send its
# rules. This allows the rules API to handle ruler outage by returning rules
# with default state. Ring replication-factor needs to be set to 2 or more for
# this to be useful.
# CLI flag: -experimental.ruler.api-enable-rules-backup
[api_enable_rules_backup: <boolean> | default = false]

# EXPERIMENTAL: Remove duplicate rules in the prometheus rules and alerts API
# response. If there are duplicate rules the rule with the latest evaluation
# timestamp will be kept.
# CLI flag: -experimental.ruler.api-deduplicate-rules
[api_deduplicate_rules: <boolean> | default = false]
Contributor
I gathered that this config is for transition purposes, for when a user upgrades to a version of Cortex that supports replication.

In that case I think we should make the purpose more explicit and call out that this flag is subject to removal. Maybe we should even mark this field as "experimental" so it falls under the "experimental" SLA of Cortex.

Contributor Author

It is, but not entirely. During a ring change there is a chance that a rule group is loaded by multiple rulers simply because of the timing of the rules sync and the ring propagation delay. So this flag can also be used to avoid duplicate rule groups in the list during those events.

Contributor Author

I think it makes sense for all these new flags to be experimental for now. We don't know whether they will still make sense once we have rule evaluation HA. I made them all experimental.


# Comma separated list of tenants whose rules this ruler can evaluate. If
# specified, only these tenants will be handled by ruler, otherwise this ruler
# can process rules from all tenants. Subject to sharding.
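For a concrete picture of how these flags fit together, below is a minimal Go sketch in the flag-override style used by the integration test further down. The flag names come from the reference above; the values and the snippet itself are illustrative assumptions, not configuration shipped with this PR.

```go
package main

import "fmt"

func main() {
	// Hypothetical CLI flag overrides for rules API HA. A replication factor
	// of 2 or more is required for api-enable-rules-backup to be useful;
	// api-deduplicate-rules keeps duplicate rule groups out of the response.
	overrides := map[string]string{
		"-ruler.ring.replication-factor":              "3",
		"-ruler.ring.zone-awareness-enabled":          "true",
		"-experimental.ruler.api-enable-rules-backup": "true",
		"-experimental.ruler.api-deduplicate-rules":   "true",
	}
	for flag, value := range overrides {
		fmt.Printf("%s=%s\n", flag, value)
	}
}
```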
36 changes: 27 additions & 9 deletions integration/ruler_test.go
@@ -395,6 +395,14 @@ func TestRulerSharding(t *testing.T) {
}

func TestRulerAPISharding(t *testing.T) {
testRulerAPIWithSharding(t, false)
}

func TestRulerAPIShardingWithAPIRulesBackupEnabled(t *testing.T) {
testRulerAPIWithSharding(t, true)
}

func testRulerAPIWithSharding(t *testing.T, enableAPIRulesBackup bool) {
const numRulesGroups = 100

random := rand.New(rand.NewSource(time.Now().UnixNano()))
@@ -444,24 +452,30 @@ func TestRulerAPISharding(t *testing.T) {
require.NoError(t, s.StartAndWaitReady(consul, minio))

// Configure the ruler.
overrides := map[string]string{
// Since we're not going to run any rule, we don't need the
// store-gateway to be configured to a valid address.
"-querier.store-gateway-addresses": "localhost:12345",
// Enable the bucket index so we can skip the initial bucket scan.
"-blocks-storage.bucket-store.bucket-index.enabled": "true",
}
if enableAPIRulesBackup {
overrides["-ruler.ring.replication-factor"] = "3"
overrides["-experimental.ruler.api-enable-rules-backup"] = "true"
}
rulerFlags := mergeFlags(
BlocksStorageFlags(),
RulerFlags(),
RulerShardingFlags(consul.NetworkHTTPEndpoint()),
map[string]string{
// Since we're not going to run any rule, we don't need the
// store-gateway to be configured to a valid address.
"-querier.store-gateway-addresses": "localhost:12345",
// Enable the bucket index so we can skip the initial bucket scan.
"-blocks-storage.bucket-store.bucket-index.enabled": "true",
},
overrides,
)

// Start rulers.
ruler1 := e2ecortex.NewRuler("ruler-1", consul.NetworkHTTPEndpoint(), rulerFlags, "")
ruler2 := e2ecortex.NewRuler("ruler-2", consul.NetworkHTTPEndpoint(), rulerFlags, "")
rulers := e2ecortex.NewCompositeCortexService(ruler1, ruler2)
require.NoError(t, s.StartAndWaitReady(ruler1, ruler2))
ruler3 := e2ecortex.NewRuler("ruler-3", consul.NetworkHTTPEndpoint(), rulerFlags, "")
rulers := e2ecortex.NewCompositeCortexService(ruler1, ruler2, ruler3)
require.NoError(t, s.StartAndWaitReady(ruler1, ruler2, ruler3))

// Upload rule groups to one of the rulers.
c, err := e2ecortex.NewClient("", "", "", ruler1.HTTPEndpoint(), "user-1")
@@ -542,6 +556,10 @@ func TestRulerAPISharding(t *testing.T) {
},
}
// For each test case, fetch the rules with configured filters, and ensure the results match.
if enableAPIRulesBackup {
err := ruler2.Kill() // if api-enable-rules-backup is enabled the APIs should be able to handle a ruler going down
require.NoError(t, err)
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
actualGroups, err := c.GetPrometheusRules(tc.filter)
5 changes: 5 additions & 0 deletions pkg/compactor/shuffle_sharding_grouper_test.go
@@ -766,6 +766,11 @@ func (r *RingMock) GetInstanceDescsForOperation(op ring.Operation) (map[string]r
return args.Get(0).(map[string]ring.InstanceDesc), args.Error(1)
}

func (r *RingMock) GetAllInstanceDescs(op ring.Operation) ([]ring.InstanceDesc, []ring.InstanceDesc, error) {
args := r.Called(op)
return args.Get(0).([]ring.InstanceDesc), make([]ring.InstanceDesc, 0), args.Error(1)
}

func (r *RingMock) GetReplicationSetForOperation(op ring.Operation) (ring.ReplicationSet, error) {
args := r.Called(op)
return args.Get(0).(ring.ReplicationSet), args.Error(1)
25 changes: 25 additions & 0 deletions pkg/ring/ring.go
@@ -50,6 +50,9 @@ type ReadRing interface {
// of unhealthy instances is greater than the tolerated max unavailable.
GetAllHealthy(op Operation) (ReplicationSet, error)

// GetAllInstanceDescs returns a slice of healthy and unhealthy InstanceDesc.
GetAllInstanceDescs(op Operation) ([]InstanceDesc, []InstanceDesc, error)

// GetInstanceDescsForOperation returns map of InstanceDesc with instance ID as the keys.
GetInstanceDescsForOperation(op Operation) (map[string]InstanceDesc, error)

@@ -464,6 +467,28 @@ func (r *Ring) GetAllHealthy(op Operation) (ReplicationSet, error) {
}, nil
}

// GetAllInstanceDescs implements ReadRing.
func (r *Ring) GetAllInstanceDescs(op Operation) ([]InstanceDesc, []InstanceDesc, error) {
r.mtx.RLock()
defer r.mtx.RUnlock()

if r.ringDesc == nil || len(r.ringDesc.Ingesters) == 0 {
return nil, nil, ErrEmptyRing
}
healthyInstances := make([]InstanceDesc, 0, len(r.ringDesc.Ingesters))
unhealthyInstances := make([]InstanceDesc, 0, len(r.ringDesc.Ingesters))
storageLastUpdate := r.KVClient.LastUpdateTime(r.key)
for _, instance := range r.ringDesc.Ingesters {
if r.IsHealthy(&instance, op, storageLastUpdate) {
healthyInstances = append(healthyInstances, instance)
} else {
unhealthyInstances = append(unhealthyInstances, instance)
}
}

return healthyInstances, unhealthyInstances, nil
}

// GetInstanceDescsForOperation implements ReadRing.
func (r *Ring) GetInstanceDescsForOperation(op Operation) (map[string]InstanceDesc, error) {
r.mtx.RLock()
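As a usage illustration, here is a small hypothetical helper built only on the ReadRing method added above; the helper name and its caller are assumptions for illustration, not code from this PR.

```go
package example

import (
	"github.com/cortexproject/cortex/pkg/ring"
)

// countRingHealth shows how the new GetAllInstanceDescs method can be used:
// it reports how many instances are currently healthy and unhealthy for the
// given operation, regardless of which tokens they own.
func countRingHealth(r ring.ReadRing, op ring.Operation) (healthy, unhealthy int, err error) {
	healthyDescs, unhealthyDescs, err := r.GetAllInstanceDescs(op)
	if err != nil {
		return 0, 0, err
	}
	return len(healthyDescs), len(unhealthyDescs), nil
}
```

This is the shape of call a component such as the ruler can use to learn which replicas are down and therefore when backup rule groups are worth returning.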
35 changes: 35 additions & 0 deletions pkg/ring/ring_test.go
@@ -959,6 +959,41 @@ func TestRing_GetInstanceDescsForOperation(t *testing.T) {
}, instanceDescs)
}

func TestRing_GetAllInstanceDescs(t *testing.T) {
now := time.Now().Unix()
twoMinutesAgo := time.Now().Add(-2 * time.Minute).Unix()

ringDesc := &Desc{Ingesters: map[string]InstanceDesc{
"instance-1": {Addr: "127.0.0.1", Tokens: []uint32{1}, State: ACTIVE, Timestamp: now},
"instance-2": {Addr: "127.0.0.2", Tokens: []uint32{2}, State: LEAVING, Timestamp: now}, // not healthy state
"instance-3": {Addr: "127.0.0.3", Tokens: []uint32{3}, State: ACTIVE, Timestamp: twoMinutesAgo}, // heartbeat timed out
}}

ring := Ring{
cfg: Config{HeartbeatTimeout: time.Minute},
ringDesc: ringDesc,
ringTokens: ringDesc.GetTokens(),
ringTokensByZone: ringDesc.getTokensByZone(),
ringInstanceByToken: ringDesc.getTokensInfo(),
ringZones: getZones(ringDesc.getTokensByZone()),
strategy: NewDefaultReplicationStrategy(),
KVClient: &MockClient{},
}

testOp := NewOp([]InstanceState{ACTIVE}, nil)

healthyInstanceDescs, unhealthyInstanceDescs, err := ring.GetAllInstanceDescs(testOp)
require.NoError(t, err)
require.EqualValues(t, []InstanceDesc{
{Addr: "127.0.0.1", Tokens: []uint32{1}, State: ACTIVE, Timestamp: now},
}, healthyInstanceDescs)
sort.Slice(unhealthyInstanceDescs, func(i, j int) bool { return unhealthyInstanceDescs[i].Addr < unhealthyInstanceDescs[j].Addr })
require.EqualValues(t, []InstanceDesc{
{Addr: "127.0.0.2", Tokens: []uint32{2}, State: LEAVING, Timestamp: now},
{Addr: "127.0.0.3", Tokens: []uint32{3}, State: ACTIVE, Timestamp: twoMinutesAgo},
}, unhealthyInstanceDescs)
}

func TestRing_GetReplicationSetForOperation(t *testing.T) {
now := time.Now()
g := NewRandomTokenGenerator()
5 changes: 5 additions & 0 deletions pkg/ring/util_test.go
@@ -36,6 +36,11 @@ func (r *RingMock) GetInstanceDescsForOperation(op Operation) (map[string]Instan
return args.Get(0).(map[string]InstanceDesc), args.Error(1)
}

func (r *RingMock) GetAllInstanceDescs(op Operation) ([]InstanceDesc, []InstanceDesc, error) {
args := r.Called(op)
return args.Get(0).([]InstanceDesc), make([]InstanceDesc, 0), args.Error(1)
}

func (r *RingMock) GetReplicationSetForOperation(op Operation) (ReplicationSet, error) {
args := r.Called(op)
return args.Get(0).(ReplicationSet), args.Error(1)
3 changes: 3 additions & 0 deletions pkg/ruler/api.go
@@ -258,6 +258,9 @@ func (a *API) PrometheusRules(w http.ResponseWriter, req *http.Request) {

// keep data.groups are in order
sort.Slice(groups, func(i, j int) bool {
if groups[i].File == groups[j].File {
return groups[i].Name < groups[j].Name
}
return groups[i].File < groups[j].File
})

26 changes: 23 additions & 3 deletions pkg/ruler/manager.go
@@ -44,6 +44,9 @@ type DefaultMultiTenantManager struct {
notifiers map[string]*rulerNotifier
notifiersDiscoveryMetrics map[string]discovery.DiscovererMetrics

// rules backup
rulesBackupManager *rulesBackupManager

managersTotal prometheus.Gauge
lastReloadSuccessful *prometheus.GaugeVec
lastReloadSuccessfulTimestamp *prometheus.GaugeVec
@@ -79,7 +82,7 @@ func NewDefaultMultiTenantManager(cfg Config, managerFactory ManagerFactory, eva
os.Exit(1)
}

return &DefaultMultiTenantManager{
m := &DefaultMultiTenantManager{
cfg: cfg,
notifierCfg: ncfg,
managerFactory: managerFactory,
@@ -112,7 +115,11 @@ func NewDefaultMultiTenantManager(cfg Config, managerFactory ManagerFactory, eva
}, []string{"user"}),
registry: reg,
logger: logger,
}, nil
}
if cfg.APIEnableRulesBackup {
m.rulesBackupManager = newRulesBackupManager(cfg, logger, reg)
}
return m, nil
}

func (r *DefaultMultiTenantManager) SyncRuleGroups(ctx context.Context, ruleGroups map[string]rulespb.RuleGroupList) {
Expand Down Expand Up @@ -161,8 +168,14 @@ func (r *DefaultMultiTenantManager) deleteRuleCache(user string) {
delete(r.ruleCache, user)
}

func (r *DefaultMultiTenantManager) BackUpRuleGroups(ctx context.Context, ruleGroups map[string]rulespb.RuleGroupList) {
Contributor
I might have missed it, but where do we call this function?

Contributor Author

We call it inside syncRules

r.manager.BackUpRuleGroups(ctx, backupConfigs)

if r.rulesBackupManager != nil {
r.rulesBackupManager.setRuleGroups(ctx, ruleGroups)
}
}

// syncRulesToManager maps the rule files to disk, detects any changes and will create/update the
// the users Prometheus Rules Manager.
// users Prometheus Rules Manager.
func (r *DefaultMultiTenantManager) syncRulesToManager(ctx context.Context, user string, groups rulespb.RuleGroupList) {
// Map the files to disk and return the file names to be passed to the users manager if they
// have been updated
@@ -333,6 +346,13 @@ func (r *DefaultMultiTenantManager) GetRules(userID string) []*promRules.Group {
return groups
}

func (r *DefaultMultiTenantManager) GetBackupRules(userID string) rulespb.RuleGroupList {
if r.rulesBackupManager != nil {
return r.rulesBackupManager.getRuleGroups(userID)
}
return nil
}

func (r *DefaultMultiTenantManager) Stop() {
r.notifiersMtx.Lock()
for _, n := range r.notifiers {
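The rulesBackupManager implementation itself is not shown in this hunk. Based only on the calls above (newRulesBackupManager, setRuleGroups, getRuleGroups), a minimal sketch of what such a per-tenant, in-memory backup store could look like follows; it is an assumption for illustration, not the PR's actual implementation.

```go
package ruler

import (
	"context"
	"sync"

	"github.com/cortexproject/cortex/pkg/ruler/rulespb"
)

// rulesBackupManagerSketch is a hypothetical, simplified stand-in for the
// backup store referenced by DefaultMultiTenantManager: it keeps the latest
// synced rule groups per tenant in memory so they can be served as backups.
type rulesBackupManagerSketch struct {
	mtx        sync.RWMutex
	ruleGroups map[string]rulespb.RuleGroupList
}

// setRuleGroups replaces the stored backup copy with the latest sync result.
func (b *rulesBackupManagerSketch) setRuleGroups(_ context.Context, groups map[string]rulespb.RuleGroupList) {
	b.mtx.Lock()
	defer b.mtx.Unlock()
	b.ruleGroups = groups
}

// getRuleGroups returns the backup rule groups for one tenant, or nil if the
// tenant has none.
func (b *rulesBackupManagerSketch) getRuleGroups(userID string) rulespb.RuleGroupList {
	b.mtx.RLock()
	defer b.mtx.RUnlock()
	return b.ruleGroups[userID]
}
```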
41 changes: 41 additions & 0 deletions pkg/ruler/manager_test.go
@@ -253,6 +253,47 @@ func TestSyncRuleGroupsCleanUpPerUserMetrics(t *testing.T) {
require.NotContains(t, mfm["cortex_ruler_config_last_reload_successful"].String(), "value:\""+user+"\"")
}

func TestBackupRules(t *testing.T) {
dir := t.TempDir()
reg := prometheus.NewPedanticRegistry()
evalMetrics := NewRuleEvalMetrics(Config{RulePath: dir, EnableQueryStats: true}, reg)
waitDurations := []time.Duration{
1 * time.Millisecond,
1 * time.Millisecond,
}
ruleManagerFactory := RuleManagerFactory(nil, waitDurations)
m, err := NewDefaultMultiTenantManager(Config{RulePath: dir, APIEnableRulesBackup: true}, ruleManagerFactory, evalMetrics, reg, log.NewNopLogger())
require.NoError(t, err)

const user1 = "testUser"
const user2 = "testUser2"

require.Equal(t, 0, len(m.GetBackupRules(user1)))
require.Equal(t, 0, len(m.GetBackupRules(user2)))

userRules := map[string]rulespb.RuleGroupList{
user1: {
&rulespb.RuleGroupDesc{
Name: "group1",
Namespace: "ns",
Interval: 1 * time.Minute,
User: user1,
},
},
user2: {
&rulespb.RuleGroupDesc{
Name: "group2",
Namespace: "ns",
Interval: 1 * time.Minute,
User: user1,
},
},
}
m.BackUpRuleGroups(context.TODO(), userRules)
require.Equal(t, userRules[user1], m.GetBackupRules(user1))
require.Equal(t, userRules[user2], m.GetBackupRules(user2))
}

func getManager(m *DefaultMultiTenantManager, user string) RulesManager {
m.userManagerMtx.RLock()
defer m.userManagerMtx.RUnlock()
34 changes: 34 additions & 0 deletions pkg/ruler/merger.go
@@ -0,0 +1,34 @@
package ruler

import (
"time"

promRules "github.com/prometheus/prometheus/rules"
)

// mergeGroupStateDesc removes duplicates from the provided []*GroupStateDesc by keeping the GroupStateDesc with the
// latest information. It uses the EvaluationTimestamp of the GroupStateDesc and the EvaluationTimestamp of the
// ActiveRules in a GroupStateDesc to determine which GroupStateDesc has the latest information.
func mergeGroupStateDesc(in []*GroupStateDesc) []*GroupStateDesc {
states := make(map[string]*GroupStateDesc)
rgTime := make(map[string]time.Time)
for _, state := range in {
latestTs := state.EvaluationTimestamp
for _, r := range state.ActiveRules {
if latestTs.Before(r.EvaluationTimestamp) {
latestTs = r.EvaluationTimestamp
}
}
Comment on lines +17 to +21
Contributor

With this PR, are we still evaluating all the rules in the replicas as well? If we are not evaluating, this is not required for the instances that are not primary, right?

If yes, should we only consider the timestamp of the group? The reason is that the timestamp on a rule is from when that rule was evaluated, which can be delayed, so you might discard a GroupStateDesc from a good instance in favor of a delayed GroupStateDesc just because one rule took a long time to evaluate.

Contributor Author @emanlodovice, Mar 21, 2024

> With this PR, are we still evaluating all the rules in the replicas as well? If we are not evaluating, this is not required for the instances that are not primary, right?

Only the primary instance will evaluate the rule group, so only the data from the primary instance will have the latest EvaluationTimestamps.

The EvaluationTimestamp of RuleGroupDesc is based on the Group's GetLastEvaluation. The setter of this value is only called after the Group finishes evaluation. This means that if you have a rule group that takes 5 minutes to evaluate, maybe because the rules inside it are slow or there are just too many of them, it would take 5 minutes after the first run of the Group before EvaluationTimestamp is populated. If we looked only at the Group's EvaluationTimestamp, we could be returning the data from the backup during those 5 minutes (because both would have the same value), which would not be accurate since some rules within the Group may have already evaluated and have state set. This is why I opted to check the Rule's EvaluationTimestamp as well, because that timestamp is updated per rule evaluation and does not wait for the entire Group to finish.

key := promRules.GroupKey(state.Group.Namespace, state.Group.Name)
ts, ok := rgTime[key]
if !ok || ts.Before(latestTs) {
states[key] = state
rgTime[key] = latestTs
}
}
groups := make([]*GroupStateDesc, 0, len(states))
for _, state := range states {
groups = append(groups, state)
}
return groups
}
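To make the merge behaviour concrete, here is a minimal test-style sketch, an illustration rather than a test from this PR, of the intended outcome: two copies of the same group collapse into one, and the copy with the newer evaluation timestamp wins. It assumes GroupStateDesc carries a non-pointer EvaluationTimestamp, as the code above implies.

```go
package ruler

import (
	"testing"
	"time"

	"github.com/stretchr/testify/require"

	"github.com/cortexproject/cortex/pkg/ruler/rulespb"
)

func TestMergeGroupStateDescSketch(t *testing.T) {
	now := time.Now()

	// Backup copy with default state: it has never been evaluated, so its
	// evaluation timestamp is the zero value.
	backup := &GroupStateDesc{
		Group: &rulespb.RuleGroupDesc{Namespace: "ns", Name: "group1"},
	}
	// Copy from the primary ruler that actually evaluated the group.
	primary := &GroupStateDesc{
		Group:               &rulespb.RuleGroupDesc{Namespace: "ns", Name: "group1"},
		EvaluationTimestamp: now,
	}

	merged := mergeGroupStateDesc([]*GroupStateDesc{backup, primary})

	// Only one copy survives, and it is the one with the newer timestamp.
	require.Len(t, merged, 1)
	require.Equal(t, now, merged[0].EvaluationTimestamp)
}
```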