Skip to content

Commit

Permalink
Add protobuf codec for query range and instant query responses (#5527)
Browse files Browse the repository at this point in the history
Co-authored-by: Ahmed Hassan <[email protected]>
  • Loading branch information
afhassan and afhassan authored Sep 25, 2024
1 parent 820c3bf commit 409f065
Show file tree
Hide file tree
Showing 23 changed files with 2,124 additions and 258 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## master / unreleased

* [FEATURE] Query Frontend/Querier: Add protobuf codec `-api.querier-default-codec` and the option to choose response compression type `-querier.response-compression`. #5527
* [CHANGE] Enable Compactor and Alertmanager in target all. #6204
* [FEATURE] Ruler: Experimental: Add `ruler.frontend-address` to allow query to query frontends instead of ingesters. #6151
* [FEATURE] Ruler: Minimize chances of missed rule group evaluations that can occur due to OOM kills, bad underlying nodes, or due to an unhealthy ruler that appears in the ring as healthy. This feature is enabled via `-ruler.enable-ha-evaluation` flag. #6129
Expand Down
5 changes: 5 additions & 0 deletions docs/blocks-storage/querier.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,11 @@ querier:
# CLI flag: -querier.per-step-stats-enabled
[per_step_stats_enabled: <boolean> | default = false]

# Use compression for metrics query API or instant and range query APIs.
# Supports 'gzip' and '' (disable compression)
# CLI flag: -querier.response-compression
[response_compression: <string> | default = "gzip"]

# The time after which a metric should be queried from storage and not just
# ingesters. 0 means all queries are sent to store. When running the blocks
# storage, if this option is enabled, the time range of the query sent to the
Expand Down
10 changes: 10 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ api:
# CLI flag: -api.build-info-enabled
[build_info_enabled: <boolean> | default = false]

# Choose default codec for querier response serialization. Supports 'json' and
# 'protobuf'.
# CLI flag: -api.querier-default-codec
[querier_default_codec: <string> | default = "json"]

# The server_config configures the HTTP and gRPC server of the launched
# service(s).
[server: <server_config>]
Expand Down Expand Up @@ -3718,6 +3723,11 @@ The `querier_config` configures the Cortex querier.
# CLI flag: -querier.per-step-stats-enabled
[per_step_stats_enabled: <boolean> | default = false]
# Use compression for metrics query API or instant and range query APIs.
# Supports 'gzip' and '' (disable compression)
# CLI flag: -querier.response-compression
[response_compression: <string> | default = "gzip"]
# The time after which a metric should be queried from storage and not just
# ingesters. 0 means all queries are sent to store. When running the blocks
# storage, if this option is enabled, the time range of the query sent to the
Expand Down
1 change: 1 addition & 0 deletions docs/configuration/v1-guarantees.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,3 +115,4 @@ Currently experimental features are:
- String interning for metrics labels
- Enable string interning for metrics labels by setting `-ingester.labels-string-interning-enabled` on Ingester.
- Query-frontend: query rejection (`-frontend.query-rejection.enabled`)
- Querier: protobuf codec (`-api.querier-default-codec`)
20 changes: 20 additions & 0 deletions integration/query_frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,26 @@ func TestQueryFrontendWithVerticalShardingQueryScheduler(t *testing.T) {
})
}

func TestQueryFrontendProtobufCodec(t *testing.T) {
runQueryFrontendTest(t, queryFrontendTestConfig{
testMissingMetricName: false,
querySchedulerEnabled: true,
queryStatsEnabled: true,
setup: func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) {
require.NoError(t, writeFileToSharedDir(s, cortexConfigFile, []byte(BlocksStorageConfig)))

minio := e2edb.NewMinio(9000, BlocksStorageFlags()["-blocks-storage.s3.bucket-name"])
require.NoError(t, s.StartAndWaitReady(minio))

flags = mergeFlags(e2e.EmptyFlags(), map[string]string{
"-api.querier-default-codec": "protobuf",
"-querier.response-compression": "gzip",
})
return cortexConfigFile, flags
},
})
}

