Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement histogram iterators for batch #5944

Merged
merged 5 commits into from
May 16, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 34 additions & 12 deletions pkg/chunk/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package chunk
import (
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/tsdb/chunkenc"
)
Expand Down Expand Up @@ -45,31 +46,45 @@ type prometheusChunkIterator struct {
it chunkenc.Iterator
}

func (p *prometheusChunkIterator) Scan() bool {
return p.it.Next() != chunkenc.ValNone
func (p *prometheusChunkIterator) Scan() chunkenc.ValueType {
return p.it.Next()
}

func (p *prometheusChunkIterator) FindAtOrAfter(time model.Time) bool {
func (p *prometheusChunkIterator) FindAtOrAfter(time model.Time) chunkenc.ValueType {
// FindAtOrAfter must return OLDEST value at given time. That means we need to start with a fresh iterator,
// otherwise we cannot guarantee OLDEST.
p.it = p.c.Iterator(p.it)
return p.it.Seek(int64(time)) != chunkenc.ValNone
return p.it.Seek(int64(time))
}

func (p *prometheusChunkIterator) Batch(size int) Batch {
func (p *prometheusChunkIterator) Batch(size int, valType chunkenc.ValueType) Batch {
var batch Batch
j := 0
for j < size {
t, v := p.it.At()
batch.Timestamps[j] = t
batch.Values[j] = v
switch valType {
case chunkenc.ValNone:
break
case chunkenc.ValFloat:
t, v := p.it.At()
batch.Timestamps[j] = t
batch.Values[j] = v
case chunkenc.ValHistogram:
t, v := p.it.AtHistogram(nil)
batch.Timestamps[j] = t
batch.Histograms[j] = v
case chunkenc.ValFloatHistogram:
t, v := p.it.AtFloatHistogram(nil)
batch.Timestamps[j] = t
batch.FloatHistograms[j] = v
}
j++
if j < size && p.it.Next() == chunkenc.ValNone {
break
}
}
batch.Index = 0
batch.Length = j
batch.ValType = valType
return batch
}

Expand All @@ -79,7 +94,14 @@ func (p *prometheusChunkIterator) Err() error {

type errorIterator string

func (e errorIterator) Scan() bool { return false }
func (e errorIterator) FindAtOrAfter(time model.Time) bool { return false }
func (e errorIterator) Batch(size int) Batch { panic("no values") }
func (e errorIterator) Err() error { return errors.New(string(e)) }
func (e errorIterator) Scan() chunkenc.ValueType { return chunkenc.ValNone }
func (e errorIterator) FindAtOrAfter(time model.Time) chunkenc.ValueType { return chunkenc.ValNone }
func (e errorIterator) Value() model.SamplePair { panic("no values") }
func (e errorIterator) AtHistogram(_ *histogram.Histogram) (int64, *histogram.Histogram) {
panic("no values")
}
func (e errorIterator) AtFloatHistogram(_ *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) {
panic("no values")
}
func (e errorIterator) Batch(size int, valType chunkenc.ValueType) Batch { panic("no values") }
func (e errorIterator) Err() error { return errors.New(string(e)) }
39 changes: 34 additions & 5 deletions pkg/chunk/encoding/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/prometheus/prometheus/tsdb/chunkenc"
)

// Encoding defines which encoding we are using, delta, doubledelta, or varbit
// Encoding defines which encoding we are using.
type Encoding byte

// String implements flag.Value.
Expand All @@ -26,28 +26,57 @@ func (e Encoding) PromChunkEncoding() chunkenc.Encoding {
return chunkenc.EncNone
}

func (e Encoding) ChunkValueType() chunkenc.ValueType {
if known, found := encodings[e]; found {
return known.ValueType
}
return chunkenc.ValNone
}

const (
// PrometheusXorChunk is a wrapper around Prometheus XOR-encoded chunk.
// 4 is the magic value for backwards-compatibility with previous iota-based constants.
PrometheusXorChunk Encoding = 4
// PrometheusHistogramChunk is a wrapper around Prometheus histogram chunk.
// 5 is the magic value for backwards-compatibility with previous iota-based constants.
PrometheusHistogramChunk Encoding = 5
// PrometheusFloatHistogramChunk is a wrapper around Prometheus float histogram chunk.
// 6 is the magic value for backwards-compatibility with previous iota-based constants.
PrometheusFloatHistogramChunk Encoding = 6
)

type encoding struct {
Name string
Encoding chunkenc.Encoding
Name string
Encoding chunkenc.Encoding
ValueType chunkenc.ValueType
}

var encodings = map[Encoding]encoding{
PrometheusXorChunk: {
Name: "PrometheusXorChunk",
Encoding: chunkenc.EncXOR,
Name: "PrometheusXorChunk",
Encoding: chunkenc.EncXOR,
ValueType: chunkenc.ValFloat,
},
PrometheusHistogramChunk: {
Name: "PrometheusHistogramChunk",
Encoding: chunkenc.EncHistogram,
ValueType: chunkenc.ValHistogram,
},
PrometheusFloatHistogramChunk: {
Name: "PrometheusFloatHistogramChunk",
Encoding: chunkenc.EncFloatHistogram,
ValueType: chunkenc.ValFloatHistogram,
},
}

func FromPromChunkEncoding(enc chunkenc.Encoding) (Encoding, error) {
switch enc {
case chunkenc.EncXOR:
return PrometheusXorChunk, nil
case chunkenc.EncHistogram:
return PrometheusHistogramChunk, nil
case chunkenc.EncFloatHistogram:
return PrometheusFloatHistogramChunk, nil
}
return Encoding(0), errors.Errorf("unknown Prometheus chunk encoding: %v", enc)
}
19 changes: 12 additions & 7 deletions pkg/chunk/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package chunk

import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/tsdb/chunkenc"
)

// Iterator enables efficient access to the content of a chunk. It is
Expand All @@ -13,14 +15,14 @@ type Iterator interface {
// chunk. Otherwise, it is the value following the last value scanned or
// found (by one of the Find... methods). Returns false if either the
// end of the chunk is reached or an error has occurred.
Scan() bool
Scan() chunkenc.ValueType
// Finds the oldest value at or after the provided time. Returns false
// if either the chunk contains no value at or after the provided time,
// or an error has occurred.
FindAtOrAfter(model.Time) bool
FindAtOrAfter(model.Time) chunkenc.ValueType
// Returns a batch of the provisded size; NB not idempotent! Should only be called
// once per Scan.
Batch(size int) Batch
Batch(size int, valType chunkenc.ValueType) Batch
// Returns the last error encountered. In general, an error signals data
// corruption in the chunk and requires quarantining.
Err() error
Expand All @@ -33,8 +35,11 @@ const BatchSize = 12
// Batch is a sorted set of (timestamp, value) pairs. They are intended to be
// small, and passed by value.
type Batch struct {
Timestamps [BatchSize]int64
Values [BatchSize]float64
Index int
Length int
Timestamps [BatchSize]int64
Values [BatchSize]float64
Histograms [BatchSize]*histogram.Histogram
FloatHistograms [BatchSize]*histogram.FloatHistogram
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any idea how I can change to make Batch smaller? It should be only 1 value type per batch and other array are redundant.

Index int
Length int
ValType chunkenc.ValueType
}
56 changes: 40 additions & 16 deletions pkg/querier/batch/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ package batch

import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/tsdb/chunkenc"

"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/querier/iterators"
)

// GenericChunk is a generic chunk used by the batch iterator, in order to make the batch
Expand All @@ -32,10 +32,10 @@ func (c GenericChunk) Iterator(reuse chunk.Iterator) chunk.Iterator {
// iterator iterates over batches.
type iterator interface {
// Seek to the batch at (or after) time t.
Seek(t int64, size int) bool
Seek(t int64, size int) chunkenc.ValueType

// Next moves to the next batch.
Next(size int) bool
Next(size int) chunkenc.ValueType

// AtTime returns the start time of the next batch. Must only be called after
// Seek or Next have returned true.
Expand Down Expand Up @@ -78,62 +78,71 @@ type iteratorAdapter struct {
}

func newIteratorAdapter(underlying iterator) chunkenc.Iterator {
return iterators.NewCompatibleChunksIterator(&iteratorAdapter{
return &iteratorAdapter{
batchSize: 1,
underlying: underlying,
})
}
}

// Seek implements chunkenc.Iterator.
func (a *iteratorAdapter) Seek(t int64) bool {
func (a *iteratorAdapter) Seek(t int64) chunkenc.ValueType {

// Optimisation: fulfill the seek using current batch if possible.
if a.curr.Length > 0 && a.curr.Index < a.curr.Length {
if t <= a.curr.Timestamps[a.curr.Index] {
//In this case, the interface's requirement is met, so state of this
//iterator does not need any change.
return true
return a.curr.ValType
} else if t <= a.curr.Timestamps[a.curr.Length-1] {
//In this case, some timestamp between current sample and end of batch can fulfill
//the seek. Let's find it.
for a.curr.Index < a.curr.Length && t > a.curr.Timestamps[a.curr.Index] {
a.curr.Index++
}
return true
return a.curr.ValType
} else if t <= a.underlying.MaxCurrentChunkTime() {
// In this case, some timestamp inside the current underlying chunk can fulfill the seek.
// In this case we will call next until we find the sample as it will be faster than calling
// `a.underlying.Seek` directly as this would cause the iterator to start from the beginning of the chunk.
// See: https://github.com/cortexproject/cortex/blob/f69452975877c67ac307709e5f60b8d20477764c/pkg/querier/batch/chunk.go#L26-L45
// https://github.com/cortexproject/cortex/blob/f69452975877c67ac307709e5f60b8d20477764c/pkg/chunk/encoding/prometheus_chunk.go#L90-L95
for a.Next() {
for {
valType := a.Next()
if valType == chunkenc.ValNone {
break
}
if t <= a.curr.Timestamps[a.curr.Index] {
return true
return valType
}
}
}
}

a.curr.Length = -1
a.batchSize = 1
if a.underlying.Seek(t, a.batchSize) {
if valType := a.underlying.Seek(t, a.batchSize); valType != chunkenc.ValNone {
a.curr = a.underlying.Batch()
return a.curr.Index < a.curr.Length
if a.curr.Index < a.curr.Length {
return a.curr.ValType
}
}
return false
return chunkenc.ValNone
}

// Next implements chunkenc.Iterator.
func (a *iteratorAdapter) Next() bool {
func (a *iteratorAdapter) Next() chunkenc.ValueType {
a.curr.Index++
for a.curr.Index >= a.curr.Length && a.underlying.Next(a.batchSize) {
for a.curr.Index >= a.curr.Length && a.underlying.Next(a.batchSize) != chunkenc.ValNone {
a.curr = a.underlying.Batch()
a.batchSize = a.batchSize * 2
if a.batchSize > chunk.BatchSize {
a.batchSize = chunk.BatchSize
}
}
return a.curr.Index < a.curr.Length
if a.curr.Index < a.curr.Length {
return a.curr.ValType
}
return chunkenc.ValNone
}

// At implements chunkenc.Iterator.
Expand All @@ -145,3 +154,18 @@ func (a *iteratorAdapter) At() (int64, float64) {
func (a *iteratorAdapter) Err() error {
return nil
}

// AtHistogram implements chunkenc.Iterator.
func (a *iteratorAdapter) AtHistogram(h *histogram.Histogram) (int64, *histogram.Histogram) {
return a.curr.Timestamps[a.curr.Index], a.curr.Histograms[a.curr.Index]
}

// AtFloatHistogram implements chunkenc.Iterator.
func (a *iteratorAdapter) AtFloatHistogram(h *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) {
return a.curr.Timestamps[a.curr.Index], a.curr.FloatHistograms[a.curr.Index]
}

// AtT implements chunkenc.Iterator.
func (a *iteratorAdapter) AtT() int64 {
return a.curr.Timestamps[a.curr.Index]
}
Loading
Loading