Skip to content

Commit

Permalink
Support Ruler to query Query Frontend (cortexproject#6151)
Browse files Browse the repository at this point in the history
  • Loading branch information
SungJin1212 authored Sep 7, 2024
1 parent 8622767 commit 1523080
Show file tree
Hide file tree
Showing 14 changed files with 763 additions and 27 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## master / unreleased

* [FEATURE] Ruler: Experimental: Add `ruler.frontend-address` to allow the ruler to send its queries to query frontends instead of querying ingesters directly. #6151
* [FEATURE] Ruler: Minimize chances of missed rule group evaluations that can occur due to OOM kills, bad underlying nodes, or due to an unhealthy ruler that appears in the ring as healthy. This feature is enabled via `-ruler.enable-ha-evaluation` flag. #6129
* [ENHANCEMENT] Query Frontend: Add peakSample in query stats response. #6188
* [ENHANCEMENT] Ruler: Add new ruler metric `cortex_ruler_rule_groups_in_store` that is the total rule groups per tenant in store, which can be used to compare with `cortex_prometheus_rule_group_rules` to count the number of rule groups that are not loaded by a ruler. #5869
Expand Down
74 changes: 74 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -4067,6 +4067,80 @@ The `redis_config` configures the Redis backend cache.
The `ruler_config` configures the Cortex ruler.

```yaml
# [Experimental] gRPC listen address of the Query Frontend, in host:port format.
# If set, the Ruler sends its queries to the Query Frontend via gRPC. If not
# set, the Ruler queries Ingesters directly.
# CLI flag: -ruler.frontend-address
[frontend_address: <string> | default = ""]
frontend_client:
# gRPC client max receive message size (bytes).
# CLI flag: -ruler.frontendClient.grpc-max-recv-msg-size
[max_recv_msg_size: <int> | default = 104857600]
# gRPC client max send message size (bytes).
# CLI flag: -ruler.frontendClient.grpc-max-send-msg-size
[max_send_msg_size: <int> | default = 16777216]
# Use compression when sending messages. Supported values are: 'gzip',
# 'snappy', 'snappy-block', 'zstd' and '' (disable compression)
# CLI flag: -ruler.frontendClient.grpc-compression
[grpc_compression: <string> | default = ""]
# Rate limit for gRPC client; 0 means disabled.
# CLI flag: -ruler.frontendClient.grpc-client-rate-limit
[rate_limit: <float> | default = 0]
# Rate limit burst for gRPC client.
# CLI flag: -ruler.frontendClient.grpc-client-rate-limit-burst
[rate_limit_burst: <int> | default = 0]
# Enable backoff and retry when we hit ratelimits.
# CLI flag: -ruler.frontendClient.backoff-on-ratelimits
[backoff_on_ratelimits: <boolean> | default = false]
backoff_config:
# Minimum delay when backing off.
# CLI flag: -ruler.frontendClient.backoff-min-period
[min_period: <duration> | default = 100ms]
# Maximum delay when backing off.
# CLI flag: -ruler.frontendClient.backoff-max-period
[max_period: <duration> | default = 10s]
# Number of times to backoff and retry before failing.
# CLI flag: -ruler.frontendClient.backoff-retries
[max_retries: <int> | default = 10]
# Enable TLS in the GRPC client. This flag needs to be enabled when any other
# TLS flag is set. If set to false, insecure connection to gRPC server will be
# used.
# CLI flag: -ruler.frontendClient.tls-enabled
[tls_enabled: <boolean> | default = false]
# Path to the client certificate file, which will be used for authenticating
# with the server. Also requires the key path to be configured.
# CLI flag: -ruler.frontendClient.tls-cert-path
[tls_cert_path: <string> | default = ""]
# Path to the key file for the client certificate. Also requires the client
# certificate to be configured.
# CLI flag: -ruler.frontendClient.tls-key-path
[tls_key_path: <string> | default = ""]
# Path to the CA certificates file to validate server certificate against. If
# not set, the host's root CA certificates are used.
# CLI flag: -ruler.frontendClient.tls-ca-path
[tls_ca_path: <string> | default = ""]
# Override the expected name on the server certificate.
# CLI flag: -ruler.frontendClient.tls-server-name
[tls_server_name: <string> | default = ""]
# Skip validating server certificate.
# CLI flag: -ruler.frontendClient.tls-insecure-skip-verify
[tls_insecure_skip_verify: <boolean> | default = false]
# URL of alerts return path.
# CLI flag: -ruler.external.url
[external_url: <url> | default = ]
Expand Down
1 change: 1 addition & 0 deletions docs/configuration/v1-guarantees.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ Cortex is an actively developed project and we want to encourage the introductio

Currently experimental features are:

- Ruler: Evaluate rules via the query frontend instead of ingesters (enabled via `-ruler.frontend-address`)
- S3 Server Side Encryption (SSE) using KMS (including per-tenant KMS config overrides).
- Azure blob storage.
- Zone awareness based replication.
Expand Down
66 changes: 66 additions & 0 deletions integration/ruler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1235,6 +1235,72 @@ func TestRulerKeepFiring(t *testing.T) {
require.Equal(t, 0, len(alert.Alerts)) // alert should be resolved once keepFiringFor time expires
}

// TestRulerEvalWithQueryFrontend starts a ruler configured with
// -ruler.frontend-address pointing at a query frontend, installs a single rule
// group and then checks the ruler's metrics: one query-frontend client exists,
// queries and writes have been issued, and none of them failed.
func TestRulerEvalWithQueryFrontend(t *testing.T) {
	const (
		namespace  = "test"
		user       = "user"
		expression = "metric"
		groupName  = "rule_group"
		ruleName   = "rule_name"
	)

	scenario, err := e2e.NewScenario(networkName)
	require.NoError(t, err)
	defer scenario.Close()

	// Bring up the backing services before any Cortex component.
	consul := e2edb.NewConsul()
	minio := e2edb.NewMinio(9000, bucketName, rulestoreBucketName)
	require.NoError(t, scenario.StartAndWaitReady(consul, minio))

	// Flags shared by every Cortex component in this scenario.
	baseFlags := mergeFlags(
		BlocksStorageFlags(),
		RulerFlags(),
		map[string]string{
			// A short evaluation interval keeps the wait for metrics brief.
			"-ruler.evaluation-interval": "2s",
			// Only one ingester is started, so replication is disabled.
			"-distributor.replication-factor": "1",
		},
	)
	consulEndpoint := consul.NetworkHTTPEndpoint()

	distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consulEndpoint, baseFlags, "")
	ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consulEndpoint, baseFlags, "")
	require.NoError(t, scenario.StartAndWaitReady(distributor, ingester))

	// The query frontend is started without waiting for readiness.
	queryFrontend := e2ecortex.NewQueryFrontend("query-frontend", baseFlags, "")
	require.NoError(t, scenario.Start(queryFrontend))
	frontendGRPC := queryFrontend.NetworkGRPCEndpoint()

	// Both the ruler and the querier are pointed at the same frontend address.
	rulerFlags := mergeFlags(baseFlags, map[string]string{
		"-ruler.frontend-address": frontendGRPC,
	})
	querierFlags := mergeFlags(baseFlags, map[string]string{
		"-querier.frontend-address": frontendGRPC,
	})
	rulerSvc := e2ecortex.NewRuler("ruler", consulEndpoint, rulerFlags, "")
	querierSvc := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consulEndpoint, querierFlags, "")
	require.NoError(t, scenario.StartAndWaitReady(rulerSvc, querierSvc))

	rulerClient, err := e2ecortex.NewClient("", "", "", rulerSvc.HTTPEndpoint(), user)
	require.NoError(t, err)
	require.NoError(t, rulerClient.SetRuleGroup(ruleGroupWithRule(groupName, ruleName, expression), namespace))

	groupMatcher := ruleGroupMatcher(user, namespace, groupName)
	// The ruler must have loaded the group...
	require.NoError(t, rulerSvc.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_prometheus_rule_group_rules"}, e2e.WithLabelMatchers(groupMatcher), e2e.WaitMissingMetrics))
	// ...and attempted at least one evaluation of its rule.
	require.NoError(t, rulerSvc.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_prometheus_rule_evaluations_total"}, e2e.WithLabelMatchers(groupMatcher), e2e.WaitMissingMetrics))

	userMatcher := labels.MustNewMatcher(labels.MatchEqual, "user", user)
	// Exactly one query-frontend client has been created.
	require.NoError(t, rulerSvc.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ruler_query_frontend_clients"}, e2e.WaitMissingMetrics))
	// Queries were issued for the tenant...
	require.NoError(t, rulerSvc.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_ruler_queries_total"}, e2e.WithLabelMatchers(userMatcher), e2e.WaitMissingMetrics))
	// ...and none of them failed.
	require.NoError(t, rulerSvc.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_ruler_queries_failed_total"}, e2e.WithLabelMatchers(userMatcher), e2e.WaitMissingMetrics))
	// Write requests were issued for the tenant...
	require.NoError(t, rulerSvc.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_ruler_write_requests_total"}, e2e.WithLabelMatchers(userMatcher), e2e.WaitMissingMetrics))
	// ...and none of them failed either.
	require.NoError(t, rulerSvc.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_ruler_write_requests_failed_total"}, e2e.WithLabelMatchers(userMatcher), e2e.WaitMissingMetrics))
}

func parseAlertFromRule(t *testing.T, rules interface{}) *alertingRule {
responseJson, err := json.Marshal(rules)
require.NoError(t, err)
Expand Down
2 changes: 2 additions & 0 deletions pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,8 @@ func (t *Cortex) initRuler() (serv services.Service, err error) {
}

t.Cfg.Ruler.LookbackDelta = t.Cfg.Querier.LookbackDelta
t.Cfg.Ruler.FrontendTimeout = t.Cfg.Querier.Timeout
t.Cfg.Ruler.PrometheusHTTPPrefix = t.Cfg.API.PrometheusHTTPPrefix
t.Cfg.Ruler.Ring.ListenPort = t.Cfg.Server.GRPCListenPort
metrics := ruler.NewRuleEvalMetrics(t.Cfg.Ruler, prometheus.DefaultRegisterer)

Expand Down
64 changes: 41 additions & 23 deletions pkg/ruler/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/querier"
"github.com/cortexproject/cortex/pkg/querier/stats"
"github.com/cortexproject/cortex/pkg/ring/client"
util_log "github.com/cortexproject/cortex/pkg/util/log"
promql_util "github.com/cortexproject/cortex/pkg/util/promql"
"github.com/cortexproject/cortex/pkg/util/validation"
Expand Down Expand Up @@ -157,7 +158,7 @@ type RulesLimits interface {
// EngineQueryFunc returns a new engine query function validating max queryLength.
// Modified from Prometheus rules.EngineQueryFunc
// https://github.com/prometheus/prometheus/blob/v2.39.1/rules/manager.go#L189.
func EngineQueryFunc(engine promql.QueryEngine, q storage.Queryable, overrides RulesLimits, userID string, lookbackDelta time.Duration) rules.QueryFunc {
func EngineQueryFunc(engine promql.QueryEngine, frontendClient *frontendClient, q storage.Queryable, overrides RulesLimits, userID string, lookbackDelta time.Duration) rules.QueryFunc {
return func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) {
// Enforce the max query length.
maxQueryLength := overrides.MaxQueryLength(userID)
Expand All @@ -174,25 +175,34 @@ func EngineQueryFunc(engine promql.QueryEngine, q storage.Queryable, overrides R
}
}

q, err := engine.NewInstantQuery(ctx, q, nil, qs, t)
if err != nil {
return nil, err
}
res := q.Exec(ctx)
if res.Err != nil {
return nil, res.Err
}
switch v := res.Value.(type) {
case promql.Vector:
if frontendClient != nil {
v, err := frontendClient.InstantQuery(ctx, qs, t)
if err != nil {
return nil, err
}

return v, nil
case promql.Scalar:
return promql.Vector{promql.Sample{
T: v.T,
F: v.V,
Metric: labels.Labels{},
}}, nil
default:
return nil, errors.New("rule result is not a vector or scalar")
} else {
q, err := engine.NewInstantQuery(ctx, q, nil, qs, t)
if err != nil {
return nil, err
}
res := q.Exec(ctx)
if res.Err != nil {
return nil, res.Err
}
switch v := res.Value.(type) {
case promql.Vector:
return v, nil
case promql.Scalar:
return promql.Vector{promql.Sample{
T: v.T,
F: v.V,
Metric: labels.Labels{},
}}, nil
default:
return nil, errors.New("rule result is not a vector or scalar")
}
}
}
}
Expand Down Expand Up @@ -300,22 +310,30 @@ type RulesManager interface {
}

// ManagerFactory is a function that creates new RulesManager for given user and notifier.Manager.
type ManagerFactory func(ctx context.Context, userID string, notifier *notifier.Manager, logger log.Logger, reg prometheus.Registerer) RulesManager
type ManagerFactory func(ctx context.Context, userID string, notifier *notifier.Manager, logger log.Logger, frontendPool *client.Pool, reg prometheus.Registerer) (RulesManager, error)

func DefaultTenantManagerFactory(cfg Config, p Pusher, q storage.Queryable, engine promql.QueryEngine, overrides RulesLimits, evalMetrics *RuleEvalMetrics, reg prometheus.Registerer) ManagerFactory {
// Wrap errors returned by Queryable to our wrapper, so that we can distinguish between those errors
// and errors returned by PromQL engine. Errors from Queryable can be either caused by user (limits) or internal errors.
// Errors from PromQL are always "user" errors.
q = querier.NewErrorTranslateQueryableWithFn(q, WrapQueryableErrors)

return func(ctx context.Context, userID string, notifier *notifier.Manager, logger log.Logger, reg prometheus.Registerer) RulesManager {
return func(ctx context.Context, userID string, notifier *notifier.Manager, logger log.Logger, frontendPool *client.Pool, reg prometheus.Registerer) (RulesManager, error) {
var client *frontendClient
failedQueries := evalMetrics.FailedQueriesVec.WithLabelValues(userID)
totalQueries := evalMetrics.TotalQueriesVec.WithLabelValues(userID)
totalWrites := evalMetrics.TotalWritesVec.WithLabelValues(userID)
failedWrites := evalMetrics.FailedWritesVec.WithLabelValues(userID)

if cfg.FrontendAddress != "" {
c, err := frontendPool.GetClientFor(cfg.FrontendAddress)
if err != nil {
return nil, err
}
client = c.(*frontendClient)
}
var queryFunc rules.QueryFunc
engineQueryFunc := EngineQueryFunc(engine, q, overrides, userID, cfg.LookbackDelta)
engineQueryFunc := EngineQueryFunc(engine, client, q, overrides, userID, cfg.LookbackDelta)
metricsQueryFunc := MetricsQueryFunc(engineQueryFunc, totalQueries, failedQueries)
if cfg.EnableQueryStats {
queryFunc = RecordAndReportRuleQueryMetrics(metricsQueryFunc, userID, evalMetrics, logger)
Expand All @@ -340,7 +358,7 @@ func DefaultTenantManagerFactory(cfg Config, p Pusher, q storage.Queryable, engi
DefaultRuleQueryOffset: func() time.Duration {
return overrides.RulerQueryOffset(userID)
},
})
}), nil
}
}

Expand Down
105 changes: 105 additions & 0 deletions pkg/ruler/frontend_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package ruler

import (
"context"
"fmt"
"net/http"
"net/textproto"
"net/url"
"strconv"
"time"

"github.com/go-kit/log/level"
"github.com/prometheus/common/version"
"github.com/prometheus/prometheus/promql"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"

"github.com/cortexproject/cortex/pkg/util/spanlogger"
)

// Constants used when building the instant-query request sent by FrontendClient.
const (
// orgIDHeader is the request header that carries the org ID extracted from
// the request context.
orgIDHeader = "X-Scope-OrgID"
// instantQueryPath is appended to the configured Prometheus HTTP prefix to
// form the request URL.
instantQueryPath = "/api/v1/query"
// mimeTypeForm is sent as Content-Type; the request body is URL-encoded form
// values.
mimeTypeForm = "application/x-www-form-urlencoded"
// contentTypeJSON is sent in the Accept header.
contentTypeJSON = "application/json"
)

// FrontendClient issues instant queries as HTTP-over-gRPC requests through an
// httpgrpc.HTTPClient and decodes the JSON response body into a promql.Vector.
type FrontendClient struct {
// client handles the httpgrpc request built for each query.
client httpgrpc.HTTPClient
// timeout bounds each call made through client.
timeout time.Duration
// prometheusHTTPPrefix is prepended to the instant query path to form the
// request URL.
prometheusHTTPPrefix string
// jsonDecoder turns the response body into a vector plus warnings.
jsonDecoder JsonDecoder
}

// NewFrontendClient returns a FrontendClient that sends requests through
// client, applies timeout to each call, and prefixes the instant query path
// with prometheusHTTPPrefix. Responses are decoded with a zero-value
// JsonDecoder.
func NewFrontendClient(client httpgrpc.HTTPClient, timeout time.Duration, prometheusHTTPPrefix string) *FrontendClient {
	fc := new(FrontendClient)
	fc.client = client
	fc.timeout = timeout
	fc.prometheusHTTPPrefix = prometheusHTTPPrefix
	fc.jsonDecoder = JsonDecoder{}
	return fc
}

// makeRequest builds the httpgrpc POST request for an instant query.
//
// The query string qs and, when ts is not the zero time, the evaluation time
// (formatted as RFC 3339 with nanoseconds) are sent as URL-encoded form values
// in the body. The org ID is taken from ctx and forwarded in the orgIDHeader
// header; an error is returned if ctx carries no org ID.
func (p *FrontendClient) makeRequest(ctx context.Context, qs string, ts time.Time) (*httpgrpc.HTTPRequest, error) {
	//lint:ignore faillint wrapper around upstream method
	orgID, err := user.ExtractOrgID(ctx)
	if err != nil {
		return nil, err
	}

	form := url.Values{}
	form.Set("query", qs)
	if !ts.IsZero() {
		form.Set("time", ts.Format(time.RFC3339Nano))
	}
	payload := []byte(form.Encode())

	// header builds a single-valued header with a canonicalized key.
	header := func(name, value string) *httpgrpc.Header {
		return &httpgrpc.Header{
			Key:    textproto.CanonicalMIMEHeaderKey(name),
			Values: []string{value},
		}
	}

	return &httpgrpc.HTTPRequest{
		Method: http.MethodPost,
		Url:    p.prometheusHTTPPrefix + instantQueryPath,
		Body:   payload,
		Headers: []*httpgrpc.Header{
			header("User-Agent", fmt.Sprintf("Cortex/%s", version.Version)),
			header("Content-Type", mimeTypeForm),
			header("Content-Length", strconv.Itoa(len(payload))),
			header("Accept", contentTypeJSON),
			header(orgIDHeader, orgID),
		},
	}, nil
}

// InstantQuery runs the instant query qs at time t through the underlying
// httpgrpc client and decodes the response body into a promql.Vector.
//
// The client call is bounded by the configured timeout. Any error from
// building the request, handling it, or decoding the response is logged on the
// span logger together with the query and returned unchanged. Warnings
// reported by the decoder are only logged at warn level.
func (p *FrontendClient) InstantQuery(ctx context.Context, qs string, t time.Time) (promql.Vector, error) {
	spanLog, ctx := spanlogger.New(ctx, "FrontendClient.InstantQuery")
	defer spanLog.Span.Finish()

	// fail logs err alongside the query and hands it back to the caller.
	fail := func(err error) (promql.Vector, error) {
		level.Error(spanLog).Log("err", err, "query", qs)
		return nil, err
	}

	request, err := p.makeRequest(ctx, qs, t)
	if err != nil {
		return fail(err)
	}

	// The timeout covers only the round trip, not request construction.
	callCtx, cancel := context.WithTimeout(ctx, p.timeout)
	defer cancel()

	response, err := p.client.Handle(callCtx, request)
	if err != nil {
		return fail(err)
	}

	result, warnings, err := p.jsonDecoder.Decode(response.Body)
	if err != nil {
		return fail(err)
	}
	if len(warnings) > 0 {
		level.Warn(spanLog).Log("warnings", warnings, "query", qs)
	}

	return result, nil
}
Loading

0 comments on commit 1523080

Please sign in to comment.