db: mergingIter.SeekPrefixGE stops early if prefix cannot match
This is motivated by the CockroachDB issues discussed in
cockroachdb#1070, where range tombstones in L6 cause the
iterator to iterate through all the deleted data. The
situation is made worse by the fact that each successive
SeekPrefixGE in a batch request (which is in sorted order)
typically has to start all over again, since the file at
which the previous seek ended its work is probably later
than the file which contains the relevant key.
Note that CockroachDB does not set bounds when using
SeekPrefixGE because the assumption is that the underlying
Pebble implementation can infer bounds from the prefix, and
we don't want to change this behavior in CockroachDB. Since
CockroachDB primarily uses range tombstones when deleting
CockroachDB ranges, the slow SeekPrefixGE calls were
probably on a preceding CockroachDB range; hence stopping
early, as done in this PR, should fix the issue. If range
tombstones were used within the CockroachDB range that is
being read using SeekPrefixGE, the seek could land on a
deleted key and would need to iterate through all its MVCC
versions.
Even in that case, the work is bounded by the number of
deleted versions of a single MVCC key.

The code stops early in prefix iteration mode, both for
SeekPrefixGE and Next.
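
As an illustration, here is a minimal, self-contained sketch of how a
caller might drive SeekPrefixGE. This is not code from this commit; the
in-memory store, keys, and Comparer configuration are assumptions made
for the example. SeekPrefixGE requires a Comparer with a Split
function; like the tests in this change, the sketch treats the entire
key as its own prefix:

package main

import (
	"fmt"
	"log"

	"github.com/cockroachdb/pebble"
	"github.com/cockroachdb/pebble/vfs"
)

func main() {
	// SeekPrefixGE needs Comparer.Split to extract a key's prefix.
	comparer := *pebble.DefaultComparer
	comparer.Split = func(a []byte) int { return len(a) }

	db, err := pebble.Open("", &pebble.Options{
		FS:       vfs.NewMem(),
		Comparer: &comparer,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	if err := db.Set([]byte("b"), []byte("v"), nil); err != nil {
		log.Fatal(err)
	}

	iter := db.NewIter(nil)
	defer iter.Close()
	// Monotonically increasing seeks, as in a sorted batch request;
	// this ordering is what lets mergingIter use its Next optimization.
	for _, key := range [][]byte{[]byte("a"), []byte("b"), []byte("c")} {
		if iter.SeekPrefixGE(key) {
			fmt.Printf("%s => %s\n", iter.Key(), iter.Value())
		}
	}
}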

BenchmarkIteratorSeqSeekPrefixGENotFound demonstrates the
pre-existing problem when run with with-tombstone=true.

name                                                            old time/op    new time/op    delta
IteratorSeqSeekPrefixGENotFound/skip=1/with-tombstone=false-16     446ns ± 0%     433ns ± 0%    -2.91%
IteratorSeqSeekPrefixGENotFound/skip=1/with-tombstone=true-16     10.3ms ± 0%     0.0ms ± 0%   -99.99%
IteratorSeqSeekPrefixGENotFound/skip=2/with-tombstone=false-16     416ns ± 0%     429ns ± 0%    +3.12%
IteratorSeqSeekPrefixGENotFound/skip=2/with-tombstone=true-16     10.6ms ± 0%     0.0ms ± 0%   -99.99%
IteratorSeqSeekPrefixGENotFound/skip=4/with-tombstone=false-16     414ns ± 0%     437ns ± 0%    +5.56%
IteratorSeqSeekPrefixGENotFound/skip=4/with-tombstone=true-16     10.5ms ± 0%     0.0ms ± 0%   -99.99%
MergingIterSeqSeekPrefixGE/skip=1/use-next=false-16               1.65µs ± 0%    1.75µs ± 0%    +5.75%
MergingIterSeqSeekPrefixGE/skip=1/use-next=true-16                 463ns ± 0%     459ns ± 0%    -0.86%
MergingIterSeqSeekPrefixGE/skip=2/use-next=false-16               1.61µs ± 0%    1.67µs ± 0%    +3.73%
MergingIterSeqSeekPrefixGE/skip=2/use-next=true-16                 476ns ± 0%     475ns ± 0%    -0.21%
MergingIterSeqSeekPrefixGE/skip=4/use-next=false-16               1.62µs ± 0%    1.77µs ± 0%    +9.26%
MergingIterSeqSeekPrefixGE/skip=4/use-next=true-16                 513ns ± 0%     525ns ± 0%    +2.34%
MergingIterSeqSeekPrefixGE/skip=8/use-next=false-16               1.71µs ± 0%    1.84µs ± 0%    +7.65%
MergingIterSeqSeekPrefixGE/skip=8/use-next=true-16                1.10µs ± 0%    1.16µs ± 0%    +5.27%
MergingIterSeqSeekPrefixGE/skip=16/use-next=false-16              1.80µs ± 0%    1.86µs ± 0%    +2.99%
MergingIterSeqSeekPrefixGE/skip=16/use-next=true-16               1.34µs ± 0%    1.20µs ± 0%   -10.23%

name                                                            old alloc/op   new alloc/op   delta
IteratorSeqSeekPrefixGENotFound/skip=1/with-tombstone=false-16     0.00B          0.00B          0.00%
IteratorSeqSeekPrefixGENotFound/skip=1/with-tombstone=true-16       300B ± 0%        0B       -100.00%
IteratorSeqSeekPrefixGENotFound/skip=2/with-tombstone=false-16     0.00B          0.00B          0.00%
IteratorSeqSeekPrefixGENotFound/skip=2/with-tombstone=true-16       300B ± 0%        0B       -100.00%
IteratorSeqSeekPrefixGENotFound/skip=4/with-tombstone=false-16     0.00B          0.00B          0.00%
IteratorSeqSeekPrefixGENotFound/skip=4/with-tombstone=true-16       292B ± 0%        0B       -100.00%
MergingIterSeqSeekPrefixGE/skip=1/use-next=false-16                0.00B          0.00B          0.00%
MergingIterSeqSeekPrefixGE/skip=1/use-next=true-16                 0.00B          0.00B          0.00%
MergingIterSeqSeekPrefixGE/skip=2/use-next=false-16                0.00B          0.00B          0.00%
MergingIterSeqSeekPrefixGE/skip=2/use-next=true-16                 0.00B          0.00B          0.00%
MergingIterSeqSeekPrefixGE/skip=4/use-next=false-16                0.00B          0.00B          0.00%
MergingIterSeqSeekPrefixGE/skip=4/use-next=true-16                 0.00B          0.00B          0.00%
MergingIterSeqSeekPrefixGE/skip=8/use-next=false-16                0.00B          0.00B          0.00%
MergingIterSeqSeekPrefixGE/skip=8/use-next=true-16                 0.00B          0.00B          0.00%
MergingIterSeqSeekPrefixGE/skip=16/use-next=false-16               0.00B          0.00B          0.00%
MergingIterSeqSeekPrefixGE/skip=16/use-next=true-16                0.00B          0.00B          0.00%

Informs cockroachdb#1070
sumeerbhola committed Mar 8, 2021
1 parent 3c37882 commit 3045305
Showing 6 changed files with 211 additions and 29 deletions.
6 changes: 3 additions & 3 deletions compaction.go
@@ -930,7 +930,7 @@ func (c *compaction) newInputIter(newIters tableNewIters) (_ internalIterator, r
f := c.flushing[0]
iter := f.newFlushIter(nil, &c.bytesIterated)
if rangeDelIter := f.newRangeDelIter(nil); rangeDelIter != nil {
-return newMergingIter(c.logger, c.cmp, iter, rangeDelIter), nil
+return newMergingIter(c.logger, c.cmp, nil, iter, rangeDelIter), nil
}
return iter, nil
}
@@ -943,7 +943,7 @@ func (c *compaction) newInputIter(newIters tableNewIters) (_ internalIterator, r
iters = append(iters, rangeDelIter)
}
}
-return newMergingIter(c.logger, c.cmp, iters...), nil
+return newMergingIter(c.logger, c.cmp, nil, iters...), nil
}

// Check that the LSM ordering invariants are ok in order to prevent
@@ -1096,7 +1096,7 @@ func (c *compaction) newInputIter(newIters tableNewIters) (_ internalIterator, r
if err != nil {
return nil, err
}
-return newMergingIter(c.logger, c.cmp, iters...), nil
+return newMergingIter(c.logger, c.cmp, nil, iters...), nil
}

func (c *compaction) String() string {
2 changes: 1 addition & 1 deletion db.go
@@ -817,7 +817,7 @@ func finishInitializingIter(buf *iterAlloc) *Iterator {
addLevelIterForFiles(current.Levels[level].Iter(), manifest.Level(level))
}

-buf.merging.init(&dbi.opts, dbi.cmp, finalMLevels...)
+buf.merging.init(&dbi.opts, dbi.cmp, dbi.split, finalMLevels...)
buf.merging.snapshot = seqNum
buf.merging.elideRangeTombstones = true
return dbi
86 changes: 85 additions & 1 deletion iterator_test.go
@@ -16,6 +16,7 @@ import (
"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/datadriven"
"github.com/cockroachdb/pebble/sstable"
"github.com/cockroachdb/pebble/vfs"
"github.com/stretchr/testify/require"
"golang.org/x/exp/rand"
@@ -414,7 +415,7 @@ func TestIterator(t *testing.T) {
equal := DefaultComparer.Equal
split := func(a []byte) int { return len(a) }
// NB: Use a mergingIter to filter entries newer than seqNum.
-iter := newMergingIter(nil /* logger */, cmp, &fakeIter{
+iter := newMergingIter(nil /* logger */, cmp, split, &fakeIter{
lower: opts.GetLowerBound(),
upper: opts.GetUpperBound(),
keys: keys,
@@ -988,3 +989,86 @@ func BenchmarkIteratorPrev(b *testing.B) {
iter.Prev()
}
}

+// BenchmarkIteratorSeqSeekPrefixGENotFound exercises the case of SeekPrefixGE
+// specifying monotonic keys all of which precede actual keys present in L6 of
+// the DB. Moreover, with-tombstone=true exercises the sub-case where those
+// actual keys are deleted using a range tombstone that has not physically
+// deleted those keys due to the presence of a snapshot that needs to see
+// those keys. This sub-case needs to be efficient in (a) avoiding iteration
+// over all those deleted keys, including repeated iteration, (b) using the
+// next optimization, since the seeks are monotonic.
+func BenchmarkIteratorSeqSeekPrefixGENotFound(b *testing.B) {
+const blockSize = 32 << 10
+const restartInterval = 16
+const levelCount = 5
+const keyOffset = 100000
+readers, levelSlices, _ := buildLevelsForMergingIterSeqSeek(
+b, blockSize, restartInterval, levelCount, keyOffset, false)
+readersWithTombstone, levelSlicesWithTombstone, _ := buildLevelsForMergingIterSeqSeek(
+b, blockSize, restartInterval, 1, keyOffset, true)
+// We will not be seeking to the keys that were written but instead to
+// keys before the written keys. This is to validate that the optimization
+// to use Next still functions when mergingIter checks for the prefix
+// match, and that mergingIter can avoid iterating over all the keys
+// deleted by a range tombstone when there is no possibility of matching
+// the prefix.
+var keys [][]byte
+for i := 0; i < keyOffset; i++ {
+keys = append(keys, []byte(fmt.Sprintf("%08d", i)))
+}
+for _, skip := range []int{1, 2, 4} {
+for _, withTombstone := range []bool{false, true} {
+b.Run(fmt.Sprintf("skip=%d/with-tombstone=%t", skip, withTombstone),
+func(b *testing.B) {
+readers := readers
+levelSlices := levelSlices
+if withTombstone {
+readers = readersWithTombstone
+levelSlices = levelSlicesWithTombstone
+}
+m := buildMergingIter(readers, levelSlices)
+iter := Iterator{
+cmp: DefaultComparer.Compare,
+equal: DefaultComparer.Equal,
+split: func(a []byte) int { return len(a) },
+merge: DefaultMerger.Merge,
+iter: m,
+}
+pos := 0
+b.ResetTimer()
+for i := 0; i < b.N; i++ {
+// When withTombstone=true, and prior to the
+// optimization to stop early due to a range
+// tombstone, the iteration would continue into the
+// next file, and not be able to use Next at the lower
+// level in the next SeekPrefixGE call. So we would
+// incur the cost of iterating over all the deleted
+// keys for every seek. Note that it is not possible
+// to do a noop optimization in Iterator for the
+// prefix case, unlike SeekGE/SeekLT, since we don't
+// know if the iterators inside mergingIter are all
+// appropriately positioned -- some may not be due to
+// bloom filters not matching.
+valid := iter.SeekPrefixGE(keys[pos])
+if valid {
+b.Fatalf("key should not be found")
+}
+pos += skip
+if pos >= keyOffset {
+pos = 0
+}
+}
+b.StopTimer()
+iter.Close()
+})
+}
+}
+for _, r := range [][][]*sstable.Reader{readers, readersWithTombstone} {
+for i := range r {
+for j := range r[i] {
+r[i][j].Close()
+}
+}
+}
+}
29 changes: 25 additions & 4 deletions merging_iter.go
@@ -212,6 +212,7 @@
// heap and range-del iterator positioning).
type mergingIter struct {
logger Logger
+split Split
dir int
snapshot uint64
levels []mergingIterLevel
@@ -232,23 +233,29 @@ var _ base.InternalIterator = (*mergingIter)(nil)

// newMergingIter returns an iterator that merges its input. Walking the
// resultant iterator will return all key/value pairs of all input iterators
-// in strictly increasing key order, as defined by cmp.
+// in strictly increasing key order, as defined by cmp. It is permissible to
+// pass a nil split parameter if the caller is never going to call
+// SeekPrefixGE.
//
// The input's key ranges may overlap, but there are assumed to be no duplicate
// keys: if iters[i] contains a key k then iters[j] will not contain that key k.
//
// None of the iters may be nil.
-func newMergingIter(logger Logger, cmp Compare, iters ...internalIterator) *mergingIter {
+func newMergingIter(
+logger Logger, cmp Compare, split Split, iters ...internalIterator,
+) *mergingIter {
m := &mergingIter{}
levels := make([]mergingIterLevel, len(iters))
for i := range levels {
levels[i].iter = iters[i]
}
-m.init(&IterOptions{logger: logger}, cmp, levels...)
+m.init(&IterOptions{logger: logger}, cmp, split, levels...)
return m
}

-func (m *mergingIter) init(opts *IterOptions, cmp Compare, levels ...mergingIterLevel) {
+func (m *mergingIter) init(
+opts *IterOptions, cmp Compare, split Split, levels ...mergingIterLevel,
+) {
m.err = nil // clear cached iteration error
m.logger = opts.getLogger()
if opts != nil {
@@ -258,6 +265,7 @@ func (m *mergingIter) init(opts *IterOptions, cmp Compare, levels ...mergingIter
m.snapshot = InternalKeySeqNumMax
m.levels = levels
m.heap.cmp = cmp
+m.split = split
if cap(m.heap.items) < len(levels) {
m.heap.items = make([]mergingIterItem, 0, len(levels))
} else {
@@ -620,6 +628,19 @@ func (m *mergingIter) findNextEntry() (*InternalKey, []byte) {
break
}
if m.isNextEntryDeleted(item) {
+// For prefix iteration, stop if we are past the prefix. We could
+// amortize the cost of this comparison, by doing it only after we
+// have iterated in this for loop a few times. But unless we find
+// a performance benefit to that, we do the simple thing and
+// compare each time. Note that isNextEntryDeleted already did at
+// least 4 key comparisons in order to return true, and
+// additionally at least one heap comparison to step to the next
+// entry.
+if m.prefix != nil {
+if n := m.split(item.key.UserKey); !bytes.Equal(m.prefix, item.key.UserKey[:n]) {
+return nil, nil
+}
+}
continue
}
if item.key.Visible(m.snapshot) &&
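To make the new early-stop check concrete, here is a small standalone
sketch of the same comparison findNextEntry now performs. The
'@'-separated version suffix is a hypothetical stand-in for a real MVCC
key encoding, used only for illustration:

package main

import (
	"bytes"
	"fmt"
)

// split is a toy Split function: a key's prefix is everything before
// the '@' version suffix, or the whole key if there is none.
func split(key []byte) int {
	if i := bytes.IndexByte(key, '@'); i >= 0 {
		return i
	}
	return len(key)
}

func main() {
	prefix := []byte("apple")
	keys := [][]byte{
		[]byte("apple@3"),  // same prefix: keep iterating
		[]byte("apple@1"),  // same prefix: keep iterating
		[]byte("banana@7"), // past the prefix: stop early
	}
	for _, key := range keys {
		if n := split(key); !bytes.Equal(prefix, key[:n]) {
			fmt.Printf("%s: past the prefix, stop\n", key)
			break
		}
		fmt.Printf("%s: within the prefix, continue\n", key)
	}
}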
67 changes: 47 additions & 20 deletions merging_iter_test.go
@@ -22,7 +22,8 @@ import (

func TestMergingIter(t *testing.T) {
newFunc := func(iters ...internalIterator) internalIterator {
-return newMergingIter(nil /* logger */, DefaultComparer.Compare, iters...)
+return newMergingIter(nil /* logger */, DefaultComparer.Compare,
+func(a []byte) int { return len(a) }, iters...)
}
testIterator(t, newFunc, func(r *rand.Rand) [][]string {
// Shuffle testKeyValuePairs into one or more splits. Each individual
@@ -57,7 +58,8 @@ func TestMergingIterSeek(t *testing.T) {
iters = append(iters, f)
}

-iter := newMergingIter(nil /* logger */, DefaultComparer.Compare, iters...)
+iter := newMergingIter(nil /* logger */, DefaultComparer.Compare,
+func(a []byte) int { return len(a) }, iters...)
defer iter.Close()
return runInternalIterCmd(d, iter)

@@ -72,19 +74,19 @@ func TestMergingIterNextPrev(t *testing.T) {
// iterators differently. This data must match the definition in
// testdata/internal_iter_next.
iterCases := [][]string{
-[]string{
+{
"a.SET.2:2 a.SET.1:1 b.SET.2:2 b.SET.1:1 c.SET.2:2 c.SET.1:1",
},
-[]string{
+{
"a.SET.2:2 b.SET.2:2 c.SET.2:2",
"a.SET.1:1 b.SET.1:1 c.SET.1:1",
},
-[]string{
+{
"a.SET.2:2 b.SET.2:2",
"a.SET.1:1 b.SET.1:1",
"c.SET.2:2 c.SET.1:1",
},
-[]string{
+{
"a.SET.2:2",
"a.SET.1:1",
"b.SET.2:2",
@@ -114,7 +116,8 @@ func TestMergingIterNextPrev(t *testing.T) {
}
}

-iter := newMergingIter(nil /* logger */, DefaultComparer.Compare, iters...)
+iter := newMergingIter(nil /* logger */, DefaultComparer.Compare,
+func(a []byte) int { return len(a) }, iters...)
defer iter.Close()
return runInternalIterCmd(d, iter)

@@ -251,7 +254,7 @@ func TestMergingIterCornerCases(t *testing.T) {
li.initIsSyntheticIterBoundsKey(&levelIters[i].isSyntheticIterBoundsKey)
}
miter := &mergingIter{}
-miter.init(nil /* opts */, cmp, levelIters...)
+miter.init(nil /* opts */, cmp, func(a []byte) int { return len(a) }, levelIters...)
defer miter.Close()
return runInternalIterCmd(d, miter, iterCmdVerboseKey)
default:
@@ -346,7 +349,8 @@ func BenchmarkMergingIterSeekGE(b *testing.B) {
iters[i], err = readers[i].NewIter(nil /* lower */, nil /* upper */)
require.NoError(b, err)
}
-m := newMergingIter(nil /* logger */, DefaultComparer.Compare, iters...)
+m := newMergingIter(nil /* logger */, DefaultComparer.Compare,
+func(a []byte) int { return len(a) }, iters...)
rng := rand.New(rand.NewSource(uint64(time.Now().UnixNano())))

b.ResetTimer()
@@ -377,7 +381,8 @@ func BenchmarkMergingIterNext(b *testing.B) {
iters[i], err = readers[i].NewIter(nil /* lower */, nil /* upper */)
require.NoError(b, err)
}
-m := newMergingIter(nil /* logger */, DefaultComparer.Compare, iters...)
+m := newMergingIter(nil /* logger */, DefaultComparer.Compare,
+func(a []byte) int { return len(a) }, iters...)

b.ResetTimer()
for i := 0; i < b.N; i++ {
@@ -411,7 +416,8 @@ func BenchmarkMergingIterPrev(b *testing.B) {
iters[i], err = readers[i].NewIter(nil /* lower */, nil /* upper */)
require.NoError(b, err)
}
-m := newMergingIter(nil /* logger */, DefaultComparer.Compare, iters...)
+m := newMergingIter(nil /* logger */, DefaultComparer.Compare,
+func(a []byte) int { return len(a) }, iters...)

b.ResetTimer()
for i := 0; i < b.N; i++ {
@@ -435,11 +441,19 @@ func BenchmarkMergingIterPrev(b *testing.B) {
// only the first and last key of file 0 of the aforementioned level -- this
// simulates sparseness of data, but not necessarily of file width, in higher
// levels. File 1 in other levels is similar to File 1 in the aforementioned level
-// since it is only for stepping into.
+// since it is only for stepping into. If writeRangeTombstoneToLowestLevel is
+// true, a range tombstone is written to the first lowest level file that
+// deletes all the keys in it, and no other levels should be written.
func buildLevelsForMergingIterSeqSeek(
-b *testing.B, blockSize, restartInterval, levelCount int,
+b *testing.B,
+blockSize, restartInterval, levelCount int,
+keyOffset int,
+writeRangeTombstoneToLowestLevel bool,
) ([][]*sstable.Reader, []manifest.LevelSlice, [][]byte) {
mem := vfs.NewMem()
+if writeRangeTombstoneToLowestLevel && levelCount != 1 {
+panic("expect to write only 1 level")
+}
files := make([][]vfs.File, levelCount)
for i := range files {
for j := 0; j < 2; j++ {
@@ -463,7 +477,7 @@ func buildLevelsForMergingIterSeqSeek(
}

var keys [][]byte
-var i int
+i := keyOffset
const targetSize = 2 << 20
w := writers[0][0]
for ; w.EstimatedSize() < targetSize; i++ {
Expand All @@ -472,6 +486,10 @@ func buildLevelsForMergingIterSeqSeek(
ikey := base.MakeInternalKey(key, 0, InternalKeyKindSet)
w.Add(ikey, nil)
}
+if writeRangeTombstoneToLowestLevel {
+tombstoneKey := base.MakeInternalKey(keys[0], 1, InternalKeyKindRangeDelete)
+w.Add(tombstoneKey, []byte(fmt.Sprintf("%08d", i)))
+}
for j := 1; j < len(files); j++ {
for _, k := range []int{0, len(keys) - 1} {
ikey := base.MakeInternalKey(keys[k], uint64(j), InternalKeyKindSet)
@@ -540,7 +558,15 @@ func buildMergingIter(readers [][]*sstable.Reader, levelSlices []manifest.LevelS
) (internalIterator, internalIterator, error) {
iter, err := readers[levelIndex][file.FileNum].NewIter(
opts.LowerBound, opts.UpperBound)
-return iter, nil, err
+if err != nil {
+return nil, nil, err
+}
+rdIter, err := readers[levelIndex][file.FileNum].NewRawRangeDelIter()
+if err != nil {
+iter.Close()
+return nil, nil, err
+}
+return iter, rdIter, err
}
l := newLevelIter(IterOptions{}, DefaultComparer.Compare,
newIters, levelSlices[i].Iter(), manifest.Level(level), nil)
Expand All @@ -552,7 +578,8 @@ func buildMergingIter(readers [][]*sstable.Reader, levelSlices []manifest.LevelS
mils[level].iter = l
}
m := &mergingIter{}
-m.init(nil /* logger */, DefaultComparer.Compare, mils...)
+m.init(nil /* logger */, DefaultComparer.Compare,
+func(a []byte) int { return len(a) }, mils...)
return m
}

Expand All @@ -570,8 +597,8 @@ func BenchmarkMergingIterSeqSeekGEWithBounds(b *testing.B) {
for _, levelCount := range []int{5} {
b.Run(fmt.Sprintf("levelCount=%d", levelCount),
func(b *testing.B) {
-readers, levelSlices, keys :=
-buildLevelsForMergingIterSeqSeek(b, blockSize, restartInterval, levelCount)
+readers, levelSlices, keys := buildLevelsForMergingIterSeqSeek(
+b, blockSize, restartInterval, levelCount, 0 /* keyOffset */, false)
m := buildMergingIter(readers, levelSlices)
keyCount := len(keys)
b.ResetTimer()
@@ -598,8 +625,8 @@ func BenchmarkMergingIterSeqSeekPrefixGE(b *testing.B) {
const blockSize = 32 << 10
const restartInterval = 16
const levelCount = 5
-readers, levelSlices, keys :=
-buildLevelsForMergingIterSeqSeek(b, blockSize, restartInterval, levelCount)
+readers, levelSlices, keys := buildLevelsForMergingIterSeqSeek(
+b, blockSize, restartInterval, levelCount, 0 /* keyOffset */, false)

for _, skip := range []int{1, 2, 4, 8, 16} {
for _, useNext := range []bool{false, true} {