From bb4a6ec7696070f5847b5ac927bf89d04e0b2195 Mon Sep 17 00:00:00 2001 From: sahnib Date: Thu, 23 Mar 2023 11:01:50 -0700 Subject: [PATCH] Adds an OperatorTracer object to extract observability information from vector operators during query execution. The OperatorTracer is passed to the VectorOperator during Series() and Next() operations, and is a container to aggregate observability information. Currently, the vector selector and matrix selector operators use this tracer for calculating query samples. --- engine/engine.go | 19 +++++++++------ execution/aggregate/hashaggregate.go | 26 ++++++++++----------- execution/aggregate/khashaggregate.go | 16 ++++++------- execution/binary/scalar.go | 16 ++++++------- execution/binary/vector.go | 18 +++++++-------- execution/exchange/coalesce.go | 14 +++++------ execution/exchange/concurrent.go | 12 +++++----- execution/exchange/dedup.go | 14 +++++------ execution/function/histogram.go | 16 ++++++------- execution/function/operator.go | 20 ++++++++-------- execution/model/operator.go | 11 +++++++-- execution/noop/operator.go | 8 +++++-- execution/remote/operator.go | 8 +++---- execution/scan/literal_selector.go | 4 ++-- execution/scan/matrix_selector.go | 27 ++++++++++++++++++++-- execution/scan/vector_selector.go | 10 ++++++-- execution/step_invariant/step_invariant.go | 14 +++++------ execution/unary/unary.go | 12 +++++----- 18 files changed, 155 insertions(+), 110 deletions(-) diff --git a/engine/engine.go b/engine/engine.go index 26f98f79a..2023ecf53 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -453,6 +453,8 @@ type compatibilityQuery struct { t QueryType resultSort resultSorter + querySamples *stats.QuerySamples + cancel context.CancelFunc } @@ -471,7 +473,8 @@ func (q *compatibilityQuery) Exec(ctx context.Context) (ret *promql.Result) { defer cancel() q.cancel = cancel - resultSeries, err := q.Query.exec.Series(ctx) + tracer := newOperatorTracer(q.opts.EnablePerStepStats) + resultSeries, err := 
q.Query.exec.Series(ctx, tracer) if err != nil { return newErrResult(ret, err) } @@ -489,7 +492,7 @@ loop: case <-ctx.Done(): return newErrResult(ret, ctx.Err()) default: - r, err := q.Query.exec.Next(ctx) + r, err := q.Query.exec.Next(ctx, tracer) if err != nil { return newErrResult(ret, err) } @@ -612,11 +615,7 @@ func (q *compatibilityQuery) Statement() promparser.Statement { return nil } // Stats always returns empty query stats for now to avoid panic. func (q *compatibilityQuery) Stats() *stats.Statistics { - var enablePerStepStats bool - if q.opts != nil { - enablePerStepStats = q.opts.EnablePerStepStats - } - return &stats.Statistics{Timers: stats.NewQueryTimers(), Samples: stats.NewQuerySamples(enablePerStepStats)} + return &stats.Statistics{Timers: stats.NewQueryTimers(), Samples: q.querySamples} } func (q *compatibilityQuery) Close() { q.Cancel() } @@ -679,3 +678,9 @@ func explain(w io.Writer, o model.VectorOperator, indent, indentNext string) { } } } + +func newOperatorTracer(enablePerStepStats bool) *model.OperatorTracer { + return &model.OperatorTracer{ + QuerySamples: stats.NewQuerySamples(enablePerStepStats), + } +} diff --git a/execution/aggregate/hashaggregate.go b/execution/aggregate/hashaggregate.go index d2bff408a..7d6d171b8 100644 --- a/execution/aggregate/hashaggregate.go +++ b/execution/aggregate/hashaggregate.go @@ -87,9 +87,9 @@ func (a *aggregate) Explain() (me string, next []model.VectorOperator) { return fmt.Sprintf("[*aggregate] %v without (%v)", a.aggregation.String(), a.labels), ops } -func (a *aggregate) Series(ctx context.Context) ([]labels.Labels, error) { +func (a *aggregate) Series(ctx context.Context, tracer *model.OperatorTracer) ([]labels.Labels, error) { var err error - a.once.Do(func() { err = a.initializeTables(ctx) }) + a.once.Do(func() { err = a.initializeTables(ctx, tracer) }) if err != nil { return nil, err } @@ -101,14 +101,14 @@ func (a *aggregate) GetPool() *model.VectorPool { return a.vectorPool } -func (a 
*aggregate) Next(ctx context.Context) ([]model.StepVector, error) { +func (a *aggregate) Next(ctx context.Context, tracer *model.OperatorTracer) ([]model.StepVector, error) { select { case <-ctx.Done(): return nil, ctx.Err() default: } - in, err := a.next.Next(ctx) + in, err := a.next.Next(ctx, tracer) if err != nil { return nil, err } @@ -117,13 +117,13 @@ func (a *aggregate) Next(ctx context.Context) ([]model.StepVector, error) { } defer a.next.GetPool().PutVectors(in) - a.once.Do(func() { err = a.initializeTables(ctx) }) + a.once.Do(func() { err = a.initializeTables(ctx, tracer) }) if err != nil { return nil, err } if a.paramOp != nil { - args, err := a.paramOp.Next(ctx) + args, err := a.paramOp.Next(ctx, tracer) if err != nil { return nil, err } @@ -156,7 +156,7 @@ func (a *aggregate) Next(ctx context.Context) ([]model.StepVector, error) { return result, nil } -func (a *aggregate) initializeTables(ctx context.Context) error { +func (a *aggregate) initializeTables(ctx context.Context, tracer *model.OperatorTracer) error { var ( tables []aggregateTable series []labels.Labels @@ -164,9 +164,9 @@ func (a *aggregate) initializeTables(ctx context.Context) error { ) if a.by && len(a.labels) == 0 { - tables, series, err = a.initializeVectorizedTables(ctx) + tables, series, err = a.initializeVectorizedTables(ctx, tracer) } else { - tables, series, err = a.initializeScalarTables(ctx) + tables, series, err = a.initializeScalarTables(ctx, tracer) } if err != nil { return err @@ -184,10 +184,10 @@ func (a *aggregate) workerTask(workerID int, arg float64, vector model.StepVecto return table.toVector(a.vectorPool) } -func (a *aggregate) initializeVectorizedTables(ctx context.Context) ([]aggregateTable, []labels.Labels, error) { +func (a *aggregate) initializeVectorizedTables(ctx context.Context, tracer *model.OperatorTracer) ([]aggregateTable, []labels.Labels, error) { tables, err := newVectorizedTables(a.stepsBatch, a.aggregation) if errors.Is(err, parse.ErrNotSupportedExpr) 
{ - return a.initializeScalarTables(ctx) + return a.initializeScalarTables(ctx, tracer) } if err != nil { @@ -197,8 +197,8 @@ func (a *aggregate) initializeVectorizedTables(ctx context.Context) ([]aggregate return tables, []labels.Labels{{}}, nil } -func (a *aggregate) initializeScalarTables(ctx context.Context) ([]aggregateTable, []labels.Labels, error) { - series, err := a.next.Series(ctx) +func (a *aggregate) initializeScalarTables(ctx context.Context, tracer *model.OperatorTracer) ([]aggregateTable, []labels.Labels, error) { + series, err := a.next.Series(ctx, tracer) if err != nil { return nil, nil, err } diff --git a/execution/aggregate/khashaggregate.go b/execution/aggregate/khashaggregate.go index f2bb61cd8..89ba93a13 100644 --- a/execution/aggregate/khashaggregate.go +++ b/execution/aggregate/khashaggregate.go @@ -77,12 +77,12 @@ func NewKHashAggregate( return a, nil } -func (a *kAggregate) Next(ctx context.Context) ([]model.StepVector, error) { - in, err := a.next.Next(ctx) +func (a *kAggregate) Next(ctx context.Context, tracer *model.OperatorTracer) ([]model.StepVector, error) { + in, err := a.next.Next(ctx, tracer) if err != nil { return nil, err } - args, err := a.paramOp.Next(ctx) + args, err := a.paramOp.Next(ctx, tracer) if err != nil { return nil, err } @@ -108,7 +108,7 @@ func (a *kAggregate) Next(ctx context.Context) ([]model.StepVector, error) { return nil, errors.New("scalar argument not found") } - a.once.Do(func() { err = a.init(ctx) }) + a.once.Do(func() { err = a.init(ctx, tracer) }) if err != nil { return nil, err } @@ -123,9 +123,9 @@ func (a *kAggregate) Next(ctx context.Context) ([]model.StepVector, error) { return result, nil } -func (a *kAggregate) Series(ctx context.Context) ([]labels.Labels, error) { +func (a *kAggregate) Series(ctx context.Context, tracer *model.OperatorTracer) ([]labels.Labels, error) { var err error - a.once.Do(func() { err = a.init(ctx) }) + a.once.Do(func() { err = a.init(ctx, tracer) }) if err != nil { return 
nil, err } @@ -144,8 +144,8 @@ func (a *kAggregate) Explain() (me string, next []model.VectorOperator) { return fmt.Sprintf("[*kaggregate] %v without (%v)", a.aggregation.String(), a.labels), []model.VectorOperator{a.paramOp, a.next} } -func (a *kAggregate) init(ctx context.Context) error { - series, err := a.next.Series(ctx) +func (a *kAggregate) init(ctx context.Context, tracer *model.OperatorTracer) error { + series, err := a.next.Series(ctx, tracer) if err != nil { return err } diff --git a/execution/binary/scalar.go b/execution/binary/scalar.go index 27f48f4a0..237a453a2 100644 --- a/execution/binary/scalar.go +++ b/execution/binary/scalar.go @@ -83,35 +83,35 @@ func (o *scalarOperator) Explain() (me string, next []model.VectorOperator) { return fmt.Sprintf("[*scalarOperator] %s", parser.ItemTypeStr[o.opType]), []model.VectorOperator{o.next, o.scalar} } -func (o *scalarOperator) Series(ctx context.Context) ([]labels.Labels, error) { +func (o *scalarOperator) Series(ctx context.Context, tracer *model.OperatorTracer) ([]labels.Labels, error) { var err error - o.seriesOnce.Do(func() { err = o.loadSeries(ctx) }) + o.seriesOnce.Do(func() { err = o.loadSeries(ctx, tracer) }) if err != nil { return nil, err } return o.series, nil } -func (o *scalarOperator) Next(ctx context.Context) ([]model.StepVector, error) { +func (o *scalarOperator) Next(ctx context.Context, tracer *model.OperatorTracer) ([]model.StepVector, error) { select { case <-ctx.Done(): return nil, ctx.Err() default: } - in, err := o.next.Next(ctx) + in, err := o.next.Next(ctx, tracer) if err != nil { return nil, err } if in == nil { return nil, nil } - o.seriesOnce.Do(func() { err = o.loadSeries(ctx) }) + o.seriesOnce.Do(func() { err = o.loadSeries(ctx, tracer) }) if err != nil { return nil, err } - scalarIn, err := o.scalar.Next(ctx) + scalarIn, err := o.scalar.Next(ctx, tracer) if err != nil { return nil, err } @@ -157,8 +157,8 @@ func (o *scalarOperator) GetPool() *model.VectorPool { return o.pool } 
-func (o *scalarOperator) loadSeries(ctx context.Context) error { - vectorSeries, err := o.next.Series(ctx) +func (o *scalarOperator) loadSeries(ctx context.Context, tracer *model.OperatorTracer) error { + vectorSeries, err := o.next.Series(ctx, tracer) if err != nil { return err } diff --git a/execution/binary/vector.go b/execution/binary/vector.go index 011556d53..648aa5b50 100644 --- a/execution/binary/vector.go +++ b/execution/binary/vector.go @@ -83,9 +83,9 @@ func (o *vectorOperator) Explain() (me string, next []model.VectorOperator) { return fmt.Sprintf("[*vectorOperator] %s %v ignoring %v group %v", parser.ItemTypeStr[o.opType], o.matching.Card.String(), o.matching.On, o.matching.Include), []model.VectorOperator{o.lhs, o.rhs} } -func (o *vectorOperator) Series(ctx context.Context) ([]labels.Labels, error) { +func (o *vectorOperator) Series(ctx context.Context, tracer *model.OperatorTracer) ([]labels.Labels, error) { var err error - o.once.Do(func() { err = o.initOutputs(ctx) }) + o.once.Do(func() { err = o.initOutputs(ctx, tracer) }) if err != nil { return nil, err } @@ -93,19 +93,19 @@ func (o *vectorOperator) Series(ctx context.Context) ([]labels.Labels, error) { return o.series, nil } -func (o *vectorOperator) initOutputs(ctx context.Context) error { +func (o *vectorOperator) initOutputs(ctx context.Context, tracer *model.OperatorTracer) error { var highCardSide []labels.Labels var errChan = make(chan error, 1) go func() { var err error - highCardSide, err = o.lhs.Series(ctx) + highCardSide, err = o.lhs.Series(ctx, tracer) if err != nil { errChan <- err } close(errChan) }() - lowCardSide, err := o.rhs.Series(ctx) + lowCardSide, err := o.rhs.Series(ctx, tracer) if err != nil { return err } @@ -155,7 +155,7 @@ func (o *vectorOperator) initOutputs(ctx context.Context) error { return nil } -func (o *vectorOperator) Next(ctx context.Context) ([]model.StepVector, error) { +func (o *vectorOperator) Next(ctx context.Context, tracer *model.OperatorTracer) 
([]model.StepVector, error) { select { case <-ctx.Done(): return nil, ctx.Err() @@ -166,14 +166,14 @@ func (o *vectorOperator) Next(ctx context.Context) ([]model.StepVector, error) { var lerrChan = make(chan error, 1) go func() { var err error - lhs, err = o.lhs.Next(ctx) + lhs, err = o.lhs.Next(ctx, tracer) if err != nil { lerrChan <- err } close(lerrChan) }() - rhs, rerr := o.rhs.Next(ctx) + rhs, rerr := o.rhs.Next(ctx, tracer) lerr := <-lerrChan if rerr != nil { return nil, rerr @@ -190,7 +190,7 @@ func (o *vectorOperator) Next(ctx context.Context) ([]model.StepVector, error) { } var err error - o.once.Do(func() { err = o.initOutputs(ctx) }) + o.once.Do(func() { err = o.initOutputs(ctx, tracer) }) if err != nil { return nil, err } diff --git a/execution/exchange/coalesce.go b/execution/exchange/coalesce.go index bf3f17154..68efca1d7 100644 --- a/execution/exchange/coalesce.go +++ b/execution/exchange/coalesce.go @@ -61,16 +61,16 @@ func (c *coalesce) GetPool() *model.VectorPool { return c.pool } -func (c *coalesce) Series(ctx context.Context) ([]labels.Labels, error) { +func (c *coalesce) Series(ctx context.Context, tracer *model.OperatorTracer) ([]labels.Labels, error) { var err error - c.once.Do(func() { err = c.loadSeries(ctx) }) + c.once.Do(func() { err = c.loadSeries(ctx, tracer) }) if err != nil { return nil, err } return c.series, nil } -func (c *coalesce) Next(ctx context.Context) ([]model.StepVector, error) { +func (c *coalesce) Next(ctx context.Context, tracer *model.OperatorTracer) ([]model.StepVector, error) { select { case <-ctx.Done(): return nil, ctx.Err() @@ -78,7 +78,7 @@ func (c *coalesce) Next(ctx context.Context) ([]model.StepVector, error) { } var err error - c.once.Do(func() { err = c.loadSeries(ctx) }) + c.once.Do(func() { err = c.loadSeries(ctx, tracer) }) if err != nil { return nil, err } @@ -89,7 +89,7 @@ func (c *coalesce) Next(ctx context.Context) ([]model.StepVector, error) { go func(opIdx int, o model.VectorOperator) { defer 
c.wg.Done() - in, err := o.Next(ctx) + in, err := o.Next(ctx, tracer) if err != nil { errChan <- err return @@ -139,7 +139,7 @@ func (c *coalesce) Next(ctx context.Context) ([]model.StepVector, error) { return out, nil } -func (c *coalesce) loadSeries(ctx context.Context) error { +func (c *coalesce) loadSeries(ctx context.Context, tracer *model.OperatorTracer) error { var wg sync.WaitGroup var numSeries uint64 allSeries := make([][]labels.Labels, len(c.operators)) @@ -160,7 +160,7 @@ func (c *coalesce) loadSeries(ctx context.Context) error { } }() - series, err := c.operators[i].Series(ctx) + series, err := c.operators[i].Series(ctx, tracer) if err != nil { errChan <- err return diff --git a/execution/exchange/concurrent.go b/execution/exchange/concurrent.go index 7a64fda9a..5b91fbda5 100644 --- a/execution/exchange/concurrent.go +++ b/execution/exchange/concurrent.go @@ -37,15 +37,15 @@ func (c *concurrencyOperator) Explain() (me string, next []model.VectorOperator) return fmt.Sprintf("[*concurrencyOperator(buff=%v)]", c.bufferSize), []model.VectorOperator{c.next} } -func (c *concurrencyOperator) Series(ctx context.Context) ([]labels.Labels, error) { - return c.next.Series(ctx) +func (c *concurrencyOperator) Series(ctx context.Context, tracer *model.OperatorTracer) ([]labels.Labels, error) { + return c.next.Series(ctx, tracer) } func (c *concurrencyOperator) GetPool() *model.VectorPool { return c.next.GetPool() } -func (c *concurrencyOperator) Next(ctx context.Context) ([]model.StepVector, error) { +func (c *concurrencyOperator) Next(ctx context.Context, tracer *model.OperatorTracer) ([]model.StepVector, error) { select { case <-ctx.Done(): return nil, ctx.Err() @@ -53,7 +53,7 @@ func (c *concurrencyOperator) Next(ctx context.Context) ([]model.StepVector, err } c.once.Do(func() { - go c.pull(ctx) + go c.pull(ctx, tracer) go c.drainBufferOnCancel(ctx) }) @@ -68,7 +68,7 @@ func (c *concurrencyOperator) Next(ctx context.Context) ([]model.StepVector, err return 
r.stepVector, nil } -func (c *concurrencyOperator) pull(ctx context.Context) { +func (c *concurrencyOperator) pull(ctx context.Context, tracer *model.OperatorTracer) { defer close(c.buffer) for { @@ -77,7 +77,7 @@ func (c *concurrencyOperator) pull(ctx context.Context) { c.buffer <- maybeStepVector{err: ctx.Err()} return default: - r, err := c.next.Next(ctx) + r, err := c.next.Next(ctx, tracer) if err != nil { c.buffer <- maybeStepVector{err: err} return diff --git a/execution/exchange/dedup.go b/execution/exchange/dedup.go index 5efbdbf0f..4b4ec1e24 100644 --- a/execution/exchange/dedup.go +++ b/execution/exchange/dedup.go @@ -46,14 +46,14 @@ func NewDedupOperator(pool *model.VectorPool, next model.VectorOperator) model.V } } -func (d *dedupOperator) Next(ctx context.Context) ([]model.StepVector, error) { +func (d *dedupOperator) Next(ctx context.Context, tracer *model.OperatorTracer) ([]model.StepVector, error) { var err error - d.once.Do(func() { err = d.loadSeries(ctx) }) + d.once.Do(func() { err = d.loadSeries(ctx, tracer) }) if err != nil { return nil, err } - in, err := d.next.Next(ctx) + in, err := d.next.Next(ctx, tracer) if err != nil { return nil, err } @@ -95,9 +95,9 @@ func (d *dedupOperator) Next(ctx context.Context) ([]model.StepVector, error) { return result, nil } -func (d *dedupOperator) Series(ctx context.Context) ([]labels.Labels, error) { +func (d *dedupOperator) Series(ctx context.Context, tracer *model.OperatorTracer) ([]labels.Labels, error) { var err error - d.once.Do(func() { err = d.loadSeries(ctx) }) + d.once.Do(func() { err = d.loadSeries(ctx, tracer) }) if err != nil { return nil, err } @@ -112,8 +112,8 @@ func (d *dedupOperator) Explain() (me string, next []model.VectorOperator) { return "[*dedup]", []model.VectorOperator{d.next} } -func (d *dedupOperator) loadSeries(ctx context.Context) error { - series, err := d.next.Series(ctx) +func (d *dedupOperator) loadSeries(ctx context.Context, tracer *model.OperatorTracer) error { + series, 
err := d.next.Series(ctx, tracer) if err != nil { return err } diff --git a/execution/function/histogram.go b/execution/function/histogram.go index 1bbd941fd..dd9cba2fc 100644 --- a/execution/function/histogram.go +++ b/execution/function/histogram.go @@ -63,9 +63,9 @@ func (o *histogramOperator) Explain() (me string, next []model.VectorOperator) { return fmt.Sprintf("[*functionOperator] histogram_quantile(%v)", o.funcArgs), next } -func (o *histogramOperator) Series(ctx context.Context) ([]labels.Labels, error) { +func (o *histogramOperator) Series(ctx context.Context, tracer *model.OperatorTracer) ([]labels.Labels, error) { var err error - o.once.Do(func() { err = o.loadSeries(ctx) }) + o.once.Do(func() { err = o.loadSeries(ctx, tracer) }) if err != nil { return nil, err } @@ -77,7 +77,7 @@ func (o *histogramOperator) GetPool() *model.VectorPool { return o.pool } -func (o *histogramOperator) Next(ctx context.Context) ([]model.StepVector, error) { +func (o *histogramOperator) Next(ctx context.Context, tracer *model.OperatorTracer) ([]model.StepVector, error) { select { case <-ctx.Done(): return nil, ctx.Err() @@ -85,12 +85,12 @@ func (o *histogramOperator) Next(ctx context.Context) ([]model.StepVector, error } var err error - o.once.Do(func() { err = o.loadSeries(ctx) }) + o.once.Do(func() { err = o.loadSeries(ctx, tracer) }) if err != nil { return nil, err } - scalars, err := o.scalarOp.Next(ctx) + scalars, err := o.scalarOp.Next(ctx, tracer) if err != nil { return nil, err } @@ -99,7 +99,7 @@ func (o *histogramOperator) Next(ctx context.Context) ([]model.StepVector, error return nil, nil } - vectors, err := o.vectorOp.Next(ctx) + vectors, err := o.vectorOp.Next(ctx, tracer) if err != nil { return nil, err } @@ -174,8 +174,8 @@ func (o *histogramOperator) processInputSeries(vectors []model.StepVector) ([]mo return out, nil } -func (o *histogramOperator) loadSeries(ctx context.Context) error { - series, err := o.vectorOp.Series(ctx) +func (o *histogramOperator) 
loadSeries(ctx context.Context, tracer *model.OperatorTracer) error { + series, err := o.vectorOp.Series(ctx, tracer) if err != nil { return err } diff --git a/execution/function/operator.go b/execution/function/operator.go index 959e51dce..51f6a4512 100644 --- a/execution/function/operator.go +++ b/execution/function/operator.go @@ -51,7 +51,7 @@ func (o *noArgFunctionOperator) Explain() (me string, next []model.VectorOperato return fmt.Sprintf("[*noArgFunctionOperator] %v()", o.funcExpr.Func.Name), []model.VectorOperator{} } -func (o *noArgFunctionOperator) Series(_ context.Context) ([]labels.Labels, error) { +func (o *noArgFunctionOperator) Series(_ context.Context, _ *model.OperatorTracer) ([]labels.Labels, error) { return o.series, nil } @@ -59,7 +59,7 @@ func (o *noArgFunctionOperator) GetPool() *model.VectorPool { return o.vectorPool } -func (o *noArgFunctionOperator) Next(_ context.Context) ([]model.StepVector, error) { +func (o *noArgFunctionOperator) Next(_ context.Context, _ *model.OperatorTracer) ([]model.StepVector, error) { if o.currentStep > o.maxt { return nil, nil } @@ -144,8 +144,8 @@ func (o *functionOperator) Explain() (me string, next []model.VectorOperator) { return fmt.Sprintf("[*functionOperator] %v(%v)", o.funcExpr.Func.Name, o.funcExpr.Args), o.nextOps } -func (o *functionOperator) Series(ctx context.Context) ([]labels.Labels, error) { - if err := o.loadSeries(ctx); err != nil { +func (o *functionOperator) Series(ctx context.Context, tracer *model.OperatorTracer) ([]labels.Labels, error) { + if err := o.loadSeries(ctx, tracer); err != nil { return nil, err } @@ -156,20 +156,20 @@ func (o *functionOperator) GetPool() *model.VectorPool { return o.nextOps[o.vectorIndex].GetPool() } -func (o *functionOperator) Next(ctx context.Context) ([]model.StepVector, error) { +func (o *functionOperator) Next(ctx context.Context, tracer *model.OperatorTracer) ([]model.StepVector, error) { select { case <-ctx.Done(): return nil, ctx.Err() default: } - if 
err := o.loadSeries(ctx); err != nil { + if err := o.loadSeries(ctx, tracer); err != nil { return nil, err } // Process non-variadic single/multi-arg instant vector and scalar input functions. // Call next on vector input. - vectors, err := o.nextOps[o.vectorIndex].Next(ctx) + vectors, err := o.nextOps[o.vectorIndex].Next(ctx, tracer) if err != nil { return nil, err } @@ -184,7 +184,7 @@ func (o *functionOperator) Next(ctx context.Context) ([]model.StepVector, error) continue } - scalarVectors, err := o.nextOps[i].Next(ctx) + scalarVectors, err := o.nextOps[i].Next(ctx, tracer) if err != nil { return nil, err } @@ -265,7 +265,7 @@ func (o *functionOperator) newFunctionArgs(vector model.StepVector, batchIndex i } } -func (o *functionOperator) loadSeries(ctx context.Context) error { +func (o *functionOperator) loadSeries(ctx context.Context, tracer *model.OperatorTracer) error { var err error o.once.Do(func() { if o.funcExpr.Func.Name == "vector" { @@ -278,7 +278,7 @@ func (o *functionOperator) loadSeries(ctx context.Context) error { return } - series, loadErr := o.nextOps[o.vectorIndex].Series(ctx) + series, loadErr := o.nextOps[o.vectorIndex].Series(ctx, tracer) if loadErr != nil { err = loadErr return diff --git a/execution/model/operator.go b/execution/model/operator.go index 40d04eb67..2a65c2de1 100644 --- a/execution/model/operator.go +++ b/execution/model/operator.go @@ -5,19 +5,26 @@ package model import ( "context" + "github.com/prometheus/prometheus/util/stats" "github.com/prometheus/prometheus/model/labels" ) +// OperatorTracer aggregates the observability information from all VectorOperator operators +// during query execution. +type OperatorTracer struct { + QuerySamples *stats.QuerySamples +} + // VectorOperator performs operations on series in step by step fashion. type VectorOperator interface { // Next yields vectors of samples from all series for one or more execution steps. 
- Next(ctx context.Context) ([]StepVector, error) + Next(ctx context.Context, tracer *OperatorTracer) ([]StepVector, error) // Series returns all series that the operator will process during Next results. // The result can be used by upstream operators to allocate output tables and buffers // before starting to process samples. - Series(ctx context.Context) ([]labels.Labels, error) + Series(ctx context.Context, tracer *OperatorTracer) ([]labels.Labels, error) // GetPool returns pool of vectors that can be shared across operators. GetPool() *VectorPool diff --git a/execution/noop/operator.go b/execution/noop/operator.go index 29e8a3cf2..10b22caa2 100644 --- a/execution/noop/operator.go +++ b/execution/noop/operator.go @@ -15,9 +15,13 @@ type operator struct{} func NewOperator() model.VectorOperator { return &operator{} } -func (o operator) Next(ctx context.Context) ([]model.StepVector, error) { return nil, nil } +func (o operator) Next(_ context.Context, _ *model.OperatorTracer) ([]model.StepVector, error) { + return nil, nil +} -func (o operator) Series(ctx context.Context) ([]labels.Labels, error) { return nil, nil } +func (o operator) Series(_ context.Context, _ *model.OperatorTracer) ([]labels.Labels, error) { + return nil, nil +} func (o operator) GetPool() *model.VectorPool { return nil } diff --git a/execution/remote/operator.go b/execution/remote/operator.go index dfcc90215..36253f39d 100644 --- a/execution/remote/operator.go +++ b/execution/remote/operator.go @@ -34,12 +34,12 @@ func NewExecution(query promql.Query, pool *model.VectorPool, opts *query.Option } } -func (e *Execution) Series(ctx context.Context) ([]labels.Labels, error) { - return e.vectorSelector.Series(ctx) +func (e *Execution) Series(ctx context.Context, tracer *model.OperatorTracer) ([]labels.Labels, error) { + return e.vectorSelector.Series(ctx, tracer) } -func (e *Execution) Next(ctx context.Context) ([]model.StepVector, error) { - next, err := e.vectorSelector.Next(ctx) +func (e 
*Execution) Next(ctx context.Context, tracer *model.OperatorTracer) ([]model.StepVector, error) { + next, err := e.vectorSelector.Next(ctx, tracer) if next == nil { // Closing the storage prematurely can lead to results from the query // engine to be recycled. Because of this, we close the storage only diff --git a/execution/scan/literal_selector.go b/execution/scan/literal_selector.go index 9b9d68c98..bbb51ead2 100644 --- a/execution/scan/literal_selector.go +++ b/execution/scan/literal_selector.go @@ -45,7 +45,7 @@ func (o *numberLiteralSelector) Explain() (me string, next []model.VectorOperato return fmt.Sprintf("[*numberLiteralSelector] %v", o.val), nil } -func (o *numberLiteralSelector) Series(context.Context) ([]labels.Labels, error) { +func (o *numberLiteralSelector) Series(_ context.Context, _ *model.OperatorTracer) ([]labels.Labels, error) { o.loadSeries() return o.series, nil } @@ -54,7 +54,7 @@ func (o *numberLiteralSelector) GetPool() *model.VectorPool { return o.vectorPool } -func (o *numberLiteralSelector) Next(ctx context.Context) ([]model.StepVector, error) { +func (o *numberLiteralSelector) Next(ctx context.Context, _ *model.OperatorTracer) ([]model.StepVector, error) { select { case <-ctx.Done(): return nil, ctx.Err() diff --git a/execution/scan/matrix_selector.go b/execution/scan/matrix_selector.go index be4bd33d9..0254288a6 100644 --- a/execution/scan/matrix_selector.go +++ b/execution/scan/matrix_selector.go @@ -8,6 +8,7 @@ import ( "fmt" "sort" "sync" + "sync/atomic" "time" "github.com/prometheus/prometheus/model/labels" @@ -97,7 +98,7 @@ func (o *matrixSelector) Explain() (me string, next []model.VectorOperator) { return fmt.Sprintf("[*matrixSelector] {%v}[%s] %v mod %v", o.storage.Matchers(), r, o.shard, o.numShards), nil } -func (o *matrixSelector) Series(ctx context.Context) ([]labels.Labels, error) { +func (o *matrixSelector) Series(ctx context.Context, _ *model.OperatorTracer) ([]labels.Labels, error) { if err := o.loadSeries(ctx); err 
!= nil { return nil, err } @@ -108,7 +109,7 @@ func (o *matrixSelector) GetPool() *model.VectorPool { return o.vectorPool } -func (o *matrixSelector) Next(ctx context.Context) ([]model.StepVector, error) { +func (o *matrixSelector) Next(ctx context.Context, tracer *model.OperatorTracer) ([]model.StepVector, error) { select { case <-ctx.Done(): return nil, ctx.Err() @@ -125,6 +126,7 @@ func (o *matrixSelector) Next(ctx context.Context) ([]model.StepVector, error) { vectors := o.vectorPool.GetVectorBatch() ts := o.currentStep + var currSamples int64 = 0 for i := 0; i < len(o.scanners); i++ { var ( series = o.scanners[i] @@ -151,6 +153,8 @@ func (o *matrixSelector) Next(ctx context.Context) ([]model.StepVector, error) { return nil, err } + currSamples += o.countSamples(rangePoints) + // TODO(saswatamcode): Handle multi-arg functions for matrixSelectors. // Also, allow operator to exist independently without being nested // under parser.Call by implementing new data model. @@ -191,9 +195,25 @@ func (o *matrixSelector) Next(ctx context.Context) ([]model.StepVector, error) { } o.currentStep += o.step * int64(o.numSteps) + atomic.AddInt64(&tracer.QuerySamples.TotalSamples, currSamples) + return vectors, nil } +func (o *matrixSelector) countSamples(points []promql.Point) int64 { + var samples int64 = 0 + + for _, point := range points { + if point.H != nil { + samples += int64(len(point.H.NegativeBuckets) + len(point.H.PositiveBuckets)) + } else { + samples += 1 + } + } + + return samples +} + func (o *matrixSelector) loadSeries(ctx context.Context) error { var err error o.once.Do(func() { @@ -246,6 +266,7 @@ func (o *matrixSelector) loadSeries(ctx context.Context) error { // are populated from the iterator. // TODO(fpetkovski): Add max samples limit. 
func selectPoints(it *storage.BufferedSeriesIterator, mint, maxt int64, out []promql.Point) ([]promql.Point, error) { + var currSamples int64 = 0 if len(out) > 0 && out[len(out)-1].T >= mint { // There is an overlap between previous and current ranges, retain common // points. In most such cases: @@ -271,6 +292,7 @@ func selectPoints(it *storage.BufferedSeriesIterator, mint, maxt int64, out []pr } buf := it.Buffer() + loop: for { switch buf.Next() { @@ -299,6 +321,7 @@ loop: } // Values in the buffer are guaranteed to be smaller than maxt. if t >= mint { + currSamples += 1 out = append(out, promql.Point{T: t, V: v}) } } diff --git a/execution/scan/vector_selector.go b/execution/scan/vector_selector.go index 377b66993..abe40efb1 100644 --- a/execution/scan/vector_selector.go +++ b/execution/scan/vector_selector.go @@ -7,6 +7,7 @@ import ( "context" "fmt" "sync" + "sync/atomic" "time" "github.com/efficientgo/core/errors" @@ -78,7 +79,7 @@ func (o *vectorSelector) Explain() (me string, next []model.VectorOperator) { return fmt.Sprintf("[*vectorSelector] {%v} %v mod %v", o.storage.Matchers(), o.shard, o.numShards), nil } -func (o *vectorSelector) Series(ctx context.Context) ([]labels.Labels, error) { +func (o *vectorSelector) Series(ctx context.Context, _ *model.OperatorTracer) ([]labels.Labels, error) { if err := o.loadSeries(ctx); err != nil { return nil, err } @@ -89,7 +90,7 @@ func (o *vectorSelector) GetPool() *model.VectorPool { return o.vectorPool } -func (o *vectorSelector) Next(ctx context.Context) ([]model.StepVector, error) { +func (o *vectorSelector) Next(ctx context.Context, tracer *model.OperatorTracer) ([]model.StepVector, error) { select { case <-ctx.Done(): return nil, ctx.Err() @@ -106,6 +107,7 @@ func (o *vectorSelector) Next(ctx context.Context) ([]model.StepVector, error) { vectors := o.vectorPool.GetVectorBatch() ts := o.currentStep + var currentSamples int64 = 0 for i := 0; i < len(o.scanners); i++ { var ( series = o.scanners[i] @@ -122,8 +124,10 
@@ func (o *vectorSelector) Next(ctx context.Context) ([]model.StepVector, error) { } if ok { if h != nil { + currentSamples += int64(len(h.PositiveBuckets) + len(h.NegativeBuckets)) vectors[currStep].AppendHistogram(o.vectorPool, series.signature, h) } else { + currentSamples += 1 vectors[currStep].AppendSample(o.vectorPool, series.signature, v) } } @@ -137,6 +141,8 @@ func (o *vectorSelector) Next(ctx context.Context) ([]model.StepVector, error) { } o.currentStep += o.step * int64(o.numSteps) + atomic.AddInt64(&tracer.QuerySamples.TotalSamples, currentSamples) + return vectors, nil } diff --git a/execution/step_invariant/step_invariant.go b/execution/step_invariant/step_invariant.go index d3032b1bb..5fdd49891 100644 --- a/execution/step_invariant/step_invariant.go +++ b/execution/step_invariant/step_invariant.go @@ -69,10 +69,10 @@ func NewStepInvariantOperator( return u, nil } -func (u *stepInvariantOperator) Series(ctx context.Context) ([]labels.Labels, error) { +func (u *stepInvariantOperator) Series(ctx context.Context, tracer *model.OperatorTracer) ([]labels.Labels, error) { var err error u.seriesOnce.Do(func() { - u.series, err = u.next.Series(ctx) + u.series, err = u.next.Series(ctx, tracer) }) if err != nil { return nil, err @@ -84,7 +84,7 @@ func (u *stepInvariantOperator) GetPool() *model.VectorPool { return u.vectorPool } -func (u *stepInvariantOperator) Next(ctx context.Context) ([]model.StepVector, error) { +func (u *stepInvariantOperator) Next(ctx context.Context, tracer *model.OperatorTracer) ([]model.StepVector, error) { if u.currentStep > u.maxt { return nil, nil } @@ -96,10 +96,10 @@ func (u *stepInvariantOperator) Next(ctx context.Context) ([]model.StepVector, e } if !u.cacheResult { - return u.next.Next(ctx) + return u.next.Next(ctx, tracer) } - if err := u.cacheInputVector(ctx); err != nil { + if err := u.cacheInputVector(ctx, tracer); err != nil { return nil, err } @@ -119,11 +119,11 @@ func (u *stepInvariantOperator) Next(ctx 
context.Context) ([]model.StepVector, e return result, nil } -func (u *stepInvariantOperator) cacheInputVector(ctx context.Context) error { +func (u *stepInvariantOperator) cacheInputVector(ctx context.Context, tracer *model.OperatorTracer) error { var err error var in []model.StepVector u.cacheVectorOnce.Do(func() { - in, err = u.next.Next(ctx) + in, err = u.next.Next(ctx, tracer) if err != nil { return } diff --git a/execution/unary/unary.go b/execution/unary/unary.go index e8af994b7..f1b85ee3d 100644 --- a/execution/unary/unary.go +++ b/execution/unary/unary.go @@ -34,18 +34,18 @@ func NewUnaryNegation( return u, nil } -func (u *unaryNegation) Series(ctx context.Context) ([]labels.Labels, error) { - if err := u.loadSeries(ctx); err != nil { +func (u *unaryNegation) Series(ctx context.Context, tracer *model.OperatorTracer) ([]labels.Labels, error) { + if err := u.loadSeries(ctx, tracer); err != nil { return nil, err } return u.series, nil } -func (u *unaryNegation) loadSeries(ctx context.Context) error { +func (u *unaryNegation) loadSeries(ctx context.Context, tracer *model.OperatorTracer) error { var err error u.once.Do(func() { var series []labels.Labels - series, err = u.next.Series(ctx) + series, err = u.next.Series(ctx, tracer) if err != nil { return } @@ -62,14 +62,14 @@ func (u *unaryNegation) GetPool() *model.VectorPool { return u.next.GetPool() } -func (u *unaryNegation) Next(ctx context.Context) ([]model.StepVector, error) { +func (u *unaryNegation) Next(ctx context.Context, tracer *model.OperatorTracer) ([]model.StepVector, error) { select { case <-ctx.Done(): return nil, ctx.Err() default: } - in, err := u.next.Next(ctx) + in, err := u.next.Next(ctx, tracer) if err != nil { return nil, err }