db: mergingIter.SeekPrefixGE stops early if prefix cannot match #1085

Merged: 1 commit, Mar 8, 2021
6 changes: 3 additions & 3 deletions compaction.go
@@ -930,7 +930,7 @@ func (c *compaction) newInputIter(newIters tableNewIters) (_ internalIterator, r
f := c.flushing[0]
iter := f.newFlushIter(nil, &c.bytesIterated)
if rangeDelIter := f.newRangeDelIter(nil); rangeDelIter != nil {
return newMergingIter(c.logger, c.cmp, iter, rangeDelIter), nil
return newMergingIter(c.logger, c.cmp, nil, iter, rangeDelIter), nil
}
return iter, nil
}
@@ -943,7 +943,7 @@ func (c *compaction) newInputIter(newIters tableNewIters) (_ internalIterator, r
iters = append(iters, rangeDelIter)
}
}
return newMergingIter(c.logger, c.cmp, iters...), nil
return newMergingIter(c.logger, c.cmp, nil, iters...), nil
}

// Check that the LSM ordering invariants are ok in order to prevent
@@ -1096,7 +1096,7 @@ func (c *compaction) newInputIter(newIters tableNewIters) (_ internalIterator, r
if err != nil {
return nil, err
}
return newMergingIter(c.logger, c.cmp, iters...), nil
return newMergingIter(c.logger, c.cmp, nil, iters...), nil
}

func (c *compaction) String() string {
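For context on the new nil argument: newMergingIter now takes a Split function, pebble's hook for reporting how many leading bytes of a user key form its prefix, and compaction iterators can pass nil because they never call SeekPrefixGE. A minimal sketch of the contract, where wholeKeySplit mirrors the split used in this PR's tests and suffixSplit (with its fixed suffixLen) is purely an illustrative assumption:

// wholeKeySplit treats the entire key as its prefix; the tests in this
// PR pass exactly this function.
func wholeKeySplit(key []byte) int { return len(key) }

// suffixSplit is a hypothetical Split for a keyspace whose keys carry a
// fixed-width version suffix. It returns the length of the prefix that
// SeekPrefixGE should match against.
func suffixSplit(key []byte) int {
	const suffixLen = 8 // assumed width, not from this PR
	if len(key) <= suffixLen {
		return len(key)
	}
	return len(key) - suffixLen
}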
2 changes: 1 addition & 1 deletion db.go
@@ -817,7 +817,7 @@ func finishInitializingIter(buf *iterAlloc) *Iterator {
addLevelIterForFiles(current.Levels[level].Iter(), manifest.Level(level))
}

buf.merging.init(&dbi.opts, dbi.cmp, finalMLevels...)
buf.merging.init(&dbi.opts, dbi.cmp, dbi.split, finalMLevels...)
buf.merging.snapshot = seqNum
buf.merging.elideRangeTombstones = true
return dbi
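One level up, dbi.split is just the Split of the Comparer the DB was opened with, which is how a user iterator's SeekPrefixGE reaches the check added in merging_iter.go below. A hedged sketch of that wiring under standard pebble usage (the whole-key Split value and the "demo" directory are assumptions for illustration):

// Opening a DB with a Comparer whose Split is set is what feeds
// dbi.split, and therefore mergingIter.split, in the change above.
comparer := *DefaultComparer
comparer.Split = func(key []byte) int { return len(key) } // whole key as prefix
d, err := Open("demo", &Options{FS: vfs.NewMem(), Comparer: &comparer})
if err != nil {
	panic(err)
}
defer d.Close()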
86 changes: 85 additions & 1 deletion iterator_test.go
@@ -16,6 +16,7 @@ import (
"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/datadriven"
"github.com/cockroachdb/pebble/sstable"
"github.com/cockroachdb/pebble/vfs"
"github.com/stretchr/testify/require"
"golang.org/x/exp/rand"
@@ -414,7 +415,7 @@ func TestIterator(t *testing.T) {
equal := DefaultComparer.Equal
split := func(a []byte) int { return len(a) }
// NB: Use a mergingIter to filter entries newer than seqNum.
iter := newMergingIter(nil /* logger */, cmp, &fakeIter{
iter := newMergingIter(nil /* logger */, cmp, split, &fakeIter{
lower: opts.GetLowerBound(),
upper: opts.GetUpperBound(),
keys: keys,
@@ -988,3 +989,86 @@ func BenchmarkIteratorPrev(b *testing.B) {
iter.Prev()
}
}

// BenchmarkIteratorSeqSeekPrefixGENotFound exercises the case of SeekPrefixGE
// specifying monotonic keys all of which precede actual keys present in L6 of
// the DB. Moreover, with-tombstone=true exercises the sub-case where those
// actual keys are deleted using a range tombstone that has not physically
// deleted those keys due to the presence of a snapshot that needs to see
// those keys. This sub-case needs to be efficient in (a) avoiding iteration
// over all those deleted keys, including repeated iteration, and (b) using
// the Next optimization, since the seeks are monotonic.
func BenchmarkIteratorSeqSeekPrefixGENotFound(b *testing.B) {
const blockSize = 32 << 10
const restartInterval = 16
const levelCount = 5
const keyOffset = 100000
readers, levelSlices, _ := buildLevelsForMergingIterSeqSeek(
b, blockSize, restartInterval, levelCount, keyOffset, false)
readersWithTombstone, levelSlicesWithTombstone, _ := buildLevelsForMergingIterSeqSeek(
b, blockSize, restartInterval, 1, keyOffset, true)
// We will not be seeking to the keys that were written but instead to
// keys before the written keys. This is to validate that the optimization
// to use Next still functions when mergingIter checks for the prefix
// match, and that mergingIter can avoid iterating over all the keys
// deleted by a range tombstone when there is no possibility of matching
// the prefix.
var keys [][]byte
for i := 0; i < keyOffset; i++ {
keys = append(keys, []byte(fmt.Sprintf("%08d", i)))
}
for _, skip := range []int{1, 2, 4} {
for _, withTombstone := range []bool{false, true} {
b.Run(fmt.Sprintf("skip=%d/with-tombstone=%t", skip, withTombstone),
func(b *testing.B) {
readers := readers
levelSlices := levelSlices
if withTombstone {
readers = readersWithTombstone
levelSlices = levelSlicesWithTombstone
}
m := buildMergingIter(readers, levelSlices)
iter := Iterator{
cmp: DefaultComparer.Compare,
equal: DefaultComparer.Equal,
split: func(a []byte) int { return len(a) },
merge: DefaultMerger.Merge,
iter: m,
}
pos := 0
b.ResetTimer()
for i := 0; i < b.N; i++ {
// When withTombstone=true, and prior to the
// optimization to stop early due to a range
// tombstone, the iteration would continue into the
// next file, and not be able to use Next at the lower
// level in the next SeekPrefixGE call. So we would
// incur the cost of iterating over all the deleted
// keys for every seek. Note that it is not possible
// to do a noop optimization in Iterator for the
// prefix case, unlike SeekGE/SeekLT, since we don't
// know if the iterators inside mergingIter are all
// appropriately positioned -- some may not be due to
// bloom filters not matching.
valid := iter.SeekPrefixGE(keys[pos])
if valid {
b.Fatalf("key should not be found")
}
pos += skip
if pos >= keyOffset {
pos = 0
}
}
b.StopTimer()
iter.Close()
})
}
}
for _, r := range [][][]*sstable.Reader{readers, readersWithTombstone} {
for i := range r {
for j := range r[i] {
r[i][j].Close()
}
}
}
}
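As a usage sketch, the client-visible pattern this benchmark models is just a loop of monotonically increasing SeekPrefixGE calls on prefixes known to be absent (scanAbsentPrefixes is an illustrative helper, not part of the PR):

// scanAbsentPrefixes issues monotonic SeekPrefixGE calls for absent
// prefixes. With the early-stop check added to findNextEntry, each seek
// returns false as soon as the heap top is past the sought prefix,
// rather than stepping through every range-deleted key, and the
// monotonic order keeps the Next optimization usable at lower levels.
func scanAbsentPrefixes(iter *Iterator, keys [][]byte, skip int) int {
	found := 0
	for pos := 0; pos < len(keys); pos += skip {
		if iter.SeekPrefixGE(keys[pos]) {
			found++ // unexpected for these keys
		}
	}
	return found
}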
29 changes: 25 additions & 4 deletions merging_iter.go
@@ -212,6 +212,7 @@ type mergingIterLevel struct {
// heap and range-del iterator positioning).
type mergingIter struct {
logger Logger
split Split
dir int
snapshot uint64
levels []mergingIterLevel
@@ -232,23 +233,29 @@ var _ base.InternalIterator = (*mergingIter)(nil)

// newMergingIter returns an iterator that merges its input. Walking the
// resultant iterator will return all key/value pairs of all input iterators
// in strictly increasing key order, as defined by cmp.
// in strictly increasing key order, as defined by cmp. It is permissible to
// pass a nil split parameter if the caller is never going to call
// SeekPrefixGE.
//
// The input's key ranges may overlap, but there are assumed to be no duplicate
// keys: if iters[i] contains a key k then iters[j] will not contain that key k.
//
// None of the iters may be nil.
func newMergingIter(logger Logger, cmp Compare, iters ...internalIterator) *mergingIter {
func newMergingIter(
logger Logger, cmp Compare, split Split, iters ...internalIterator,
) *mergingIter {
m := &mergingIter{}
levels := make([]mergingIterLevel, len(iters))
for i := range levels {
levels[i].iter = iters[i]
}
m.init(&IterOptions{logger: logger}, cmp, levels...)
m.init(&IterOptions{logger: logger}, cmp, split, levels...)
return m
}

func (m *mergingIter) init(opts *IterOptions, cmp Compare, levels ...mergingIterLevel) {
func (m *mergingIter) init(
opts *IterOptions, cmp Compare, split Split, levels ...mergingIterLevel,
) {
m.err = nil // clear cached iteration error
m.logger = opts.getLogger()
if opts != nil {
@@ -258,6 +265,7 @@ func (m *mergingIter) init(opts *IterOptions, cmp Compare, levels ...mergingIter
m.snapshot = InternalKeySeqNumMax
m.levels = levels
m.heap.cmp = cmp
m.split = split
if cap(m.heap.items) < len(levels) {
m.heap.items = make([]mergingIterItem, 0, len(levels))
} else {
@@ -620,6 +628,19 @@ func (m *mergingIter) findNextEntry() (*InternalKey, []byte) {
break
}
if m.isNextEntryDeleted(item) {
// For prefix iteration, stop if we are past the prefix. We could
// amortize the cost of this comparison by doing it only after we
// have iterated in this for loop a few times. But unless we find
// a performance benefit to that, we do the simple thing and
// compare each time. Note that isNextEntryDeleted already did at
// least 4 key comparisons in order to return true, and
// additionally at least one heap comparison to step to the next
// entry.
if m.prefix != nil {
if n := m.split(item.key.UserKey); !bytes.Equal(m.prefix, item.key.UserKey[:n]) {
return nil, nil
}
}
continue
}
if item.key.Visible(m.snapshot) &&
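The heart of the file is the new check in findNextEntry above. Pulled out of context, it reduces to the following sketch (pastPrefix and its parameter names are illustrative; the real code inlines this against m.prefix and m.split):

import "bytes"

// pastPrefix reports whether userKey can no longer match the prefix
// passed to SeekPrefixGE. A nil prefix means the iterator is not in
// prefix iteration mode, so iteration never stops early. bytes.Equal is
// the same comparison the diff above uses.
func pastPrefix(prefix, userKey []byte, split Split) bool {
	if prefix == nil {
		return false
	}
	n := split(userKey) // length of userKey's prefix
	return !bytes.Equal(prefix, userKey[:n])
}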
67 changes: 47 additions & 20 deletions merging_iter_test.go
@@ -22,7 +22,8 @@ import (

func TestMergingIter(t *testing.T) {
newFunc := func(iters ...internalIterator) internalIterator {
return newMergingIter(nil /* logger */, DefaultComparer.Compare, iters...)
return newMergingIter(nil /* logger */, DefaultComparer.Compare,
func(a []byte) int { return len(a) }, iters...)
}
testIterator(t, newFunc, func(r *rand.Rand) [][]string {
// Shuffle testKeyValuePairs into one or more splits. Each individual
@@ -57,7 +58,8 @@ func TestMergingIterSeek(t *testing.T) {
iters = append(iters, f)
}

iter := newMergingIter(nil /* logger */, DefaultComparer.Compare, iters...)
iter := newMergingIter(nil /* logger */, DefaultComparer.Compare,
func(a []byte) int { return len(a) }, iters...)
defer iter.Close()
return runInternalIterCmd(d, iter)

@@ -72,19 +74,19 @@ func TestMergingIterNextPrev(t *testing.T) {
// iterators differently. This data must match the definition in
// testdata/internal_iter_next.
iterCases := [][]string{
[]string{
{
"a.SET.2:2 a.SET.1:1 b.SET.2:2 b.SET.1:1 c.SET.2:2 c.SET.1:1",
},
[]string{
{
"a.SET.2:2 b.SET.2:2 c.SET.2:2",
"a.SET.1:1 b.SET.1:1 c.SET.1:1",
},
[]string{
{
"a.SET.2:2 b.SET.2:2",
"a.SET.1:1 b.SET.1:1",
"c.SET.2:2 c.SET.1:1",
},
[]string{
{
"a.SET.2:2",
"a.SET.1:1",
"b.SET.2:2",
@@ -114,7 +116,8 @@
}
}

iter := newMergingIter(nil /* logger */, DefaultComparer.Compare, iters...)
iter := newMergingIter(nil /* logger */, DefaultComparer.Compare,
func(a []byte) int { return len(a) }, iters...)
defer iter.Close()
return runInternalIterCmd(d, iter)

@@ -251,7 +254,7 @@ func TestMergingIterCornerCases(t *testing.T) {
li.initIsSyntheticIterBoundsKey(&levelIters[i].isSyntheticIterBoundsKey)
}
miter := &mergingIter{}
miter.init(nil /* opts */, cmp, levelIters...)
miter.init(nil /* opts */, cmp, func(a []byte) int { return len(a) }, levelIters...)
defer miter.Close()
return runInternalIterCmd(d, miter, iterCmdVerboseKey)
default:
@@ -346,7 +349,8 @@ func BenchmarkMergingIterSeekGE(b *testing.B) {
iters[i], err = readers[i].NewIter(nil /* lower */, nil /* upper */)
require.NoError(b, err)
}
m := newMergingIter(nil /* logger */, DefaultComparer.Compare, iters...)
m := newMergingIter(nil /* logger */, DefaultComparer.Compare,
func(a []byte) int { return len(a) }, iters...)
rng := rand.New(rand.NewSource(uint64(time.Now().UnixNano())))

b.ResetTimer()
@@ -377,7 +381,8 @@ func BenchmarkMergingIterNext(b *testing.B) {
iters[i], err = readers[i].NewIter(nil /* lower */, nil /* upper */)
require.NoError(b, err)
}
m := newMergingIter(nil /* logger */, DefaultComparer.Compare, iters...)
m := newMergingIter(nil /* logger */, DefaultComparer.Compare,
func(a []byte) int { return len(a) }, iters...)

b.ResetTimer()
for i := 0; i < b.N; i++ {
@@ -411,7 +416,8 @@ func BenchmarkMergingIterPrev(b *testing.B) {
iters[i], err = readers[i].NewIter(nil /* lower */, nil /* upper */)
require.NoError(b, err)
}
m := newMergingIter(nil /* logger */, DefaultComparer.Compare, iters...)
m := newMergingIter(nil /* logger */, DefaultComparer.Compare,
func(a []byte) int { return len(a) }, iters...)

b.ResetTimer()
for i := 0; i < b.N; i++ {
@@ -435,11 +441,19 @@ func BenchmarkMergingIterPrev(b *testing.B) {
// only the first and last key of file 0 of the aforementioned level -- this
// simulates sparseness of data, but not necessarily of file width, in higher
// levels. File 1 in other levels is similar to File 1 in the aforementioned level
// since it is only for stepping into.
// since it is only for stepping into. If writeRangeTombstoneToLowestLevel is
// true, a range tombstone that deletes all the keys in the first file of the
// lowest level is written to that file, and no other levels may be written.
func buildLevelsForMergingIterSeqSeek(
b *testing.B, blockSize, restartInterval, levelCount int,
b *testing.B,
blockSize, restartInterval, levelCount int,
keyOffset int,
writeRangeTombstoneToLowestLevel bool,
) ([][]*sstable.Reader, []manifest.LevelSlice, [][]byte) {
mem := vfs.NewMem()
if writeRangeTombstoneToLowestLevel && levelCount != 1 {
panic("expect to write only 1 level")
}
files := make([][]vfs.File, levelCount)
for i := range files {
for j := 0; j < 2; j++ {
@@ -463,7 +477,7 @@ func buildLevelsForMergingIterSeqSeek(
}

var keys [][]byte
var i int
i := keyOffset
const targetSize = 2 << 20
w := writers[0][0]
for ; w.EstimatedSize() < targetSize; i++ {
Expand All @@ -472,6 +486,10 @@ func buildLevelsForMergingIterSeqSeek(
ikey := base.MakeInternalKey(key, 0, InternalKeyKindSet)
w.Add(ikey, nil)
}
if writeRangeTombstoneToLowestLevel {
tombstoneKey := base.MakeInternalKey(keys[0], 1, InternalKeyKindRangeDelete)
w.Add(tombstoneKey, []byte(fmt.Sprintf("%08d", i)))
}
for j := 1; j < len(files); j++ {
for _, k := range []int{0, len(keys) - 1} {
ikey := base.MakeInternalKey(keys[k], uint64(j), InternalKeyKindSet)
@@ -540,7 +558,15 @@ func buildMergingIter(readers [][]*sstable.Reader, levelSlices []manifest.LevelS
) (internalIterator, internalIterator, error) {
iter, err := readers[levelIndex][file.FileNum].NewIter(
opts.LowerBound, opts.UpperBound)
return iter, nil, err
if err != nil {
return nil, nil, err
}
rdIter, err := readers[levelIndex][file.FileNum].NewRawRangeDelIter()
if err != nil {
iter.Close()
return nil, nil, err
}
return iter, rdIter, err
}
l := newLevelIter(IterOptions{}, DefaultComparer.Compare,
newIters, levelSlices[i].Iter(), manifest.Level(level), nil)
@@ -552,7 +578,8 @@ func buildMergingIter(readers [][]*sstable.Reader, levelSlices []manifest.LevelS
mils[level].iter = l
}
m := &mergingIter{}
m.init(nil /* opts */, DefaultComparer.Compare,
m.init(nil /* logger */, DefaultComparer.Compare,
func(a []byte) int { return len(a) }, mils...)
return m
}

@@ -570,8 +597,8 @@ func BenchmarkMergingIterSeqSeekGEWithBounds(b *testing.B) {
for _, levelCount := range []int{5} {
b.Run(fmt.Sprintf("levelCount=%d", levelCount),
func(b *testing.B) {
readers, levelSlices, keys :=
buildLevelsForMergingIterSeqSeek(b, blockSize, restartInterval, levelCount)
readers, levelSlices, keys := buildLevelsForMergingIterSeqSeek(
b, blockSize, restartInterval, levelCount, 0 /* keyOffset */, false)
m := buildMergingIter(readers, levelSlices)
keyCount := len(keys)
b.ResetTimer()
@@ -598,8 +625,8 @@ func BenchmarkMergingIterSeqSeekPrefixGE(b *testing.B) {
const blockSize = 32 << 10
const restartInterval = 16
const levelCount = 5
readers, levelSlices, keys :=
buildLevelsForMergingIterSeqSeek(b, blockSize, restartInterval, levelCount)
readers, levelSlices, keys := buildLevelsForMergingIterSeqSeek(
b, blockSize, restartInterval, levelCount, 0 /* keyOffset */, false)

for _, skip := range []int{1, 2, 4, 8, 16} {
for _, useNext := range []bool{false, true} {