From ae1a1182e7b12f59333c3c3e3594094eed097ab9 Mon Sep 17 00:00:00 2001 From: Matthew Nibecker Date: Tue, 5 Sep 2023 14:29:19 -0700 Subject: [PATCH] Compact: write vectors (#4756) Allow writing of vectors when compacting objects --- api/client/connection.go | 5 +- cmd/zed/compact/command.go | 8 +-- cmd/zed/manage/lakemanage/branch.go | 2 +- lake/api/api.go | 2 +- lake/api/local.go | 4 +- lake/api/remote.go | 4 +- lake/branch.go | 7 ++- lake/data/vector.go | 55 ++++++++++++++----- lake/writer.go | 82 +++++++++++++++++++++-------- lake/ztests/compact-vectors.yaml | 15 ++++++ runtime/exec/compact.go | 6 +-- service/handlers.go | 6 ++- service/ztests/compact-vectors.yaml | 17 ++++++ 13 files changed, 163 insertions(+), 50 deletions(-) create mode 100644 lake/ztests/compact-vectors.yaml create mode 100644 service/ztests/compact-vectors.yaml diff --git a/api/client/connection.go b/api/client/connection.go index f58203a0e5..b61df91482 100644 --- a/api/client/connection.go +++ b/api/client/connection.go @@ -319,8 +319,11 @@ func (c *Connection) Query(ctx context.Context, head *lakeparse.Commitish, src s return res, err } -func (c *Connection) Compact(ctx context.Context, poolID ksuid.KSUID, branchName string, objects []ksuid.KSUID, message api.CommitMessage) (api.CommitResponse, error) { +func (c *Connection) Compact(ctx context.Context, poolID ksuid.KSUID, branchName string, objects []ksuid.KSUID, writeVectors bool, message api.CommitMessage) (api.CommitResponse, error) { path := urlPath("pool", poolID.String(), "branch", branchName, "compact") + if writeVectors { + path += "?vectors=T" + } req := c.NewRequest(ctx, http.MethodPost, path, api.CompactRequest{ObjectIDs: objects}) if err := encodeCommitMessage(req, message); err != nil { return api.CommitResponse{}, err diff --git a/cmd/zed/compact/command.go b/cmd/zed/compact/command.go index fe62102060..ba205eaeeb 100644 --- a/cmd/zed/compact/command.go +++ b/cmd/zed/compact/command.go @@ -24,14 +24,16 @@ a commit on HEAD replacing the old objects with the new ones.`, type Command struct { *root.Command - commitFlags commitflags.Flags - poolFlags poolflags.Flags + commitFlags commitflags.Flags + poolFlags poolflags.Flags + writeVectors bool } func New(parent charm.Command, f *flag.FlagSet) (charm.Command, error) { c := &Command{Command: parent.(*root.Command)} c.commitFlags.SetFlags(f) c.poolFlags.SetFlags(f) + f.BoolVar(&c.writeVectors, "vectors", false, "write vectors for compacted objects") return c, nil } @@ -57,7 +59,7 @@ func (c *Command) Run(args []string) error { if err != nil { return err } - commit, err := lake.Compact(ctx, poolID, head.Branch, ids, c.commitFlags.CommitMessage()) + commit, err := lake.Compact(ctx, poolID, head.Branch, ids, c.writeVectors, c.commitFlags.CommitMessage()) if err == nil && !c.LakeFlags.Quiet { fmt.Printf("%s compaction committed\n", commit) } diff --git a/cmd/zed/manage/lakemanage/branch.go b/cmd/zed/manage/lakemanage/branch.go index 22edeb13ac..58bd5c0224 100644 --- a/cmd/zed/manage/lakemanage/branch.go +++ b/cmd/zed/manage/lakemanage/branch.go @@ -64,7 +64,7 @@ func (c *compactTask) run(ctx context.Context) error { var found int var compacted int for run := range ch { - commit, err := c.lake.Compact(ctx, c.pool.ID, c.config.Branch, run.ObjectIDs(), api.CommitMessage{}) + commit, err := c.lake.Compact(ctx, c.pool.ID, c.config.Branch, run.ObjectIDs(), false, api.CommitMessage{}) if err != nil { return err } diff --git a/lake/api/api.go b/lake/api/api.go index 32a1a3f620..dc02dd4b4c 100644 --- a/lake/api/api.go +++ b/lake/api/api.go @@ -33,7 +33,7 @@ type Interface interface { CreateBranch(ctx context.Context, pool ksuid.KSUID, name string, parent ksuid.KSUID) error RemoveBranch(ctx context.Context, pool ksuid.KSUID, branchName string) error MergeBranch(ctx context.Context, pool ksuid.KSUID, childBranch, parentBranch string, message api.CommitMessage) (ksuid.KSUID, error) - Compact(ctx context.Context, pool ksuid.KSUID, branch string, objects []ksuid.KSUID, message api.CommitMessage) (ksuid.KSUID, error) + Compact(ctx context.Context, pool ksuid.KSUID, branch string, objects []ksuid.KSUID, writeVectors bool, message api.CommitMessage) (ksuid.KSUID, error) Load(ctx context.Context, zctx *zed.Context, pool ksuid.KSUID, branch string, r zio.Reader, message api.CommitMessage) (ksuid.KSUID, error) Delete(ctx context.Context, poolID ksuid.KSUID, branchName string, tags []ksuid.KSUID, message api.CommitMessage) (ksuid.KSUID, error) DeleteWhere(ctx context.Context, poolID ksuid.KSUID, branchName, src string, commit api.CommitMessage) (ksuid.KSUID, error) diff --git a/lake/api/local.go b/lake/api/local.go index 277b89a822..bab44eace0 100644 --- a/lake/api/local.go +++ b/lake/api/local.go @@ -97,12 +97,12 @@ func (l *local) MergeBranch(ctx context.Context, poolID ksuid.KSUID, childBranch return l.root.MergeBranch(ctx, poolID, childBranch, parentBranch, message.Author, message.Body) } -func (l *local) Compact(ctx context.Context, poolID ksuid.KSUID, branchName string, objects []ksuid.KSUID, commit api.CommitMessage) (ksuid.KSUID, error) { +func (l *local) Compact(ctx context.Context, poolID ksuid.KSUID, branchName string, objects []ksuid.KSUID, writeVectors bool, commit api.CommitMessage) (ksuid.KSUID, error) { pool, err := l.root.OpenPool(ctx, poolID) if err != nil { return ksuid.Nil, err } - return exec.Compact(ctx, l.root, pool, branchName, objects, commit.Author, commit.Body, commit.Meta) + return exec.Compact(ctx, l.root, pool, branchName, objects, writeVectors, commit.Author, commit.Body, commit.Meta) } func (l *local) AddIndexRules(ctx context.Context, rules []index.Rule) error { diff --git a/lake/api/remote.go b/lake/api/remote.go index 0a8b7e0280..f11ad930a6 100644 --- a/lake/api/remote.go +++ b/lake/api/remote.go @@ -76,8 +76,8 @@ func (r *remote) MergeBranch(ctx context.Context, poolID ksuid.KSUID, childBranc return res.Commit, err } -func (r *remote) Compact(ctx context.Context, poolID ksuid.KSUID, branch string, objects []ksuid.KSUID, commit api.CommitMessage) (ksuid.KSUID, error) { - res, err := r.conn.Compact(ctx, poolID, branch, objects, commit) +func (r *remote) Compact(ctx context.Context, poolID ksuid.KSUID, branch string, objects []ksuid.KSUID, writeVectors bool, commit api.CommitMessage) (ksuid.KSUID, error) { + res, err := r.conn.Compact(ctx, poolID, branch, objects, writeVectors, commit) return res.Commit, err } diff --git a/lake/branch.go b/lake/branch.go index aab51ec2b9..fe784fdbba 100644 --- a/lake/branch.go +++ b/lake/branch.go @@ -238,7 +238,7 @@ func (b *Branch) Revert(ctx context.Context, commit ksuid.KSUID, author, message }) } -func (b *Branch) CommitCompact(ctx context.Context, src, rollup []*data.Object, author, message, meta string) (ksuid.KSUID, error) { +func (b *Branch) CommitCompact(ctx context.Context, src, rollup []*data.Object, rollupVecs []ksuid.KSUID, author, message, meta string) (ksuid.KSUID, error) { if len(rollup) < 1 { return ksuid.Nil, errors.New("compact: one or more rollup objects required") } @@ -258,6 +258,11 @@ func (b *Branch) CommitCompact(ctx context.Context, src, rollup []*data.Object, return nil, err } } + for _, id := range rollupVecs { + if err := patch.AddVector(id); err != nil { + return nil, err + } + } for _, o := range src { if err := patch.DeleteObject(o.ID); err != nil { return nil, err diff --git a/lake/data/vector.go b/lake/data/vector.go index a630d76d73..86f003807f 100644 --- a/lake/data/vector.go +++ b/lake/data/vector.go @@ -9,6 +9,7 @@ import ( "github.com/brimdata/zed" "github.com/brimdata/zed/pkg/bufwriter" "github.com/brimdata/zed/pkg/storage" + "github.com/brimdata/zed/vng" "github.com/brimdata/zed/zio" "github.com/brimdata/zed/zio/vngio" "github.com/brimdata/zed/zio/zngio" @@ -25,26 +26,16 @@ func CreateVector(ctx context.Context, engine storage.Engine, path *storage.URI, } return err } - put, err := engine.Put(ctx, VectorURI(path, id)) - if err != nil { - get.Close() - return err - } - writer, err := vngio.NewWriter(bufwriter.New(put), vngio.WriterOpts{ - ColumnThresh: vngio.DefaultColumnThresh, - SkewThresh: vngio.DefaultSkewThresh, - }) + w, err := NewVectorWriter(ctx, engine, path, id) if err != nil { get.Close() - put.Close() - DeleteVector(ctx, engine, path, id) return err } // Note here that writer.Close closes the Put but reader.Close does not // close the Get. reader := zngio.NewReader(zed.NewContext(), get) - err = zio.Copy(writer, reader) - if closeErr := writer.Close(); err == nil { + err = zio.Copy(w, reader) + if closeErr := w.Close(); err == nil { err = closeErr } if closeErr := reader.Close(); err == nil { @@ -54,11 +45,47 @@ func CreateVector(ctx context.Context, engine storage.Engine, path *storage.URI, err = closeErr } if err != nil { - DeleteVector(ctx, engine, path, id) + w.Abort() } return err } +type VectorWriter struct { + *vng.Writer + delete func() +} + +func (o *Object) NewVectorWriter(ctx context.Context, engine storage.Engine, path *storage.URI) (*VectorWriter, error) { + return NewVectorWriter(ctx, engine, path, o.ID) +} + +func NewVectorWriter(ctx context.Context, engine storage.Engine, path *storage.URI, id ksuid.KSUID) (*VectorWriter, error) { + put, err := engine.Put(ctx, VectorURI(path, id)) + if err != nil { + return nil, err + } + delete := func() { + DeleteVector(context.Background(), engine, path, id) + } + writer, err := vngio.NewWriter(bufwriter.New(put), vngio.WriterOpts{ + ColumnThresh: vngio.DefaultColumnThresh, + SkewThresh: vngio.DefaultSkewThresh, + }) + if err != nil { + delete() + return nil, err + } + return &VectorWriter{ + Writer: writer, + delete: delete, + }, nil +} + +func (w *VectorWriter) Abort() { + w.Close() + w.delete() +} + func DeleteVector(ctx context.Context, engine storage.Engine, path *storage.URI, id ksuid.KSUID) error { if err := engine.Delete(ctx, VectorURI(path, id)); err != nil && !errors.Is(err, fs.ErrNotExist) { return err diff --git a/lake/writer.go b/lake/writer.go index cbfd558ce7..9e9b112c6f 100644 --- a/lake/writer.go +++ b/lake/writer.go @@ -11,6 +11,7 @@ import ( "github.com/brimdata/zed/runtime/expr" "github.com/brimdata/zed/zbuf" "github.com/brimdata/zed/zio" + "github.com/segmentio/ksuid" "golang.org/x/sync/errgroup" ) @@ -163,22 +164,25 @@ func (w *Writer) Stats() ImportStats { } type SortedWriter struct { - comparator *expr.Comparator - ctx context.Context - pool *Pool - poolKey field.Path - lastKey *zed.Value - writer *data.Writer - objects []*data.Object + comparator *expr.Comparator + ctx context.Context + pool *Pool + poolKey field.Path + lastKey *zed.Value + writer *data.Writer + vectorEnabled bool + vectorWriter *data.VectorWriter + objects []*data.Object } -func NewSortedWriter(ctx context.Context, zctx *zed.Context, pool *Pool) *SortedWriter { +func NewSortedWriter(ctx context.Context, zctx *zed.Context, pool *Pool, vectorEnabled bool) *SortedWriter { return &SortedWriter{ - comparator: ImportComparator(zctx, pool), - ctx: ctx, - poolKey: poolKey(pool.SortKey), - pool: pool, - lastKey: &zed.Value{}, + comparator: ImportComparator(zctx, pool), + ctx: ctx, + poolKey: poolKey(pool.SortKey), + pool: pool, + lastKey: &zed.Value{}, + vectorEnabled: vectorEnabled, } } @@ -186,26 +190,30 @@ func (w *SortedWriter) Write(val *zed.Value) error { key := val.DerefPath(w.poolKey).MissingAsNull() again: if w.writer == nil { - o := data.NewObject() - w.objects = append(w.objects, &o) - var err error - w.writer, err = o.NewWriter(w.ctx, w.pool.engine, w.pool.DataPath, w.pool.SortKey.Order, poolKey(w.pool.SortKey), w.pool.SeekStride) - if err != nil { + if err := w.newWriter(); err != nil { + w.Abort() return err } } if w.writer.BytesWritten() >= w.pool.Threshold && w.comparator.Compare(w.lastKey, key) != 0 { - writer := w.writer - w.writer = nil - if err := writer.Close(w.ctx); err != nil { + if err := w.Close(); err != nil { + w.Abort() return err } + w.writer, w.vectorWriter = nil, nil goto again } if err := w.writer.WriteWithKey(key, val); err != nil { + w.Abort() return err } + if w.vectorWriter != nil { + if err := w.vectorWriter.Write(val); err != nil { + w.Abort() + return err + } + } w.lastKey.CopyFrom(key) return nil } @@ -215,16 +223,48 @@ func (w *SortedWriter) Abort() { w.writer.Abort() w.writer = nil } + if w.vectorWriter != nil { + w.vectorWriter.Abort() + w.vectorWriter = nil + } // Delete all created objects. for _, o := range w.objects { o.Remove(w.ctx, w.pool.engine, w.pool.DataPath) } } +func (w *SortedWriter) newWriter() error { + o := data.NewObject() + var err error + w.writer, err = o.NewWriter(w.ctx, w.pool.engine, w.pool.DataPath, w.pool.SortKey.Order, poolKey(w.pool.SortKey), w.pool.SeekStride) + if err != nil { + return err + } + if w.vectorEnabled { + w.vectorWriter, err = o.NewVectorWriter(w.ctx, w.pool.engine, w.pool.DataPath) + if err != nil { + return err + } + } + w.objects = append(w.objects, &o) + return nil +} + func (w *SortedWriter) Objects() []*data.Object { return w.objects } +func (w *SortedWriter) Vectors() []ksuid.KSUID { + if !w.vectorEnabled { + return nil + } + var ids []ksuid.KSUID + for _, o := range w.objects { + ids = append(ids, o.ID) + } + return ids +} + func (w *SortedWriter) Close() error { if w.writer == nil { return nil diff --git a/lake/ztests/compact-vectors.yaml b/lake/ztests/compact-vectors.yaml new file mode 100644 index 0000000000..b8c0168b5b --- /dev/null +++ b/lake/ztests/compact-vectors.yaml @@ -0,0 +1,15 @@ +script: | + export ZED_LAKE=test + zed init -q + zed create -use -q test + seq 10 | zq '{ts:this-1,s:"val${this-1}"}' - | zed load -q - + seq 10 | zq '{ts:this-1,s:"val${this-1}"}' - | zed load -q - + ids=$(zed query -f text 'from test@main:objects | yield "0x${hex(id)}"') + zed compact -q -vectors $ids + zed query -f lake 'from test@main:vectors' + +outputs: + - name: stdout + regexp: | + \w{27} 121B bytes 20 records + min 0 max 9 diff --git a/runtime/exec/compact.go b/runtime/exec/compact.go index ae036f1899..bf55af7f83 100644 --- a/runtime/exec/compact.go +++ b/runtime/exec/compact.go @@ -13,7 +13,7 @@ import ( "github.com/segmentio/ksuid" ) -func Compact(ctx context.Context, lk *lake.Root, pool *lake.Pool, branchName string, objectIDs []ksuid.KSUID, author, message, info string) (ksuid.KSUID, error) { +func Compact(ctx context.Context, lk *lake.Root, pool *lake.Pool, branchName string, objectIDs []ksuid.KSUID, writeVectors bool, author, message, info string) (ksuid.KSUID, error) { if len(objectIDs) < 2 { return ksuid.Nil, errors.New("compact: two or more source objects required") } @@ -38,7 +38,7 @@ func Compact(ctx context.Context, lk *lake.Root, pool *lake.Pool, branchName str octx := op.NewContext(ctx, zctx, nil) slicer := meta.NewSlicer(lister, zctx) puller := meta.NewSequenceScanner(octx, slicer, pool, nil, nil, nil) - w := lake.NewSortedWriter(ctx, zctx, pool) + w := lake.NewSortedWriter(ctx, zctx, pool, writeVectors) if err := zbuf.CopyPuller(w, puller); err != nil { puller.Pull(true) w.Abort() @@ -48,7 +48,7 @@ func Compact(ctx context.Context, lk *lake.Root, pool *lake.Pool, branchName str w.Abort() return ksuid.Nil, err } - commit, err := branch.CommitCompact(ctx, compact.SelectAll(), w.Objects(), author, message, info) + commit, err := branch.CommitCompact(ctx, compact.SelectAll(), w.Objects(), w.Vectors(), author, message, info) if err != nil { w.Abort() return ksuid.Nil, err diff --git a/service/handlers.go b/service/handlers.go index 92b761e5ee..920391f26d 100644 --- a/service/handlers.go +++ b/service/handlers.go @@ -444,6 +444,10 @@ func handleCompact(c *Core, w *ResponseWriter, r *Request) { if !ok { return } + writeVectors, ok := r.BoolFromQuery(w, "vectors") + if !ok { + return + } message, ok := r.decodeCommitMessage(w) if !ok { return @@ -452,7 +456,7 @@ func handleCompact(c *Core, w *ResponseWriter, r *Request) { if !ok { return } - commit, err := exec.Compact(r.Context(), c.root, pool, branch, req.ObjectIDs, message.Author, message.Body, message.Meta) + commit, err := exec.Compact(r.Context(), c.root, pool, branch, req.ObjectIDs, writeVectors, message.Author, message.Body, message.Meta) if err != nil { w.Error(err) return diff --git a/service/ztests/compact-vectors.yaml b/service/ztests/compact-vectors.yaml new file mode 100644 index 0000000000..3bcc13bae5 --- /dev/null +++ b/service/ztests/compact-vectors.yaml @@ -0,0 +1,17 @@ +script: | + source service.sh + zed create -use -q test + seq 10 | zq '{ts:this-1,s:"val${this-1}"}' - | zed load -q - + seq 10 | zq '{ts:this-1,s:"val${this-1}"}' - | zed load -q - + ids=$(zed query -f text 'from test@main:objects | yield "0x${hex(id)}"') + zed compact -q -vectors $ids + zed query -f lake 'from test@main:vectors' + +inputs: + - name: service.sh + +outputs: + - name: stdout + regexp: | + \w{27} 121B bytes 20 records + min 0 max 9