Skip to content

Commit

Permalink
vam: Summarize use vector.Builder to materialize (#5484)
Browse files Browse the repository at this point in the history
The commit changes slow path aggregations to use vector.Builder when
materializing the aggregation table. The previous approaching of
creating a dynamic of consts was causing the system to run out of memory
when querying larger datasets.
  • Loading branch information
mattnibs authored Nov 20, 2024
1 parent 222ded6 commit 87ab6b7
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 30 deletions.
12 changes: 7 additions & 5 deletions runtime/vam/expr/agg/avg.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,15 @@ func (a *avg) ConsumeAsPartial(partial vector.Any) {
if !ok1 || !ok2 {
panic("avg: invalid partial")
}
sumVal, ok1 := rec.Fields[si].(*vector.Const)
countVal, ok2 := rec.Fields[ci].(*vector.Const)
if !ok1 || !ok2 || sumVal.Type() != super.TypeFloat64 || countVal.Type() != super.TypeUint64 {
sumVal := rec.Fields[si]
countVal := rec.Fields[ci]
if sumVal.Type() != super.TypeFloat64 || countVal.Type() != super.TypeUint64 {
panic("avg: invalid partial")
}
a.sum += sumVal.Value().Float()
a.count += countVal.Value().Uint()
sum, _ := vector.FloatValue(sumVal, 0)
count, _ := vector.UintValue(countVal, 0)
a.sum += sum
a.count += count
}

func (a *avg) ResultAsPartial(zctx *super.Context) super.Value {
Expand Down
6 changes: 3 additions & 3 deletions runtime/vam/expr/agg/count.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@ func (a *count) Result(*super.Context) super.Value {
}

func (a *count) ConsumeAsPartial(partial vector.Any) {
c, ok := partial.(*vector.Const)
if !ok || c.Len() != 1 || partial.Type() != super.TypeUint64 {
if partial.Len() != 1 || partial.Type() != super.TypeUint64 {
panic("count: bad partial")
}
a.count += c.Value().Uint()
count, _ := vector.UintValue(partial, 0)
a.count += count
}

func (a *count) ResultAsPartial(*super.Context) super.Value {
Expand Down
53 changes: 32 additions & 21 deletions runtime/vam/op/summarize/agg.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ type superTable struct {
builder *vector.RecordBuilder
partialsIn bool
partialsOut bool
table map[string]aggRow
table map[string]int
rows []aggRow
zctx *super.Context
}

Expand Down Expand Up @@ -52,11 +53,13 @@ func (s *superTable) update(keys []vector.Any, args []vector.Any) {
m[string(keyBytes)] = append(m[string(keyBytes)], slot)
}
for rowKey, index := range m {
row, ok := s.table[rowKey]
id, ok := s.table[rowKey]
if !ok {
row = s.newRow(keys, index)
s.table[rowKey] = row
id = len(s.rows)
s.table[rowKey] = id
s.rows = append(s.rows, s.newRow(keys, index))
}
row := s.rows[id]
for i, arg := range args {
if len(m) > 1 {
arg = vector.NewView(arg, index)
Expand Down Expand Up @@ -85,32 +88,40 @@ func (s *superTable) newRow(keys []vector.Any, index []uint32) aggRow {
}

func (s *superTable) materialize() vector.Any {
if len(s.rows) == 0 {
return vector.NewConst(super.Null, 0, nil)
}
var vecs []vector.Any
var tags []uint32
// XXX This should reasonably concat all materialize rows together instead
// of this crazy Dynamic hack.
for _, row := range s.table {
tags = append(tags, uint32(len(tags)))
vecs = append(vecs, s.materializeRow(row))
for i := range s.rows[0].keys {
vecs = append(vecs, s.materializeKey(i))
}
for i := range s.rows[0].funcs {
vecs = append(vecs, s.materializeAgg(i))
}
return vector.NewDynamic(tags, vecs)
// Since aggs can return dynamic values need to do apply to create record.
return vector.Apply(false, func(vecs ...vector.Any) vector.Any {
return s.builder.New(vecs)
}, vecs...)
}

func (s *superTable) materializeRow(row aggRow) vector.Any {
var vecs []vector.Any
for _, key := range row.keys {
vecs = append(vecs, vector.NewConst(key, 1, nil))
func (s *superTable) materializeKey(i int) vector.Any {
b := vector.NewBuilder(s.rows[0].keys[i].Type())
for _, row := range s.rows {
b.Write(row.keys[i].Bytes())
}
for _, fn := range row.funcs {
var val super.Value
return b.Build()
}

func (s *superTable) materializeAgg(i int) vector.Any {
b := vector.NewDynamicBuilder()
for _, row := range s.rows {
if s.partialsOut {
val = fn.ResultAsPartial(s.zctx)
b.Write(row.funcs[i].ResultAsPartial(s.zctx))
} else {
val = fn.Result(s.zctx)
b.Write(row.funcs[i].Result(s.zctx))
}
vecs = append(vecs, vector.NewConst(val, 1, nil))
}
return s.builder.New(vecs)
return b.Build()
}

type countByString struct {
Expand Down
2 changes: 1 addition & 1 deletion runtime/vam/op/summarize/summarize.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (s *Summarize) newAggTable(keyTypes []super.Type) aggTable {
builder: s.builder,
partialsIn: s.partialsIn,
partialsOut: s.partialsOut,
table: make(map[string]aggRow),
table: make(map[string]int),
zctx: s.zctx,
}
}
Expand Down
35 changes: 35 additions & 0 deletions vector/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,41 @@ type Builder interface {
Build() Any
}

type DynamicBuilder struct {
tags []uint32
values []Builder
which map[super.Type]int
}

func NewDynamicBuilder() *DynamicBuilder {
return &DynamicBuilder{
which: make(map[super.Type]int),
}
}

func (d *DynamicBuilder) Write(val super.Value) {
typ := val.Type()
tag, ok := d.which[typ]
if !ok {
tag = len(d.values)
d.values = append(d.values, NewBuilder(typ))
d.which[typ] = tag
}
d.tags = append(d.tags, uint32(tag))
d.values[tag].Write(val.Bytes())
}

func (d *DynamicBuilder) Build() Any {
var vecs []Any
for _, b := range d.values {
vecs = append(vecs, b.Build())
}
if len(vecs) == 1 {
return vecs[0]
}
return NewDynamic(d.tags, vecs)
}

func NewBuilder(typ super.Type) Builder {
var b Builder
switch typ := typ.(type) {
Expand Down

0 comments on commit 87ab6b7

Please sign in to comment.