diff --git a/pkg/parquetquery/intern/intern.go b/pkg/parquetquery/intern/intern.go index 2f2c19834e1..69d7cd153b0 100644 --- a/pkg/parquetquery/intern/intern.go +++ b/pkg/parquetquery/intern/intern.go @@ -1,5 +1,4 @@ // Package intern is a utility for interning byte slices for pq.Value's. -// It is not safe for concurrent use. // // The Interner is used to intern byte slices for pq.Value's. This is useful // for reducing memory usage and improving performance when working with @@ -7,72 +6,44 @@ package intern import ( - "unsafe" + "sync" + "unique" pq "github.com/parquet-go/parquet-go" ) type Interner struct { - m map[string][]byte // TODO(mapno): Use swiss.Map (https://github.com/cockroachdb/swiss) + h map[unique.Handle[*pq.Value]]pq.Value + mtx sync.Mutex } func New() *Interner { - return NewWithSize(0) -} - -func NewWithSize(size int) *Interner { - return &Interner{m: make(map[string][]byte, size)} + return &Interner{ + mtx: sync.Mutex{}, + h: make(map[unique.Handle[*pq.Value]]pq.Value), + } } -func (i *Interner) UnsafeClone(v *pq.Value) pq.Value { +// Clone returns a unique shalow copy of the input pq.Value or derefernces the +// received pointer. +func (i *Interner) Clone(v *pq.Value) pq.Value { + i.mtx.Lock() + defer i.mtx.Unlock() switch v.Kind() { case pq.ByteArray, pq.FixedLenByteArray: - // Look away, this is unsafe. - a := *(*pqValue)(unsafe.Pointer(v)) - a.ptr = addressOfBytes(i.internBytes(a.byteArray())) - return *(*pq.Value)(unsafe.Pointer(&a)) + if vv, ok := i.h[unique.Make(v)]; ok { + return vv + } + vv := unique.Make(v) + i.h[vv] = *vv.Value() + return i.h[vv] default: return *v } } -func (i *Interner) internBytes(b []byte) []byte { - if x, ok := i.m[bytesToString(b)]; ok { - return x - } - - clone := make([]byte, len(b)) - copy(clone, b) - i.m[bytesToString(clone)] = clone - return clone -} - func (i *Interner) Close() { - clear(i.m) // clear the map - i.m = nil -} - -// bytesToString converts a byte slice to a string. -// String shares the memory with the byte slice. -// The byte slice should not be modified after call. -func bytesToString(b []byte) string { return unsafe.String(unsafe.SliceData(b), len(b)) } - -// addressOfBytes returns the address of the first byte in data. -// The data should not be modified after call. -func addressOfBytes(data []byte) *byte { return unsafe.SliceData(data) } - -// bytes converts a pointer to a slice of bytes -func bytes(data *byte, size int) []byte { return unsafe.Slice(data, size) } - -// pqValue is a slimmer version of github.com/parquet-go/parquet-go's pq.Value. -type pqValue struct { - // data - ptr *byte - u64 uint64 - // type - kind int8 // XOR(Kind) so the zero-value is -} - -func (v *pqValue) byteArray() []byte { - return bytes(v.ptr, int(v.u64)) + i.mtx.Lock() + defer i.mtx.Unlock() + clear(i.h) } diff --git a/pkg/parquetquery/intern/intern_test.go b/pkg/parquetquery/intern/intern_test.go index f4296c710a6..11030b4703a 100644 --- a/pkg/parquetquery/intern/intern_test.go +++ b/pkg/parquetquery/intern/intern_test.go @@ -3,39 +3,19 @@ package intern import ( "fmt" "testing" - "unsafe" pq "github.com/parquet-go/parquet-go" ) -func TestInterner_internBytes(t *testing.T) { - i := New() - defer i.Close() - - words := []string{"hello", "world", "hello", "world", "hello", "world"} - for _, w := range words { - _ = i.internBytes([]byte(w)) - } - if len(i.m) != 2 { - // Values are interned, so there should be only 2 unique words - t.Errorf("expected 2, got %d", len(i.m)) - } - interned1, interned2 := i.internBytes([]byte("hello")), i.internBytes([]byte("hello")) - if interned1[0] != interned2[0] { - // Values are interned, so the memory address should be the same - t.Error("expected same memory address") - } -} - -func TestInterner_UnsafeClone(t *testing.T) { +func TestInterner_Clone(t *testing.T) { i := New() defer i.Close() value1 := pq.ByteArrayValue([]byte("foo")) value2 := pq.ByteArrayValue([]byte("foo")) - clone1 := i.UnsafeClone(&value1) - clone2 := i.UnsafeClone(&value2) + clone1 := i.Clone(&value1) + clone2 := i.Clone(&value2) if clone1.ByteArray()[0] != clone2.ByteArray()[0] { // Values are interned, so the memory address should be the same @@ -48,26 +28,6 @@ func TestInterner_UnsafeClone(t *testing.T) { } } -func Test_pqValue(t *testing.T) { - // Test that conversion from pq.Value to pqValue and back to pq.Value - // does not change the value. - value := pq.ByteArrayValue([]byte("foo")) - pqValue := *(*pqValue)(unsafe.Pointer(&value)) - back := *(*pq.Value)(unsafe.Pointer(&pqValue)) - - if value.Kind() != back.Kind() { - t.Error("expected same kind") - } - - if string(value.ByteArray()) != string(back.ByteArray()) { - t.Error("expected same value") - } - - if value.String() != back.String() { - t.Error("expected same value") - } -} - func BenchmarkIntern(b *testing.B) { words := []string{"foo", "bar", "baz", "qux", "quux", "corge", "grault", "garply", "waldo", "fred", "plugh", "xyzzy", "thud"} testCases := []struct { @@ -107,7 +67,7 @@ func BenchmarkIntern(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { value := tc.valueFn(i) - _ = interner.UnsafeClone(&value) + _ = interner.Clone(&value) } }) } diff --git a/pkg/parquetquery/iters.go b/pkg/parquetquery/iters.go index 7d8aea60bc4..10563e6eaed 100644 --- a/pkg/parquetquery/iters.go +++ b/pkg/parquetquery/iters.go @@ -1248,7 +1248,7 @@ func (c *SyncIterator) makeResult(t RowNumber, v *pq.Value) *IteratorResult { // always have length 0 or 1. if len(c.at.Entries) == 1 { if c.intern { - c.at.Entries[0].Value = c.interner.UnsafeClone(v) + c.at.Entries[0].Value = c.interner.Clone(v) } else { c.at.Entries[0].Value = v.Clone() } @@ -1261,8 +1261,7 @@ func (c *SyncIterator) Close() { c.closeCurrRowGroup() c.span.End() - - if c.intern && c.interner != nil { + if c.intern { c.interner.Close() } } diff --git a/pkg/parquetquery/iters_test.go b/pkg/parquetquery/iters_test.go index cec4e081a01..aeb588b7ba7 100644 --- a/pkg/parquetquery/iters_test.go +++ b/pkg/parquetquery/iters_test.go @@ -24,6 +24,9 @@ var iterTestCases = []struct { {"sync", func(pf *parquet.File, idx int, filter Predicate, selectAs string) Iterator { return NewSyncIterator(context.TODO(), pf.RowGroups(), idx, selectAs, 1000, filter, selectAs) }}, + {"internSync", func(pf *parquet.File, idx int, filter Predicate, selectAs string) Iterator { + return NewSyncIterator(context.TODO(), pf.RowGroups(), idx, selectAs, 1000, filter, selectAs, SyncIteratorOptIntern()) + }}, } // TestNext compares the unrolled Next() with the original nextSlow() to