From ad21cb4b2c0f56121b5589eae2f93271ba058a39 Mon Sep 17 00:00:00 2001 From: Zach Leslie Date: Tue, 5 Nov 2024 21:20:12 +0000 Subject: [PATCH] Revert "Drop intern package and lift method to parquetquery package" This reverts commit 76876fe57f46b72b9c7495ba394586b4c4d6935e. --- pkg/parquetquery/intern/intern.go | 28 ++++++++++ pkg/parquetquery/intern/intern_test.go | 72 ++++++++++++++++++++++++++ pkg/parquetquery/iters.go | 17 ++---- 3 files changed, 105 insertions(+), 12 deletions(-) create mode 100644 pkg/parquetquery/intern/intern.go create mode 100644 pkg/parquetquery/intern/intern_test.go diff --git a/pkg/parquetquery/intern/intern.go b/pkg/parquetquery/intern/intern.go new file mode 100644 index 00000000000..caf61ac823b --- /dev/null +++ b/pkg/parquetquery/intern/intern.go @@ -0,0 +1,28 @@ +// Package intern is a utility for interning byte slices for pq.Value's. +// It is not safe for concurrent use. +// +// The Interner is used to intern byte slices for pq.Value's. This is useful +// for reducing memory usage and improving performance when working with +// large datasets with many repeated strings. +package intern + +import ( + "unique" + + pq "github.com/parquet-go/parquet-go" +) + +type Interner struct{} + +func New() *Interner { + return &Interner{} +} + +func (i *Interner) UnsafeClone(v *pq.Value) pq.Value { + switch v.Kind() { + case pq.ByteArray, pq.FixedLenByteArray: + return *unique.Make(v).Value() + default: + return *v + } +} diff --git a/pkg/parquetquery/intern/intern_test.go b/pkg/parquetquery/intern/intern_test.go new file mode 100644 index 00000000000..349b85ca974 --- /dev/null +++ b/pkg/parquetquery/intern/intern_test.go @@ -0,0 +1,72 @@ +package intern + +import ( + "fmt" + "testing" + + pq "github.com/parquet-go/parquet-go" +) + +func TestInterner_UnsafeClone(t *testing.T) { + i := New() + + value1 := pq.ByteArrayValue([]byte("foo")) + value2 := pq.ByteArrayValue([]byte("foo")) + + clone1 := i.UnsafeClone(&value1) + clone2 := i.UnsafeClone(&value2) + + if clone1.ByteArray()[0] != clone2.ByteArray()[0] { + // Values are interned, so the memory address should be the same + t.Error("expected same memory address") + } + + if value1.ByteArray()[0] != value2.ByteArray()[0] { + // Mutates the original value, so the memory address should be different as well + t.Error("expected same memory address") + } +} + +func BenchmarkIntern(b *testing.B) { + words := []string{"foo", "bar", "baz", "qux", "quux", "corge", "grault", "garply", "waldo", "fred", "plugh", "xyzzy", "thud"} + testCases := []struct { + name string + valueFn func(i int) pq.Value + }{ + { + name: "byte_array", + valueFn: func(i int) pq.Value { return pq.ByteArrayValue([]byte(words[i%len(words)])) }, + }, + { + name: "fixed_len_byte_array", + valueFn: func(i int) pq.Value { return pq.FixedLenByteArrayValue([]byte(words[i%len(words)])) }, + }, + { + name: "bool", + valueFn: func(i int) pq.Value { return pq.BooleanValue(i%2 == 0) }, + }, + { + name: "int32", + valueFn: func(i int) pq.Value { return pq.Int32Value(int32(i)) }, + }, + } + + for _, tc := range testCases { + b.Run(fmt.Sprintf("no_interning: %s", tc.name), func(b *testing.B) { + for i := 0; i < b.N; i++ { + value := tc.valueFn(i) + _ = value.Clone() + } + }) + + b.Run(fmt.Sprintf("interning: %s", tc.name), func(b *testing.B) { + interner := New() + + b.ResetTimer() + for i := 0; i < b.N; i++ { + value := tc.valueFn(i) + _ = interner.UnsafeClone(&value) + } + }) + } +} diff --git a/pkg/parquetquery/iters.go b/pkg/parquetquery/iters.go index d8c5735f762..4b77bd4776f 100644 --- a/pkg/parquetquery/iters.go +++ b/pkg/parquetquery/iters.go @@ -9,8 +9,8 @@ import ( "math" "sync" "sync/atomic" - "unique" + "github.com/grafana/tempo/pkg/parquetquery/intern" "github.com/grafana/tempo/pkg/util" pq "github.com/parquet-go/parquet-go" "go.opentelemetry.io/otel" @@ -781,6 +781,7 @@ type SyncIteratorOpt func(*SyncIterator) func SyncIteratorOptIntern() SyncIteratorOpt { return func(i *SyncIterator) { i.intern = true + i.interner = intern.New() } } @@ -813,7 +814,8 @@ type SyncIterator struct { currPageN int at IteratorResult // Current value pointed at by iterator. Returned by call Next and SeekTo, valid until next call. - intern bool + intern bool + interner *intern.Interner } var _ Iterator = (*SyncIterator)(nil) @@ -1246,7 +1248,7 @@ func (c *SyncIterator) makeResult(t RowNumber, v *pq.Value) *IteratorResult { // always have length 0 or 1. if len(c.at.Entries) == 1 { if c.intern { - c.at.Entries[0].Value = pqValueIntern(v) + c.at.Entries[0].Value = c.interner.UnsafeClone(v) } else { c.at.Entries[0].Value = v.Clone() } @@ -1255,15 +1257,6 @@ func (c *SyncIterator) makeResult(t RowNumber, v *pq.Value) *IteratorResult { return &c.at } -func pqValueIntern(v *pq.Value) pq.Value { - switch v.Kind() { - case pq.ByteArray, pq.FixedLenByteArray: - return *unique.Make(v).Value() - default: - return *v - } -} - func (c *SyncIterator) Close() { c.closeCurrRowGroup()