Skip to content

Commit

Permalink
vam: Add union aggregation
Browse files Browse the repository at this point in the history
  • Loading branch information
mattnibs committed Jan 18, 2025
1 parent 20b6abb commit 1b5f6bf
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 7 deletions.
2 changes: 1 addition & 1 deletion runtime/sam/expr/agg/agg.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func NewPattern(op string, hasarg bool) (Pattern, error) {
}
case "union":
pattern = func() Function {
return newUnion()
return NewUnion()
}
case "collect":
pattern = func() Function {
Expand Down
8 changes: 4 additions & 4 deletions runtime/sam/expr/agg/union.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ type Union struct {

var _ Function = (*Union)(nil)

func newUnion() *Union {
func NewUnion() *Union {
return &Union{
types: make(map[super.Type]map[string]struct{}),
}
Expand All @@ -22,10 +22,10 @@ func (u *Union) Consume(val super.Value) {
if val.IsNull() {
return
}
u.update(val.Type(), val.Bytes())
u.Update(val.Type(), val.Bytes())
}

func (u *Union) update(typ super.Type, b zcode.Bytes) {
func (u *Union) Update(typ super.Type, b zcode.Bytes) {
m, ok := u.types[typ]
if !ok {
m = make(map[string]struct{})
Expand Down Expand Up @@ -97,7 +97,7 @@ func (u *Union) ConsumeAsPartial(val super.Value) {
if union, ok := super.TypeUnder(typ).(*super.TypeUnion); ok {
typ, b = union.Untag(b)
}
u.update(typ, b)
u.Update(typ, b)
}
}

Expand Down
6 changes: 4 additions & 2 deletions runtime/vam/expr/agg/agg.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,10 @@ func NewPattern(op string, hasarg bool) (Pattern, error) {
pattern = func() Func {
return newMathReducer(mathMax)
}
// case "union":
// panic("TBD")
case "union":
pattern = func() Func {
return newUnion()
}
// case "collect":
// panic("TBD")
// case "and":
Expand Down
74 changes: 74 additions & 0 deletions runtime/vam/expr/agg/union.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package agg

import (
"github.com/brimdata/super"
samagg "github.com/brimdata/super/runtime/sam/expr/agg"
"github.com/brimdata/super/vector"
"github.com/brimdata/super/zcode"
)

type union struct {
samunion *samagg.Union
}

func newUnion() *union {
return &union{samunion: samagg.NewUnion()}
}

func (u *union) Consume(vec vector.Any) {
switch vec := vec.(type) {
case *vector.Const:
val := vec.Value()
if val.IsNull() {
return
}
u.samunion.Update(vec.Type(), val.Bytes())
case *vector.Dict:
u.Consume(vec.Any)
default:
nulls := vector.NullsOf(vec)
typ := vec.Type()
var b zcode.Builder
for i := range vec.Len() {
if nulls.Value(i) {
continue
}
b.Truncate()
vec.Serialize(&b, i)
u.samunion.Update(typ, b.Bytes().Body())
}
}
}

func (u *union) Result(zctx *super.Context) super.Value {
return u.samunion.Result(zctx)
}

func (u *union) ConsumeAsPartial(partial vector.Any) {
if c, ok := partial.(*vector.Const); ok && c.Value().IsNull() {
return
}
set, ok := partial.(*vector.Set)
if !ok {
panic("union: partial not a set type")
}
inner := set.Values
typ := inner.Type()
union, _ := typ.(*super.TypeUnion)
var b zcode.Builder
for i := range set.Len() {
for k := set.Offsets[i]; k < set.Offsets[i+1]; k++ {
b.Truncate()
inner.Serialize(&b, k)
bytes := b.Bytes().Body()
if union != nil {
typ, bytes = union.Untag(bytes)
}
u.samunion.Update(typ, bytes)
}
}
}

func (u *union) ResultAsPartial(zctx *super.Context) super.Value {
return u.samunion.ResultAsPartial(zctx)
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
zed: union(this)

vector: true

input: |
1((int64,string))
1((int64,string))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
zed: over this => (union(this))

vector: true

input: |
[
{x:1,s:"a"},
Expand Down

0 comments on commit 1b5f6bf

Please sign in to comment.