func TestQueryFrontendRemoteRead(t *testing.T) {
runQueryFrontendTest(t, queryFrontendTestConfig{
remoteReadEnabled: true,
Expand Down
16 changes: 16 additions & 0 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/go-kit/log/level"
"github.com/grafana/regexp"
"github.com/klauspost/compress/gzhttp"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/util/httputil"
Expand Down Expand Up @@ -73,13 +74,20 @@ type Config struct {
corsRegexString string `yaml:"cors_origin"`

buildInfoEnabled bool `yaml:"build_info_enabled"`

QuerierDefaultCodec string `yaml:"querier_default_codec"`
}

var (
errUnsupportedDefaultCodec = errors.New("unsupported default codec type. Supported types are 'json' and 'protobuf'")
)

// RegisterFlags adds the flags required to config this to the given FlagSet.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.ResponseCompression, "api.response-compression-enabled", false, "Use GZIP compression for API responses. Some endpoints serve large YAML or JSON blobs which can benefit from compression.")
f.Var(&cfg.HTTPRequestHeadersToLog, "api.http-request-headers-to-log", "Which HTTP Request headers to add to logs")
f.BoolVar(&cfg.buildInfoEnabled, "api.build-info-enabled", false, "If enabled, build Info API will be served by query frontend or querier.")
f.StringVar(&cfg.QuerierDefaultCodec, "api.querier-default-codec", "json", "Choose default codec for querier response serialization. Supports 'json' and 'protobuf'.")
cfg.RegisterFlagsWithPrefix("", f)
}

Expand All @@ -90,6 +98,14 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.StringVar(&cfg.corsRegexString, prefix+"server.cors-origin", ".*", `Regex for CORS origin. It is fully anchored. Example: 'https?://(domain1|domain2)\.com'`)
}

// validate config
func (cfg *Config) Validate() error {
if cfg.QuerierDefaultCodec != "json" && cfg.QuerierDefaultCodec != "protobuf" {
return errUnsupportedDefaultCodec
}
return nil
}

// Push either wraps the distributor push function as configured or returns the distributor push directly.
func (cfg *Config) wrapDistributorPush(d *distributor.Distributor) push.Func {
if cfg.DistributorPushWrapper != nil {
Expand Down
4 changes: 4 additions & 0 deletions pkg/api/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/weaveworks/common/middleware"

"github.com/cortexproject/cortex/pkg/querier"
"github.com/cortexproject/cortex/pkg/querier/codec"
"github.com/cortexproject/cortex/pkg/querier/stats"
"github.com/cortexproject/cortex/pkg/util"
)
Expand Down Expand Up @@ -231,6 +232,9 @@ func NewQuerierHandler(
false,
)

// JSON codec is already installed. Install Protobuf codec to give the option for using either.
api.InstallCodec(codec.ProtobufCodec{})

router := mux.NewRouter()

// Use a separate metric for the querier in order to differentiate requests from the query-frontend when
Expand Down
3 changes: 3 additions & 0 deletions pkg/cortex/cortex.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,9 @@ func (c *Config) Validate(log log.Logger) error {
return errInvalidHTTPPrefix
}

if err := c.API.Validate(); err != nil {
return errors.Wrap(err, "invalid api config")
}
if err := c.Storage.Validate(); err != nil {
return errors.Wrap(err, "invalid storage config")
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,9 +442,9 @@ func (t *Cortex) initFlusher() (serv services.Service, err error) {
func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err error) {
queryAnalyzer := querysharding.NewQueryAnalyzer()
// PrometheusCodec is a codec to encode and decode Prometheus query range requests and responses.
prometheusCodec := queryrange.NewPrometheusCodec(false)
prometheusCodec := queryrange.NewPrometheusCodec(false, t.Cfg.Querier.ResponseCompression, t.Cfg.API.QuerierDefaultCodec)
// ShardedPrometheusCodec is same as PrometheusCodec but to be used on the sharded queries (it sum up the stats)
shardedPrometheusCodec := queryrange.NewPrometheusCodec(true)
shardedPrometheusCodec := queryrange.NewPrometheusCodec(true, t.Cfg.Querier.ResponseCompression, t.Cfg.API.QuerierDefaultCodec)

queryRangeMiddlewares, cache, err := queryrange.Middlewares(
t.Cfg.QueryRange,
Expand Down Expand Up @@ -472,7 +472,7 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro
queryRangeMiddlewares,
instantQueryMiddlewares,
prometheusCodec,
instantquery.InstantQueryCodec,
instantquery.NewInstantQueryCodec(t.Cfg.Querier.ResponseCompression, t.Cfg.API.QuerierDefaultCodec),
t.Overrides,
queryAnalyzer,
t.Cfg.Querier.DefaultEvaluationInterval,
Expand Down
161 changes: 161 additions & 0 deletions pkg/querier/codec/protobuf_codec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
package codec

import (
"github.com/gogo/protobuf/proto"
jsoniter "github.com/json-iterator/go"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/util/stats"
v1 "github.com/prometheus/prometheus/web/api/v1"

"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/querier/tripperware"
)

type ProtobufCodec struct{}

func (p ProtobufCodec) ContentType() v1.MIMEType {
return v1.MIMEType{Type: "application", SubType: "x-protobuf"}
}

func (p ProtobufCodec) CanEncode(resp *v1.Response) bool {
// Errors are parsed by default json codec
if resp.Error != "" || resp.Data == nil {
return false
}
return true
}

func (p ProtobufCodec) Encode(resp *v1.Response) ([]byte, error) {
prometheusQueryResponse, err := createPrometheusQueryResponse(resp)
if err != nil {
return []byte{}, err
}
b, err := proto.Marshal(prometheusQueryResponse)
return b, err
}

func createPrometheusQueryResponse(resp *v1.Response) (*tripperware.PrometheusResponse, error) {
var data = resp.Data.(*v1.QueryData)

var queryResult tripperware.PrometheusQueryResult
switch string(data.ResultType) {
case model.ValMatrix.String():
queryResult.Result = &tripperware.PrometheusQueryResult_Matrix{
Matrix: &tripperware.Matrix{
SampleStreams: *getMatrixSampleStreams(data),
},
}
case model.ValVector.String():
queryResult.Result = &tripperware.PrometheusQueryResult_Vector{
Vector: &tripperware.Vector{
Samples: *getVectorSamples(data),
},
}
default:
json := jsoniter.ConfigCompatibleWithStandardLibrary
rawBytes, err := json.Marshal(data)
if err != nil {
return nil, err
}
queryResult.Result = &tripperware.PrometheusQueryResult_RawBytes{RawBytes: rawBytes}
}

var stats *tripperware.PrometheusResponseStats
if data.Stats != nil {
builtin := data.Stats.Builtin()
stats = &tripperware.PrometheusResponseStats{Samples: getStats(&builtin)}
}

return &tripperware.PrometheusResponse{
Status: string(resp.Status),
Data: tripperware.PrometheusData{
ResultType: string(data.ResultType),
Result: queryResult,
Stats: stats,
},
ErrorType: string(resp.ErrorType),
Error: resp.Error,
Warnings: resp.Warnings,
}, nil
}

func getMatrixSampleStreams(data *v1.QueryData) *[]tripperware.SampleStream {
sampleStreamsLen := len(data.Result.(promql.Matrix))
sampleStreams := make([]tripperware.SampleStream, sampleStreamsLen)

for i := 0; i < sampleStreamsLen; i++ {
labelsLen := len(data.Result.(promql.Matrix)[i].Metric)
var labels []cortexpb.LabelAdapter
if labelsLen > 0 {
labels = make([]cortexpb.LabelAdapter, labelsLen)
for j := 0; j < labelsLen; j++ {
labels[j] = cortexpb.LabelAdapter{
Name: data.Result.(promql.Matrix)[i].Metric[j].Name,
Value: data.Result.(promql.Matrix)[i].Metric[j].Value,
}
}
}

samplesLen := len(data.Result.(promql.Matrix)[i].Floats)
var samples []cortexpb.Sample
if samplesLen > 0 {
samples = make([]cortexpb.Sample, samplesLen)
for j := 0; j < samplesLen; j++ {
samples[j] = cortexpb.Sample{
Value: data.Result.(promql.Matrix)[i].Floats[j].F,
TimestampMs: data.Result.(promql.Matrix)[i].Floats[j].T,
}
}
}
sampleStreams[i] = tripperware.SampleStream{Labels: labels, Samples: samples}
}
return &sampleStreams
}

func getVectorSamples(data *v1.QueryData) *[]tripperware.Sample {
vectorSamplesLen := len(data.Result.(promql.Vector))
vectorSamples := make([]tripperware.Sample, vectorSamplesLen)

for i := 0; i < vectorSamplesLen; i++ {
labelsLen := len(data.Result.(promql.Vector)[i].Metric)
var labels []cortexpb.LabelAdapter
if labelsLen > 0 {
labels = make([]cortexpb.LabelAdapter, labelsLen)
for j := 0; j < labelsLen; j++ {
labels[j] = cortexpb.LabelAdapter{
Name: data.Result.(promql.Vector)[i].Metric[j].Name,
Value: data.Result.(promql.Vector)[i].Metric[j].Value,
}
}
}

vectorSamples[i] = tripperware.Sample{
Labels: labels,
Sample: &cortexpb.Sample{
TimestampMs: data.Result.(promql.Vector)[i].T,
Value: data.Result.(promql.Vector)[i].F,
},
}
}
return &vectorSamples
}

func getStats(builtin *stats.BuiltinStats) *tripperware.PrometheusResponseSamplesStats {
queryableSamplesStatsPerStepLen := len(builtin.Samples.TotalQueryableSamplesPerStep)
queryableSamplesStatsPerStep := make([]*tripperware.PrometheusResponseQueryableSamplesStatsPerStep, queryableSamplesStatsPerStepLen)
for i := 0; i < queryableSamplesStatsPerStepLen; i++ {
queryableSamplesStatsPerStep[i] = &tripperware.PrometheusResponseQueryableSamplesStatsPerStep{
Value: builtin.Samples.TotalQueryableSamplesPerStep[i].V,
TimestampMs: builtin.Samples.TotalQueryableSamplesPerStep[i].T,
}
}

statSamples := tripperware.PrometheusResponseSamplesStats{
TotalQueryableSamples: builtin.Samples.TotalQueryableSamples,
TotalQueryableSamplesPerStep: queryableSamplesStatsPerStep,
PeakSamples: int64(builtin.Samples.PeakSamples),
}

return &statSamples
}
9 changes: 9 additions & 0 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ type Config struct {
QueryIngestersWithin time.Duration `yaml:"query_ingesters_within"`
EnablePerStepStats bool `yaml:"per_step_stats_enabled"`

// Use compression for metrics query API or instant and range query APIs.
ResponseCompression string `yaml:"response_compression"`

// QueryStoreAfter the time after which queries should also be sent to the store and not just ingesters.
QueryStoreAfter time.Duration `yaml:"query_store_after"`
MaxQueryIntoFuture time.Duration `yaml:"max_query_into_future"`
Expand Down Expand Up @@ -90,6 +93,7 @@ var (
errBadLookbackConfigs = errors.New("bad settings, query_store_after >= query_ingesters_within which can result in queries not being sent")
errShuffleShardingLookbackLessThanQueryStoreAfter = errors.New("the shuffle-sharding lookback period should be greater or equal than the configured 'query store after'")
errEmptyTimeRange = errors.New("empty time range")
errUnsupportedResponseCompression = errors.New("unsupported response compression. Supported compression 'gzip' and '' (disable compression)")
)

// RegisterFlags adds the flags required to config this to the given FlagSet.
Expand All @@ -111,6 +115,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.MaxSamples, "querier.max-samples", 50e6, "Maximum number of samples a single query can load into memory.")
f.DurationVar(&cfg.QueryIngestersWithin, "querier.query-ingesters-within", 0, "Maximum lookback beyond which queries are not sent to ingester. 0 means all queries are sent to ingester.")
f.BoolVar(&cfg.EnablePerStepStats, "querier.per-step-stats-enabled", false, "Enable returning samples stats per steps in query response.")
f.StringVar(&cfg.ResponseCompression, "querier.response-compression", "gzip", "Use compression for metrics query API or instant and range query APIs. Supports 'gzip' and '' (disable compression)")
f.DurationVar(&cfg.MaxQueryIntoFuture, "querier.max-query-into-future", 10*time.Minute, "Maximum duration into the future you can query. 0 to disable.")
f.DurationVar(&cfg.DefaultEvaluationInterval, "querier.default-evaluation-interval", time.Minute, "The default evaluation interval or step size for subqueries.")
f.DurationVar(&cfg.QueryStoreAfter, "querier.query-store-after", 0, "The time after which a metric should be queried from storage and not just ingesters. 0 means all queries are sent to store. When running the blocks storage, if this option is enabled, the time range of the query sent to the store will be manipulated to ensure the query end is not more recent than 'now - query-store-after'.")
Expand All @@ -133,6 +138,10 @@ func (cfg *Config) Validate() error {
}
}

if cfg.ResponseCompression != "" && cfg.ResponseCompression != "gzip" {
return errUnsupportedResponseCompression
}

if cfg.ShuffleShardingIngestersLookbackPeriod > 0 {
if cfg.ShuffleShardingIngestersLookbackPeriod < cfg.QueryStoreAfter {
return errShuffleShardingLookbackLessThanQueryStoreAfter
Expand Down
Loading

0 comments on commit 409f065

Please sign in to comment.