Skip to content

Commit

Permalink
*: Implement iterators and seqnum substitution for foreign SSTs
Browse files Browse the repository at this point in the history
When sstables written to by other stores in shared storage are
read, we need to collapse range keys/range dels/point keys within
each SST and then return keys at fixed sequence numbers reserved
for that level. This change implements that path for all sstables
that are in shared storage according to objstorage.Provider
but have a creator ID that doesn't match ours.
  • Loading branch information
itsbilal committed Apr 27, 2023
1 parent 10ab0c9 commit 64960e7
Show file tree
Hide file tree
Showing 19 changed files with 900 additions and 88 deletions.
15 changes: 15 additions & 0 deletions internal/base/seqnums.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package base

import "fmt"

// This file defines sequence numbers that are reserved for foreign keys i.e.
// internal keys coming from other Pebble instances and existing in shared
// storage, as those will "slot below" any internal keys added by our own Pebble
Expand Down Expand Up @@ -52,3 +54,16 @@ const (
// ourselves.
SeqNumStart = uint64(10)
)

// PointSeqNumForLevel returns the appropriate reserved sequence number for
// point keys in foreign sstables at the specified level.
func PointSeqNumForLevel(level int) uint64 {
switch level {
case 5:
return SeqNumL5Point
case 6:
return SeqNumL6Point
default:
panic(fmt.Sprintf("unexpected foreign sstable at level %d", level))
}
}
3 changes: 3 additions & 0 deletions internal/keyspan/iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ type SpanIterOptions struct {
// RangeKeyFilters can be used to avoid scanning tables and blocks in tables
// when iterating over range keys.
RangeKeyFilters []base.BlockPropertyFilter
// Level specifies the level where this sstable is being read. Must be
// specified for foreign (i.e. shared not-created-by-this-instance) sstables.
Level manifest.Level
}

