From fa98adeb232f25d8cd0f620eb247b452d33ca0df Mon Sep 17 00:00:00 2001 From: Anton Kolesnikov Date: Mon, 17 Jul 2023 20:28:55 +0800 Subject: [PATCH] Fixes --- pkg/phlaredb/compact.go | 408 ++++++++++++++++++++++++++-------------- 1 file changed, 262 insertions(+), 146 deletions(-) diff --git a/pkg/phlaredb/compact.go b/pkg/phlaredb/compact.go index 077d1a184..8d2513005 100644 --- a/pkg/phlaredb/compact.go +++ b/pkg/phlaredb/compact.go @@ -5,6 +5,7 @@ import ( "math" "os" "path/filepath" + "sort" "github.com/oklog/ulid" "github.com/pkg/errors" @@ -27,21 +28,13 @@ type BlockReader interface { Meta() block.Meta Profiles() []parquet.RowGroup Index() IndexReader - Symbols() SymbolsResolver + SymbolsReader } // TODO(kolesnikovae): Refactor to symdb. -// ProfileSymbols represents symbolic information associated with a profile. -type ProfileSymbols struct { - StacktracePartition uint64 - StacktraceIDs []uint32 - - Stacktraces []*schemav1.Stacktrace - Locations []*schemav1.InMemoryLocation - Mappings []*schemav1.InMemoryMapping - Functions []*schemav1.InMemoryFunction - Strings []string +type SymbolsReader interface { + SymbolsResolver(partition uint64) (SymbolsResolver, error) } type SymbolsResolver interface { @@ -50,6 +43,19 @@ type SymbolsResolver interface { Mappings(iter.Iterator[uint32]) iter.Iterator[*schemav1.InMemoryMapping] Functions(iter.Iterator[uint32]) iter.Iterator[*schemav1.InMemoryFunction] Strings(iter.Iterator[uint32]) iter.Iterator[string] + WriteStats(*SymbolStats) +} + +type SymbolStats struct { + StacktracesTotal int + LocationsTotal int + MappingsTotal int + FunctionsTotal int + StringsTotal int +} + +type SymbolsWriter interface { + SymbolsAppender(partition uint64) (SymbolsAppender, error) } type SymbolsAppender interface { @@ -58,6 +64,7 @@ type SymbolsAppender interface { AppendMapping(*schemav1.InMemoryMapping) uint32 AppendFunction(*schemav1.InMemoryFunction) uint32 AppendString(string) uint32 + Flush() error } func Compact(ctx 
context.Context, src []BlockReader, dst string) (block.Meta, error) { @@ -405,22 +412,21 @@ func prepareIndexWriter(ctx context.Context, path string, readers []BlockReader) } type symbolsRewriter struct { - profiles iter.Iterator[profileRow] - stacktraces, dst []uint32 - err error - - rewriters map[BlockReader]*stacktraceRewriter + profiles iter.Iterator[profileRow] + rewriters map[BlockReader]*stacktraceRewriter + stacktraces []uint32 + err error numSamples uint64 } -func newSymbolsRewriter(it iter.Iterator[profileRow], blocks []BlockReader, a SymbolsAppender) *symbolsRewriter { +func newSymbolsRewriter(it iter.Iterator[profileRow], blocks []BlockReader, w SymbolsWriter) *symbolsRewriter { sr := symbolsRewriter{ profiles: it, rewriters: make(map[BlockReader]*stacktraceRewriter, len(blocks)), } - for _, b := range blocks { - sr.rewriters[b] = newStacktraceRewriter() + for _, r := range blocks { + sr.rewriters[r] = newStacktraceRewriter(r, w) } return &sr } @@ -452,7 +458,7 @@ func (s *symbolsRewriter) Next() bool { } s.numSamples += uint64(len(values)) for i, v := range values { - values[i] = parquet.Int64Value(int64(s.dst[i])).Level(v.RepetitionLevel(), v.DefinitionLevel(), v.Column()) + values[i] = parquet.Int64Value(int64(s.stacktraces[i])).Level(v.RepetitionLevel(), v.DefinitionLevel(), v.Column()) } }) if err != nil { @@ -465,114 +471,71 @@ func (s *symbolsRewriter) Next() bool { func (s *symbolsRewriter) loadStacktracesID(values []parquet.Value) { if cap(s.stacktraces) < len(values) { s.stacktraces = make([]uint32, len(values)*2) - s.dst = make([]uint32, len(values)*2) } s.stacktraces = s.stacktraces[:len(values)] - s.dst = s.dst[:len(values)] for i := range values { s.stacktraces[i] = values[i].Uint32() } } type stacktraceRewriter struct { - partition uint64 - stacktraces map[uint64]*lookupTable[*schemav1.Stacktrace] + reader SymbolsReader + writer SymbolsWriter + // Stack trace identifiers are only valid within the partition. 
+ stacktraces map[uint64]*lookupTable[*schemav1.Stacktrace] + // Objects below have global addressing. locations *lookupTable[*schemav1.InMemoryLocation] mappings *lookupTable[*schemav1.InMemoryMapping] functions *lookupTable[*schemav1.InMemoryFunction] strings *lookupTable[string] -} - -func newStacktraceRewriter() *stacktraceRewriter { - // TODO(kolesnikovae): - return new(stacktraceRewriter) -} - -const ( - marker = 1 << 31 - markedMask = math.MaxUint32 >> 1 -) -type lookupTable[T any] struct { - // Index is source ID, and the value is the destination ID. - // If destination ID is not known, the element is index to 'unresolved' (marked). - resolved []uint32 - // Source IDs. - unresolved []uint32 - values []T + partition uint64 + resolver SymbolsResolver + appender SymbolsAppender + stats SymbolStats } -func newLookupTable[T any](size int) *lookupTable[T] { - var t lookupTable[T] - t.init(size) - return &t +func newStacktraceRewriter(r SymbolsReader, w SymbolsWriter) *stacktraceRewriter { + return &stacktraceRewriter{ + reader: r, + writer: w, + } } -func (t *lookupTable[T]) init(size int) { - if cap(t.resolved) < size { - t.resolved = make([]uint32, size) - return +func (r *stacktraceRewriter) init(partition uint64) (err error) { + r.partition = partition + if r.appender, err = r.writer.SymbolsAppender(partition); err != nil { + return err } - t.resolved = t.resolved[:size] - for i := range t.resolved { - t.resolved[i] = 0 + if r.resolver, err = r.reader.SymbolsResolver(partition); err != nil { + return err } -} - -func (t *lookupTable[T]) reset() { t.unresolved = t.unresolved[:0] } + r.resolver.WriteStats(&r.stats) -func (t *lookupTable[T]) tryLookup(x uint32) uint32 { - if v := t.resolved[x]; v != 0 { - return v - 1 + // Only stacktraces are yet partitioned. 
+ if r.stacktraces == nil { + r.stacktraces = make(map[uint64]*lookupTable[*schemav1.Stacktrace]) } - v := uint32(len(t.unresolved)) | marker - t.unresolved = append(t.unresolved, x) - return v -} - -func (t *lookupTable[T]) storeResolved(i, v uint32) { t.resolved[i] = v + 1 } - -func (t *lookupTable[T]) lookupUnresolved(x uint32) uint32 { - if x&marker == 0 { - // Already resolved. - return x + p, ok := r.stacktraces[partition] + if !ok { + p = newLookupTable[*schemav1.Stacktrace](r.stats.StacktracesTotal) + r.stacktraces[partition] = p } - return t.unresolved[x&markedMask] -} + p.reset() -func (t *lookupTable[T]) iter() *lookupTableIterator[T] { - t.values = make([]T, len(t.resolved)) - return &lookupTableIterator[T]{ - values: t.values, + if r.locations == nil { + r.locations = newLookupTable[*schemav1.InMemoryLocation](r.stats.LocationsTotal) + r.mappings = newLookupTable[*schemav1.InMemoryMapping](r.stats.MappingsTotal) + r.functions = newLookupTable[*schemav1.InMemoryFunction](r.stats.FunctionsTotal) + r.strings = newLookupTable[string](r.stats.StringsTotal) + return nil } -} - -// TODO(kolesnikovae): -type lookupTableIterator[T any] struct { - cur uint32 - values []T -} - -func (t *lookupTableIterator[T]) set(v T) { t.values[t.cur] = v } - -func (r *stacktraceRewriter) symbolsResolver() SymbolsResolver { - // TODO(kolesnikovae): - return nil -} - -func (r *stacktraceRewriter) symbolsAppender() SymbolsAppender { - // TODO(kolesnikovae): - return nil -} - -func (r *stacktraceRewriter) reset(partition uint64) { - r.partition = partition - r.stacktraces[partition].reset() r.locations.reset() r.mappings.reset() r.functions.reset() r.strings.reset() + return nil } func (r *stacktraceRewriter) hasUnresolved() bool { @@ -583,112 +546,265 @@ func (r *stacktraceRewriter) hasUnresolved() bool { len(r.strings.unresolved) > 0 } -func (r *stacktraceRewriter) rewriteStacktraces(partition uint64, stacktraces []uint32) error { - r.reset(partition) - 
r.populateUnresolved(stacktraces) +func (r *stacktraceRewriter) rewriteStacktraces(partition uint64, stacktraces []uint32) (err error) { + if err = r.init(partition); err != nil { + return err + } + if err = r.populateUnresolved(stacktraces); err != nil { + return err + } if r.hasUnresolved() { - r.append(stacktraces) + if err = r.appendRewrite(stacktraces); err != nil { + return err + } } return nil } -func (r *stacktraceRewriter) populateUnresolved(stacktraces []uint32) { +func (r *stacktraceRewriter) populateUnresolved(stacktraceIDs []uint32) error { // Filter out all stack traces that have been already resolved. src := r.stacktraces[r.partition] - for i, v := range stacktraces { - stacktraces[i] = src.tryLookup(v) + for i, v := range stacktraceIDs { + stacktraceIDs[i] = src.tryLookup(v) } if len(src.unresolved) == 0 { - return + return nil } // Resolve locations for new stack traces. var stacktrace *schemav1.Stacktrace unresolvedStacktraces := src.iter() - p := r.symbolsResolver() - for i := p.Stacktraces(unresolvedStacktraces); i.Next(); stacktrace = i.At() { - for i, loc := range stacktrace.LocationIDs { - stacktrace.LocationIDs[i] = uint64(r.locations.tryLookup(uint32(loc))) + stacktraces := r.resolver.Stacktraces(unresolvedStacktraces) + for ; stacktraces.Err() == nil && stacktraces.Next(); stacktrace = stacktraces.At() { + for j, loc := range stacktrace.LocationIDs { + stacktrace.LocationIDs[j] = uint64(r.locations.tryLookup(uint32(loc))) } - unresolvedStacktraces.set(stacktrace) + // TODO(kolesnikovae): Copy. + unresolvedStacktraces.setValue(stacktrace) + } + if err := stacktraces.Err(); err != nil { + return err } // Resolve functions and mappings for new locations. 
var location *schemav1.InMemoryLocation unresolvedLocs := r.locations.iter() - for i := p.Locations(unresolvedLocs); i.Next(); location = i.At() { + locations := r.resolver.Locations(unresolvedLocs) + for ; locations.Err() == nil && locations.Next(); location = locations.At() { location.MappingId = r.mappings.tryLookup(location.MappingId) for j, line := range location.Line { location.Line[j].FunctionId = r.functions.tryLookup(line.FunctionId) } - unresolvedLocs.set(location) + unresolvedLocs.setValue(location) + } + if err := locations.Err(); err != nil { + return err } // Resolve strings. var mapping *schemav1.InMemoryMapping unresolvedMappings := r.mappings.iter() - for i := p.Mappings(unresolvedMappings); i.Next(); mapping = i.At() { + mappings := r.resolver.Mappings(unresolvedMappings) + for ; mappings.Err() == nil && mappings.Next(); mapping = mappings.At() { mapping.BuildId = r.strings.tryLookup(mapping.BuildId) mapping.Filename = r.strings.tryLookup(mapping.Filename) - unresolvedMappings.set(mapping) + unresolvedMappings.setValue(mapping) + } + if err := mappings.Err(); err != nil { + return err } + var function *schemav1.InMemoryFunction unresolvedFunctions := r.functions.iter() - for i := p.Functions(unresolvedFunctions); i.Next(); function = i.At() { + functions := r.resolver.Functions(unresolvedFunctions) + for ; functions.Err() == nil && functions.Next(); function = functions.At() { function.Name = r.strings.tryLookup(function.Name) function.Filename = r.strings.tryLookup(function.Filename) function.SystemName = r.strings.tryLookup(function.SystemName) - unresolvedFunctions.set(function) + unresolvedFunctions.setValue(function) + } + if err := functions.Err(); err != nil { + return err } + var str string unresolvedStrings := r.strings.iter() - for i := p.Strings(unresolvedStrings); i.Next(); str = i.At() { - unresolvedStrings.set(str) + strings := r.resolver.Strings(unresolvedStrings) + for ; strings.Err() == nil && strings.Next(); str = strings.At() { 
+ unresolvedStrings.setValue(str) } + return strings.Err() } -func (r *stacktraceRewriter) append(stacktraces []uint32) { - a := r.symbolsAppender() - for _, str := range r.strings.values { - r.functions.storeResolved(0, a.AppendString(str)) +func (r *stacktraceRewriter) appendRewrite(stacktraces []uint32) error { + for _, v := range r.strings.unresolved { + r.strings.storeResolved(v.uid, r.appender.AppendString(v.val)) } - for _, function := range r.functions.values { - function.Name = r.strings.lookupUnresolved(function.Name) - function.Filename = r.strings.lookupUnresolved(function.Filename) - function.SystemName = r.strings.lookupUnresolved(function.SystemName) - r.functions.storeResolved(0, a.AppendFunction(function)) + for _, v := range r.functions.unresolved { + function := v.val + function.Name = r.strings.lookupResolved(function.Name) + function.Filename = r.strings.lookupResolved(function.Filename) + function.SystemName = r.strings.lookupResolved(function.SystemName) + r.functions.storeResolved(v.uid, r.appender.AppendFunction(function)) } - for _, mapping := range r.mappings.values { - mapping.BuildId = r.strings.lookupUnresolved(mapping.BuildId) - mapping.Filename = r.strings.lookupUnresolved(mapping.Filename) - r.mappings.storeResolved(0, a.AppendMapping(mapping)) + for _, v := range r.mappings.unresolved { + mapping := v.val + mapping.BuildId = r.strings.lookupResolved(mapping.BuildId) + mapping.Filename = r.strings.lookupResolved(mapping.Filename) + r.mappings.storeResolved(v.uid, r.appender.AppendMapping(mapping)) } - for _, location := range r.locations.values { - location.MappingId = r.mappings.lookupUnresolved(location.MappingId) + for _, v := range r.locations.unresolved { + location := v.val + location.MappingId = r.mappings.lookupResolved(location.MappingId) for j, line := range location.Line { - location.Line[j].FunctionId = r.functions.lookupUnresolved(line.FunctionId) + location.Line[j].FunctionId = 
r.functions.lookupResolved(line.FunctionId) } - r.locations.storeResolved(0, a.AppendLocation(location)) + r.locations.storeResolved(v.uid, r.appender.AppendLocation(location)) } src := r.stacktraces[r.partition] - for _, stacktrace := range src.values { - for j, v := range stacktrace.LocationIDs { - stacktrace.LocationIDs[j] = uint64(r.locations.lookupUnresolved(uint32(v))) + for _, v := range src.unresolved { + stacktrace := v.val + for j, lid := range stacktrace.LocationIDs { + stacktrace.LocationIDs[j] = uint64(r.locations.lookupResolved(uint32(lid))) } - src.storeResolved(0, a.AppendStacktrace(stacktrace)) + src.storeResolved(v.uid, r.appender.AppendStacktrace(stacktrace)) } for i, v := range stacktraces { - stacktraces[i] = src.lookupUnresolved(v) + stacktraces[i] = src.lookupResolved(v) + } + + return r.appender.Flush() +} + +const ( + marker = 1 << 31 + markedMask = math.MaxUint32 >> 1 +) + +type lookupTable[T any] struct { + // Index is source ID, and the value is the destination ID. + // If destination ID is not known, the element is index to 'unresolved' (marked). + resolved []uint32 + unresolved []lookupTableValue[T] + refs []lookupTableRef +} + +type lookupTableValue[T any] struct { + rid uint32 // Index to resolved. + uid uint32 // Original index (unresolved). + val T +} + +type lookupTableRef struct{ rid, uid uint32 } + +func newLookupTable[T any](size int) *lookupTable[T] { + var t lookupTable[T] + t.init(size) + return &t +} + +func (t *lookupTable[T]) init(size int) { + if cap(t.resolved) < size { + t.resolved = make([]uint32, size) + return + } + t.resolved = t.resolved[:size] + for i := range t.resolved { + t.resolved[i] = 0 } } -type symbolsWriter struct { - // TODO(kolesnikovae): - SymbolsAppender +func (t *lookupTable[T]) reset() { + t.unresolved = t.unresolved[:0] + t.refs = t.refs[:0] } -func newSymbolsWriter(dst string) (*symbolsWriter, error) { return &symbolsWriter{}, nil } +// tryLookup looks up the value at x in resolved. 
+// If x has not been resolved yet, x is memorized +// for future resolution, and the returned value is the marked +// index into unresolved. +func (t *lookupTable[T]) tryLookup(x uint32) uint32 { + if v := t.resolved[x]; v != 0 { + if v&marker > 0 { + return v // Already marked for resolve. + } + return v - 1 // Already resolved. + } + u := uint32(len(t.unresolved)) + t.unresolved = append(t.unresolved, lookupTableValue[T]{ + rid: x, + uid: u, + }) + u |= marker + t.resolved[x] = u + return u +} + +func (t *lookupTable[T]) storeResolved(uid, v uint32) { + t.resolved[t.unresolved[uid].rid] = v + 1 +} + +func (t *lookupTable[T]) lookupResolved(x uint32) uint32 { + if x&marker > 0 { + return t.resolved[t.unresolved[x&markedMask].rid] - 1 + } + return x // Already resolved. +} + +func (t *lookupTable[T]) iter() *lookupTableIterator[T] { + if cap(t.refs) < len(t.unresolved) { + t.refs = make([]lookupTableRef, len(t.unresolved)) + } else { + t.refs = t.refs[:len(t.unresolved)] + } + for i, v := range t.unresolved { + t.refs[i] = lookupTableRef{ + rid: v.rid, + uid: v.uid, + } + } + sort.Slice(t.refs, func(i, j int) bool { + return t.refs[i].rid < t.refs[j].rid + }) + return &lookupTableIterator[T]{table: t} +} + +type lookupTableIterator[T any] struct { + table *lookupTable[T] + cur uint32 +} + +func (t *lookupTableIterator[T]) Next() bool { + return t.cur < uint32(len(t.table.refs)) +} + +func (t *lookupTableIterator[T]) At() uint32 { + x := t.table.refs[t.cur].rid + t.cur++ + return x +} + +func (t *lookupTableIterator[T]) setValue(v T) { + uid := t.table.refs[t.cur].uid + t.table.unresolved[uid].val = v +} + +func (t *lookupTableIterator[T]) Close() error { return nil } + +func (t *lookupTableIterator[T]) Err() error { return nil } + +// TODO(kolesnikovae): + +type symbolsWriter struct{} + +func newSymbolsWriter(dst string) (*symbolsWriter, error) { + return &symbolsWriter{}, nil +} + +func (w *symbolsWriter) SymbolsAppender(partition uint64) (SymbolsAppender, error)
{ + return nil, nil +}