Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

vam: Summarize use vector.Builder to materialize #5484

Merged
merged 1 commit into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions runtime/vam/expr/agg/avg.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,15 @@ func (a *avg) ConsumeAsPartial(partial vector.Any) {
if !ok1 || !ok2 {
panic("avg: invalid partial")
}
sumVal, ok1 := rec.Fields[si].(*vector.Const)
countVal, ok2 := rec.Fields[ci].(*vector.Const)
if !ok1 || !ok2 || sumVal.Type() != super.TypeFloat64 || countVal.Type() != super.TypeUint64 {
sumVal := rec.Fields[si]
countVal := rec.Fields[ci]
if sumVal.Type() != super.TypeFloat64 || countVal.Type() != super.TypeUint64 {
panic("avg: invalid partial")
}
a.sum += sumVal.Value().Float()
a.count += countVal.Value().Uint()
sum, _ := vector.FloatValue(sumVal, 0)
count, _ := vector.UintValue(countVal, 0)
a.sum += sum
a.count += count
}

func (a *avg) ResultAsPartial(zctx *super.Context) super.Value {
Expand Down
6 changes: 3 additions & 3 deletions runtime/vam/expr/agg/count.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ func (a *count) Result(*super.Context) super.Value {
}

func (a *count) ConsumeAsPartial(partial vector.Any) {
c, ok := partial.(*vector.Const)
if !ok || c.Len() != 1 || partial.Type() != super.TypeUint64 {
if partial.Len() != 1 || partial.Type() != super.TypeUint64 {
panic("count: bad partial")
}
a.count += c.Value().Uint()
count, _ := vector.UintValue(partial, 0)
a.count += count
}

func (a *count) ResultAsPartial(*super.Context) super.Value {
Expand Down
53 changes: 32 additions & 21 deletions runtime/vam/op/summarize/agg.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ type superTable struct {
builder *vector.RecordBuilder
partialsIn bool
partialsOut bool
table map[string]aggRow
table map[string]int
rows []aggRow
zctx *super.Context
}

Expand Down Expand Up @@ -52,11 +53,13 @@ func (s *superTable) update(keys []vector.Any, args []vector.Any) {
m[string(keyBytes)] = append(m[string(keyBytes)], slot)
}
for rowKey, index := range m {
row, ok := s.table[rowKey]
id, ok := s.table[rowKey]
if !ok {
row = s.newRow(keys, index)
s.table[rowKey] = row
id = len(s.rows)
s.table[rowKey] = id
s.rows = append(s.rows, s.newRow(keys, index))
}
row := s.rows[id]
for i, arg := range args {
if len(m) > 1 {
arg = vector.NewView(arg, index)
Expand Down Expand Up @@ -85,32 +88,40 @@ func (s *superTable) newRow(keys []vector.Any, index []uint32) aggRow {
}

func (s *superTable) materialize() vector.Any {
if len(s.rows) == 0 {
return vector.NewConst(super.Null, 0, nil)
}
var vecs []vector.Any
var tags []uint32
// XXX This should reasonably concat all materialize rows together instead
// of this crazy Dynamic hack.
for _, row := range s.table {
tags = append(tags, uint32(len(tags)))
vecs = append(vecs, s.materializeRow(row))
for i := range s.rows[0].keys {
vecs = append(vecs, s.materializeKey(i))
}
for i := range s.rows[0].funcs {
vecs = append(vecs, s.materializeAgg(i))
}
return vector.NewDynamic(tags, vecs)
// Since aggs can return dynamic values, we need to use Apply to create the record.
return vector.Apply(false, func(vecs ...vector.Any) vector.Any {
return s.builder.New(vecs)
}, vecs...)
}

func (s *superTable) materializeRow(row aggRow) vector.Any {
var vecs []vector.Any
for _, key := range row.keys {
vecs = append(vecs, vector.NewConst(key, 1, nil))
func (s *superTable) materializeKey(i int) vector.Any {
b := vector.NewBuilder(s.rows[0].keys[i].Type())
for _, row := range s.rows {
b.Write(row.keys[i].Bytes())
}
for _, fn := range row.funcs {
var val super.Value
return b.Build()
}

func (s *superTable) materializeAgg(i int) vector.Any {
b := vector.NewDynamicBuilder()
for _, row := range s.rows {
if s.partialsOut {
val = fn.ResultAsPartial(s.zctx)
b.Write(row.funcs[i].ResultAsPartial(s.zctx))
} else {
val = fn.Result(s.zctx)
b.Write(row.funcs[i].Result(s.zctx))
}
vecs = append(vecs, vector.NewConst(val, 1, nil))
}
return s.builder.New(vecs)
return b.Build()
}

type countByString struct {
Expand Down
2 changes: 1 addition & 1 deletion runtime/vam/op/summarize/summarize.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (s *Summarize) newAggTable(keyTypes []super.Type) aggTable {
builder: s.builder,
partialsIn: s.partialsIn,
partialsOut: s.partialsOut,
table: make(map[string]aggRow),
table: make(map[string]int),
zctx: s.zctx,
}
}
Expand Down
35 changes: 35 additions & 0 deletions vector/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,41 @@ type Builder interface {
Build() Any
}

// DynamicBuilder accumulates values of possibly differing types, keeping
// one Builder per distinct type it has been given.
type DynamicBuilder struct {
// tags receives one entry per written value: the index into values of
// the builder that value was written to.
tags []uint32
// values holds one builder per distinct type, in first-seen order.
values []Builder
// which maps a type to the index of its builder in values.
which map[super.Type]int
}

// NewDynamicBuilder returns an empty DynamicBuilder whose type-to-builder
// lookup map is initialized and ready for writes.
func NewDynamicBuilder() *DynamicBuilder {
	b := new(DynamicBuilder)
	b.which = map[super.Type]int{}
	return b
}

// Write appends val to the builder dedicated to val's type, creating that
// builder the first time the type is seen, and records which builder
// received the value.
func (d *DynamicBuilder) Write(val super.Value) {
	valType := val.Type()
	slot, seen := d.which[valType]
	if !seen {
		// First value of this type: give it the next free builder slot.
		slot = len(d.values)
		d.values = append(d.values, NewBuilder(valType))
		d.which[valType] = slot
	}
	d.tags = append(d.tags, uint32(slot))
	target := d.values[slot]
	target.Write(val.Bytes())
}

// Build finalizes every per-type builder. When exactly one type was
// written, that builder's vector is returned directly; otherwise the built
// vectors are combined with the recorded tags via NewDynamic.
func (d *DynamicBuilder) Build() Any {
	if len(d.values) == 1 {
		// Only one type was ever written, so no tagging wrapper is needed.
		return d.values[0].Build()
	}
	var built []Any
	for i := range d.values {
		built = append(built, d.values[i].Build())
	}
	return NewDynamic(d.tags, built)
}

func NewBuilder(typ super.Type) Builder {
var b Builder
switch typ := typ.(type) {
Expand Down