// Iter is an iterator over a set of fragmented spans.
Expand Down
43 changes: 0 additions & 43 deletions internal/keyspan/merging_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,49 +25,6 @@ import (
// MergingIter implementation, but will require a bit of plumbing to thread the
// Equal function.

// Transformer defines a transformation to be applied to a Span.
type Transformer interface {
// Transform takes a Span as input and writes the transformed Span to the
// provided output *Span pointer. The output Span's Keys slice may be reused
// by Transform to reduce allocations.
Transform(cmp base.Compare, in Span, out *Span) error
}

// The TransformerFunc type is an adapter to allow the use of ordinary functions
// as Transformers. If f is a function with the appropriate signature,
// TransformerFunc(f) is a Transformer that calls f.
type TransformerFunc func(base.Compare, Span, *Span) error

// Transform calls f(cmp, in, out).
func (tf TransformerFunc) Transform(cmp base.Compare, in Span, out *Span) error {
return tf(cmp, in, out)
}

var noopTransform Transformer = TransformerFunc(func(_ base.Compare, s Span, dst *Span) error {
dst.Start, dst.End = s.Start, s.End
dst.Keys = append(dst.Keys[:0], s.Keys...)
return nil
})

// VisibleTransform filters keys that are invisible at the provided snapshot
// sequence number.
func VisibleTransform(snapshot uint64) Transformer {
return TransformerFunc(func(_ base.Compare, s Span, dst *Span) error {
dst.Start, dst.End = s.Start, s.End
dst.Keys = dst.Keys[:0]
for _, k := range s.Keys {
// NB: The InternalKeySeqNumMax value is used for the batch snapshot
// because a batch's visible span keys are filtered when they're
// fragmented. There's no requirement to enforce visibility at
// iteration time.
if base.Visible(k.SeqNum(), snapshot, base.InternalKeySeqNumMax) {
dst.Keys = append(dst.Keys, k)
}
}
return nil
})
}

// MergingIter merges spans across levels of the LSM, exposing an iterator over
// spans that yields sets of spans fragmented at unique user key boundaries.
//
Expand Down
149 changes: 149 additions & 0 deletions internal/keyspan/transformer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
// Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use
// of this source code is governed by a BSD-style license that can be found in
// the LICENSE file.

package keyspan

import "github.com/cockroachdb/pebble/internal/base"

// Transformer defines a transformation to be applied to a Span.
type Transformer interface {
// Transform takes a Span as input and writes the transformed Span to the
// provided output *Span pointer. The output Span's Keys slice may be reused
// by Transform to reduce allocations.
Transform(cmp base.Compare, in Span, out *Span) error
}

// The TransformerFunc type is an adapter to allow the use of ordinary functions
// as Transformers. If f is a function with the appropriate signature,
// TransformerFunc(f) is a Transformer that calls f.
type TransformerFunc func(base.Compare, Span, *Span) error

// Transform calls f(cmp, in, out).
func (tf TransformerFunc) Transform(cmp base.Compare, in Span, out *Span) error {
return tf(cmp, in, out)
}

var noopTransform Transformer = TransformerFunc(func(_ base.Compare, s Span, dst *Span) error {
dst.Start, dst.End = s.Start, s.End
dst.Keys = append(dst.Keys[:0], s.Keys...)
return nil
})

// VisibleTransform filters keys that are invisible at the provided snapshot
// sequence number.
func VisibleTransform(snapshot uint64) Transformer {
return TransformerFunc(func(_ base.Compare, s Span, dst *Span) error {
dst.Start, dst.End = s.Start, s.End
dst.Keys = dst.Keys[:0]
for _, k := range s.Keys {
// NB: The InternalKeySeqNumMax value is used for the batch snapshot
// because a batch's visible span keys are filtered when they're
// fragmented. There's no requirement to enforce visibility at
// iteration time.
if base.Visible(k.SeqNum(), snapshot, base.InternalKeySeqNumMax) {
dst.Keys = append(dst.Keys, k)
}
}
return nil
})
}

// TransformerIter is a FragmentIterator that applies a Transformer on all
// returned keys. Used for when a caller needs to apply a transformer on an
// iterator but does not otherwise need the mergingiter's merging ability.
type TransformerIter struct {
FragmentIterator

// Transformer is applied on every Span returned by this iterator.
Transformer Transformer
// Comparer in use for this keyspace.
Compare base.Compare

span Span
err error
}

func (t *TransformerIter) applyTransform(span *Span) *Span {
t.span = Span{
Start: t.span.Start[:0],
End: t.span.End[:0],
Keys: t.span.Keys[:0],
}
if err := t.Transformer.Transform(t.Compare, *span, &t.span); err != nil {
t.err = err
return nil
}
return &t.span
}

// SeekGE implements the FragmentIterator interface.
func (t *TransformerIter) SeekGE(key []byte) *Span {
span := t.FragmentIterator.SeekGE(key)
if span == nil {
return nil
}
return t.applyTransform(span)
}

// SeekLT implements the FragmentIterator interface.
func (t *TransformerIter) SeekLT(key []byte) *Span {
span := t.FragmentIterator.SeekLT(key)
if span == nil {
return nil
}
return t.applyTransform(span)
}

// First implements the FragmentIterator interface.
func (t *TransformerIter) First() *Span {
span := t.FragmentIterator.First()
if span == nil {
return nil
}
return t.applyTransform(span)
}

// Last implements the FragmentIterator interface.
func (t *TransformerIter) Last() *Span {
span := t.FragmentIterator.Last()
if span == nil {
return nil
}
return t.applyTransform(span)
}

// Next implements the FragmentIterator interface.
func (t *TransformerIter) Next() *Span {
span := t.FragmentIterator.Next()
if span == nil {
return nil
}
return t.applyTransform(span)
}

// Prev implements the FragmentIterator interface.
func (t *TransformerIter) Prev() *Span {
span := t.FragmentIterator.Prev()
if span == nil {
return nil
}
return t.applyTransform(span)
}

// Error implements the FragmentIterator interface.
func (t *TransformerIter) Error() error {
if t.err != nil {
return t.err
}
return t.FragmentIterator.Error()
}

// Close implements the FragmentIterator interface.
func (t *TransformerIter) Close() error {
err := t.FragmentIterator.Close()
if err != nil {
return err
}
return t.err
}
70 changes: 70 additions & 0 deletions internal/rangekey/coalesce.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,3 +367,73 @@ func coalesce(
}
return nil
}

// ForeignSSTTransformer implements a keyspan.Transformer for range keys in
// foreign sstables (i.e. shared sstables not created by us). It is largely
// similar to the Transform function implemented in UserIteratorConfig in that
// it calls coalesce to remove range keys shadowed by other range keys, but also
// retains the range key that does the shadowing. In addition, it outputs range
// keys with sequence numbers that match reserved sequence numbers for that
// level (i.e. SeqNumL5RangeKeySet for L5 sets, while L6 unsets/dels are elided).
type ForeignSSTTransformer struct {
Comparer *base.Comparer
Level int
sortBuf keyspan.KeysBySuffix
}

