From 5a00746083150a7eeb27cef1d4ccdab562884ecc Mon Sep 17 00:00:00 2001 From: Bilal Akhtar Date: Wed, 12 Apr 2023 17:37:44 -0400 Subject: [PATCH] *: Implement iterators and seqnum substitution for foreign SSTs When sstables written to by other stores in shared storage are read, we need to collapse range keys/range dels/point keys within each SST and then return keys at fixed sequence numbers reserved for that level. This change implements that path for all sstables that are in shared storage according to objstorage.Provider but have a creator ID that doesn't match ours. --- compaction.go | 2 +- flushable.go | 2 +- internal/base/seqnums.go | 15 + internal/keyspan/iter.go | 5 +- internal/keyspan/level_iter.go | 2 +- internal/keyspan/level_iter_test.go | 4 +- internal/keyspan/merging_iter.go | 43 --- internal/keyspan/transformer.go | 149 +++++++++ internal/rangekey/coalesce.go | 70 +++++ level_checker.go | 2 +- level_iter.go | 4 +- objstorage/objstorage.go | 5 + objstorage/objstorageprovider/shared.go | 8 + options.go | 13 + range_keys.go | 5 +- scan_internal.go | 394 ++++++++++++++++++++---- scan_internal_test.go | 60 ++++ sstable/block.go | 28 ++ sstable/options.go | 6 + sstable/reader.go | 35 ++- table_cache.go | 92 +++++- table_cache_test.go | 5 +- testdata/event_listener | 4 +- testdata/ingest | 2 +- testdata/metrics | 4 +- testdata/point_collapsing_iter | 173 +++++++++++ 26 files changed, 995 insertions(+), 137 deletions(-) create mode 100644 internal/keyspan/transformer.go create mode 100644 testdata/point_collapsing_iter diff --git a/compaction.go b/compaction.go index 77c95e1ea4..1c17d2423d 100644 --- a/compaction.go +++ b/compaction.go @@ -1474,7 +1474,7 @@ func (c *compaction) newInputIter( } if hasRangeKeys { li := &keyspan.LevelIter{} - newRangeKeyIterWrapper := func(file *manifest.FileMetadata, iterOptions *keyspan.SpanIterOptions) (keyspan.FragmentIterator, error) { + newRangeKeyIterWrapper := func(file *manifest.FileMetadata, iterOptions keyspan.SpanIterOptions) (keyspan.FragmentIterator, error) { iter, err := newRangeKeyIter(file, iterOptions) if iter != nil { // Ensure that the range key iter is not closed until the compaction is diff --git a/flushable.go b/flushable.go index 0a5b2ae2bf..43bab4e3a1 100644 --- a/flushable.go +++ b/flushable.go @@ -184,7 +184,7 @@ func (s *ingestedFlushable) newFlushIter(o *IterOptions, bytesFlushed *uint64) i } func (s *ingestedFlushable) constructRangeDelIter( - file *manifest.FileMetadata, _ *keyspan.SpanIterOptions, + file *manifest.FileMetadata, _ keyspan.SpanIterOptions, ) (keyspan.FragmentIterator, error) { // Note that the keyspan level iter expects a non-nil iterator to be // returned even if there is an error. So, we return the emptyKeyspanIter. diff --git a/internal/base/seqnums.go b/internal/base/seqnums.go index f563c1fbb9..aa38226a17 100644 --- a/internal/base/seqnums.go +++ b/internal/base/seqnums.go @@ -4,6 +4,8 @@ package base +import "fmt" + // This file defines sequence numbers that are reserved for foreign keys i.e. // internal keys coming from other Pebble instances and existing in shared // storage, as those will "slot below" any internal keys added by our own Pebble @@ -52,3 +54,16 @@ const ( // ourselves. SeqNumStart = uint64(10) ) + +// PointSeqNumForLevel returns the appropriate reserved sequence number for +// point keys in foreign sstables at the specified level. +func PointSeqNumForLevel(level int) uint64 { + switch level { + case 5: + return SeqNumL5Point + case 6: + return SeqNumL6Point + default: + panic(fmt.Sprintf("unexpected foreign sstable at level %d", level)) + } +} diff --git a/internal/keyspan/iter.go b/internal/keyspan/iter.go index 4798561ac6..1b08126b2a 100644 --- a/internal/keyspan/iter.go +++ b/internal/keyspan/iter.go @@ -62,7 +62,7 @@ type FragmentIterator interface { // TableNewSpanIter creates a new iterator for range key spans for the given // file. -type TableNewSpanIter func(file *manifest.FileMetadata, iterOptions *SpanIterOptions) (FragmentIterator, error) +type TableNewSpanIter func(file *manifest.FileMetadata, iterOptions SpanIterOptions) (FragmentIterator, error) // SpanIterOptions is a subset of IterOptions that are necessary to instantiate // per-sstable span iterators. @@ -70,6 +70,9 @@ type SpanIterOptions struct { // RangeKeyFilters can be used to avoid scanning tables and blocks in tables // when iterating over range keys. RangeKeyFilters []base.BlockPropertyFilter + // Level specifies the level where this sstable is being read. Must be + // specified for foreign (i.e. shared not-created-by-this-instance) sstables. + Level manifest.Level } // Iter is an iterator over a set of fragmented spans. diff --git a/internal/keyspan/level_iter.go b/internal/keyspan/level_iter.go index 047c6a24ab..718e58fffa 100644 --- a/internal/keyspan/level_iter.go +++ b/internal/keyspan/level_iter.go @@ -179,7 +179,7 @@ func (l *LevelIter) loadFile(file *manifest.FileMetadata, dir int) loadFileRetur return noFileLoaded } if indicator != fileAlreadyLoaded { - l.iter, l.err = l.newIter(file, &l.tableOpts) + l.iter, l.err = l.newIter(file, l.tableOpts) indicator = newFileLoaded } if l.err != nil { diff --git a/internal/keyspan/level_iter_test.go b/internal/keyspan/level_iter_test.go index fe3cd33729..d9897dc28b 100644 --- a/internal/keyspan/level_iter_test.go +++ b/internal/keyspan/level_iter_test.go @@ -299,7 +299,7 @@ func TestLevelIterEquivalence(t *testing.T) { metas = append(metas, meta) } - tableNewIters := func(file *manifest.FileMetadata, iterOptions *SpanIterOptions) (FragmentIterator, error) { + tableNewIters := func(file *manifest.FileMetadata, iterOptions SpanIterOptions) (FragmentIterator, error) { return NewIter(base.DefaultComparer.Compare, tc.levels[j][file.FileNum-1]), nil } // Add all the fileMetadatas to L6. @@ -433,7 +433,7 @@ func TestLevelIter(t *testing.T) { } if iter == nil { var lastFileNum base.FileNum - tableNewIters := func(file *manifest.FileMetadata, _ *SpanIterOptions) (FragmentIterator, error) { + tableNewIters := func(file *manifest.FileMetadata, _ SpanIterOptions) (FragmentIterator, error) { keyType := keyType spans := level[file.FileNum-1] if keyType == manifest.KeyTypePoint { diff --git a/internal/keyspan/merging_iter.go b/internal/keyspan/merging_iter.go index 247602423e..c73ba59b32 100644 --- a/internal/keyspan/merging_iter.go +++ b/internal/keyspan/merging_iter.go @@ -25,49 +25,6 @@ import ( // MergingIter implementation, but will require a bit of plumbing to thread the // Equal function. -// Transformer defines a transformation to be applied to a Span. -type Transformer interface { - // Transform takes a Span as input and writes the transformed Span to the - // provided output *Span pointer. The output Span's Keys slice may be reused - // by Transform to reduce allocations. - Transform(cmp base.Compare, in Span, out *Span) error -} - -// The TransformerFunc type is an adapter to allow the use of ordinary functions -// as Transformers. If f is a function with the appropriate signature, -// TransformerFunc(f) is a Transformer that calls f. -type TransformerFunc func(base.Compare, Span, *Span) error - -// Transform calls f(cmp, in, out). -func (tf TransformerFunc) Transform(cmp base.Compare, in Span, out *Span) error { - return tf(cmp, in, out) -} - -var noopTransform Transformer = TransformerFunc(func(_ base.Compare, s Span, dst *Span) error { - dst.Start, dst.End = s.Start, s.End - dst.Keys = append(dst.Keys[:0], s.Keys...) - return nil -}) - -// VisibleTransform filters keys that are invisible at the provided snapshot -// sequence number. -func VisibleTransform(snapshot uint64) Transformer { - return TransformerFunc(func(_ base.Compare, s Span, dst *Span) error { - dst.Start, dst.End = s.Start, s.End - dst.Keys = dst.Keys[:0] - for _, k := range s.Keys { - // NB: The InternalKeySeqNumMax value is used for the batch snapshot - // because a batch's visible span keys are filtered when they're - // fragmented. There's no requirement to enforce visibility at - // iteration time. - if base.Visible(k.SeqNum(), snapshot, base.InternalKeySeqNumMax) { - dst.Keys = append(dst.Keys, k) - } - } - return nil - }) -} - // MergingIter merges spans across levels of the LSM, exposing an iterator over // spans that yields sets of spans fragmented at unique user key boundaries. // diff --git a/internal/keyspan/transformer.go b/internal/keyspan/transformer.go new file mode 100644 index 0000000000..e0152cf4d6 --- /dev/null +++ b/internal/keyspan/transformer.go @@ -0,0 +1,149 @@ +// Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +package keyspan + +import "github.com/cockroachdb/pebble/internal/base" + +// Transformer defines a transformation to be applied to a Span. +type Transformer interface { + // Transform takes a Span as input and writes the transformed Span to the + // provided output *Span pointer. The output Span's Keys slice may be reused + // by Transform to reduce allocations. + Transform(cmp base.Compare, in Span, out *Span) error +} + +// The TransformerFunc type is an adapter to allow the use of ordinary functions +// as Transformers. If f is a function with the appropriate signature, +// TransformerFunc(f) is a Transformer that calls f. +type TransformerFunc func(base.Compare, Span, *Span) error + +// Transform calls f(cmp, in, out). +func (tf TransformerFunc) Transform(cmp base.Compare, in Span, out *Span) error { + return tf(cmp, in, out) +} + +var noopTransform Transformer = TransformerFunc(func(_ base.Compare, s Span, dst *Span) error { + dst.Start, dst.End = s.Start, s.End + dst.Keys = append(dst.Keys[:0], s.Keys...) + return nil +}) + +// VisibleTransform filters keys that are invisible at the provided snapshot +// sequence number. +func VisibleTransform(snapshot uint64) Transformer { + return TransformerFunc(func(_ base.Compare, s Span, dst *Span) error { + dst.Start, dst.End = s.Start, s.End + dst.Keys = dst.Keys[:0] + for _, k := range s.Keys { + // NB: The InternalKeySeqNumMax value is used for the batch snapshot + // because a batch's visible span keys are filtered when they're + // fragmented. There's no requirement to enforce visibility at + // iteration time. + if base.Visible(k.SeqNum(), snapshot, base.InternalKeySeqNumMax) { + dst.Keys = append(dst.Keys, k) + } + } + return nil + }) +} + +// TransformerIter is a FragmentIterator that applies a Transformer on all +// returned keys. Used for when a caller needs to apply a transformer on an +// iterator but does not otherwise need the mergingiter's merging ability. +type TransformerIter struct { + FragmentIterator + + // Transformer is applied on every Span returned by this iterator. + Transformer Transformer + // Comparer in use for this keyspace. + Compare base.Compare + + span Span + err error +} + +func (t *TransformerIter) applyTransform(span *Span) *Span { + t.span = Span{ + Start: t.span.Start[:0], + End: t.span.End[:0], + Keys: t.span.Keys[:0], + } + if err := t.Transformer.Transform(t.Compare, *span, &t.span); err != nil { + t.err = err + return nil + } + return &t.span +} + +// SeekGE implements the FragmentIterator interface. +func (t *TransformerIter) SeekGE(key []byte) *Span { + span := t.FragmentIterator.SeekGE(key) + if span == nil { + return nil + } + return t.applyTransform(span) +} + +// SeekLT implements the FragmentIterator interface. +func (t *TransformerIter) SeekLT(key []byte) *Span { + span := t.FragmentIterator.SeekLT(key) + if span == nil { + return nil + } + return t.applyTransform(span) +} + +// First implements the FragmentIterator interface. +func (t *TransformerIter) First() *Span { + span := t.FragmentIterator.First() + if span == nil { + return nil + } + return t.applyTransform(span) +} + +// Last implements the FragmentIterator interface. +func (t *TransformerIter) Last() *Span { + span := t.FragmentIterator.Last() + if span == nil { + return nil + } + return t.applyTransform(span) +} + +// Next implements the FragmentIterator interface. +func (t *TransformerIter) Next() *Span { + span := t.FragmentIterator.Next() + if span == nil { + return nil + } + return t.applyTransform(span) +} + +// Prev implements the FragmentIterator interface. +func (t *TransformerIter) Prev() *Span { + span := t.FragmentIterator.Prev() + if span == nil { + return nil + } + return t.applyTransform(span) +} + +// Error implements the FragmentIterator interface. +func (t *TransformerIter) Error() error { + if t.err != nil { + return t.err + } + return t.FragmentIterator.Error() +} + +// Close implements the FragmentIterator interface. +func (t *TransformerIter) Close() error { + err := t.FragmentIterator.Close() + if err != nil { + return err + } + return t.err +} diff --git a/internal/rangekey/coalesce.go b/internal/rangekey/coalesce.go index a25b64dfc4..18500e7a03 100644 --- a/internal/rangekey/coalesce.go +++ b/internal/rangekey/coalesce.go @@ -367,3 +367,73 @@ func coalesce( } return nil } + +// ForeignSSTTransformer implements a keyspan.Transformer for range keys in +// foreign sstables (i.e. shared sstables not created by us). It is largely +// similar to the Transform function implemented in UserIteratorConfig in that +// it calls coalesce to remove range keys shadowed by other range keys, but also +// retains the range key that does the shadowing. In addition, it outputs range +// keys with sequence numbers that match reserved sequence numbers for that +// level (i.e. SeqNumL5RangeKeySet for L5 sets, while L6 unsets/dels are elided). +type ForeignSSTTransformer struct { + Comparer *base.Comparer + Level int + sortBuf keyspan.KeysBySuffix +} + +// Transform implements the Transformer interface. +func (f *ForeignSSTTransformer) Transform( + cmp base.Compare, s keyspan.Span, dst *keyspan.Span, +) error { + // Apply shadowing of keys. + dst.Start = s.Start + dst.End = s.End + f.sortBuf = keyspan.KeysBySuffix{ + Cmp: cmp, + Keys: f.sortBuf.Keys[:0], + } + if err := coalesce(f.Comparer.Equal, &f.sortBuf, math.MaxUint64, s.Keys); err != nil { + return err + } + keys := f.sortBuf.Keys + dst.Keys = dst.Keys[:0] + for i := range keys { + var seqNum uint64 + switch keys[i].Kind() { + case base.InternalKeyKindRangeKeySet: + if invariants.Enabled && len(dst.Keys) > 0 && cmp(dst.Keys[len(dst.Keys)-1].Suffix, keys[i].Suffix) > 0 { + panic("pebble: keys unexpectedly not in ascending suffix order") + } + switch f.Level { + case 5: + seqNum = base.SeqNumL5RangeKeySet + case 6: + seqNum = base.SeqNumL6RangeKey + } + case base.InternalKeyKindRangeKeyUnset: + if invariants.Enabled && len(dst.Keys) > 0 && cmp(dst.Keys[len(dst.Keys)-1].Suffix, keys[i].Suffix) > 0 { + panic("pebble: keys unexpectedly not in ascending suffix order") + } + fallthrough + case base.InternalKeyKindRangeKeyDelete: + switch f.Level { + case 5: + seqNum = base.SeqNumL5RangeKeyUnsetDel + case 6: + // Skip this key, as foreign sstable in L6 do not need to emit range key + // unsets/dels as they do not apply to any other sstables. + continue + } + default: + return base.CorruptionErrorf("pebble: unrecognized range key kind %s", keys[i].Kind()) + } + dst.Keys = append(dst.Keys, keyspan.Key{ + Trailer: base.MakeTrailer(seqNum, keys[i].Kind()), + Suffix: keys[i].Suffix, + Value: keys[i].Value, + }) + } + // coalesce results in dst.Keys being sorted by Suffix. + dst.KeysOrder = keyspan.BySuffixAsc + return nil +} diff --git a/level_checker.go b/level_checker.go index 96321380fa..4cec440c30 100644 --- a/level_checker.go +++ b/level_checker.go @@ -379,7 +379,7 @@ func checkRangeTombstones(c *checkConfig) error { atomicUnit, _ := expandToAtomicUnit(c.cmp, lf.Slice(), true /* disableIsCompacting */) lower, upper := manifest.KeyRange(c.cmp, atomicUnit.Iter()) iterToClose, iter, err := c.newIters( - context.Background(), lf.FileMetadata, nil, internalIterOpts{}) + context.Background(), lf.FileMetadata, &IterOptions{level: manifest.Level(lsmLevel)}, internalIterOpts{}) if err != nil { return err } diff --git a/level_iter.go b/level_iter.go index feb809d064..10ee903338 100644 --- a/level_iter.go +++ b/level_iter.go @@ -36,9 +36,9 @@ type tableNewIters func( // tableNewRangeDelIter takes a tableNewIters and returns a TableNewSpanIter // for the rangedel iterator returned by tableNewIters. func tableNewRangeDelIter(ctx context.Context, newIters tableNewIters) keyspan.TableNewSpanIter { - return func(file *manifest.FileMetadata, iterOptions *keyspan.SpanIterOptions) (keyspan.FragmentIterator, error) { + return func(file *manifest.FileMetadata, iterOptions keyspan.SpanIterOptions) (keyspan.FragmentIterator, error) { iter, rangeDelIter, err := newIters( - ctx, file, &IterOptions{RangeKeyFilters: iterOptions.RangeKeyFilters}, internalIterOpts{}) + ctx, file, &IterOptions{RangeKeyFilters: iterOptions.RangeKeyFilters, level: iterOptions.Level}, internalIterOpts{}) if iter != nil { _ = iter.Close() } diff --git a/objstorage/objstorage.go b/objstorage/objstorage.go index 268eb80d11..ec96e814a6 100644 --- a/objstorage/objstorage.go +++ b/objstorage/objstorage.go @@ -216,6 +216,11 @@ type Provider interface { // Cannot be called if shared storage is not configured for the provider. SetCreatorID(creatorID CreatorID) error + // IsForeign returns whether this object is owned by a different node. Return + // value undefined if creator ID is not set yet, or if this object does not + // exist in this provider. + IsForeign(meta ObjectMetadata) bool + // SharedObjectBacking encodes the shared object metadata. SharedObjectBacking(meta *ObjectMetadata) (SharedObjectBackingHandle, error) diff --git a/objstorage/objstorageprovider/shared.go b/objstorage/objstorageprovider/shared.go index 2adc5dcefd..cb8cd66105 100644 --- a/objstorage/objstorageprovider/shared.go +++ b/objstorage/objstorageprovider/shared.go @@ -115,6 +115,14 @@ func (p *provider) SetCreatorID(creatorID objstorage.CreatorID) error { return nil } +// IsForeign is part of the objstorage.Provider interface. +func (p *provider) IsForeign(meta objstorage.ObjectMetadata) bool { + if !p.shared.initialized.Load() { + return false + } + return meta.IsShared() && p.shared.creatorID != meta.Shared.CreatorID +} + func (p *provider) sharedCheckInitialized() error { if p.sharedStorage() == nil { return errors.Errorf("shared object support not configured") diff --git a/options.go b/options.go index 6f7bd9ebb3..c415f77fa3 100644 --- a/options.go +++ b/options.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/internal/cache" "github.com/cockroachdb/pebble/internal/humanize" + "github.com/cockroachdb/pebble/internal/keyspan" "github.com/cockroachdb/pebble/internal/manifest" "github.com/cockroachdb/pebble/objstorage/shared" "github.com/cockroachdb/pebble/sstable" @@ -222,6 +223,17 @@ func (o *IterOptions) getLogger() Logger { return o.logger } +// SpanIterOptions creates a SpanIterOptions from this IterOptions. +func (o *IterOptions) SpanIterOptions(level manifest.Level) keyspan.SpanIterOptions { + if o == nil { + return keyspan.SpanIterOptions{Level: level} + } + return keyspan.SpanIterOptions{ + RangeKeyFilters: o.RangeKeyFilters, + Level: level, + } +} + // scanInternalOptions is similar to IterOptions, meant for use with // scanInternalIterator. type scanInternalOptions struct { @@ -1605,6 +1617,7 @@ func (o *Options) MakeReaderOptions() sstable.ReaderOptions { readerOpts.Comparer = o.Comparer readerOpts.Filters = o.Filters if o.Merger != nil { + readerOpts.Merge = o.Merger.Merge readerOpts.MergerName = o.Merger.Name } readerOpts.LoggerAndTracer = o.LoggerAndTracer diff --git a/range_keys.go b/range_keys.go index 30549d5ca3..6cd3e5875e 100644 --- a/range_keys.go +++ b/range_keys.go @@ -63,8 +63,7 @@ func (i *Iterator) constructRangeKeyIter() { // around Key Trailer order. iter := current.RangeKeyLevels[0].Iter() for f := iter.Last(); f != nil; f = iter.Prev() { - spanIterOpts := &keyspan.SpanIterOptions{RangeKeyFilters: i.opts.RangeKeyFilters} - spanIter, err := i.newIterRangeKey(f, spanIterOpts) + spanIter, err := i.newIterRangeKey(f, i.opts.SpanIterOptions(manifest.Level(0))) if err != nil { i.rangeKey.iterConfig.AddLevel(&errorKeyspanIter{err: err}) continue @@ -78,7 +77,7 @@ func (i *Iterator) constructRangeKeyIter() { continue } li := i.rangeKey.iterConfig.NewLevelIter() - spanIterOpts := keyspan.SpanIterOptions{RangeKeyFilters: i.opts.RangeKeyFilters} + spanIterOpts := i.opts.SpanIterOptions(manifest.Level(level)) li.Init(spanIterOpts, i.cmp, i.newIterRangeKey, current.RangeKeyLevels[level].Iter(), manifest.Level(level), manifest.KeyTypeRange) i.rangeKey.iterConfig.AddLevel(li) diff --git a/scan_internal.go b/scan_internal.go index 76d20d29ed..beee42004f 100644 --- a/scan_internal.go +++ b/scan_internal.go @@ -7,12 +7,15 @@ package pebble import ( "context" "fmt" + "sync" "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble/internal/base" + "github.com/cockroachdb/pebble/internal/invariants" "github.com/cockroachdb/pebble/internal/keyspan" "github.com/cockroachdb/pebble/internal/manifest" "github.com/cockroachdb/pebble/objstorage" + "github.com/cockroachdb/pebble/sstable" ) const ( @@ -92,8 +95,41 @@ type pcIterPos int const ( pcIterPosCur pcIterPos = iota pcIterPosNext + pcIterPosPrev ) +// pointCollapsingSSTIterator implements sstable.Iterator while composing +// pointCollapsingIterator. +type pointCollapsingSSTIterator struct { + pointCollapsingIterator + childIter sstable.Iterator +} + +var pcSSTIterPool = sync.Pool{ + New: func() interface{} { + return &pointCollapsingSSTIterator{} + }, +} + +// MaybeFilteredKeys implements the sstable.Iterator interface. +func (p *pointCollapsingSSTIterator) MaybeFilteredKeys() bool { + return p.childIter.MaybeFilteredKeys() +} + +// SetCloseHook implements the sstable.Iterator interface. +func (p *pointCollapsingSSTIterator) SetCloseHook(fn func(i sstable.Iterator) error) { + p.childIter.SetCloseHook(fn) +} + +// Close implements the sstable.Iterator interface. +func (p *pointCollapsingSSTIterator) Close() error { + err := p.pointCollapsingIterator.Close() + p.pointCollapsingIterator = pointCollapsingIterator{} + p.childIter = nil + pcSSTIterPool.Put(p) + return err +} + // pointCollapsingIterator is an internalIterator that collapses point keys and // returns at most one point internal key for each user key. Merges are merged, // sets are emitted as-is, and SingleDeletes are collapsed with the next point @@ -112,32 +148,54 @@ type pointCollapsingIterator struct { merge base.Merge err error seqNum uint64 - // The current position of `iter`. Could be backed by savedKey (i.e. iterKey == - // &savedKey) or could be owned by `iter`. findNextEntry and similar methods - // are expected to save the current value of this to savedKey if they're - // iterating away from the current key but still need to retain it. See - // comments in findNextEntry on how this field is used. + // The current position of `iter`. Always owned by the underlying iter. + iterKey *InternalKey + // The last saved key. findNextEntry and similar methods are expected to save + // the current value of iterKey to savedKey if they're iterating away from the + // current key but still need to retain it. See comments in findNextEntry on + // how this field is used. // // At the end of a positioning call: // - if pos == pcIterPosNext, iterKey is pointing to the next user key owned // by `iter` while savedKey is holding a copy to our current key. - // - If pos == pcIterPosCur, iterKey is pointing to either a savedKey-backed - // copy of the current key, or an `iter`-owned current key in which case - // the value of savedKey is undefined. - iterKey *InternalKey - savedKey InternalKey + // - If pos == pcIterPosCur, iterKey is pointing to an `iter`-owned current + // key, and savedKey is either undefined or pointing to a version of the + // current key owned by this iterator (i.e. backed by savedKeyBuf). + // - if pos == pcIterPosPrev, iterKey is pointing to the key before + // p.savedKey. p.savedKey is treated as the current key and is owned by + // this iterator, while p.iterKey is the previous key that is the current + // position of the child iterator. + savedKey InternalKey + savedKeyBuf []byte + // Saved key for substituting sequence numbers. Reused to avoid an allocation. + seqNumKey InternalKey + // elideRangeDeletes ignores range deletes returned by the interleaving + // iterator if true. + elideRangeDeletes bool // Value at the current iterator position, at iterKey. - value base.LazyValue + iterValue base.LazyValue + // Saved value backed by valueBuf, if set. Used in reverse iteration, and + // for merges. + savedValue base.LazyValue // Used for Merge keys only. valueMerger ValueMerger valueBuf []byte + // If fixedSeqNum is non-zero, all emitted points have this fixed sequence + // number. + fixedSeqNum uint64 } // SeekPrefixGE implements the InternalIterator interface. func (p *pointCollapsingIterator) SeekPrefixGE( prefix, key []byte, flags base.SeekGEFlags, ) (*base.InternalKey, base.LazyValue) { - panic("unimplemented") + p.resetKey() + p.iterKey, p.iterValue = p.iter.SeekPrefixGE(prefix, key, flags) + p.pos = pcIterPosCur + if p.iterKey == nil { + return nil, base.LazyValue{} + } + return p.findNextEntry() } // SeekGE implements the InternalIterator interface. @@ -145,7 +203,7 @@ func (p *pointCollapsingIterator) SeekGE( key []byte, flags base.SeekGEFlags, ) (*base.InternalKey, base.LazyValue) { p.resetKey() - p.iterKey, p.value = p.iter.SeekGE(key, flags) + p.iterKey, p.iterValue = p.iter.SeekGE(key, flags) p.pos = pcIterPosCur if p.iterKey == nil { return nil, base.LazyValue{} @@ -157,71 +215,89 @@ func (p *pointCollapsingIterator) SeekGE( func (p *pointCollapsingIterator) SeekLT( key []byte, flags base.SeekLTFlags, ) (*base.InternalKey, base.LazyValue) { - panic("unimplemented") + p.resetKey() + p.iterKey, p.iterValue = p.iter.SeekLT(key, flags) + p.pos = pcIterPosCur + if p.iterKey == nil { + return nil, base.LazyValue{} + } + return p.findPrevEntry() } func (p *pointCollapsingIterator) resetKey() { - p.savedKey.UserKey = p.savedKey.UserKey[:0] + p.savedKey.UserKey = p.savedKeyBuf[:0] p.savedKey.Trailer = 0 + p.seqNumKey = InternalKey{} p.valueMerger = nil p.valueBuf = p.valueBuf[:0] p.iterKey = nil p.pos = pcIterPosCur } -// findNextEntry is called to return the next key. p.iter must be positioned at -// the start of the first user key we are interested in. -func (p *pointCollapsingIterator) findNextEntry() (*base.InternalKey, base.LazyValue) { - finishAndReturnMerge := func() (*base.InternalKey, base.LazyValue) { - value, closer, err := p.valueMerger.Finish(true /* includesBase */) - if err != nil { - p.err = err - return nil, base.LazyValue{} - } - p.valueBuf = append(p.valueBuf[:0], value...) - if closer != nil { - _ = closer.Close() - } - p.valueMerger = nil - newValue := base.MakeInPlaceValue(value) - return &p.savedKey, newValue +func (p *pointCollapsingIterator) subSeqNum(key *base.InternalKey) *base.InternalKey { + if p.fixedSeqNum == 0 || key == nil || key.Kind() == InternalKeyKindRangeDelete { + return key } + // Reuse seqNumKey. This avoids an allocation. + p.seqNumKey.UserKey = key.UserKey + p.seqNumKey.Trailer = base.MakeTrailer(p.fixedSeqNum, key.Kind()) + return &p.seqNumKey +} + +// finishAndReturnMerge finishes off the valueMerger and returns the saved key. +func (p *pointCollapsingIterator) finishAndReturnMerge() (*base.InternalKey, base.LazyValue) { + value, closer, err := p.valueMerger.Finish(true /* includesBase */) + if err != nil { + p.err = err + return nil, base.LazyValue{} + } + p.valueBuf = append(p.valueBuf[:0], value...) + if closer != nil { + _ = closer.Close() + } + p.valueMerger = nil + val := base.MakeInPlaceValue(p.valueBuf) + return p.subSeqNum(&p.savedKey), val +} - // saveKey sets p.iterKey (if not-nil) to &p.savedKey. We can use this equality - // as a proxy to determine if we're at the first internal key for a user key. +// findNextEntry is called to return the next key. p.iter must be positioned at the +// start of the first user key we are interested in. +func (p *pointCollapsingIterator) findNextEntry() (*base.InternalKey, base.LazyValue) { p.saveKey() + // Saves a comparison in the fast path + firstIteration := true for p.iterKey != nil { - // NB: p.savedKey is either the current key (iff p.iterKey == &p.savedKey), + // NB: p.savedKey is either the current key (iff p.iterKey == firstKey), // or the previous key. - if p.iterKey != &p.savedKey && !p.comparer.Equal(p.iterKey.UserKey, p.savedKey.UserKey) { + if !firstIteration && !p.comparer.Equal(p.iterKey.UserKey, p.savedKey.UserKey) { if p.valueMerger != nil { if p.savedKey.Kind() != InternalKeyKindMerge { panic(fmt.Sprintf("expected key %s to have MERGE kind", p.iterKey)) } p.pos = pcIterPosNext - return finishAndReturnMerge() + return p.finishAndReturnMerge() } p.saveKey() continue } + firstIteration = false if s := p.iter.Span(); s != nil && s.CoversAt(p.seqNum, p.iterKey.SeqNum()) { // All future keys for this user key must be deleted. if p.valueMerger != nil { - return finishAndReturnMerge() + return p.finishAndReturnMerge() } else if p.savedKey.Kind() == InternalKeyKindSingleDelete { panic("cannot process singledel key in point collapsing iterator") } // Fast forward to the next user key. p.saveKey() - p.iterKey, p.value = p.iter.Next() + p.iterKey, p.iterValue = p.iter.Next() for p.iterKey != nil && p.savedKey.SeqNum() >= p.iterKey.SeqNum() && p.comparer.Equal(p.iterKey.UserKey, p.savedKey.UserKey) { - p.iterKey, p.value = p.iter.Next() + p.iterKey, p.iterValue = p.iter.Next() } continue } switch p.savedKey.Kind() { case InternalKeyKindSet, InternalKeyKindDelete, InternalKeyKindSetWithDelete, InternalKeyKindDeleteSized: - p.saveKey() // Note that we return SETs directly, even if they would otherwise get // compacted into a Del to turn into a SetWithDelete. This is a fast // path optimization that can break SINGLEDEL determinism. To lead to @@ -240,14 +316,14 @@ func (p *pointCollapsingIterator) findNextEntry() (*base.InternalKey, base.LazyV // of blocks and can determine user key changes without doing key saves // or comparisons. p.pos = pcIterPosCur - return p.iterKey, p.value + return p.subSeqNum(p.iterKey), p.iterValue case InternalKeyKindSingleDelete: // Panic, as this iterator is not expected to observe single deletes. panic("cannot process singledel key in point collapsing iterator") case InternalKeyKindMerge: if p.valueMerger == nil { // Set up merger. This is the first Merge key encountered. - value, callerOwned, err := p.value.Value(p.valueBuf[:0]) + value, callerOwned, err := p.iterValue.Value(p.valueBuf[:0]) if err != nil { p.err = err return nil, base.LazyValue{} @@ -263,13 +339,13 @@ func (p *pointCollapsingIterator) findNextEntry() (*base.InternalKey, base.LazyV return nil, base.LazyValue{} } p.saveKey() - p.iterKey, p.value = p.iter.Next() + p.iterKey, p.iterValue = p.iter.Next() continue } switch p.iterKey.Kind() { case InternalKeyKindSet, InternalKeyKindMerge, InternalKeyKindSetWithDelete: // Merge into key. - value, callerOwned, err := p.value.Value(p.valueBuf[:0]) + value, callerOwned, err := p.iterValue.Value(p.valueBuf[:0]) if err != nil { p.err = err return nil, base.LazyValue{} @@ -287,31 +363,144 @@ func (p *pointCollapsingIterator) findNextEntry() (*base.InternalKey, base.LazyV if p.iterKey.Kind() != InternalKeyKindMerge { p.savedKey.SetKind(p.iterKey.Kind()) p.pos = pcIterPosCur - return finishAndReturnMerge() + return p.finishAndReturnMerge() } - p.iterKey, p.value = p.iter.Next() + p.iterKey, p.iterValue = p.iter.Next() case InternalKeyKindRangeDelete: + if p.elideRangeDeletes { + // Skip this range delete, and process any point after it. + p.iterKey, p.iterValue = p.iter.Next() + p.saveKey() + continue + } // These are interleaved by the interleaving iterator ahead of all points. // We should pass them as-is, but also account for any points ahead of // them. p.pos = pcIterPosCur - return p.iterKey, p.value + return p.subSeqNum(p.iterKey), p.iterValue default: panic(fmt.Sprintf("unexpected kind: %d", p.iterKey.Kind())) } } if p.valueMerger != nil { p.pos = pcIterPosNext - return finishAndReturnMerge() + return p.finishAndReturnMerge() } p.resetKey() return nil, base.LazyValue{} } +// findPrevEntry finds the relevant point key to return for the previous user key +// (i.e. in reverse iteration). Requires that the iterator is already positioned +// at the first-in-reverse (i.e. rightmost / largest) internal key encountered +// for that user key. +func (p *pointCollapsingIterator) findPrevEntry() (*base.InternalKey, base.LazyValue) { + if p.iterKey == nil { + p.pos = pcIterPosCur + return nil, base.LazyValue{} + } + + p.saveKey() + firstIteration := true + for p.iterKey != nil { + if !firstIteration && !p.comparer.Equal(p.iterKey.UserKey, p.savedKey.UserKey) { + p.pos = pcIterPosPrev + return p.subSeqNum(&p.savedKey), p.savedValue + } + firstIteration = false + if s := p.iter.Span(); s != nil && s.CoversAt(p.seqNum, p.iterKey.SeqNum()) { + // Skip this key. + p.iterKey, p.iterValue = p.iter.Prev() + p.saveKey() + continue + } + switch p.iterKey.Kind() { + case InternalKeyKindSet, InternalKeyKindDelete, InternalKeyKindSetWithDelete: + // Instead of calling saveKey(), we take advantage of the invariant that + // p.savedKey.UserKey == p.iterKey.UserKey (otherwise we'd have gone into + // the !p.comparer.Equal(p.iterKey.UserKey, p.savedKey.UserKey) case at + // the top of this for loop). That allows us to just save the trailer + // and move on. + if invariants.Enabled && !p.comparer.Equal(p.iterKey.UserKey, p.savedKey.UserKey) { + panic("unexpected inequality between p.iterKey and p.savedKey") + } + p.savedKey.Trailer = p.iterKey.Trailer + // Copy value into p.savedValue. + value, callerOwned, err := p.iterValue.Value(p.valueBuf[:0]) + if err != nil { + p.err = err + return nil, base.LazyValue{} + } + if !callerOwned { + p.valueBuf = append(p.valueBuf[:0], value...) + } else { + p.valueBuf = value + } + p.valueMerger = nil + p.savedValue = base.MakeInPlaceValue(p.valueBuf) + p.iterKey, p.iterValue = p.iter.Prev() + continue + case InternalKeyKindSingleDelete: + // Panic, as this iterator is not expected to observe single deletes. + panic("cannot process singledel key in point collapsing iterator") + case InternalKeyKindMerge: + panic("cannot process merge key in point collapsing iterator in reverse iteration") + case InternalKeyKindRangeDelete: + if p.elideRangeDeletes { + // Skip this range delete, and process any point before it. + p.iterKey, p.iterValue = p.iter.Prev() + continue + } + // These are interleaved by the interleaving iterator behind all points. + if p.savedKey.Kind() != InternalKeyKindRangeDelete { + // If the previous key was not a rangedel, we need to return it. Pretend that we're at the + // previous user key (i.e. with p.pos = pcIterPosPrev) even if we're not, so on the next + // Prev() we encounter and return this rangedel. For now return the point ahead of + // this range del (if any). + p.pos = pcIterPosPrev + return p.subSeqNum(&p.savedKey), p.savedValue + } + // We take advantage of the fact that a Prev() *on* a RangeDel iterKey + // always takes us to a different user key, so on the next iteration + // we will fall into the !p.comparer.Equal(p.iterKey.UserKey, p.savedKey.UserKey) + // case. + // + // Instead of calling saveKey(), we take advantage of the invariant that + // p.savedKey.UserKey == p.iterKey.UserKey (otherwise we'd have gone into + // the !p.comparer.Equal(p.iterKey.UserKey, p.savedKey.UserKey) case at + // the top of this for loop). That allows us to just save the trailer + // and move on. + if invariants.Enabled && !p.comparer.Equal(p.iterKey.UserKey, p.savedKey.UserKey) { + panic("unexpected inequality between p.iterKey and p.savedKey") + } + p.savedKey.Trailer = p.iterKey.Trailer + // Copy value into p.savedValue. + value, callerOwned, err := p.iterValue.Value(p.valueBuf[:0]) + if err != nil { + p.err = err + return nil, base.LazyValue{} + } + if !callerOwned { + p.valueBuf = append(p.valueBuf[:0], value...) + } else { + p.valueBuf = value + } + p.valueMerger = nil + p.savedValue = base.MakeInPlaceValue(p.valueBuf) + p.iterKey, p.iterValue = p.iter.Prev() + continue + default: + panic(fmt.Sprintf("unexpected kind: %d", p.iterKey.Kind())) + } + } + p.pos = pcIterPosPrev + return p.subSeqNum(&p.savedKey), p.savedValue +} + // First implements the InternalIterator interface. func (p *pointCollapsingIterator) First() (*base.InternalKey, base.LazyValue) { p.resetKey() - p.iterKey, p.value = p.iter.First() + p.iterKey, p.iterValue = p.iter.First() p.pos = pcIterPosCur if p.iterKey == nil { return nil, base.LazyValue{} @@ -321,41 +510,67 @@ func (p *pointCollapsingIterator) First() (*base.InternalKey, base.LazyValue) { // Last implements the InternalIterator interface. func (p *pointCollapsingIterator) Last() (*base.InternalKey, base.LazyValue) { - panic("unimplemented") + p.resetKey() + p.iterKey, p.iterValue = p.iter.Last() + p.pos = pcIterPosCur + if p.iterKey == nil { + return nil, base.LazyValue{} + } + return p.findPrevEntry() } func (p *pointCollapsingIterator) saveKey() { if p.iterKey == nil { - p.savedKey = InternalKey{UserKey: p.savedKey.UserKey[:0]} - return - } else if p.iterKey == &p.savedKey { - // Do nothing. + p.savedKey = InternalKey{UserKey: p.savedKeyBuf[:0]} return } - p.savedKey.CopyFrom(*p.iterKey) - p.iterKey = &p.savedKey + p.savedKeyBuf = append(p.savedKeyBuf[:0], p.iterKey.UserKey...) + p.savedKey = InternalKey{UserKey: p.savedKeyBuf, Trailer: p.iterKey.Trailer} } // Next implements the InternalIterator interface. func (p *pointCollapsingIterator) Next() (*base.InternalKey, base.LazyValue) { switch p.pos { + case pcIterPosPrev: + p.saveKey() + if p.iterKey != nil && p.iterKey.Kind() == InternalKeyKindRangeDelete && !p.elideRangeDeletes { + p.iterKey, p.iterValue = p.iter.Next() + p.pos = pcIterPosCur + } else { + // Fast forward to the next user key. + key, val := p.iter.Next() + // p.iterKey.SeqNum() >= key.SeqNum() is an optimization that allows us to + // use p.iterKey.SeqNum() < key.SeqNum() as a sign that the user key has + // changed, without needing to do the full key comparison. + for key != nil && p.savedKey.SeqNum() >= key.SeqNum() && + p.comparer.Equal(p.savedKey.UserKey, key.UserKey) { + key, val = p.iter.Next() + } + if key == nil { + // There are no keys to return. + p.resetKey() + return nil, base.LazyValue{} + } + p.iterKey, p.iterValue = key, val + p.pos = pcIterPosCur + } + fallthrough case pcIterPosCur: p.saveKey() - if p.iterKey != nil && p.iterKey.Kind() == InternalKeyKindRangeDelete { + if p.iterKey != nil && p.iterKey.Kind() == InternalKeyKindRangeDelete && !p.elideRangeDeletes { // Step over the interleaved range delete and process the very next // internal key, even if it's at the same user key. This is because a // point for that user key has not been returned yet. - p.iterKey, p.value = p.iter.Next() + p.iterKey, p.iterValue = p.iter.Next() break } - // Fast forward to the next user key. p.iterKey stayed at p.savedKey for - // this loop. + // Fast forward to the next user key. key, val := p.iter.Next() // p.iterKey.SeqNum() >= key.SeqNum() is an optimization that allows us to // use p.iterKey.SeqNum() < key.SeqNum() as a sign that the user key has // changed, without needing to do the full key comparison. - for p.iterKey != nil && key != nil && p.iterKey.SeqNum() >= key.SeqNum() && - p.comparer.Equal(p.iterKey.UserKey, key.UserKey) { + for key != nil && p.savedKey.SeqNum() >= key.SeqNum() && + p.comparer.Equal(p.savedKey.UserKey, key.UserKey) { key, val = p.iter.Next() } if key == nil { @@ -363,7 +578,7 @@ func (p *pointCollapsingIterator) Next() (*base.InternalKey, base.LazyValue) { p.resetKey() return nil, base.LazyValue{} } - p.iterKey, p.value = key, val + p.iterKey, p.iterValue = key, val case pcIterPosNext: p.pos = pcIterPosCur } @@ -376,12 +591,56 @@ func (p *pointCollapsingIterator) Next() (*base.InternalKey, base.LazyValue) { // NextPrefix implements the InternalIterator interface. func (p *pointCollapsingIterator) NextPrefix(succKey []byte) (*base.InternalKey, base.LazyValue) { + // TODO(bilal): Implement this. It'll be similar to SeekGE, except we'll call + // the child iterator's NextPrefix, and have some special logic in case pos + // is pcIterPosNext. panic("unimplemented") } // Prev implements the InternalIterator interface. func (p *pointCollapsingIterator) Prev() (*base.InternalKey, base.LazyValue) { - panic("unimplemented") + switch p.pos { + case pcIterPosNext: + // Rewind backwards to the previous iter key. + p.saveKey() + key, val := p.iter.Prev() + for key != nil && p.savedKey.SeqNum() <= key.SeqNum() && + p.comparer.Equal(p.savedKey.UserKey, key.UserKey) { + if key.Kind() == InternalKeyKindRangeDelete && !p.elideRangeDeletes { + // We need to pause at this range delete and return it as-is, as "cur" + // is referencing the point key after it, not the range delete. + break + } + key, val = p.iter.Prev() + } + p.iterKey = key + p.iterValue = val + p.pos = pcIterPosCur + fallthrough + case pcIterPosCur: + p.saveKey() + key, val := p.iter.Prev() + for key != nil && p.savedKey.SeqNum() <= key.SeqNum() && + p.comparer.Equal(p.savedKey.UserKey, key.UserKey) { + if key.Kind() == InternalKeyKindRangeDelete && !p.elideRangeDeletes { + // We need to pause at this range delete and return it as-is, as "cur" + // is referencing the point key after it, not the range delete. + break + } + key, val = p.iter.Prev() + } + p.iterKey = key + p.iterValue = val + p.pos = pcIterPosCur + case pcIterPosPrev: + // Do nothing. + p.pos = pcIterPosCur + } + if p.iterKey == nil { + p.resetKey() + return nil, base.LazyValue{} + } + return p.findPrevEntry() } // Error implements the InternalIterator interface. @@ -497,7 +756,7 @@ func (d *DB) truncateSharedFile( ) defer rangeDelIter.Close() } - rangeKeyIter, err := d.tableNewRangeKeyIter(file, nil /* spanIterOptions */) + rangeKeyIter, err := d.tableNewRangeKeyIter(file, keyspan.SpanIterOptions{Level: manifest.Level(level)}) if err != nil { return nil, false, err } @@ -804,8 +1063,7 @@ func (i *scanInternalIterator) constructRangeKeyIter() { // around Key Trailer order. iter := current.RangeKeyLevels[0].Iter() for f := iter.Last(); f != nil; f = iter.Prev() { - spanIterOpts := &keyspan.SpanIterOptions{RangeKeyFilters: i.opts.RangeKeyFilters} - spanIter, err := i.newIterRangeKey(f, spanIterOpts) + spanIter, err := i.newIterRangeKey(f, i.opts.SpanIterOptions(manifest.Level(0))) if err != nil { i.rangeKey.iterConfig.AddLevel(&errorKeyspanIter{err: err}) continue @@ -822,7 +1080,7 @@ func (i *scanInternalIterator) constructRangeKeyIter() { continue } li := i.rangeKey.iterConfig.NewLevelIter() - spanIterOpts := keyspan.SpanIterOptions{RangeKeyFilters: i.opts.RangeKeyFilters} + spanIterOpts := i.opts.SpanIterOptions(manifest.Level(level)) li.Init(spanIterOpts, i.comparer.Compare, i.newIterRangeKey, current.RangeKeyLevels[level].Iter(), manifest.Level(level), manifest.KeyTypeRange) i.rangeKey.iterConfig.AddLevel(li) diff --git a/scan_internal_test.go b/scan_internal_test.go index 997470320d..a009fd7616 100644 --- a/scan_internal_test.go +++ b/scan_internal_test.go @@ -7,6 +7,7 @@ package pebble import ( "context" "fmt" + "math" "strconv" "strings" "testing" @@ -251,3 +252,62 @@ func TestScanInternal(t *testing.T) { } }) } + +func TestPointCollapsingIter(t *testing.T) { + var def string + datadriven.RunTest(t, "testdata/point_collapsing_iter", func(t *testing.T, d *datadriven.TestData) string { + switch d.Cmd { + case "define": + def = d.Input + return "" + + case "iter": + var elideRangeDels bool + for i := range d.CmdArgs { + switch d.CmdArgs[i].Key { + case "elide-range-dels": + var err error + elideRangeDels, err = strconv.ParseBool(d.CmdArgs[i].Vals[0]) + if err != nil { + return err.Error() + } + } + } + f := &fakeIter{} + var spans []keyspan.Span + for _, line := range strings.Split(def, "\n") { + for _, key := range strings.Fields(line) { + j := strings.Index(key, ":") + k := base.ParseInternalKey(key[:j]) + v := []byte(key[j+1:]) + if k.Kind() == InternalKeyKindRangeDelete { + spans = append(spans, keyspan.Span{ + Start: k.UserKey, + End: v, + Keys: []keyspan.Key{{Trailer: k.Trailer}}, + KeysOrder: 0, + }) + continue + } + f.keys = append(f.keys, k) + f.vals = append(f.vals, v) + } + } + + ksIter := keyspan.NewIter(base.DefaultComparer.Compare, spans) + pcIter := &pointCollapsingIterator{ + comparer: base.DefaultComparer, + merge: base.DefaultMerger.Merge, + seqNum: math.MaxUint64, + elideRangeDeletes: elideRangeDels, + } + pcIter.iter.Init(base.DefaultComparer, f, ksIter, nil /* mask */, nil, nil) + defer pcIter.Close() + + return runInternalIterCmd(t, d, pcIter, iterCmdVerboseKey) + + default: + return fmt.Sprintf("unknown command: %s", d.Cmd) + } + }) +} diff --git a/sstable/block.go b/sstable/block.go index 27cf77ff8c..40ad4ec1bc 100644 --- a/sstable/block.go +++ b/sstable/block.go @@ -1455,6 +1455,10 @@ type fragmentBlockIter struct { err error dir int8 closeHook func(i keyspan.FragmentIterator) error + + // elideSameSeqnum, if true, returns only the first-occurring (in forward + // order) Key for each sequence number. + elideSameSeqnum bool } func (i *fragmentBlockIter) resetForReuse() fragmentBlockIter { @@ -1483,6 +1487,24 @@ func (i *fragmentBlockIter) decodeSpanKeys(k *InternalKey, internalValue []byte) } } +func (i *fragmentBlockIter) elideKeysOfSameSeqNum() { + if invariants.Enabled { + if !i.elideSameSeqnum || len(i.span.Keys) == 0 { + panic("elideKeysOfSameSeqNum called when it should not be") + } + } + lastSeqNum := i.span.Keys[0].SeqNum() + k := 1 + for j := 1; j < len(i.span.Keys); j++ { + if lastSeqNum != i.span.Keys[j].SeqNum() { + lastSeqNum = i.span.Keys[j].SeqNum() + i.span.Keys[k] = i.span.Keys[j] + k++ + } + } + i.span.Keys = i.span.Keys[:k] +} + // gatherForward gathers internal keys with identical bounds. Keys defined over // spans of the keyspace are fragmented such that any overlapping key spans have // identical bounds. When these spans are persisted to a range deletion or range @@ -1532,6 +1554,9 @@ func (i *fragmentBlockIter) gatherForward(k *InternalKey, lazyValue base.LazyVal k, lazyValue = i.blockIter.Next() internalValue = lazyValue.InPlaceValue() } + if i.elideSameSeqnum && len(i.span.Keys) > 0 { + i.elideKeysOfSameSeqNum() + } // i.blockIter is positioned over the first internal key for the next span. return &i.span } @@ -1591,6 +1616,9 @@ func (i *fragmentBlockIter) gatherBackward(k *InternalKey, lazyValue base.LazyVa // Backwards iteration encounters internal keys in the wrong order. keyspan.SortKeysByTrailer(&i.span.Keys) + if i.elideSameSeqnum && len(i.span.Keys) > 0 { + i.elideKeysOfSameSeqNum() + } return &i.span } diff --git a/sstable/options.go b/sstable/options.go index 66dbdd4d0a..869e69b17c 100644 --- a/sstable/options.go +++ b/sstable/options.go @@ -107,6 +107,9 @@ type ReaderOptions struct { // The default value uses the same ordering as bytes.Compare. Comparer *Comparer + // Merge defines the Merge function in use for this keyspace. + Merge base.Merge + // Filters is a map from filter policy name to filter policy. It is used for // debugging tools which may be used on multiple databases configured with // different filter policies. It is not necessary to populate this filters @@ -126,6 +129,9 @@ func (o ReaderOptions) ensureDefaults() ReaderOptions { if o.Comparer == nil { o.Comparer = base.DefaultComparer } + if o.Merge == nil { + o.Merge = base.DefaultMerger.Merge + } if o.MergerName == "" { o.MergerName = base.DefaultMerger.Name } diff --git a/sstable/reader.go b/sstable/reader.go index 710198edd5..b37e6f0186 100644 --- a/sstable/reader.go +++ b/sstable/reader.go @@ -2943,6 +2943,26 @@ func (v *VirtualReader) NewRawRangeDelIter() (keyspan.FragmentIterator, error) { ), nil } +// NewFixedSeqnumRangeDelIter wraps Reader.NewFixedSeqnumRangeDelIter. +func (v *VirtualReader) NewFixedSeqnumRangeDelIter( + seqNum uint64, +) (keyspan.FragmentIterator, error) { + iter, err := v.reader.NewFixedSeqnumRangeDelIter(seqNum) + if err != nil { + return nil, err + } + if iter == nil { + return nil, nil + } + + // There should be no spans which cross virtual sstable bounds. So, no + // truncation should occur. + return keyspan.Truncate( + v.reader.Compare, iter, v.vState.lower.UserKey, v.vState.upper.UserKey, + &v.vState.lower, &v.vState.upper, true, /* panicOnPartialOverlap */ + ), nil +} + // NewRawRangeKeyIter wraps Reader.NewRawRangeKeyIter. func (v *VirtualReader) NewRawRangeKeyIter() (keyspan.FragmentIterator, error) { iter, err := v.reader.NewRawRangeKeyIter() @@ -3168,6 +3188,17 @@ func (r *Reader) newCompactionIter( // TODO(sumeer): plumb context.Context since this path is relevant in the user-facing // iterator. Add WithContext methods since the existing ones are public. func (r *Reader) NewRawRangeDelIter() (keyspan.FragmentIterator, error) { + return r.NewFixedSeqnumRangeDelIter(r.Properties.GlobalSeqNum) +} + +// NewFixedSeqnumRangeDelIter returns an internal iterator for the contents of +// the range-del block of the table, with a custom sequence number to be used as +// the global sequence number for this block. Returns nil if the table does not +// contain any range deletions. +// +// TODO(sumeer): plumb context.Context since this path is relevant in the user-facing +// iterator. Add WithContext methods since the existing ones are public. +func (r *Reader) NewFixedSeqnumRangeDelIter(seqNum uint64) (keyspan.FragmentIterator, error) { if r.rangeDelBH.Length == 0 { return nil, nil } @@ -3175,8 +3206,8 @@ func (r *Reader) NewRawRangeDelIter() (keyspan.FragmentIterator, error) { if err != nil { return nil, err } - i := &fragmentBlockIter{} - if err := i.blockIter.initHandle(r.Compare, h, r.Properties.GlobalSeqNum); err != nil { + i := &fragmentBlockIter{elideSameSeqnum: true} + if err := i.blockIter.initHandle(r.Compare, h, seqNum); err != nil { return nil, err } return i, nil diff --git a/table_cache.go b/table_cache.go index 9a3c5b5724..186b928d9b 100644 --- a/table_cache.go +++ b/table_cache.go @@ -9,6 +9,7 @@ import ( "context" "fmt" "io" + "math" "runtime/debug" "runtime/pprof" "sync" @@ -21,6 +22,7 @@ import ( "github.com/cockroachdb/pebble/internal/keyspan" "github.com/cockroachdb/pebble/internal/manifest" "github.com/cockroachdb/pebble/internal/private" + "github.com/cockroachdb/pebble/internal/rangekey" "github.com/cockroachdb/pebble/objstorage" "github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing" "github.com/cockroachdb/pebble/sstable" @@ -138,7 +140,7 @@ func (c *tableCacheContainer) newIters( } func (c *tableCacheContainer) newRangeKeyIter( - file *manifest.FileMetadata, opts *keyspan.SpanIterOptions, + file *manifest.FileMetadata, opts keyspan.SpanIterOptions, ) (keyspan.FragmentIterator, error) { return c.tableCache.getShard(file.FileBacking.DiskFileNum).newRangeKeyIter(file, opts, &c.dbOpts) } @@ -386,6 +388,7 @@ func (c *tableCacheShard) newIters( } type iterCreator interface { + NewFixedSeqnumRangeDelIter(seqNum uint64) (keyspan.FragmentIterator, error) NewRawRangeDelIter() (keyspan.FragmentIterator, error) NewIterWithBlockPropertyFiltersAndContext( ctx context.Context, @@ -410,11 +413,38 @@ func (c *tableCacheShard) newIters( ic = &virtualReader } + provider := dbOpts.objProvider + // Check if this file is a foreign file. + objMeta, err := provider.Lookup(fileTypeTable, file.FileBacking.DiskFileNum) + if err != nil { + return nil, nil, err + } + // NB: range-del iterator does not maintain a reference to the table, nor // does it need to read from it after creation. var rangeDelIter keyspan.FragmentIterator - rangeDelIter, err = ic.NewRawRangeDelIter() - + if provider.IsForeign(objMeta) { + if opts == nil { + panic("unexpected nil opts when reading foreign file") + } + if !file.Virtual { + // Foreign sstables must be virtual by definition. + panic(fmt.Sprintf("sstable is foreign but not virtual: %s", file.FileNum)) + } + switch manifest.LevelToInt(opts.level) { + case 5: + rangeDelIter, err = ic.NewFixedSeqnumRangeDelIter(base.SeqNumL5RangeDel) + case 6: + // Let rangeDelIter remain nil. We don't need to return rangedels from + // this file as they will not apply to any other files. For the purpose + // of collapsing rangedels within this file, we create another rangeDelIter + // below for use with the interleaving iter. + default: + panic(fmt.Sprintf("unexpected level for foreign sstable: %d", manifest.LevelToInt(opts.level))) + } + } else { + rangeDelIter, err = ic.NewRawRangeDelIter() + } if err != nil { c.unrefValue(v) return nil, nil, err @@ -470,6 +500,35 @@ func (c *tableCacheShard) newIters( // NB: v.closeHook takes responsibility for calling unrefValue(v) here. Take // care to avoid introducing an allocation here by adding a closure. iter.SetCloseHook(v.closeHook) + if provider.IsForeign(objMeta) { + // NB: IsForeign() guarantees IsShared, so opts must not be nil as we've + // already panicked on the nil case above. + pointKeySeqNum := base.PointSeqNumForLevel(manifest.LevelToInt(opts.level)) + pcIter := pointCollapsingIterator{ + comparer: dbOpts.opts.Comparer, + merge: dbOpts.opts.Merge, + seqNum: math.MaxUint64, + elideRangeDeletes: true, + fixedSeqNum: pointKeySeqNum, + } + // Open a second rangedel iter. This is solely for the interleaving iter to + // be able to efficiently delete covered range deletes. We don't need to fix + // the sequence number in this iter, as these range deletes will not be + // exposed to anything other than the interleaving iter and + // pointCollapsingIter. + rangeDelIter, err := v.reader.NewRawRangeDelIter() + if err != nil { + c.unrefValue(v) + return nil, nil, err + } + if rangeDelIter == nil { + rangeDelIter = emptyKeyspanIter + } + pcIter.iter.Init(dbOpts.opts.Comparer, iter, rangeDelIter, nil /* mask */, opts.LowerBound, opts.UpperBound) + pcSSTIter := pcSSTIterPool.Get().(*pointCollapsingSSTIterator) + *pcSSTIter = pointCollapsingSSTIterator{pointCollapsingIterator: pcIter, childIter: iter} + iter = pcSSTIter + } c.iterCount.Add(1) dbOpts.iterCount.Add(1) @@ -482,7 +541,7 @@ func (c *tableCacheShard) newIters( } func (c *tableCacheShard) newRangeKeyIter( - file *manifest.FileMetadata, opts *keyspan.SpanIterOptions, dbOpts *tableCacheOpts, + file *manifest.FileMetadata, opts keyspan.SpanIterOptions, dbOpts *tableCacheOpts, ) (keyspan.FragmentIterator, error) { // Calling findNode gives us the responsibility of decrementing v's // refCount. If opening the underlying table resulted in error, then we @@ -501,7 +560,7 @@ func (c *tableCacheShard) newRangeKeyIter( // file's range key blocks may surface deleted range keys below. This is // done here, rather than deferring to the block-property collector in order // to maintain parity with point keys and the treatment of RANGEDELs. - if opts != nil && v.reader.Properties.NumRangeKeyDels == 0 { + if v.reader.Properties.NumRangeKeyDels == 0 { ok, _, err = c.checkAndIntersectFilters(v, nil, opts.RangeKeyFilters, nil) } if err != nil { @@ -539,6 +598,29 @@ func (c *tableCacheShard) newRangeKeyIter( return emptyKeyspanIter, nil } + objMeta, err := dbOpts.objProvider.Lookup(fileTypeTable, file.FileBacking.DiskFileNum) + if err != nil { + return nil, err + } + if dbOpts.objProvider.IsForeign(objMeta) { + if opts.Level == 0 { + panic("unexpected zero level when reading foreign file") + } + transform := &rangekey.ForeignSSTTransformer{ + Comparer: dbOpts.opts.Comparer, + Level: manifest.LevelToInt(opts.Level), + } + if iter == nil { + iter = emptyKeyspanIter + } + transformIter := &keyspan.TransformerIter{ + FragmentIterator: iter, + Transformer: transform, + Compare: dbOpts.opts.Comparer.Compare, + } + return transformIter, nil + } + return iter, nil } diff --git a/table_cache_test.go b/table_cache_test.go index 32f6413656..92ea9a1079 100644 --- a/table_cache_test.go +++ b/table_cache_test.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble/internal/base" + "github.com/cockroachdb/pebble/internal/keyspan" "github.com/cockroachdb/pebble/internal/manifest" "github.com/cockroachdb/pebble/internal/testkeys" "github.com/cockroachdb/pebble/objstorage" @@ -657,7 +658,7 @@ func testTableCacheFrequentlyUsedInternal(t *testing.T, rangeIter bool) { m := &fileMetadata{FileNum: FileNum(j)} m.InitPhysicalBacking() if rangeIter { - iter, err = c.newRangeKeyIter(m, nil /* iter options */) + iter, err = c.newRangeKeyIter(m, keyspan.SpanIterOptions{}) } else { iter, _, err = c.newIters(context.Background(), m, nil, internalIterOpts{}) } @@ -758,7 +759,7 @@ func testTableCacheEvictionsInternal(t *testing.T, rangeIter bool) { m := &fileMetadata{FileNum: FileNum(j)} m.InitPhysicalBacking() if rangeIter { - iter, err = c.newRangeKeyIter(m, nil /* iter options */) + iter, err = c.newRangeKeyIter(m, keyspan.SpanIterOptions{}) } else { iter, _, err = c.newIters(context.Background(), m, nil, internalIterOpts{}) } diff --git a/testdata/event_listener b/testdata/event_listener index 4bfb91792d..75619a0861 100644 --- a/testdata/event_listener +++ b/testdata/event_listener @@ -278,7 +278,7 @@ compact 1 2.3 K 0 B 0 (size == esti zmemtbl 0 0 B ztbl 0 0 B bcache 8 1.4 K 11.1% (score == hit-rate) - tcache 1 744 B 40.0% (score == hit-rate) + tcache 1 752 B 40.0% (score == hit-rate) snaps 0 - 0 (score == earliest seq num) titers 0 filter - - 0.0% (score == utility) @@ -373,7 +373,7 @@ compact 1 4.7 K 0 B 0 (size == esti zmemtbl 0 0 B ztbl 0 0 B bcache 16 2.9 K 14.3% (score == hit-rate) - tcache 1 744 B 50.0% (score == hit-rate) + tcache 1 752 B 50.0% (score == hit-rate) snaps 0 - 0 (score == earliest seq num) titers 0 filter - - 0.0% (score == utility) diff --git a/testdata/ingest b/testdata/ingest index a811e12a1e..bc457b22ca 100644 --- a/testdata/ingest +++ b/testdata/ingest @@ -48,7 +48,7 @@ compact 0 0 B 0 B 0 (size == esti zmemtbl 0 0 B ztbl 0 0 B bcache 8 1.5 K 42.9% (score == hit-rate) - tcache 1 744 B 50.0% (score == hit-rate) + tcache 1 752 B 50.0% (score == hit-rate) snaps 0 - 0 (score == earliest seq num) titers 0 filter - - 0.0% (score == utility) diff --git a/testdata/metrics b/testdata/metrics index 81981b62e6..539ad9d87d 100644 --- a/testdata/metrics +++ b/testdata/metrics @@ -34,7 +34,7 @@ compact 0 0 B 0 B 0 (size == esti zmemtbl 1 256 K ztbl 0 0 B bcache 4 697 B 0.0% (score == hit-rate) - tcache 1 744 B 0.0% (score == hit-rate) + tcache 1 752 B 0.0% (score == hit-rate) snaps 0 - 0 (score == earliest seq num) titers 1 filter - - 0.0% (score == utility) @@ -145,7 +145,7 @@ compact 1 0 B 0 B 0 (size == esti zmemtbl 1 256 K ztbl 1 770 B bcache 4 697 B 42.9% (score == hit-rate) - tcache 1 744 B 66.7% (score == hit-rate) + tcache 1 752 B 66.7% (score == hit-rate) snaps 0 - 0 (score == earliest seq num) titers 1 filter - - 0.0% (score == utility) diff --git a/testdata/point_collapsing_iter b/testdata/point_collapsing_iter new file mode 100644 index 0000000000..e46c70f543 --- /dev/null +++ b/testdata/point_collapsing_iter @@ -0,0 +1,173 @@ + +define +a.SET.5:foo +b.SET.6:foo +b.DEL.4: +c.SET.7:bar +c.SET.5:foo +---- + +iter +first +next +next +prev +next +next +next +prev +---- +a#5,1:foo +b#6,1:foo +c#7,1:bar +b#6,1:foo +c#7,1:bar +. +. +c#7,1:bar + +iter +last +prev +prev +prev +prev +prev +---- +c#7,1:bar +b#6,1:foo +a#5,1:foo +. +. +. + +# Ensure that we pause at (and return) rangedel start points correctly. + +define +a.RANGEDEL.4:b +a.SET.5:foo +b.RANGEDEL.3:c +b.SET.6:foo +b.DEL.4: +c.SET.7:bar +c.SET.5:foo +---- + +iter +seek-ge b +next +next +prev +prev +prev +---- +b#72057594037927935,15: +b#6,1:foo +c#7,1:bar +b#6,1:foo +b#72057594037927935,15: +a#5,1:foo + +iter elide-range-dels=true +seek-ge b +next +next +prev +prev +prev +---- +b#6,1:foo +c#7,1:bar +. +c#7,1:bar +b#6,1:foo +a#5,1:foo + +# Ensure that the merge stops at the rangedel for b. + +define +a.RANGEDEL.4:b +a.SET.5:foo +b.RANGEDEL.4:c +b.MERGE.8:bar +b.MERGE.6:foobaz +b.SET.3:foo +b.DEL.2: +c.SET.7:bar +c.SET.5:foo +---- + +iter +seek-ge a +next +next +next +next +---- +a#72057594037927935,15: +a#5,1:foo +b#72057594037927935,15: +b#8,2:foobazbar +c#7,1:bar + +iter elide-range-dels=true +first +next +next +next +---- +a#5,1:foo +b#8,2:foobazbar +c#7,1:bar +. + +# Reverse iteration tests with rangedels. + +define +a.RANGEDEL.4:b +a.SET.5:foo +b.RANGEDEL.4:c +b.SET.6:foobazbar +b.SET.3:foo +b.DEL.2: +c.SET.7:bar +c.SET.5:foo +---- + +iter +seek-lt cc +prev +prev +prev +next +prev +prev +prev +next +next +---- +c#7,1:bar +b#6,1:foobazbar +b#72057594037927935,15: +a#5,1:foo +b#72057594037927935,15: +a#5,1:foo +a#72057594037927935,15: +. +a#72057594037927935,15: +a#5,1:foo + +iter elide-range-dels=true +seek-lt cc +prev +prev +next +prev +prev +---- +c#7,1:bar +b#6,1:foobazbar +a#5,1:foo +b#6,1:foobazbar +a#5,1:foo +.