Skip to content

Commit

Permalink
Implement vector head operator (#5227)
Browse files Browse the repository at this point in the history
  • Loading branch information
nwt authored Aug 16, 2024
1 parent 60e2601 commit e7623c5
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 0 deletions.
2 changes: 2 additions & 0 deletions compiler/kernel/vop.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ func (b *Builder) compileVamScan(scan *dag.SeqScan, parent zbuf.Puller) (vector.

func (b *Builder) compileVamLeaf(o dag.Op, parent vector.Puller) (vector.Puller, error) {
switch o := o.(type) {
case *dag.Head:
return vamop.NewHead(parent, o.Count), nil
case *dag.Summarize:
if name, ok := optimizer.IsCountByString(o); ok {
return vamop.NewCountByString(b.rctx.Zctx, parent, name), nil
Expand Down
61 changes: 61 additions & 0 deletions runtime/vam/op/head.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package op

import (
"github.com/brimdata/zed/vector"
)

type Head struct {
parent vector.Puller
limit, count int
}

func NewHead(parent vector.Puller, limit int) *Head {
return &Head{
parent: parent,
limit: limit,
}
}

func (h *Head) Pull(done bool) (vector.Any, error) {
if h.count >= h.limit {
// If we are at limit we already sent a done upstream,
// so for either sense of the done flag, we return EOS
// and reset our state.
h.count = 0
return nil, nil
}
if done {
if _, err := h.parent.Pull(true); err != nil {
return nil, err
}
h.count = 0
return nil, nil
}
again:
vec, err := h.parent.Pull(false)
if vec == nil || err != nil {
h.count = 0
return nil, err
}
remaining := h.limit - h.count
if remaining <= 0 {
goto again
}
if n := int(vec.Len()); n < remaining {
// This vector has fewer than the needed records.
// Send them all downstream and update the count.
h.count += n
return vec, nil
}
// This vector has more than the needed records.
// Signal to the parent that we are done.
if _, err := h.parent.Pull(true); err != nil {
return nil, err
}
h.count = h.limit
index := make([]uint32, remaining)
for k := range index {
index[k] = uint32(k)
}
return vector.NewView(index, vec), nil
}
20 changes: 20 additions & 0 deletions runtime/vam/op/ztests/head.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
script: |
zq -o t.vng -f vng -
zed dev vector query -z "head" t.vng
echo //
zed dev vector query -z "head 2" t.vng
inputs:
- name: stdin
data: |
1
2
3
outputs:
- name: stdout
data: |
1
//
1
2

0 comments on commit e7623c5

Please sign in to comment.