Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace unsafe interning with unique package #4264

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 22 additions & 51 deletions pkg/parquetquery/intern/intern.go
Original file line number Diff line number Diff line change
@@ -1,78 +1,49 @@
// Package intern is a utility for interning byte slices for pq.Value's.
// It is not safe for concurrent use.
//
// The Interner is used to intern byte slices for pq.Value's. This is useful
// for reducing memory usage and improving performance when working with
// large datasets with many repeated strings.
package intern

import (
"unsafe"
"sync"
"unique"

pq "github.com/parquet-go/parquet-go"
)

type Interner struct {
m map[string][]byte // TODO(mapno): Use swiss.Map (https://github.com/cockroachdb/swiss)
h map[unique.Handle[*pq.Value]]pq.Value
mtx sync.Mutex
}

func New() *Interner {
return NewWithSize(0)
}

func NewWithSize(size int) *Interner {
return &Interner{m: make(map[string][]byte, size)}
return &Interner{
mtx: sync.Mutex{},
h: make(map[unique.Handle[*pq.Value]]pq.Value),
}
}

func (i *Interner) UnsafeClone(v *pq.Value) pq.Value {
// Clone returns a unique shalow copy of the input pq.Value or derefernces the
// received pointer.
func (i *Interner) Clone(v *pq.Value) pq.Value {
i.mtx.Lock()
defer i.mtx.Unlock()
switch v.Kind() {
case pq.ByteArray, pq.FixedLenByteArray:
// Look away, this is unsafe.
a := *(*pqValue)(unsafe.Pointer(v))
a.ptr = addressOfBytes(i.internBytes(a.byteArray()))
return *(*pq.Value)(unsafe.Pointer(&a))
if vv, ok := i.h[unique.Make(v)]; ok {
return vv
}
vv := unique.Make(v)
i.h[vv] = *vv.Value()
return i.h[vv]
default:
return *v
}
}

func (i *Interner) internBytes(b []byte) []byte {
if x, ok := i.m[bytesToString(b)]; ok {
return x
}

clone := make([]byte, len(b))
copy(clone, b)
i.m[bytesToString(clone)] = clone
return clone
}

func (i *Interner) Close() {
clear(i.m) // clear the map
i.m = nil
}

// bytesToString converts a byte slice to a string.
// String shares the memory with the byte slice.
// The byte slice should not be modified after call.
func bytesToString(b []byte) string { return unsafe.String(unsafe.SliceData(b), len(b)) }

// addressOfBytes returns the address of the first byte in data.
// The data should not be modified after call.
func addressOfBytes(data []byte) *byte { return unsafe.SliceData(data) }

// bytes converts a pointer to a slice of bytes
func bytes(data *byte, size int) []byte { return unsafe.Slice(data, size) }

// pqValue is a slimmer version of github.com/parquet-go/parquet-go's pq.Value.
type pqValue struct {
// data
ptr *byte
u64 uint64
// type
kind int8 // XOR(Kind) so the zero-value is <null>
}

func (v *pqValue) byteArray() []byte {
return bytes(v.ptr, int(v.u64))
i.mtx.Lock()
defer i.mtx.Unlock()
clear(i.h)
}
48 changes: 4 additions & 44 deletions pkg/parquetquery/intern/intern_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,39 +3,19 @@ package intern
import (
"fmt"
"testing"
"unsafe"

pq "github.com/parquet-go/parquet-go"
)

func TestInterner_internBytes(t *testing.T) {
i := New()
defer i.Close()

words := []string{"hello", "world", "hello", "world", "hello", "world"}
for _, w := range words {
_ = i.internBytes([]byte(w))
}
if len(i.m) != 2 {
// Values are interned, so there should be only 2 unique words
t.Errorf("expected 2, got %d", len(i.m))
}
interned1, interned2 := i.internBytes([]byte("hello")), i.internBytes([]byte("hello"))
if interned1[0] != interned2[0] {
// Values are interned, so the memory address should be the same
t.Error("expected same memory address")
}
}

func TestInterner_UnsafeClone(t *testing.T) {
func TestInterner_Clone(t *testing.T) {
i := New()
defer i.Close()

value1 := pq.ByteArrayValue([]byte("foo"))
value2 := pq.ByteArrayValue([]byte("foo"))

clone1 := i.UnsafeClone(&value1)
clone2 := i.UnsafeClone(&value2)
clone1 := i.Clone(&value1)
clone2 := i.Clone(&value2)

if clone1.ByteArray()[0] != clone2.ByteArray()[0] {
// Values are interned, so the memory address should be the same
Expand All @@ -48,26 +28,6 @@ func TestInterner_UnsafeClone(t *testing.T) {
}
}

func Test_pqValue(t *testing.T) {
// Test that conversion from pq.Value to pqValue and back to pq.Value
// does not change the value.
value := pq.ByteArrayValue([]byte("foo"))
pqValue := *(*pqValue)(unsafe.Pointer(&value))
back := *(*pq.Value)(unsafe.Pointer(&pqValue))

if value.Kind() != back.Kind() {
t.Error("expected same kind")
}

if string(value.ByteArray()) != string(back.ByteArray()) {
t.Error("expected same value")
}

if value.String() != back.String() {
t.Error("expected same value")
}
}

func BenchmarkIntern(b *testing.B) {
words := []string{"foo", "bar", "baz", "qux", "quux", "corge", "grault", "garply", "waldo", "fred", "plugh", "xyzzy", "thud"}
testCases := []struct {
Expand Down Expand Up @@ -107,7 +67,7 @@ func BenchmarkIntern(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
value := tc.valueFn(i)
_ = interner.UnsafeClone(&value)
_ = interner.Clone(&value)
}
})
}
Expand Down
5 changes: 2 additions & 3 deletions pkg/parquetquery/iters.go
Original file line number Diff line number Diff line change
Expand Up @@ -1248,7 +1248,7 @@ func (c *SyncIterator) makeResult(t RowNumber, v *pq.Value) *IteratorResult {
// always have length 0 or 1.
if len(c.at.Entries) == 1 {
if c.intern {
c.at.Entries[0].Value = c.interner.UnsafeClone(v)
c.at.Entries[0].Value = c.interner.Clone(v)
} else {
c.at.Entries[0].Value = v.Clone()
}
Expand All @@ -1261,8 +1261,7 @@ func (c *SyncIterator) Close() {
c.closeCurrRowGroup()

c.span.End()

if c.intern && c.interner != nil {
if c.intern {
c.interner.Close()
}
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/parquetquery/iters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ var iterTestCases = []struct {
{"sync", func(pf *parquet.File, idx int, filter Predicate, selectAs string) Iterator {
return NewSyncIterator(context.TODO(), pf.RowGroups(), idx, selectAs, 1000, filter, selectAs)
}},
{"internSync", func(pf *parquet.File, idx int, filter Predicate, selectAs string) Iterator {
return NewSyncIterator(context.TODO(), pf.RowGroups(), idx, selectAs, 1000, filter, selectAs, SyncIteratorOptIntern())
}},
}

// TestNext compares the unrolled Next() with the original nextSlow() to
Expand Down
Loading