Skip to content

Commit

Permalink
Remove unsafe from pkg/parquetquery/intern
Browse files Browse the repository at this point in the history
  • Loading branch information
zalegrala committed Nov 1, 2024
1 parent fc55642 commit f936d2d
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 99 deletions.
58 changes: 4 additions & 54 deletions pkg/parquetquery/intern/intern.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,72 +7,22 @@
package intern

import (
"unsafe"
"unique"

pq "github.com/parquet-go/parquet-go"
)

type Interner struct {
m map[string][]byte // TODO(mapno): Use swiss.Map (https://github.com/cockroachdb/swiss)
}
type Interner struct{}

func New() *Interner {
return NewWithSize(0)
}

func NewWithSize(size int) *Interner {
return &Interner{m: make(map[string][]byte, size)}
return &Interner{}
}

func (i *Interner) UnsafeClone(v *pq.Value) pq.Value {
switch v.Kind() {
case pq.ByteArray, pq.FixedLenByteArray:
// Look away, this is unsafe.
a := *(*pqValue)(unsafe.Pointer(v))
a.ptr = addressOfBytes(i.internBytes(a.byteArray()))
return *(*pq.Value)(unsafe.Pointer(&a))
return *unique.Make(v).Value()
default:
return *v
}
}

func (i *Interner) internBytes(b []byte) []byte {
if x, ok := i.m[bytesToString(b)]; ok {
return x
}

clone := make([]byte, len(b))
copy(clone, b)
i.m[bytesToString(clone)] = clone
return clone
}

func (i *Interner) Close() {
clear(i.m) // clear the map
i.m = nil
}

// bytesToString converts a byte slice to a string.
// String shares the memory with the byte slice.
// The byte slice should not be modified after call.
func bytesToString(b []byte) string { return unsafe.String(unsafe.SliceData(b), len(b)) }

// addressOfBytes returns the address of the first byte in data.
// The data should not be modified after call.
func addressOfBytes(data []byte) *byte { return unsafe.SliceData(data) }

// bytes converts a pointer to a slice of bytes
func bytes(data *byte, size int) []byte { return unsafe.Slice(data, size) }

// pqValue is a slimmer version of github.com/parquet-go/parquet-go's pq.Value.
type pqValue struct {
// data
ptr *byte
u64 uint64
// type
kind int8 // XOR(Kind) so the zero-value is <null>
}

func (v *pqValue) byteArray() []byte {
return bytes(v.ptr, int(v.u64))
}
42 changes: 0 additions & 42 deletions pkg/parquetquery/intern/intern_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,33 +3,12 @@ package intern
import (
"fmt"
"testing"
"unsafe"

pq "github.com/parquet-go/parquet-go"
)

func TestInterner_internBytes(t *testing.T) {
i := New()
defer i.Close()

words := []string{"hello", "world", "hello", "world", "hello", "world"}
for _, w := range words {
_ = i.internBytes([]byte(w))
}
if len(i.m) != 2 {
// Values are interned, so there should be only 2 unique words
t.Errorf("expected 2, got %d", len(i.m))
}
interned1, interned2 := i.internBytes([]byte("hello")), i.internBytes([]byte("hello"))
if interned1[0] != interned2[0] {
// Values are interned, so the memory address should be the same
t.Error("expected same memory address")
}
}

func TestInterner_UnsafeClone(t *testing.T) {
i := New()
defer i.Close()

value1 := pq.ByteArrayValue([]byte("foo"))
value2 := pq.ByteArrayValue([]byte("foo"))
Expand All @@ -48,26 +27,6 @@ func TestInterner_UnsafeClone(t *testing.T) {
}
}

func Test_pqValue(t *testing.T) {
// Test that conversion from pq.Value to pqValue and back to pq.Value
// does not change the value.
value := pq.ByteArrayValue([]byte("foo"))
pqValue := *(*pqValue)(unsafe.Pointer(&value))
back := *(*pq.Value)(unsafe.Pointer(&pqValue))

if value.Kind() != back.Kind() {
t.Error("expected same kind")
}

if string(value.ByteArray()) != string(back.ByteArray()) {
t.Error("expected same value")
}

if value.String() != back.String() {
t.Error("expected same value")
}
}

func BenchmarkIntern(b *testing.B) {
words := []string{"foo", "bar", "baz", "qux", "quux", "corge", "grault", "garply", "waldo", "fred", "plugh", "xyzzy", "thud"}
testCases := []struct {
Expand Down Expand Up @@ -102,7 +61,6 @@ func BenchmarkIntern(b *testing.B) {

b.Run(fmt.Sprintf("interning: %s", tc.name), func(b *testing.B) {
interner := New()
defer interner.Close()

b.ResetTimer()
for i := 0; i < b.N; i++ {
Expand Down
13 changes: 10 additions & 3 deletions pkg/parquetquery/iters.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (
"math"
"sync"
"sync/atomic"
"unique"

"github.com/grafana/tempo/pkg/parquetquery/intern"
"github.com/grafana/tempo/pkg/util"
pq "github.com/parquet-go/parquet-go"
"go.opentelemetry.io/otel"
Expand Down Expand Up @@ -781,6 +781,7 @@ type SyncIteratorOpt func(*SyncIterator)
func SyncIteratorOptIntern() SyncIteratorOpt {
return func(i *SyncIterator) {
i.intern = true
i.interner = intern.New()
}
}

Expand Down Expand Up @@ -813,7 +814,8 @@ type SyncIterator struct {
currPageN int
at IteratorResult // Current value pointed at by iterator. Returned by call Next and SeekTo, valid until next call.

intern bool
intern bool
interner *intern.Interner
}

var _ Iterator = (*SyncIterator)(nil)
Expand Down Expand Up @@ -1246,7 +1248,7 @@ func (c *SyncIterator) makeResult(t RowNumber, v *pq.Value) *IteratorResult {
// always have length 0 or 1.
if len(c.at.Entries) == 1 {
if c.intern {
c.at.Entries[0].Value = *unique.Make(v).Value()
c.at.Entries[0].Value = c.interner.UnsafeClone(v)
} else {
c.at.Entries[0].Value = v.Clone()
}
Expand All @@ -1257,7 +1259,12 @@ func (c *SyncIterator) makeResult(t RowNumber, v *pq.Value) *IteratorResult {

func (c *SyncIterator) Close() {
c.closeCurrRowGroup()

c.span.End()

if c.intern && c.interner != nil {
// c.interner.Close()
}
}

// ColumnIterator asynchronously iterates through the given row groups and column. Applies
Expand Down
3 changes: 3 additions & 0 deletions pkg/parquetquery/iters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ var iterTestCases = []struct {
{"sync", func(pf *parquet.File, idx int, filter Predicate, selectAs string) Iterator {
return NewSyncIterator(context.TODO(), pf.RowGroups(), idx, selectAs, 1000, filter, selectAs)
}},
{"internSync", func(pf *parquet.File, idx int, filter Predicate, selectAs string) Iterator {
return NewSyncIterator(context.TODO(), pf.RowGroups(), idx, selectAs, 1000, filter, selectAs, SyncIteratorOptIntern())
}},
}

// TestNext compares the unrolled Next() with the original nextSlow() to
Expand Down

0 comments on commit f936d2d

Please sign in to comment.