Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

crl-release-20.2: db: mergingIter.SeekPrefixGE stops early if prefix cannot match #1100

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,7 @@ func (c *compaction) newInputIter(newIters tableNewIters) (_ internalIterator, r
f := c.flushing[0]
iter := f.newFlushIter(nil, &c.bytesIterated)
if rangeDelIter := f.newRangeDelIter(nil); rangeDelIter != nil {
return newMergingIter(c.logger, c.cmp, iter, rangeDelIter), nil
return newMergingIter(c.logger, c.cmp, nil, iter, rangeDelIter), nil
}
return iter, nil
}
Expand All @@ -513,7 +513,7 @@ func (c *compaction) newInputIter(newIters tableNewIters) (_ internalIterator, r
iters = append(iters, rangeDelIter)
}
}
return newMergingIter(c.logger, c.cmp, iters...), nil
return newMergingIter(c.logger, c.cmp, nil, iters...), nil
}

// Check that the LSM ordering invariants are ok in order to prevent
Expand Down Expand Up @@ -666,7 +666,7 @@ func (c *compaction) newInputIter(newIters tableNewIters) (_ internalIterator, r
if err != nil {
return nil, err
}
return newMergingIter(c.logger, c.cmp, iters...), nil
return newMergingIter(c.logger, c.cmp, nil, iters...), nil
}