// Transform implements the Transformer interface.
func (f *ForeignSSTTransformer) Transform(
cmp base.Compare, s keyspan.Span, dst *keyspan.Span,
) error {
// Apply shadowing of keys.
dst.Start = s.Start
dst.End = s.End
f.sortBuf = keyspan.KeysBySuffix{
Cmp: cmp,
Keys: f.sortBuf.Keys[:0],
}
if err := coalesce(f.Comparer.Equal, &f.sortBuf, math.MaxUint64, s.Keys); err != nil {
return err
}
keys := f.sortBuf.Keys
dst.Keys = dst.Keys[:0]
for i := range keys {
var seqNum uint64
switch keys[i].Kind() {
case base.InternalKeyKindRangeKeySet:
if invariants.Enabled && len(dst.Keys) > 0 && cmp(dst.Keys[len(dst.Keys)-1].Suffix, keys[i].Suffix) > 0 {
panic("pebble: keys unexpectedly not in ascending suffix order")
}
switch f.Level {
case 5:
seqNum = base.SeqNumL5RangeKeySet
case 6:
seqNum = base.SeqNumL6RangeKey
}
case base.InternalKeyKindRangeKeyUnset:
if invariants.Enabled && len(dst.Keys) > 0 && cmp(dst.Keys[len(dst.Keys)-1].Suffix, keys[i].Suffix) > 0 {
panic("pebble: keys unexpectedly not in ascending suffix order")
}
fallthrough
case base.InternalKeyKindRangeKeyDelete:
switch f.Level {
case 5:
seqNum = base.SeqNumL5RangeKeyUnsetDel
case 6:
// Skip this key, as foreign sstable in L6 do not need to emit range key
// unsets/dels as they do not apply to any other sstables.
continue
}
default:
return base.CorruptionErrorf("pebble: unrecognized range key kind %s", keys[i].Kind())
}
dst.Keys = append(dst.Keys, keyspan.Key{
Trailer: base.MakeTrailer(seqNum, keys[i].Kind()),
Suffix: keys[i].Suffix,
Value: keys[i].Value,
})
}
// coalesce results in dst.Keys being sorted by Suffix.
dst.KeysOrder = keyspan.BySuffixAsc
return nil
}
2 changes: 1 addition & 1 deletion level_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type tableNewIters func(
func tableNewRangeDelIter(ctx context.Context, newIters tableNewIters) keyspan.TableNewSpanIter {
return func(file *manifest.FileMetadata, iterOptions *keyspan.SpanIterOptions) (keyspan.FragmentIterator, error) {
iter, rangeDelIter, err := newIters(
ctx, file, &IterOptions{RangeKeyFilters: iterOptions.RangeKeyFilters}, internalIterOpts{})
ctx, file, &IterOptions{RangeKeyFilters: iterOptions.RangeKeyFilters, level: iterOptions.Level}, internalIterOpts{})
if iter != nil {
_ = iter.Close()
}
Expand Down
5 changes: 5 additions & 0 deletions objstorage/objstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,11 @@ type Provider interface {
// Cannot be called if shared storage is not configured for the provider.
SetCreatorID(creatorID CreatorID) error

// IsForeign returns whether this object is owned by a different node. Return
// value undefined if creator ID is not set yet, or if this object does not
// exist in this provider.
IsForeign(meta ObjectMetadata) bool

// SharedObjectBacking encodes the shared object metadata.
SharedObjectBacking(meta *ObjectMetadata) (SharedObjectBackingHandle, error)

Expand Down
8 changes: 8 additions & 0 deletions objstorage/objstorageprovider/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,14 @@ func (p *provider) SetCreatorID(creatorID objstorage.CreatorID) error {
return nil
}

// IsForeign is part of the objstorage.Provider interface.
func (p *provider) IsForeign(meta objstorage.ObjectMetadata) bool {
if !p.shared.initialized.Load() {
return false
}
return meta.IsShared() && p.shared.creatorID != meta.Shared.CreatorID
}

func (p *provider) sharedCheckInitialized() error {
if p.sharedStorage() == nil {
return errors.Errorf("shared object support not configured")
Expand Down
13 changes: 13 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/cache"
"github.com/cockroachdb/pebble/internal/humanize"
"github.com/cockroachdb/pebble/internal/keyspan"
"github.com/cockroachdb/pebble/internal/manifest"
"github.com/cockroachdb/pebble/objstorage/shared"
"github.com/cockroachdb/pebble/sstable"
Expand Down Expand Up @@ -222,6 +223,17 @@ func (o *IterOptions) getLogger() Logger {
return o.logger
}

// SpanIterOptions creates a SpanIterOptions from this IterOptions.
func (o *IterOptions) SpanIterOptions(level manifest.Level) *keyspan.SpanIterOptions {
if o == nil {
return &keyspan.SpanIterOptions{Level: level}
}
return &keyspan.SpanIterOptions{
RangeKeyFilters: o.RangeKeyFilters,
Level: level,
}
}

// scanInternalOptions is similar to IterOptions, meant for use with
// scanInternalIterator.
type scanInternalOptions struct {
Expand Down Expand Up @@ -1602,6 +1614,7 @@ func (o *Options) MakeReaderOptions() sstable.ReaderOptions {
readerOpts.Comparer = o.Comparer
readerOpts.Filters = o.Filters
if o.Merger != nil {
readerOpts.Merge = o.Merger.Merge
readerOpts.MergerName = o.Merger.Name
}
readerOpts.LoggerAndTracer = o.LoggerAndTracer
Expand Down
Loading

0 comments on commit 64960e7

Please sign in to comment.