Skip to content

Commit

Permalink
db: rework merging iterator range deletion handling
Browse files Browse the repository at this point in the history
Rework the merging iterator's handling of range deletions to use the
interleaved range deletion boundary keys to determine when the iterator is
positioned within a level's range deletion. This removes the direct
manipulation of a range deletion keyspan.FragmentIterator from the mergingIter,
delegating that to the child iterator's keyspan.InterleavingIter.

This factoring is a little cleaner and decouples the mergingIter from the
details of range deletion iterators, and in particular, the levelIter's
individual sstables. It also should reduce key comparisons, especially during
scans, by avoiding unnecessary key comparisons with range deletions that are
loaded but outside the keyspace being iterated over.

Close #2863.
  • Loading branch information
jbowens committed May 10, 2024
1 parent cff81a8 commit e9f5888
Show file tree
Hide file tree
Showing 17 changed files with 499 additions and 658 deletions.
20 changes: 9 additions & 11 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/cockroachdb/pebble/internal/keyspan/keyspanimpl"
"github.com/cockroachdb/pebble/internal/manifest"
"github.com/cockroachdb/pebble/internal/manual"
"github.com/cockroachdb/pebble/internal/rangedel"
"github.com/cockroachdb/pebble/objstorage"
"github.com/cockroachdb/pebble/objstorage/remote"
"github.com/cockroachdb/pebble/rangekey"
Expand Down Expand Up @@ -1431,30 +1432,26 @@ func (i *Iterator) constructPointIter(
} else {
i.batch.initInternalIter(&i.opts, &i.batchPointIter)
i.batch.initRangeDelIter(&i.opts, &i.batchRangeDelIter, i.batchSeqNum)
mil := mergingIterLevel{iter: &i.batchPointIter, getTombstone: nil}
// Only include the batch's rangedel iterator if it's non-empty.
// This requires some subtle logic in the case a rangedel is later
// written to the batch and the view of the batch is refreshed
// during a call to SetOptions—in this case, we need to reconstruct
// the point iterator to add the batch rangedel iterator.
var rangeDelIter keyspan.FragmentIterator
if i.batchRangeDelIter.Count() > 0 {
rangeDelIter = &i.batchRangeDelIter
mil.iter, mil.getTombstone = rangedel.Interleave(&i.comparer, &i.batchPointIter, &i.batchRangeDelIter)
}
mlevels = append(mlevels, mergingIterLevel{
iter: &i.batchPointIter,
rangeDelIter: rangeDelIter,
})
mlevels = append(mlevels, mil)
}
}

if !i.batchOnlyIter {
// Next are the memtables.
for j := len(memtables) - 1; j >= 0; j-- {
mem := memtables[j]
mlevels = append(mlevels, mergingIterLevel{
iter: mem.newIter(&i.opts),
rangeDelIter: mem.newRangeDelIter(&i.opts),
})
mil := mergingIterLevel{}
mil.iter, mil.getTombstone = rangedel.Interleave(&i.comparer, mem.newIter(&i.opts), mem.newRangeDelIter(&i.opts))
mlevels = append(mlevels, mil)
}

// Next are the file levels: L0 sub-levels followed by lower levels.
Expand All @@ -1467,10 +1464,11 @@ func (i *Iterator) constructPointIter(
li := &levels[levelsIndex]

li.init(ctx, i.opts, &i.comparer, i.newIters, files, level, internalOpts)
li.initRangeDel(&mlevels[mlevelsIndex].rangeDelIter)
li.interleaveRangeDeletions = true
li.initCombinedIterState(&i.lazyCombinedIter.combinedIterState)
mlevels[mlevelsIndex].levelIter = li
mlevels[mlevelsIndex].iter = invalidating.MaybeWrapIfInvariants(li)
mlevels[mlevelsIndex].getTombstone = li.getTombstone

levelsIndex++
mlevelsIndex++
Expand Down
10 changes: 6 additions & 4 deletions external_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/keyspan"
"github.com/cockroachdb/pebble/internal/manifest"
"github.com/cockroachdb/pebble/internal/rangedel"
"github.com/cockroachdb/pebble/sstable"
)

Expand Down Expand Up @@ -212,10 +213,11 @@ func createExternalPointIter(ctx context.Context, it *Iterator) (topLevelIterato
if err != nil {
return nil, err
}
mlevels = append(mlevels, mergingIterLevel{
iter: pointIter,
rangeDelIter: rangeDelIter,
})
mil := mergingIterLevel{iter: pointIter, getTombstone: nil}
if rangeDelIter != nil {
mil.iter, mil.getTombstone = rangedel.Interleave(&it.comparer, mil.iter, rangeDelIter)
}
mlevels = append(mlevels, mil)
}
}