func (c *compaction) String() string {
Expand Down
4 changes: 2 additions & 2 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -778,7 +778,7 @@ func (d *DB) newIterInternal(
addLevelIterForFiles(current.Levels[level].Iter(), manifest.Level(level))
}

buf.merging.init(&dbi.opts, d.cmp, finalMLevels...)
buf.merging.init(&dbi.opts, dbi.cmp, dbi.split, finalMLevels...)
buf.merging.snapshot = seqNum
buf.merging.elideRangeTombstones = true
return dbi
Expand Down Expand Up @@ -930,7 +930,7 @@ func (d *DB) Compact(

iStart := base.MakeInternalKey(start, InternalKeySeqNumMax, InternalKeyKindMax)
iEnd := base.MakeInternalKey(end, 0, 0)
meta := []*fileMetadata{&fileMetadata{Smallest: iStart, Largest: iEnd}}
meta := []*fileMetadata{{Smallest: iStart, Largest: iEnd}}

d.mu.Lock()
maxLevelWithFiles := 1
Expand Down
86 changes: 85 additions & 1 deletion iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/datadriven"
"github.com/cockroachdb/pebble/sstable"
"github.com/cockroachdb/pebble/vfs"
"github.com/stretchr/testify/require"
"golang.org/x/exp/rand"
Expand Down Expand Up @@ -409,7 +410,7 @@ func TestIterator(t *testing.T) {
equal := DefaultComparer.Equal
split := func(a []byte) int { return len(a) }
// NB: Use a mergingIter to filter entries newer than seqNum.
iter := newMergingIter(nil /* logger */, cmp, &fakeIter{
iter := newMergingIter(nil /* logger */, cmp, split, &fakeIter{
lower: opts.GetLowerBound(),
upper: opts.GetUpperBound(),
keys: keys,
Expand Down Expand Up @@ -694,3 +695,86 @@ func BenchmarkIteratorPrev(b *testing.B) {
iter.Prev()
}
}

// BenchmarkIteratorSeqSeekPrefixGENotFound measures SeekPrefixGE calls with
// monotonically increasing keys, all of which sort before every key actually
// present in L6 of the DB, so no seek ever finds a match. The
// with-tombstone=true variants additionally cover the sub-case where the
// stored keys are covered by a range tombstone that could not physically
// drop them because a snapshot still needs them. That sub-case must be
// efficient in (a) avoiding (repeated) iteration over all the deleted keys,
// and (b) retaining the Next optimization, since the seeks are monotonic.
func BenchmarkIteratorSeqSeekPrefixGENotFound(b *testing.B) {
	const blockSize = 32 << 10
	const restartInterval = 16
	const levelCount = 5
	const keyOffset = 100000
	readers, levelSlices, _ := buildLevelsForMergingIterSeqSeek(
		b, blockSize, restartInterval, levelCount, keyOffset, false)
	readersWithTombstone, levelSlicesWithTombstone, _ := buildLevelsForMergingIterSeqSeek(
		b, blockSize, restartInterval, 1, keyOffset, true)
	// The seeks target keys below the written key range (the written keys
	// start at keyOffset). This validates that the Next optimization still
	// functions when mergingIter checks for the prefix match, and that
	// mergingIter can avoid iterating over all the keys deleted by a range
	// tombstone when there is no possibility of matching the prefix.
	seekKeys := make([][]byte, 0, keyOffset)
	for i := 0; i < keyOffset; i++ {
		seekKeys = append(seekKeys, []byte(fmt.Sprintf("%08d", i)))
	}
	for _, skip := range []int{1, 2, 4} {
		for _, withTombstone := range []bool{false, true} {
			b.Run(fmt.Sprintf("skip=%d/with-tombstone=%t", skip, withTombstone),
				func(b *testing.B) {
					// Pick the input LSM for this sub-benchmark.
					curReaders, curSlices := readers, levelSlices
					if withTombstone {
						curReaders, curSlices = readersWithTombstone, levelSlicesWithTombstone
					}
					m := buildMergingIter(curReaders, curSlices)
					it := Iterator{
						cmp:   DefaultComparer.Compare,
						equal: DefaultComparer.Equal,
						split: func(a []byte) int { return len(a) },
						merge: DefaultMerger.Merge,
						iter:  m,
					}
					idx := 0
					b.ResetTimer()
					for i := 0; i < b.N; i++ {
						// When withTombstone=true, and prior to the
						// optimization to stop early due to a range
						// tombstone, the iteration would continue into the
						// next file, and not be able to use Next at the lower
						// level in the next SeekPrefixGE call. So we would
						// incur the cost of iterating over all the deleted
						// keys for every seek. Note that it is not possible
						// to do a noop optimization in Iterator for the
						// prefix case, unlike SeekGE/SeekLT, since we don't
						// know if the iterators inside mergingIter are all
						// appropriately positioned -- some may not be due to
						// bloom filters not matching.
						if it.SeekPrefixGE(seekKeys[idx]) {
							b.Fatalf("key should not be found")
						}
						idx += skip
						if idx >= keyOffset {
							idx = 0
						}
					}
					b.StopTimer()
					it.Close()
				})
		}
	}
	// Close every sstable reader built above, for both input sets.
	for _, rs := range [][][]*sstable.Reader{readers, readersWithTombstone} {
		for _, level := range rs {
			for _, r := range level {
				r.Close()
			}
		}
	}
}
29 changes: 25 additions & 4 deletions merging_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ type mergingIterLevel struct {
// heap and range-del iterator positioning).
type mergingIter struct {
logger Logger
split Split
dir int
snapshot uint64
levels []mergingIterLevel
Expand All @@ -231,23 +232,29 @@ var _ base.InternalIterator = (*mergingIter)(nil)

// newMergingIter returns an iterator that merges its input. Walking the
// resultant iterator will return all key/value pairs of all input iterators
// in strictly increasing key order, as defined by cmp.
// in strictly increasing key order, as defined by cmp. It is permissible to
// pass a nil split parameter if the caller is never going to call
// SeekPrefixGE.
//
// The input's key ranges may overlap, but there are assumed to be no duplicate
// keys: if iters[i] contains a key k then iters[j] will not contain that key k.
//
// None of the iters may be nil.
func newMergingIter(logger Logger, cmp Compare, iters ...internalIterator) *mergingIter {
func newMergingIter(
logger Logger, cmp Compare, split Split, iters ...internalIterator,
) *mergingIter {
m := &mergingIter{}
levels := make([]mergingIterLevel, len(iters))
for i := range levels {
levels[i].iter = iters[i]
}
m.init(&IterOptions{logger: logger}, cmp, levels...)
m.init(&IterOptions{logger: logger}, cmp, split, levels...)
return m
}

func (m *mergingIter) init(opts *IterOptions, cmp Compare, levels ...mergingIterLevel) {
func (m *mergingIter) init(
opts *IterOptions, cmp Compare, split Split, levels ...mergingIterLevel,
) {
m.err = nil // clear cached iteration error
m.logger = opts.getLogger()
if opts != nil {
Expand All @@ -257,6 +264,7 @@ func (m *mergingIter) init(opts *IterOptions, cmp Compare, levels ...mergingIter
m.snapshot = InternalKeySeqNumMax
m.levels = levels
m.heap.cmp = cmp
m.split = split
if cap(m.heap.items) < len(levels) {
m.heap.items = make([]mergingIterItem, 0, len(levels))
} else {
Expand Down Expand Up @@ -611,6 +619,19 @@ func (m *mergingIter) findNextEntry() (*InternalKey, []byte) {
for m.heap.len() > 0 && m.err == nil {
item := &m.heap.items[0]
if m.isNextEntryDeleted(item) {
// For prefix iteration, stop if we are past the prefix. We could
// amortize the cost of this comparison, by doing it only after we
// have iterated in this for loop a few times. But unless we find
// a performance benefit to that, we do the simple thing and
// compare each time. Note that isNextEntryDeleted already did at
// least 4 key comparisons in order to return true, and
// additionally at least one heap comparison to step to the next
// entry.
if m.prefix != nil {
if n := m.split(item.key.UserKey); !bytes.Equal(m.prefix, item.key.UserKey[:n]) {
return nil, nil
}
}
continue
}
if item.key.Visible(m.snapshot) &&
Expand Down
Loading