diff --git a/compaction.go b/compaction.go index 77c95e1ea4..1c17d2423d 100644 --- a/compaction.go +++ b/compaction.go @@ -1474,7 +1474,7 @@ func (c *compaction) newInputIter( } if hasRangeKeys { li := &keyspan.LevelIter{} - newRangeKeyIterWrapper := func(file *manifest.FileMetadata, iterOptions *keyspan.SpanIterOptions) (keyspan.FragmentIterator, error) { + newRangeKeyIterWrapper := func(file *manifest.FileMetadata, iterOptions keyspan.SpanIterOptions) (keyspan.FragmentIterator, error) { iter, err := newRangeKeyIter(file, iterOptions) if iter != nil { // Ensure that the range key iter is not closed until the compaction is diff --git a/flushable.go b/flushable.go index 0a5b2ae2bf..43bab4e3a1 100644 --- a/flushable.go +++ b/flushable.go @@ -184,7 +184,7 @@ func (s *ingestedFlushable) newFlushIter(o *IterOptions, bytesFlushed *uint64) i } func (s *ingestedFlushable) constructRangeDelIter( - file *manifest.FileMetadata, _ *keyspan.SpanIterOptions, + file *manifest.FileMetadata, _ keyspan.SpanIterOptions, ) (keyspan.FragmentIterator, error) { // Note that the keyspan level iter expects a non-nil iterator to be // returned even if there is an error. So, we return the emptyKeyspanIter. diff --git a/internal/base/seqnums.go b/internal/base/seqnums.go index f563c1fbb9..aa38226a17 100644 --- a/internal/base/seqnums.go +++ b/internal/base/seqnums.go @@ -4,6 +4,8 @@ package base +import "fmt" + // This file defines sequence numbers that are reserved for foreign keys i.e. // internal keys coming from other Pebble instances and existing in shared // storage, as those will "slot below" any internal keys added by our own Pebble @@ -52,3 +54,16 @@ const ( // ourselves. SeqNumStart = uint64(10) ) + +// PointSeqNumForLevel returns the appropriate reserved sequence number for +// point keys in foreign sstables at the specified level. +func PointSeqNumForLevel(level int) uint64 { + switch level { + case 5: + return SeqNumL5Point + case 6: + return SeqNumL6Point + default: + panic(fmt.Sprintf("unexpected foreign sstable at level %d", level)) + } +} diff --git a/internal/keyspan/iter.go b/internal/keyspan/iter.go index 4798561ac6..1b08126b2a 100644 --- a/internal/keyspan/iter.go +++ b/internal/keyspan/iter.go @@ -62,7 +62,7 @@ type FragmentIterator interface { // TableNewSpanIter creates a new iterator for range key spans for the given // file. -type TableNewSpanIter func(file *manifest.FileMetadata, iterOptions *SpanIterOptions) (FragmentIterator, error) +type TableNewSpanIter func(file *manifest.FileMetadata, iterOptions SpanIterOptions) (FragmentIterator, error) // SpanIterOptions is a subset of IterOptions that are necessary to instantiate // per-sstable span iterators. @@ -70,6 +70,9 @@ type SpanIterOptions struct { // RangeKeyFilters can be used to avoid scanning tables and blocks in tables // when iterating over range keys. RangeKeyFilters []base.BlockPropertyFilter + // Level specifies the level where this sstable is being read. Must be + // specified for foreign (i.e. shared not-created-by-this-instance) sstables. + Level manifest.Level } // Iter is an iterator over a set of fragmented spans. 
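Note (illustrative, not part of the patch): the hunks above change `TableNewSpanIter` to take `keyspan.SpanIterOptions` by value, add a `Level` field that must be set when reading foreign (shared, not-created-by-this-instance) sstables, and add `base.PointSeqNumForLevel` to map a level to its reserved point-key sequence number. The following minimal sketch shows how those pieces fit together; it assumes it compiles inside the pebble repo, since the imported packages are internal.

```go
package main

import (
	"fmt"

	"github.com/cockroachdb/pebble/internal/base"
	"github.com/cockroachdb/pebble/internal/keyspan"
	"github.com/cockroachdb/pebble/internal/manifest"
)

func main() {
	// SpanIterOptions is now passed by value, so the zero value is a usable
	// default; Level must be populated when the file being read is foreign.
	opts := keyspan.SpanIterOptions{Level: manifest.Level(6)}

	// Point keys surfaced from a foreign sstable are rewritten to the reserved
	// sequence number for their level: SeqNumL5Point for L5, SeqNumL6Point for
	// L6. Any other level panics, since foreign files only live in L5 and L6.
	fmt.Println(base.PointSeqNumForLevel(6), opts.Level)
}
```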
diff --git a/internal/keyspan/level_iter.go b/internal/keyspan/level_iter.go index 047c6a24ab..718e58fffa 100644 --- a/internal/keyspan/level_iter.go +++ b/internal/keyspan/level_iter.go @@ -179,7 +179,7 @@ func (l *LevelIter) loadFile(file *manifest.FileMetadata, dir int) loadFileRetur return noFileLoaded } if indicator != fileAlreadyLoaded { - l.iter, l.err = l.newIter(file, &l.tableOpts) + l.iter, l.err = l.newIter(file, l.tableOpts) indicator = newFileLoaded } if l.err != nil { diff --git a/internal/keyspan/level_iter_test.go b/internal/keyspan/level_iter_test.go index fe3cd33729..d9897dc28b 100644 --- a/internal/keyspan/level_iter_test.go +++ b/internal/keyspan/level_iter_test.go @@ -299,7 +299,7 @@ func TestLevelIterEquivalence(t *testing.T) { metas = append(metas, meta) } - tableNewIters := func(file *manifest.FileMetadata, iterOptions *SpanIterOptions) (FragmentIterator, error) { + tableNewIters := func(file *manifest.FileMetadata, iterOptions SpanIterOptions) (FragmentIterator, error) { return NewIter(base.DefaultComparer.Compare, tc.levels[j][file.FileNum-1]), nil } // Add all the fileMetadatas to L6. @@ -433,7 +433,7 @@ func TestLevelIter(t *testing.T) { } if iter == nil { var lastFileNum base.FileNum - tableNewIters := func(file *manifest.FileMetadata, _ *SpanIterOptions) (FragmentIterator, error) { + tableNewIters := func(file *manifest.FileMetadata, _ SpanIterOptions) (FragmentIterator, error) { keyType := keyType spans := level[file.FileNum-1] if keyType == manifest.KeyTypePoint { diff --git a/internal/keyspan/merging_iter.go b/internal/keyspan/merging_iter.go index 247602423e..c73ba59b32 100644 --- a/internal/keyspan/merging_iter.go +++ b/internal/keyspan/merging_iter.go @@ -25,49 +25,6 @@ import ( // MergingIter implementation, but will require a bit of plumbing to thread the // Equal function. -// Transformer defines a transformation to be applied to a Span. -type Transformer interface { - // Transform takes a Span as input and writes the transformed Span to the - // provided output *Span pointer. The output Span's Keys slice may be reused - // by Transform to reduce allocations. - Transform(cmp base.Compare, in Span, out *Span) error -} - -// The TransformerFunc type is an adapter to allow the use of ordinary functions -// as Transformers. If f is a function with the appropriate signature, -// TransformerFunc(f) is a Transformer that calls f. -type TransformerFunc func(base.Compare, Span, *Span) error - -// Transform calls f(cmp, in, out). -func (tf TransformerFunc) Transform(cmp base.Compare, in Span, out *Span) error { - return tf(cmp, in, out) -} - -var noopTransform Transformer = TransformerFunc(func(_ base.Compare, s Span, dst *Span) error { - dst.Start, dst.End = s.Start, s.End - dst.Keys = append(dst.Keys[:0], s.Keys...) - return nil -}) - -// VisibleTransform filters keys that are invisible at the provided snapshot -// sequence number. -func VisibleTransform(snapshot uint64) Transformer { - return TransformerFunc(func(_ base.Compare, s Span, dst *Span) error { - dst.Start, dst.End = s.Start, s.End - dst.Keys = dst.Keys[:0] - for _, k := range s.Keys { - // NB: The InternalKeySeqNumMax value is used for the batch snapshot - // because a batch's visible span keys are filtered when they're - // fragmented. There's no requirement to enforce visibility at - // iteration time. 
- if base.Visible(k.SeqNum(), snapshot, base.InternalKeySeqNumMax) { - dst.Keys = append(dst.Keys, k) - } - } - return nil - }) -} - // MergingIter merges spans across levels of the LSM, exposing an iterator over // spans that yields sets of spans fragmented at unique user key boundaries. // diff --git a/internal/keyspan/transformer.go b/internal/keyspan/transformer.go new file mode 100644 index 0000000000..e0152cf4d6 --- /dev/null +++ b/internal/keyspan/transformer.go @@ -0,0 +1,149 @@ +// Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +package keyspan + +import "github.com/cockroachdb/pebble/internal/base" + +// Transformer defines a transformation to be applied to a Span. +type Transformer interface { + // Transform takes a Span as input and writes the transformed Span to the + // provided output *Span pointer. The output Span's Keys slice may be reused + // by Transform to reduce allocations. + Transform(cmp base.Compare, in Span, out *Span) error +} + +// The TransformerFunc type is an adapter to allow the use of ordinary functions +// as Transformers. If f is a function with the appropriate signature, +// TransformerFunc(f) is a Transformer that calls f. +type TransformerFunc func(base.Compare, Span, *Span) error + +// Transform calls f(cmp, in, out). +func (tf TransformerFunc) Transform(cmp base.Compare, in Span, out *Span) error { + return tf(cmp, in, out) +} + +var noopTransform Transformer = TransformerFunc(func(_ base.Compare, s Span, dst *Span) error { + dst.Start, dst.End = s.Start, s.End + dst.Keys = append(dst.Keys[:0], s.Keys...) + return nil +}) + +// VisibleTransform filters keys that are invisible at the provided snapshot +// sequence number. +func VisibleTransform(snapshot uint64) Transformer { + return TransformerFunc(func(_ base.Compare, s Span, dst *Span) error { + dst.Start, dst.End = s.Start, s.End + dst.Keys = dst.Keys[:0] + for _, k := range s.Keys { + // NB: The InternalKeySeqNumMax value is used for the batch snapshot + // because a batch's visible span keys are filtered when they're + // fragmented. There's no requirement to enforce visibility at + // iteration time. + if base.Visible(k.SeqNum(), snapshot, base.InternalKeySeqNumMax) { + dst.Keys = append(dst.Keys, k) + } + } + return nil + }) +} + +// TransformerIter is a FragmentIterator that applies a Transformer on all +// returned keys. Used for when a caller needs to apply a transformer on an +// iterator but does not otherwise need the mergingiter's merging ability. +type TransformerIter struct { + FragmentIterator + + // Transformer is applied on every Span returned by this iterator. + Transformer Transformer + // Comparer in use for this keyspace. + Compare base.Compare + + span Span + err error +} + +func (t *TransformerIter) applyTransform(span *Span) *Span { + t.span = Span{ + Start: t.span.Start[:0], + End: t.span.End[:0], + Keys: t.span.Keys[:0], + } + if err := t.Transformer.Transform(t.Compare, *span, &t.span); err != nil { + t.err = err + return nil + } + return &t.span +} + +// SeekGE implements the FragmentIterator interface. +func (t *TransformerIter) SeekGE(key []byte) *Span { + span := t.FragmentIterator.SeekGE(key) + if span == nil { + return nil + } + return t.applyTransform(span) +} + +// SeekLT implements the FragmentIterator interface. 
+func (t *TransformerIter) SeekLT(key []byte) *Span { + span := t.FragmentIterator.SeekLT(key) + if span == nil { + return nil + } + return t.applyTransform(span) +} + +// First implements the FragmentIterator interface. +func (t *TransformerIter) First() *Span { + span := t.FragmentIterator.First() + if span == nil { + return nil + } + return t.applyTransform(span) +} + +// Last implements the FragmentIterator interface. +func (t *TransformerIter) Last() *Span { + span := t.FragmentIterator.Last() + if span == nil { + return nil + } + return t.applyTransform(span) +} + +// Next implements the FragmentIterator interface. +func (t *TransformerIter) Next() *Span { + span := t.FragmentIterator.Next() + if span == nil { + return nil + } + return t.applyTransform(span) +} + +// Prev implements the FragmentIterator interface. +func (t *TransformerIter) Prev() *Span { + span := t.FragmentIterator.Prev() + if span == nil { + return nil + } + return t.applyTransform(span) +} + +// Error implements the FragmentIterator interface. +func (t *TransformerIter) Error() error { + if t.err != nil { + return t.err + } + return t.FragmentIterator.Error() +} + +// Close implements the FragmentIterator interface. +func (t *TransformerIter) Close() error { + err := t.FragmentIterator.Close() + if err != nil { + return err + } + return t.err +} diff --git a/internal/rangekey/coalesce.go b/internal/rangekey/coalesce.go index a25b64dfc4..18500e7a03 100644 --- a/internal/rangekey/coalesce.go +++ b/internal/rangekey/coalesce.go @@ -367,3 +367,73 @@ func coalesce( } return nil } + +// ForeignSSTTransformer implements a keyspan.Transformer for range keys in +// foreign sstables (i.e. shared sstables not created by us). It is largely +// similar to the Transform function implemented in UserIteratorConfig in that +// it calls coalesce to remove range keys shadowed by other range keys, but also +// retains the range key that does the shadowing. In addition, it outputs range +// keys with sequence numbers that match reserved sequence numbers for that +// level (i.e. SeqNumL5RangeKeySet for L5 sets, while L6 unsets/dels are elided). +type ForeignSSTTransformer struct { + Comparer *base.Comparer + Level int + sortBuf keyspan.KeysBySuffix +} + +// Transform implements the Transformer interface. +func (f *ForeignSSTTransformer) Transform( + cmp base.Compare, s keyspan.Span, dst *keyspan.Span, +) error { + // Apply shadowing of keys. 
+ dst.Start = s.Start + dst.End = s.End + f.sortBuf = keyspan.KeysBySuffix{ + Cmp: cmp, + Keys: f.sortBuf.Keys[:0], + } + if err := coalesce(f.Comparer.Equal, &f.sortBuf, math.MaxUint64, s.Keys); err != nil { + return err + } + keys := f.sortBuf.Keys + dst.Keys = dst.Keys[:0] + for i := range keys { + var seqNum uint64 + switch keys[i].Kind() { + case base.InternalKeyKindRangeKeySet: + if invariants.Enabled && len(dst.Keys) > 0 && cmp(dst.Keys[len(dst.Keys)-1].Suffix, keys[i].Suffix) > 0 { + panic("pebble: keys unexpectedly not in ascending suffix order") + } + switch f.Level { + case 5: + seqNum = base.SeqNumL5RangeKeySet + case 6: + seqNum = base.SeqNumL6RangeKey + } + case base.InternalKeyKindRangeKeyUnset: + if invariants.Enabled && len(dst.Keys) > 0 && cmp(dst.Keys[len(dst.Keys)-1].Suffix, keys[i].Suffix) > 0 { + panic("pebble: keys unexpectedly not in ascending suffix order") + } + fallthrough + case base.InternalKeyKindRangeKeyDelete: + switch f.Level { + case 5: + seqNum = base.SeqNumL5RangeKeyUnsetDel + case 6: + // Skip this key, as foreign sstable in L6 do not need to emit range key + // unsets/dels as they do not apply to any other sstables. + continue + } + default: + return base.CorruptionErrorf("pebble: unrecognized range key kind %s", keys[i].Kind()) + } + dst.Keys = append(dst.Keys, keyspan.Key{ + Trailer: base.MakeTrailer(seqNum, keys[i].Kind()), + Suffix: keys[i].Suffix, + Value: keys[i].Value, + }) + } + // coalesce results in dst.Keys being sorted by Suffix. + dst.KeysOrder = keyspan.BySuffixAsc + return nil +} diff --git a/level_checker.go b/level_checker.go index 96321380fa..4cec440c30 100644 --- a/level_checker.go +++ b/level_checker.go @@ -379,7 +379,7 @@ func checkRangeTombstones(c *checkConfig) error { atomicUnit, _ := expandToAtomicUnit(c.cmp, lf.Slice(), true /* disableIsCompacting */) lower, upper := manifest.KeyRange(c.cmp, atomicUnit.Iter()) iterToClose, iter, err := c.newIters( - context.Background(), lf.FileMetadata, nil, internalIterOpts{}) + context.Background(), lf.FileMetadata, &IterOptions{level: manifest.Level(lsmLevel)}, internalIterOpts{}) if err != nil { return err } diff --git a/level_iter.go b/level_iter.go index feb809d064..10ee903338 100644 --- a/level_iter.go +++ b/level_iter.go @@ -36,9 +36,9 @@ type tableNewIters func( // tableNewRangeDelIter takes a tableNewIters and returns a TableNewSpanIter // for the rangedel iterator returned by tableNewIters. func tableNewRangeDelIter(ctx context.Context, newIters tableNewIters) keyspan.TableNewSpanIter { - return func(file *manifest.FileMetadata, iterOptions *keyspan.SpanIterOptions) (keyspan.FragmentIterator, error) { + return func(file *manifest.FileMetadata, iterOptions keyspan.SpanIterOptions) (keyspan.FragmentIterator, error) { iter, rangeDelIter, err := newIters( - ctx, file, &IterOptions{RangeKeyFilters: iterOptions.RangeKeyFilters}, internalIterOpts{}) + ctx, file, &IterOptions{RangeKeyFilters: iterOptions.RangeKeyFilters, level: iterOptions.Level}, internalIterOpts{}) if iter != nil { _ = iter.Close() } diff --git a/objstorage/objstorage.go b/objstorage/objstorage.go index 268eb80d11..ec96e814a6 100644 --- a/objstorage/objstorage.go +++ b/objstorage/objstorage.go @@ -216,6 +216,11 @@ type Provider interface { // Cannot be called if shared storage is not configured for the provider. SetCreatorID(creatorID CreatorID) error + // IsForeign returns whether this object is owned by a different node. 
Return + // value undefined if creator ID is not set yet, or if this object does not + // exist in this provider. + IsForeign(meta ObjectMetadata) bool + // SharedObjectBacking encodes the shared object metadata. SharedObjectBacking(meta *ObjectMetadata) (SharedObjectBackingHandle, error) diff --git a/objstorage/objstorageprovider/shared.go b/objstorage/objstorageprovider/shared.go index 2adc5dcefd..cb8cd66105 100644 --- a/objstorage/objstorageprovider/shared.go +++ b/objstorage/objstorageprovider/shared.go @@ -115,6 +115,14 @@ func (p *provider) SetCreatorID(creatorID objstorage.CreatorID) error { return nil } +// IsForeign is part of the objstorage.Provider interface. +func (p *provider) IsForeign(meta objstorage.ObjectMetadata) bool { + if !p.shared.initialized.Load() { + return false + } + return meta.IsShared() && p.shared.creatorID != meta.Shared.CreatorID +} + func (p *provider) sharedCheckInitialized() error { if p.sharedStorage() == nil { return errors.Errorf("shared object support not configured") diff --git a/options.go b/options.go index 58d09aabbd..235fb36f5a 100644 --- a/options.go +++ b/options.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/internal/cache" "github.com/cockroachdb/pebble/internal/humanize" + "github.com/cockroachdb/pebble/internal/keyspan" "github.com/cockroachdb/pebble/internal/manifest" "github.com/cockroachdb/pebble/objstorage/shared" "github.com/cockroachdb/pebble/sstable" @@ -222,6 +223,17 @@ func (o *IterOptions) getLogger() Logger { return o.logger } +// SpanIterOptions creates a SpanIterOptions from this IterOptions. +func (o *IterOptions) SpanIterOptions(level manifest.Level) keyspan.SpanIterOptions { + if o == nil { + return keyspan.SpanIterOptions{Level: level} + } + return keyspan.SpanIterOptions{ + RangeKeyFilters: o.RangeKeyFilters, + Level: level, + } +} + // scanInternalOptions is similar to IterOptions, meant for use with // scanInternalIterator. type scanInternalOptions struct { @@ -1616,6 +1628,7 @@ func (o *Options) MakeReaderOptions() sstable.ReaderOptions { readerOpts.Comparer = o.Comparer readerOpts.Filters = o.Filters if o.Merger != nil { + readerOpts.Merge = o.Merger.Merge readerOpts.MergerName = o.Merger.Name } readerOpts.LoggerAndTracer = o.LoggerAndTracer diff --git a/range_keys.go b/range_keys.go index 30549d5ca3..6cd3e5875e 100644 --- a/range_keys.go +++ b/range_keys.go @@ -63,8 +63,7 @@ func (i *Iterator) constructRangeKeyIter() { // around Key Trailer order. 
iter := current.RangeKeyLevels[0].Iter() for f := iter.Last(); f != nil; f = iter.Prev() { - spanIterOpts := &keyspan.SpanIterOptions{RangeKeyFilters: i.opts.RangeKeyFilters} - spanIter, err := i.newIterRangeKey(f, spanIterOpts) + spanIter, err := i.newIterRangeKey(f, i.opts.SpanIterOptions(manifest.Level(0))) if err != nil { i.rangeKey.iterConfig.AddLevel(&errorKeyspanIter{err: err}) continue @@ -78,7 +77,7 @@ func (i *Iterator) constructRangeKeyIter() { continue } li := i.rangeKey.iterConfig.NewLevelIter() - spanIterOpts := keyspan.SpanIterOptions{RangeKeyFilters: i.opts.RangeKeyFilters} + spanIterOpts := i.opts.SpanIterOptions(manifest.Level(level)) li.Init(spanIterOpts, i.cmp, i.newIterRangeKey, current.RangeKeyLevels[level].Iter(), manifest.Level(level), manifest.KeyTypeRange) i.rangeKey.iterConfig.AddLevel(li) diff --git a/scan_internal.go b/scan_internal.go index 76d20d29ed..beee42004f 100644 --- a/scan_internal.go +++ b/scan_internal.go @@ -7,12 +7,15 @@ package pebble import ( "context" "fmt" + "sync" "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble/internal/base" + "github.com/cockroachdb/pebble/internal/invariants" "github.com/cockroachdb/pebble/internal/keyspan" "github.com/cockroachdb/pebble/internal/manifest" "github.com/cockroachdb/pebble/objstorage" + "github.com/cockroachdb/pebble/sstable" ) const ( @@ -92,8 +95,41 @@ type pcIterPos int const ( pcIterPosCur pcIterPos = iota pcIterPosNext + pcIterPosPrev ) +// pointCollapsingSSTIterator implements sstable.Iterator while composing +// pointCollapsingIterator. +type pointCollapsingSSTIterator struct { + pointCollapsingIterator + childIter sstable.Iterator +} + +var pcSSTIterPool = sync.Pool{ + New: func() interface{} { + return &pointCollapsingSSTIterator{} + }, +} + +// MaybeFilteredKeys implements the sstable.Iterator interface. +func (p *pointCollapsingSSTIterator) MaybeFilteredKeys() bool { + return p.childIter.MaybeFilteredKeys() +} + +// SetCloseHook implements the sstable.Iterator interface. +func (p *pointCollapsingSSTIterator) SetCloseHook(fn func(i sstable.Iterator) error) { + p.childIter.SetCloseHook(fn) +} + +// Close implements the sstable.Iterator interface. +func (p *pointCollapsingSSTIterator) Close() error { + err := p.pointCollapsingIterator.Close() + p.pointCollapsingIterator = pointCollapsingIterator{} + p.childIter = nil + pcSSTIterPool.Put(p) + return err +} + // pointCollapsingIterator is an internalIterator that collapses point keys and // returns at most one point internal key for each user key. Merges are merged, // sets are emitted as-is, and SingleDeletes are collapsed with the next point @@ -112,32 +148,54 @@ type pointCollapsingIterator struct { merge base.Merge err error seqNum uint64 - // The current position of `iter`. Could be backed by savedKey (i.e. iterKey == - // &savedKey) or could be owned by `iter`. findNextEntry and similar methods - // are expected to save the current value of this to savedKey if they're - // iterating away from the current key but still need to retain it. See - // comments in findNextEntry on how this field is used. + // The current position of `iter`. Always owned by the underlying iter. + iterKey *InternalKey + // The last saved key. findNextEntry and similar methods are expected to save + // the current value of iterKey to savedKey if they're iterating away from the + // current key but still need to retain it. See comments in findNextEntry on + // how this field is used. 
// // At the end of a positioning call: // - if pos == pcIterPosNext, iterKey is pointing to the next user key owned // by `iter` while savedKey is holding a copy to our current key. - // - If pos == pcIterPosCur, iterKey is pointing to either a savedKey-backed - // copy of the current key, or an `iter`-owned current key in which case - // the value of savedKey is undefined. - iterKey *InternalKey - savedKey InternalKey + // - If pos == pcIterPosCur, iterKey is pointing to an `iter`-owned current + // key, and savedKey is either undefined or pointing to a version of the + // current key owned by this iterator (i.e. backed by savedKeyBuf). + // - if pos == pcIterPosPrev, iterKey is pointing to the key before + // p.savedKey. p.savedKey is treated as the current key and is owned by + // this iterator, while p.iterKey is the previous key that is the current + // position of the child iterator. + savedKey InternalKey + savedKeyBuf []byte + // Saved key for substituting sequence numbers. Reused to avoid an allocation. + seqNumKey InternalKey + // elideRangeDeletes ignores range deletes returned by the interleaving + // iterator if true. + elideRangeDeletes bool // Value at the current iterator position, at iterKey. - value base.LazyValue + iterValue base.LazyValue + // Saved value backed by valueBuf, if set. Used in reverse iteration, and + // for merges. + savedValue base.LazyValue // Used for Merge keys only. valueMerger ValueMerger valueBuf []byte + // If fixedSeqNum is non-zero, all emitted points have this fixed sequence + // number. + fixedSeqNum uint64 } // SeekPrefixGE implements the InternalIterator interface. func (p *pointCollapsingIterator) SeekPrefixGE( prefix, key []byte, flags base.SeekGEFlags, ) (*base.InternalKey, base.LazyValue) { - panic("unimplemented") + p.resetKey() + p.iterKey, p.iterValue = p.iter.SeekPrefixGE(prefix, key, flags) + p.pos = pcIterPosCur + if p.iterKey == nil { + return nil, base.LazyValue{} + } + return p.findNextEntry() } // SeekGE implements the InternalIterator interface. @@ -145,7 +203,7 @@ func (p *pointCollapsingIterator) SeekGE( key []byte, flags base.SeekGEFlags, ) (*base.InternalKey, base.LazyValue) { p.resetKey() - p.iterKey, p.value = p.iter.SeekGE(key, flags) + p.iterKey, p.iterValue = p.iter.SeekGE(key, flags) p.pos = pcIterPosCur if p.iterKey == nil { return nil, base.LazyValue{} @@ -157,71 +215,89 @@ func (p *pointCollapsingIterator) SeekGE( func (p *pointCollapsingIterator) SeekLT( key []byte, flags base.SeekLTFlags, ) (*base.InternalKey, base.LazyValue) { - panic("unimplemented") + p.resetKey() + p.iterKey, p.iterValue = p.iter.SeekLT(key, flags) + p.pos = pcIterPosCur + if p.iterKey == nil { + return nil, base.LazyValue{} + } + return p.findPrevEntry() } func (p *pointCollapsingIterator) resetKey() { - p.savedKey.UserKey = p.savedKey.UserKey[:0] + p.savedKey.UserKey = p.savedKeyBuf[:0] p.savedKey.Trailer = 0 + p.seqNumKey = InternalKey{} p.valueMerger = nil p.valueBuf = p.valueBuf[:0] p.iterKey = nil p.pos = pcIterPosCur } -// findNextEntry is called to return the next key. p.iter must be positioned at -// the start of the first user key we are interested in. -func (p *pointCollapsingIterator) findNextEntry() (*base.InternalKey, base.LazyValue) { - finishAndReturnMerge := func() (*base.InternalKey, base.LazyValue) { - value, closer, err := p.valueMerger.Finish(true /* includesBase */) - if err != nil { - p.err = err - return nil, base.LazyValue{} - } - p.valueBuf = append(p.valueBuf[:0], value...) 
- if closer != nil { - _ = closer.Close() - } - p.valueMerger = nil - newValue := base.MakeInPlaceValue(value) - return &p.savedKey, newValue +func (p *pointCollapsingIterator) subSeqNum(key *base.InternalKey) *base.InternalKey { + if p.fixedSeqNum == 0 || key == nil || key.Kind() == InternalKeyKindRangeDelete { + return key } + // Reuse seqNumKey. This avoids an allocation. + p.seqNumKey.UserKey = key.UserKey + p.seqNumKey.Trailer = base.MakeTrailer(p.fixedSeqNum, key.Kind()) + return &p.seqNumKey +} + +// finishAndReturnMerge finishes off the valueMerger and returns the saved key. +func (p *pointCollapsingIterator) finishAndReturnMerge() (*base.InternalKey, base.LazyValue) { + value, closer, err := p.valueMerger.Finish(true /* includesBase */) + if err != nil { + p.err = err + return nil, base.LazyValue{} + } + p.valueBuf = append(p.valueBuf[:0], value...) + if closer != nil { + _ = closer.Close() + } + p.valueMerger = nil + val := base.MakeInPlaceValue(p.valueBuf) + return p.subSeqNum(&p.savedKey), val +} - // saveKey sets p.iterKey (if not-nil) to &p.savedKey. We can use this equality - // as a proxy to determine if we're at the first internal key for a user key. +// findNextEntry is called to return the next key. p.iter must be positioned at the +// start of the first user key we are interested in. +func (p *pointCollapsingIterator) findNextEntry() (*base.InternalKey, base.LazyValue) { p.saveKey() + // Saves a comparison in the fast path + firstIteration := true for p.iterKey != nil { - // NB: p.savedKey is either the current key (iff p.iterKey == &p.savedKey), + // NB: p.savedKey is either the current key (iff p.iterKey == firstKey), // or the previous key. - if p.iterKey != &p.savedKey && !p.comparer.Equal(p.iterKey.UserKey, p.savedKey.UserKey) { + if !firstIteration && !p.comparer.Equal(p.iterKey.UserKey, p.savedKey.UserKey) { if p.valueMerger != nil { if p.savedKey.Kind() != InternalKeyKindMerge { panic(fmt.Sprintf("expected key %s to have MERGE kind", p.iterKey)) } p.pos = pcIterPosNext - return finishAndReturnMerge() + return p.finishAndReturnMerge() } p.saveKey() continue } + firstIteration = false if s := p.iter.Span(); s != nil && s.CoversAt(p.seqNum, p.iterKey.SeqNum()) { // All future keys for this user key must be deleted. if p.valueMerger != nil { - return finishAndReturnMerge() + return p.finishAndReturnMerge() } else if p.savedKey.Kind() == InternalKeyKindSingleDelete { panic("cannot process singledel key in point collapsing iterator") } // Fast forward to the next user key. p.saveKey() - p.iterKey, p.value = p.iter.Next() + p.iterKey, p.iterValue = p.iter.Next() for p.iterKey != nil && p.savedKey.SeqNum() >= p.iterKey.SeqNum() && p.comparer.Equal(p.iterKey.UserKey, p.savedKey.UserKey) { - p.iterKey, p.value = p.iter.Next() + p.iterKey, p.iterValue = p.iter.Next() } continue } switch p.savedKey.Kind() { case InternalKeyKindSet, InternalKeyKindDelete, InternalKeyKindSetWithDelete, InternalKeyKindDeleteSized: - p.saveKey() // Note that we return SETs directly, even if they would otherwise get // compacted into a Del to turn into a SetWithDelete. This is a fast // path optimization that can break SINGLEDEL determinism. To lead to @@ -240,14 +316,14 @@ func (p *pointCollapsingIterator) findNextEntry() (*base.InternalKey, base.LazyV // of blocks and can determine user key changes without doing key saves // or comparisons. 
p.pos = pcIterPosCur - return p.iterKey, p.value + return p.subSeqNum(p.iterKey), p.iterValue case InternalKeyKindSingleDelete: // Panic, as this iterator is not expected to observe single deletes. panic("cannot process singledel key in point collapsing iterator") case InternalKeyKindMerge: if p.valueMerger == nil { // Set up merger. This is the first Merge key encountered. - value, callerOwned, err := p.value.Value(p.valueBuf[:0]) + value, callerOwned, err := p.iterValue.Value(p.valueBuf[:0]) if err != nil { p.err = err return nil, base.LazyValue{} @@ -263,13 +339,13 @@ func (p *pointCollapsingIterator) findNextEntry() (*base.InternalKey, base.LazyV return nil, base.LazyValue{} } p.saveKey() - p.iterKey, p.value = p.iter.Next() + p.iterKey, p.iterValue = p.iter.Next() continue } switch p.iterKey.Kind() { case InternalKeyKindSet, InternalKeyKindMerge, InternalKeyKindSetWithDelete: // Merge into key. - value, callerOwned, err := p.value.Value(p.valueBuf[:0]) + value, callerOwned, err := p.iterValue.Value(p.valueBuf[:0]) if err != nil { p.err = err return nil, base.LazyValue{} @@ -287,31 +363,144 @@ func (p *pointCollapsingIterator) findNextEntry() (*base.InternalKey, base.LazyV if p.iterKey.Kind() != InternalKeyKindMerge { p.savedKey.SetKind(p.iterKey.Kind()) p.pos = pcIterPosCur - return finishAndReturnMerge() + return p.finishAndReturnMerge() } - p.iterKey, p.value = p.iter.Next() + p.iterKey, p.iterValue = p.iter.Next() case InternalKeyKindRangeDelete: + if p.elideRangeDeletes { + // Skip this range delete, and process any point after it. + p.iterKey, p.iterValue = p.iter.Next() + p.saveKey() + continue + } // These are interleaved by the interleaving iterator ahead of all points. // We should pass them as-is, but also account for any points ahead of // them. p.pos = pcIterPosCur - return p.iterKey, p.value + return p.subSeqNum(p.iterKey), p.iterValue default: panic(fmt.Sprintf("unexpected kind: %d", p.iterKey.Kind())) } } if p.valueMerger != nil { p.pos = pcIterPosNext - return finishAndReturnMerge() + return p.finishAndReturnMerge() } p.resetKey() return nil, base.LazyValue{} } +// findPrevEntry finds the relevant point key to return for the previous user key +// (i.e. in reverse iteration). Requires that the iterator is already positioned +// at the first-in-reverse (i.e. rightmost / largest) internal key encountered +// for that user key. +func (p *pointCollapsingIterator) findPrevEntry() (*base.InternalKey, base.LazyValue) { + if p.iterKey == nil { + p.pos = pcIterPosCur + return nil, base.LazyValue{} + } + + p.saveKey() + firstIteration := true + for p.iterKey != nil { + if !firstIteration && !p.comparer.Equal(p.iterKey.UserKey, p.savedKey.UserKey) { + p.pos = pcIterPosPrev + return p.subSeqNum(&p.savedKey), p.savedValue + } + firstIteration = false + if s := p.iter.Span(); s != nil && s.CoversAt(p.seqNum, p.iterKey.SeqNum()) { + // Skip this key. + p.iterKey, p.iterValue = p.iter.Prev() + p.saveKey() + continue + } + switch p.iterKey.Kind() { + case InternalKeyKindSet, InternalKeyKindDelete, InternalKeyKindSetWithDelete: + // Instead of calling saveKey(), we take advantage of the invariant that + // p.savedKey.UserKey == p.iterKey.UserKey (otherwise we'd have gone into + // the !p.comparer.Equal(p.iterKey.UserKey, p.savedKey.UserKey) case at + // the top of this for loop). That allows us to just save the trailer + // and move on. 
+ if invariants.Enabled && !p.comparer.Equal(p.iterKey.UserKey, p.savedKey.UserKey) { + panic("unexpected inequality between p.iterKey and p.savedKey") + } + p.savedKey.Trailer = p.iterKey.Trailer + // Copy value into p.savedValue. + value, callerOwned, err := p.iterValue.Value(p.valueBuf[:0]) + if err != nil { + p.err = err + return nil, base.LazyValue{} + } + if !callerOwned { + p.valueBuf = append(p.valueBuf[:0], value...) + } else { + p.valueBuf = value + } + p.valueMerger = nil + p.savedValue = base.MakeInPlaceValue(p.valueBuf) + p.iterKey, p.iterValue = p.iter.Prev() + continue + case InternalKeyKindSingleDelete: + // Panic, as this iterator is not expected to observe single deletes. + panic("cannot process singledel key in point collapsing iterator") + case InternalKeyKindMerge: + panic("cannot process merge key in point collapsing iterator in reverse iteration") + case InternalKeyKindRangeDelete: + if p.elideRangeDeletes { + // Skip this range delete, and process any point before it. + p.iterKey, p.iterValue = p.iter.Prev() + continue + } + // These are interleaved by the interleaving iterator behind all points. + if p.savedKey.Kind() != InternalKeyKindRangeDelete { + // If the previous key was not a rangedel, we need to return it. Pretend that we're at the + // previous user key (i.e. with p.pos = pcIterPosPrev) even if we're not, so on the next + // Prev() we encounter and return this rangedel. For now return the point ahead of + // this range del (if any). + p.pos = pcIterPosPrev + return p.subSeqNum(&p.savedKey), p.savedValue + } + // We take advantage of the fact that a Prev() *on* a RangeDel iterKey + // always takes us to a different user key, so on the next iteration + // we will fall into the !p.comparer.Equal(p.iterKey.UserKey, p.savedKey.UserKey) + // case. + // + // Instead of calling saveKey(), we take advantage of the invariant that + // p.savedKey.UserKey == p.iterKey.UserKey (otherwise we'd have gone into + // the !p.comparer.Equal(p.iterKey.UserKey, p.savedKey.UserKey) case at + // the top of this for loop). That allows us to just save the trailer + // and move on. + if invariants.Enabled && !p.comparer.Equal(p.iterKey.UserKey, p.savedKey.UserKey) { + panic("unexpected inequality between p.iterKey and p.savedKey") + } + p.savedKey.Trailer = p.iterKey.Trailer + // Copy value into p.savedValue. + value, callerOwned, err := p.iterValue.Value(p.valueBuf[:0]) + if err != nil { + p.err = err + return nil, base.LazyValue{} + } + if !callerOwned { + p.valueBuf = append(p.valueBuf[:0], value...) + } else { + p.valueBuf = value + } + p.valueMerger = nil + p.savedValue = base.MakeInPlaceValue(p.valueBuf) + p.iterKey, p.iterValue = p.iter.Prev() + continue + default: + panic(fmt.Sprintf("unexpected kind: %d", p.iterKey.Kind())) + } + } + p.pos = pcIterPosPrev + return p.subSeqNum(&p.savedKey), p.savedValue +} + // First implements the InternalIterator interface. func (p *pointCollapsingIterator) First() (*base.InternalKey, base.LazyValue) { p.resetKey() - p.iterKey, p.value = p.iter.First() + p.iterKey, p.iterValue = p.iter.First() p.pos = pcIterPosCur if p.iterKey == nil { return nil, base.LazyValue{} @@ -321,41 +510,67 @@ func (p *pointCollapsingIterator) First() (*base.InternalKey, base.LazyValue) { // Last implements the InternalIterator interface. 
func (p *pointCollapsingIterator) Last() (*base.InternalKey, base.LazyValue) { - panic("unimplemented") + p.resetKey() + p.iterKey, p.iterValue = p.iter.Last() + p.pos = pcIterPosCur + if p.iterKey == nil { + return nil, base.LazyValue{} + } + return p.findPrevEntry() } func (p *pointCollapsingIterator) saveKey() { if p.iterKey == nil { - p.savedKey = InternalKey{UserKey: p.savedKey.UserKey[:0]} - return - } else if p.iterKey == &p.savedKey { - // Do nothing. + p.savedKey = InternalKey{UserKey: p.savedKeyBuf[:0]} return } - p.savedKey.CopyFrom(*p.iterKey) - p.iterKey = &p.savedKey + p.savedKeyBuf = append(p.savedKeyBuf[:0], p.iterKey.UserKey...) + p.savedKey = InternalKey{UserKey: p.savedKeyBuf, Trailer: p.iterKey.Trailer} } // Next implements the InternalIterator interface. func (p *pointCollapsingIterator) Next() (*base.InternalKey, base.LazyValue) { switch p.pos { + case pcIterPosPrev: + p.saveKey() + if p.iterKey != nil && p.iterKey.Kind() == InternalKeyKindRangeDelete && !p.elideRangeDeletes { + p.iterKey, p.iterValue = p.iter.Next() + p.pos = pcIterPosCur + } else { + // Fast forward to the next user key. + key, val := p.iter.Next() + // p.iterKey.SeqNum() >= key.SeqNum() is an optimization that allows us to + // use p.iterKey.SeqNum() < key.SeqNum() as a sign that the user key has + // changed, without needing to do the full key comparison. + for key != nil && p.savedKey.SeqNum() >= key.SeqNum() && + p.comparer.Equal(p.savedKey.UserKey, key.UserKey) { + key, val = p.iter.Next() + } + if key == nil { + // There are no keys to return. + p.resetKey() + return nil, base.LazyValue{} + } + p.iterKey, p.iterValue = key, val + p.pos = pcIterPosCur + } + fallthrough case pcIterPosCur: p.saveKey() - if p.iterKey != nil && p.iterKey.Kind() == InternalKeyKindRangeDelete { + if p.iterKey != nil && p.iterKey.Kind() == InternalKeyKindRangeDelete && !p.elideRangeDeletes { // Step over the interleaved range delete and process the very next // internal key, even if it's at the same user key. This is because a // point for that user key has not been returned yet. - p.iterKey, p.value = p.iter.Next() + p.iterKey, p.iterValue = p.iter.Next() break } - // Fast forward to the next user key. p.iterKey stayed at p.savedKey for - // this loop. + // Fast forward to the next user key. key, val := p.iter.Next() // p.iterKey.SeqNum() >= key.SeqNum() is an optimization that allows us to // use p.iterKey.SeqNum() < key.SeqNum() as a sign that the user key has // changed, without needing to do the full key comparison. - for p.iterKey != nil && key != nil && p.iterKey.SeqNum() >= key.SeqNum() && - p.comparer.Equal(p.iterKey.UserKey, key.UserKey) { + for key != nil && p.savedKey.SeqNum() >= key.SeqNum() && + p.comparer.Equal(p.savedKey.UserKey, key.UserKey) { key, val = p.iter.Next() } if key == nil { @@ -363,7 +578,7 @@ func (p *pointCollapsingIterator) Next() (*base.InternalKey, base.LazyValue) { p.resetKey() return nil, base.LazyValue{} } - p.iterKey, p.value = key, val + p.iterKey, p.iterValue = key, val case pcIterPosNext: p.pos = pcIterPosCur } @@ -376,12 +591,56 @@ func (p *pointCollapsingIterator) Next() (*base.InternalKey, base.LazyValue) { // NextPrefix implements the InternalIterator interface. func (p *pointCollapsingIterator) NextPrefix(succKey []byte) (*base.InternalKey, base.LazyValue) { + // TODO(bilal): Implement this. It'll be similar to SeekGE, except we'll call + // the child iterator's NextPrefix, and have some special logic in case pos + // is pcIterPosNext. 
panic("unimplemented") } // Prev implements the InternalIterator interface. func (p *pointCollapsingIterator) Prev() (*base.InternalKey, base.LazyValue) { - panic("unimplemented") + switch p.pos { + case pcIterPosNext: + // Rewind backwards to the previous iter key. + p.saveKey() + key, val := p.iter.Prev() + for key != nil && p.savedKey.SeqNum() <= key.SeqNum() && + p.comparer.Equal(p.savedKey.UserKey, key.UserKey) { + if key.Kind() == InternalKeyKindRangeDelete && !p.elideRangeDeletes { + // We need to pause at this range delete and return it as-is, as "cur" + // is referencing the point key after it, not the range delete. + break + } + key, val = p.iter.Prev() + } + p.iterKey = key + p.iterValue = val + p.pos = pcIterPosCur + fallthrough + case pcIterPosCur: + p.saveKey() + key, val := p.iter.Prev() + for key != nil && p.savedKey.SeqNum() <= key.SeqNum() && + p.comparer.Equal(p.savedKey.UserKey, key.UserKey) { + if key.Kind() == InternalKeyKindRangeDelete && !p.elideRangeDeletes { + // We need to pause at this range delete and return it as-is, as "cur" + // is referencing the point key after it, not the range delete. + break + } + key, val = p.iter.Prev() + } + p.iterKey = key + p.iterValue = val + p.pos = pcIterPosCur + case pcIterPosPrev: + // Do nothing. + p.pos = pcIterPosCur + } + if p.iterKey == nil { + p.resetKey() + return nil, base.LazyValue{} + } + return p.findPrevEntry() } // Error implements the InternalIterator interface. @@ -497,7 +756,7 @@ func (d *DB) truncateSharedFile( ) defer rangeDelIter.Close() } - rangeKeyIter, err := d.tableNewRangeKeyIter(file, nil /* spanIterOptions */) + rangeKeyIter, err := d.tableNewRangeKeyIter(file, keyspan.SpanIterOptions{Level: manifest.Level(level)}) if err != nil { return nil, false, err } @@ -804,8 +1063,7 @@ func (i *scanInternalIterator) constructRangeKeyIter() { // around Key Trailer order. 
iter := current.RangeKeyLevels[0].Iter() for f := iter.Last(); f != nil; f = iter.Prev() { - spanIterOpts := &keyspan.SpanIterOptions{RangeKeyFilters: i.opts.RangeKeyFilters} - spanIter, err := i.newIterRangeKey(f, spanIterOpts) + spanIter, err := i.newIterRangeKey(f, i.opts.SpanIterOptions(manifest.Level(0))) if err != nil { i.rangeKey.iterConfig.AddLevel(&errorKeyspanIter{err: err}) continue @@ -822,7 +1080,7 @@ func (i *scanInternalIterator) constructRangeKeyIter() { continue } li := i.rangeKey.iterConfig.NewLevelIter() - spanIterOpts := keyspan.SpanIterOptions{RangeKeyFilters: i.opts.RangeKeyFilters} + spanIterOpts := i.opts.SpanIterOptions(manifest.Level(level)) li.Init(spanIterOpts, i.comparer.Compare, i.newIterRangeKey, current.RangeKeyLevels[level].Iter(), manifest.Level(level), manifest.KeyTypeRange) i.rangeKey.iterConfig.AddLevel(li) diff --git a/scan_internal_test.go b/scan_internal_test.go index 997470320d..a009fd7616 100644 --- a/scan_internal_test.go +++ b/scan_internal_test.go @@ -7,6 +7,7 @@ package pebble import ( "context" "fmt" + "math" "strconv" "strings" "testing" @@ -251,3 +252,62 @@ func TestScanInternal(t *testing.T) { } }) } + +func TestPointCollapsingIter(t *testing.T) { + var def string + datadriven.RunTest(t, "testdata/point_collapsing_iter", func(t *testing.T, d *datadriven.TestData) string { + switch d.Cmd { + case "define": + def = d.Input + return "" + + case "iter": + var elideRangeDels bool + for i := range d.CmdArgs { + switch d.CmdArgs[i].Key { + case "elide-range-dels": + var err error + elideRangeDels, err = strconv.ParseBool(d.CmdArgs[i].Vals[0]) + if err != nil { + return err.Error() + } + } + } + f := &fakeIter{} + var spans []keyspan.Span + for _, line := range strings.Split(def, "\n") { + for _, key := range strings.Fields(line) { + j := strings.Index(key, ":") + k := base.ParseInternalKey(key[:j]) + v := []byte(key[j+1:]) + if k.Kind() == InternalKeyKindRangeDelete { + spans = append(spans, keyspan.Span{ + Start: k.UserKey, + End: v, + Keys: []keyspan.Key{{Trailer: k.Trailer}}, + KeysOrder: 0, + }) + continue + } + f.keys = append(f.keys, k) + f.vals = append(f.vals, v) + } + } + + ksIter := keyspan.NewIter(base.DefaultComparer.Compare, spans) + pcIter := &pointCollapsingIterator{ + comparer: base.DefaultComparer, + merge: base.DefaultMerger.Merge, + seqNum: math.MaxUint64, + elideRangeDeletes: elideRangeDels, + } + pcIter.iter.Init(base.DefaultComparer, f, ksIter, nil /* mask */, nil, nil) + defer pcIter.Close() + + return runInternalIterCmd(t, d, pcIter, iterCmdVerboseKey) + + default: + return fmt.Sprintf("unknown command: %s", d.Cmd) + } + }) +} diff --git a/sstable/block.go b/sstable/block.go index 27cf77ff8c..40ad4ec1bc 100644 --- a/sstable/block.go +++ b/sstable/block.go @@ -1455,6 +1455,10 @@ type fragmentBlockIter struct { err error dir int8 closeHook func(i keyspan.FragmentIterator) error + + // elideSameSeqnum, if true, returns only the first-occurring (in forward + // order) Key for each sequence number. 
+ elideSameSeqnum bool } func (i *fragmentBlockIter) resetForReuse() fragmentBlockIter { @@ -1483,6 +1487,24 @@ func (i *fragmentBlockIter) decodeSpanKeys(k *InternalKey, internalValue []byte) } } +func (i *fragmentBlockIter) elideKeysOfSameSeqNum() { + if invariants.Enabled { + if !i.elideSameSeqnum || len(i.span.Keys) == 0 { + panic("elideKeysOfSameSeqNum called when it should not be") + } + } + lastSeqNum := i.span.Keys[0].SeqNum() + k := 1 + for j := 1; j < len(i.span.Keys); j++ { + if lastSeqNum != i.span.Keys[j].SeqNum() { + lastSeqNum = i.span.Keys[j].SeqNum() + i.span.Keys[k] = i.span.Keys[j] + k++ + } + } + i.span.Keys = i.span.Keys[:k] +} + // gatherForward gathers internal keys with identical bounds. Keys defined over // spans of the keyspace are fragmented such that any overlapping key spans have // identical bounds. When these spans are persisted to a range deletion or range @@ -1532,6 +1554,9 @@ func (i *fragmentBlockIter) gatherForward(k *InternalKey, lazyValue base.LazyVal k, lazyValue = i.blockIter.Next() internalValue = lazyValue.InPlaceValue() } + if i.elideSameSeqnum && len(i.span.Keys) > 0 { + i.elideKeysOfSameSeqNum() + } // i.blockIter is positioned over the first internal key for the next span. return &i.span } @@ -1591,6 +1616,9 @@ func (i *fragmentBlockIter) gatherBackward(k *InternalKey, lazyValue base.LazyVa // Backwards iteration encounters internal keys in the wrong order. keyspan.SortKeysByTrailer(&i.span.Keys) + if i.elideSameSeqnum && len(i.span.Keys) > 0 { + i.elideKeysOfSameSeqNum() + } return &i.span } diff --git a/sstable/options.go b/sstable/options.go index 66dbdd4d0a..869e69b17c 100644 --- a/sstable/options.go +++ b/sstable/options.go @@ -107,6 +107,9 @@ type ReaderOptions struct { // The default value uses the same ordering as bytes.Compare. Comparer *Comparer + // Merge defines the Merge function in use for this keyspace. + Merge base.Merge + // Filters is a map from filter policy name to filter policy. It is used for // debugging tools which may be used on multiple databases configured with // different filter policies. It is not necessary to populate this filters @@ -126,6 +129,9 @@ func (o ReaderOptions) ensureDefaults() ReaderOptions { if o.Comparer == nil { o.Comparer = base.DefaultComparer } + if o.Merge == nil { + o.Merge = base.DefaultMerger.Merge + } if o.MergerName == "" { o.MergerName = base.DefaultMerger.Name } diff --git a/sstable/reader.go b/sstable/reader.go index 710198edd5..b37e6f0186 100644 --- a/sstable/reader.go +++ b/sstable/reader.go @@ -2943,6 +2943,26 @@ func (v *VirtualReader) NewRawRangeDelIter() (keyspan.FragmentIterator, error) { ), nil } +// NewFixedSeqnumRangeDelIter wraps Reader.NewFixedSeqnumRangeDelIter. +func (v *VirtualReader) NewFixedSeqnumRangeDelIter( + seqNum uint64, +) (keyspan.FragmentIterator, error) { + iter, err := v.reader.NewFixedSeqnumRangeDelIter(seqNum) + if err != nil { + return nil, err + } + if iter == nil { + return nil, nil + } + + // There should be no spans which cross virtual sstable bounds. So, no + // truncation should occur. + return keyspan.Truncate( + v.reader.Compare, iter, v.vState.lower.UserKey, v.vState.upper.UserKey, + &v.vState.lower, &v.vState.upper, true, /* panicOnPartialOverlap */ + ), nil +} + // NewRawRangeKeyIter wraps Reader.NewRawRangeKeyIter. 
func (v *VirtualReader) NewRawRangeKeyIter() (keyspan.FragmentIterator, error) { iter, err := v.reader.NewRawRangeKeyIter() @@ -3168,6 +3188,17 @@ func (r *Reader) newCompactionIter( // TODO(sumeer): plumb context.Context since this path is relevant in the user-facing // iterator. Add WithContext methods since the existing ones are public. func (r *Reader) NewRawRangeDelIter() (keyspan.FragmentIterator, error) { + return r.NewFixedSeqnumRangeDelIter(r.Properties.GlobalSeqNum) +} + +// NewFixedSeqnumRangeDelIter returns an internal iterator for the contents of +// the range-del block of the table, with a custom sequence number to be used as +// the global sequence number for this block. Returns nil if the table does not +// contain any range deletions. +// +// TODO(sumeer): plumb context.Context since this path is relevant in the user-facing +// iterator. Add WithContext methods since the existing ones are public. +func (r *Reader) NewFixedSeqnumRangeDelIter(seqNum uint64) (keyspan.FragmentIterator, error) { if r.rangeDelBH.Length == 0 { return nil, nil } @@ -3175,8 +3206,8 @@ func (r *Reader) NewRawRangeDelIter() (keyspan.FragmentIterator, error) { if err != nil { return nil, err } - i := &fragmentBlockIter{} - if err := i.blockIter.initHandle(r.Compare, h, r.Properties.GlobalSeqNum); err != nil { + i := &fragmentBlockIter{elideSameSeqnum: true} + if err := i.blockIter.initHandle(r.Compare, h, seqNum); err != nil { return nil, err } return i, nil diff --git a/table_cache.go b/table_cache.go index 9a3c5b5724..186b928d9b 100644 --- a/table_cache.go +++ b/table_cache.go @@ -9,6 +9,7 @@ import ( "context" "fmt" "io" + "math" "runtime/debug" "runtime/pprof" "sync" @@ -21,6 +22,7 @@ import ( "github.com/cockroachdb/pebble/internal/keyspan" "github.com/cockroachdb/pebble/internal/manifest" "github.com/cockroachdb/pebble/internal/private" + "github.com/cockroachdb/pebble/internal/rangekey" "github.com/cockroachdb/pebble/objstorage" "github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing" "github.com/cockroachdb/pebble/sstable" @@ -138,7 +140,7 @@ func (c *tableCacheContainer) newIters( } func (c *tableCacheContainer) newRangeKeyIter( - file *manifest.FileMetadata, opts *keyspan.SpanIterOptions, + file *manifest.FileMetadata, opts keyspan.SpanIterOptions, ) (keyspan.FragmentIterator, error) { return c.tableCache.getShard(file.FileBacking.DiskFileNum).newRangeKeyIter(file, opts, &c.dbOpts) } @@ -386,6 +388,7 @@ func (c *tableCacheShard) newIters( } type iterCreator interface { + NewFixedSeqnumRangeDelIter(seqNum uint64) (keyspan.FragmentIterator, error) NewRawRangeDelIter() (keyspan.FragmentIterator, error) NewIterWithBlockPropertyFiltersAndContext( ctx context.Context, @@ -410,11 +413,38 @@ func (c *tableCacheShard) newIters( ic = &virtualReader } + provider := dbOpts.objProvider + // Check if this file is a foreign file. + objMeta, err := provider.Lookup(fileTypeTable, file.FileBacking.DiskFileNum) + if err != nil { + return nil, nil, err + } + // NB: range-del iterator does not maintain a reference to the table, nor // does it need to read from it after creation. var rangeDelIter keyspan.FragmentIterator - rangeDelIter, err = ic.NewRawRangeDelIter() - + if provider.IsForeign(objMeta) { + if opts == nil { + panic("unexpected nil opts when reading foreign file") + } + if !file.Virtual { + // Foreign sstables must be virtual by definition. 
+ panic(fmt.Sprintf("sstable is foreign but not virtual: %s", file.FileNum)) + } + switch manifest.LevelToInt(opts.level) { + case 5: + rangeDelIter, err = ic.NewFixedSeqnumRangeDelIter(base.SeqNumL5RangeDel) + case 6: + // Let rangeDelIter remain nil. We don't need to return rangedels from + // this file as they will not apply to any other files. For the purpose + // of collapsing rangedels within this file, we create another rangeDelIter + // below for use with the interleaving iter. + default: + panic(fmt.Sprintf("unexpected level for foreign sstable: %d", manifest.LevelToInt(opts.level))) + } + } else { + rangeDelIter, err = ic.NewRawRangeDelIter() + } if err != nil { c.unrefValue(v) return nil, nil, err @@ -470,6 +500,35 @@ func (c *tableCacheShard) newIters( // NB: v.closeHook takes responsibility for calling unrefValue(v) here. Take // care to avoid introducing an allocation here by adding a closure. iter.SetCloseHook(v.closeHook) + if provider.IsForeign(objMeta) { + // NB: IsForeign() guarantees IsShared, so opts must not be nil as we've + // already panicked on the nil case above. + pointKeySeqNum := base.PointSeqNumForLevel(manifest.LevelToInt(opts.level)) + pcIter := pointCollapsingIterator{ + comparer: dbOpts.opts.Comparer, + merge: dbOpts.opts.Merge, + seqNum: math.MaxUint64, + elideRangeDeletes: true, + fixedSeqNum: pointKeySeqNum, + } + // Open a second rangedel iter. This is solely for the interleaving iter to + // be able to efficiently delete covered range deletes. We don't need to fix + // the sequence number in this iter, as these range deletes will not be + // exposed to anything other than the interleaving iter and + // pointCollapsingIter. + rangeDelIter, err := v.reader.NewRawRangeDelIter() + if err != nil { + c.unrefValue(v) + return nil, nil, err + } + if rangeDelIter == nil { + rangeDelIter = emptyKeyspanIter + } + pcIter.iter.Init(dbOpts.opts.Comparer, iter, rangeDelIter, nil /* mask */, opts.LowerBound, opts.UpperBound) + pcSSTIter := pcSSTIterPool.Get().(*pointCollapsingSSTIterator) + *pcSSTIter = pointCollapsingSSTIterator{pointCollapsingIterator: pcIter, childIter: iter} + iter = pcSSTIter + } c.iterCount.Add(1) dbOpts.iterCount.Add(1) @@ -482,7 +541,7 @@ func (c *tableCacheShard) newIters( } func (c *tableCacheShard) newRangeKeyIter( - file *manifest.FileMetadata, opts *keyspan.SpanIterOptions, dbOpts *tableCacheOpts, + file *manifest.FileMetadata, opts keyspan.SpanIterOptions, dbOpts *tableCacheOpts, ) (keyspan.FragmentIterator, error) { // Calling findNode gives us the responsibility of decrementing v's // refCount. If opening the underlying table resulted in error, then we @@ -501,7 +560,7 @@ func (c *tableCacheShard) newRangeKeyIter( // file's range key blocks may surface deleted range keys below. This is // done here, rather than deferring to the block-property collector in order // to maintain parity with point keys and the treatment of RANGEDELs. 
- if opts != nil && v.reader.Properties.NumRangeKeyDels == 0 { + if v.reader.Properties.NumRangeKeyDels == 0 { ok, _, err = c.checkAndIntersectFilters(v, nil, opts.RangeKeyFilters, nil) } if err != nil { @@ -539,6 +598,29 @@ func (c *tableCacheShard) newRangeKeyIter( return emptyKeyspanIter, nil } + objMeta, err := dbOpts.objProvider.Lookup(fileTypeTable, file.FileBacking.DiskFileNum) + if err != nil { + return nil, err + } + if dbOpts.objProvider.IsForeign(objMeta) { + if opts.Level == 0 { + panic("unexpected zero level when reading foreign file") + } + transform := &rangekey.ForeignSSTTransformer{ + Comparer: dbOpts.opts.Comparer, + Level: manifest.LevelToInt(opts.Level), + } + if iter == nil { + iter = emptyKeyspanIter + } + transformIter := &keyspan.TransformerIter{ + FragmentIterator: iter, + Transformer: transform, + Compare: dbOpts.opts.Comparer.Compare, + } + return transformIter, nil + } + return iter, nil } diff --git a/table_cache_test.go b/table_cache_test.go index 32f6413656..92ea9a1079 100644 --- a/table_cache_test.go +++ b/table_cache_test.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble/internal/base" + "github.com/cockroachdb/pebble/internal/keyspan" "github.com/cockroachdb/pebble/internal/manifest" "github.com/cockroachdb/pebble/internal/testkeys" "github.com/cockroachdb/pebble/objstorage" @@ -657,7 +658,7 @@ func testTableCacheFrequentlyUsedInternal(t *testing.T, rangeIter bool) { m := &fileMetadata{FileNum: FileNum(j)} m.InitPhysicalBacking() if rangeIter { - iter, err = c.newRangeKeyIter(m, nil /* iter options */) + iter, err = c.newRangeKeyIter(m, keyspan.SpanIterOptions{}) } else { iter, _, err = c.newIters(context.Background(), m, nil, internalIterOpts{}) } @@ -758,7 +759,7 @@ func testTableCacheEvictionsInternal(t *testing.T, rangeIter bool) { m := &fileMetadata{FileNum: FileNum(j)} m.InitPhysicalBacking() if rangeIter { - iter, err = c.newRangeKeyIter(m, nil /* iter options */) + iter, err = c.newRangeKeyIter(m, keyspan.SpanIterOptions{}) } else { iter, _, err = c.newIters(context.Background(), m, nil, internalIterOpts{}) } diff --git a/testdata/event_listener b/testdata/event_listener index 4bfb91792d..75619a0861 100644 --- a/testdata/event_listener +++ b/testdata/event_listener @@ -278,7 +278,7 @@ compact 1 2.3 K 0 B 0 (size == esti zmemtbl 0 0 B ztbl 0 0 B bcache 8 1.4 K 11.1% (score == hit-rate) - tcache 1 744 B 40.0% (score == hit-rate) + tcache 1 752 B 40.0% (score == hit-rate) snaps 0 - 0 (score == earliest seq num) titers 0 filter - - 0.0% (score == utility) @@ -373,7 +373,7 @@ compact 1 4.7 K 0 B 0 (size == esti zmemtbl 0 0 B ztbl 0 0 B bcache 16 2.9 K 14.3% (score == hit-rate) - tcache 1 744 B 50.0% (score == hit-rate) + tcache 1 752 B 50.0% (score == hit-rate) snaps 0 - 0 (score == earliest seq num) titers 0 filter - - 0.0% (score == utility) diff --git a/testdata/ingest b/testdata/ingest index a811e12a1e..bc457b22ca 100644 --- a/testdata/ingest +++ b/testdata/ingest @@ -48,7 +48,7 @@ compact 0 0 B 0 B 0 (size == esti zmemtbl 0 0 B ztbl 0 0 B bcache 8 1.5 K 42.9% (score == hit-rate) - tcache 1 744 B 50.0% (score == hit-rate) + tcache 1 752 B 50.0% (score == hit-rate) snaps 0 - 0 (score == earliest seq num) titers 0 filter - - 0.0% (score == utility) diff --git a/testdata/metrics b/testdata/metrics index 81981b62e6..539ad9d87d 100644 --- a/testdata/metrics +++ b/testdata/metrics @@ -34,7 +34,7 @@ compact 0 0 B 0 B 0 (size == esti zmemtbl 1 256 K ztbl 0 0 B bcache 4 697 B 0.0% (score == hit-rate) - tcache 1 744 
B 0.0% (score == hit-rate) + tcache 1 752 B 0.0% (score == hit-rate) snaps 0 - 0 (score == earliest seq num) titers 1 filter - - 0.0% (score == utility) @@ -145,7 +145,7 @@ compact 1 0 B 0 B 0 (size == esti zmemtbl 1 256 K ztbl 1 770 B bcache 4 697 B 42.9% (score == hit-rate) - tcache 1 744 B 66.7% (score == hit-rate) + tcache 1 752 B 66.7% (score == hit-rate) snaps 0 - 0 (score == earliest seq num) titers 1 filter - - 0.0% (score == utility) diff --git a/testdata/point_collapsing_iter b/testdata/point_collapsing_iter new file mode 100644 index 0000000000..e46c70f543 --- /dev/null +++ b/testdata/point_collapsing_iter @@ -0,0 +1,173 @@ + +define +a.SET.5:foo +b.SET.6:foo +b.DEL.4: +c.SET.7:bar +c.SET.5:foo +---- + +iter +first +next +next +prev +next +next +next +prev +---- +a#5,1:foo +b#6,1:foo +c#7,1:bar +b#6,1:foo +c#7,1:bar +. +. +c#7,1:bar + +iter +last +prev +prev +prev +prev +prev +---- +c#7,1:bar +b#6,1:foo +a#5,1:foo +. +. +. + +# Ensure that we pause at (and return) rangedel start points correctly. + +define +a.RANGEDEL.4:b +a.SET.5:foo +b.RANGEDEL.3:c +b.SET.6:foo +b.DEL.4: +c.SET.7:bar +c.SET.5:foo +---- + +iter +seek-ge b +next +next +prev +prev +prev +---- +b#72057594037927935,15: +b#6,1:foo +c#7,1:bar +b#6,1:foo +b#72057594037927935,15: +a#5,1:foo + +iter elide-range-dels=true +seek-ge b +next +next +prev +prev +prev +---- +b#6,1:foo +c#7,1:bar +. +c#7,1:bar +b#6,1:foo +a#5,1:foo + +# Ensure that the merge stops at the rangedel for b. + +define +a.RANGEDEL.4:b +a.SET.5:foo +b.RANGEDEL.4:c +b.MERGE.8:bar +b.MERGE.6:foobaz +b.SET.3:foo +b.DEL.2: +c.SET.7:bar +c.SET.5:foo +---- + +iter +seek-ge a +next +next +next +next +---- +a#72057594037927935,15: +a#5,1:foo +b#72057594037927935,15: +b#8,2:foobazbar +c#7,1:bar + +iter elide-range-dels=true +first +next +next +next +---- +a#5,1:foo +b#8,2:foobazbar +c#7,1:bar +. + +# Reverse iteration tests with rangedels. + +define +a.RANGEDEL.4:b +a.SET.5:foo +b.RANGEDEL.4:c +b.SET.6:foobazbar +b.SET.3:foo +b.DEL.2: +c.SET.7:bar +c.SET.5:foo +---- + +iter +seek-lt cc +prev +prev +prev +next +prev +prev +prev +next +next +---- +c#7,1:bar +b#6,1:foobazbar +b#72057594037927935,15: +a#5,1:foo +b#72057594037927935,15: +a#5,1:foo +a#72057594037927935,15: +. +a#72057594037927935,15: +a#5,1:foo + +iter elide-range-dels=true +seek-lt cc +prev +prev +next +prev +prev +---- +c#7,1:bar +b#6,1:foobazbar +a#5,1:foo +b#6,1:foobazbar +a#5,1:foo +.
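Note (illustrative, not part of the testdata): the expected output above uses the `userkey#seqnum,kind` notation for internal keys. `b#72057594037927935,15:` is user key `b` with sequence number 2^56-1 (`base.InternalKeySeqNumMax`) and kind 15 (RANGEDEL); the interleaving iterator surfaces range-del span starts with this maximal sequence number so they sort ahead of any point key at the same user key. The sketch below decodes that notation using the standard Pebble trailer packing (`seqNum<<8 | kind`, as produced by `base.MakeTrailer`).

```go
package main

import "fmt"

func main() {
	const seqNumMax = uint64(1)<<56 - 1 // base.InternalKeySeqNumMax == 72057594037927935
	const kindRangeDel = uint64(15)     // base.InternalKeyKindRangeDelete
	// base.MakeTrailer packs sequence number and kind into one trailer value.
	trailer := seqNumMax<<8 | kindRangeDel
	fmt.Printf("b#%d,%d -> trailer %d\n", seqNumMax, kindRangeDel, trailer)
}
```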