Skip to content

Commit

Permalink
Compact: write vectors
Browse files Browse the repository at this point in the history
Allow writing of vectors when compacting objects
  • Loading branch information
mattnibs committed Aug 28, 2023
1 parent 08fc271 commit fd87f28
Show file tree
Hide file tree
Showing 13 changed files with 172 additions and 43 deletions.
5 changes: 4 additions & 1 deletion api/client/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,8 +319,11 @@ func (c *Connection) Query(ctx context.Context, head *lakeparse.Commitish, src s
return res, err
}

func (c *Connection) Compact(ctx context.Context, poolID ksuid.KSUID, branchName string, objects []ksuid.KSUID, message api.CommitMessage) (api.CommitResponse, error) {
func (c *Connection) Compact(ctx context.Context, poolID ksuid.KSUID, branchName string, objects []ksuid.KSUID, writeVectors bool, message api.CommitMessage) (api.CommitResponse, error) {
path := urlPath("pool", poolID.String(), "branch", branchName, "compact")
if writeVectors {
path += "?vectors=T"
}
req := c.NewRequest(ctx, http.MethodPost, path, api.CompactRequest{ObjectIDs: objects})
if err := encodeCommitMessage(req, message); err != nil {
return api.CommitResponse{}, err
Expand Down
8 changes: 5 additions & 3 deletions cmd/zed/compact/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,16 @@ a commit on HEAD replacing the old objects with the new ones.`,

type Command struct {
*root.Command
commitFlags commitflags.Flags
poolFlags poolflags.Flags
commitFlags commitflags.Flags
poolFlags poolflags.Flags
writeVectors bool
}

func New(parent charm.Command, f *flag.FlagSet) (charm.Command, error) {
c := &Command{Command: parent.(*root.Command)}
c.commitFlags.SetFlags(f)
c.poolFlags.SetFlags(f)
f.BoolVar(&c.writeVectors, "vectors", false, "write vectors for compacted objects")
return c, nil
}

Expand All @@ -57,7 +59,7 @@ func (c *Command) Run(args []string) error {
if err != nil {
return err
}
commit, err := lake.Compact(ctx, poolID, head.Branch, ids, c.commitFlags.CommitMessage())
commit, err := lake.Compact(ctx, poolID, head.Branch, ids, c.writeVectors, c.commitFlags.CommitMessage())
if err == nil && !c.LakeFlags.Quiet {
fmt.Printf("%s compaction committed\n", commit)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/zed/manage/lakemanage/branch.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (c *compactTask) run(ctx context.Context) error {
var found int
var compacted int
for run := range ch {
commit, err := c.lake.Compact(ctx, c.pool.ID, c.config.Branch, run.ObjectIDs(), api.CommitMessage{})
commit, err := c.lake.Compact(ctx, c.pool.ID, c.config.Branch, run.ObjectIDs(), false, api.CommitMessage{})
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion lake/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type Interface interface {
CreateBranch(ctx context.Context, pool ksuid.KSUID, name string, parent ksuid.KSUID) error
RemoveBranch(ctx context.Context, pool ksuid.KSUID, branchName string) error
MergeBranch(ctx context.Context, pool ksuid.KSUID, childBranch, parentBranch string, message api.CommitMessage) (ksuid.KSUID, error)
Compact(ctx context.Context, pool ksuid.KSUID, branch string, objects []ksuid.KSUID, message api.CommitMessage) (ksuid.KSUID, error)
Compact(ctx context.Context, pool ksuid.KSUID, branch string, objects []ksuid.KSUID, writeVectors bool, message api.CommitMessage) (ksuid.KSUID, error)
Load(ctx context.Context, zctx *zed.Context, pool ksuid.KSUID, branch string, r zio.Reader, message api.CommitMessage) (ksuid.KSUID, error)
Delete(ctx context.Context, poolID ksuid.KSUID, branchName string, tags []ksuid.KSUID, message api.CommitMessage) (ksuid.KSUID, error)
DeleteWhere(ctx context.Context, poolID ksuid.KSUID, branchName, src string, commit api.CommitMessage) (ksuid.KSUID, error)
Expand Down
4 changes: 2 additions & 2 deletions lake/api/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,12 @@ func (l *local) MergeBranch(ctx context.Context, poolID ksuid.KSUID, childBranch
return l.root.MergeBranch(ctx, poolID, childBranch, parentBranch, message.Author, message.Body)
}

func (l *local) Compact(ctx context.Context, poolID ksuid.KSUID, branchName string, objects []ksuid.KSUID, commit api.CommitMessage) (ksuid.KSUID, error) {
func (l *local) Compact(ctx context.Context, poolID ksuid.KSUID, branchName string, objects []ksuid.KSUID, writeVectors bool, commit api.CommitMessage) (ksuid.KSUID, error) {
pool, err := l.root.OpenPool(ctx, poolID)
if err != nil {
return ksuid.Nil, err
}
return exec.Compact(ctx, l.root, pool, branchName, objects, commit.Author, commit.Body, commit.Meta)
return exec.Compact(ctx, l.root, pool, branchName, objects, writeVectors, commit.Author, commit.Body, commit.Meta)
}

func (l *local) AddIndexRules(ctx context.Context, rules []index.Rule) error {
Expand Down
4 changes: 2 additions & 2 deletions lake/api/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ func (r *remote) MergeBranch(ctx context.Context, poolID ksuid.KSUID, childBranc
return res.Commit, err
}

func (r *remote) Compact(ctx context.Context, poolID ksuid.KSUID, branch string, objects []ksuid.KSUID, commit api.CommitMessage) (ksuid.KSUID, error) {
res, err := r.conn.Compact(ctx, poolID, branch, objects, commit)
func (r *remote) Compact(ctx context.Context, poolID ksuid.KSUID, branch string, objects []ksuid.KSUID, writeVectors bool, commit api.CommitMessage) (ksuid.KSUID, error) {
res, err := r.conn.Compact(ctx, poolID, branch, objects, writeVectors, commit)
return res.Commit, err
}

Expand Down
7 changes: 6 additions & 1 deletion lake/branch.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ func (b *Branch) Revert(ctx context.Context, commit ksuid.KSUID, author, message
})
}

func (b *Branch) CommitCompact(ctx context.Context, src, rollup []*data.Object, author, message, meta string) (ksuid.KSUID, error) {
func (b *Branch) CommitCompact(ctx context.Context, src, rollup []*data.Object, rollupVecs []ksuid.KSUID, author, message, meta string) (ksuid.KSUID, error) {
if len(rollup) < 1 {
return ksuid.Nil, errors.New("compact: one or more rollup objects required")
}
Expand All @@ -258,6 +258,11 @@ func (b *Branch) CommitCompact(ctx context.Context, src, rollup []*data.Object,
return nil, err
}
}
for _, id := range rollupVecs {
if err := patch.AddVector(id); err != nil {
return nil, err
}
}
for _, o := range src {
if err := patch.DeleteObject(o.ID); err != nil {
return nil, err
Expand Down
65 changes: 51 additions & 14 deletions lake/data/vector.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ import (
"context"
"errors"
"fmt"
"io"
"io/fs"

"github.com/brimdata/zed"
"github.com/brimdata/zed/pkg/bufwriter"
"github.com/brimdata/zed/pkg/storage"
"github.com/brimdata/zed/vng"
"github.com/brimdata/zed/zio"
"github.com/brimdata/zed/zio/vngio"
"github.com/brimdata/zed/zio/zngio"
Expand All @@ -25,26 +27,16 @@ func CreateVector(ctx context.Context, engine storage.Engine, path *storage.URI,
}
return err
}
put, err := engine.Put(ctx, VectorURI(path, id))
if err != nil {
get.Close()
return err
}
writer, err := vngio.NewWriter(bufwriter.New(put), vngio.WriterOpts{
ColumnThresh: vngio.DefaultColumnThresh,
SkewThresh: vngio.DefaultSkewThresh,
})
w, err := NewVectorWriter(ctx, engine, path, id)
if err != nil {
get.Close()
put.Close()
DeleteVector(ctx, engine, path, id)
return err
}
// Note here that writer.Close closes the Put but reader.Close does not
// close the Get.
reader := zngio.NewReader(zed.NewContext(), get)
err = zio.Copy(writer, reader)
if closeErr := writer.Close(); err == nil {
err = zio.Copy(w, reader)
if closeErr := w.Close(); err == nil {
err = closeErr
}
if closeErr := reader.Close(); err == nil {
Expand All @@ -54,11 +46,56 @@ func CreateVector(ctx context.Context, engine storage.Engine, path *storage.URI,
err = closeErr
}
if err != nil {
DeleteVector(ctx, engine, path, id)
w.Abort()
}
return err
}

type VectorWriter struct {
*vng.Writer
closer io.Closer
delete func()
}

func (o *Object) NewVectorWriter(ctx context.Context, engine storage.Engine, path *storage.URI) (*VectorWriter, error) {
return NewVectorWriter(ctx, engine, path, o.ID)
}

func NewVectorWriter(ctx context.Context, engine storage.Engine, path *storage.URI, id ksuid.KSUID) (*VectorWriter, error) {
put, err := engine.Put(ctx, VectorURI(path, id))
if err != nil {
return nil, err
}
delete := func() {
DeleteVector(context.Background(), engine, path, id)
}
writer, err := vngio.NewWriter(bufwriter.New(put), vngio.WriterOpts{
ColumnThresh: vngio.DefaultColumnThresh,
SkewThresh: vngio.DefaultSkewThresh,
})
if err != nil {
delete()
return nil, err
}
return &VectorWriter{
Writer: writer,
closer: put,
delete: delete,
}, nil
}

func (w *VectorWriter) Close() error {
if err := w.closer.Close(); err != nil {
return err
}
return w.Writer.Close()
}

func (w *VectorWriter) Abort() {
w.Writer.Close()
w.delete()
}

func DeleteVector(ctx context.Context, engine storage.Engine, path *storage.URI, id ksuid.KSUID) error {
if err := engine.Delete(ctx, VectorURI(path, id)); err != nil && !errors.Is(err, fs.ErrNotExist) {
return err
Expand Down
73 changes: 58 additions & 15 deletions lake/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/brimdata/zed/runtime/expr"
"github.com/brimdata/zed/zbuf"
"github.com/brimdata/zed/zio"
"github.com/segmentio/ksuid"
"golang.org/x/sync/errgroup"
)

Expand Down Expand Up @@ -163,13 +164,15 @@ func (w *Writer) Stats() ImportStats {
}

type SortedWriter struct {
comparator *expr.Comparator
ctx context.Context
pool *Pool
poolKey field.Path
lastKey *zed.Value
writer *data.Writer
objects []*data.Object
comparator *expr.Comparator
ctx context.Context
pool *Pool
poolKey field.Path
lastKey *zed.Value
writer *data.Writer
vectorEnabled bool
vectorWriter *data.VectorWriter
objects []*data.Object
}

func NewSortedWriter(ctx context.Context, zctx *zed.Context, pool *Pool) *SortedWriter {
Expand All @@ -186,26 +189,30 @@ func (w *SortedWriter) Write(val *zed.Value) error {
key := val.DerefPath(w.poolKey).MissingAsNull()
again:
if w.writer == nil {
o := data.NewObject()
w.objects = append(w.objects, &o)
var err error
w.writer, err = o.NewWriter(w.ctx, w.pool.engine, w.pool.DataPath, w.pool.SortKey.Order, poolKey(w.pool.SortKey), w.pool.SeekStride)
if err != nil {
if err := w.newWriter(); err != nil {
w.Abort()
return err
}
}
if w.writer.BytesWritten() >= w.pool.Threshold &&
w.comparator.Compare(w.lastKey, key) != 0 {
writer := w.writer
w.writer = nil
if err := writer.Close(w.ctx); err != nil {
if err := w.Close(); err != nil {
w.Abort()
return err
}
w.writer, w.vectorWriter = nil, nil
goto again
}
if err := w.writer.WriteWithKey(key, val); err != nil {
w.Abort()
return err
}
if w.vectorWriter != nil {
if err := w.vectorWriter.Write(val); err != nil {
w.Abort()
return err
}
}
w.lastKey.CopyFrom(key)
return nil
}
Expand All @@ -215,23 +222,59 @@ func (w *SortedWriter) Abort() {
w.writer.Abort()
w.writer = nil
}
if w.vectorWriter != nil {
w.vectorWriter.Abort()
w.vectorWriter = nil
}
// Delete all created objects.
for _, o := range w.objects {
o.Remove(w.ctx, w.pool.engine, w.pool.DataPath)
}
}

func (w *SortedWriter) newWriter() error {
o := data.NewObject()
var err error
w.writer, err = o.NewWriter(w.ctx, w.pool.engine, w.pool.DataPath, w.pool.SortKey.Order, poolKey(w.pool.SortKey), w.pool.SeekStride)
if err != nil {
return err
}
if w.vectorEnabled {
w.vectorWriter, err = o.NewVectorWriter(w.ctx, w.pool.engine, w.pool.DataPath)
if err != nil {
return err
}
}
w.objects = append(w.objects, &o)
return nil
}

func (w *SortedWriter) Objects() []*data.Object {
return w.objects
}

func (w *SortedWriter) Vectors() []ksuid.KSUID {
if !w.vectorEnabled {
return nil
}
var ids []ksuid.KSUID
for _, o := range w.objects {
ids = append(ids, o.ID)
}
return ids
}

func (w *SortedWriter) Close() error {
if w.writer == nil {
return nil
}
return w.writer.Close(w.ctx)
}

func (w *SortedWriter) EnableVectorWrite() {
w.vectorEnabled = true
}

type ImportStats struct {
ObjectsWritten int64
RecordBytesWritten int64
Expand Down
15 changes: 15 additions & 0 deletions lake/ztests/compact-vectors.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
script: |
export ZED_LAKE=test
zed init -q
zed create -use -q test
seq 10 | zq '{ts:this-1,s:"val${this-1}"}' - | zed load -q -
seq 10 | zq '{ts:this-1,s:"val${this-1}"}' - | zed load -q -
ids=$(zed query -f text 'from test@main:objects | yield "0x${hex(id)}"')
zed compact -q -vectors $ids
zed query -f lake 'from test@main:vectors'
outputs:
- name: stdout
regexp: |
\w{27} 121B bytes 20 records
min 0 max 9
7 changes: 5 additions & 2 deletions runtime/exec/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"github.com/segmentio/ksuid"
)

func Compact(ctx context.Context, lk *lake.Root, pool *lake.Pool, branchName string, objectIDs []ksuid.KSUID, author, message, info string) (ksuid.KSUID, error) {
func Compact(ctx context.Context, lk *lake.Root, pool *lake.Pool, branchName string, objectIDs []ksuid.KSUID, writeVectors bool, author, message, info string) (ksuid.KSUID, error) {
if len(objectIDs) < 2 {
return ksuid.Nil, errors.New("compact: two or more source objects required")
}
Expand All @@ -39,6 +39,9 @@ func Compact(ctx context.Context, lk *lake.Root, pool *lake.Pool, branchName str
slicer := meta.NewSlicer(lister, zctx)
puller := meta.NewSequenceScanner(octx, slicer, pool, nil, nil, nil)
w := lake.NewSortedWriter(ctx, zctx, pool)
if writeVectors {
w.EnableVectorWrite()
}
if err := zbuf.CopyPuller(w, puller); err != nil {
puller.Pull(true)
w.Abort()
Expand All @@ -48,7 +51,7 @@ func Compact(ctx context.Context, lk *lake.Root, pool *lake.Pool, branchName str
w.Abort()
return ksuid.Nil, err
}
commit, err := branch.CommitCompact(ctx, compact.SelectAll(), w.Objects(), author, message, info)
commit, err := branch.CommitCompact(ctx, compact.SelectAll(), w.Objects(), w.Vectors(), author, message, info)
if err != nil {
w.Abort()
return ksuid.Nil, err
Expand Down
Loading

0 comments on commit fd87f28

Please sign in to comment.