From 4aaea68f822a81b46dd881a6fe261722b709116b Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Sat, 11 May 2024 22:29:43 -0700 Subject: [PATCH 1/5] Implement histogram iterators for batch Signed-off-by: Ben Ye --- pkg/chunk/chunk.go | 46 ++++++--- pkg/chunk/encoding/encoding.go | 39 ++++++- pkg/chunk/iterator.go | 19 ++-- pkg/querier/batch/batch.go | 56 +++++++--- pkg/querier/batch/batch_test.go | 61 ++++++++--- pkg/querier/batch/chunk.go | 29 +++--- pkg/querier/batch/chunk_test.go | 119 ++++++++++++++++------ pkg/querier/batch/merge.go | 20 ++-- pkg/querier/batch/merge_test.go | 8 +- pkg/querier/batch/non_overlapping.go | 22 ++-- pkg/querier/batch/non_overlapping_test.go | 8 +- pkg/querier/batch/stream.go | 93 +++++++++++------ pkg/querier/batch/stream_test.go | 118 ++++++++++++++------- pkg/util/histogram/testutils.go | 55 ++++++++++ 14 files changed, 502 insertions(+), 191 deletions(-) create mode 100644 pkg/util/histogram/testutils.go diff --git a/pkg/chunk/chunk.go b/pkg/chunk/chunk.go index 2d36192ee0..77662b7d20 100644 --- a/pkg/chunk/chunk.go +++ b/pkg/chunk/chunk.go @@ -3,6 +3,7 @@ package chunk import ( "github.com/pkg/errors" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/tsdb/chunkenc" ) @@ -45,24 +46,37 @@ type prometheusChunkIterator struct { it chunkenc.Iterator } -func (p *prometheusChunkIterator) Scan() bool { - return p.it.Next() != chunkenc.ValNone +func (p *prometheusChunkIterator) Scan() chunkenc.ValueType { + return p.it.Next() } -func (p *prometheusChunkIterator) FindAtOrAfter(time model.Time) bool { +func (p *prometheusChunkIterator) FindAtOrAfter(time model.Time) chunkenc.ValueType { // FindAtOrAfter must return OLDEST value at given time. That means we need to start with a fresh iterator, // otherwise we cannot guarantee OLDEST. p.it = p.c.Iterator(p.it) - return p.it.Seek(int64(time)) != chunkenc.ValNone + return p.it.Seek(int64(time)) } -func (p *prometheusChunkIterator) Batch(size int) Batch { +func (p *prometheusChunkIterator) Batch(size int, valType chunkenc.ValueType) Batch { var batch Batch j := 0 for j < size { - t, v := p.it.At() - batch.Timestamps[j] = t - batch.Values[j] = v + switch valType { + case chunkenc.ValNone: + break + case chunkenc.ValFloat: + t, v := p.it.At() + batch.Timestamps[j] = t + batch.Values[j] = v + case chunkenc.ValHistogram: + t, v := p.it.AtHistogram(nil) + batch.Timestamps[j] = t + batch.Histograms[j] = v + case chunkenc.ValFloatHistogram: + t, v := p.it.AtFloatHistogram(nil) + batch.Timestamps[j] = t + batch.FloatHistograms[j] = v + } j++ if j < size && p.it.Next() == chunkenc.ValNone { break @@ -70,6 +84,7 @@ func (p *prometheusChunkIterator) Batch(size int) Batch { } batch.Index = 0 batch.Length = j + batch.ValType = valType return batch } @@ -79,7 +94,14 @@ func (p *prometheusChunkIterator) Err() error { type errorIterator string -func (e errorIterator) Scan() bool { return false } -func (e errorIterator) FindAtOrAfter(time model.Time) bool { return false } -func (e errorIterator) Batch(size int) Batch { panic("no values") } -func (e errorIterator) Err() error { return errors.New(string(e)) } +func (e errorIterator) Scan() chunkenc.ValueType { return chunkenc.ValNone } +func (e errorIterator) FindAtOrAfter(time model.Time) chunkenc.ValueType { return chunkenc.ValNone } +func (e errorIterator) Value() model.SamplePair { panic("no values") } +func (e errorIterator) AtHistogram(_ *histogram.Histogram) (int64, *histogram.Histogram) { + panic("no values") +} +func (e errorIterator) AtFloatHistogram(_ *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) { + panic("no values") +} +func (e errorIterator) Batch(size int, valType chunkenc.ValueType) Batch { panic("no values") } +func (e errorIterator) Err() error { return errors.New(string(e)) } diff --git a/pkg/chunk/encoding/encoding.go b/pkg/chunk/encoding/encoding.go index 9a94d1e716..8dde3eb7a0 100644 --- a/pkg/chunk/encoding/encoding.go +++ b/pkg/chunk/encoding/encoding.go @@ -8,7 +8,7 @@ import ( "github.com/prometheus/prometheus/tsdb/chunkenc" ) -// Encoding defines which encoding we are using, delta, doubledelta, or varbit +// Encoding defines which encoding we are using. type Encoding byte // String implements flag.Value. @@ -26,21 +26,46 @@ func (e Encoding) PromChunkEncoding() chunkenc.Encoding { return chunkenc.EncNone } +func (e Encoding) ChunkValueType() chunkenc.ValueType { + if known, found := encodings[e]; found { + return known.ValueType + } + return chunkenc.ValNone +} + const ( // PrometheusXorChunk is a wrapper around Prometheus XOR-encoded chunk. // 4 is the magic value for backwards-compatibility with previous iota-based constants. PrometheusXorChunk Encoding = 4 + // PrometheusHistogramChunk is a wrapper around Prometheus histogram chunk. + // 5 is the magic value for backwards-compatibility with previous iota-based constants. + PrometheusHistogramChunk Encoding = 5 + // PrometheusFloatHistogramChunk is a wrapper around Prometheus float histogram chunk. + // 6 is the magic value for backwards-compatibility with previous iota-based constants. + PrometheusFloatHistogramChunk Encoding = 6 ) type encoding struct { - Name string - Encoding chunkenc.Encoding + Name string + Encoding chunkenc.Encoding + ValueType chunkenc.ValueType } var encodings = map[Encoding]encoding{ PrometheusXorChunk: { - Name: "PrometheusXorChunk", - Encoding: chunkenc.EncXOR, + Name: "PrometheusXorChunk", + Encoding: chunkenc.EncXOR, + ValueType: chunkenc.ValFloat, + }, + PrometheusHistogramChunk: { + Name: "PrometheusHistogramChunk", + Encoding: chunkenc.EncHistogram, + ValueType: chunkenc.ValHistogram, + }, + PrometheusFloatHistogramChunk: { + Name: "PrometheusFloatHistogramChunk", + Encoding: chunkenc.EncFloatHistogram, + ValueType: chunkenc.ValFloatHistogram, }, } @@ -48,6 +73,10 @@ func FromPromChunkEncoding(enc chunkenc.Encoding) (Encoding, error) { switch enc { case chunkenc.EncXOR: return PrometheusXorChunk, nil + case chunkenc.EncHistogram: + return PrometheusHistogramChunk, nil + case chunkenc.EncFloatHistogram: + return PrometheusFloatHistogramChunk, nil } return Encoding(0), errors.Errorf("unknown Prometheus chunk encoding: %v", enc) } diff --git a/pkg/chunk/iterator.go b/pkg/chunk/iterator.go index 5e41e211a6..331fc15176 100644 --- a/pkg/chunk/iterator.go +++ b/pkg/chunk/iterator.go @@ -2,6 +2,8 @@ package chunk import ( "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/histogram" + "github.com/prometheus/prometheus/tsdb/chunkenc" ) // Iterator enables efficient access to the content of a chunk. It is @@ -13,14 +15,14 @@ type Iterator interface { // chunk. Otherwise, it is the value following the last value scanned or // found (by one of the Find... methods). Returns false if either the // end of the chunk is reached or an error has occurred. - Scan() bool + Scan() chunkenc.ValueType // Finds the oldest value at or after the provided time. Returns false // if either the chunk contains no value at or after the provided time, // or an error has occurred. - FindAtOrAfter(model.Time) bool + FindAtOrAfter(model.Time) chunkenc.ValueType // Returns a batch of the provisded size; NB not idempotent! Should only be called // once per Scan. - Batch(size int) Batch + Batch(size int, valType chunkenc.ValueType) Batch // Returns the last error encountered. In general, an error signals data // corruption in the chunk and requires quarantining. Err() error @@ -33,8 +35,11 @@ const BatchSize = 12 // Batch is a sorted set of (timestamp, value) pairs. They are intended to be // small, and passed by value. type Batch struct { - Timestamps [BatchSize]int64 - Values [BatchSize]float64 - Index int - Length int + Timestamps [BatchSize]int64 + Values [BatchSize]float64 + Histograms [BatchSize]*histogram.Histogram + FloatHistograms [BatchSize]*histogram.FloatHistogram + Index int + Length int + ValType chunkenc.ValueType } diff --git a/pkg/querier/batch/batch.go b/pkg/querier/batch/batch.go index c1d998bea6..2cf37f9ff9 100644 --- a/pkg/querier/batch/batch.go +++ b/pkg/querier/batch/batch.go @@ -2,10 +2,10 @@ package batch import ( "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/cortexproject/cortex/pkg/chunk" - "github.com/cortexproject/cortex/pkg/querier/iterators" ) // GenericChunk is a generic chunk used by the batch iterator, in order to make the batch @@ -32,10 +32,10 @@ func (c GenericChunk) Iterator(reuse chunk.Iterator) chunk.Iterator { // iterator iterates over batches. type iterator interface { // Seek to the batch at (or after) time t. - Seek(t int64, size int) bool + Seek(t int64, size int) chunkenc.ValueType // Next moves to the next batch. - Next(size int) bool + Next(size int) chunkenc.ValueType // AtTime returns the start time of the next batch. Must only be called after // Seek or Next have returned true. @@ -78,37 +78,41 @@ type iteratorAdapter struct { } func newIteratorAdapter(underlying iterator) chunkenc.Iterator { - return iterators.NewCompatibleChunksIterator(&iteratorAdapter{ + return &iteratorAdapter{ batchSize: 1, underlying: underlying, - }) + } } // Seek implements chunkenc.Iterator. -func (a *iteratorAdapter) Seek(t int64) bool { +func (a *iteratorAdapter) Seek(t int64) chunkenc.ValueType { // Optimisation: fulfill the seek using current batch if possible. if a.curr.Length > 0 && a.curr.Index < a.curr.Length { if t <= a.curr.Timestamps[a.curr.Index] { //In this case, the interface's requirement is met, so state of this //iterator does not need any change. - return true + return a.curr.ValType } else if t <= a.curr.Timestamps[a.curr.Length-1] { //In this case, some timestamp between current sample and end of batch can fulfill //the seek. Let's find it. for a.curr.Index < a.curr.Length && t > a.curr.Timestamps[a.curr.Index] { a.curr.Index++ } - return true + return a.curr.ValType } else if t <= a.underlying.MaxCurrentChunkTime() { // In this case, some timestamp inside the current underlying chunk can fulfill the seek. // In this case we will call next until we find the sample as it will be faster than calling // `a.underlying.Seek` directly as this would cause the iterator to start from the beginning of the chunk. // See: https://github.com/cortexproject/cortex/blob/f69452975877c67ac307709e5f60b8d20477764c/pkg/querier/batch/chunk.go#L26-L45 // https://github.com/cortexproject/cortex/blob/f69452975877c67ac307709e5f60b8d20477764c/pkg/chunk/encoding/prometheus_chunk.go#L90-L95 - for a.Next() { + for { + valType := a.Next() + if valType == chunkenc.ValNone { + break + } if t <= a.curr.Timestamps[a.curr.Index] { - return true + return valType } } } @@ -116,24 +120,29 @@ func (a *iteratorAdapter) Seek(t int64) bool { a.curr.Length = -1 a.batchSize = 1 - if a.underlying.Seek(t, a.batchSize) { + if valType := a.underlying.Seek(t, a.batchSize); valType != chunkenc.ValNone { a.curr = a.underlying.Batch() - return a.curr.Index < a.curr.Length + if a.curr.Index < a.curr.Length { + return a.curr.ValType + } } - return false + return chunkenc.ValNone } // Next implements chunkenc.Iterator. -func (a *iteratorAdapter) Next() bool { +func (a *iteratorAdapter) Next() chunkenc.ValueType { a.curr.Index++ - for a.curr.Index >= a.curr.Length && a.underlying.Next(a.batchSize) { + for a.curr.Index >= a.curr.Length && a.underlying.Next(a.batchSize) != chunkenc.ValNone { a.curr = a.underlying.Batch() a.batchSize = a.batchSize * 2 if a.batchSize > chunk.BatchSize { a.batchSize = chunk.BatchSize } } - return a.curr.Index < a.curr.Length + if a.curr.Index < a.curr.Length { + return a.curr.ValType + } + return chunkenc.ValNone } // At implements chunkenc.Iterator. @@ -145,3 +154,18 @@ func (a *iteratorAdapter) At() (int64, float64) { func (a *iteratorAdapter) Err() error { return nil } + +// AtHistogram implements chunkenc.Iterator. +func (a *iteratorAdapter) AtHistogram(h *histogram.Histogram) (int64, *histogram.Histogram) { + return a.curr.Timestamps[a.curr.Index], a.curr.Histograms[a.curr.Index] +} + +// AtFloatHistogram implements chunkenc.Iterator. +func (a *iteratorAdapter) AtFloatHistogram(h *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) { + return a.curr.Timestamps[a.curr.Index], a.curr.FloatHistograms[a.curr.Index] +} + +// AtT implements chunkenc.Iterator. +func (a *iteratorAdapter) AtT() int64 { + return a.curr.Timestamps[a.curr.Index] +} diff --git a/pkg/querier/batch/batch_test.go b/pkg/querier/batch/batch_test.go index 7f40afcf79..801e28008d 100644 --- a/pkg/querier/batch/batch_test.go +++ b/pkg/querier/batch/batch_test.go @@ -11,6 +11,7 @@ import ( "github.com/cortexproject/cortex/pkg/chunk" promchunk "github.com/cortexproject/cortex/pkg/chunk/encoding" + histogram_util "github.com/cortexproject/cortex/pkg/util/histogram" ) func BenchmarkNewChunkMergeIterator_CreateAndIterate(b *testing.B) { @@ -26,6 +27,18 @@ func BenchmarkNewChunkMergeIterator_CreateAndIterate(b *testing.B) { {numChunks: 100, numSamplesPerChunk: 100, duplicationFactor: 3, enc: promchunk.PrometheusXorChunk}, {numChunks: 1, numSamplesPerChunk: 100, duplicationFactor: 1, enc: promchunk.PrometheusXorChunk}, {numChunks: 1, numSamplesPerChunk: 100, duplicationFactor: 3, enc: promchunk.PrometheusXorChunk}, + {numChunks: 1000, numSamplesPerChunk: 100, duplicationFactor: 1, enc: promchunk.PrometheusHistogramChunk}, + {numChunks: 1000, numSamplesPerChunk: 100, duplicationFactor: 3, enc: promchunk.PrometheusHistogramChunk}, + {numChunks: 100, numSamplesPerChunk: 100, duplicationFactor: 1, enc: promchunk.PrometheusHistogramChunk}, + {numChunks: 100, numSamplesPerChunk: 100, duplicationFactor: 3, enc: promchunk.PrometheusHistogramChunk}, + {numChunks: 1, numSamplesPerChunk: 100, duplicationFactor: 1, enc: promchunk.PrometheusHistogramChunk}, + {numChunks: 1, numSamplesPerChunk: 100, duplicationFactor: 3, enc: promchunk.PrometheusHistogramChunk}, + {numChunks: 1000, numSamplesPerChunk: 100, duplicationFactor: 1, enc: promchunk.PrometheusFloatHistogramChunk}, + {numChunks: 1000, numSamplesPerChunk: 100, duplicationFactor: 3, enc: promchunk.PrometheusFloatHistogramChunk}, + {numChunks: 100, numSamplesPerChunk: 100, duplicationFactor: 1, enc: promchunk.PrometheusFloatHistogramChunk}, + {numChunks: 100, numSamplesPerChunk: 100, duplicationFactor: 3, enc: promchunk.PrometheusFloatHistogramChunk}, + {numChunks: 1, numSamplesPerChunk: 100, duplicationFactor: 1, enc: promchunk.PrometheusFloatHistogramChunk}, + {numChunks: 1, numSamplesPerChunk: 100, duplicationFactor: 3, enc: promchunk.PrometheusFloatHistogramChunk}, } for _, scenario := range scenarios { @@ -37,6 +50,7 @@ func BenchmarkNewChunkMergeIterator_CreateAndIterate(b *testing.B) { chunks := createChunks(b, step, scenario.numChunks, scenario.numSamplesPerChunk, scenario.duplicationFactor, scenario.enc) + b.ResetTimer() b.Run(name, func(b *testing.B) { b.ReportAllocs() @@ -90,6 +104,7 @@ func BenchmarkNewChunkMergeIterator_Seek(b *testing.B) { chunks := createChunks(b, scenario.scrapeInterval, scenario.numChunks, scenario.numSamplesPerChunk, scenario.duplicationFactor, scenario.enc) + b.ResetTimer() b.Run(name, func(b *testing.B) { b.ReportAllocs() @@ -105,20 +120,38 @@ func BenchmarkNewChunkMergeIterator_Seek(b *testing.B) { } func TestSeekCorrectlyDealWithSinglePointChunks(t *testing.T) { - t.Parallel() - chunkOne := mkChunk(t, step, model.Time(1*step/time.Millisecond), 1, promchunk.PrometheusXorChunk) - chunkTwo := mkChunk(t, step, model.Time(10*step/time.Millisecond), 1, promchunk.PrometheusXorChunk) - chunks := []chunk.Chunk{chunkOne, chunkTwo} - - sut := NewChunkMergeIterator(chunks, 0, 0) - - // Following calls mimics Prometheus's query engine behaviour for VectorSelector. - require.True(t, sut.Next() != chunkenc.ValNone) - require.True(t, sut.Seek(0) != chunkenc.ValNone) - - actual, val := sut.At() - require.Equal(t, float64(1*time.Second/time.Millisecond), val) // since mkChunk use ts as value. - require.Equal(t, int64(1*time.Second/time.Millisecond), actual) + histograms := histogram_util.GenerateTestHistograms(1000, 1000, 1, 5, 20) + for _, enc := range []promchunk.Encoding{ + promchunk.PrometheusXorChunk, + promchunk.PrometheusHistogramChunk, + promchunk.PrometheusFloatHistogramChunk, + } { + valType := enc.ChunkValueType() + chunkOne := mkChunk(t, step, model.Time(1*step/time.Millisecond), 1, enc) + chunkTwo := mkChunk(t, step, model.Time(10*step/time.Millisecond), 1, enc) + chunks := []chunk.Chunk{chunkOne, chunkTwo} + + sut := NewChunkMergeIterator(chunks, 0, 0) + + // Following calls mimics Prometheus's query engine behaviour for VectorSelector. + require.Equal(t, valType, sut.Next()) + require.Equal(t, valType, sut.Seek(0)) + + switch enc { + case promchunk.PrometheusXorChunk: + actual, val := sut.At() + require.Equal(t, float64(1*time.Second/time.Millisecond), val) // since mkChunk use ts as value. + require.Equal(t, int64(1*time.Second/time.Millisecond), actual) + case promchunk.PrometheusHistogramChunk: + actual, val := sut.AtHistogram(nil) + require.Equal(t, histograms[0], val) + require.Equal(t, int64(1*time.Second/time.Millisecond), actual) + case promchunk.PrometheusFloatHistogramChunk: + actual, val := sut.AtFloatHistogram(nil) + require.Equal(t, histograms[0].ToFloat(nil), val) + require.Equal(t, int64(1*time.Second/time.Millisecond), actual) + } + } } func createChunks(b *testing.B, step time.Duration, numChunks, numSamplesPerChunk, duplicationFactor int, enc promchunk.Encoding) []chunk.Chunk { diff --git a/pkg/querier/batch/chunk.go b/pkg/querier/batch/chunk.go index 96e9de4db3..f546d3d0f3 100644 --- a/pkg/querier/batch/chunk.go +++ b/pkg/querier/batch/chunk.go @@ -2,6 +2,7 @@ package batch import ( "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/tsdb/chunkenc" promchunk "github.com/cortexproject/cortex/pkg/chunk" ) @@ -27,11 +28,11 @@ func (i *chunkIterator) MaxCurrentChunkTime() int64 { // Seek advances the iterator forward to the value at or after // the given timestamp. -func (i *chunkIterator) Seek(t int64, size int) bool { +func (i *chunkIterator) Seek(t int64, size int) chunkenc.ValueType { // We assume seeks only care about a specific window; if this chunk doesn't // contain samples in that window, we can shortcut. if i.chunk.MaxTime < t { - return false + return chunkenc.ValNone } // If the seek is to the middle of the current batch, and size fits, we can @@ -42,23 +43,27 @@ func (i *chunkIterator) Seek(t int64, size int) bool { i.batch.Index++ } if i.batch.Index+size < i.batch.Length { - return true + return i.batch.ValType } } - if i.it.FindAtOrAfter(model.Time(t)) { - i.batch = i.it.Batch(size) - return i.batch.Length > 0 + if valueType := i.it.FindAtOrAfter(model.Time(t)); valueType != chunkenc.ValNone { + i.batch = i.it.Batch(size, valueType) + if i.batch.Length > 0 { + return valueType + } } - return false + return chunkenc.ValNone } -func (i *chunkIterator) Next(size int) bool { - if i.it.Scan() { - i.batch = i.it.Batch(size) - return i.batch.Length > 0 +func (i *chunkIterator) Next(size int) chunkenc.ValueType { + if valueType := i.it.Scan(); valueType != chunkenc.ValNone { + i.batch = i.it.Batch(size, valueType) + if i.batch.Length > 0 { + return valueType + } } - return false + return chunkenc.ValNone } func (i *chunkIterator) AtTime() int64 { diff --git a/pkg/querier/batch/chunk_test.go b/pkg/querier/batch/chunk_test.go index 2f137f956f..3b5f85d667 100644 --- a/pkg/querier/batch/chunk_test.go +++ b/pkg/querier/batch/chunk_test.go @@ -12,6 +12,7 @@ import ( "github.com/cortexproject/cortex/pkg/chunk" promchunk "github.com/cortexproject/cortex/pkg/chunk/encoding" + histogram_util "github.com/cortexproject/cortex/pkg/util/histogram" ) const ( @@ -25,16 +26,18 @@ func TestChunkIter(t *testing.T) { iter := &chunkIterator{} iter.reset(chunk) - testIter(t, 100, newIteratorAdapter(iter)) + testIter(t, 100, newIteratorAdapter(iter), enc) iter.reset(chunk) - testSeek(t, 100, newIteratorAdapter(iter)) + testSeek(t, 100, newIteratorAdapter(iter), enc) }) } func forEncodings(t *testing.T, f func(t *testing.T, enc promchunk.Encoding)) { for _, enc := range []promchunk.Encoding{ promchunk.PrometheusXorChunk, + promchunk.PrometheusHistogramChunk, + promchunk.PrometheusFloatHistogramChunk, } { enc := enc t.Run(enc.String(), func(t *testing.T) { @@ -48,14 +51,34 @@ func mkChunk(t require.TestingT, step time.Duration, from model.Time, points int metric := labels.Labels{ {Name: model.MetricNameLabel, Value: "foo"}, } - pc := chunkenc.NewXORChunk() - appender, err := pc.Appender() + pe := enc.PromChunkEncoding() + pc, err := chunkenc.NewEmptyChunk(pe) require.NoError(t, err) + appender, err := pc.Appender() ts := from - for i := 0; i < points; i++ { - appender.Append(int64(ts), float64(ts)) - ts = ts.Add(step) + + switch pe { + case chunkenc.EncXOR: + for i := 0; i < points; i++ { + appender.Append(int64(ts), float64(ts)) + ts = ts.Add(step) + } + case chunkenc.EncHistogram: + histograms := histogram_util.GenerateTestHistograms(int(from), int(step/time.Millisecond), points, 5, 20) + for i := 0; i < points; i++ { + _, _, appender, err = appender.AppendHistogram(nil, int64(ts), histograms[i], true) + require.NoError(t, err) + ts = ts.Add(step) + } + case chunkenc.EncFloatHistogram: + histograms := histogram_util.GenerateTestHistograms(int(from), int(step/time.Millisecond), points, 5, 20) + for i := 0; i < points; i++ { + _, _, appender, err = appender.AppendFloatHistogram(nil, int64(ts), histograms[i].ToFloat(nil), true) + require.NoError(t, err) + ts = ts.Add(step) + } } + ts = ts.Add(-step) // undo the add that we did just before exiting the loop return chunk.NewChunk(metric, pc, from, ts) } @@ -65,34 +88,71 @@ func mkGenericChunk(t require.TestingT, from model.Time, points int, enc promchu return NewGenericChunk(int64(ck.From), int64(ck.Through), ck.NewIterator) } -func testIter(t require.TestingT, points int, iter chunkenc.Iterator) { +func testIter(t require.TestingT, points int, iter chunkenc.Iterator, enc promchunk.Encoding) { + histograms := histogram_util.GenerateTestHistograms(0, 1000, points, 5, 20) ets := model.TimeFromUnix(0) for i := 0; i < points; i++ { - require.NotEqual(t, iter.Next(), chunkenc.ValNone, strconv.Itoa(i)) - ts, v := iter.At() - require.EqualValues(t, int64(ets), ts, strconv.Itoa(i)) - require.EqualValues(t, float64(ets), v, strconv.Itoa(i)) + require.Equal(t, iter.Next(), enc.ChunkValueType(), strconv.Itoa(i)) + switch enc { + case promchunk.PrometheusXorChunk: + ts, v := iter.At() + require.EqualValues(t, int64(ets), ts, strconv.Itoa(i)) + require.EqualValues(t, float64(ets), v, strconv.Itoa(i)) + case promchunk.PrometheusHistogramChunk: + ts, v := iter.AtHistogram(nil) + require.EqualValues(t, int64(ets), ts, strconv.Itoa(i)) + require.EqualValues(t, histograms[i], v, strconv.Itoa(i)) + case promchunk.PrometheusFloatHistogramChunk: + ts, v := iter.AtFloatHistogram(nil) + require.EqualValues(t, int64(ets), ts, strconv.Itoa(i)) + require.EqualValues(t, histograms[i].ToFloat(nil), v, strconv.Itoa(i)) + } ets = ets.Add(step) } require.Equal(t, iter.Next(), chunkenc.ValNone) } -func testSeek(t require.TestingT, points int, iter chunkenc.Iterator) { +func testSeek(t require.TestingT, points int, iter chunkenc.Iterator, enc promchunk.Encoding) { + histograms := histogram_util.GenerateTestHistograms(0, 1000, points, 5, 20) for i := 0; i < points; i += points / 10 { ets := int64(i * int(step/time.Millisecond)) - require.NotEqual(t, iter.Seek(ets), chunkenc.ValNone) - ts, v := iter.At() - require.EqualValues(t, ets, ts) - require.EqualValues(t, v, float64(ets)) + require.Equal(t, iter.Seek(ets), enc.ChunkValueType(), strconv.Itoa(i)) + + switch enc { + case promchunk.PrometheusXorChunk: + ts, v := iter.At() + require.EqualValues(t, ets, ts, strconv.Itoa(i)) + require.EqualValues(t, float64(ets), v, strconv.Itoa(i)) + case promchunk.PrometheusHistogramChunk: + ts, v := iter.AtHistogram(nil) + require.EqualValues(t, ets, ts, strconv.Itoa(i)) + require.EqualValues(t, histograms[i], v, strconv.Itoa(i)) + case promchunk.PrometheusFloatHistogramChunk: + ts, v := iter.AtFloatHistogram(nil) + require.EqualValues(t, ets, ts, strconv.Itoa(i)) + require.EqualValues(t, histograms[i].ToFloat(nil), v, strconv.Itoa(i)) + } require.NoError(t, iter.Err()) for j := i + 1; j < i+points/10; j++ { ets := int64(j * int(step/time.Millisecond)) - require.NotEqual(t, iter.Next(), chunkenc.ValNone) - ts, v := iter.At() - require.EqualValues(t, ets, ts) - require.EqualValues(t, float64(ets), v) + require.Equal(t, iter.Next(), enc.ChunkValueType(), strconv.Itoa(i)) + + switch enc { + case promchunk.PrometheusXorChunk: + ts, v := iter.At() + require.EqualValues(t, ets, ts) + require.EqualValues(t, float64(ets), v) + case promchunk.PrometheusHistogramChunk: + ts, v := iter.AtHistogram(nil) + require.EqualValues(t, ets, ts) + require.EqualValues(t, histograms[j], v) + case promchunk.PrometheusFloatHistogramChunk: + ts, v := iter.AtFloatHistogram(nil) + require.EqualValues(t, ets, ts) + require.EqualValues(t, histograms[j].ToFloat(nil), v) + } require.NoError(t, iter.Err()) } } @@ -109,11 +169,11 @@ func TestSeek(t *testing.T) { } for i := 0; i < chunk.BatchSize-1; i++ { - require.True(t, c.Seek(int64(i), 1)) + require.Equal(t, chunkenc.ValFloat, c.Seek(int64(i), 1)) } require.Equal(t, 1, it.seeks) - require.True(t, c.Seek(int64(chunk.BatchSize), 1)) + require.Equal(t, chunkenc.ValFloat, c.Seek(int64(chunk.BatchSize), 1)) require.Equal(t, 2, it.seeks) } @@ -121,18 +181,19 @@ type mockIterator struct { seeks int } -func (i *mockIterator) Scan() bool { - return true +func (i *mockIterator) Scan() chunkenc.ValueType { + return chunkenc.ValFloat } -func (i *mockIterator) FindAtOrAfter(model.Time) bool { +func (i *mockIterator) FindAtOrAfter(model.Time) chunkenc.ValueType { i.seeks++ - return true + return chunkenc.ValFloat } -func (i *mockIterator) Batch(size int) chunk.Batch { +func (i *mockIterator) Batch(size int, valType chunkenc.ValueType) chunk.Batch { batch := chunk.Batch{ - Length: chunk.BatchSize, + Length: chunk.BatchSize, + ValType: valType, } for i := 0; i < chunk.BatchSize; i++ { batch.Timestamps[i] = int64(i) diff --git a/pkg/querier/batch/merge.go b/pkg/querier/batch/merge.go index 7911314c1b..89688e12c3 100644 --- a/pkg/querier/batch/merge.go +++ b/pkg/querier/batch/merge.go @@ -2,6 +2,7 @@ package batch import ( "container/heap" + "github.com/prometheus/prometheus/tsdb/chunkenc" "sort" promchunk "github.com/cortexproject/cortex/pkg/chunk" @@ -36,7 +37,7 @@ func newMergeIterator(cs []GenericChunk) *mergeIterator { } for _, iter := range c.its { - if iter.Next(1) { + if iter.Next(1) != chunkenc.ValNone { c.h = append(c.h, iter) continue } @@ -50,7 +51,7 @@ func newMergeIterator(cs []GenericChunk) *mergeIterator { return c } -func (c *mergeIterator) Seek(t int64, size int) bool { +func (c *mergeIterator) Seek(t int64, size int) chunkenc.ValueType { // Optimisation to see if the seek is within our current caches batches. found: @@ -74,14 +75,14 @@ found: c.batches = c.batches[:0] for _, iter := range c.its { - if iter.Seek(t, size) { + if iter.Seek(t, size) != chunkenc.ValNone { c.h = append(c.h, iter) continue } if err := iter.Err(); err != nil { c.currErr = err - return false + return chunkenc.ValNone } } @@ -91,7 +92,7 @@ found: return c.buildNextBatch(size) } -func (c *mergeIterator) Next(size int) bool { +func (c *mergeIterator) Next(size int) chunkenc.ValueType { // Pop the last built batch in a way that doesn't extend the slice. if len(c.batches) > 0 { copy(c.batches, c.batches[1:]) @@ -106,7 +107,7 @@ func (c *mergeIterator) nextBatchEndTime() int64 { return batch.Timestamps[batch.Length-1] } -func (c *mergeIterator) buildNextBatch(size int) bool { +func (c *mergeIterator) buildNextBatch(size int) chunkenc.ValueType { // All we need to do is get enough batches that our first batch's last entry // is before all iterators next entry. for len(c.h) > 0 && (len(c.batches) == 0 || c.nextBatchEndTime() >= c.h[0].AtTime()) { @@ -114,14 +115,17 @@ func (c *mergeIterator) buildNextBatch(size int) bool { c.batchesBuf = mergeStreams(c.batches, c.nextBatchBuf[:], c.batchesBuf, size) c.batches = append(c.batches[:0], c.batchesBuf...) - if c.h[0].Next(size) { + if valType := c.h[0].Next(size); valType != chunkenc.ValNone { heap.Fix(&c.h, 0) } else { heap.Pop(&c.h) } } - return len(c.batches) > 0 + if len(c.batches) > 0 { + return c.batches[0].ValType + } + return chunkenc.ValNone } func (c *mergeIterator) AtTime() int64 { diff --git a/pkg/querier/batch/merge_test.go b/pkg/querier/batch/merge_test.go index ddd3b997c6..8ad0d16df4 100644 --- a/pkg/querier/batch/merge_test.go +++ b/pkg/querier/batch/merge_test.go @@ -19,10 +19,10 @@ func TestMergeIter(t *testing.T) { chunk5 := mkGenericChunk(t, model.TimeFromUnix(100), 100, enc) iter := newMergeIterator([]GenericChunk{chunk1, chunk2, chunk3, chunk4, chunk5}) - testIter(t, 200, newIteratorAdapter(iter)) + testIter(t, 200, newIteratorAdapter(iter), enc) iter = newMergeIterator([]GenericChunk{chunk1, chunk2, chunk3, chunk4, chunk5}) - testSeek(t, 200, newIteratorAdapter(iter)) + testSeek(t, 200, newIteratorAdapter(iter), enc) }) } @@ -41,9 +41,9 @@ func TestMergeHarder(t *testing.T) { from = from.Add(time.Duration(offset) * time.Second) } iter := newMergeIterator(chunks) - testIter(t, offset*numChunks+samples-offset, newIteratorAdapter(iter)) + testIter(t, offset*numChunks+samples-offset, newIteratorAdapter(iter), enc) iter = newMergeIterator(chunks) - testSeek(t, offset*numChunks+samples-offset, newIteratorAdapter(iter)) + testSeek(t, offset*numChunks+samples-offset, newIteratorAdapter(iter), enc) }) } diff --git a/pkg/querier/batch/non_overlapping.go b/pkg/querier/batch/non_overlapping.go index 58f268d9b1..0c4fd0ba0e 100644 --- a/pkg/querier/batch/non_overlapping.go +++ b/pkg/querier/batch/non_overlapping.go @@ -1,6 +1,8 @@ package batch import ( + "github.com/prometheus/prometheus/tsdb/chunkenc" + promchunk "github.com/cortexproject/cortex/pkg/chunk" ) @@ -20,14 +22,14 @@ func newNonOverlappingIterator(chunks []GenericChunk) *nonOverlappingIterator { return it } -func (it *nonOverlappingIterator) Seek(t int64, size int) bool { +func (it *nonOverlappingIterator) Seek(t int64, size int) chunkenc.ValueType { for { - if it.iter.Seek(t, size) { - return true + if valType := it.iter.Seek(t, size); valType != chunkenc.ValNone { + return valType } else if it.iter.Err() != nil { - return false + return chunkenc.ValNone } else if !it.next() { - return false + return chunkenc.ValNone } } } @@ -36,14 +38,14 @@ func (it *nonOverlappingIterator) MaxCurrentChunkTime() int64 { return it.iter.MaxCurrentChunkTime() } -func (it *nonOverlappingIterator) Next(size int) bool { +func (it *nonOverlappingIterator) Next(size int) chunkenc.ValueType { for { - if it.iter.Next(size) { - return true + if valType := it.iter.Next(size); valType != chunkenc.ValNone { + return valType } else if it.iter.Err() != nil { - return false + return chunkenc.ValNone } else if !it.next() { - return false + return chunkenc.ValNone } } } diff --git a/pkg/querier/batch/non_overlapping_test.go b/pkg/querier/batch/non_overlapping_test.go index 078650dbb0..2377e8c3fa 100644 --- a/pkg/querier/batch/non_overlapping_test.go +++ b/pkg/querier/batch/non_overlapping_test.go @@ -15,8 +15,8 @@ func TestNonOverlappingIter(t *testing.T) { for i := int64(0); i < 100; i++ { cs = append(cs, mkGenericChunk(t, model.TimeFromUnix(i*10), 10, enc)) } - testIter(t, 10*100, newIteratorAdapter(newNonOverlappingIterator(cs))) - testSeek(t, 10*100, newIteratorAdapter(newNonOverlappingIterator(cs))) + testIter(t, 10*100, newIteratorAdapter(newNonOverlappingIterator(cs)), enc) + testSeek(t, 10*100, newIteratorAdapter(newNonOverlappingIterator(cs)), enc) }) } @@ -31,7 +31,7 @@ func TestNonOverlappingIterSparse(t *testing.T) { mkGenericChunk(t, model.TimeFromUnix(95), 1, enc), mkGenericChunk(t, model.TimeFromUnix(96), 4, enc), } - testIter(t, 100, newIteratorAdapter(newNonOverlappingIterator(cs))) - testSeek(t, 100, newIteratorAdapter(newNonOverlappingIterator(cs))) + testIter(t, 100, newIteratorAdapter(newNonOverlappingIterator(cs)), enc) + testSeek(t, 100, newIteratorAdapter(newNonOverlappingIterator(cs)), enc) }) } diff --git a/pkg/querier/batch/stream.go b/pkg/querier/batch/stream.go index 1aff5337be..475eae3258 100644 --- a/pkg/querier/batch/stream.go +++ b/pkg/querier/batch/stream.go @@ -1,6 +1,9 @@ package batch import ( + "github.com/prometheus/prometheus/model/histogram" + "github.com/prometheus/prometheus/tsdb/chunkenc" + promchunk "github.com/cortexproject/cortex/pkg/chunk" ) @@ -17,8 +20,11 @@ func (bs *batchStream) reset() { } } -func (bs *batchStream) hasNext() bool { - return len(*bs) > 0 +func (bs *batchStream) hasNext() chunkenc.ValueType { + if len(*bs) > 0 { + return (*bs)[0].ValType + } + return chunkenc.ValNone } func (bs *batchStream) next() { @@ -28,6 +34,16 @@ func (bs *batchStream) next() { } } +func (bs *batchStream) atHistogram() (int64, *histogram.Histogram) { + b := &(*bs)[0] + return b.Timestamps[b.Index], b.Histograms[b.Index] +} + +func (bs *batchStream) atFloatHistogram() (int64, *histogram.FloatHistogram) { + b := &(*bs)[0] + return b.Timestamps[b.Index], b.FloatHistograms[b.Index] +} + func (bs *batchStream) atTime() int64 { return (*bs)[0].Timestamps[(*bs)[0].Index] } @@ -46,52 +62,67 @@ func mergeStreams(left, right batchStream, result batchStream, size int) batchSt resultLen := 1 // Number of batches in the final result. b := &result[0] - // This function adds a new batch to the result - // if the current batch being appended is full. - checkForFullBatch := func() { - if b.Index == size { - // The batch reached it intended size. - // Add another batch the result - // and use it for further appending. - - // The Index is the place at which new sample - // has to be appended, hence it tells the length. - b.Length = b.Index - resultLen++ - if resultLen > len(result) { - // It is possible that result can grow longer - // then the one provided. - result = append(result, promchunk.Batch{}) - } - b = &result[resultLen-1] + // This function adds a new batch to the result. + nextBatch := func(valueType chunkenc.ValueType) { + // The Index is the place at which new sample + // has to be appended, hence it tells the length. + b.Length = b.Index + resultLen++ + if resultLen > len(result) { + // It is possible that result can grow longer + // then the one provided. + result = append(result, promchunk.Batch{}) } + b = &result[resultLen-1] + b.ValType = valueType } - for left.hasNext() && right.hasNext() { - checkForFullBatch() + populateVal := func(bs batchStream, valueType chunkenc.ValueType) { + if b.Index == 0 { + b.ValType = valueType + } else if b.Index == size || b.ValType != valueType { + // The batch reached it intended size or a new value type is used. + // Add another batch to the result and use it for further appending. + nextBatch(valueType) + } + switch valueType { + case chunkenc.ValFloat: + b.Timestamps[b.Index], b.Values[b.Index] = bs.at() + case chunkenc.ValHistogram: + b.Timestamps[b.Index], b.Histograms[b.Index] = bs.atHistogram() + case chunkenc.ValFloatHistogram: + b.Timestamps[b.Index], b.FloatHistograms[b.Index] = bs.atFloatHistogram() + default: + panic("unsupported value type") + } + b.Index++ + } + + for leftValueType, rightValueType := left.hasNext(), right.hasNext(); leftValueType != chunkenc.ValNone && rightValueType != chunkenc.ValNone; { t1, t2 := left.atTime(), right.atTime() if t1 < t2 { - b.Timestamps[b.Index], b.Values[b.Index] = left.at() + populateVal(left, leftValueType) left.next() + leftValueType = left.hasNext() } else if t1 > t2 { - b.Timestamps[b.Index], b.Values[b.Index] = right.at() + populateVal(right, rightValueType) right.next() + rightValueType = right.hasNext() } else { - b.Timestamps[b.Index], b.Values[b.Index] = left.at() + populateVal(left, leftValueType) left.next() + leftValueType = left.hasNext() right.next() + rightValueType = right.hasNext() } - b.Index++ } // This function adds all the samples from the provided // batchStream into the result in the same order. addToResult := func(bs batchStream) { - for ; bs.hasNext(); bs.next() { - checkForFullBatch() - b.Timestamps[b.Index], b.Values[b.Index] = bs.at() - b.Index++ - b.Length++ + for valueType := bs.hasNext(); valueType != chunkenc.ValNone; valueType = bs.hasNext() { + populateVal(bs, valueType) + bs.next() } } diff --git a/pkg/querier/batch/stream_test.go b/pkg/querier/batch/stream_test.go index 932de41c6b..62dc3ed208 100644 --- a/pkg/querier/batch/stream_test.go +++ b/pkg/querier/batch/stream_test.go @@ -4,62 +4,102 @@ import ( "strconv" "testing" + "github.com/prometheus/prometheus/model/histogram" "github.com/stretchr/testify/require" promchunk "github.com/cortexproject/cortex/pkg/chunk" + "github.com/cortexproject/cortex/pkg/chunk/encoding" ) func TestStream(t *testing.T) { t.Parallel() - for i, tc := range []struct { - input1, input2 []promchunk.Batch - output batchStream - }{ - { - input1: []promchunk.Batch{mkBatch(0)}, - output: []promchunk.Batch{mkBatch(0)}, - }, + forEncodings(t, func(t *testing.T, enc encoding.Encoding) { + for i, tc := range []struct { + input1, input2 []promchunk.Batch + output batchStream + }{ + { + input1: []promchunk.Batch{mkBatch(0, enc)}, + output: []promchunk.Batch{mkBatch(0, enc)}, + }, - { - input1: []promchunk.Batch{mkBatch(0)}, - input2: []promchunk.Batch{mkBatch(0)}, - output: []promchunk.Batch{mkBatch(0)}, - }, + { + input1: []promchunk.Batch{mkBatch(0, enc)}, + input2: []promchunk.Batch{mkBatch(0, enc)}, + output: []promchunk.Batch{mkBatch(0, enc)}, + }, - { - input1: []promchunk.Batch{mkBatch(0)}, - input2: []promchunk.Batch{mkBatch(promchunk.BatchSize)}, - output: []promchunk.Batch{mkBatch(0), mkBatch(promchunk.BatchSize)}, - }, + { + input1: []promchunk.Batch{mkBatch(0, enc)}, + input2: []promchunk.Batch{mkBatch(promchunk.BatchSize, enc)}, + output: []promchunk.Batch{mkBatch(0, enc), mkBatch(promchunk.BatchSize, enc)}, + }, - { - input1: []promchunk.Batch{mkBatch(0), mkBatch(promchunk.BatchSize)}, - input2: []promchunk.Batch{mkBatch(promchunk.BatchSize / 2), mkBatch(2 * promchunk.BatchSize)}, - output: []promchunk.Batch{mkBatch(0), mkBatch(promchunk.BatchSize), mkBatch(2 * promchunk.BatchSize)}, - }, + { + input1: []promchunk.Batch{mkBatch(0, enc), mkBatch(promchunk.BatchSize, enc)}, + input2: []promchunk.Batch{mkBatch(promchunk.BatchSize/2, enc), mkBatch(2*promchunk.BatchSize, enc)}, + output: []promchunk.Batch{mkBatch(0, enc), mkBatch(promchunk.BatchSize, enc), mkBatch(2*promchunk.BatchSize, enc)}, + }, - { - input1: []promchunk.Batch{mkBatch(promchunk.BatchSize / 2), mkBatch(3 * promchunk.BatchSize / 2), mkBatch(5 * promchunk.BatchSize / 2)}, - input2: []promchunk.Batch{mkBatch(0), mkBatch(promchunk.BatchSize), mkBatch(3 * promchunk.BatchSize)}, - output: []promchunk.Batch{mkBatch(0), mkBatch(promchunk.BatchSize), mkBatch(2 * promchunk.BatchSize), mkBatch(3 * promchunk.BatchSize)}, - }, - } { - tc := tc - t.Run(strconv.Itoa(i), func(t *testing.T) { - t.Parallel() - result := make(batchStream, len(tc.input1)+len(tc.input2)) - result = mergeStreams(tc.input1, tc.input2, result, promchunk.BatchSize) - require.Equal(t, batchStream(tc.output), result) - }) - } + { + input1: []promchunk.Batch{mkBatch(promchunk.BatchSize/2, enc), mkBatch(3*promchunk.BatchSize/2, enc), mkBatch(5*promchunk.BatchSize/2, enc)}, + input2: []promchunk.Batch{mkBatch(0, enc), mkBatch(promchunk.BatchSize, enc), mkBatch(3*promchunk.BatchSize, enc)}, + output: []promchunk.Batch{mkBatch(0, enc), mkBatch(promchunk.BatchSize, enc), mkBatch(2*promchunk.BatchSize, enc), mkBatch(3*promchunk.BatchSize, enc)}, + }, + } { + tc := tc + t.Run(strconv.Itoa(i), func(t *testing.T) { + t.Parallel() + result := make(batchStream, len(tc.input1)+len(tc.input2)) + result = mergeStreams(tc.input1, tc.input2, result, promchunk.BatchSize) + require.Equal(t, batchStream(tc.output), result) + }) + } + }) } -func mkBatch(from int64) promchunk.Batch { +func mkBatch(from int64, enc encoding.Encoding) promchunk.Batch { var result promchunk.Batch for i := int64(0); i < promchunk.BatchSize; i++ { result.Timestamps[i] = from + i - result.Values[i] = float64(from + i) + switch enc { + case encoding.PrometheusXorChunk: + result.Values[i] = float64(from + i) + case encoding.PrometheusHistogramChunk: + result.Histograms[i] = testHistogram(int(from+i), 5, 20) + case encoding.PrometheusFloatHistogramChunk: + result.FloatHistograms[i] = testHistogram(int(from+i), 5, 20).ToFloat(nil) + } } result.Length = promchunk.BatchSize + result.ValType = enc.ChunkValueType() return result } + +func testHistogram(count, numSpans, numBuckets int) *histogram.Histogram { + bucketsPerSide := numBuckets / 2 + spanLength := uint32(bucketsPerSide / numSpans) + h := &histogram.Histogram{ + CounterResetHint: histogram.GaugeType, + Count: uint64(count), + ZeroCount: uint64(count), + ZeroThreshold: 1e-128, + Sum: 18.4 * float64(count+1), + Schema: 2, + NegativeSpans: make([]histogram.Span, numSpans), + PositiveSpans: make([]histogram.Span, numSpans), + NegativeBuckets: make([]int64, bucketsPerSide), + PositiveBuckets: make([]int64, bucketsPerSide), + } + for j := 0; j < numSpans; j++ { + s := histogram.Span{Offset: 1, Length: spanLength} + h.NegativeSpans[j] = s + h.PositiveSpans[j] = s + } + + for j := 0; j < bucketsPerSide; j++ { + h.NegativeBuckets[j] = 1 + h.PositiveBuckets[j] = 1 + } + return h +} diff --git a/pkg/util/histogram/testutils.go b/pkg/util/histogram/testutils.go new file mode 100644 index 0000000000..a2617d8e56 --- /dev/null +++ b/pkg/util/histogram/testutils.go @@ -0,0 +1,55 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package histogram + +import "github.com/prometheus/prometheus/model/histogram" + +// Adapted from Prometheus model/histogram/test_utils.go GenerateBigTestHistograms. +func GenerateTestHistograms(from, step, numHistograms, numSpans, numBuckets int) []*histogram.Histogram { + bucketsPerSide := numBuckets / 2 + spanLength := uint32(bucketsPerSide / numSpans) + // Given all bucket deltas are 1, sum bucketsPerSide + 1. + observationCount := bucketsPerSide * (1 + bucketsPerSide) + + var histograms []*histogram.Histogram + for i := 0; i < numHistograms; i++ { + v := from + i*step + h := &histogram.Histogram{ + CounterResetHint: histogram.GaugeType, + Count: uint64(v + observationCount), + ZeroCount: uint64(v), + ZeroThreshold: 1e-128, + Sum: 18.4 * float64(v+1), + Schema: 2, + NegativeSpans: make([]histogram.Span, numSpans), + PositiveSpans: make([]histogram.Span, numSpans), + NegativeBuckets: make([]int64, bucketsPerSide), + PositiveBuckets: make([]int64, bucketsPerSide), + } + + for j := 0; j < numSpans; j++ { + s := histogram.Span{Offset: 1, Length: spanLength} + h.NegativeSpans[j] = s + h.PositiveSpans[j] = s + } + + for j := 0; j < bucketsPerSide; j++ { + h.NegativeBuckets[j] = 1 + h.PositiveBuckets[j] = 1 + } + + histograms = append(histograms, h) + } + return histograms +} From 50d6fad28d83c14c9b62e2b884e0f74c8ec44e97 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Sat, 11 May 2024 23:06:25 -0700 Subject: [PATCH 2/5] lint Signed-off-by: Ben Ye --- pkg/querier/batch/chunk_test.go | 1 + pkg/querier/batch/merge.go | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/querier/batch/chunk_test.go b/pkg/querier/batch/chunk_test.go index 3b5f85d667..4909df960d 100644 --- a/pkg/querier/batch/chunk_test.go +++ b/pkg/querier/batch/chunk_test.go @@ -55,6 +55,7 @@ func mkChunk(t require.TestingT, step time.Duration, from model.Time, points int pc, err := chunkenc.NewEmptyChunk(pe) require.NoError(t, err) appender, err := pc.Appender() + require.NoError(t, err) ts := from switch pe { diff --git a/pkg/querier/batch/merge.go b/pkg/querier/batch/merge.go index 89688e12c3..8c7ebdf062 100644 --- a/pkg/querier/batch/merge.go +++ b/pkg/querier/batch/merge.go @@ -2,9 +2,10 @@ package batch import ( "container/heap" - "github.com/prometheus/prometheus/tsdb/chunkenc" "sort" + "github.com/prometheus/prometheus/tsdb/chunkenc" + promchunk "github.com/cortexproject/cortex/pkg/chunk" ) From 6f2e245dd5d9fbf151e9ea25802b55a7a9c63ecf Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Tue, 14 May 2024 20:45:35 -0700 Subject: [PATCH 3/5] update Signed-off-by: Ben Ye --- pkg/chunk/chunk.go | 5 +++-- pkg/chunk/iterator.go | 26 ++++++++++++++------------ pkg/querier/batch/batch.go | 10 +++++----- pkg/querier/batch/chunk_test.go | 4 ++-- pkg/querier/batch/stream.go | 14 ++++++++++---- pkg/querier/batch/stream_test.go | 24 +++++++++++++++++++++--- 6 files changed, 55 insertions(+), 28 deletions(-) diff --git a/pkg/chunk/chunk.go b/pkg/chunk/chunk.go index 77662b7d20..159556ec99 100644 --- a/pkg/chunk/chunk.go +++ b/pkg/chunk/chunk.go @@ -6,6 +6,7 @@ import ( "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/tsdb/chunkenc" + "unsafe" ) // Chunk contains encoded timeseries data @@ -71,11 +72,11 @@ func (p *prometheusChunkIterator) Batch(size int, valType chunkenc.ValueType) Ba case chunkenc.ValHistogram: t, v := p.it.AtHistogram(nil) batch.Timestamps[j] = t - batch.Histograms[j] = v + batch.HistogramValues[j] = unsafe.Pointer(v) case chunkenc.ValFloatHistogram: t, v := p.it.AtFloatHistogram(nil) batch.Timestamps[j] = t - batch.FloatHistograms[j] = v + batch.HistogramValues[j] = unsafe.Pointer(v) } j++ if j < size && p.it.Next() == chunkenc.ValNone { diff --git a/pkg/chunk/iterator.go b/pkg/chunk/iterator.go index 331fc15176..044d6a4943 100644 --- a/pkg/chunk/iterator.go +++ b/pkg/chunk/iterator.go @@ -1,8 +1,9 @@ package chunk import ( + "unsafe" + "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/tsdb/chunkenc" ) @@ -13,12 +14,12 @@ type Iterator interface { // Scans the next value in the chunk. Directly after the iterator has // been created, the next value is the first value in the // chunk. Otherwise, it is the value following the last value scanned or - // found (by one of the Find... methods). Returns false if either the - // end of the chunk is reached or an error has occurred. + // found (by one of the Find... methods). Returns chunkenc.ValNoe if either + // the end of the chunk is reached or an error has occurred. Scan() chunkenc.ValueType - // Finds the oldest value at or after the provided time. Returns false - // if either the chunk contains no value at or after the provided time, - // or an error has occurred. + // Finds the oldest value at or after the provided time and returns the value type. + // Returns chunkenc.ValNone if either the chunk contains no value at or after + // the provided time, or an error has occurred. FindAtOrAfter(model.Time) chunkenc.ValueType // Returns a batch of the provisded size; NB not idempotent! Should only be called // once per Scan. @@ -32,13 +33,14 @@ type Iterator interface { // 1 to 128. const BatchSize = 12 -// Batch is a sorted set of (timestamp, value) pairs. They are intended to be -// small, and passed by value. +// Batch is a sorted set of (timestamp, value) pairs. They are intended to be small, +// and passed by value. Value can vary depending on the chunk value type. type Batch struct { - Timestamps [BatchSize]int64 - Values [BatchSize]float64 - Histograms [BatchSize]*histogram.Histogram - FloatHistograms [BatchSize]*histogram.FloatHistogram + Timestamps [BatchSize]int64 + Values [BatchSize]float64 + // HistogramValues are pointers to store the value of either + // *histogram.Histogram or *histogram.FloatHistogram value. + HistogramValues [BatchSize]unsafe.Pointer Index int Length int ValType chunkenc.ValueType diff --git a/pkg/querier/batch/batch.go b/pkg/querier/batch/batch.go index 2cf37f9ff9..30d94afd5c 100644 --- a/pkg/querier/batch/batch.go +++ b/pkg/querier/batch/batch.go @@ -31,10 +31,10 @@ func (c GenericChunk) Iterator(reuse chunk.Iterator) chunk.Iterator { // iterator iterates over batches. type iterator interface { - // Seek to the batch at (or after) time t. + // Seek to the batch at (or after) time t and returns chunk value type. Seek(t int64, size int) chunkenc.ValueType - // Next moves to the next batch. + // Next moves to the next batch and returns chunk value type. Next(size int) chunkenc.ValueType // AtTime returns the start time of the next batch. Must only be called after @@ -44,7 +44,7 @@ type iterator interface { // MaxCurrentChunkTime returns the max time on the current chunk. MaxCurrentChunkTime() int64 - // Batch returns the current batch. Must only be called after Seek or Next + // Batch returns the current batch. Must only be called after Seek or Next // have returned true. Batch() chunk.Batch @@ -157,12 +157,12 @@ func (a *iteratorAdapter) Err() error { // AtHistogram implements chunkenc.Iterator. func (a *iteratorAdapter) AtHistogram(h *histogram.Histogram) (int64, *histogram.Histogram) { - return a.curr.Timestamps[a.curr.Index], a.curr.Histograms[a.curr.Index] + return a.curr.Timestamps[a.curr.Index], (*histogram.Histogram)(a.curr.HistogramValues[a.curr.Index]) } // AtFloatHistogram implements chunkenc.Iterator. func (a *iteratorAdapter) AtFloatHistogram(h *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) { - return a.curr.Timestamps[a.curr.Index], a.curr.FloatHistograms[a.curr.Index] + return a.curr.Timestamps[a.curr.Index], (*histogram.FloatHistogram)(a.curr.HistogramValues[a.curr.Index]) } // AtT implements chunkenc.Iterator. diff --git a/pkg/querier/batch/chunk_test.go b/pkg/querier/batch/chunk_test.go index 4909df960d..b8327a70e9 100644 --- a/pkg/querier/batch/chunk_test.go +++ b/pkg/querier/batch/chunk_test.go @@ -35,9 +35,9 @@ func TestChunkIter(t *testing.T) { func forEncodings(t *testing.T, f func(t *testing.T, enc promchunk.Encoding)) { for _, enc := range []promchunk.Encoding{ - promchunk.PrometheusXorChunk, + //promchunk.PrometheusXorChunk, promchunk.PrometheusHistogramChunk, - promchunk.PrometheusFloatHistogramChunk, + //promchunk.PrometheusFloatHistogramChunk, } { enc := enc t.Run(enc.String(), func(t *testing.T) { diff --git a/pkg/querier/batch/stream.go b/pkg/querier/batch/stream.go index 475eae3258..71ac651f07 100644 --- a/pkg/querier/batch/stream.go +++ b/pkg/querier/batch/stream.go @@ -1,6 +1,8 @@ package batch import ( + "unsafe" + "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/tsdb/chunkenc" @@ -36,12 +38,12 @@ func (bs *batchStream) next() { func (bs *batchStream) atHistogram() (int64, *histogram.Histogram) { b := &(*bs)[0] - return b.Timestamps[b.Index], b.Histograms[b.Index] + return b.Timestamps[b.Index], (*histogram.Histogram)(b.HistogramValues[b.Index]) } func (bs *batchStream) atFloatHistogram() (int64, *histogram.FloatHistogram) { b := &(*bs)[0] - return b.Timestamps[b.Index], b.FloatHistograms[b.Index] + return b.Timestamps[b.Index], (*histogram.FloatHistogram)(b.HistogramValues[b.Index]) } func (bs *batchStream) atTime() int64 { @@ -89,9 +91,13 @@ func mergeStreams(left, right batchStream, result batchStream, size int) batchSt case chunkenc.ValFloat: b.Timestamps[b.Index], b.Values[b.Index] = bs.at() case chunkenc.ValHistogram: - b.Timestamps[b.Index], b.Histograms[b.Index] = bs.atHistogram() + t, v := bs.atHistogram() + b.Timestamps[b.Index] = t + b.HistogramValues[b.Index] = unsafe.Pointer(v) case chunkenc.ValFloatHistogram: - b.Timestamps[b.Index], b.FloatHistograms[b.Index] = bs.atFloatHistogram() + t, v := bs.atFloatHistogram() + b.Timestamps[b.Index] = t + b.HistogramValues[b.Index] = unsafe.Pointer(v) default: panic("unsupported value type") } diff --git a/pkg/querier/batch/stream_test.go b/pkg/querier/batch/stream_test.go index 62dc3ed208..bdd04d53f5 100644 --- a/pkg/querier/batch/stream_test.go +++ b/pkg/querier/batch/stream_test.go @@ -3,8 +3,10 @@ package batch import ( "strconv" "testing" + "unsafe" "github.com/prometheus/prometheus/model/histogram" + "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/stretchr/testify/require" promchunk "github.com/cortexproject/cortex/pkg/chunk" @@ -52,7 +54,23 @@ func TestStream(t *testing.T) { t.Parallel() result := make(batchStream, len(tc.input1)+len(tc.input2)) result = mergeStreams(tc.input1, tc.input2, result, promchunk.BatchSize) - require.Equal(t, batchStream(tc.output), result) + require.Equal(t, len(tc.output), len(result)) + for j := 0; j < len(tc.output); j++ { + require.Equal(t, tc.output[j].ValType, result[j].ValType) + require.Equal(t, tc.output[j].Length, result[j].Length) + require.Equal(t, tc.output[j].Index, result[j].Index) + require.Equal(t, tc.output[j].Timestamps, result[j].Timestamps) + require.Equal(t, tc.output[j].Values, result[j].Values) + for k := 0; k < len(tc.output[j].HistogramValues); k++ { + switch tc.output[j].ValType { + case chunkenc.ValHistogram: + require.Equal(t, (*histogram.Histogram)(tc.output[j].HistogramValues[k]), (*histogram.Histogram)(result[j].HistogramValues[k])) + case chunkenc.ValFloatHistogram: + require.Equal(t, (*histogram.FloatHistogram)(tc.output[j].HistogramValues[k]), (*histogram.FloatHistogram)(result[j].HistogramValues[k])) + default: + } + } + } }) } }) @@ -66,9 +84,9 @@ func mkBatch(from int64, enc encoding.Encoding) promchunk.Batch { case encoding.PrometheusXorChunk: result.Values[i] = float64(from + i) case encoding.PrometheusHistogramChunk: - result.Histograms[i] = testHistogram(int(from+i), 5, 20) + result.HistogramValues[i] = unsafe.Pointer(testHistogram(int(from+i), 5, 20)) case encoding.PrometheusFloatHistogramChunk: - result.FloatHistograms[i] = testHistogram(int(from+i), 5, 20).ToFloat(nil) + result.HistogramValues[i] = unsafe.Pointer(testHistogram(int(from+i), 5, 20).ToFloat(nil)) } } result.Length = promchunk.BatchSize From 57643c1b2b09509c5798d490b19e1e4e36a96716 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Tue, 14 May 2024 20:58:06 -0700 Subject: [PATCH 4/5] lint Signed-off-by: Ben Ye --- pkg/chunk/chunk.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/chunk/chunk.go b/pkg/chunk/chunk.go index 159556ec99..4c708beb86 100644 --- a/pkg/chunk/chunk.go +++ b/pkg/chunk/chunk.go @@ -1,12 +1,13 @@ package chunk import ( + "unsafe" + "github.com/pkg/errors" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/tsdb/chunkenc" - "unsafe" ) // Chunk contains encoded timeseries data From 2afaaa06c8f28cc30ba1c056c639a15b1f780569 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Wed, 15 May 2024 15:22:27 -0700 Subject: [PATCH 5/5] revert to not use unsafe Signed-off-by: Ben Ye --- pkg/chunk/chunk.go | 10 ++-------- pkg/chunk/iterator.go | 12 +++++------- pkg/querier/batch/batch.go | 4 ++-- pkg/querier/batch/stream.go | 14 ++++---------- pkg/querier/batch/stream_test.go | 24 +++--------------------- 5 files changed, 16 insertions(+), 48 deletions(-) diff --git a/pkg/chunk/chunk.go b/pkg/chunk/chunk.go index 4c708beb86..be48dff43a 100644 --- a/pkg/chunk/chunk.go +++ b/pkg/chunk/chunk.go @@ -1,8 +1,6 @@ package chunk import ( - "unsafe" - "github.com/pkg/errors" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/histogram" @@ -71,13 +69,9 @@ func (p *prometheusChunkIterator) Batch(size int, valType chunkenc.ValueType) Ba batch.Timestamps[j] = t batch.Values[j] = v case chunkenc.ValHistogram: - t, v := p.it.AtHistogram(nil) - batch.Timestamps[j] = t - batch.HistogramValues[j] = unsafe.Pointer(v) + batch.Timestamps[j], batch.Histograms[j] = p.it.AtHistogram(nil) case chunkenc.ValFloatHistogram: - t, v := p.it.AtFloatHistogram(nil) - batch.Timestamps[j] = t - batch.HistogramValues[j] = unsafe.Pointer(v) + batch.Timestamps[j], batch.FloatHistograms[j] = p.it.AtFloatHistogram(nil) } j++ if j < size && p.it.Next() == chunkenc.ValNone { diff --git a/pkg/chunk/iterator.go b/pkg/chunk/iterator.go index 044d6a4943..daf24d9809 100644 --- a/pkg/chunk/iterator.go +++ b/pkg/chunk/iterator.go @@ -1,9 +1,8 @@ package chunk import ( - "unsafe" - "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/tsdb/chunkenc" ) @@ -36,11 +35,10 @@ const BatchSize = 12 // Batch is a sorted set of (timestamp, value) pairs. They are intended to be small, // and passed by value. Value can vary depending on the chunk value type. type Batch struct { - Timestamps [BatchSize]int64 - Values [BatchSize]float64 - // HistogramValues are pointers to store the value of either - // *histogram.Histogram or *histogram.FloatHistogram value. - HistogramValues [BatchSize]unsafe.Pointer + Timestamps [BatchSize]int64 + Values [BatchSize]float64 + Histograms [BatchSize]*histogram.Histogram + FloatHistograms [BatchSize]*histogram.FloatHistogram Index int Length int ValType chunkenc.ValueType diff --git a/pkg/querier/batch/batch.go b/pkg/querier/batch/batch.go index 30d94afd5c..cc6bf466ea 100644 --- a/pkg/querier/batch/batch.go +++ b/pkg/querier/batch/batch.go @@ -157,12 +157,12 @@ func (a *iteratorAdapter) Err() error { // AtHistogram implements chunkenc.Iterator. func (a *iteratorAdapter) AtHistogram(h *histogram.Histogram) (int64, *histogram.Histogram) { - return a.curr.Timestamps[a.curr.Index], (*histogram.Histogram)(a.curr.HistogramValues[a.curr.Index]) + return a.curr.Timestamps[a.curr.Index], a.curr.Histograms[a.curr.Index] } // AtFloatHistogram implements chunkenc.Iterator. func (a *iteratorAdapter) AtFloatHistogram(h *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) { - return a.curr.Timestamps[a.curr.Index], (*histogram.FloatHistogram)(a.curr.HistogramValues[a.curr.Index]) + return a.curr.Timestamps[a.curr.Index], a.curr.FloatHistograms[a.curr.Index] } // AtT implements chunkenc.Iterator. diff --git a/pkg/querier/batch/stream.go b/pkg/querier/batch/stream.go index 71ac651f07..475eae3258 100644 --- a/pkg/querier/batch/stream.go +++ b/pkg/querier/batch/stream.go @@ -1,8 +1,6 @@ package batch import ( - "unsafe" - "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/tsdb/chunkenc" @@ -38,12 +36,12 @@ func (bs *batchStream) next() { func (bs *batchStream) atHistogram() (int64, *histogram.Histogram) { b := &(*bs)[0] - return b.Timestamps[b.Index], (*histogram.Histogram)(b.HistogramValues[b.Index]) + return b.Timestamps[b.Index], b.Histograms[b.Index] } func (bs *batchStream) atFloatHistogram() (int64, *histogram.FloatHistogram) { b := &(*bs)[0] - return b.Timestamps[b.Index], (*histogram.FloatHistogram)(b.HistogramValues[b.Index]) + return b.Timestamps[b.Index], b.FloatHistograms[b.Index] } func (bs *batchStream) atTime() int64 { @@ -91,13 +89,9 @@ func mergeStreams(left, right batchStream, result batchStream, size int) batchSt case chunkenc.ValFloat: b.Timestamps[b.Index], b.Values[b.Index] = bs.at() case chunkenc.ValHistogram: - t, v := bs.atHistogram() - b.Timestamps[b.Index] = t - b.HistogramValues[b.Index] = unsafe.Pointer(v) + b.Timestamps[b.Index], b.Histograms[b.Index] = bs.atHistogram() case chunkenc.ValFloatHistogram: - t, v := bs.atFloatHistogram() - b.Timestamps[b.Index] = t - b.HistogramValues[b.Index] = unsafe.Pointer(v) + b.Timestamps[b.Index], b.FloatHistograms[b.Index] = bs.atFloatHistogram() default: panic("unsupported value type") } diff --git a/pkg/querier/batch/stream_test.go b/pkg/querier/batch/stream_test.go index bdd04d53f5..2274cf7aa0 100644 --- a/pkg/querier/batch/stream_test.go +++ b/pkg/querier/batch/stream_test.go @@ -3,10 +3,8 @@ package batch import ( "strconv" "testing" - "unsafe" "github.com/prometheus/prometheus/model/histogram" - "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/stretchr/testify/require" promchunk "github.com/cortexproject/cortex/pkg/chunk" @@ -54,23 +52,7 @@ func TestStream(t *testing.T) { t.Parallel() result := make(batchStream, len(tc.input1)+len(tc.input2)) result = mergeStreams(tc.input1, tc.input2, result, promchunk.BatchSize) - require.Equal(t, len(tc.output), len(result)) - for j := 0; j < len(tc.output); j++ { - require.Equal(t, tc.output[j].ValType, result[j].ValType) - require.Equal(t, tc.output[j].Length, result[j].Length) - require.Equal(t, tc.output[j].Index, result[j].Index) - require.Equal(t, tc.output[j].Timestamps, result[j].Timestamps) - require.Equal(t, tc.output[j].Values, result[j].Values) - for k := 0; k < len(tc.output[j].HistogramValues); k++ { - switch tc.output[j].ValType { - case chunkenc.ValHistogram: - require.Equal(t, (*histogram.Histogram)(tc.output[j].HistogramValues[k]), (*histogram.Histogram)(result[j].HistogramValues[k])) - case chunkenc.ValFloatHistogram: - require.Equal(t, (*histogram.FloatHistogram)(tc.output[j].HistogramValues[k]), (*histogram.FloatHistogram)(result[j].HistogramValues[k])) - default: - } - } - } + require.Equal(t, tc.output, result) }) } }) @@ -84,9 +66,9 @@ func mkBatch(from int64, enc encoding.Encoding) promchunk.Batch { case encoding.PrometheusXorChunk: result.Values[i] = float64(from + i) case encoding.PrometheusHistogramChunk: - result.HistogramValues[i] = unsafe.Pointer(testHistogram(int(from+i), 5, 20)) + result.Histograms[i] = testHistogram(int(from+i), 5, 20) case encoding.PrometheusFloatHistogramChunk: - result.HistogramValues[i] = unsafe.Pointer(testHistogram(int(from+i), 5, 20).ToFloat(nil)) + result.FloatHistograms[i] = testHistogram(int(from+i), 5, 20).ToFloat(nil) } } result.Length = promchunk.BatchSize