Skip to content

Commit

Permalink
Debug operator
Browse files Browse the repository at this point in the history
This commit introduces the debug operator, an operator intended to help
debug complex queries. When the debug operator is run using zq or zed,
all values encountered at the point of the operator are written to
stderr.

Closes #5181
  • Loading branch information
mattnibs committed Jul 30, 2024
1 parent 9c3da8b commit c48f269
Show file tree
Hide file tree
Showing 34 changed files with 2,690 additions and 2,256 deletions.
101 changes: 62 additions & 39 deletions api/queryio/client.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package queryio

import (
"context"
"errors"
"fmt"
"io"
Expand All @@ -12,58 +13,80 @@ import (
"github.com/brimdata/zed/zson"
)

type Query struct {
reader *zngio.Reader
closer io.Closer
// scanner wraps an underlying zbuf.Scanner reading a query response and
// tracks the per-response state that Pull derives from control messages.
type scanner struct {
// channel is the name of the current output channel. Pull updates it
// when a QueryChannelSet control message arrives and tags each returned
// batch with it.
channel string
// scanner is the underlying scanner that Pull reads batches from.
scanner zbuf.Scanner
// closer is closed by Pull once the underlying scanner returns a nil
// batch with no error.
closer io.Closer
// progress accumulates the Progress carried by QueryStats control
// messages; it is what Progress returns.
progress zbuf.Progress
}

// NewQuery returns a Query that reads a ZNG-encoded query response
// from rc and decodes it. Closing the Query also closes rc.
func NewQuery(rc io.ReadCloser) *Query {
return &Query{
reader: zngio.NewReader(zed.NewContext(), rc),
closer: rc,
// NewScanner returns a zbuf.Scanner that reads a ZNG-encoded query
// response from rc. The returned scanner keeps rc as its closer.
//
// The ZNG reader's scanner is created without a filter (nil); if that
// creation fails anyway, NewScanner panics.
func NewScanner(ctx context.Context, rc io.ReadCloser) zbuf.Scanner {
	reader := zngio.NewReader(zed.NewContext(), rc)
	inner, err := reader.NewScanner(ctx, nil)
	if err != nil {
		// XXX This shouldn't happen since we don't have a filter.
		panic(err)
	}
	return &scanner{
		closer:  rc,
		scanner: inner,
	}
}

func (q *Query) Close() error {
err := q.reader.Close()
q.closer.Close()
return err
// Progress returns the progress accumulated so far. The value is a copy of
// s.progress, which Pull adds to each time it receives a QueryStats
// control message.
func (s *scanner) Progress() zbuf.Progress {
return s.progress
}

func (q *Query) Read() (*zed.Value, error) {
val, ctrl, err := q.reader.ReadPayload()
if ctrl != nil {
if ctrl.Format != zngio.ControlFormatZSON {
return nil, fmt.Errorf("unsupported app encoding: %v", ctrl.Format)
}
arena := zed.NewArena()
defer arena.Unref()
value, err := zson.ParseValue(zed.NewContext(), arena, string(ctrl.Bytes))
if err != nil {
return nil, fmt.Errorf("unable to parse Zed control message: %w (%s)", err, string(ctrl.Bytes))
func (s *scanner) Pull(done bool) (zbuf.Batch, error) {
again:
batch, err := s.scanner.Pull(done)
if err == nil {
if batch != nil {
return zbuf.Wrap(s.channel, batch), nil
}
var v interface{}
if err := unmarshaler.Unmarshal(value, &v); err != nil {
return nil, fmt.Errorf("unable to unmarshal Zed control message: %w (%s)", err, string(ctrl.Bytes))
}
return nil, controlToError(v)
return nil, s.closer.Close()
}
return val, err
}

func controlToError(ctrl interface{}) error {
switch ctrl := ctrl.(type) {
zctrl, ok := err.(*zbuf.Control)
if !ok {
return nil, err
}
v, err := s.marshalControl(zctrl)
if err != nil {
return nil, err
}
switch ctrl := v.(type) {
case *api.QueryChannelSet:
return &zbuf.Control{Message: zbuf.SetChannel(ctrl.Channel)}
s.channel = ctrl.Channel
goto again
case *api.QueryChannelEnd:
return &zbuf.Control{Message: zbuf.EndChannel(ctrl.Channel)}
eoc := zbuf.EndOfChannel(ctrl.Channel)
return &eoc, nil
case *api.QueryStats:
return &zbuf.Control{Message: ctrl.Progress}
s.progress.Add(ctrl.Progress)
goto again
case *api.QueryError:
return errors.New(ctrl.Error)
return nil, errors.New(ctrl.Error)
default:
return fmt.Errorf("unsupported control message: %T", ctrl)
return nil, fmt.Errorf("unsupported control message: %T", ctrl)
}
}

// marshalControl decodes the payload of a control message into a Go value.
//
// zctrl.Message must be a *zngio.Control whose Format is
// zngio.ControlFormatZSON; any other message type or format yields an
// error. The control bytes are parsed as ZSON and then handed to the
// package-level unmarshaler, whose result is returned. Parse and unmarshal
// failures are wrapped and include the raw control text.
//
// The receiver is not used.
func (s *scanner) marshalControl(zctrl *zbuf.Control) (any, error) {
	msg, isZNG := zctrl.Message.(*zngio.Control)
	switch {
	case !isZNG:
		return nil, fmt.Errorf("unknown control type: %T", zctrl.Message)
	case msg.Format != zngio.ControlFormatZSON:
		return nil, fmt.Errorf("unsupported app encoding: %v", msg.Format)
	}
	// The raw text is needed both for parsing and for error reporting.
	text := string(msg.Bytes)
	arena := zed.NewArena()
	defer arena.Unref()
	parsed, err := zson.ParseValue(zed.NewContext(), arena, text)
	if err != nil {
		return nil, fmt.Errorf("unable to parse Zed control message: %w (%s)", err, text)
	}
	var decoded any
	if err = unmarshaler.Unmarshal(parsed, &decoded); err != nil {
		return nil, fmt.Errorf("unable to unmarshal Zed control message: %w (%s)", err, text)
	}
	return decoded, nil
}
8 changes: 7 additions & 1 deletion cli/zq/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package zq
import (
"flag"
"fmt"
"os"

"github.com/brimdata/zed"
"github.com/brimdata/zed/cli"
Expand All @@ -17,6 +18,7 @@ import (
"github.com/brimdata/zed/zbuf"
"github.com/brimdata/zed/zfmt"
"github.com/brimdata/zed/zio"
"github.com/brimdata/zed/zio/zsonio"
)

var Cmd = &charm.Spec{
Expand Down Expand Up @@ -159,7 +161,11 @@ func (c *Command) Run(args []string) error {
return err
}
defer query.Pull(true)
err = zbuf.CopyPuller(writer, query)
out := map[string]zio.WriteCloser{
"main": writer,
"debug": zsonio.NewWriter(zio.NopCloser(os.Stderr), zsonio.WriterOpts{}),
}
err = zbuf.CopyMux(out, query)
if closeErr := writer.Close(); err == nil {
err = closeErr
}
Expand Down
6 changes: 3 additions & 3 deletions cmd/zed/branch/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"github.com/brimdata/zed/lakeparse"
"github.com/brimdata/zed/pkg/charm"
"github.com/brimdata/zed/pkg/storage"
"github.com/brimdata/zed/zio"
"github.com/brimdata/zed/zbuf"
)

var Cmd = &charm.Spec{
Expand Down Expand Up @@ -132,8 +132,8 @@ func (c *Command) list(ctx context.Context, lake api.Interface) error {
w.Close()
return err
}
defer q.Close()
err = zio.Copy(w, q)
defer q.Pull(true)
err = zbuf.CopyPuller(w, q)
if closeErr := w.Close(); err == nil {
err = closeErr
}
Expand Down
8 changes: 4 additions & 4 deletions cmd/zed/internal/lakemanage/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/brimdata/zed/order"
"github.com/brimdata/zed/runtime/sam/expr"
"github.com/brimdata/zed/runtime/sam/expr/extent"
"github.com/brimdata/zed/zio"
"github.com/brimdata/zed/zbuf"
"github.com/brimdata/zed/zson"
"github.com/segmentio/ksuid"
)
Expand Down Expand Up @@ -66,20 +66,20 @@ from %q@%q:objects
`

type objectIterator struct {
reader zio.ReadCloser
reader zbuf.ProgressReadCloser
unmarshaler *zson.UnmarshalZNGContext
arena *zed.Arena
}

func newObjectIterator(ctx context.Context, lake api.Interface, head *lakeparse.Commitish) (*objectIterator, error) {
query := fmt.Sprintf(iteratorQuery, head.Pool, head.Branch, head.Pool, head.Branch)
r, err := lake.Query(ctx, nil, query)
q, err := lake.Query(ctx, nil, query)
if err != nil {
return nil, err
}
arena := zed.NewArena()
return &objectIterator{
reader: r,
reader: zbuf.ScannerReader(q),
unmarshaler: zson.NewZNGUnmarshaler().SetContext(zed.NewContext(), arena),
arena: arena,
}, nil
Expand Down
6 changes: 3 additions & 3 deletions cmd/zed/log/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/brimdata/zed/compiler/parser"
"github.com/brimdata/zed/pkg/charm"
"github.com/brimdata/zed/pkg/storage"
"github.com/brimdata/zed/zio"
"github.com/brimdata/zed/zbuf"
)

var Cmd = &charm.Spec{
Expand Down Expand Up @@ -76,8 +76,8 @@ func (c *Command) Run(args []string) error {
}
return err
}
defer q.Close()
err = zio.Copy(w, q)
defer q.Pull(true)
err = zbuf.CopyPuller(w, q)
if closeErr := w.Close(); err == nil {
err = closeErr
}
Expand Down
6 changes: 3 additions & 3 deletions cmd/zed/ls/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/brimdata/zed/cmd/zed/root"
"github.com/brimdata/zed/pkg/charm"
"github.com/brimdata/zed/pkg/storage"
"github.com/brimdata/zed/zio"
"github.com/brimdata/zed/zbuf"
"github.com/segmentio/ksuid"
)

Expand Down Expand Up @@ -89,8 +89,8 @@ func (c *Command) Run(args []string) error {
w.Close()
return err
}
defer q.Close()
err = zio.Copy(w, q)
defer q.Pull(true)
err = zbuf.CopyPuller(w, q)
if closeErr := w.Close(); err == nil {
err = closeErr
}
Expand Down
12 changes: 9 additions & 3 deletions cmd/zed/query/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package query

import (
"flag"
"os"

"github.com/brimdata/zed/cli/outputflags"
"github.com/brimdata/zed/cli/poolflags"
Expand All @@ -12,6 +13,7 @@ import (
"github.com/brimdata/zed/pkg/storage"
"github.com/brimdata/zed/zbuf"
"github.com/brimdata/zed/zio"
"github.com/brimdata/zed/zio/zsonio"
)

var Cmd = &charm.Spec{
Expand Down Expand Up @@ -63,13 +65,17 @@ func (c *Command) Run(args []string) error {
return err
}
head, _ := c.poolFlags.HEAD()
query, err := lake.QueryWithControl(ctx, head, src, c.queryFlags.Includes...)
query, err := lake.Query(ctx, head, src, c.queryFlags.Includes...)
if err != nil {
w.Close()
return err
}
defer query.Close()
err = zio.Copy(w, zbuf.NoControl(query))
defer query.Pull(true)
out := map[string]zio.WriteCloser{
"main": w,
"debug": zsonio.NewWriter(zio.NopCloser(os.Stderr), zsonio.WriterOpts{}),
}
err = zbuf.CopyMux(out, query)
if closeErr := w.Close(); err == nil {
err = closeErr
}
Expand Down
13 changes: 13 additions & 0 deletions compiler/ast/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -627,6 +627,11 @@ type (
KeywordPos int `json:"keyword_pos"`
Name *ID `json:"name"`
}
// Debug is the AST node for the debug operator. KeywordPos is the
// position reported by Pos. Expr may be nil, in which case End is
// computed from KeywordPos instead of from the expression.
Debug struct {
Kind string `json:"kind" unpack:""`
KeywordPos int `json:"keyword_pos"`
Expr Expr `json:"expr"`
}
)

// Source structure
Expand Down Expand Up @@ -781,6 +786,7 @@ func (*Sample) OpAST() {}
func (*Load) OpAST() {}
func (*Assert) OpAST() {}
func (*Output) OpAST() {}
func (*Debug) OpAST() {}

func (x *Scope) Pos() int {
if x.Decls != nil {
Expand Down Expand Up @@ -817,6 +823,7 @@ func (x *Sample) Pos() int { return x.KeywordPos }
func (x *Load) Pos() int { return x.KeywordPos }
func (x *Assert) Pos() int { return x.KeywordPos }
func (x *Output) Pos() int { return x.KeywordPos }
func (x *Debug) Pos() int { return x.KeywordPos }

func (x *Scope) End() int { return x.Body.End() }
func (x *Parallel) End() int { return x.Rparen }
Expand Down Expand Up @@ -926,6 +933,12 @@ func (x *Sample) End() int {
func (x *Load) End() int { return x.EndPos }
func (x *Assert) End() int { return x.Expr.End() }
func (x *Output) End() int { return x.Name.End() }
// End returns the end position of the debug operator: the end of its
// expression when one is present, otherwise KeywordPos + 6.
// NOTE(review): "debug" is five characters long; confirm that an offset
// of 6 is the intended end position for the bare keyword.
func (x *Debug) End() int {
	if x.Expr == nil {
		return x.KeywordPos + 6
	}
	return x.Expr.End()
}

// An Agg is an AST node that represents a aggregate function. The Name
// field indicates the aggregation method while the Expr field indicates
Expand Down
9 changes: 9 additions & 0 deletions compiler/ast/dag/op.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ type (
Expr Expr `json:"expr"`
Order order.Which `json:"order"`
}
// Mirror is a DAG operator that holds two operator sequences, Main and
// Mirror; Walk descends into both.
// NOTE(review): presumably the same input feeds both sequences, with
// Mirror carrying the side (debug) output. Confirm against the runtime
// implementation.
Mirror struct {
Kind string `json:"kind" unpack:""`
Main Seq `json:"main"`
Mirror Seq `json:"mirror"`
}
Over struct {
Kind string `json:"kind" unpack:""`
Defs []Def `json:"defs"`
Expand Down Expand Up @@ -314,6 +319,7 @@ func (*Over) OpNode() {}
func (*Vectorize) OpNode() {}
func (*Yield) OpNode() {}
func (*Merge) OpNode() {}
func (*Mirror) OpNode() {}
func (*Combine) OpNode() {}
func (*Scope) OpNode() {}
func (*Load) OpNode() {}
Expand Down Expand Up @@ -379,6 +385,9 @@ func Walk(seq Seq, post func(Seq) Seq) Seq {
for k := range op.Paths {
op.Paths[k] = Walk(op.Paths[k], post)
}
case *Mirror:
op.Main = Walk(op.Main, post)
op.Mirror = Walk(op.Mirror, post)
case *Scope:
op.Body = Walk(op.Body, post)
}
Expand Down
1 change: 1 addition & 0 deletions compiler/ast/dag/unpack.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ var unpacker = unpack.New(
MapCall{},
MapExpr{},
Merge{},
Mirror{},
Output{},
Over{},
OverExpr{},
Expand Down
1 change: 1 addition & 0 deletions compiler/ast/unpack.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ var unpacker = unpack.New(
Conditional{},
ConstDecl{},
Cut{},
Debug{},
astzed.DefValue{},
Drop{},
Explode{},
Expand Down
3 changes: 3 additions & 0 deletions compiler/describe/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,9 @@ func describeOpAggs(op dag.Op, parents []field.List) []field.List {
aggs = append(aggs, describeAggs(p, []field.List{nil})...)
}
return aggs
case *dag.Mirror:
aggs := describeAggs(op.Main, []field.List{nil})
return append(aggs, describeAggs(op.Mirror, []field.List{nil})...)
case *dag.Summarize:
// The field list for aggregation with no keys is an empty slice and
// not nil.
Expand Down
Loading

0 comments on commit c48f269

Please sign in to comment.