Skip to content

Commit

Permalink
addressing some comments
Browse files Browse the repository at this point in the history
Signed-off-by: alanprot <[email protected]>
  • Loading branch information
alanprot committed Dec 5, 2024
1 parent 3886e4b commit 32e0845
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 27 deletions.
25 changes: 0 additions & 25 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,14 @@ import (
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/util/annotations"
"github.com/thanos-io/promql-engine/engine"
"github.com/thanos-io/promql-engine/logicalplan"
"github.com/thanos-io/thanos/pkg/strutil"
"golang.org/x/sync/errgroup"

"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/querier/batch"
"github.com/cortexproject/cortex/pkg/querier/lazyquery"
seriesset "github.com/cortexproject/cortex/pkg/querier/series"
querier_stats "github.com/cortexproject/cortex/pkg/querier/stats"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util"
Expand Down Expand Up @@ -667,24 +663,3 @@ func validateQueryTimeRange(ctx context.Context, userID string, startMs, endMs i

return int64(startTime), int64(endTime), nil
}

// Series in the returned set are sorted alphabetically by labels.
func partitionChunks(chunks []chunk.Chunk, mint, maxt int64, iteratorFunc chunkIteratorFunc) storage.SeriesSet {
chunksBySeries := map[string][]chunk.Chunk{}
for _, c := range chunks {
key := client.LabelsToKeyString(c.Metric)
chunksBySeries[key] = append(chunksBySeries[key], c)
}

series := make([]storage.Series, 0, len(chunksBySeries))
for i := range chunksBySeries {
series = append(series, &storage.SeriesEntry{
Lset: chunksBySeries[i][0].Metric,
SampleIteratorFn: func(it chunkenc.Iterator) chunkenc.Iterator {
return iteratorFunc(it, chunksBySeries[i], model.Time(mint), model.Time(maxt))
},
})
}

return seriesset.NewConcreteSeriesSet(true, series)
}
23 changes: 21 additions & 2 deletions pkg/querier/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package querier
import (
"context"
"fmt"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -32,6 +33,7 @@ import (
"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/querier/batch"
"github.com/cortexproject/cortex/pkg/querier/series"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/chunkcompat"
"github.com/cortexproject/cortex/pkg/util/flagext"
Expand Down Expand Up @@ -1476,7 +1478,7 @@ type mockStoreQuerier struct {

// Select implements storage.Querier interface.
// The bool passed is ignored because the series is always sorted.
func (q *mockStoreQuerier) Select(ctx context.Context, _ bool, sp *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
func (q *mockStoreQuerier) Select(_ context.Context, _ bool, sp *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
// If we don't skip here, it'll make /series lookups extremely slow as all the chunks will be loaded.
// That flag is only to be set with blocks storage engine, and this is a protective measure.
if sp != nil && sp.Func == "series" {
Expand All @@ -1488,7 +1490,24 @@ func (q *mockStoreQuerier) Select(ctx context.Context, _ bool, sp *storage.Selec
return storage.ErrSeriesSet(err)
}

return partitionChunks(chunks, q.mint, q.maxt, q.chunkIteratorFunc)
cs := make([]storage.Series, 0, len(chunks))
chunksBySeries := map[string][]chunk.Chunk{}

for _, c := range chunks {
key := client.LabelsToKeyString(c.Metric)
chunksBySeries[key] = append(chunksBySeries[key], c)
}

for i, c := range chunksBySeries {
cs = append(cs, &storage.SeriesEntry{
Lset: chunksBySeries[i][0].Metric,
SampleIteratorFn: func(it chunkenc.Iterator) chunkenc.Iterator {
return q.chunkIteratorFunc(it, c, model.Time(mint), model.Time(maxt))
},
})
}

return series.NewConcreteSeriesSet(true, cs)
}

func (q *mockStoreQuerier) LabelValues(ctx context.Context, name string, _ *storage.LabelHints, labels ...*labels.Matcher) ([]string, annotations.Annotations, error) {
Expand Down

0 comments on commit 32e0845

Please sign in to comment.