Expand Down
11 changes: 11 additions & 0 deletions internal/base/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,3 +574,14 @@ func (kv *InternalKV) Visible(snapshot, batchSnapshot uint64) bool {
func (kv *InternalKV) IsExclusiveSentinel() bool {
return kv.K.IsExclusiveSentinel()
}

// String returns a string representation of the kv pair.
func (kv *InternalKV) String() string {
if kv == nil {
return "<nil>"
}
if kv.V.Fetcher != nil {
return fmt.Sprintf("%s=<valblk:%x>", kv.K, kv.V.ValueOrHandle)
}
return fmt.Sprintf("%s:%s", kv.K, FormatBytes(kv.V.ValueOrHandle))
}
7 changes: 4 additions & 3 deletions internal/keyspan/span.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,10 +298,11 @@ func (s Span) Visible(snapshot uint64) Span {
// VisibleAt requires the Span's keys be in ByTrailerDesc order. It panics if
// the span's keys are sorted in a different order.
func (s *Span) VisibleAt(snapshot uint64) bool {
if s.KeysOrder != ByTrailerDesc {
if s == nil {
return false
} else if s.KeysOrder != ByTrailerDesc {
panic("pebble: span's keys unexpectedly not in trailer order")
}
if len(s.Keys) == 0 {
} else if len(s.Keys) == 0 {
return false
} else if first := s.Keys[0].SeqNum(); first&base.InternalKeySeqNumBatch != 0 {
// Only visible batch keys are included when an Iterator's batch spans
Expand Down
48 changes: 48 additions & 0 deletions internal/rangedel/rangedel.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
package rangedel

import (
"sync"

"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/keyspan"
)
Expand Down Expand Up @@ -41,3 +43,49 @@ func Decode(ik base.InternalKey, v []byte, keysDst []keyspan.Key) keyspan.Span {
}),
}
}

// Interleave takes a point iterator and a range deletion iterator, returning an
// iterator that interleaves range deletion boundary keys at the maximal
// sequence number among the stream of point keys with SPANSTART and SPANEND key
// kinds.
//
// In addition, Interleave returns a function that may be used to retrieve the
// range tombstone overlapping the current iterator position, if any.
//
// The returned iterator must only be closed once.
func Interleave(
comparer *base.Comparer, iter base.InternalIterator, rangeDelIter keyspan.FragmentIterator,
) (base.InternalIterator, func() *keyspan.Span) {
// If there is no range deletion iterator, don't bother using an interleaving
// iterator. We can return iter verbatim and a func that unconditionally
// returns nil.
if rangeDelIter == nil {
return iter, nil
}

ii := interleavingIterPool.Get().(*interleavingIter)
ii.Init(comparer, iter, rangeDelIter, keyspan.InterleavingIterOpts{
InterleaveEndKeys: true,
UseBoundaryKeyKinds: true,
})
return ii, ii.Span
}

var interleavingIterPool = sync.Pool{
New: func() interface{} {
return &interleavingIter{}
},
}

type interleavingIter struct {
keyspan.InterleavingIter
}

// Close closes the interleaving iterator and returns the interleaving iterator
// to the pool.
func (i *interleavingIter) Close() error {
err := i.InterleavingIter.Close()
*i = interleavingIter{}
interleavingIterPool.Put(i)
return err
}
4 changes: 2 additions & 2 deletions iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1820,8 +1820,8 @@ func (i *Iterator) nextPrefix() IterValidityState {
return i.iterValidityState
}
if invariants.Enabled && !i.equal(i.iterKV.K.UserKey, i.key) {
i.opts.getLogger().Fatalf("pebble: invariant violation: Nexting internal iterator from iterPosPrev landed on %q, not %q",
i.iterKV.K.UserKey, i.key)
panic(errors.AssertionFailedf("pebble: invariant violation: Nexting internal iterator from iterPosPrev landed on %q, not %q",
i.iterKV.K.UserKey, i.key))
}
}
// The internal iterator is now positioned at i.key. Advance to the next
Expand Down
57 changes: 21 additions & 36 deletions level_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/keyspan"
"github.com/cockroachdb/pebble/internal/manifest"
"github.com/cockroachdb/pebble/internal/rangedel"
)

