From 9b1ba233f22077cd76d241426f8cfbb79ac78b3a Mon Sep 17 00:00:00 2001 From: Ahmed Hassan <57634502+afhassan@users.noreply.github.com> Date: Sat, 30 Nov 2024 10:50:31 -0800 Subject: [PATCH] Add support for native histograms in querier protobuf codec (#6368) --- integration/native_histogram_test.go | 272 +++++------ pkg/querier/codec/protobuf_codec.go | 105 ++++- pkg/querier/codec/protobuf_codec_test.go | 425 ++++++++++++++++++ .../instantquery/instant_query_test.go | 72 ++- 4 files changed, 727 insertions(+), 147 deletions(-) create mode 100644 pkg/querier/codec/protobuf_codec_test.go diff --git a/integration/native_histogram_test.go b/integration/native_histogram_test.go index d3f58a20b7..8231991ce8 100644 --- a/integration/native_histogram_test.go +++ b/integration/native_histogram_test.go @@ -4,6 +4,7 @@ package integration import ( + "fmt" "math/rand" "testing" "time" @@ -21,136 +22,149 @@ import ( func TestNativeHistogramIngestionAndQuery(t *testing.T) { const blockRangePeriod = 5 * time.Second - s, err := e2e.NewScenario(networkName) - require.NoError(t, err) - defer s.Close() - - // Configure the blocks storage to frequently compact TSDB head - // and ship blocks to the storage. - flags := mergeFlags(BlocksStorageFlags(), map[string]string{ - "-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(), - "-blocks-storage.tsdb.ship-interval": "1s", - "-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(), - "-blocks-storage.tsdb.enable-native-histograms": "true", - }) - - // Start dependencies. - consul := e2edb.NewConsul() - minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"]) - require.NoError(t, s.StartAndWaitReady(consul, minio)) - - // Start Cortex components for the write path. - distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") - ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") - require.NoError(t, s.StartAndWaitReady(distributor, ingester)) - - // Wait until the distributor has updated the ring. - require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total")) - - // Push some series to Cortex. - c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "", "user-1") - require.NoError(t, err) - - seriesTimestamp := time.Now() - series2Timestamp := seriesTimestamp.Add(blockRangePeriod * 2) - histogramIdx1 := rand.Uint32() - series1 := e2e.GenerateHistogramSeries("series_1", seriesTimestamp, histogramIdx1, false, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "float", Value: "false"}) - series1Float := e2e.GenerateHistogramSeries("series_1", seriesTimestamp, histogramIdx1, true, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "float", Value: "true"}) - res, err := c.Push(append(series1, series1Float...)) - require.NoError(t, err) - require.Equal(t, 200, res.StatusCode) - - histogramIdx2 := rand.Uint32() - series2 := e2e.GenerateHistogramSeries("series_2", series2Timestamp, histogramIdx2, false, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "float", Value: "false"}) - series2Float := e2e.GenerateHistogramSeries("series_2", series2Timestamp, histogramIdx2, true, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "float", Value: "true"}) - res, err = c.Push(append(series2, series2Float...)) - require.NoError(t, err) - require.Equal(t, 200, res.StatusCode) - - // Wait until the TSDB head is compacted and shipped to the storage. - // The shipped block contains the 2 series from `series_1` and `series_2` will be in head. - require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(1), "cortex_ingester_shipper_uploads_total")) - require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(4), "cortex_ingester_memory_series_created_total")) - require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(2), "cortex_ingester_memory_series_removed_total")) - require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(2), "cortex_ingester_memory_series")) - - queryFrontend := e2ecortex.NewQueryFrontendWithConfigFile("query-frontend", "", flags, "") - require.NoError(t, s.Start(queryFrontend)) - - // Start the querier and store-gateway, and configure them to frequently sync blocks fast enough to trigger consistency check. - storeGateway := e2ecortex.NewStoreGateway("store-gateway", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{ - "-blocks-storage.bucket-store.sync-interval": "5s", - }), "") - querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{ - "-blocks-storage.bucket-store.sync-interval": "1s", - "-querier.frontend-address": queryFrontend.NetworkGRPCEndpoint(), - }), "") - require.NoError(t, s.StartAndWaitReady(querier, storeGateway)) - - // Wait until the querier and store-gateway have updated the ring, and wait until the blocks are old enough for consistency check - require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512*2), "cortex_ring_tokens_total")) - require.NoError(t, storeGateway.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total")) - require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(4), []string{"cortex_querier_blocks_scan_duration_seconds"}, e2e.WithMetricCount)) - - // Sleep 3 * bucket sync interval to make sure consistency checker - // doesn't consider block is uploaded recently. - time.Sleep(3 * time.Second) - - // Query back the series. - c, err = e2ecortex.NewClient("", queryFrontend.HTTPEndpoint(), "", "", "user-1") - require.NoError(t, err) - - expectedHistogram1 := tsdbutil.GenerateTestHistogram(int(histogramIdx1)) - expectedHistogram2 := tsdbutil.GenerateTestHistogram(int(histogramIdx2)) - result, err := c.QueryRange(`series_1`, series2Timestamp.Add(-time.Minute*10), series2Timestamp, time.Second) - require.NoError(t, err) - require.Equal(t, model.ValMatrix, result.Type()) - m := result.(model.Matrix) - require.Equal(t, 2, m.Len()) - for _, ss := range m { - require.Empty(t, ss.Values) - require.NotEmpty(t, ss.Histograms) - for _, h := range ss.Histograms { - require.NotEmpty(t, h) - require.Equal(t, float64(expectedHistogram1.Count), float64(h.Histogram.Count)) - require.Equal(t, float64(expectedHistogram1.Sum), float64(h.Histogram.Sum)) - } + configs := []map[string]string{ + { + "-api.querier-default-codec": "json", + }, + { + "-api.querier-default-codec": "protobuf", + }, } - result, err = c.QueryRange(`series_2`, series2Timestamp.Add(-time.Minute*10), series2Timestamp, time.Second) - require.NoError(t, err) - require.Equal(t, model.ValMatrix, result.Type()) - m = result.(model.Matrix) - require.Equal(t, 2, m.Len()) - for _, ss := range m { - require.Empty(t, ss.Values) - require.NotEmpty(t, ss.Histograms) - for _, h := range ss.Histograms { - require.NotEmpty(t, h) - require.Equal(t, float64(expectedHistogram2.Count), float64(h.Histogram.Count)) - require.Equal(t, float64(expectedHistogram2.Sum), float64(h.Histogram.Sum)) - } - } - - result, err = c.Query(`series_1`, series2Timestamp) - require.NoError(t, err) - require.Equal(t, model.ValVector, result.Type()) - v := result.(model.Vector) - require.Equal(t, 2, v.Len()) - for _, s := range v { - require.NotNil(t, s.Histogram) - require.Equal(t, float64(expectedHistogram1.Count), float64(s.Histogram.Count)) - require.Equal(t, float64(expectedHistogram1.Sum), float64(s.Histogram.Sum)) - } - - result, err = c.Query(`series_2`, series2Timestamp) - require.NoError(t, err) - require.Equal(t, model.ValVector, result.Type()) - v = result.(model.Vector) - require.Equal(t, 2, v.Len()) - for _, s := range v { - require.NotNil(t, s.Histogram) - require.Equal(t, float64(expectedHistogram2.Count), float64(s.Histogram.Count)) - require.Equal(t, float64(expectedHistogram2.Sum), float64(s.Histogram.Sum)) + for _, config := range configs { + t.Run(fmt.Sprintf("native histograms with %s codec", config["-api.querier-default-codec"]), func(t *testing.T) { + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + // Configure the blocks storage to frequently compact TSDB head + // and ship blocks to the storage. + flags := mergeFlags(BlocksStorageFlags(), map[string]string{ + "-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(), + "-blocks-storage.tsdb.ship-interval": "1s", + "-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(), + "-blocks-storage.tsdb.enable-native-histograms": "true", + }) + + // Start dependencies. + consul := e2edb.NewConsul() + minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"]) + require.NoError(t, s.StartAndWaitReady(consul, minio)) + + // Start Cortex components for the write path. + distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") + ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") + require.NoError(t, s.StartAndWaitReady(distributor, ingester)) + + // Wait until the distributor has updated the ring. + require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total")) + + // Push some series to Cortex. + c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "", "user-1") + require.NoError(t, err) + + seriesTimestamp := time.Now() + series2Timestamp := seriesTimestamp.Add(blockRangePeriod * 2) + histogramIdx1 := rand.Uint32() + series1 := e2e.GenerateHistogramSeries("series_1", seriesTimestamp, histogramIdx1, false, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "float", Value: "false"}) + series1Float := e2e.GenerateHistogramSeries("series_1", seriesTimestamp, histogramIdx1, true, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "float", Value: "true"}) + res, err := c.Push(append(series1, series1Float...)) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + + histogramIdx2 := rand.Uint32() + series2 := e2e.GenerateHistogramSeries("series_2", series2Timestamp, histogramIdx2, false, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "float", Value: "false"}) + series2Float := e2e.GenerateHistogramSeries("series_2", series2Timestamp, histogramIdx2, true, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "float", Value: "true"}) + res, err = c.Push(append(series2, series2Float...)) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + + // Wait until the TSDB head is compacted and shipped to the storage. + // The shipped block contains the 2 series from `series_1` and `series_2` will be in head. + require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(1), "cortex_ingester_shipper_uploads_total")) + require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(4), "cortex_ingester_memory_series_created_total")) + require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(2), "cortex_ingester_memory_series_removed_total")) + require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(2), "cortex_ingester_memory_series")) + + queryFrontend := e2ecortex.NewQueryFrontendWithConfigFile("query-frontend", "", mergeFlags(flags, config), "") + require.NoError(t, s.Start(queryFrontend)) + + // Start the querier and store-gateway, and configure them to frequently sync blocks fast enough to trigger consistency check. + storeGateway := e2ecortex.NewStoreGateway("store-gateway", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{ + "-blocks-storage.bucket-store.sync-interval": "5s", + }), "") + querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{ + "-blocks-storage.bucket-store.sync-interval": "1s", + "-querier.frontend-address": queryFrontend.NetworkGRPCEndpoint(), + }), "") + require.NoError(t, s.StartAndWaitReady(querier, storeGateway)) + + // Wait until the querier and store-gateway have updated the ring, and wait until the blocks are old enough for consistency check + require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512*2), "cortex_ring_tokens_total")) + require.NoError(t, storeGateway.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total")) + require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(4), []string{"cortex_querier_blocks_scan_duration_seconds"}, e2e.WithMetricCount)) + + // Sleep 3 * bucket sync interval to make sure consistency checker + // doesn't consider block is uploaded recently. + time.Sleep(3 * time.Second) + + // Query back the series. + c, err = e2ecortex.NewClient("", queryFrontend.HTTPEndpoint(), "", "", "user-1") + require.NoError(t, err) + + expectedHistogram1 := tsdbutil.GenerateTestHistogram(int(histogramIdx1)) + expectedHistogram2 := tsdbutil.GenerateTestHistogram(int(histogramIdx2)) + result, err := c.QueryRange(`series_1`, series2Timestamp.Add(-time.Minute*10), series2Timestamp, time.Second) + require.NoError(t, err) + require.Equal(t, model.ValMatrix, result.Type()) + m := result.(model.Matrix) + require.Equal(t, 2, m.Len()) + for _, ss := range m { + require.Empty(t, ss.Values) + require.NotEmpty(t, ss.Histograms) + for _, h := range ss.Histograms { + require.NotEmpty(t, h) + require.Equal(t, float64(expectedHistogram1.Count), float64(h.Histogram.Count)) + require.Equal(t, float64(expectedHistogram1.Sum), float64(h.Histogram.Sum)) + } + } + + result, err = c.QueryRange(`series_2`, series2Timestamp.Add(-time.Minute*10), series2Timestamp, time.Second) + require.NoError(t, err) + require.Equal(t, model.ValMatrix, result.Type()) + m = result.(model.Matrix) + require.Equal(t, 2, m.Len()) + for _, ss := range m { + require.Empty(t, ss.Values) + require.NotEmpty(t, ss.Histograms) + for _, h := range ss.Histograms { + require.NotEmpty(t, h) + require.Equal(t, float64(expectedHistogram2.Count), float64(h.Histogram.Count)) + require.Equal(t, float64(expectedHistogram2.Sum), float64(h.Histogram.Sum)) + } + } + + result, err = c.Query(`series_1`, series2Timestamp) + require.NoError(t, err) + require.Equal(t, model.ValVector, result.Type()) + v := result.(model.Vector) + require.Equal(t, 2, v.Len()) + for _, s := range v { + require.NotNil(t, s.Histogram) + require.Equal(t, float64(expectedHistogram1.Count), float64(s.Histogram.Count)) + require.Equal(t, float64(expectedHistogram1.Sum), float64(s.Histogram.Sum)) + } + + result, err = c.Query(`series_2`, series2Timestamp) + require.NoError(t, err) + require.Equal(t, model.ValVector, result.Type()) + v = result.(model.Vector) + require.Equal(t, 2, v.Len()) + for _, s := range v { + require.NotNil(t, s.Histogram) + require.Equal(t, float64(expectedHistogram2.Count), float64(s.Histogram.Count)) + require.Equal(t, float64(expectedHistogram2.Sum), float64(s.Histogram.Sum)) + } + }) } } diff --git a/pkg/querier/codec/protobuf_codec.go b/pkg/querier/codec/protobuf_codec.go index b835d573e8..78b1d7c58a 100644 --- a/pkg/querier/codec/protobuf_codec.go +++ b/pkg/querier/codec/protobuf_codec.go @@ -4,6 +4,7 @@ import ( "github.com/gogo/protobuf/proto" jsoniter "github.com/json-iterator/go" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/util/stats" v1 "github.com/prometheus/prometheus/web/api/v1" @@ -26,6 +27,7 @@ func (p ProtobufCodec) CanEncode(resp *v1.Response) bool { return true } +// ProtobufCodec implementation is derived from https://github.com/prometheus/prometheus/blob/main/web/api/v1/json_codec.go func (p ProtobufCodec) Encode(resp *v1.Response) ([]byte, error) { prometheusQueryResponse, err := createPrometheusQueryResponse(resp) if err != nil { @@ -85,30 +87,54 @@ func getMatrixSampleStreams(data *v1.QueryData) *[]tripperware.SampleStream { sampleStreams := make([]tripperware.SampleStream, sampleStreamsLen) for i := 0; i < sampleStreamsLen; i++ { - labelsLen := len(data.Result.(promql.Matrix)[i].Metric) + sampleStream := data.Result.(promql.Matrix)[i] + labelsLen := len(sampleStream.Metric) var labels []cortexpb.LabelAdapter if labelsLen > 0 { labels = make([]cortexpb.LabelAdapter, labelsLen) for j := 0; j < labelsLen; j++ { labels[j] = cortexpb.LabelAdapter{ - Name: data.Result.(promql.Matrix)[i].Metric[j].Name, - Value: data.Result.(promql.Matrix)[i].Metric[j].Value, + Name: sampleStream.Metric[j].Name, + Value: sampleStream.Metric[j].Value, } } } - samplesLen := len(data.Result.(promql.Matrix)[i].Floats) + samplesLen := len(sampleStream.Floats) var samples []cortexpb.Sample if samplesLen > 0 { samples = make([]cortexpb.Sample, samplesLen) for j := 0; j < samplesLen; j++ { samples[j] = cortexpb.Sample{ - Value: data.Result.(promql.Matrix)[i].Floats[j].F, - TimestampMs: data.Result.(promql.Matrix)[i].Floats[j].T, + Value: sampleStream.Floats[j].F, + TimestampMs: sampleStream.Floats[j].T, } } } - sampleStreams[i] = tripperware.SampleStream{Labels: labels, Samples: samples} + + histogramsLen := len(sampleStream.Histograms) + var histograms []tripperware.SampleHistogramPair + if histogramsLen > 0 { + histograms = make([]tripperware.SampleHistogramPair, histogramsLen) + for j := 0; j < histogramsLen; j++ { + bucketsLen := len(sampleStream.Histograms[j].H.NegativeBuckets) + len(sampleStream.Histograms[j].H.PositiveBuckets) + if sampleStream.Histograms[j].H.ZeroCount > 0 { + bucketsLen = len(sampleStream.Histograms[j].H.NegativeBuckets) + len(sampleStream.Histograms[j].H.PositiveBuckets) + 1 + } + buckets := make([]*tripperware.HistogramBucket, bucketsLen) + it := sampleStream.Histograms[j].H.AllBucketIterator() + getBuckets(buckets, it) + histograms[j] = tripperware.SampleHistogramPair{ + TimestampMs: sampleStream.Histograms[j].T, + Histogram: tripperware.SampleHistogram{ + Count: sampleStream.Histograms[j].H.Count, + Sum: sampleStream.Histograms[j].H.Sum, + Buckets: buckets, + }, + } + } + } + sampleStreams[i] = tripperware.SampleStream{Labels: labels, Samples: samples, Histograms: histograms} } return &sampleStreams } @@ -118,29 +144,75 @@ func getVectorSamples(data *v1.QueryData) *[]tripperware.Sample { vectorSamples := make([]tripperware.Sample, vectorSamplesLen) for i := 0; i < vectorSamplesLen; i++ { - labelsLen := len(data.Result.(promql.Vector)[i].Metric) + sample := data.Result.(promql.Vector)[i] + labelsLen := len(sample.Metric) var labels []cortexpb.LabelAdapter if labelsLen > 0 { labels = make([]cortexpb.LabelAdapter, labelsLen) for j := 0; j < labelsLen; j++ { labels[j] = cortexpb.LabelAdapter{ - Name: data.Result.(promql.Vector)[i].Metric[j].Name, - Value: data.Result.(promql.Vector)[i].Metric[j].Value, + Name: sample.Metric[j].Name, + Value: sample.Metric[j].Value, } } } + vectorSamples[i].Labels = labels - vectorSamples[i] = tripperware.Sample{ - Labels: labels, - Sample: &cortexpb.Sample{ - TimestampMs: data.Result.(promql.Vector)[i].T, - Value: data.Result.(promql.Vector)[i].F, - }, + if sample.H != nil { + bucketsLen := len(sample.H.NegativeBuckets) + len(sample.H.PositiveBuckets) + if sample.H.ZeroCount > 0 { + bucketsLen = len(sample.H.NegativeBuckets) + len(sample.H.PositiveBuckets) + 1 + } + buckets := make([]*tripperware.HistogramBucket, bucketsLen) + it := sample.H.AllBucketIterator() + getBuckets(buckets, it) + vectorSamples[i].Histogram = &tripperware.SampleHistogramPair{ + TimestampMs: sample.T, + Histogram: tripperware.SampleHistogram{ + Count: sample.H.Count, + Sum: sample.H.Sum, + Buckets: buckets, + }, + } + } else { + vectorSamples[i].Sample = &cortexpb.Sample{ + TimestampMs: sample.T, + Value: sample.F, + } } } return &vectorSamples } +func getBuckets(bucketsList []*tripperware.HistogramBucket, it histogram.BucketIterator[float64]) { + bucketIdx := 0 + for it.Next() { + bucket := it.At() + if bucket.Count == 0 { + continue + } + boundaries := 2 // Exclusive on both sides AKA open interval. + if bucket.LowerInclusive { + if bucket.UpperInclusive { + boundaries = 3 // Inclusive on both sides AKA closed interval. + } else { + boundaries = 1 // Inclusive only on lower end AKA right open. + } + } else { + if bucket.UpperInclusive { + boundaries = 0 // Inclusive only on upper end AKA left open. + } + } + bucketsList[bucketIdx] = &tripperware.HistogramBucket{ + Boundaries: int32(boundaries), + Lower: bucket.Lower, + Upper: bucket.Upper, + Count: bucket.Count, + } + bucketIdx += 1 + } +} + func getStats(builtin *stats.BuiltinStats) *tripperware.PrometheusResponseSamplesStats { queryableSamplesStatsPerStepLen := len(builtin.Samples.TotalQueryableSamplesPerStep) queryableSamplesStatsPerStep := make([]*tripperware.PrometheusResponseQueryableSamplesStatsPerStep, queryableSamplesStatsPerStepLen) @@ -156,6 +228,5 @@ func getStats(builtin *stats.BuiltinStats) *tripperware.PrometheusResponseSample TotalQueryableSamplesPerStep: queryableSamplesStatsPerStep, PeakSamples: int64(builtin.Samples.PeakSamples), } - return &statSamples } diff --git a/pkg/querier/codec/protobuf_codec_test.go b/pkg/querier/codec/protobuf_codec_test.go new file mode 100644 index 0000000000..54fa561fd5 --- /dev/null +++ b/pkg/querier/codec/protobuf_codec_test.go @@ -0,0 +1,425 @@ +package codec + +import ( + "testing" + + "github.com/gogo/protobuf/proto" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/histogram" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/promql" + "github.com/prometheus/prometheus/promql/parser" + v1 "github.com/prometheus/prometheus/web/api/v1" + "github.com/stretchr/testify/require" + + "github.com/cortexproject/cortex/pkg/cortexpb" + "github.com/cortexproject/cortex/pkg/querier/tripperware" +) + +func TestProtobufCodec_Encode(t *testing.T) { + tests := []struct { + data interface{} + expected *tripperware.PrometheusResponse + }{ + { + data: &v1.QueryData{ + ResultType: parser.ValueTypeVector, + Result: promql.Vector{ + promql.Sample{ + Metric: labels.FromStrings("__name__", "foo"), + T: 1000, + F: 1, + }, + promql.Sample{ + Metric: labels.FromStrings("__name__", "bar"), + T: 2000, + F: 2, + }, + }, + }, + expected: &tripperware.PrometheusResponse{ + Status: tripperware.StatusSuccess, + Data: tripperware.PrometheusData{ + ResultType: model.ValVector.String(), + Result: tripperware.PrometheusQueryResult{ + Result: &tripperware.PrometheusQueryResult_Vector{ + Vector: &tripperware.Vector{ + Samples: []tripperware.Sample{ + { + Labels: []cortexpb.LabelAdapter{ + {Name: "__name__", Value: "foo"}, + }, + Sample: &cortexpb.Sample{Value: 1, TimestampMs: 1000}, + }, + { + Labels: []cortexpb.LabelAdapter{ + {Name: "__name__", Value: "bar"}, + }, + Sample: &cortexpb.Sample{Value: 2, TimestampMs: 2000}, + }, + }, + }, + }, + }, + }, + }, + }, + { + data: &v1.QueryData{ + ResultType: parser.ValueTypeScalar, + Result: promql.Scalar{T: 1000, V: 1}, + }, + expected: &tripperware.PrometheusResponse{ + Status: tripperware.StatusSuccess, + Data: tripperware.PrometheusData{ + ResultType: model.ValScalar.String(), + Result: tripperware.PrometheusQueryResult{ + Result: &tripperware.PrometheusQueryResult_RawBytes{ + RawBytes: []byte(`{"resultType":"scalar","result":[1,"1"]}`), + }, + }, + }, + }, + }, + { + data: &v1.QueryData{ + ResultType: parser.ValueTypeMatrix, + Result: promql.Matrix{ + promql.Series{ + Metric: labels.FromStrings("__name__", "foo"), + Floats: []promql.FPoint{{F: 1, T: 1000}}, + }, + promql.Series{ + Metric: labels.FromStrings("__name__", "bar"), + Floats: []promql.FPoint{{F: 2, T: 2000}}, + }, + }, + }, + expected: &tripperware.PrometheusResponse{ + Status: tripperware.StatusSuccess, + Data: tripperware.PrometheusData{ + ResultType: model.ValMatrix.String(), + Result: tripperware.PrometheusQueryResult{ + Result: &tripperware.PrometheusQueryResult_Matrix{ + Matrix: &tripperware.Matrix{ + SampleStreams: []tripperware.SampleStream{ + { + Labels: []cortexpb.LabelAdapter{ + {Name: "__name__", Value: "foo"}, + }, + Samples: []cortexpb.Sample{ + {Value: 1, TimestampMs: 1000}, + }, + }, + { + Labels: []cortexpb.LabelAdapter{ + {Name: "__name__", Value: "bar"}, + }, + Samples: []cortexpb.Sample{ + {Value: 2, TimestampMs: 2000}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + { + data: &v1.QueryData{ + ResultType: parser.ValueTypeMatrix, + Result: promql.Matrix{ + promql.Series{ + Floats: []promql.FPoint{{F: 1, T: 1000}}, + Metric: labels.FromStrings("__name__", "foo"), + }, + }, + }, + expected: &tripperware.PrometheusResponse{ + Status: tripperware.StatusSuccess, + Data: tripperware.PrometheusData{ + ResultType: model.ValMatrix.String(), + Result: tripperware.PrometheusQueryResult{ + Result: &tripperware.PrometheusQueryResult_Matrix{ + Matrix: &tripperware.Matrix{ + SampleStreams: []tripperware.SampleStream{ + { + Labels: []cortexpb.LabelAdapter{ + {Name: "__name__", Value: "foo"}, + }, + Samples: []cortexpb.Sample{ + {Value: 1, TimestampMs: 1000}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + { + data: &v1.QueryData{ + ResultType: parser.ValueTypeMatrix, + Result: promql.Matrix{ + promql.Series{ + Metric: labels.Labels{ + {Name: "__name__", Value: "foo"}, + {Name: "__job__", Value: "bar"}, + }, + Floats: []promql.FPoint{ + {F: 0.14, T: 18555000}, + {F: 2.9, T: 18556000}, + {F: 30, T: 18557000}, + }, + }, + }, + }, + expected: &tripperware.PrometheusResponse{ + Status: tripperware.StatusSuccess, + Data: tripperware.PrometheusData{ + ResultType: model.ValMatrix.String(), + Result: tripperware.PrometheusQueryResult{ + Result: &tripperware.PrometheusQueryResult_Matrix{ + Matrix: &tripperware.Matrix{ + SampleStreams: []tripperware.SampleStream{ + { + Labels: []cortexpb.LabelAdapter{ + {Name: "__name__", Value: "foo"}, + {Name: "__job__", Value: "bar"}, + }, + Samples: []cortexpb.Sample{ + {Value: 0.14, TimestampMs: 18555000}, + {Value: 2.9, TimestampMs: 18556000}, + {Value: 30, TimestampMs: 18557000}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + { + data: &v1.QueryData{ + ResultType: parser.ValueTypeMatrix, + Result: promql.Matrix{ + promql.Series{ + Histograms: []promql.HPoint{{H: &histogram.FloatHistogram{ + Schema: 2, + ZeroThreshold: 0.001, + ZeroCount: 12, + Count: 10, + Sum: 20, + PositiveSpans: []histogram.Span{ + {Offset: 3, Length: 2}, + {Offset: 1, Length: 3}, + }, + NegativeSpans: []histogram.Span{ + {Offset: 2, Length: 2}, + }, + PositiveBuckets: []float64{1, 2, 2, 1, 1}, + NegativeBuckets: []float64{2, 1}, + }, T: 1000}}, + Metric: labels.FromStrings("__name__", "foo"), + }, + }, + }, + expected: &tripperware.PrometheusResponse{ + Status: tripperware.StatusSuccess, + Data: tripperware.PrometheusData{ + ResultType: model.ValMatrix.String(), + Result: tripperware.PrometheusQueryResult{ + Result: &tripperware.PrometheusQueryResult_Matrix{ + Matrix: &tripperware.Matrix{ + SampleStreams: []tripperware.SampleStream{ + { + Labels: []cortexpb.LabelAdapter{ + {Name: "__name__", Value: "foo"}, + }, + Histograms: []tripperware.SampleHistogramPair{ + { + TimestampMs: 1000, + Histogram: tripperware.SampleHistogram{ + Count: 10, + Sum: 20, + Buckets: []*tripperware.HistogramBucket{ + { + Boundaries: 1, + Upper: -1.414213562373095, + Lower: -1.6817928305074288, + Count: 1, + }, + { + Boundaries: 1, + Upper: -1.189207115002721, + Lower: -1.414213562373095, + Count: 2, + }, + { + Boundaries: 3, + Upper: 0.001, + Lower: -0.001, + Count: 12, + }, + { + Boundaries: 0, + Upper: 1.6817928305074288, + Lower: 1.414213562373095, + Count: 1, + }, + { + Boundaries: 0, + Upper: 2, + Lower: 1.6817928305074288, + Count: 2, + }, + { + Boundaries: 0, + Upper: 2.82842712474619, + Lower: 2.378414230005442, + Count: 2, + }, + { + Boundaries: 0, + Upper: 3.3635856610148576, + Lower: 2.82842712474619, + Count: 1, + }, + { + Boundaries: 0, + Upper: 4, + Lower: 3.3635856610148576, + Count: 1, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + { + data: &v1.QueryData{ + ResultType: parser.ValueTypeVector, + Result: promql.Vector{ + promql.Sample{ + Metric: labels.FromStrings("__name__", "foo"), + T: 1000, + H: &histogram.FloatHistogram{ + Schema: 2, + ZeroThreshold: 0.001, + ZeroCount: 12, + Count: 10, + Sum: 20, + PositiveSpans: []histogram.Span{ + {Offset: 3, Length: 2}, + {Offset: 1, Length: 3}, + }, + NegativeSpans: []histogram.Span{ + {Offset: 2, Length: 2}, + }, + PositiveBuckets: []float64{1, 2, 2, 1, 1}, + NegativeBuckets: []float64{2, 1}, + }, + }, + }, + }, + expected: &tripperware.PrometheusResponse{ + Status: tripperware.StatusSuccess, + Data: tripperware.PrometheusData{ + ResultType: model.ValVector.String(), + Result: tripperware.PrometheusQueryResult{ + Result: &tripperware.PrometheusQueryResult_Vector{ + Vector: &tripperware.Vector{ + Samples: []tripperware.Sample{ + { + Labels: []cortexpb.LabelAdapter{ + {Name: "__name__", Value: "foo"}, + }, + Histogram: &tripperware.SampleHistogramPair{ + TimestampMs: 1000, + Histogram: tripperware.SampleHistogram{ + Count: 10, + Sum: 20, + Buckets: []*tripperware.HistogramBucket{ + { + Boundaries: 1, + Upper: -1.414213562373095, + Lower: -1.6817928305074288, + Count: 1, + }, + { + Boundaries: 1, + Upper: -1.189207115002721, + Lower: -1.414213562373095, + Count: 2, + }, + { + Boundaries: 3, + Upper: 0.001, + Lower: -0.001, + Count: 12, + }, + { + Boundaries: 0, + Upper: 1.6817928305074288, + Lower: 1.414213562373095, + Count: 1, + }, + { + Boundaries: 0, + Upper: 2, + Lower: 1.6817928305074288, + Count: 2, + }, + { + Boundaries: 0, + Upper: 2.82842712474619, + Lower: 2.378414230005442, + Count: 2, + }, + { + Boundaries: 0, + Upper: 3.3635856610148576, + Lower: 2.82842712474619, + Count: 1, + }, + { + Boundaries: 0, + Upper: 4, + Lower: 3.3635856610148576, + Count: 1, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + } + + codec := ProtobufCodec{} + for _, test := range tests { + body, err := codec.Encode(&v1.Response{ + Status: tripperware.StatusSuccess, + Data: test.data, + }) + require.NoError(t, err) + b, err := proto.Marshal(test.expected) + require.NoError(t, err) + require.Equal(t, string(b), string(body)) + } +} diff --git a/pkg/querier/tripperware/instantquery/instant_query_test.go b/pkg/querier/tripperware/instantquery/instant_query_test.go index 718bc460ab..8d8ed8fee7 100644 --- a/pkg/querier/tripperware/instantquery/instant_query_test.go +++ b/pkg/querier/tripperware/instantquery/instant_query_test.go @@ -224,6 +224,77 @@ func TestResponse(t *testing.T) { { jsonBody: testHistogramResponse, }, + { + jsonBody: testHistogramResponse, + promBody: &tripperware.PrometheusResponse{ + Status: tripperware.StatusSuccess, + Data: tripperware.PrometheusData{ + ResultType: model.ValVector.String(), + Result: tripperware.PrometheusQueryResult{ + Result: &tripperware.PrometheusQueryResult_Vector{ + Vector: &tripperware.Vector{ + Samples: []tripperware.Sample{ + { + Labels: []cortexpb.LabelAdapter{ + {Name: "__name__", Value: "prometheus_http_request_duration_seconds"}, + {Name: "handler", Value: "/metrics"}, + {Name: "instance", Value: "localhost:9090"}, + {Name: "job", Value: "prometheus"}, + }, + Histogram: &tripperware.SampleHistogramPair{ + TimestampMs: 1719528871898, + Histogram: tripperware.SampleHistogram{ + Count: 6342, + Sum: 43.31319875499995, + Buckets: []*tripperware.HistogramBucket{ + {Boundaries: 0, Upper: 0.0015060652591874421, Lower: 0.0013810679320049755, Count: 1}, + {Boundaries: 0, Upper: 0.001642375811042411, Lower: 0.0015060652591874421, Count: 7}, + {Boundaries: 0, Upper: 0.0017910235218841233, Lower: 0.001642375811042411, Count: 5}, + {Boundaries: 0, Upper: 0.001953125, Lower: 0.0017910235218841233, Count: 13}, + {Boundaries: 0, Upper: 0.0021298979153618314, Lower: 0.001953125, Count: 19}, + {Boundaries: 0, Upper: 0.0023226701464896895, Lower: 0.0021298979153618314, Count: 13}, + {Boundaries: 0, Upper: 0.002532889755177753, Lower: 0.0023226701464896895, Count: 13}, + {Boundaries: 0, Upper: 0.002762135864009951, Lower: 0.002532889755177753, Count: 15}, + {Boundaries: 0, Upper: 0.0030121305183748843, Lower: 0.002762135864009951, Count: 12}, + {Boundaries: 0, Upper: 0.003284751622084822, Lower: 0.0030121305183748843, Count: 34}, + {Boundaries: 0, Upper: 0.0035820470437682465, Lower: 0.003284751622084822, Count: 188}, + {Boundaries: 0, Upper: 0.00390625, Lower: 0.0035820470437682465, Count: 372}, + {Boundaries: 0, Upper: 0.004259795830723663, Lower: 0.00390625, Count: 400}, + {Boundaries: 0, Upper: 0.004645340292979379, Lower: 0.004259795830723663, Count: 411}, + {Boundaries: 0, Upper: 0.005065779510355506, Lower: 0.004645340292979379, Count: 425}, + {Boundaries: 0, Upper: 0.005524271728019902, Lower: 0.005065779510355506, Count: 425}, + {Boundaries: 0, Upper: 0.0060242610367497685, Lower: 0.005524271728019902, Count: 521}, + {Boundaries: 0, Upper: 0.006569503244169644, Lower: 0.0060242610367497685, Count: 621}, + {Boundaries: 0, Upper: 0.007164094087536493, Lower: 0.006569503244169644, Count: 593}, + {Boundaries: 0, Upper: 0.0078125, Lower: 0.007164094087536493, Count: 506}, + {Boundaries: 0, Upper: 0.008519591661447326, Lower: 0.0078125, Count: 458}, + {Boundaries: 0, Upper: 0.009290680585958758, Lower: 0.008519591661447326, Count: 346}, + {Boundaries: 0, Upper: 0.010131559020711013, Lower: 0.009290680585958758, Count: 285}, + {Boundaries: 0, Upper: 0.011048543456039804, Lower: 0.010131559020711013, Count: 196}, + {Boundaries: 0, Upper: 0.012048522073499537, Lower: 0.011048543456039804, Count: 129}, + {Boundaries: 0, Upper: 0.013139006488339287, Lower: 0.012048522073499537, Count: 85}, + {Boundaries: 0, Upper: 0.014328188175072986, Lower: 0.013139006488339287, Count: 65}, + {Boundaries: 0, Upper: 0.015625, Lower: 0.014328188175072986, Count: 54}, + {Boundaries: 0, Upper: 0.01703918332289465, Lower: 0.015625, Count: 53}, + {Boundaries: 0, Upper: 0.018581361171917516, Lower: 0.01703918332289465, Count: 20}, + {Boundaries: 0, Upper: 0.020263118041422026, Lower: 0.018581361171917516, Count: 21}, + {Boundaries: 0, Upper: 0.022097086912079608, Lower: 0.020263118041422026, Count: 15}, + {Boundaries: 0, Upper: 0.024097044146999074, Lower: 0.022097086912079608, Count: 11}, + {Boundaries: 0, Upper: 0.026278012976678575, Lower: 0.024097044146999074, Count: 2}, + {Boundaries: 0, Upper: 0.028656376350145972, Lower: 0.026278012976678575, Count: 3}, + {Boundaries: 0, Upper: 0.03125, Lower: 0.028656376350145972, Count: 3}, + {Boundaries: 0, Upper: 0.044194173824159216, Lower: 0.04052623608284405, Count: 2}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, { jsonBody: `{"status":"success","data":{"resultType":"string","result":[1,"foo"]}}`, promBody: &tripperware.PrometheusResponse{ @@ -359,7 +430,6 @@ func TestResponse(t *testing.T) { tc := tc t.Run(strconv.Itoa(i), func(t *testing.T) { t.Parallel() - var response *http.Response if tc.promBody != nil { protobuf, err := proto.Marshal(tc.promBody)