From 1b5f6bff1b2404f5a919dae5ac2bc01179cbd746 Mon Sep 17 00:00:00 2001 From: Matthew Nibecker Date: Fri, 17 Jan 2025 14:42:15 -0800 Subject: [PATCH] vam: Add union aggregation --- runtime/sam/expr/agg/agg.go | 2 +- runtime/sam/expr/agg/union.go | 8 +- runtime/vam/expr/agg/agg.go | 6 +- runtime/vam/expr/agg/union.go | 74 +++++++++++++++++++ .../summarize}/union-single-union-type.yaml | 2 + .../ztests => ztests/op/summarize}/union.yaml | 2 + 6 files changed, 87 insertions(+), 7 deletions(-) create mode 100644 runtime/vam/expr/agg/union.go rename runtime/{sam/expr/agg/ztests => ztests/op/summarize}/union-single-union-type.yaml (88%) rename runtime/{sam/expr/agg/ztests => ztests/op/summarize}/union.yaml (94%) diff --git a/runtime/sam/expr/agg/agg.go b/runtime/sam/expr/agg/agg.go index 701d06cdd5..aca88ee8b0 100644 --- a/runtime/sam/expr/agg/agg.go +++ b/runtime/sam/expr/agg/agg.go @@ -67,7 +67,7 @@ func NewPattern(op string, hasarg bool) (Pattern, error) { } case "union": pattern = func() Function { - return newUnion() + return NewUnion() } case "collect": pattern = func() Function { diff --git a/runtime/sam/expr/agg/union.go b/runtime/sam/expr/agg/union.go index 203ae95365..6fe6d680dd 100644 --- a/runtime/sam/expr/agg/union.go +++ b/runtime/sam/expr/agg/union.go @@ -12,7 +12,7 @@ type Union struct { var _ Function = (*Union)(nil) -func newUnion() *Union { +func NewUnion() *Union { return &Union{ types: make(map[super.Type]map[string]struct{}), } @@ -22,10 +22,10 @@ func (u *Union) Consume(val super.Value) { if val.IsNull() { return } - u.update(val.Type(), val.Bytes()) + u.Update(val.Type(), val.Bytes()) } -func (u *Union) update(typ super.Type, b zcode.Bytes) { +func (u *Union) Update(typ super.Type, b zcode.Bytes) { m, ok := u.types[typ] if !ok { m = make(map[string]struct{}) @@ -97,7 +97,7 @@ func (u *Union) ConsumeAsPartial(val super.Value) { if union, ok := super.TypeUnder(typ).(*super.TypeUnion); ok { typ, b = union.Untag(b) } - u.update(typ, b) + u.Update(typ, b) } } diff --git a/runtime/vam/expr/agg/agg.go b/runtime/vam/expr/agg/agg.go index d82dd4a237..6b46850f49 100644 --- a/runtime/vam/expr/agg/agg.go +++ b/runtime/vam/expr/agg/agg.go @@ -53,8 +53,10 @@ func NewPattern(op string, hasarg bool) (Pattern, error) { pattern = func() Func { return newMathReducer(mathMax) } - // case "union": - // panic("TBD") + case "union": + pattern = func() Func { + return newUnion() + } // case "collect": // panic("TBD") // case "and": diff --git a/runtime/vam/expr/agg/union.go b/runtime/vam/expr/agg/union.go new file mode 100644 index 0000000000..e29b890d9f --- /dev/null +++ b/runtime/vam/expr/agg/union.go @@ -0,0 +1,74 @@ +package agg + +import ( + "github.com/brimdata/super" + samagg "github.com/brimdata/super/runtime/sam/expr/agg" + "github.com/brimdata/super/vector" + "github.com/brimdata/super/zcode" +) + +type union struct { + samunion *samagg.Union +} + +func newUnion() *union { + return &union{samunion: samagg.NewUnion()} +} + +func (u *union) Consume(vec vector.Any) { + switch vec := vec.(type) { + case *vector.Const: + val := vec.Value() + if val.IsNull() { + return + } + u.samunion.Update(vec.Type(), val.Bytes()) + case *vector.Dict: + u.Consume(vec.Any) + default: + nulls := vector.NullsOf(vec) + typ := vec.Type() + var b zcode.Builder + for i := range vec.Len() { + if nulls.Value(i) { + continue + } + b.Truncate() + vec.Serialize(&b, i) + u.samunion.Update(typ, b.Bytes().Body()) + } + } +} + +func (u *union) Result(zctx *super.Context) super.Value { + return u.samunion.Result(zctx) +} + +func (u *union) ConsumeAsPartial(partial vector.Any) { + if c, ok := partial.(*vector.Const); ok && c.Value().IsNull() { + return + } + set, ok := partial.(*vector.Set) + if !ok { + panic("union: partial not a set type") + } + inner := set.Values + typ := inner.Type() + union, _ := typ.(*super.TypeUnion) + var b zcode.Builder + for i := range set.Len() { + for k := set.Offsets[i]; k < set.Offsets[i+1]; k++ { + b.Truncate() + inner.Serialize(&b, k) + bytes := b.Bytes().Body() + if union != nil { + typ, bytes = union.Untag(bytes) + } + u.samunion.Update(typ, bytes) + } + } +} + +func (u *union) ResultAsPartial(zctx *super.Context) super.Value { + return u.samunion.ResultAsPartial(zctx) +} diff --git a/runtime/sam/expr/agg/ztests/union-single-union-type.yaml b/runtime/ztests/op/summarize/union-single-union-type.yaml similarity index 88% rename from runtime/sam/expr/agg/ztests/union-single-union-type.yaml rename to runtime/ztests/op/summarize/union-single-union-type.yaml index 33de1a9b52..f87b4eb2e6 100644 --- a/runtime/sam/expr/agg/ztests/union-single-union-type.yaml +++ b/runtime/ztests/op/summarize/union-single-union-type.yaml @@ -1,5 +1,7 @@ zed: union(this) +vector: true + input: | 1((int64,string)) 1((int64,string)) diff --git a/runtime/sam/expr/agg/ztests/union.yaml b/runtime/ztests/op/summarize/union.yaml similarity index 94% rename from runtime/sam/expr/agg/ztests/union.yaml rename to runtime/ztests/op/summarize/union.yaml index acce8748d4..df5b6a04c3 100644 --- a/runtime/sam/expr/agg/ztests/union.yaml +++ b/runtime/ztests/op/summarize/union.yaml @@ -1,5 +1,7 @@ zed: over this => (union(this)) +vector: true + input: | [ {x:1,s:"a"},