diff --git a/compaction.go b/compaction.go index eed6382441..f37e889bc1 100644 --- a/compaction.go +++ b/compaction.go @@ -930,7 +930,7 @@ func (c *compaction) newInputIter(newIters tableNewIters) (_ internalIterator, r f := c.flushing[0] iter := f.newFlushIter(nil, &c.bytesIterated) if rangeDelIter := f.newRangeDelIter(nil); rangeDelIter != nil { - return newMergingIter(c.logger, c.cmp, iter, rangeDelIter), nil + return newMergingIter(c.logger, c.cmp, nil, iter, rangeDelIter), nil } return iter, nil } @@ -943,7 +943,7 @@ func (c *compaction) newInputIter(newIters tableNewIters) (_ internalIterator, r iters = append(iters, rangeDelIter) } } - return newMergingIter(c.logger, c.cmp, iters...), nil + return newMergingIter(c.logger, c.cmp, nil, iters...), nil } // Check that the LSM ordering invariants are ok in order to prevent @@ -1096,7 +1096,7 @@ func (c *compaction) newInputIter(newIters tableNewIters) (_ internalIterator, r if err != nil { return nil, err } - return newMergingIter(c.logger, c.cmp, iters...), nil + return newMergingIter(c.logger, c.cmp, nil, iters...), nil } func (c *compaction) String() string { diff --git a/db.go b/db.go index 2ca8de4176..0e46aa5458 100644 --- a/db.go +++ b/db.go @@ -817,7 +817,7 @@ func finishInitializingIter(buf *iterAlloc) *Iterator { addLevelIterForFiles(current.Levels[level].Iter(), manifest.Level(level)) } - buf.merging.init(&dbi.opts, dbi.cmp, finalMLevels...) + buf.merging.init(&dbi.opts, dbi.cmp, dbi.split, finalMLevels...) 
buf.merging.snapshot = seqNum buf.merging.elideRangeTombstones = true return dbi diff --git a/iterator_test.go b/iterator_test.go index a738f3e1bf..bfe766fa8c 100644 --- a/iterator_test.go +++ b/iterator_test.go @@ -16,6 +16,7 @@ import ( "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/internal/datadriven" + "github.com/cockroachdb/pebble/sstable" "github.com/cockroachdb/pebble/vfs" "github.com/stretchr/testify/require" "golang.org/x/exp/rand" @@ -414,7 +415,7 @@ func TestIterator(t *testing.T) { equal := DefaultComparer.Equal split := func(a []byte) int { return len(a) } // NB: Use a mergingIter to filter entries newer than seqNum. - iter := newMergingIter(nil /* logger */, cmp, &fakeIter{ + iter := newMergingIter(nil /* logger */, cmp, split, &fakeIter{ lower: opts.GetLowerBound(), upper: opts.GetUpperBound(), keys: keys, @@ -988,3 +989,86 @@ func BenchmarkIteratorPrev(b *testing.B) { iter.Prev() } } + +// BenchmarkIteratorSeqSeekPrefixGENotFound exercises the case of SeekPrefixGE +// specifying monotonic keys all of which precede actual keys present in L6 of +// the DB. Moreover, with-tombstone=true exercises the sub-case where those +// actual keys are deleted using a range tombstone that has not physically +// deleted those keys due to the presence of a snapshot that needs to see +// those keys. This sub-case needs to be efficient in (a) avoiding iteration +// over all those deleted keys, including repeated iteration, (b) using the +// next optimization, since the seeks are monotonic. 
+func BenchmarkIteratorSeqSeekPrefixGENotFound(b *testing.B) { + const blockSize = 32 << 10 + const restartInterval = 16 + const levelCount = 5 + const keyOffset = 100000 + readers, levelSlices, _ := buildLevelsForMergingIterSeqSeek( + b, blockSize, restartInterval, levelCount, keyOffset, false) + readersWithTombstone, levelSlicesWithTombstone, _ := buildLevelsForMergingIterSeqSeek( + b, blockSize, restartInterval, 1, keyOffset, true) + // We will not be seeking to the keys that were written but instead to + // keys before the written keys. This is to validate that the optimization + // to use Next still functions when mergingIter checks for the prefix + // match, and that mergingIter can avoid iterating over all the keys + // deleted by a range tombstone when there is no possibility of matching + // the prefix. + var keys [][]byte + for i := 0; i < keyOffset; i++ { + keys = append(keys, []byte(fmt.Sprintf("%08d", i))) + } + for _, skip := range []int{1, 2, 4} { + for _, withTombstone := range []bool{false, true} { + b.Run(fmt.Sprintf("skip=%d/with-tombstone=%t", skip, withTombstone), + func(b *testing.B) { + readers := readers + levelSlices := levelSlices + if withTombstone { + readers = readersWithTombstone + levelSlices = levelSlicesWithTombstone + } + m := buildMergingIter(readers, levelSlices) + iter := Iterator{ + cmp: DefaultComparer.Compare, + equal: DefaultComparer.Equal, + split: func(a []byte) int { return len(a) }, + merge: DefaultMerger.Merge, + iter: m, + } + pos := 0 + b.ResetTimer() + for i := 0; i < b.N; i++ { + // When withTombstone=true, and prior to the + // optimization to stop early due to a range + // tombstone, the iteration would continue into the + // next file, and not be able to use Next at the lower + // level in the next SeekPrefixGE call. So we would + // incur the cost of iterating over all the deleted + // keys for every seek. 
Note that it is not possible + // to do a noop optimization in Iterator for the + // prefix case, unlike SeekGE/SeekLT, since we don't + // know if the iterators inside mergingIter are all + // appropriately positioned -- some may not be due to + // bloom filters not matching. + valid := iter.SeekPrefixGE(keys[pos]) + if valid { + b.Fatalf("key should not be found") + } + pos += skip + if pos >= keyOffset { + pos = 0 + } + } + b.StopTimer() + iter.Close() + }) + } + } + for _, r := range [][][]*sstable.Reader{readers, readersWithTombstone} { + for i := range r { + for j := range r[i] { + r[i][j].Close() + } + } + } +} diff --git a/merging_iter.go b/merging_iter.go index b2716909c3..8389f871b7 100644 --- a/merging_iter.go +++ b/merging_iter.go @@ -212,6 +212,7 @@ type mergingIterLevel struct { // heap and range-del iterator positioning). type mergingIter struct { logger Logger + split Split dir int snapshot uint64 levels []mergingIterLevel @@ -232,23 +233,29 @@ var _ base.InternalIterator = (*mergingIter)(nil) // newMergingIter returns an iterator that merges its input. Walking the // resultant iterator will return all key/value pairs of all input iterators -// in strictly increasing key order, as defined by cmp. +// in strictly increasing key order, as defined by cmp. It is permissible to +// pass a nil split parameter if the caller is never going to call +// SeekPrefixGE. // // The input's key ranges may overlap, but there are assumed to be no duplicate // keys: if iters[i] contains a key k then iters[j] will not contain that key k. // // None of the iters may be nil. -func newMergingIter(logger Logger, cmp Compare, iters ...internalIterator) *mergingIter { +func newMergingIter( + logger Logger, cmp Compare, split Split, iters ...internalIterator, +) *mergingIter { m := &mergingIter{} levels := make([]mergingIterLevel, len(iters)) for i := range levels { levels[i].iter = iters[i] } - m.init(&IterOptions{logger: logger}, cmp, levels...) 
+ m.init(&IterOptions{logger: logger}, cmp, split, levels...) return m } -func (m *mergingIter) init(opts *IterOptions, cmp Compare, levels ...mergingIterLevel) { +func (m *mergingIter) init( + opts *IterOptions, cmp Compare, split Split, levels ...mergingIterLevel, +) { m.err = nil // clear cached iteration error m.logger = opts.getLogger() if opts != nil { @@ -258,6 +265,7 @@ func (m *mergingIter) init(opts *IterOptions, cmp Compare, levels ...mergingIter m.snapshot = InternalKeySeqNumMax m.levels = levels m.heap.cmp = cmp + m.split = split if cap(m.heap.items) < len(levels) { m.heap.items = make([]mergingIterItem, 0, len(levels)) } else { @@ -620,6 +628,19 @@ func (m *mergingIter) findNextEntry() (*InternalKey, []byte) { break } if m.isNextEntryDeleted(item) { + // For prefix iteration, stop if we are past the prefix. We could + // amortize the cost of this comparison, by doing it only after we + // have iterated in this for loop a few times. But unless we find + // a performance benefit to that, we do the simple thing and + // compare each time. Note that isNextEntryDeleted already did at + // least 4 key comparisons in order to return true, and + // additionally at least one heap comparison to step to the next + // entry. + if m.prefix != nil { + if n := m.split(item.key.UserKey); !bytes.Equal(m.prefix, item.key.UserKey[:n]) { + return nil, nil + } + } continue } if item.key.Visible(m.snapshot) && diff --git a/merging_iter_test.go b/merging_iter_test.go index 38d674a44a..37272cac7d 100644 --- a/merging_iter_test.go +++ b/merging_iter_test.go @@ -22,7 +22,8 @@ import ( func TestMergingIter(t *testing.T) { newFunc := func(iters ...internalIterator) internalIterator { - return newMergingIter(nil /* logger */, DefaultComparer.Compare, iters...) + return newMergingIter(nil /* logger */, DefaultComparer.Compare, + func(a []byte) int { return len(a) }, iters...) 
} testIterator(t, newFunc, func(r *rand.Rand) [][]string { // Shuffle testKeyValuePairs into one or more splits. Each individual @@ -57,7 +58,8 @@ func TestMergingIterSeek(t *testing.T) { iters = append(iters, f) } - iter := newMergingIter(nil /* logger */, DefaultComparer.Compare, iters...) + iter := newMergingIter(nil /* logger */, DefaultComparer.Compare, + func(a []byte) int { return len(a) }, iters...) defer iter.Close() return runInternalIterCmd(d, iter) @@ -72,19 +74,19 @@ func TestMergingIterNextPrev(t *testing.T) { // iterators differently. This data must match the definition in // testdata/internal_iter_next. iterCases := [][]string{ - []string{ + { "a.SET.2:2 a.SET.1:1 b.SET.2:2 b.SET.1:1 c.SET.2:2 c.SET.1:1", }, - []string{ + { "a.SET.2:2 b.SET.2:2 c.SET.2:2", "a.SET.1:1 b.SET.1:1 c.SET.1:1", }, - []string{ + { "a.SET.2:2 b.SET.2:2", "a.SET.1:1 b.SET.1:1", "c.SET.2:2 c.SET.1:1", }, - []string{ + { "a.SET.2:2", "a.SET.1:1", "b.SET.2:2", @@ -114,7 +116,8 @@ func TestMergingIterNextPrev(t *testing.T) { } } - iter := newMergingIter(nil /* logger */, DefaultComparer.Compare, iters...) + iter := newMergingIter(nil /* logger */, DefaultComparer.Compare, + func(a []byte) int { return len(a) }, iters...) defer iter.Close() return runInternalIterCmd(d, iter) @@ -251,7 +254,7 @@ func TestMergingIterCornerCases(t *testing.T) { li.initIsSyntheticIterBoundsKey(&levelIters[i].isSyntheticIterBoundsKey) } miter := &mergingIter{} - miter.init(nil /* opts */, cmp, levelIters...) + miter.init(nil /* opts */, cmp, func(a []byte) int { return len(a) }, levelIters...) defer miter.Close() return runInternalIterCmd(d, miter, iterCmdVerboseKey) default: @@ -346,7 +349,8 @@ func BenchmarkMergingIterSeekGE(b *testing.B) { iters[i], err = readers[i].NewIter(nil /* lower */, nil /* upper */) require.NoError(b, err) } - m := newMergingIter(nil /* logger */, DefaultComparer.Compare, iters...) 
+ m := newMergingIter(nil /* logger */, DefaultComparer.Compare, + func(a []byte) int { return len(a) }, iters...) rng := rand.New(rand.NewSource(uint64(time.Now().UnixNano()))) b.ResetTimer() @@ -377,7 +381,8 @@ func BenchmarkMergingIterNext(b *testing.B) { iters[i], err = readers[i].NewIter(nil /* lower */, nil /* upper */) require.NoError(b, err) } - m := newMergingIter(nil /* logger */, DefaultComparer.Compare, iters...) + m := newMergingIter(nil /* logger */, DefaultComparer.Compare, + func(a []byte) int { return len(a) }, iters...) b.ResetTimer() for i := 0; i < b.N; i++ { @@ -411,7 +416,8 @@ func BenchmarkMergingIterPrev(b *testing.B) { iters[i], err = readers[i].NewIter(nil /* lower */, nil /* upper */) require.NoError(b, err) } - m := newMergingIter(nil /* logger */, DefaultComparer.Compare, iters...) + m := newMergingIter(nil /* logger */, DefaultComparer.Compare, + func(a []byte) int { return len(a) }, iters...) b.ResetTimer() for i := 0; i < b.N; i++ { @@ -435,11 +441,19 @@ func BenchmarkMergingIterPrev(b *testing.B) { // only the first and last key of file 0 of the aforementioned level -- this // simulates sparseness of data, but not necessarily of file width, in higher // levels. File 1 in other levels is similar to File 1 in the aforementioned level -// since it is only for stepping into. +// since it is only for stepping into. If writeRangeTombstoneToLowestLevel is +// true, a range tombstone is written to the first lowest level file that +// deletes all the keys in it, and no other levels should be written. 
func buildLevelsForMergingIterSeqSeek( - b *testing.B, blockSize, restartInterval, levelCount int, + b *testing.B, + blockSize, restartInterval, levelCount int, + keyOffset int, + writeRangeTombstoneToLowestLevel bool, ) ([][]*sstable.Reader, []manifest.LevelSlice, [][]byte) { mem := vfs.NewMem() + if writeRangeTombstoneToLowestLevel && levelCount != 1 { + panic("expect to write only 1 level") + } files := make([][]vfs.File, levelCount) for i := range files { for j := 0; j < 2; j++ { @@ -463,7 +477,7 @@ func buildLevelsForMergingIterSeqSeek( } var keys [][]byte - var i int + i := keyOffset const targetSize = 2 << 20 w := writers[0][0] for ; w.EstimatedSize() < targetSize; i++ { @@ -472,6 +486,10 @@ func buildLevelsForMergingIterSeqSeek( ikey := base.MakeInternalKey(key, 0, InternalKeyKindSet) w.Add(ikey, nil) } + if writeRangeTombstoneToLowestLevel { + tombstoneKey := base.MakeInternalKey(keys[0], 1, InternalKeyKindRangeDelete) + w.Add(tombstoneKey, []byte(fmt.Sprintf("%08d", i))) + } for j := 1; j < len(files); j++ { for _, k := range []int{0, len(keys) - 1} { ikey := base.MakeInternalKey(keys[k], uint64(j), InternalKeyKindSet) @@ -540,7 +558,15 @@ func buildMergingIter(readers [][]*sstable.Reader, levelSlices []manifest.LevelS ) (internalIterator, internalIterator, error) { iter, err := readers[levelIndex][file.FileNum].NewIter( opts.LowerBound, opts.UpperBound) - return iter, nil, err + if err != nil { + return nil, nil, err + } + rdIter, err := readers[levelIndex][file.FileNum].NewRawRangeDelIter() + if err != nil { + iter.Close() + return nil, nil, err + } + return iter, rdIter, err } l := newLevelIter(IterOptions{}, DefaultComparer.Compare, newIters, levelSlices[i].Iter(), manifest.Level(level), nil) @@ -552,7 +578,8 @@ func buildMergingIter(readers [][]*sstable.Reader, levelSlices []manifest.LevelS mils[level].iter = l } m := &mergingIter{} - m.init(nil /* logger */, DefaultComparer.Compare, mils...) 
+ m.init(nil /* logger */, DefaultComparer.Compare, + func(a []byte) int { return len(a) }, mils...) return m } @@ -570,8 +597,8 @@ func BenchmarkMergingIterSeqSeekGEWithBounds(b *testing.B) { for _, levelCount := range []int{5} { b.Run(fmt.Sprintf("levelCount=%d", levelCount), func(b *testing.B) { - readers, levelSlices, keys := - buildLevelsForMergingIterSeqSeek(b, blockSize, restartInterval, levelCount) + readers, levelSlices, keys := buildLevelsForMergingIterSeqSeek( + b, blockSize, restartInterval, levelCount, 0 /* keyOffset */, false) m := buildMergingIter(readers, levelSlices) keyCount := len(keys) b.ResetTimer() @@ -598,8 +625,8 @@ func BenchmarkMergingIterSeqSeekPrefixGE(b *testing.B) { const blockSize = 32 << 10 const restartInterval = 16 const levelCount = 5 - readers, levelSlices, keys := - buildLevelsForMergingIterSeqSeek(b, blockSize, restartInterval, levelCount) + readers, levelSlices, keys := buildLevelsForMergingIterSeqSeek( + b, blockSize, restartInterval, levelCount, 0 /* keyOffset */, false) for _, skip := range []int{1, 2, 4, 8, 16} { for _, useNext := range []bool{false, true} { diff --git a/testdata/merging_iter b/testdata/merging_iter index 69464998ea..3486b165ef 100644 --- a/testdata/merging_iter +++ b/testdata/merging_iter @@ -502,3 +502,53 @@ next ---- iwoeionch#792,1:792 . + +# Exercise the early stopping behavior for prefix iteration when encountering +# range deletion tombstones. Keys a, d are not deleted, while the rest are. +define +L +a.SET.10 d.SET.10 +a.SET.10:a10 b.SET.10:b10 c.SET.10:c10 d.SET.10:d10 b.RANGEDEL.12:d +---- +1: + 000025:[a#10,SET-d#10,SET] + +iter +first +next +next +---- +a#10,1:a10 +d#10,1:d10 +. + +# The seek to c finds d since iteration cannot stop at c as it matches the +# prefix, and when it steps to d, it finds d is not deleted. Note that +# mergingIter is an InternalIterator and does not need to guarantee prefix +# match -- that is the job of the higher-level Iterator. 
So "seek-prefix-ge c" is +# allowed to return d. +iter +seek-prefix-ge a false +seek-prefix-ge aa true +seek-prefix-ge b true +seek-prefix-ge c true +seek-prefix-ge d true +---- +a#10,1:a10 +. +. +d#10,1:d10 +d#10,1:d10 + +iter +seek-prefix-ge a false +next +seek-prefix-ge b false +seek-prefix-ge d true +next +---- +a#10,1:a10 +. +. +d#10,1:d10 +.