Skip to content

Commit

Permalink
Revert "Drop intern package and lift method to parquetquery package"
Browse files Browse the repository at this point in the history
This reverts commit 76876fe.
  • Loading branch information
zalegrala committed Nov 5, 2024
1 parent 76876fe commit ad21cb4
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 12 deletions.
28 changes: 28 additions & 0 deletions pkg/parquetquery/intern/intern.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Package intern is a utility for interning byte slices for pq.Values.
// It is not safe for concurrent use.
//
// The Interner is used to intern byte slices for pq.Values. This is useful
// for reducing memory usage and improving performance when working with
// large datasets with many repeated strings.
package intern

import (
"unique"

pq "github.com/parquet-go/parquet-go"
)

// Interner is the receiver for UnsafeClone. It has no fields.
type Interner struct{}

// New returns a pointer to a new, empty Interner.
func New() *Interner {
	return new(Interner)
}

// UnsafeClone returns a copy of the pq.Value that v points to.
//
// For ByteArray and FixedLenByteArray kinds, v is first passed through
// unique.Make and the resulting handle's value is dereferenced; every other
// kind is dereferenced directly.
//
// NOTE(review): unique.Make is given the pointer v itself, so the handle is
// keyed on the pointer rather than on the bytes it refers to — confirm this
// achieves the de-duplication the package comment describes.
func (i *Interner) UnsafeClone(v *pq.Value) pq.Value {
	if kind := v.Kind(); kind == pq.ByteArray || kind == pq.FixedLenByteArray {
		canonical := unique.Make(v)
		return *canonical.Value()
	}
	return *v
}
72 changes: 72 additions & 0 deletions pkg/parquetquery/intern/intern_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package intern

import (
"fmt"
"testing"

pq "github.com/parquet-go/parquet-go"
)

// TestInterner_UnsafeClone clones two separately allocated "foo" byte-array
// values through the same Interner and compares the results.
//
// NOTE(review): both checks compare the first byte of each slice by value,
// not by address, so they cannot tell shared backing memory from distinct
// memory — confirm whether a pointer comparison was intended.
func TestInterner_UnsafeClone(t *testing.T) {
	interner := New()

	first := pq.ByteArrayValue([]byte("foo"))
	second := pq.ByteArrayValue([]byte("foo"))

	firstClone := interner.UnsafeClone(&first)
	secondClone := interner.UnsafeClone(&second)

	// leadingByte returns the first byte of the value's byte array.
	leadingByte := func(v pq.Value) byte { return v.ByteArray()[0] }

	// The two clones must start with the same byte.
	if leadingByte(firstClone) != leadingByte(secondClone) {
		t.Error("expected same memory address")
	}

	// The two source values must also still start with the same byte.
	if leadingByte(first) != leadingByte(second) {
		t.Error("expected same memory address")
	}
}

// BenchmarkIntern compares pq.Value.Clone against Interner.UnsafeClone for
// several value kinds. Each kind gets a "no_interning" and an "interning"
// sub-benchmark; the value under test is rebuilt on every iteration.
func BenchmarkIntern(b *testing.B) {
	vocab := []string{"foo", "bar", "baz", "qux", "quux", "corge", "grault", "garply", "waldo", "fred", "plugh", "xyzzy", "thud"}

	// benchCase pairs a sub-benchmark label with a generator that builds the
	// value for iteration i.
	type benchCase struct {
		name    string
		valueFn func(i int) pq.Value
	}

	cases := []benchCase{
		{"byte_array", func(i int) pq.Value { return pq.ByteArrayValue([]byte(vocab[i%len(vocab)])) }},
		{"fixed_len_byte_array", func(i int) pq.Value { return pq.FixedLenByteArrayValue([]byte(vocab[i%len(vocab)])) }},
		{"bool", func(i int) pq.Value { return pq.BooleanValue(i%2 == 0) }},
		{"int32", func(i int) pq.Value { return pq.Int32Value(int32(i)) }},
	}

	for _, bc := range cases {
		// Baseline: the library's own Clone.
		b.Run(fmt.Sprintf("no_interning: %s", bc.name), func(b *testing.B) {
			for n := 0; n < b.N; n++ {
				v := bc.valueFn(n)
				_ = v.Clone()
			}
		})

		// Candidate: clone through a fresh Interner, created before the
		// timer is reset so its construction is not measured.
		b.Run(fmt.Sprintf("interning: %s", bc.name), func(b *testing.B) {
			in := New()

			b.ResetTimer()
			for n := 0; n < b.N; n++ {
				v := bc.valueFn(n)
				_ = in.UnsafeClone(&v)
			}
		})
	}
}
17 changes: 5 additions & 12 deletions pkg/parquetquery/iters.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (
"math"
"sync"
"sync/atomic"
"unique"

"github.com/grafana/tempo/pkg/parquetquery/intern"
"github.com/grafana/tempo/pkg/util"
pq "github.com/parquet-go/parquet-go"
"go.opentelemetry.io/otel"
Expand Down Expand Up @@ -781,6 +781,7 @@ type SyncIteratorOpt func(*SyncIterator)
// SyncIteratorOptIntern returns an option that sets the iterator's intern
// flag and assigns it a newly created intern.Interner.
func SyncIteratorOptIntern() SyncIteratorOpt {
	enableIntern := func(it *SyncIterator) {
		it.interner = intern.New()
		it.intern = true
	}
	return enableIntern
}

Expand Down Expand Up @@ -813,7 +814,8 @@ type SyncIterator struct {
currPageN int
at IteratorResult // Current value pointed at by iterator. Returned by call Next and SeekTo, valid until next call.

intern bool
intern bool
interner *intern.Interner
}

var _ Iterator = (*SyncIterator)(nil)
Expand Down Expand Up @@ -1246,7 +1248,7 @@ func (c *SyncIterator) makeResult(t RowNumber, v *pq.Value) *IteratorResult {
// always have length 0 or 1.
if len(c.at.Entries) == 1 {
if c.intern {
c.at.Entries[0].Value = pqValueIntern(v)
c.at.Entries[0].Value = c.interner.UnsafeClone(v)
} else {
c.at.Entries[0].Value = v.Clone()
}
Expand All @@ -1255,15 +1257,6 @@ func (c *SyncIterator) makeResult(t RowNumber, v *pq.Value) *IteratorResult {
return &c.at
}

// pqValueIntern returns a copy of the pq.Value that v points to. For
// ByteArray and FixedLenByteArray kinds, v is first passed through
// unique.Make and the resulting handle's value is dereferenced; every other
// kind is dereferenced directly.
//
// NOTE(review): unique.Make is given the pointer v itself, so the handle is
// keyed on the pointer rather than on the bytes it refers to — confirm this
// is the intended behavior.
func pqValueIntern(v *pq.Value) pq.Value {
	if kind := v.Kind(); kind == pq.ByteArray || kind == pq.FixedLenByteArray {
		canonical := unique.Make(v)
		return *canonical.Value()
	}
	return *v
}

func (c *SyncIterator) Close() {
c.closeCurrRowGroup()

Expand Down

0 comments on commit ad21cb4

Please sign in to comment.