diff --git a/pkg/chunk/chunk.go b/pkg/chunk/chunk.go index 2d36192ee0..be48dff43a 100644 --- a/pkg/chunk/chunk.go +++ b/pkg/chunk/chunk.go @@ -3,6 +3,7 @@ package chunk import ( "github.com/pkg/errors" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/tsdb/chunkenc" ) @@ -45,24 +46,33 @@ type prometheusChunkIterator struct { it chunkenc.Iterator } -func (p *prometheusChunkIterator) Scan() bool { - return p.it.Next() != chunkenc.ValNone +func (p *prometheusChunkIterator) Scan() chunkenc.ValueType { + return p.it.Next() } -func (p *prometheusChunkIterator) FindAtOrAfter(time model.Time) bool { +func (p *prometheusChunkIterator) FindAtOrAfter(time model.Time) chunkenc.ValueType { // FindAtOrAfter must return OLDEST value at given time. That means we need to start with a fresh iterator, // otherwise we cannot guarantee OLDEST. p.it = p.c.Iterator(p.it) - return p.it.Seek(int64(time)) != chunkenc.ValNone + return p.it.Seek(int64(time)) } -func (p *prometheusChunkIterator) Batch(size int) Batch { +func (p *prometheusChunkIterator) Batch(size int, valType chunkenc.ValueType) Batch { var batch Batch j := 0 for j < size { - t, v := p.it.At() - batch.Timestamps[j] = t - batch.Values[j] = v + switch valType { + case chunkenc.ValNone: + break + case chunkenc.ValFloat: + t, v := p.it.At() + batch.Timestamps[j] = t + batch.Values[j] = v + case chunkenc.ValHistogram: + batch.Timestamps[j], batch.Histograms[j] = p.it.AtHistogram(nil) + case chunkenc.ValFloatHistogram: + batch.Timestamps[j], batch.FloatHistograms[j] = p.it.AtFloatHistogram(nil) + } j++ if j < size && p.it.Next() == chunkenc.ValNone { break @@ -70,6 +80,7 @@ func (p *prometheusChunkIterator) Batch(size int) Batch { } batch.Index = 0 batch.Length = j + batch.ValType = valType return batch } @@ -79,7 +90,14 @@ func (p *prometheusChunkIterator) Err() error { type errorIterator string -func (e errorIterator) Scan() bool { return false } -func (e errorIterator) FindAtOrAfter(time model.Time) bool { return false } -func (e errorIterator) Batch(size int) Batch { panic("no values") } -func (e errorIterator) Err() error { return errors.New(string(e)) } +func (e errorIterator) Scan() chunkenc.ValueType { return chunkenc.ValNone } +func (e errorIterator) FindAtOrAfter(time model.Time) chunkenc.ValueType { return chunkenc.ValNone } +func (e errorIterator) Value() model.SamplePair { panic("no values") } +func (e errorIterator) AtHistogram(_ *histogram.Histogram) (int64, *histogram.Histogram) { + panic("no values") +} +func (e errorIterator) AtFloatHistogram(_ *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) { + panic("no values") +} +func (e errorIterator) Batch(size int, valType chunkenc.ValueType) Batch { panic("no values") } +func (e errorIterator) Err() error { return errors.New(string(e)) } diff --git a/pkg/chunk/encoding/encoding.go b/pkg/chunk/encoding/encoding.go index 9a94d1e716..8dde3eb7a0 100644 --- a/pkg/chunk/encoding/encoding.go +++ b/pkg/chunk/encoding/encoding.go @@ -8,7 +8,7 @@ import ( "github.com/prometheus/prometheus/tsdb/chunkenc" ) -// Encoding defines which encoding we are using, delta, doubledelta, or varbit +// Encoding defines which encoding we are using. type Encoding byte // String implements flag.Value. 
@@ -26,21 +26,46 @@ func (e Encoding) PromChunkEncoding() chunkenc.Encoding {
 	return chunkenc.EncNone
 }
 
+func (e Encoding) ChunkValueType() chunkenc.ValueType {
+	if known, found := encodings[e]; found {
+		return known.ValueType
+	}
+	return chunkenc.ValNone
+}
+
 const (
 	// PrometheusXorChunk is a wrapper around Prometheus XOR-encoded chunk.
 	// 4 is the magic value for backwards-compatibility with previous iota-based constants.
 	PrometheusXorChunk Encoding = 4
+	// PrometheusHistogramChunk is a wrapper around Prometheus histogram chunk.
+	// 5 is the magic value for backwards-compatibility with previous iota-based constants.
+	PrometheusHistogramChunk Encoding = 5
+	// PrometheusFloatHistogramChunk is a wrapper around Prometheus float histogram chunk.
+	// 6 is the magic value for backwards-compatibility with previous iota-based constants.
+	PrometheusFloatHistogramChunk Encoding = 6
 )
 
 type encoding struct {
-	Name     string
-	Encoding chunkenc.Encoding
+	Name      string
+	Encoding  chunkenc.Encoding
+	ValueType chunkenc.ValueType
 }
 
 var encodings = map[Encoding]encoding{
 	PrometheusXorChunk: {
-		Name:     "PrometheusXorChunk",
-		Encoding: chunkenc.EncXOR,
+		Name:      "PrometheusXorChunk",
+		Encoding:  chunkenc.EncXOR,
+		ValueType: chunkenc.ValFloat,
+	},
+	PrometheusHistogramChunk: {
+		Name:      "PrometheusHistogramChunk",
+		Encoding:  chunkenc.EncHistogram,
+		ValueType: chunkenc.ValHistogram,
+	},
+	PrometheusFloatHistogramChunk: {
+		Name:      "PrometheusFloatHistogramChunk",
+		Encoding:  chunkenc.EncFloatHistogram,
+		ValueType: chunkenc.ValFloatHistogram,
 	},
 }
 
@@ -48,6 +73,10 @@ func FromPromChunkEncoding(enc chunkenc.Encoding) (Encoding, error) {
 	switch enc {
 	case chunkenc.EncXOR:
 		return PrometheusXorChunk, nil
+	case chunkenc.EncHistogram:
+		return PrometheusHistogramChunk, nil
+	case chunkenc.EncFloatHistogram:
+		return PrometheusFloatHistogramChunk, nil
 	}
 	return Encoding(0), errors.Errorf("unknown Prometheus chunk encoding: %v", enc)
 }
diff --git a/pkg/chunk/iterator.go b/pkg/chunk/iterator.go
index 5e41e211a6..daf24d9809 100644
--- a/pkg/chunk/iterator.go
+++ b/pkg/chunk/iterator.go
@@ -2,6 +2,8 @@ package chunk
 
 import (
 	"github.com/prometheus/common/model"
+	"github.com/prometheus/prometheus/model/histogram"
+	"github.com/prometheus/prometheus/tsdb/chunkenc"
 )
 
 // Iterator enables efficient access to the content of a chunk. It is
@@ -11,16 +13,16 @@ type Iterator interface {
 	// Scans the next value in the chunk. Directly after the iterator has
 	// been created, the next value is the first value in the
 	// chunk. Otherwise, it is the value following the last value scanned or
-	// found (by one of the Find... methods). Returns false if either the
-	// end of the chunk is reached or an error has occurred.
-	Scan() bool
-	// Finds the oldest value at or after the provided time. Returns false
-	// if either the chunk contains no value at or after the provided time,
-	// or an error has occurred.
-	FindAtOrAfter(model.Time) bool
+	// found (by one of the Find... methods). Returns chunkenc.ValNone if either
+	// the end of the chunk is reached or an error has occurred.
+	Scan() chunkenc.ValueType
+	// Finds the oldest value at or after the provided time and returns the value type.
+	// Returns chunkenc.ValNone if either the chunk contains no value at or after
+	// the provided time, or an error has occurred.
+	FindAtOrAfter(model.Time) chunkenc.ValueType
 	// Returns a batch of the provisded size; NB not idempotent! Should only be called
 	// once per Scan.
- Batch(size int) Batch + Batch(size int, valType chunkenc.ValueType) Batch // Returns the last error encountered. In general, an error signals data // corruption in the chunk and requires quarantining. Err() error @@ -30,11 +32,14 @@ type Iterator interface { // 1 to 128. const BatchSize = 12 -// Batch is a sorted set of (timestamp, value) pairs. They are intended to be -// small, and passed by value. +// Batch is a sorted set of (timestamp, value) pairs. They are intended to be small, +// and passed by value. Value can vary depending on the chunk value type. type Batch struct { - Timestamps [BatchSize]int64 - Values [BatchSize]float64 - Index int - Length int + Timestamps [BatchSize]int64 + Values [BatchSize]float64 + Histograms [BatchSize]*histogram.Histogram + FloatHistograms [BatchSize]*histogram.FloatHistogram + Index int + Length int + ValType chunkenc.ValueType } diff --git a/pkg/querier/batch/batch.go b/pkg/querier/batch/batch.go index c1d998bea6..cc6bf466ea 100644 --- a/pkg/querier/batch/batch.go +++ b/pkg/querier/batch/batch.go @@ -2,10 +2,10 @@ package batch import ( "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/cortexproject/cortex/pkg/chunk" - "github.com/cortexproject/cortex/pkg/querier/iterators" ) // GenericChunk is a generic chunk used by the batch iterator, in order to make the batch @@ -31,11 +31,11 @@ func (c GenericChunk) Iterator(reuse chunk.Iterator) chunk.Iterator { // iterator iterates over batches. type iterator interface { - // Seek to the batch at (or after) time t. - Seek(t int64, size int) bool + // Seek to the batch at (or after) time t and returns chunk value type. + Seek(t int64, size int) chunkenc.ValueType - // Next moves to the next batch. - Next(size int) bool + // Next moves to the next batch and returns chunk value type. + Next(size int) chunkenc.ValueType // AtTime returns the start time of the next batch. Must only be called after // Seek or Next have returned true. @@ -44,7 +44,7 @@ type iterator interface { // MaxCurrentChunkTime returns the max time on the current chunk. MaxCurrentChunkTime() int64 - // Batch returns the current batch. Must only be called after Seek or Next + // Batch returns the current batch. Must only be called after Seek or Next // have returned true. Batch() chunk.Batch @@ -78,37 +78,41 @@ type iteratorAdapter struct { } func newIteratorAdapter(underlying iterator) chunkenc.Iterator { - return iterators.NewCompatibleChunksIterator(&iteratorAdapter{ + return &iteratorAdapter{ batchSize: 1, underlying: underlying, - }) + } } // Seek implements chunkenc.Iterator. -func (a *iteratorAdapter) Seek(t int64) bool { +func (a *iteratorAdapter) Seek(t int64) chunkenc.ValueType { // Optimisation: fulfill the seek using current batch if possible. if a.curr.Length > 0 && a.curr.Index < a.curr.Length { if t <= a.curr.Timestamps[a.curr.Index] { //In this case, the interface's requirement is met, so state of this //iterator does not need any change. - return true + return a.curr.ValType } else if t <= a.curr.Timestamps[a.curr.Length-1] { //In this case, some timestamp between current sample and end of batch can fulfill //the seek. Let's find it. for a.curr.Index < a.curr.Length && t > a.curr.Timestamps[a.curr.Index] { a.curr.Index++ } - return true + return a.curr.ValType } else if t <= a.underlying.MaxCurrentChunkTime() { // In this case, some timestamp inside the current underlying chunk can fulfill the seek. 
// In this case we will call next until we find the sample as it will be faster than calling // `a.underlying.Seek` directly as this would cause the iterator to start from the beginning of the chunk. // See: https://github.com/cortexproject/cortex/blob/f69452975877c67ac307709e5f60b8d20477764c/pkg/querier/batch/chunk.go#L26-L45 // https://github.com/cortexproject/cortex/blob/f69452975877c67ac307709e5f60b8d20477764c/pkg/chunk/encoding/prometheus_chunk.go#L90-L95 - for a.Next() { + for { + valType := a.Next() + if valType == chunkenc.ValNone { + break + } if t <= a.curr.Timestamps[a.curr.Index] { - return true + return valType } } } @@ -116,24 +120,29 @@ func (a *iteratorAdapter) Seek(t int64) bool { a.curr.Length = -1 a.batchSize = 1 - if a.underlying.Seek(t, a.batchSize) { + if valType := a.underlying.Seek(t, a.batchSize); valType != chunkenc.ValNone { a.curr = a.underlying.Batch() - return a.curr.Index < a.curr.Length + if a.curr.Index < a.curr.Length { + return a.curr.ValType + } } - return false + return chunkenc.ValNone } // Next implements chunkenc.Iterator. -func (a *iteratorAdapter) Next() bool { +func (a *iteratorAdapter) Next() chunkenc.ValueType { a.curr.Index++ - for a.curr.Index >= a.curr.Length && a.underlying.Next(a.batchSize) { + for a.curr.Index >= a.curr.Length && a.underlying.Next(a.batchSize) != chunkenc.ValNone { a.curr = a.underlying.Batch() a.batchSize = a.batchSize * 2 if a.batchSize > chunk.BatchSize { a.batchSize = chunk.BatchSize } } - return a.curr.Index < a.curr.Length + if a.curr.Index < a.curr.Length { + return a.curr.ValType + } + return chunkenc.ValNone } // At implements chunkenc.Iterator. @@ -145,3 +154,18 @@ func (a *iteratorAdapter) At() (int64, float64) { func (a *iteratorAdapter) Err() error { return nil } + +// AtHistogram implements chunkenc.Iterator. +func (a *iteratorAdapter) AtHistogram(h *histogram.Histogram) (int64, *histogram.Histogram) { + return a.curr.Timestamps[a.curr.Index], a.curr.Histograms[a.curr.Index] +} + +// AtFloatHistogram implements chunkenc.Iterator. +func (a *iteratorAdapter) AtFloatHistogram(h *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) { + return a.curr.Timestamps[a.curr.Index], a.curr.FloatHistograms[a.curr.Index] +} + +// AtT implements chunkenc.Iterator. 
+func (a *iteratorAdapter) AtT() int64 { + return a.curr.Timestamps[a.curr.Index] +} diff --git a/pkg/querier/batch/batch_test.go b/pkg/querier/batch/batch_test.go index 7f40afcf79..801e28008d 100644 --- a/pkg/querier/batch/batch_test.go +++ b/pkg/querier/batch/batch_test.go @@ -11,6 +11,7 @@ import ( "github.com/cortexproject/cortex/pkg/chunk" promchunk "github.com/cortexproject/cortex/pkg/chunk/encoding" + histogram_util "github.com/cortexproject/cortex/pkg/util/histogram" ) func BenchmarkNewChunkMergeIterator_CreateAndIterate(b *testing.B) { @@ -26,6 +27,18 @@ func BenchmarkNewChunkMergeIterator_CreateAndIterate(b *testing.B) { {numChunks: 100, numSamplesPerChunk: 100, duplicationFactor: 3, enc: promchunk.PrometheusXorChunk}, {numChunks: 1, numSamplesPerChunk: 100, duplicationFactor: 1, enc: promchunk.PrometheusXorChunk}, {numChunks: 1, numSamplesPerChunk: 100, duplicationFactor: 3, enc: promchunk.PrometheusXorChunk}, + {numChunks: 1000, numSamplesPerChunk: 100, duplicationFactor: 1, enc: promchunk.PrometheusHistogramChunk}, + {numChunks: 1000, numSamplesPerChunk: 100, duplicationFactor: 3, enc: promchunk.PrometheusHistogramChunk}, + {numChunks: 100, numSamplesPerChunk: 100, duplicationFactor: 1, enc: promchunk.PrometheusHistogramChunk}, + {numChunks: 100, numSamplesPerChunk: 100, duplicationFactor: 3, enc: promchunk.PrometheusHistogramChunk}, + {numChunks: 1, numSamplesPerChunk: 100, duplicationFactor: 1, enc: promchunk.PrometheusHistogramChunk}, + {numChunks: 1, numSamplesPerChunk: 100, duplicationFactor: 3, enc: promchunk.PrometheusHistogramChunk}, + {numChunks: 1000, numSamplesPerChunk: 100, duplicationFactor: 1, enc: promchunk.PrometheusFloatHistogramChunk}, + {numChunks: 1000, numSamplesPerChunk: 100, duplicationFactor: 3, enc: promchunk.PrometheusFloatHistogramChunk}, + {numChunks: 100, numSamplesPerChunk: 100, duplicationFactor: 1, enc: promchunk.PrometheusFloatHistogramChunk}, + {numChunks: 100, numSamplesPerChunk: 100, duplicationFactor: 3, enc: promchunk.PrometheusFloatHistogramChunk}, + {numChunks: 1, numSamplesPerChunk: 100, duplicationFactor: 1, enc: promchunk.PrometheusFloatHistogramChunk}, + {numChunks: 1, numSamplesPerChunk: 100, duplicationFactor: 3, enc: promchunk.PrometheusFloatHistogramChunk}, } for _, scenario := range scenarios { @@ -37,6 +50,7 @@ func BenchmarkNewChunkMergeIterator_CreateAndIterate(b *testing.B) { chunks := createChunks(b, step, scenario.numChunks, scenario.numSamplesPerChunk, scenario.duplicationFactor, scenario.enc) + b.ResetTimer() b.Run(name, func(b *testing.B) { b.ReportAllocs() @@ -90,6 +104,7 @@ func BenchmarkNewChunkMergeIterator_Seek(b *testing.B) { chunks := createChunks(b, scenario.scrapeInterval, scenario.numChunks, scenario.numSamplesPerChunk, scenario.duplicationFactor, scenario.enc) + b.ResetTimer() b.Run(name, func(b *testing.B) { b.ReportAllocs() @@ -105,20 +120,38 @@ func BenchmarkNewChunkMergeIterator_Seek(b *testing.B) { } func TestSeekCorrectlyDealWithSinglePointChunks(t *testing.T) { - t.Parallel() - chunkOne := mkChunk(t, step, model.Time(1*step/time.Millisecond), 1, promchunk.PrometheusXorChunk) - chunkTwo := mkChunk(t, step, model.Time(10*step/time.Millisecond), 1, promchunk.PrometheusXorChunk) - chunks := []chunk.Chunk{chunkOne, chunkTwo} - - sut := NewChunkMergeIterator(chunks, 0, 0) - - // Following calls mimics Prometheus's query engine behaviour for VectorSelector. 
-	require.True(t, sut.Next() != chunkenc.ValNone)
-	require.True(t, sut.Seek(0) != chunkenc.ValNone)
-
-	actual, val := sut.At()
-	require.Equal(t, float64(1*time.Second/time.Millisecond), val) // since mkChunk use ts as value.
-	require.Equal(t, int64(1*time.Second/time.Millisecond), actual)
+	histograms := histogram_util.GenerateTestHistograms(1000, 1000, 1, 5, 20)
+	for _, enc := range []promchunk.Encoding{
+		promchunk.PrometheusXorChunk,
+		promchunk.PrometheusHistogramChunk,
+		promchunk.PrometheusFloatHistogramChunk,
+	} {
+		valType := enc.ChunkValueType()
+		chunkOne := mkChunk(t, step, model.Time(1*step/time.Millisecond), 1, enc)
+		chunkTwo := mkChunk(t, step, model.Time(10*step/time.Millisecond), 1, enc)
+		chunks := []chunk.Chunk{chunkOne, chunkTwo}
+
+		sut := NewChunkMergeIterator(chunks, 0, 0)
+
+		// Following calls mimic Prometheus's query engine behaviour for VectorSelector.
+		require.Equal(t, valType, sut.Next())
+		require.Equal(t, valType, sut.Seek(0))
+
+		switch enc {
+		case promchunk.PrometheusXorChunk:
+			actual, val := sut.At()
+			require.Equal(t, float64(1*time.Second/time.Millisecond), val) // since mkChunk uses ts as value.
+			require.Equal(t, int64(1*time.Second/time.Millisecond), actual)
+		case promchunk.PrometheusHistogramChunk:
+			actual, val := sut.AtHistogram(nil)
+			require.Equal(t, histograms[0], val)
+			require.Equal(t, int64(1*time.Second/time.Millisecond), actual)
+		case promchunk.PrometheusFloatHistogramChunk:
+			actual, val := sut.AtFloatHistogram(nil)
+			require.Equal(t, histograms[0].ToFloat(nil), val)
+			require.Equal(t, int64(1*time.Second/time.Millisecond), actual)
+		}
+	}
 }
 
 func createChunks(b *testing.B, step time.Duration, numChunks, numSamplesPerChunk, duplicationFactor int, enc promchunk.Encoding) []chunk.Chunk {
diff --git a/pkg/querier/batch/chunk.go b/pkg/querier/batch/chunk.go
index 96e9de4db3..f546d3d0f3 100644
--- a/pkg/querier/batch/chunk.go
+++ b/pkg/querier/batch/chunk.go
@@ -2,6 +2,7 @@ package batch
 
 import (
 	"github.com/prometheus/common/model"
+	"github.com/prometheus/prometheus/tsdb/chunkenc"
 
 	promchunk "github.com/cortexproject/cortex/pkg/chunk"
 )
@@ -27,11 +28,11 @@ func (i *chunkIterator) MaxCurrentChunkTime() int64 {
 
 // Seek advances the iterator forward to the value at or after
 // the given timestamp.
-func (i *chunkIterator) Seek(t int64, size int) bool {
+func (i *chunkIterator) Seek(t int64, size int) chunkenc.ValueType {
 	// We assume seeks only care about a specific window; if this chunk doesn't
 	// contain samples in that window, we can shortcut.
 	if i.chunk.MaxTime < t {
-		return false
+		return chunkenc.ValNone
 	}
 
 	// If the seek is to the middle of the current batch, and size fits, we can
@@ -42,23 +43,27 @@ if i.batch.Index+size < i.batch.Length {
 			i.batch.Index++
 		}
 		if i.batch.Index+size < i.batch.Length {
-			return true
+			return i.batch.ValType
 		}
 	}
 
-	if i.it.FindAtOrAfter(model.Time(t)) {
-		i.batch = i.it.Batch(size)
-		return i.batch.Length > 0
+	if valueType := i.it.FindAtOrAfter(model.Time(t)); valueType != chunkenc.ValNone {
+		i.batch = i.it.Batch(size, valueType)
+		if i.batch.Length > 0 {
+			return valueType
+		}
 	}
-	return false
+	return chunkenc.ValNone
 }
 
-func (i *chunkIterator) Next(size int) bool {
-	if i.it.Scan() {
-		i.batch = i.it.Batch(size)
-		return i.batch.Length > 0
+func (i *chunkIterator) Next(size int) chunkenc.ValueType {
+	if valueType := i.it.Scan(); valueType != chunkenc.ValNone {
+		i.batch = i.it.Batch(size, valueType)
+		if i.batch.Length > 0 {
+			return valueType
+		}
 	}
-	return false
+	return chunkenc.ValNone
 }
 
 func (i *chunkIterator) AtTime() int64 {
diff --git a/pkg/querier/batch/chunk_test.go b/pkg/querier/batch/chunk_test.go
index 2f137f956f..b8327a70e9 100644
--- a/pkg/querier/batch/chunk_test.go
+++ b/pkg/querier/batch/chunk_test.go
@@ -12,6 +12,7 @@ import (
 
 	"github.com/cortexproject/cortex/pkg/chunk"
 	promchunk "github.com/cortexproject/cortex/pkg/chunk/encoding"
+	histogram_util "github.com/cortexproject/cortex/pkg/util/histogram"
 )
 
 const (
@@ -25,16 +26,18 @@ func TestChunkIter(t *testing.T) {
 
 		iter := &chunkIterator{}
 		iter.reset(chunk)
-		testIter(t, 100, newIteratorAdapter(iter))
+		testIter(t, 100, newIteratorAdapter(iter), enc)
 
 		iter.reset(chunk)
-		testSeek(t, 100, newIteratorAdapter(iter))
+		testSeek(t, 100, newIteratorAdapter(iter), enc)
 	})
 }
 
 func forEncodings(t *testing.T, f func(t *testing.T, enc promchunk.Encoding)) {
 	for _, enc := range []promchunk.Encoding{
 		promchunk.PrometheusXorChunk,
+		promchunk.PrometheusHistogramChunk,
+		promchunk.PrometheusFloatHistogramChunk,
 	} {
 		enc := enc
 		t.Run(enc.String(), func(t *testing.T) {
@@ -48,14 +51,35 @@ func mkChunk(t require.TestingT, step time.Duration, from model.Time, points int
 	metric := labels.Labels{
 		{Name: model.MetricNameLabel, Value: "foo"},
 	}
-	pc := chunkenc.NewXORChunk()
+	pe := enc.PromChunkEncoding()
+	pc, err := chunkenc.NewEmptyChunk(pe)
+	require.NoError(t, err)
 	appender, err := pc.Appender()
 	require.NoError(t, err)
 	ts := from
-	for i := 0; i < points; i++ {
-		appender.Append(int64(ts), float64(ts))
-		ts = ts.Add(step)
+
+	switch pe {
+	case chunkenc.EncXOR:
+		for i := 0; i < points; i++ {
+			appender.Append(int64(ts), float64(ts))
+			ts = ts.Add(step)
+		}
+	case chunkenc.EncHistogram:
+		histograms := histogram_util.GenerateTestHistograms(int(from), int(step/time.Millisecond), points, 5, 20)
+		for i := 0; i < points; i++ {
+			_, _, appender, err = appender.AppendHistogram(nil, int64(ts), histograms[i], true)
+			require.NoError(t, err)
+			ts = ts.Add(step)
+		}
+	case chunkenc.EncFloatHistogram:
+		histograms := histogram_util.GenerateTestHistograms(int(from), int(step/time.Millisecond), points, 5, 20)
+		for i := 0; i < points; i++ {
+			_, _, appender, err = appender.AppendFloatHistogram(nil, int64(ts), histograms[i].ToFloat(nil), true)
+			require.NoError(t, err)
+			ts = ts.Add(step)
+		}
 	}
+	ts = ts.Add(-step) // undo the add that we did just before exiting the loop
 	return chunk.NewChunk(metric, pc, from, ts)
 }
 
@@ -65,34 +89,71 @@ func mkGenericChunk(t require.TestingT, from model.Time, points int,
enc promchu return NewGenericChunk(int64(ck.From), int64(ck.Through), ck.NewIterator) } -func testIter(t require.TestingT, points int, iter chunkenc.Iterator) { +func testIter(t require.TestingT, points int, iter chunkenc.Iterator, enc promchunk.Encoding) { + histograms := histogram_util.GenerateTestHistograms(0, 1000, points, 5, 20) ets := model.TimeFromUnix(0) for i := 0; i < points; i++ { - require.NotEqual(t, iter.Next(), chunkenc.ValNone, strconv.Itoa(i)) - ts, v := iter.At() - require.EqualValues(t, int64(ets), ts, strconv.Itoa(i)) - require.EqualValues(t, float64(ets), v, strconv.Itoa(i)) + require.Equal(t, iter.Next(), enc.ChunkValueType(), strconv.Itoa(i)) + switch enc { + case promchunk.PrometheusXorChunk: + ts, v := iter.At() + require.EqualValues(t, int64(ets), ts, strconv.Itoa(i)) + require.EqualValues(t, float64(ets), v, strconv.Itoa(i)) + case promchunk.PrometheusHistogramChunk: + ts, v := iter.AtHistogram(nil) + require.EqualValues(t, int64(ets), ts, strconv.Itoa(i)) + require.EqualValues(t, histograms[i], v, strconv.Itoa(i)) + case promchunk.PrometheusFloatHistogramChunk: + ts, v := iter.AtFloatHistogram(nil) + require.EqualValues(t, int64(ets), ts, strconv.Itoa(i)) + require.EqualValues(t, histograms[i].ToFloat(nil), v, strconv.Itoa(i)) + } ets = ets.Add(step) } require.Equal(t, iter.Next(), chunkenc.ValNone) } -func testSeek(t require.TestingT, points int, iter chunkenc.Iterator) { +func testSeek(t require.TestingT, points int, iter chunkenc.Iterator, enc promchunk.Encoding) { + histograms := histogram_util.GenerateTestHistograms(0, 1000, points, 5, 20) for i := 0; i < points; i += points / 10 { ets := int64(i * int(step/time.Millisecond)) - require.NotEqual(t, iter.Seek(ets), chunkenc.ValNone) - ts, v := iter.At() - require.EqualValues(t, ets, ts) - require.EqualValues(t, v, float64(ets)) + require.Equal(t, iter.Seek(ets), enc.ChunkValueType(), strconv.Itoa(i)) + + switch enc { + case promchunk.PrometheusXorChunk: + ts, v := iter.At() + require.EqualValues(t, ets, ts, strconv.Itoa(i)) + require.EqualValues(t, float64(ets), v, strconv.Itoa(i)) + case promchunk.PrometheusHistogramChunk: + ts, v := iter.AtHistogram(nil) + require.EqualValues(t, ets, ts, strconv.Itoa(i)) + require.EqualValues(t, histograms[i], v, strconv.Itoa(i)) + case promchunk.PrometheusFloatHistogramChunk: + ts, v := iter.AtFloatHistogram(nil) + require.EqualValues(t, ets, ts, strconv.Itoa(i)) + require.EqualValues(t, histograms[i].ToFloat(nil), v, strconv.Itoa(i)) + } require.NoError(t, iter.Err()) for j := i + 1; j < i+points/10; j++ { ets := int64(j * int(step/time.Millisecond)) - require.NotEqual(t, iter.Next(), chunkenc.ValNone) - ts, v := iter.At() - require.EqualValues(t, ets, ts) - require.EqualValues(t, float64(ets), v) + require.Equal(t, iter.Next(), enc.ChunkValueType(), strconv.Itoa(i)) + + switch enc { + case promchunk.PrometheusXorChunk: + ts, v := iter.At() + require.EqualValues(t, ets, ts) + require.EqualValues(t, float64(ets), v) + case promchunk.PrometheusHistogramChunk: + ts, v := iter.AtHistogram(nil) + require.EqualValues(t, ets, ts) + require.EqualValues(t, histograms[j], v) + case promchunk.PrometheusFloatHistogramChunk: + ts, v := iter.AtFloatHistogram(nil) + require.EqualValues(t, ets, ts) + require.EqualValues(t, histograms[j].ToFloat(nil), v) + } require.NoError(t, iter.Err()) } } @@ -109,11 +170,11 @@ func TestSeek(t *testing.T) { } for i := 0; i < chunk.BatchSize-1; i++ { - require.True(t, c.Seek(int64(i), 1)) + require.Equal(t, chunkenc.ValFloat, c.Seek(int64(i), 1)) } 
require.Equal(t, 1, it.seeks) - require.True(t, c.Seek(int64(chunk.BatchSize), 1)) + require.Equal(t, chunkenc.ValFloat, c.Seek(int64(chunk.BatchSize), 1)) require.Equal(t, 2, it.seeks) } @@ -121,18 +182,19 @@ type mockIterator struct { seeks int } -func (i *mockIterator) Scan() bool { - return true +func (i *mockIterator) Scan() chunkenc.ValueType { + return chunkenc.ValFloat } -func (i *mockIterator) FindAtOrAfter(model.Time) bool { +func (i *mockIterator) FindAtOrAfter(model.Time) chunkenc.ValueType { i.seeks++ - return true + return chunkenc.ValFloat } -func (i *mockIterator) Batch(size int) chunk.Batch { +func (i *mockIterator) Batch(size int, valType chunkenc.ValueType) chunk.Batch { batch := chunk.Batch{ - Length: chunk.BatchSize, + Length: chunk.BatchSize, + ValType: valType, } for i := 0; i < chunk.BatchSize; i++ { batch.Timestamps[i] = int64(i) diff --git a/pkg/querier/batch/merge.go b/pkg/querier/batch/merge.go index 7911314c1b..8c7ebdf062 100644 --- a/pkg/querier/batch/merge.go +++ b/pkg/querier/batch/merge.go @@ -4,6 +4,8 @@ import ( "container/heap" "sort" + "github.com/prometheus/prometheus/tsdb/chunkenc" + promchunk "github.com/cortexproject/cortex/pkg/chunk" ) @@ -36,7 +38,7 @@ func newMergeIterator(cs []GenericChunk) *mergeIterator { } for _, iter := range c.its { - if iter.Next(1) { + if iter.Next(1) != chunkenc.ValNone { c.h = append(c.h, iter) continue } @@ -50,7 +52,7 @@ func newMergeIterator(cs []GenericChunk) *mergeIterator { return c } -func (c *mergeIterator) Seek(t int64, size int) bool { +func (c *mergeIterator) Seek(t int64, size int) chunkenc.ValueType { // Optimisation to see if the seek is within our current caches batches. found: @@ -74,14 +76,14 @@ found: c.batches = c.batches[:0] for _, iter := range c.its { - if iter.Seek(t, size) { + if iter.Seek(t, size) != chunkenc.ValNone { c.h = append(c.h, iter) continue } if err := iter.Err(); err != nil { c.currErr = err - return false + return chunkenc.ValNone } } @@ -91,7 +93,7 @@ found: return c.buildNextBatch(size) } -func (c *mergeIterator) Next(size int) bool { +func (c *mergeIterator) Next(size int) chunkenc.ValueType { // Pop the last built batch in a way that doesn't extend the slice. if len(c.batches) > 0 { copy(c.batches, c.batches[1:]) @@ -106,7 +108,7 @@ func (c *mergeIterator) nextBatchEndTime() int64 { return batch.Timestamps[batch.Length-1] } -func (c *mergeIterator) buildNextBatch(size int) bool { +func (c *mergeIterator) buildNextBatch(size int) chunkenc.ValueType { // All we need to do is get enough batches that our first batch's last entry // is before all iterators next entry. for len(c.h) > 0 && (len(c.batches) == 0 || c.nextBatchEndTime() >= c.h[0].AtTime()) { @@ -114,14 +116,17 @@ func (c *mergeIterator) buildNextBatch(size int) bool { c.batchesBuf = mergeStreams(c.batches, c.nextBatchBuf[:], c.batchesBuf, size) c.batches = append(c.batches[:0], c.batchesBuf...) 
- if c.h[0].Next(size) { + if valType := c.h[0].Next(size); valType != chunkenc.ValNone { heap.Fix(&c.h, 0) } else { heap.Pop(&c.h) } } - return len(c.batches) > 0 + if len(c.batches) > 0 { + return c.batches[0].ValType + } + return chunkenc.ValNone } func (c *mergeIterator) AtTime() int64 { diff --git a/pkg/querier/batch/merge_test.go b/pkg/querier/batch/merge_test.go index ddd3b997c6..8ad0d16df4 100644 --- a/pkg/querier/batch/merge_test.go +++ b/pkg/querier/batch/merge_test.go @@ -19,10 +19,10 @@ func TestMergeIter(t *testing.T) { chunk5 := mkGenericChunk(t, model.TimeFromUnix(100), 100, enc) iter := newMergeIterator([]GenericChunk{chunk1, chunk2, chunk3, chunk4, chunk5}) - testIter(t, 200, newIteratorAdapter(iter)) + testIter(t, 200, newIteratorAdapter(iter), enc) iter = newMergeIterator([]GenericChunk{chunk1, chunk2, chunk3, chunk4, chunk5}) - testSeek(t, 200, newIteratorAdapter(iter)) + testSeek(t, 200, newIteratorAdapter(iter), enc) }) } @@ -41,9 +41,9 @@ func TestMergeHarder(t *testing.T) { from = from.Add(time.Duration(offset) * time.Second) } iter := newMergeIterator(chunks) - testIter(t, offset*numChunks+samples-offset, newIteratorAdapter(iter)) + testIter(t, offset*numChunks+samples-offset, newIteratorAdapter(iter), enc) iter = newMergeIterator(chunks) - testSeek(t, offset*numChunks+samples-offset, newIteratorAdapter(iter)) + testSeek(t, offset*numChunks+samples-offset, newIteratorAdapter(iter), enc) }) } diff --git a/pkg/querier/batch/non_overlapping.go b/pkg/querier/batch/non_overlapping.go index 58f268d9b1..0c4fd0ba0e 100644 --- a/pkg/querier/batch/non_overlapping.go +++ b/pkg/querier/batch/non_overlapping.go @@ -1,6 +1,8 @@ package batch import ( + "github.com/prometheus/prometheus/tsdb/chunkenc" + promchunk "github.com/cortexproject/cortex/pkg/chunk" ) @@ -20,14 +22,14 @@ func newNonOverlappingIterator(chunks []GenericChunk) *nonOverlappingIterator { return it } -func (it *nonOverlappingIterator) Seek(t int64, size int) bool { +func (it *nonOverlappingIterator) Seek(t int64, size int) chunkenc.ValueType { for { - if it.iter.Seek(t, size) { - return true + if valType := it.iter.Seek(t, size); valType != chunkenc.ValNone { + return valType } else if it.iter.Err() != nil { - return false + return chunkenc.ValNone } else if !it.next() { - return false + return chunkenc.ValNone } } } @@ -36,14 +38,14 @@ func (it *nonOverlappingIterator) MaxCurrentChunkTime() int64 { return it.iter.MaxCurrentChunkTime() } -func (it *nonOverlappingIterator) Next(size int) bool { +func (it *nonOverlappingIterator) Next(size int) chunkenc.ValueType { for { - if it.iter.Next(size) { - return true + if valType := it.iter.Next(size); valType != chunkenc.ValNone { + return valType } else if it.iter.Err() != nil { - return false + return chunkenc.ValNone } else if !it.next() { - return false + return chunkenc.ValNone } } } diff --git a/pkg/querier/batch/non_overlapping_test.go b/pkg/querier/batch/non_overlapping_test.go index 078650dbb0..2377e8c3fa 100644 --- a/pkg/querier/batch/non_overlapping_test.go +++ b/pkg/querier/batch/non_overlapping_test.go @@ -15,8 +15,8 @@ func TestNonOverlappingIter(t *testing.T) { for i := int64(0); i < 100; i++ { cs = append(cs, mkGenericChunk(t, model.TimeFromUnix(i*10), 10, enc)) } - testIter(t, 10*100, newIteratorAdapter(newNonOverlappingIterator(cs))) - testSeek(t, 10*100, newIteratorAdapter(newNonOverlappingIterator(cs))) + testIter(t, 10*100, newIteratorAdapter(newNonOverlappingIterator(cs)), enc) + testSeek(t, 10*100, 
newIteratorAdapter(newNonOverlappingIterator(cs)), enc)
 	})
 }
 
@@ -31,7 +31,7 @@ func TestNonOverlappingIterSparse(t *testing.T) {
 		mkGenericChunk(t, model.TimeFromUnix(95), 1, enc),
 		mkGenericChunk(t, model.TimeFromUnix(96), 4, enc),
 	}
-	testIter(t, 100, newIteratorAdapter(newNonOverlappingIterator(cs)))
-	testSeek(t, 100, newIteratorAdapter(newNonOverlappingIterator(cs)))
+	testIter(t, 100, newIteratorAdapter(newNonOverlappingIterator(cs)), enc)
+	testSeek(t, 100, newIteratorAdapter(newNonOverlappingIterator(cs)), enc)
 	})
 }
diff --git a/pkg/querier/batch/stream.go b/pkg/querier/batch/stream.go
index 1aff5337be..475eae3258 100644
--- a/pkg/querier/batch/stream.go
+++ b/pkg/querier/batch/stream.go
@@ -1,6 +1,9 @@
 package batch
 
 import (
+	"github.com/prometheus/prometheus/model/histogram"
+	"github.com/prometheus/prometheus/tsdb/chunkenc"
+
 	promchunk "github.com/cortexproject/cortex/pkg/chunk"
 )
 
@@ -17,8 +20,11 @@ func (bs *batchStream) reset() {
 	}
 }
 
-func (bs *batchStream) hasNext() bool {
-	return len(*bs) > 0
+func (bs *batchStream) hasNext() chunkenc.ValueType {
+	if len(*bs) > 0 {
+		return (*bs)[0].ValType
+	}
+	return chunkenc.ValNone
 }
 
 func (bs *batchStream) next() {
@@ -28,6 +34,16 @@ func (bs *batchStream) next() {
 	}
 }
 
+func (bs *batchStream) atHistogram() (int64, *histogram.Histogram) {
+	b := &(*bs)[0]
+	return b.Timestamps[b.Index], b.Histograms[b.Index]
+}
+
+func (bs *batchStream) atFloatHistogram() (int64, *histogram.FloatHistogram) {
+	b := &(*bs)[0]
+	return b.Timestamps[b.Index], b.FloatHistograms[b.Index]
+}
+
 func (bs *batchStream) atTime() int64 {
 	return (*bs)[0].Timestamps[(*bs)[0].Index]
 }
@@ -46,52 +62,67 @@ func mergeStreams(left, right batchStream, result batchStream, size int) batchSt
 	resultLen := 1 // Number of batches in the final result.
 	b := &result[0]
 
-	// This function adds a new batch to the result
-	// if the current batch being appended is full.
-	checkForFullBatch := func() {
-		if b.Index == size {
-			// The batch reached it intended size.
-			// Add another batch the result
-			// and use it for further appending.
-
-			// The Index is the place at which new sample
-			// has to be appended, hence it tells the length.
-			b.Length = b.Index
-			resultLen++
-			if resultLen > len(result) {
-				// It is possible that result can grow longer
-				// then the one provided.
-				result = append(result, promchunk.Batch{})
-			}
-			b = &result[resultLen-1]
+	// This function adds a new batch to the result.
+	nextBatch := func(valueType chunkenc.ValueType) {
+		// The Index is the place at which the new sample
+		// has to be appended, hence it tells the length.
+		b.Length = b.Index
+		resultLen++
+		if resultLen > len(result) {
+			// It is possible that result can grow longer
+			// than the one provided.
+			result = append(result, promchunk.Batch{})
 		}
+		b = &result[resultLen-1]
+		b.ValType = valueType
 	}
 
-	for left.hasNext() && right.hasNext() {
-		checkForFullBatch()
+	populateVal := func(bs batchStream, valueType chunkenc.ValueType) {
+		if b.Index == 0 {
+			b.ValType = valueType
+		} else if b.Index == size || b.ValType != valueType {
+			// The batch reached its intended size or a new value type is used.
+			// Add another batch to the result and use it for further appending.
+ nextBatch(valueType) + } + switch valueType { + case chunkenc.ValFloat: + b.Timestamps[b.Index], b.Values[b.Index] = bs.at() + case chunkenc.ValHistogram: + b.Timestamps[b.Index], b.Histograms[b.Index] = bs.atHistogram() + case chunkenc.ValFloatHistogram: + b.Timestamps[b.Index], b.FloatHistograms[b.Index] = bs.atFloatHistogram() + default: + panic("unsupported value type") + } + b.Index++ + } + + for leftValueType, rightValueType := left.hasNext(), right.hasNext(); leftValueType != chunkenc.ValNone && rightValueType != chunkenc.ValNone; { t1, t2 := left.atTime(), right.atTime() if t1 < t2 { - b.Timestamps[b.Index], b.Values[b.Index] = left.at() + populateVal(left, leftValueType) left.next() + leftValueType = left.hasNext() } else if t1 > t2 { - b.Timestamps[b.Index], b.Values[b.Index] = right.at() + populateVal(right, rightValueType) right.next() + rightValueType = right.hasNext() } else { - b.Timestamps[b.Index], b.Values[b.Index] = left.at() + populateVal(left, leftValueType) left.next() + leftValueType = left.hasNext() right.next() + rightValueType = right.hasNext() } - b.Index++ } // This function adds all the samples from the provided // batchStream into the result in the same order. addToResult := func(bs batchStream) { - for ; bs.hasNext(); bs.next() { - checkForFullBatch() - b.Timestamps[b.Index], b.Values[b.Index] = bs.at() - b.Index++ - b.Length++ + for valueType := bs.hasNext(); valueType != chunkenc.ValNone; valueType = bs.hasNext() { + populateVal(bs, valueType) + bs.next() } } diff --git a/pkg/querier/batch/stream_test.go b/pkg/querier/batch/stream_test.go index 932de41c6b..2274cf7aa0 100644 --- a/pkg/querier/batch/stream_test.go +++ b/pkg/querier/batch/stream_test.go @@ -4,62 +4,102 @@ import ( "strconv" "testing" + "github.com/prometheus/prometheus/model/histogram" "github.com/stretchr/testify/require" promchunk "github.com/cortexproject/cortex/pkg/chunk" + "github.com/cortexproject/cortex/pkg/chunk/encoding" ) func TestStream(t *testing.T) { t.Parallel() - for i, tc := range []struct { - input1, input2 []promchunk.Batch - output batchStream - }{ - { - input1: []promchunk.Batch{mkBatch(0)}, - output: []promchunk.Batch{mkBatch(0)}, - }, + forEncodings(t, func(t *testing.T, enc encoding.Encoding) { + for i, tc := range []struct { + input1, input2 []promchunk.Batch + output batchStream + }{ + { + input1: []promchunk.Batch{mkBatch(0, enc)}, + output: []promchunk.Batch{mkBatch(0, enc)}, + }, - { - input1: []promchunk.Batch{mkBatch(0)}, - input2: []promchunk.Batch{mkBatch(0)}, - output: []promchunk.Batch{mkBatch(0)}, - }, + { + input1: []promchunk.Batch{mkBatch(0, enc)}, + input2: []promchunk.Batch{mkBatch(0, enc)}, + output: []promchunk.Batch{mkBatch(0, enc)}, + }, - { - input1: []promchunk.Batch{mkBatch(0)}, - input2: []promchunk.Batch{mkBatch(promchunk.BatchSize)}, - output: []promchunk.Batch{mkBatch(0), mkBatch(promchunk.BatchSize)}, - }, + { + input1: []promchunk.Batch{mkBatch(0, enc)}, + input2: []promchunk.Batch{mkBatch(promchunk.BatchSize, enc)}, + output: []promchunk.Batch{mkBatch(0, enc), mkBatch(promchunk.BatchSize, enc)}, + }, - { - input1: []promchunk.Batch{mkBatch(0), mkBatch(promchunk.BatchSize)}, - input2: []promchunk.Batch{mkBatch(promchunk.BatchSize / 2), mkBatch(2 * promchunk.BatchSize)}, - output: []promchunk.Batch{mkBatch(0), mkBatch(promchunk.BatchSize), mkBatch(2 * promchunk.BatchSize)}, - }, + { + input1: []promchunk.Batch{mkBatch(0, enc), mkBatch(promchunk.BatchSize, enc)}, + input2: []promchunk.Batch{mkBatch(promchunk.BatchSize/2, enc), 
mkBatch(2*promchunk.BatchSize, enc)}, + output: []promchunk.Batch{mkBatch(0, enc), mkBatch(promchunk.BatchSize, enc), mkBatch(2*promchunk.BatchSize, enc)}, + }, - { - input1: []promchunk.Batch{mkBatch(promchunk.BatchSize / 2), mkBatch(3 * promchunk.BatchSize / 2), mkBatch(5 * promchunk.BatchSize / 2)}, - input2: []promchunk.Batch{mkBatch(0), mkBatch(promchunk.BatchSize), mkBatch(3 * promchunk.BatchSize)}, - output: []promchunk.Batch{mkBatch(0), mkBatch(promchunk.BatchSize), mkBatch(2 * promchunk.BatchSize), mkBatch(3 * promchunk.BatchSize)}, - }, - } { - tc := tc - t.Run(strconv.Itoa(i), func(t *testing.T) { - t.Parallel() - result := make(batchStream, len(tc.input1)+len(tc.input2)) - result = mergeStreams(tc.input1, tc.input2, result, promchunk.BatchSize) - require.Equal(t, batchStream(tc.output), result) - }) - } + { + input1: []promchunk.Batch{mkBatch(promchunk.BatchSize/2, enc), mkBatch(3*promchunk.BatchSize/2, enc), mkBatch(5*promchunk.BatchSize/2, enc)}, + input2: []promchunk.Batch{mkBatch(0, enc), mkBatch(promchunk.BatchSize, enc), mkBatch(3*promchunk.BatchSize, enc)}, + output: []promchunk.Batch{mkBatch(0, enc), mkBatch(promchunk.BatchSize, enc), mkBatch(2*promchunk.BatchSize, enc), mkBatch(3*promchunk.BatchSize, enc)}, + }, + } { + tc := tc + t.Run(strconv.Itoa(i), func(t *testing.T) { + t.Parallel() + result := make(batchStream, len(tc.input1)+len(tc.input2)) + result = mergeStreams(tc.input1, tc.input2, result, promchunk.BatchSize) + require.Equal(t, tc.output, result) + }) + } + }) } -func mkBatch(from int64) promchunk.Batch { +func mkBatch(from int64, enc encoding.Encoding) promchunk.Batch { var result promchunk.Batch for i := int64(0); i < promchunk.BatchSize; i++ { result.Timestamps[i] = from + i - result.Values[i] = float64(from + i) + switch enc { + case encoding.PrometheusXorChunk: + result.Values[i] = float64(from + i) + case encoding.PrometheusHistogramChunk: + result.Histograms[i] = testHistogram(int(from+i), 5, 20) + case encoding.PrometheusFloatHistogramChunk: + result.FloatHistograms[i] = testHistogram(int(from+i), 5, 20).ToFloat(nil) + } } result.Length = promchunk.BatchSize + result.ValType = enc.ChunkValueType() return result } + +func testHistogram(count, numSpans, numBuckets int) *histogram.Histogram { + bucketsPerSide := numBuckets / 2 + spanLength := uint32(bucketsPerSide / numSpans) + h := &histogram.Histogram{ + CounterResetHint: histogram.GaugeType, + Count: uint64(count), + ZeroCount: uint64(count), + ZeroThreshold: 1e-128, + Sum: 18.4 * float64(count+1), + Schema: 2, + NegativeSpans: make([]histogram.Span, numSpans), + PositiveSpans: make([]histogram.Span, numSpans), + NegativeBuckets: make([]int64, bucketsPerSide), + PositiveBuckets: make([]int64, bucketsPerSide), + } + for j := 0; j < numSpans; j++ { + s := histogram.Span{Offset: 1, Length: spanLength} + h.NegativeSpans[j] = s + h.PositiveSpans[j] = s + } + + for j := 0; j < bucketsPerSide; j++ { + h.NegativeBuckets[j] = 1 + h.PositiveBuckets[j] = 1 + } + return h +} diff --git a/pkg/util/histogram/testutils.go b/pkg/util/histogram/testutils.go new file mode 100644 index 0000000000..a2617d8e56 --- /dev/null +++ b/pkg/util/histogram/testutils.go @@ -0,0 +1,55 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package histogram
+
+import "github.com/prometheus/prometheus/model/histogram"
+
+// Adapted from Prometheus model/histogram/test_utils.go GenerateBigTestHistograms.
+func GenerateTestHistograms(from, step, numHistograms, numSpans, numBuckets int) []*histogram.Histogram {
+	bucketsPerSide := numBuckets / 2
+	spanLength := uint32(bucketsPerSide / numSpans)
+	// All bucket deltas are 1, so each side observes 1+2+...+bucketsPerSide samples, i.e. bucketsPerSide*(1+bucketsPerSide) across both sides.
+	observationCount := bucketsPerSide * (1 + bucketsPerSide)
+
+	var histograms []*histogram.Histogram
+	for i := 0; i < numHistograms; i++ {
+		v := from + i*step
+		h := &histogram.Histogram{
+			CounterResetHint: histogram.GaugeType,
+			Count:            uint64(v + observationCount),
+			ZeroCount:        uint64(v),
+			ZeroThreshold:    1e-128,
+			Sum:              18.4 * float64(v+1),
+			Schema:           2,
+			NegativeSpans:    make([]histogram.Span, numSpans),
+			PositiveSpans:    make([]histogram.Span, numSpans),
+			NegativeBuckets:  make([]int64, bucketsPerSide),
+			PositiveBuckets:  make([]int64, bucketsPerSide),
+		}
+
+		for j := 0; j < numSpans; j++ {
+			s := histogram.Span{Offset: 1, Length: spanLength}
+			h.NegativeSpans[j] = s
+			h.PositiveSpans[j] = s
+		}
+
+		for j := 0; j < bucketsPerSide; j++ {
+			h.NegativeBuckets[j] = 1
+			h.PositiveBuckets[j] = 1
+		}
+
+		histograms = append(histograms, h)
+	}
+	return histograms
+}
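
For reference, a minimal sketch of how a caller is expected to drive the reworked chunk.Iterator contract in this patch: Scan and FindAtOrAfter now report a chunkenc.ValueType instead of a bool, and Batch must be told that type so it fills the matching array of the Batch struct. The drainChunk helper below is hypothetical and only uses the interfaces shown in this diff.

```go
package example

import (
	"fmt"

	"github.com/prometheus/prometheus/tsdb/chunkenc"

	"github.com/cortexproject/cortex/pkg/chunk"
)

// drainChunk walks every sample of a chunk, batch by batch, printing it with
// the accessor that matches the batch's value type.
func drainChunk(it chunk.Iterator) error {
	for valType := it.Scan(); valType != chunkenc.ValNone; valType = it.Scan() {
		// Batch decodes the current sample plus up to BatchSize-1 following
		// ones; it must only be called once per Scan.
		b := it.Batch(chunk.BatchSize, valType)
		for i := 0; i < b.Length; i++ {
			switch b.ValType {
			case chunkenc.ValFloat:
				fmt.Println(b.Timestamps[i], b.Values[i])
			case chunkenc.ValHistogram:
				fmt.Println(b.Timestamps[i], b.Histograms[i])
			case chunkenc.ValFloatHistogram:
				fmt.Println(b.Timestamps[i], b.FloatHistograms[i])
			}
		}
	}
	return it.Err()
}
```

This mirrors what chunkIterator.Next and chunkIterator.Seek in pkg/querier/batch/chunk.go do: the ValueType returned by Scan or FindAtOrAfter is passed straight into Batch and stored in Batch.ValType.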
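
The reworked mkChunk helper re-assigns the chunkenc appender on every AppendHistogram and AppendFloatHistogram call, because those methods return a (possibly replacement) Appender. A condensed sketch of that pattern outside the test scaffolding; buildHistogramChunk is a hypothetical helper and only assumes the chunkenc calls already used in this diff.

```go
package example

import (
	"github.com/prometheus/prometheus/model/histogram"
	"github.com/prometheus/prometheus/tsdb/chunkenc"
)

// buildHistogramChunk creates an empty native-histogram chunk and appends one
// histogram per timestamp, keeping the Appender returned by each call.
func buildHistogramChunk(ts []int64, hs []*histogram.Histogram) (chunkenc.Chunk, error) {
	pc, err := chunkenc.NewEmptyChunk(chunkenc.EncHistogram)
	if err != nil {
		return nil, err
	}
	app, err := pc.Appender()
	if err != nil {
		return nil, err
	}
	for i := range ts {
		// Same call shape as the tests above: the returned Appender replaces
		// the previous one and must be used for the next append.
		if _, _, app, err = app.AppendHistogram(nil, ts[i], hs[i], true); err != nil {
			return nil, err
		}
	}
	return pc, nil
}
```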
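
mergeStreams now starts a fresh output Batch not only when the current batch is full but also whenever the value type changes between samples. A sketch of an in-package test pinning that behaviour down; the test name and inputs are illustrative, not part of the patch.

```go
package batch

import (
	"testing"

	"github.com/prometheus/prometheus/tsdb/chunkenc"
	"github.com/stretchr/testify/require"

	promchunk "github.com/cortexproject/cortex/pkg/chunk"
)

func TestMergeStreamsSplitsOnValueTypeChange(t *testing.T) {
	floatBatch := promchunk.Batch{Length: 1, ValType: chunkenc.ValFloat}
	floatBatch.Timestamps[0], floatBatch.Values[0] = 0, 1

	histBatch := promchunk.Batch{Length: 1, ValType: chunkenc.ValHistogram}
	histBatch.Timestamps[0] = 1 // Histograms[0] left nil, enough for the sketch.

	result := make(batchStream, 2)
	result = mergeStreams(batchStream{floatBatch}, batchStream{histBatch}, result, promchunk.BatchSize)

	// Both samples would fit in a single Batch of size promchunk.BatchSize,
	// but the value type flips between t=0 and t=1, so two batches come back.
	require.Len(t, result, 2)
	require.Equal(t, chunkenc.ValFloat, result[0].ValType)
	require.Equal(t, chunkenc.ValHistogram, result[1].ValType)
}
```

Splitting on value type keeps downstream consumers simple: each Batch can be decoded with the single accessor selected by its ValType.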
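
The observation-count arithmetic in GenerateTestHistograms is compact enough to misread: with every bucket delta set to 1, bucket j on each side holds j+1 observations. A hypothetical helper that spells out the invariant the Count field relies on.

```go
package histogram

// expectedBucketObservations returns the number of observations accounted for
// by the generated buckets: 1+2+...+bucketsPerSide on each of the negative and
// positive sides. For numBuckets = 20, as used in the tests, this is
// 2*55 = 110, matching observationCount above.
func expectedBucketObservations(numBuckets int) int {
	bucketsPerSide := numBuckets / 2
	perSide := bucketsPerSide * (bucketsPerSide + 1) / 2
	return 2 * perSide // same value as bucketsPerSide * (1 + bucketsPerSide)
}
```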