Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: Implement iterators and seqnum substitution for foreign SSTs #2455

Merged
merged 1 commit into from
May 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -1474,7 +1474,7 @@ func (c *compaction) newInputIter(
}
if hasRangeKeys {
li := &keyspan.LevelIter{}
newRangeKeyIterWrapper := func(file *manifest.FileMetadata, iterOptions *keyspan.SpanIterOptions) (keyspan.FragmentIterator, error) {
newRangeKeyIterWrapper := func(file *manifest.FileMetadata, iterOptions keyspan.SpanIterOptions) (keyspan.FragmentIterator, error) {
iter, err := newRangeKeyIter(file, iterOptions)
if iter != nil {
// Ensure that the range key iter is not closed until the compaction is
Expand Down
2 changes: 1 addition & 1 deletion flushable.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func (s *ingestedFlushable) newFlushIter(o *IterOptions, bytesFlushed *uint64) i
}

func (s *ingestedFlushable) constructRangeDelIter(
file *manifest.FileMetadata, _ *keyspan.SpanIterOptions,
file *manifest.FileMetadata, _ keyspan.SpanIterOptions,
) (keyspan.FragmentIterator, error) {
// Note that the keyspan level iter expects a non-nil iterator to be
// returned even if there is an error. So, we return the emptyKeyspanIter.
Expand Down
15 changes: 15 additions & 0 deletions internal/base/seqnums.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package base

import "fmt"

// This file defines sequence numbers that are reserved for foreign keys i.e.
// internal keys coming from other Pebble instances and existing in shared
// storage, as those will "slot below" any internal keys added by our own Pebble
Expand Down Expand Up @@ -52,3 +54,16 @@ const (
// ourselves.
SeqNumStart = uint64(10)
)

// PointSeqNumForLevel returns the appropriate reserved sequence number for
// point keys in foreign sstables at the specified level.
func PointSeqNumForLevel(level int) uint64 {
switch level {
case 5:
return SeqNumL5Point
case 6:
return SeqNumL6Point
default:
panic(fmt.Sprintf("unexpected foreign sstable at level %d", level))
}
}
5 changes: 4 additions & 1 deletion internal/keyspan/iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,17 @@ type FragmentIterator interface {

// TableNewSpanIter creates a new iterator for range key spans for the given
// file.
type TableNewSpanIter func(file *manifest.FileMetadata, iterOptions *SpanIterOptions) (FragmentIterator, error)
type TableNewSpanIter func(file *manifest.FileMetadata, iterOptions SpanIterOptions) (FragmentIterator, error)

// SpanIterOptions is a subset of IterOptions that are necessary to instantiate
// per-sstable span iterators.
type SpanIterOptions struct {
// RangeKeyFilters can be used to avoid scanning tables and blocks in tables
// when iterating over range keys.
RangeKeyFilters []base.BlockPropertyFilter
// Level specifies the level where this sstable is being read. Must be
// specified for foreign (i.e. shared not-created-by-this-instance) sstables.
Level manifest.Level
}

// Iter is an iterator over a set of fragmented spans.
Expand Down
2 changes: 1 addition & 1 deletion internal/keyspan/level_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func (l *LevelIter) loadFile(file *manifest.FileMetadata, dir int) loadFileRetur
return noFileLoaded
}
if indicator != fileAlreadyLoaded {
l.iter, l.err = l.newIter(file, &l.tableOpts)
l.iter, l.err = l.newIter(file, l.tableOpts)
indicator = newFileLoaded
}
if l.err != nil {
Expand Down
4 changes: 2 additions & 2 deletions internal/keyspan/level_iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ func TestLevelIterEquivalence(t *testing.T) {
metas = append(metas, meta)
}

tableNewIters := func(file *manifest.FileMetadata, iterOptions *SpanIterOptions) (FragmentIterator, error) {
tableNewIters := func(file *manifest.FileMetadata, iterOptions SpanIterOptions) (FragmentIterator, error) {
return NewIter(base.DefaultComparer.Compare, tc.levels[j][file.FileNum-1]), nil
}
// Add all the fileMetadatas to L6.
Expand Down Expand Up @@ -433,7 +433,7 @@ func TestLevelIter(t *testing.T) {
}
if iter == nil {
var lastFileNum base.FileNum
tableNewIters := func(file *manifest.FileMetadata, _ *SpanIterOptions) (FragmentIterator, error) {
tableNewIters := func(file *manifest.FileMetadata, _ SpanIterOptions) (FragmentIterator, error) {
keyType := keyType
spans := level[file.FileNum-1]
if keyType == manifest.KeyTypePoint {
Expand Down
43 changes: 0 additions & 43 deletions internal/keyspan/merging_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,49 +25,6 @@ import (
// MergingIter implementation, but will require a bit of plumbing to thread the
// Equal function.

// Transformer defines a transformation to be applied to a Span.
type Transformer interface {
// Transform takes a Span as input and writes the transformed Span to the
// provided output *Span pointer. The output Span's Keys slice may be reused
// by Transform to reduce allocations.
Transform(cmp base.Compare, in Span, out *Span) error
}

// The TransformerFunc type is an adapter to allow the use of ordinary functions
// as Transformers. If f is a function with the appropriate signature,
// TransformerFunc(f) is a Transformer that calls f.
type TransformerFunc func(base.Compare, Span, *Span) error

// Transform calls f(cmp, in, out).
func (tf TransformerFunc) Transform(cmp base.Compare, in Span, out *Span) error {
return tf(cmp, in, out)
}

var noopTransform Transformer = TransformerFunc(func(_ base.Compare, s Span, dst *Span) error {
dst.Start, dst.End = s.Start, s.End
dst.Keys = append(dst.Keys[:0], s.Keys...)
return nil
})

// VisibleTransform filters keys that are invisible at the provided snapshot
// sequence number.
func VisibleTransform(snapshot uint64) Transformer {
return TransformerFunc(func(_ base.Compare, s Span, dst *Span) error {
dst.Start, dst.End = s.Start, s.End
dst.Keys = dst.Keys[:0]
for _, k := range s.Keys {
// NB: The InternalKeySeqNumMax value is used for the batch snapshot
// because a batch's visible span keys are filtered when they're
// fragmented. There's no requirement to enforce visibility at
// iteration time.
if base.Visible(k.SeqNum(), snapshot, base.InternalKeySeqNumMax) {
dst.Keys = append(dst.Keys, k)
}
}
return nil
})
}

// MergingIter merges spans across levels of the LSM, exposing an iterator over
// spans that yields sets of spans fragmented at unique user key boundaries.
//
Expand Down
149 changes: 149 additions & 0 deletions internal/keyspan/transformer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
// Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use
// of this source code is governed by a BSD-style license that can be found in
// the LICENSE file.

package keyspan

import "github.com/cockroachdb/pebble/internal/base"

// Transformer defines a transformation to be applied to a Span.
type Transformer interface {
// Transform takes a Span as input and writes the transformed Span to the
// provided output *Span pointer. The output Span's Keys slice may be reused
// by Transform to reduce allocations.
Transform(cmp base.Compare, in Span, out *Span) error
}

// The TransformerFunc type is an adapter to allow the use of ordinary functions
// as Transformers. If f is a function with the appropriate signature,
// TransformerFunc(f) is a Transformer that calls f.
type TransformerFunc func(base.Compare, Span, *Span) error

// Transform calls f(cmp, in, out).
func (tf TransformerFunc) Transform(cmp base.Compare, in Span, out *Span) error {
return tf(cmp, in, out)
}

var noopTransform Transformer = TransformerFunc(func(_ base.Compare, s Span, dst *Span) error {
dst.Start, dst.End = s.Start, s.End
dst.Keys = append(dst.Keys[:0], s.Keys...)
return nil
})

// VisibleTransform filters keys that are invisible at the provided snapshot
// sequence number.
func VisibleTransform(snapshot uint64) Transformer {
return TransformerFunc(func(_ base.Compare, s Span, dst *Span) error {
dst.Start, dst.End = s.Start, s.End
dst.Keys = dst.Keys[:0]
for _, k := range s.Keys {
// NB: The InternalKeySeqNumMax value is used for the batch snapshot
// because a batch's visible span keys are filtered when they're
// fragmented. There's no requirement to enforce visibility at
// iteration time.
if base.Visible(k.SeqNum(), snapshot, base.InternalKeySeqNumMax) {
dst.Keys = append(dst.Keys, k)
}
}
return nil
})
}

// TransformerIter is a FragmentIterator that applies a Transformer on all
// returned keys. Used for when a caller needs to apply a transformer on an
// iterator but does not otherwise need the mergingiter's merging ability.
type TransformerIter struct {
FragmentIterator

// Transformer is applied on every Span returned by this iterator.
Transformer Transformer
// Comparer in use for this keyspace.
Compare base.Compare

span Span
err error
}

func (t *TransformerIter) applyTransform(span *Span) *Span {
t.span = Span{
Start: t.span.Start[:0],
End: t.span.End[:0],
Keys: t.span.Keys[:0],
}
if err := t.Transformer.Transform(t.Compare, *span, &t.span); err != nil {
t.err = err
return nil
}
return &t.span
}

// SeekGE implements the FragmentIterator interface.
func (t *TransformerIter) SeekGE(key []byte) *Span {
span := t.FragmentIterator.SeekGE(key)
if span == nil {
return nil
}
return t.applyTransform(span)
}

// SeekLT implements the FragmentIterator interface.
func (t *TransformerIter) SeekLT(key []byte) *Span {
span := t.FragmentIterator.SeekLT(key)
if span == nil {
return nil
}
return t.applyTransform(span)
}

// First implements the FragmentIterator interface.
func (t *TransformerIter) First() *Span {
span := t.FragmentIterator.First()
if span == nil {
return nil
}
return t.applyTransform(span)
}

// Last implements the FragmentIterator interface.
func (t *TransformerIter) Last() *Span {
span := t.FragmentIterator.Last()
if span == nil {
return nil
}
return t.applyTransform(span)
}

// Next implements the FragmentIterator interface.
func (t *TransformerIter) Next() *Span {
span := t.FragmentIterator.Next()
if span == nil {
return nil
}
return t.applyTransform(span)
}

// Prev implements the FragmentIterator interface.
func (t *TransformerIter) Prev() *Span {
span := t.FragmentIterator.Prev()
if span == nil {
return nil
}
return t.applyTransform(span)
}

// Error implements the FragmentIterator interface.
func (t *TransformerIter) Error() error {
if t.err != nil {
return t.err
}
return t.FragmentIterator.Error()
}

// Close implements the FragmentIterator interface.
func (t *TransformerIter) Close() error {
err := t.FragmentIterator.Close()
if err != nil {
return err
}
return t.err
}
70 changes: 70 additions & 0 deletions internal/rangekey/coalesce.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,3 +367,73 @@ func coalesce(
}
return nil
}

// ForeignSSTTransformer implements a keyspan.Transformer for range keys in
// foreign sstables (i.e. shared sstables not created by us). It is largely
// similar to the Transform function implemented in UserIteratorConfig in that
// it calls coalesce to remove range keys shadowed by other range keys, but also
// retains the range key that does the shadowing. In addition, it outputs range
// keys with sequence numbers that match reserved sequence numbers for that
// level (i.e. SeqNumL5RangeKeySet for L5 sets, while L6 unsets/dels are elided).
type ForeignSSTTransformer struct {
Comparer *base.Comparer
Level int
sortBuf keyspan.KeysBySuffix
}

// Transform implements the Transformer interface.
func (f *ForeignSSTTransformer) Transform(
cmp base.Compare, s keyspan.Span, dst *keyspan.Span,
) error {
// Apply shadowing of keys.
dst.Start = s.Start
dst.End = s.End
f.sortBuf = keyspan.KeysBySuffix{
Cmp: cmp,
Keys: f.sortBuf.Keys[:0],
}
if err := coalesce(f.Comparer.Equal, &f.sortBuf, math.MaxUint64, s.Keys); err != nil {
return err
}
keys := f.sortBuf.Keys
dst.Keys = dst.Keys[:0]
for i := range keys {
var seqNum uint64
switch keys[i].Kind() {
case base.InternalKeyKindRangeKeySet:
if invariants.Enabled && len(dst.Keys) > 0 && cmp(dst.Keys[len(dst.Keys)-1].Suffix, keys[i].Suffix) > 0 {
panic("pebble: keys unexpectedly not in ascending suffix order")
}
switch f.Level {
case 5:
seqNum = base.SeqNumL5RangeKeySet
case 6:
seqNum = base.SeqNumL6RangeKey
}
case base.InternalKeyKindRangeKeyUnset:
if invariants.Enabled && len(dst.Keys) > 0 && cmp(dst.Keys[len(dst.Keys)-1].Suffix, keys[i].Suffix) > 0 {
panic("pebble: keys unexpectedly not in ascending suffix order")
}
fallthrough
case base.InternalKeyKindRangeKeyDelete:
switch f.Level {
case 5:
seqNum = base.SeqNumL5RangeKeyUnsetDel
case 6:
// Skip this key, as foreign sstable in L6 do not need to emit range key
// unsets/dels as they do not apply to any other sstables.
continue
}
default:
return base.CorruptionErrorf("pebble: unrecognized range key kind %s", keys[i].Kind())
}
dst.Keys = append(dst.Keys, keyspan.Key{
Trailer: base.MakeTrailer(seqNum, keys[i].Kind()),
Suffix: keys[i].Suffix,
Value: keys[i].Value,
})
}
// coalesce results in dst.Keys being sorted by Suffix.
dst.KeysOrder = keyspan.BySuffixAsc
return nil
}
2 changes: 1 addition & 1 deletion level_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ func checkRangeTombstones(c *checkConfig) error {
atomicUnit, _ := expandToAtomicUnit(c.cmp, lf.Slice(), true /* disableIsCompacting */)
lower, upper := manifest.KeyRange(c.cmp, atomicUnit.Iter())
iterToClose, iter, err := c.newIters(
context.Background(), lf.FileMetadata, nil, internalIterOpts{})
context.Background(), lf.FileMetadata, &IterOptions{level: manifest.Level(lsmLevel)}, internalIterOpts{})
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions level_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ type tableNewIters func(
// tableNewRangeDelIter takes a tableNewIters and returns a TableNewSpanIter
// for the rangedel iterator returned by tableNewIters.
func tableNewRangeDelIter(ctx context.Context, newIters tableNewIters) keyspan.TableNewSpanIter {
return func(file *manifest.FileMetadata, iterOptions *keyspan.SpanIterOptions) (keyspan.FragmentIterator, error) {
return func(file *manifest.FileMetadata, iterOptions keyspan.SpanIterOptions) (keyspan.FragmentIterator, error) {
iter, rangeDelIter, err := newIters(
ctx, file, &IterOptions{RangeKeyFilters: iterOptions.RangeKeyFilters}, internalIterOpts{})
ctx, file, &IterOptions{RangeKeyFilters: iterOptions.RangeKeyFilters, level: iterOptions.Level}, internalIterOpts{})
if iter != nil {
_ = iter.Close()
}
Expand Down
5 changes: 5 additions & 0 deletions objstorage/objstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,11 @@ type Provider interface {
// Cannot be called if shared storage is not configured for the provider.
SetCreatorID(creatorID CreatorID) error

// IsForeign returns whether this object is owned by a different node. Return
// value undefined if creator ID is not set yet, or if this object does not
// exist in this provider.
IsForeign(meta ObjectMetadata) bool

// SharedObjectBacking encodes the shared object metadata.
SharedObjectBacking(meta *ObjectMetadata) (SharedObjectBackingHandle, error)

Expand Down
Loading