From e359f506f2c02a88a1c860095758a2200a64d286 Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Tue, 17 Dec 2024 16:33:46 -0800 Subject: [PATCH] Reusing Batch Iterators (#6403) * Reusing Batch Iterators Signed-off-by: alanprot * addressing some comments Signed-off-by: alanprot * lint Signed-off-by: alanprot * fixing reset method Signed-off-by: alanprot --------- Signed-off-by: alanprot --- pkg/ingester/ingester_test.go | 4 +-- pkg/querier/batch/batch.go | 15 ++++++--- pkg/querier/batch/batch_test.go | 9 ++--- pkg/querier/batch/merge.go | 50 +++++++++++++++++++++++----- pkg/querier/batch/merge_test.go | 36 +++++++++++++++++--- pkg/querier/chunk_store_queryable.go | 2 +- pkg/querier/distributor_queryable.go | 4 +-- pkg/querier/querier.go | 36 +++----------------- pkg/querier/querier_test.go | 27 ++++++++++++--- 9 files changed, 121 insertions(+), 62 deletions(-) diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index f64368877f..a03b8fb452 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -110,8 +110,8 @@ func seriesSetFromResponseStream(s *mockQueryStreamServer) (storage.SeriesSet, e serieses = append(serieses, &storage.SeriesEntry{ Lset: ls, - SampleIteratorFn: func(_ chunkenc.Iterator) chunkenc.Iterator { - return batch.NewChunkMergeIterator(chunks, math.MinInt64, math.MaxInt64) + SampleIteratorFn: func(it chunkenc.Iterator) chunkenc.Iterator { + return batch.NewChunkMergeIterator(it, chunks, math.MinInt64, math.MaxInt64) }, }) } diff --git a/pkg/querier/batch/batch.go b/pkg/querier/batch/batch.go index ca7e1f79ee..79dfe8081e 100644 --- a/pkg/querier/batch/batch.go +++ b/pkg/querier/batch/batch.go @@ -52,19 +52,26 @@ type iterator interface { } // NewChunkMergeIterator returns a chunkenc.Iterator that merges Cortex chunks together. -func NewChunkMergeIterator(chunks []chunk.Chunk, _, _ model.Time) chunkenc.Iterator { +func NewChunkMergeIterator(it chunkenc.Iterator, chunks []chunk.Chunk, _, _ model.Time) chunkenc.Iterator { converted := make([]GenericChunk, len(chunks)) for i, c := range chunks { c := c converted[i] = NewGenericChunk(int64(c.From), int64(c.Through), c.NewIterator) } - return NewGenericChunkMergeIterator(converted) + return NewGenericChunkMergeIterator(it, converted) } // NewGenericChunkMergeIterator returns a chunkenc.Iterator that merges generic chunks together. -func NewGenericChunkMergeIterator(chunks []GenericChunk) chunkenc.Iterator { - iter := newMergeIterator(chunks) +func NewGenericChunkMergeIterator(it chunkenc.Iterator, chunks []GenericChunk) chunkenc.Iterator { + + var underlying iterator + + if ia, ok := it.(*iteratorAdapter); ok { + underlying = ia.underlying + } + + iter := newMergeIterator(underlying, chunks) return newIteratorAdapter(iter) } diff --git a/pkg/querier/batch/batch_test.go b/pkg/querier/batch/batch_test.go index 30c0a0e38c..4f4b57bfe4 100644 --- a/pkg/querier/batch/batch_test.go +++ b/pkg/querier/batch/batch_test.go @@ -55,8 +55,9 @@ func BenchmarkNewChunkMergeIterator_CreateAndIterate(b *testing.B) { b.Run(name, func(b *testing.B) { b.ReportAllocs() + var it chunkenc.Iterator for n := 0; n < b.N; n++ { - it := NewChunkMergeIterator(chunks, 0, 0) + it = NewChunkMergeIterator(it, chunks, 0, 0) for it.Next() != chunkenc.ValNone { it.At() } @@ -108,9 +109,9 @@ func BenchmarkNewChunkMergeIterator_Seek(b *testing.B) { b.ResetTimer() b.Run(name, func(b *testing.B) { b.ReportAllocs() - + var it chunkenc.Iterator for n := 0; n < b.N; n++ { - it := NewChunkMergeIterator(chunks, 0, 0) + it = NewChunkMergeIterator(it, chunks, 0, 0) i := int64(0) for it.Seek(i*scenario.seekStep.Milliseconds()) != chunkenc.ValNone { i++ @@ -132,7 +133,7 @@ func TestSeekCorrectlyDealWithSinglePointChunks(t *testing.T) { chunkTwo := util.GenerateChunk(t, step, model.Time(10*step/time.Millisecond), 1, enc) chunks := []chunk.Chunk{chunkOne, chunkTwo} - sut := NewChunkMergeIterator(chunks, 0, 0) + sut := NewChunkMergeIterator(nil, chunks, 0, 0) // Following calls mimics Prometheus's query engine behaviour for VectorSelector. require.Equal(t, valType, sut.Next()) diff --git a/pkg/querier/batch/merge.go b/pkg/querier/batch/merge.go index 8c7ebdf062..27030149d2 100644 --- a/pkg/querier/batch/merge.go +++ b/pkg/querier/batch/merge.go @@ -23,18 +23,27 @@ type mergeIterator struct { currErr error } -func newMergeIterator(cs []GenericChunk) *mergeIterator { +func newMergeIterator(it iterator, cs []GenericChunk) *mergeIterator { css := partitionChunks(cs) - its := make([]*nonOverlappingIterator, 0, len(css)) - for _, cs := range css { - its = append(its, newNonOverlappingIterator(cs)) + + var c *mergeIterator + + if mIterator, ok := it.(*mergeIterator); ok && cap(mIterator.its) >= len(css) { + c = mIterator.Reset(len(css)) + } else { + c = &mergeIterator{ + h: make(iteratorHeap, 0, len(css)), + batches: make(batchStream, 0, len(css)), + batchesBuf: make(batchStream, len(css)), + } } - c := &mergeIterator{ - its: its, - h: make(iteratorHeap, 0, len(its)), - batches: make(batchStream, 0, len(its)), - batchesBuf: make(batchStream, len(its)), + if cap(c.its) < len(css) { + c.its = make([]*nonOverlappingIterator, 0, len(css)) + } + + for _, cs := range css { + c.its = append(c.its, newNonOverlappingIterator(cs)) } for _, iter := range c.its { @@ -52,6 +61,29 @@ func newMergeIterator(cs []GenericChunk) *mergeIterator { return c } +func (c *mergeIterator) Reset(size int) *mergeIterator { + c.its = c.its[:0] + c.h = c.h[:0] + c.batches = c.batches[:0] + + if size > cap(c.batchesBuf) { + c.batchesBuf = make(batchStream, len(c.its)) + } else { + c.batchesBuf = c.batchesBuf[:size] + for i := 0; i < size; i++ { + c.batchesBuf[i] = promchunk.Batch{} + } + } + + for i := 0; i < len(c.nextBatchBuf); i++ { + c.nextBatchBuf[i] = promchunk.Batch{} + } + + c.currErr = nil + + return c +} + func (c *mergeIterator) Seek(t int64, size int) chunkenc.ValueType { // Optimisation to see if the seek is within our current caches batches. diff --git a/pkg/querier/batch/merge_test.go b/pkg/querier/batch/merge_test.go index 8ad0d16df4..d835640d70 100644 --- a/pkg/querier/batch/merge_test.go +++ b/pkg/querier/batch/merge_test.go @@ -1,10 +1,12 @@ package batch import ( + "fmt" "testing" "time" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/cortexproject/cortex/pkg/chunk/encoding" ) @@ -18,14 +20,40 @@ func TestMergeIter(t *testing.T) { chunk4 := mkGenericChunk(t, model.TimeFromUnix(75), 100, enc) chunk5 := mkGenericChunk(t, model.TimeFromUnix(100), 100, enc) - iter := newMergeIterator([]GenericChunk{chunk1, chunk2, chunk3, chunk4, chunk5}) + iter := newMergeIterator(nil, []GenericChunk{chunk1, chunk2, chunk3, chunk4, chunk5}) testIter(t, 200, newIteratorAdapter(iter), enc) - iter = newMergeIterator([]GenericChunk{chunk1, chunk2, chunk3, chunk4, chunk5}) + iter = newMergeIterator(iter, []GenericChunk{chunk1, chunk2, chunk3, chunk4, chunk5}) testSeek(t, 200, newIteratorAdapter(iter), enc) }) } +func BenchmarkMergeIterator(b *testing.B) { + chunks := make([]GenericChunk, 0, 10) + for i := 0; i < 10; i++ { + chunks = append(chunks, mkGenericChunk(b, model.Time(i*25), 120, encoding.PrometheusXorChunk)) + } + iter := newMergeIterator(nil, chunks) + + for _, r := range []bool{true, false} { + b.Run(fmt.Sprintf("reuse-%t", r), func(b *testing.B) { + b.ResetTimer() + b.ReportAllocs() + for i := 0; i < b.N; i++ { + if r { + iter = newMergeIterator(iter, chunks) + } else { + iter = newMergeIterator(nil, chunks) + } + a := newIteratorAdapter(iter) + for a.Next() != chunkenc.ValNone { + + } + } + }) + } +} + func TestMergeHarder(t *testing.T) { t.Parallel() forEncodings(t, func(t *testing.T, enc encoding.Encoding) { @@ -40,10 +68,10 @@ func TestMergeHarder(t *testing.T) { chunks = append(chunks, mkGenericChunk(t, from, samples, enc)) from = from.Add(time.Duration(offset) * time.Second) } - iter := newMergeIterator(chunks) + iter := newMergeIterator(nil, chunks) testIter(t, offset*numChunks+samples-offset, newIteratorAdapter(iter), enc) - iter = newMergeIterator(chunks) + iter = newMergeIterator(iter, chunks) testSeek(t, offset*numChunks+samples-offset, newIteratorAdapter(iter), enc) }) } diff --git a/pkg/querier/chunk_store_queryable.go b/pkg/querier/chunk_store_queryable.go index e072228ac7..0b3fabf011 100644 --- a/pkg/querier/chunk_store_queryable.go +++ b/pkg/querier/chunk_store_queryable.go @@ -7,4 +7,4 @@ import ( "github.com/cortexproject/cortex/pkg/chunk" ) -type chunkIteratorFunc func(chunks []chunk.Chunk, from, through model.Time) chunkenc.Iterator +type chunkIteratorFunc func(it chunkenc.Iterator, chunks []chunk.Chunk, from, through model.Time) chunkenc.Iterator diff --git a/pkg/querier/distributor_queryable.go b/pkg/querier/distributor_queryable.go index 8ae7c49106..709294e6fa 100644 --- a/pkg/querier/distributor_queryable.go +++ b/pkg/querier/distributor_queryable.go @@ -155,8 +155,8 @@ func (q *distributorQuerier) streamingSelect(ctx context.Context, sortSeries boo serieses = append(serieses, &storage.SeriesEntry{ Lset: ls, - SampleIteratorFn: func(_ chunkenc.Iterator) chunkenc.Iterator { - return q.chunkIterFn(chunks, model.Time(minT), model.Time(maxT)) + SampleIteratorFn: func(it chunkenc.Iterator) chunkenc.Iterator { + return q.chunkIterFn(it, chunks, model.Time(minT), model.Time(maxT)) }, }) } diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 784aef3bee..e9a80374cd 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -18,18 +18,14 @@ import ( "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/storage" - "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/util/annotations" "github.com/thanos-io/promql-engine/engine" "github.com/thanos-io/promql-engine/logicalplan" "github.com/thanos-io/thanos/pkg/strutil" "golang.org/x/sync/errgroup" - "github.com/cortexproject/cortex/pkg/chunk" - "github.com/cortexproject/cortex/pkg/ingester/client" "github.com/cortexproject/cortex/pkg/querier/batch" "github.com/cortexproject/cortex/pkg/querier/lazyquery" - seriesset "github.com/cortexproject/cortex/pkg/querier/series" querier_stats "github.com/cortexproject/cortex/pkg/querier/stats" "github.com/cortexproject/cortex/pkg/tenant" "github.com/cortexproject/cortex/pkg/util" @@ -188,7 +184,7 @@ func New(cfg Config, limits *validation.Overrides, distributor Distributor, stor QueryStoreAfter: cfg.QueryStoreAfter, } } - queryable := NewQueryable(distributorQueryable, ns, iteratorFunc, cfg, limits) + queryable := NewQueryable(distributorQueryable, ns, cfg, limits) exemplarQueryable := newDistributorExemplarQueryable(distributor) lazyQueryable := storage.QueryableFunc(func(mint int64, maxt int64) (storage.Querier, error) { @@ -275,13 +271,12 @@ type limiterHolder struct { } // NewQueryable creates a new Queryable for cortex. -func NewQueryable(distributor QueryableWithFilter, stores []QueryableWithFilter, chunkIterFn chunkIteratorFunc, cfg Config, limits *validation.Overrides) storage.Queryable { +func NewQueryable(distributor QueryableWithFilter, stores []QueryableWithFilter, cfg Config, limits *validation.Overrides) storage.Queryable { return storage.QueryableFunc(func(mint, maxt int64) (storage.Querier, error) { q := querier{ now: time.Now(), mint: mint, maxt: maxt, - chunkIterFn: chunkIterFn, limits: limits, maxQueryIntoFuture: cfg.MaxQueryIntoFuture, ignoreMaxQueryLength: cfg.IgnoreMaxQueryLength, @@ -295,10 +290,8 @@ func NewQueryable(distributor QueryableWithFilter, stores []QueryableWithFilter, } type querier struct { - chunkIterFn chunkIteratorFunc - now time.Time - mint, maxt int64 - + now time.Time + mint, maxt int64 limits *validation.Overrides maxQueryIntoFuture time.Duration distributor QueryableWithFilter @@ -670,24 +663,3 @@ func validateQueryTimeRange(ctx context.Context, userID string, startMs, endMs i return int64(startTime), int64(endTime), nil } - -// Series in the returned set are sorted alphabetically by labels. -func partitionChunks(chunks []chunk.Chunk, mint, maxt int64, iteratorFunc chunkIteratorFunc) storage.SeriesSet { - chunksBySeries := map[string][]chunk.Chunk{} - for _, c := range chunks { - key := client.LabelsToKeyString(c.Metric) - chunksBySeries[key] = append(chunksBySeries[key], c) - } - - series := make([]storage.Series, 0, len(chunksBySeries)) - for i := range chunksBySeries { - series = append(series, &storage.SeriesEntry{ - Lset: chunksBySeries[i][0].Metric, - SampleIteratorFn: func(_ chunkenc.Iterator) chunkenc.Iterator { - return iteratorFunc(chunksBySeries[i], model.Time(mint), model.Time(maxt)) - }, - }) - } - - return seriesset.NewConcreteSeriesSet(true, series) -} diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go index 044bf0e193..69b542e2d9 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/querier_test.go @@ -18,6 +18,7 @@ import ( "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/scrape" "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/tsdb/tsdbutil" "github.com/prometheus/prometheus/util/annotations" "github.com/stretchr/testify/assert" @@ -32,6 +33,7 @@ import ( "github.com/cortexproject/cortex/pkg/cortexpb" "github.com/cortexproject/cortex/pkg/ingester/client" "github.com/cortexproject/cortex/pkg/querier/batch" + "github.com/cortexproject/cortex/pkg/querier/series" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/chunkcompat" "github.com/cortexproject/cortex/pkg/util/flagext" @@ -333,7 +335,7 @@ func TestShouldSortSeriesIfQueryingMultipleQueryables(t *testing.T) { for _, queryable := range tc.storeQueriables { wQueriables = append(wQueriables, &wrappedSampleAndChunkQueryable{QueryableWithFilter: queryable}) } - queryable := NewQueryable(wDistributorQueriable, wQueriables, batch.NewChunkMergeIterator, cfg, overrides) + queryable := NewQueryable(wDistributorQueriable, wQueriables, cfg, overrides) opts := promql.EngineOpts{ Logger: log.NewNopLogger(), MaxSamples: 1e6, @@ -521,7 +523,7 @@ func TestLimits(t *testing.T) { overrides, err := validation.NewOverrides(DefaultLimitsConfig(), tc.tenantLimit) require.NoError(t, err) - queryable := NewQueryable(wDistributorQueriable, wQueriables, batch.NewChunkMergeIterator, cfg, overrides) + queryable := NewQueryable(wDistributorQueriable, wQueriables, cfg, overrides) opts := promql.EngineOpts{ Logger: log.NewNopLogger(), MaxSamples: 1e6, @@ -1476,7 +1478,7 @@ type mockStoreQuerier struct { // Select implements storage.Querier interface. // The bool passed is ignored because the series is always sorted. -func (q *mockStoreQuerier) Select(ctx context.Context, _ bool, sp *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet { +func (q *mockStoreQuerier) Select(_ context.Context, _ bool, sp *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet { // If we don't skip here, it'll make /series lookups extremely slow as all the chunks will be loaded. // That flag is only to be set with blocks storage engine, and this is a protective measure. if sp != nil && sp.Func == "series" { @@ -1488,7 +1490,24 @@ func (q *mockStoreQuerier) Select(ctx context.Context, _ bool, sp *storage.Selec return storage.ErrSeriesSet(err) } - return partitionChunks(chunks, q.mint, q.maxt, q.chunkIteratorFunc) + cs := make([]storage.Series, 0, len(chunks)) + chunksBySeries := map[string][]chunk.Chunk{} + + for _, c := range chunks { + key := client.LabelsToKeyString(c.Metric) + chunksBySeries[key] = append(chunksBySeries[key], c) + } + + for i, c := range chunksBySeries { + cs = append(cs, &storage.SeriesEntry{ + Lset: chunksBySeries[i][0].Metric, + SampleIteratorFn: func(it chunkenc.Iterator) chunkenc.Iterator { + return q.chunkIteratorFunc(it, c, model.Time(mint), model.Time(maxt)) + }, + }) + } + + return series.NewConcreteSeriesSet(true, cs) } func (q *mockStoreQuerier) LabelValues(ctx context.Context, name string, _ *storage.LabelHints, labels ...*labels.Matcher) ([]string, annotations.Annotations, error) {