From 304530571d032c7192438facb5875bbfe0aad1c4 Mon Sep 17 00:00:00 2001
From: sumeerbhola
Date: Fri, 5 Mar 2021 17:53:05 -0500
Subject: [PATCH] db: mergingIter.SeekPrefixGE stops early if prefix cannot match
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

This is motivated by the CockroachDB issues discussed in
https://github.com/cockroachdb/pebble/issues/1070, where range tombstones
in L6 cause the iterator to go through all the deleted data. The situation
is even worse in that each successive SeekPrefixGE in a batch request
(which is in sorted order) will typically have to start all over again,
since the file at which the previous seek finally ended its work is
probably later than the file which contains the relevant key.

Note that CockroachDB does not set bounds when using SeekPrefixGE because
the assumption is that the underlying Pebble implementation can infer
bounds, and we don't want to change this behavior in CockroachDB. Since
CockroachDB primarily uses range tombstones when deleting CockroachDB
ranges, the SeekPrefixGE calls that were slow were probably on a preceding
CockroachDB range, so stopping early, as done in this PR, should fix the
issue. If range tombstones were used within the CockroachDB range that is
being read using SeekPrefixGE, the seek could land on a deleted key and
would need to iterate through all its MVCC versions. Even in that case,
the work is bounded by the number of deleted versions of a single MVCC key.

The code stops early in prefix iteration mode, both for SeekPrefixGE and
Next.

BenchmarkIteratorSeqSeekPrefixGENotFound demonstrates the existing problem
when with-tombstone=true:

name                                                             old time/op    new time/op    delta
IteratorSeqSeekPrefixGENotFound/skip=1/with-tombstone=false-16     446ns ± 0%     433ns ± 0%     -2.91%
IteratorSeqSeekPrefixGENotFound/skip=1/with-tombstone=true-16     10.3ms ± 0%     0.0ms ± 0%    -99.99%
IteratorSeqSeekPrefixGENotFound/skip=2/with-tombstone=false-16     416ns ± 0%     429ns ± 0%     +3.12%
IteratorSeqSeekPrefixGENotFound/skip=2/with-tombstone=true-16     10.6ms ± 0%     0.0ms ± 0%    -99.99%
IteratorSeqSeekPrefixGENotFound/skip=4/with-tombstone=false-16     414ns ± 0%     437ns ± 0%     +5.56%
IteratorSeqSeekPrefixGENotFound/skip=4/with-tombstone=true-16     10.5ms ± 0%     0.0ms ± 0%    -99.99%
MergingIterSeqSeekPrefixGE/skip=1/use-next=false-16               1.65µs ± 0%    1.75µs ± 0%     +5.75%
MergingIterSeqSeekPrefixGE/skip=1/use-next=true-16                 463ns ± 0%     459ns ± 0%     -0.86%
MergingIterSeqSeekPrefixGE/skip=2/use-next=false-16               1.61µs ± 0%    1.67µs ± 0%     +3.73%
MergingIterSeqSeekPrefixGE/skip=2/use-next=true-16                 476ns ± 0%     475ns ± 0%     -0.21%
MergingIterSeqSeekPrefixGE/skip=4/use-next=false-16               1.62µs ± 0%    1.77µs ± 0%     +9.26%
MergingIterSeqSeekPrefixGE/skip=4/use-next=true-16                 513ns ± 0%     525ns ± 0%     +2.34%
MergingIterSeqSeekPrefixGE/skip=8/use-next=false-16               1.71µs ± 0%    1.84µs ± 0%     +7.65%
MergingIterSeqSeekPrefixGE/skip=8/use-next=true-16                1.10µs ± 0%    1.16µs ± 0%     +5.27%
MergingIterSeqSeekPrefixGE/skip=16/use-next=false-16              1.80µs ± 0%    1.86µs ± 0%     +2.99%
MergingIterSeqSeekPrefixGE/skip=16/use-next=true-16               1.34µs ± 0%    1.20µs ± 0%    -10.23%

name                                                             old alloc/op   new alloc/op   delta
IteratorSeqSeekPrefixGENotFound/skip=1/with-tombstone=false-16     0.00B          0.00B           0.00%
IteratorSeqSeekPrefixGENotFound/skip=1/with-tombstone=true-16       300B ± 0%        0B        -100.00%
IteratorSeqSeekPrefixGENotFound/skip=2/with-tombstone=false-16     0.00B          0.00B           0.00%
IteratorSeqSeekPrefixGENotFound/skip=2/with-tombstone=true-16       300B ± 0%        0B        -100.00%
IteratorSeqSeekPrefixGENotFound/skip=4/with-tombstone=false-16     0.00B          0.00B           0.00%
IteratorSeqSeekPrefixGENotFound/skip=4/with-tombstone=true-16       292B ± 0%        0B        -100.00%
MergingIterSeqSeekPrefixGE/skip=1/use-next=false-16                0.00B          0.00B           0.00%
MergingIterSeqSeekPrefixGE/skip=1/use-next=true-16                 0.00B          0.00B           0.00%
MergingIterSeqSeekPrefixGE/skip=2/use-next=false-16                0.00B          0.00B           0.00%
MergingIterSeqSeekPrefixGE/skip=2/use-next=true-16                 0.00B          0.00B           0.00%
MergingIterSeqSeekPrefixGE/skip=4/use-next=false-16                0.00B          0.00B           0.00%
MergingIterSeqSeekPrefixGE/skip=4/use-next=true-16                 0.00B          0.00B           0.00%
MergingIterSeqSeekPrefixGE/skip=8/use-next=false-16                0.00B          0.00B           0.00%
MergingIterSeqSeekPrefixGE/skip=8/use-next=true-16                 0.00B          0.00B           0.00%
MergingIterSeqSeekPrefixGE/skip=16/use-next=false-16               0.00B          0.00B           0.00%
MergingIterSeqSeekPrefixGE/skip=16/use-next=true-16                0.00B          0.00B           0.00%

Informs #1070
---
 compaction.go         |  6 +--
 db.go                 |  2 +-
 iterator_test.go      | 86 ++++++++++++++++++++++++++++++++++++++++++-
 merging_iter.go       | 29 +++++++++++++--
 merging_iter_test.go  | 67 +++++++++++++++++++++++----------
 testdata/merging_iter | 50 +++++++++++++++++++++++++
 6 files changed, 211 insertions(+), 29 deletions(-)

diff --git a/compaction.go b/compaction.go
index eed6382441..f37e889bc1 100644
--- a/compaction.go
+++ b/compaction.go
@@ -930,7 +930,7 @@ func (c *compaction) newInputIter(newIters tableNewIters) (_ internalIterator, r
 		f := c.flushing[0]
 		iter := f.newFlushIter(nil, &c.bytesIterated)
 		if rangeDelIter := f.newRangeDelIter(nil); rangeDelIter != nil {
-			return newMergingIter(c.logger, c.cmp, iter, rangeDelIter), nil
+			return newMergingIter(c.logger, c.cmp, nil, iter, rangeDelIter), nil
 		}
 		return iter, nil
 	}
@@ -943,7 +943,7 @@ func (c *compaction) newInputIter(newIters tableNewIters) (_ internalIterator, r
 				iters = append(iters, rangeDelIter)
 			}
 		}
-		return newMergingIter(c.logger, c.cmp, iters...), nil
+		return newMergingIter(c.logger, c.cmp, nil, iters...), nil
 	}

 	// Check that the LSM ordering invariants are ok in order to prevent
@@ -1096,7 +1096,7 @@ func (c *compaction) newInputIter(newIters tableNewIters) (_ internalIterator, r
 	if err != nil {
 		return nil, err
 	}
-	return newMergingIter(c.logger, c.cmp, iters...), nil
+	return newMergingIter(c.logger, c.cmp, nil, iters...), nil
 }

 func (c *compaction) String() string {
diff --git a/db.go b/db.go
index 2ca8de4176..0e46aa5458 100644
--- a/db.go
+++ b/db.go
@@ -817,7 +817,7 @@ func finishInitializingIter(buf *iterAlloc) *Iterator {
 		addLevelIterForFiles(current.Levels[level].Iter(), manifest.Level(level))
 	}

-	buf.merging.init(&dbi.opts, dbi.cmp, finalMLevels...)
+	buf.merging.init(&dbi.opts, dbi.cmp, dbi.split, finalMLevels...)
 	buf.merging.snapshot = seqNum
 	buf.merging.elideRangeTombstones = true
 	return dbi
diff --git a/iterator_test.go b/iterator_test.go
index a738f3e1bf..bfe766fa8c 100644
--- a/iterator_test.go
+++ b/iterator_test.go
@@ -16,6 +16,7 @@ import (
 	"github.com/cockroachdb/errors"
 	"github.com/cockroachdb/pebble/internal/base"
 	"github.com/cockroachdb/pebble/internal/datadriven"
+	"github.com/cockroachdb/pebble/sstable"
 	"github.com/cockroachdb/pebble/vfs"
 	"github.com/stretchr/testify/require"
 	"golang.org/x/exp/rand"
@@ -414,7 +415,7 @@ func TestIterator(t *testing.T) {
 	equal := DefaultComparer.Equal
 	split := func(a []byte) int { return len(a) }
 	// NB: Use a mergingIter to filter entries newer than seqNum.
-	iter := newMergingIter(nil /* logger */, cmp, &fakeIter{
+	iter := newMergingIter(nil /* logger */, cmp, split, &fakeIter{
 		lower: opts.GetLowerBound(),
 		upper: opts.GetUpperBound(),
 		keys:  keys,
@@ -988,3 +989,86 @@ func BenchmarkIteratorPrev(b *testing.B) {
 		iter.Prev()
 	}
 }
+
+// BenchmarkIteratorSeqSeekPrefixGENotFound exercises the case of SeekPrefixGE
+// specifying monotonic keys all of which precede actual keys present in L6 of
+// the DB. Moreover, with-tombstone=true exercises the sub-case where those
+// actual keys are deleted using a range tombstone that has not physically
+// deleted those keys due to the presence of a snapshot that needs to see
+// those keys. This sub-case needs to be efficient in (a) avoiding iteration
+// over all those deleted keys, including repeated iteration, and (b) using
+// the next optimization, since the seeks are monotonic.
+func BenchmarkIteratorSeqSeekPrefixGENotFound(b *testing.B) {
+	const blockSize = 32 << 10
+	const restartInterval = 16
+	const levelCount = 5
+	const keyOffset = 100000
+	readers, levelSlices, _ := buildLevelsForMergingIterSeqSeek(
+		b, blockSize, restartInterval, levelCount, keyOffset, false)
+	readersWithTombstone, levelSlicesWithTombstone, _ := buildLevelsForMergingIterSeqSeek(
+		b, blockSize, restartInterval, 1, keyOffset, true)
+	// We will not be seeking to the keys that were written, but instead to
+	// keys before the written keys. This is to validate that the optimization
+	// to use Next still functions when mergingIter checks for the prefix
+	// match, and that mergingIter can avoid iterating over all the keys
+	// deleted by a range tombstone when there is no possibility of matching
+	// the prefix.
+	var keys [][]byte
+	for i := 0; i < keyOffset; i++ {
+		keys = append(keys, []byte(fmt.Sprintf("%08d", i)))
+	}
+	for _, skip := range []int{1, 2, 4} {
+		for _, withTombstone := range []bool{false, true} {
+			b.Run(fmt.Sprintf("skip=%d/with-tombstone=%t", skip, withTombstone),
+				func(b *testing.B) {
+					readers := readers
+					levelSlices := levelSlices
+					if withTombstone {
+						readers = readersWithTombstone
+						levelSlices = levelSlicesWithTombstone
+					}
+					m := buildMergingIter(readers, levelSlices)
+					iter := Iterator{
+						cmp:   DefaultComparer.Compare,
+						equal: DefaultComparer.Equal,
+						split: func(a []byte) int { return len(a) },
+						merge: DefaultMerger.Merge,
+						iter:  m,
+					}
+					pos := 0
+					b.ResetTimer()
+					for i := 0; i < b.N; i++ {
+						// When withTombstone=true, and prior to the
+						// optimization to stop early due to a range
+						// tombstone, the iteration would continue into the
+						// next file, and not be able to use Next at the lower
+						// level in the next SeekPrefixGE call. So we would
+						// incur the cost of iterating over all the deleted
+						// keys for every seek. Note that it is not possible
+						// to do a noop optimization in Iterator for the
+						// prefix case, unlike SeekGE/SeekLT, since we don't
+						// know if the iterators inside mergingIter are all
+						// appropriately positioned -- some may not be, due to
+						// bloom filters not matching.
+						valid := iter.SeekPrefixGE(keys[pos])
+						if valid {
+							b.Fatalf("key should not be found")
+						}
+						pos += skip
+						if pos >= keyOffset {
+							pos = 0
+						}
+					}
+					b.StopTimer()
+					iter.Close()
+				})
+		}
+	}
+	for _, r := range [][][]*sstable.Reader{readers, readersWithTombstone} {
+		for i := range r {
+			for j := range r[i] {
+				r[i][j].Close()
+			}
+		}
+	}
+}
diff --git a/merging_iter.go b/merging_iter.go
index b2716909c3..8389f871b7 100644
--- a/merging_iter.go
+++ b/merging_iter.go
@@ -212,6 +212,7 @@ type mergingIterLevel struct {
 // heap and range-del iterator positioning).
 type mergingIter struct {
 	logger   Logger
+	split    Split
 	dir      int
 	snapshot uint64
 	levels   []mergingIterLevel
@@ -232,23 +233,29 @@ var _ base.InternalIterator = (*mergingIter)(nil)

 // newMergingIter returns an iterator that merges its input. Walking the
 // resultant iterator will return all key/value pairs of all input iterators
-// in strictly increasing key order, as defined by cmp.
+// in strictly increasing key order, as defined by cmp. It is permissible to
+// pass a nil split parameter if the caller is never going to call
+// SeekPrefixGE.
 //
 // The input's key ranges may overlap, but there are assumed to be no duplicate
 // keys: if iters[i] contains a key k then iters[j] will not contain that key k.
 //
 // None of the iters may be nil.
-func newMergingIter(logger Logger, cmp Compare, iters ...internalIterator) *mergingIter {
+func newMergingIter(
+	logger Logger, cmp Compare, split Split, iters ...internalIterator,
+) *mergingIter {
 	m := &mergingIter{}
 	levels := make([]mergingIterLevel, len(iters))
 	for i := range levels {
 		levels[i].iter = iters[i]
 	}
-	m.init(&IterOptions{logger: logger}, cmp, levels...)
+	m.init(&IterOptions{logger: logger}, cmp, split, levels...)
 	return m
 }

-func (m *mergingIter) init(opts *IterOptions, cmp Compare, levels ...mergingIterLevel) {
+func (m *mergingIter) init(
+	opts *IterOptions, cmp Compare, split Split, levels ...mergingIterLevel,
+) {
 	m.err = nil // clear cached iteration error
 	m.logger = opts.getLogger()
 	if opts != nil {
@@ -258,6 +265,7 @@ func (m *mergingIter) init(opts *IterOptions, cmp Compare, levels ...mergingIter
 	m.snapshot = InternalKeySeqNumMax
 	m.levels = levels
 	m.heap.cmp = cmp
+	m.split = split
 	if cap(m.heap.items) < len(levels) {
 		m.heap.items = make([]mergingIterItem, 0, len(levels))
 	} else {
@@ -620,6 +628,19 @@ func (m *mergingIter) findNextEntry() (*InternalKey, []byte) {
 			break
 		}
 		if m.isNextEntryDeleted(item) {
+			// For prefix iteration, stop if we are past the prefix. We could
+			// amortize the cost of this comparison by doing it only after we
+			// have iterated in this for loop a few times. But unless we find
+			// a performance benefit to that, we do the simple thing and
+			// compare each time. Note that isNextEntryDeleted already did at
+			// least 4 key comparisons in order to return true, and
+			// additionally at least one heap comparison to step to the next
+			// entry.
+			if m.prefix != nil {
+				if n := m.split(item.key.UserKey); !bytes.Equal(m.prefix, item.key.UserKey[:n]) {
+					return nil, nil
+				}
+			}
 			continue
 		}
 		if item.key.Visible(m.snapshot) &&
diff --git a/merging_iter_test.go b/merging_iter_test.go
index 38d674a44a..37272cac7d 100644
--- a/merging_iter_test.go
+++ b/merging_iter_test.go
@@ -22,7 +22,8 @@ import (

 func TestMergingIter(t *testing.T) {
 	newFunc := func(iters ...internalIterator) internalIterator {
-		return newMergingIter(nil /* logger */, DefaultComparer.Compare, iters...)
+		return newMergingIter(nil /* logger */, DefaultComparer.Compare,
+			func(a []byte) int { return len(a) }, iters...)
 	}
 	testIterator(t, newFunc, func(r *rand.Rand) [][]string {
 		// Shuffle testKeyValuePairs into one or more splits. Each individual
@@ -57,7 +58,8 @@ func TestMergingIterSeek(t *testing.T) {
 				iters = append(iters, f)
 			}

-			iter := newMergingIter(nil /* logger */, DefaultComparer.Compare, iters...)
+			iter := newMergingIter(nil /* logger */, DefaultComparer.Compare,
+				func(a []byte) int { return len(a) }, iters...)
 			defer iter.Close()
 			return runInternalIterCmd(d, iter)
@@ -72,19 +74,19 @@ func TestMergingIterNextPrev(t *testing.T) {
 	// iterators differently. This data must match the definition in
 	// testdata/internal_iter_next.
 	iterCases := [][]string{
-		[]string{
+		{
 			"a.SET.2:2 a.SET.1:1 b.SET.2:2 b.SET.1:1 c.SET.2:2 c.SET.1:1",
 		},
-		[]string{
+		{
 			"a.SET.2:2 b.SET.2:2 c.SET.2:2",
 			"a.SET.1:1 b.SET.1:1 c.SET.1:1",
 		},
-		[]string{
+		{
 			"a.SET.2:2 b.SET.2:2",
 			"a.SET.1:1 b.SET.1:1",
 			"c.SET.2:2 c.SET.1:1",
 		},
-		[]string{
+		{
 			"a.SET.2:2",
 			"a.SET.1:1",
 			"b.SET.2:2",
@@ -114,7 +116,8 @@ func TestMergingIterNextPrev(t *testing.T) {
 				}
 			}

-			iter := newMergingIter(nil /* logger */, DefaultComparer.Compare, iters...)
+			iter := newMergingIter(nil /* logger */, DefaultComparer.Compare,
+				func(a []byte) int { return len(a) }, iters...)
 			defer iter.Close()
 			return runInternalIterCmd(d, iter)
@@ -251,7 +254,7 @@ func TestMergingIterCornerCases(t *testing.T) {
 			li.initIsSyntheticIterBoundsKey(&levelIters[i].isSyntheticIterBoundsKey)
 		}
 		miter := &mergingIter{}
-		miter.init(nil /* opts */, cmp, levelIters...)
+		miter.init(nil /* opts */, cmp, func(a []byte) int { return len(a) }, levelIters...)
 		defer miter.Close()
 		return runInternalIterCmd(d, miter, iterCmdVerboseKey)
 	default:
@@ -346,7 +349,8 @@ func BenchmarkMergingIterSeekGE(b *testing.B) {
 			iters[i], err = readers[i].NewIter(nil /* lower */, nil /* upper */)
 			require.NoError(b, err)
 		}
-		m := newMergingIter(nil /* logger */, DefaultComparer.Compare, iters...)
+		m := newMergingIter(nil /* logger */, DefaultComparer.Compare,
+			func(a []byte) int { return len(a) }, iters...)
 		rng := rand.New(rand.NewSource(uint64(time.Now().UnixNano())))

 		b.ResetTimer()
@@ -377,7 +381,8 @@ func BenchmarkMergingIterNext(b *testing.B) {
 			iters[i], err = readers[i].NewIter(nil /* lower */, nil /* upper */)
 			require.NoError(b, err)
 		}
-		m := newMergingIter(nil /* logger */, DefaultComparer.Compare, iters...)
+		m := newMergingIter(nil /* logger */, DefaultComparer.Compare,
+			func(a []byte) int { return len(a) }, iters...)

 		b.ResetTimer()
 		for i := 0; i < b.N; i++ {
@@ -411,7 +416,8 @@ func BenchmarkMergingIterPrev(b *testing.B) {
 			iters[i], err = readers[i].NewIter(nil /* lower */, nil /* upper */)
 			require.NoError(b, err)
 		}
-		m := newMergingIter(nil /* logger */, DefaultComparer.Compare, iters...)
+		m := newMergingIter(nil /* logger */, DefaultComparer.Compare,
+			func(a []byte) int { return len(a) }, iters...)

 		b.ResetTimer()
 		for i := 0; i < b.N; i++ {
@@ -435,11 +441,19 @@
 // only the first and last key of file 0 of the aforementioned level -- this
 // simulates sparseness of data, but not necessarily of file width, in higher
 // levels. File 1 in other levels is similar to File 1 in the aforementioned level
-// since it is only for stepping into.
+// since it is only for stepping into. If writeRangeTombstoneToLowestLevel is
+// true, a range tombstone is written to the first lowest level file that
+// deletes all the keys in it, and no other levels should be written.
 func buildLevelsForMergingIterSeqSeek(
-	b *testing.B, blockSize, restartInterval, levelCount int,
+	b *testing.B,
+	blockSize, restartInterval, levelCount int,
+	keyOffset int,
+	writeRangeTombstoneToLowestLevel bool,
 ) ([][]*sstable.Reader, []manifest.LevelSlice, [][]byte) {
 	mem := vfs.NewMem()
+	if writeRangeTombstoneToLowestLevel && levelCount != 1 {
+		panic("expect to write only 1 level")
+	}
 	files := make([][]vfs.File, levelCount)
 	for i := range files {
 		for j := 0; j < 2; j++ {
@@ -463,7 +477,7 @@ func buildLevelsForMergingIterSeqSeek(
 	}

 	var keys [][]byte
-	var i int
+	i := keyOffset
 	const targetSize = 2 << 20
 	w := writers[0][0]
 	for ; w.EstimatedSize() < targetSize; i++ {
@@ -472,6 +486,10 @@ func buildLevelsForMergingIterSeqSeek(
 		ikey := base.MakeInternalKey(key, 0, InternalKeyKindSet)
 		w.Add(ikey, nil)
 	}
+	if writeRangeTombstoneToLowestLevel {
+		tombstoneKey := base.MakeInternalKey(keys[0], 1, InternalKeyKindRangeDelete)
+		w.Add(tombstoneKey, []byte(fmt.Sprintf("%08d", i)))
+	}
 	for j := 1; j < len(files); j++ {
 		for _, k := range []int{0, len(keys) - 1} {
 			ikey := base.MakeInternalKey(keys[k], uint64(j), InternalKeyKindSet)
@@ -540,7 +558,15 @@ func buildMergingIter(readers [][]*sstable.Reader, levelSlices []manifest.LevelS
 		) (internalIterator, internalIterator, error) {
 			iter, err := readers[levelIndex][file.FileNum].NewIter(
 				opts.LowerBound, opts.UpperBound)
-			return iter, nil, err
+			if err != nil {
+				return nil, nil, err
+			}
+			rdIter, err := readers[levelIndex][file.FileNum].NewRawRangeDelIter()
+			if err != nil {
+				iter.Close()
+				return nil, nil, err
+			}
+			return iter, rdIter, err
 		}
 		l := newLevelIter(IterOptions{}, DefaultComparer.Compare, newIters,
 			levelSlices[i].Iter(), manifest.Level(level), nil)
@@ -552,7 +578,8 @@ func buildMergingIter(readers [][]*sstable.Reader, levelSlices []manifest.LevelS
 		mils[level].iter = l
 	}
 	m := &mergingIter{}
-	m.init(nil /* logger */, DefaultComparer.Compare, mils...)
+	m.init(nil /* opts */, DefaultComparer.Compare,
+		func(a []byte) int { return len(a) }, mils...)
 	return m
 }
@@ -570,8 +597,8 @@ func BenchmarkMergingIterSeqSeekGEWithBounds(b *testing.B) {
 	for _, levelCount := range []int{5} {
 		b.Run(fmt.Sprintf("levelCount=%d", levelCount),
 			func(b *testing.B) {
-				readers, levelSlices, keys :=
-					buildLevelsForMergingIterSeqSeek(b, blockSize, restartInterval, levelCount)
+				readers, levelSlices, keys := buildLevelsForMergingIterSeqSeek(
+					b, blockSize, restartInterval, levelCount, 0 /* keyOffset */, false)
 				m := buildMergingIter(readers, levelSlices)
 				keyCount := len(keys)
 				b.ResetTimer()
@@ -598,8 +625,8 @@ func BenchmarkMergingIterSeqSeekPrefixGE(b *testing.B) {
 	const blockSize = 32 << 10
 	const restartInterval = 16
 	const levelCount = 5
-	readers, levelSlices, keys :=
-		buildLevelsForMergingIterSeqSeek(b, blockSize, restartInterval, levelCount)
+	readers, levelSlices, keys := buildLevelsForMergingIterSeqSeek(
+		b, blockSize, restartInterval, levelCount, 0 /* keyOffset */, false)

 	for _, skip := range []int{1, 2, 4, 8, 16} {
 		for _, useNext := range []bool{false, true} {
diff --git a/testdata/merging_iter b/testdata/merging_iter
index 69464998ea..3486b165ef 100644
--- a/testdata/merging_iter
+++ b/testdata/merging_iter
@@ -502,3 +502,53 @@ next
 ----
 iwoeionch#792,1:792
 .
+
+# Exercise the early stopping behavior for prefix iteration when encountering
+# range deletion tombstones. Keys a and d are not deleted, while the rest are.
+define
+L
+a.SET.10 d.SET.10
+a.SET.10:a10 b.SET.10:b10 c.SET.10:c10 d.SET.10:d10 b.RANGEDEL.12:d
+----
+1:
+  000025:[a#10,SET-d#10,SET]
+
+iter
+first
+next
+next
+----
+a#10,1:a10
+d#10,1:d10
+.
+
+# The seek to c finds d since iteration cannot stop at c, as it matches the
+# prefix, and when it steps to d, it finds d is not deleted. Note that
+# mergingIter is an InternalIterator and does not need to guarantee a prefix
+# match -- that is the job of the higher-level Iterator. So "seek-prefix-ge c"
+# is allowed to return d.
+iter
+seek-prefix-ge a false
+seek-prefix-ge aa true
+seek-prefix-ge b true
+seek-prefix-ge c true
+seek-prefix-ge d true
+----
+a#10,1:a10
+.
+.
+d#10,1:d10
+d#10,1:d10
+
+iter
+seek-prefix-ge a false
+next
+seek-prefix-ge b false
+seek-prefix-ge d true
+next
+----
+a#10,1:a10
+.
+.
+d#10,1:d10
+.
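
For readers outside the pebble codebase, the shape of the change to
mergingIter.findNextEntry is easiest to see in isolation. The following is a
minimal, self-contained Go sketch, not pebble code: the entry type and flat
slice are toy stand-ins for the real heap of levels, and the deleted flag
stands in for what isNextEntryDeleted computes. The point it demonstrates is
the one in the diff above: entries covered by a range tombstone are skipped,
but in prefix iteration mode the loop gives up as soon as the current key's
prefix, as computed by the split function, no longer equals the sought prefix.

package main

import (
	"bytes"
	"fmt"
)

// entry is a toy stand-in for a merged iterator position: a user key plus a
// flag recording whether a range tombstone covers it.
type entry struct {
	key     []byte
	deleted bool
}

// findNextEntry mimics the patched loop: skip deleted entries, but when a
// prefix is set, return as soon as the current key leaves that prefix. It
// also reports how many entries were touched, to show the work saved.
func findNextEntry(entries []entry, prefix []byte, split func([]byte) int) (*entry, int) {
	touched := 0
	for i := range entries {
		touched++
		e := &entries[i]
		if e.deleted {
			if prefix != nil {
				if n := split(e.key); !bytes.Equal(prefix, e.key[:n]) {
					// Early stop: the prefix can no longer match.
					return nil, touched
				}
			}
			continue
		}
		return e, touched
	}
	return nil, touched
}

func main() {
	split := func(a []byte) int { return 1 } // first byte is the prefix
	// Keys b1..b4 are covered by a range tombstone; c1 is live.
	entries := []entry{
		{[]byte("b1"), true}, {[]byte("b2"), true}, {[]byte("b3"), true},
		{[]byte("b4"), true}, {[]byte("c1"), false},
	}
	// Without a prefix, the loop walks every deleted "b" key before
	// surfacing c1.
	e, n := findNextEntry(entries, nil, split)
	fmt.Printf("no prefix: got %s after touching %d entries\n", e.key, n)
	// Seeking with prefix "a" (before all written keys, as in the
	// benchmark) stops at the first deleted key instead.
	e2, n2 := findNextEntry(entries, []byte("a"), split)
	fmt.Printf("prefix a: got %v after touching %d entries\n", e2, n2)
}

This mirrors what BenchmarkIteratorSeqSeekPrefixGENotFound measures: before
the patch, the equivalent of the nil-prefix walk happened on every seek;
after it, a seek whose prefix precedes the tombstoned span does O(1) work.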
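The commit message's caveat about MVCC versions comes down to how the Split
function partitions a key: all versions of one MVCC key share a prefix, so a
SeekPrefixGE that lands inside a tombstoned run of the same prefix still has
to step over each deleted version before finding a live entry or stopping.
The sketch below uses a made-up "<row>@<version>" encoding to illustrate
this; CockroachDB's real MVCC key encoding is different, and mvccSplit is a
hypothetical helper, not a pebble or CockroachDB API.

package main

import (
	"bytes"
	"fmt"
)

// mvccSplit treats everything before '@' as the prefix, so all versions of
// one row compare equal under the prefix check. Illustrative only.
func mvccSplit(key []byte) int {
	if i := bytes.IndexByte(key, '@'); i >= 0 {
		return i
	}
	return len(key)
}

func main() {
	prefix := []byte("row42")
	// Deleted versions of the same row keep matching the prefix, so the
	// early stop does not trigger and each one must be stepped over...
	same := []byte("row42@9")
	fmt.Println(bytes.Equal(prefix, same[:mvccSplit(same)])) // true: keep going
	// ...but the first key of the next row fails the check, which is
	// exactly where this patch stops the iteration.
	next := []byte("row43@7")
	fmt.Println(bytes.Equal(prefix, next[:mvccSplit(next)])) // false: stop early
}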