// This file implements DB.CheckLevels() which checks that every entry in the
Expand Down Expand Up @@ -47,11 +48,10 @@ import (

// The per-level structure used by simpleMergingIter.
type simpleMergingIterLevel struct {
iter internalIterator
rangeDelIter keyspan.FragmentIterator

iterKV *base.InternalKV
tombstone *keyspan.Span
iter internalIterator
getTombstone func() *keyspan.Span
iterKV *base.InternalKV
iterTombstone *keyspan.Span
}

type simpleMergingIter struct {
Expand Down Expand Up @@ -102,22 +102,6 @@ func (m *simpleMergingIter) init(
if m.heap.len() == 0 {
return
}
m.positionRangeDels()
}

// Positions all the rangedel iterators at or past the current top of the
// heap, using SeekGE().
func (m *simpleMergingIter) positionRangeDels() {
item := &m.heap.items[0]
for i := range m.levels {
l := &m.levels[i]
if l.rangeDelIter == nil {
continue
}
t, err := l.rangeDelIter.SeekGE(item.key.UserKey)
m.err = firstError(m.err, err)
l.tombstone = t
}
}

// Returns true if not yet done.
Expand All @@ -128,7 +112,12 @@ func (m *simpleMergingIter) step() bool {
item := &m.heap.items[0]
l := &m.levels[item.index]
// Sentinels are not relevant for this point checking.
if !item.key.IsExclusiveSentinel() && item.key.Visible(m.snapshot, base.InternalKeySeqNumMax) {
if item.key.IsExclusiveSentinel() {
l.iterTombstone = nil
if item.key.Kind() != base.InternalKeyKindSpanEnd {
l.iterTombstone = l.getTombstone()
}
} else if item.key.Visible(m.snapshot, base.InternalKeySeqNumMax) {
// This is a visible point key.
if !m.handleVisiblePoint(item, l) {
return false
Expand Down Expand Up @@ -187,7 +176,6 @@ func (m *simpleMergingIter) step() bool {
}
return false
}
m.positionRangeDels()
return true
}

Expand Down Expand Up @@ -269,12 +257,12 @@ func (m *simpleMergingIter) handleVisiblePoint(
// iterators must be positioned at a key > item.key.
for level := item.index + 1; level < len(m.levels); level++ {
lvl := &m.levels[level]
if lvl.rangeDelIter == nil || lvl.tombstone.Empty() {
if lvl.iterTombstone.Empty() {
continue
}
if lvl.tombstone.Contains(m.heap.cmp, item.key.UserKey) && lvl.tombstone.CoversAt(m.snapshot, item.key.SeqNum()) {
if lvl.iterTombstone.Contains(m.heap.cmp, item.key.UserKey) && lvl.iterTombstone.CoversAt(m.snapshot, item.key.SeqNum()) {
m.err = errors.Errorf("tombstone %s in %s deletes key %s in %s",
lvl.tombstone.Pretty(m.formatKey), lvl.iter, item.key.Pretty(m.formatKey),
lvl.iterTombstone.Pretty(m.formatKey), lvl.iter, item.key.Pretty(m.formatKey),
l.iter)
return false
}
Expand Down Expand Up @@ -593,20 +581,15 @@ func checkLevelsInternal(c *checkConfig) (err error) {
err = firstError(err, l.iter.Close())
l.iter = nil
}
if l.rangeDelIter != nil {
err = firstError(err, l.rangeDelIter.Close())
l.rangeDelIter = nil
}
}
}()

memtables := c.readState.memtables
for i := len(memtables) - 1; i >= 0; i-- {
mem := memtables[i]
mlevels = append(mlevels, simpleMergingIterLevel{
iter: mem.newIter(nil),
rangeDelIter: mem.newRangeDelIter(nil),
})
var smil simpleMergingIterLevel
smil.iter, smil.getTombstone = rangedel.Interleave(c.comparer, mem.newIter(nil), mem.newRangeDelIter(nil))
mlevels = append(mlevels, smil)
}

current := c.readState.current
Expand Down Expand Up @@ -636,8 +619,9 @@ func checkLevelsInternal(c *checkConfig) (err error) {
li := &levelIter{}
li.init(context.Background(), iterOpts, c.comparer, c.newIters, manifestIter,
manifest.L0Sublevel(sublevel), internalIterOpts{})
li.initRangeDel(&mlevelAlloc[0].rangeDelIter)
li.interleaveRangeDeletions = true
mlevelAlloc[0].iter = li
mlevelAlloc[0].getTombstone = li.getTombstone
mlevelAlloc = mlevelAlloc[1:]
}
for level := 1; level < len(current.Levels); level++ {
Expand All @@ -649,8 +633,9 @@ func checkLevelsInternal(c *checkConfig) (err error) {
li := &levelIter{}
li.init(context.Background(), iterOpts, c.comparer, c.newIters,
current.Levels[level].Iter(), manifest.Level(level), internalIterOpts{})
li.initRangeDel(&mlevelAlloc[0].rangeDelIter)
li.interleaveRangeDeletions = true
mlevelAlloc[0].iter = li
mlevelAlloc[0].getTombstone = li.getTombstone
mlevelAlloc = mlevelAlloc[1:]
}

Expand Down
Loading

0 comments on commit e9f5888

Please sign in to comment.