diff --git a/pkg/parquetquery/intern/intern.go b/pkg/parquetquery/intern/intern.go index d146fffd59d..69d7cd153b0 100644 --- a/pkg/parquetquery/intern/intern.go +++ b/pkg/parquetquery/intern/intern.go @@ -6,24 +6,44 @@ package intern import ( + "sync" "unique" pq "github.com/parquet-go/parquet-go" ) -type Interner struct{} +type Interner struct { + h map[unique.Handle[*pq.Value]]pq.Value + mtx sync.Mutex +} func New() *Interner { - return &Interner{} + return &Interner{ + mtx: sync.Mutex{}, + h: make(map[unique.Handle[*pq.Value]]pq.Value), + } } // Clone returns a unique shalow copy of the input pq.Value or derefernces the // received pointer. func (i *Interner) Clone(v *pq.Value) pq.Value { + i.mtx.Lock() + defer i.mtx.Unlock() switch v.Kind() { case pq.ByteArray, pq.FixedLenByteArray: - return *unique.Make(v).Value() + if vv, ok := i.h[unique.Make(v)]; ok { + return vv + } + vv := unique.Make(v) + i.h[vv] = *vv.Value() + return i.h[vv] default: return *v } } + +func (i *Interner) Close() { + i.mtx.Lock() + defer i.mtx.Unlock() + clear(i.h) +} diff --git a/pkg/parquetquery/iters.go b/pkg/parquetquery/iters.go index bef5f7d0202..10563e6eaed 100644 --- a/pkg/parquetquery/iters.go +++ b/pkg/parquetquery/iters.go @@ -1261,6 +1261,9 @@ func (c *SyncIterator) Close() { c.closeCurrRowGroup() c.span.End() + if c.intern { + c.interner.Close() + } } // ColumnIterator asynchronously iterates through the given row groups and column. Applies