Skip to content

Commit

Permalink
Reusing Batch Iterators
Browse files Browse the repository at this point in the history
Signed-off-by: alanprot <[email protected]>
  • Loading branch information
alanprot committed Dec 5, 2024
1 parent 7191ecb commit 225fce1
Show file tree
Hide file tree
Showing 9 changed files with 101 additions and 37 deletions.
4 changes: 2 additions & 2 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@ func seriesSetFromResponseStream(s *mockQueryStreamServer) (storage.SeriesSet, e

serieses = append(serieses, &storage.SeriesEntry{
Lset: ls,
SampleIteratorFn: func(_ chunkenc.Iterator) chunkenc.Iterator {
return batch.NewChunkMergeIterator(chunks, math.MinInt64, math.MaxInt64)
SampleIteratorFn: func(it chunkenc.Iterator) chunkenc.Iterator {
return batch.NewChunkMergeIterator(it, chunks, math.MinInt64, math.MaxInt64)
},
})
}
Expand Down
15 changes: 11 additions & 4 deletions pkg/querier/batch/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,19 +52,26 @@ type iterator interface {
}

// NewChunkMergeIterator returns a chunkenc.Iterator that merges Cortex chunks together.
func NewChunkMergeIterator(chunks []chunk.Chunk, _, _ model.Time) chunkenc.Iterator {
func NewChunkMergeIterator(it chunkenc.Iterator, chunks []chunk.Chunk, _, _ model.Time) chunkenc.Iterator {
converted := make([]GenericChunk, len(chunks))
for i, c := range chunks {
c := c
converted[i] = NewGenericChunk(int64(c.From), int64(c.Through), c.NewIterator)
}

return NewGenericChunkMergeIterator(converted)
return NewGenericChunkMergeIterator(it, converted)
}

// NewGenericChunkMergeIterator returns a chunkenc.Iterator that merges generic chunks together.
func NewGenericChunkMergeIterator(chunks []GenericChunk) chunkenc.Iterator {
iter := newMergeIterator(chunks)
func NewGenericChunkMergeIterator(it chunkenc.Iterator, chunks []GenericChunk) chunkenc.Iterator {

var underlying iterator

if ia, ok := it.(*iteratorAdapter); ok {
underlying = ia.underlying
}

iter := newMergeIterator(underlying, chunks)
return newIteratorAdapter(iter)
}

Expand Down
9 changes: 5 additions & 4 deletions pkg/querier/batch/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,9 @@ func BenchmarkNewChunkMergeIterator_CreateAndIterate(b *testing.B) {
b.Run(name, func(b *testing.B) {
b.ReportAllocs()

var it chunkenc.Iterator
for n := 0; n < b.N; n++ {
it := NewChunkMergeIterator(chunks, 0, 0)
it = NewChunkMergeIterator(it, chunks, 0, 0)
for it.Next() != chunkenc.ValNone {
it.At()
}
Expand Down Expand Up @@ -108,9 +109,9 @@ func BenchmarkNewChunkMergeIterator_Seek(b *testing.B) {
b.ResetTimer()
b.Run(name, func(b *testing.B) {
b.ReportAllocs()

var it chunkenc.Iterator
for n := 0; n < b.N; n++ {
it := NewChunkMergeIterator(chunks, 0, 0)
it = NewChunkMergeIterator(it, chunks, 0, 0)
i := int64(0)
for it.Seek(i*scenario.seekStep.Milliseconds()) != chunkenc.ValNone {
i++
Expand All @@ -132,7 +133,7 @@ func TestSeekCorrectlyDealWithSinglePointChunks(t *testing.T) {
chunkTwo := util.GenerateChunk(t, step, model.Time(10*step/time.Millisecond), 1, enc)
chunks := []chunk.Chunk{chunkOne, chunkTwo}

sut := NewChunkMergeIterator(chunks, 0, 0)
sut := NewChunkMergeIterator(nil, chunks, 0, 0)

// Following calls mimics Prometheus's query engine behaviour for VectorSelector.
require.Equal(t, valType, sut.Next())
Expand Down
49 changes: 40 additions & 9 deletions pkg/querier/batch/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,26 @@ type mergeIterator struct {
currErr error
}

func newMergeIterator(cs []GenericChunk) *mergeIterator {
func newMergeIterator(it iterator, cs []GenericChunk) *mergeIterator {
css := partitionChunks(cs)
its := make([]*nonOverlappingIterator, 0, len(css))
for _, cs := range css {
its = append(its, newNonOverlappingIterator(cs))

var c *mergeIterator
if mIterator, ok := it.(*mergeIterator); ok {
c = mIterator.Reset(css)
} else {
c = &mergeIterator{
h: make(iteratorHeap, 0, len(css)),
batches: make(batchStream, 0, len(css)),
batchesBuf: make(batchStream, len(css)),
}
}

if cap(c.its) < len(css) {
c.its = make([]*nonOverlappingIterator, 0, len(css))
}

c := &mergeIterator{
its: its,
h: make(iteratorHeap, 0, len(its)),
batches: make(batchStream, 0, len(its)),
batchesBuf: make(batchStream, len(its)),
for _, cs := range css {
c.its = append(c.its, newNonOverlappingIterator(cs))
}

for _, iter := range c.its {
Expand All @@ -52,6 +60,29 @@ func newMergeIterator(cs []GenericChunk) *mergeIterator {
return c
}

func (c *mergeIterator) Reset(ccs [][]GenericChunk) *mergeIterator {
c.its = c.its[:0]
c.h = c.h[:0]
c.batches = c.batches[:0]

if len(c.its) <= cap(c.batchesBuf) {
c.batchesBuf = c.batchesBuf[:len(ccs)]
for i := 0; i < len(ccs); i++ {
c.batchesBuf[i] = promchunk.Batch{}
}
} else {
c.batchesBuf = make(batchStream, len(c.its))
}

for i := 0; i < len(c.nextBatchBuf); i++ {
c.nextBatchBuf[i] = promchunk.Batch{}
}

c.currErr = nil

return c
}

func (c *mergeIterator) Seek(t int64, size int) chunkenc.ValueType {

// Optimisation to see if the seek is within our current caches batches.
Expand Down
36 changes: 32 additions & 4 deletions pkg/querier/batch/merge_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package batch

import (
"fmt"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"testing"
"time"

Expand All @@ -18,14 +20,40 @@ func TestMergeIter(t *testing.T) {
chunk4 := mkGenericChunk(t, model.TimeFromUnix(75), 100, enc)
chunk5 := mkGenericChunk(t, model.TimeFromUnix(100), 100, enc)

iter := newMergeIterator([]GenericChunk{chunk1, chunk2, chunk3, chunk4, chunk5})
iter := newMergeIterator(nil, []GenericChunk{chunk1, chunk2, chunk3, chunk4, chunk5})
testIter(t, 200, newIteratorAdapter(iter), enc)

iter = newMergeIterator([]GenericChunk{chunk1, chunk2, chunk3, chunk4, chunk5})
iter = newMergeIterator(iter, []GenericChunk{chunk1, chunk2, chunk3, chunk4, chunk5})
testSeek(t, 200, newIteratorAdapter(iter), enc)
})
}

func BenchmarkMergeIterator(b *testing.B) {
chunks := make([]GenericChunk, 0, 10)
for i := 0; i < 10; i++ {
chunks = append(chunks, mkGenericChunk(b, model.Time(i*25), 120, encoding.PrometheusXorChunk))
}
iter := newMergeIterator(nil, chunks)

for _, r := range []bool{true, false} {
b.Run(fmt.Sprintf("reuse-%t", r), func(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
if r {
iter = newMergeIterator(iter, chunks)
} else {
iter = newMergeIterator(nil, chunks)
}
a := newIteratorAdapter(iter)
for a.Next() != chunkenc.ValNone {

}
}
})
}
}

func TestMergeHarder(t *testing.T) {
t.Parallel()
forEncodings(t, func(t *testing.T, enc encoding.Encoding) {
Expand All @@ -40,10 +68,10 @@ func TestMergeHarder(t *testing.T) {
chunks = append(chunks, mkGenericChunk(t, from, samples, enc))
from = from.Add(time.Duration(offset) * time.Second)
}
iter := newMergeIterator(chunks)
iter := newMergeIterator(nil, chunks)
testIter(t, offset*numChunks+samples-offset, newIteratorAdapter(iter), enc)

iter = newMergeIterator(chunks)
iter = newMergeIterator(iter, chunks)
testSeek(t, offset*numChunks+samples-offset, newIteratorAdapter(iter), enc)
})
}
2 changes: 1 addition & 1 deletion pkg/querier/chunk_store_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@ import (
"github.com/cortexproject/cortex/pkg/chunk"
)

type chunkIteratorFunc func(chunks []chunk.Chunk, from, through model.Time) chunkenc.Iterator
type chunkIteratorFunc func(it chunkenc.Iterator, chunks []chunk.Chunk, from, through model.Time) chunkenc.Iterator
4 changes: 2 additions & 2 deletions pkg/querier/distributor_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,8 @@ func (q *distributorQuerier) streamingSelect(ctx context.Context, sortSeries boo

serieses = append(serieses, &storage.SeriesEntry{
Lset: ls,
SampleIteratorFn: func(_ chunkenc.Iterator) chunkenc.Iterator {
return q.chunkIterFn(chunks, model.Time(minT), model.Time(maxT))
SampleIteratorFn: func(it chunkenc.Iterator) chunkenc.Iterator {
return q.chunkIterFn(it, chunks, model.Time(minT), model.Time(maxT))
},
})
}
Expand Down
15 changes: 6 additions & 9 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func New(cfg Config, limits *validation.Overrides, distributor Distributor, stor
QueryStoreAfter: cfg.QueryStoreAfter,
}
}
queryable := NewQueryable(distributorQueryable, ns, iteratorFunc, cfg, limits)
queryable := NewQueryable(distributorQueryable, ns, cfg, limits)
exemplarQueryable := newDistributorExemplarQueryable(distributor)

lazyQueryable := storage.QueryableFunc(func(mint int64, maxt int64) (storage.Querier, error) {
Expand Down Expand Up @@ -275,13 +275,12 @@ type limiterHolder struct {
}

// NewQueryable creates a new Queryable for cortex.
func NewQueryable(distributor QueryableWithFilter, stores []QueryableWithFilter, chunkIterFn chunkIteratorFunc, cfg Config, limits *validation.Overrides) storage.Queryable {
func NewQueryable(distributor QueryableWithFilter, stores []QueryableWithFilter, cfg Config, limits *validation.Overrides) storage.Queryable {
return storage.QueryableFunc(func(mint, maxt int64) (storage.Querier, error) {
q := querier{
now: time.Now(),
mint: mint,
maxt: maxt,
chunkIterFn: chunkIterFn,
limits: limits,
maxQueryIntoFuture: cfg.MaxQueryIntoFuture,
ignoreMaxQueryLength: cfg.IgnoreMaxQueryLength,
Expand All @@ -295,10 +294,8 @@ func NewQueryable(distributor QueryableWithFilter, stores []QueryableWithFilter,
}

type querier struct {
chunkIterFn chunkIteratorFunc
now time.Time
mint, maxt int64

now time.Time
mint, maxt int64
limits *validation.Overrides
maxQueryIntoFuture time.Duration
distributor QueryableWithFilter
Expand Down Expand Up @@ -683,8 +680,8 @@ func partitionChunks(chunks []chunk.Chunk, mint, maxt int64, iteratorFunc chunkI
for i := range chunksBySeries {
series = append(series, &storage.SeriesEntry{
Lset: chunksBySeries[i][0].Metric,
SampleIteratorFn: func(_ chunkenc.Iterator) chunkenc.Iterator {
return iteratorFunc(chunksBySeries[i], model.Time(mint), model.Time(maxt))
SampleIteratorFn: func(it chunkenc.Iterator) chunkenc.Iterator {
return iteratorFunc(it, chunksBySeries[i], model.Time(mint), model.Time(maxt))
},
})
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/querier/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ func TestShouldSortSeriesIfQueryingMultipleQueryables(t *testing.T) {
for _, queryable := range tc.storeQueriables {
wQueriables = append(wQueriables, &wrappedSampleAndChunkQueryable{QueryableWithFilter: queryable})
}
queryable := NewQueryable(wDistributorQueriable, wQueriables, batch.NewChunkMergeIterator, cfg, overrides)
queryable := NewQueryable(wDistributorQueriable, wQueriables, cfg, overrides)
opts := promql.EngineOpts{
Logger: log.NewNopLogger(),
MaxSamples: 1e6,
Expand Down Expand Up @@ -521,7 +521,7 @@ func TestLimits(t *testing.T) {
overrides, err := validation.NewOverrides(DefaultLimitsConfig(), tc.tenantLimit)
require.NoError(t, err)

queryable := NewQueryable(wDistributorQueriable, wQueriables, batch.NewChunkMergeIterator, cfg, overrides)
queryable := NewQueryable(wDistributorQueriable, wQueriables, cfg, overrides)
opts := promql.EngineOpts{
Logger: log.NewNopLogger(),
MaxSamples: 1e6,
Expand Down

0 comments on commit 225fce1

Please sign in to comment.