Skip to content

Commit

Permalink
manifest: support version edit (ve) encoding/decoding for virtual sstables
Browse files Browse the repository at this point in the history
We introduce three new tags for virtual sstable encoding/decoding.

tagCreatedBackingTable and tagRemovedBackingTable are used to encode
VersionEdit.CreatedBackingTables and VersionEdit.RemovedBackingTables,
respectively. For CreatedBackingTables, we just need to encode the
DiskFileNum and the Size, and for RemovedBackingTables, we just need to
encode the DiskFileNum.

We introduce a custom tag, customTagVirtual, to encode whether the
FileMetadata belongs to a virtual sstable. We only use this tag for
virtual sstables. If this tag is absent, then the FileMetadata belongs
to a physical sstable. This tag is also used to encode
FileMetadata.FileBacking.DiskFileNum for virtual sstables.

Note that the Decode function doesn't have the information necessary
to populate the FileBacking for a virtual sstable. We populate this
information in BulkVersionEdit.Accumulate. This is similar to the
FileMetadata of a deleted file getting populated in
BulkVersionEdit.Accumulate.
  • Loading branch information
bananabrick committed Jun 1, 2023
1 parent 1880f18 commit ad14b30
Show file tree
Hide file tree
Showing 6 changed files with 401 additions and 17 deletions.
107 changes: 95 additions & 12 deletions internal/manifest/version_edit.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,17 @@ const (
tagMaxColumnFamily = 203

// Pebble tags.
tagNewFile5 = 104 // Range keys.
tagNewFile5 = 104 // Range keys.
tagCreatedBackingTable = 105
tagRemovedBackingTable = 106

// The custom tags sub-format used by tagNewFile4 and above.
customTagTerminate = 1
customTagNeedsCompaction = 2
customTagCreationTime = 6
customTagPathID = 65
customTagNonSafeIgnoreMask = 1 << 6
customTagVirtual = 66
)

// DeletedFileEntry holds the state for a file deletion from a level. The file
Expand All @@ -73,6 +76,9 @@ type DeletedFileEntry struct {
type NewFileEntry struct {
// Level is the level the file is added at; BulkVersionEdit.Accumulate
// records the file under b.Added[Level].
Level int
// Meta is the metadata of the added file. For virtual sstables decoded from
// a manifest, Meta.FileBacking is left nil by VersionEdit.Decode.
Meta *FileMetadata
// BackingFileNum is only set during manifest replay, and only for virtual
// sstables. VersionEdit.Decode fills it in from the customTagVirtual
// payload, and BulkVersionEdit.Accumulate uses it to look up the
// FileBacking to attach to Meta.
BackingFileNum base.DiskFileNum
}

// VersionEdit holds the state for an edit to a Version along with other
Expand Down Expand Up @@ -143,7 +149,9 @@ type VersionEdit struct {

// Decode decodes an edit from the specified reader.
//
// TODO(bananabrick): Support decoding of virtual sstable state.
// Note that the Decode step will not set the FileBacking for virtual sstables
// and the responsibility is left to the caller. However, the Decode step will
// populate the NewFileEntry.BackingFileNum in VersionEdit.NewFiles.
func (v *VersionEdit) Decode(r io.Reader) error {
br, ok := r.(byteReader)
if !ok {
Expand Down Expand Up @@ -196,6 +204,28 @@ func (v *VersionEdit) Decode(r io.Reader) error {
}
// NB: RocksDB does not use compaction pointers anymore.

case tagRemovedBackingTable:
n, err := d.readUvarint()
if err != nil {
return err
}
v.RemovedBackingTables = append(
v.RemovedBackingTables, base.FileNum(n).DiskFileNum(),
)
case tagCreatedBackingTable:
dfn, err := d.readUvarint()
if err != nil {
return err
}
size, err := d.readUvarint()
if err != nil {
return err
}
fileBacking := &FileBacking{
DiskFileNum: base.FileNum(dfn).DiskFileNum(),
Size: size,
}
v.CreatedBackingTables = append(v.CreatedBackingTables, fileBacking)
case tagDeletedFile:
level, err := d.readLevel()
if err != nil {
Expand Down Expand Up @@ -301,6 +331,10 @@ func (v *VersionEdit) Decode(r io.Reader) error {
}
var markedForCompaction bool
var creationTime uint64
virtualState := struct {
virtual bool
backingFileNum uint64
}{}
if tag == tagNewFile4 || tag == tagNewFile5 {
for {
customTag, err := d.readUvarint()
Expand All @@ -309,7 +343,16 @@ func (v *VersionEdit) Decode(r io.Reader) error {
}
if customTag == customTagTerminate {
break
} else if customTag == customTagVirtual {
virtualState.virtual = true
n, err := d.readUvarint()
if err != nil {
return err
}
virtualState.backingFileNum = n
continue
}

field, err := d.readBytes()
if err != nil {
return err
Expand Down Expand Up @@ -345,6 +388,7 @@ func (v *VersionEdit) Decode(r io.Reader) error {
SmallestSeqNum: smallestSeqNum,
LargestSeqNum: largestSeqNum,
MarkedForCompaction: markedForCompaction,
Virtual: virtualState.virtual,
}
if tag != tagNewFile5 { // no range keys present
m.SmallestPointKey = base.DecodeInternalKey(smallestPointKey)
Expand Down Expand Up @@ -376,11 +420,18 @@ func (v *VersionEdit) Decode(r io.Reader) error {
}
}
m.boundsSet = true
m.InitPhysicalBacking()
v.NewFiles = append(v.NewFiles, NewFileEntry{
if !virtualState.virtual {
m.InitPhysicalBacking()
}

nfe := NewFileEntry{
Level: level,
Meta: m,
})
}
if virtualState.virtual {
nfe.BackingFileNum = base.FileNum(virtualState.backingFileNum).DiskFileNum()
}
v.NewFiles = append(v.NewFiles, nfe)

case tagPrevLogNumber:
n, err := d.readUvarint()
Expand Down Expand Up @@ -442,8 +493,6 @@ func (v *VersionEdit) String() string {
}

// Encode encodes an edit to the specified writer.
//
// TODO(bananabrick): Support encoding of virtual sstable state.
func (v *VersionEdit) Encode(w io.Writer) error {
e := versionEditEncoder{new(bytes.Buffer)}

Expand All @@ -463,6 +512,15 @@ func (v *VersionEdit) Encode(w io.Writer) error {
e.writeUvarint(tagNextFileNumber)
e.writeUvarint(uint64(v.NextFileNum))
}
for _, dfn := range v.RemovedBackingTables {
e.writeUvarint(tagRemovedBackingTable)
e.writeUvarint(uint64(dfn.FileNum()))
}
for _, fileBacking := range v.CreatedBackingTables {
e.writeUvarint(tagCreatedBackingTable)
e.writeUvarint(uint64(fileBacking.DiskFileNum.FileNum()))
e.writeUvarint(fileBacking.Size)
}
// RocksDB requires LastSeqNum to be encoded for the first MANIFEST entry,
// even though its value is zero. We detect this by encoding LastSeqNum when
// ComparerName is set.
Expand All @@ -476,7 +534,7 @@ func (v *VersionEdit) Encode(w io.Writer) error {
e.writeUvarint(uint64(x.FileNum))
}
for _, x := range v.NewFiles {
customFields := x.Meta.MarkedForCompaction || x.Meta.CreationTime != 0
customFields := x.Meta.MarkedForCompaction || x.Meta.CreationTime != 0 || x.Meta.Virtual
var tag uint64
switch {
case x.Meta.HasRangeKeys:
Expand Down Expand Up @@ -529,13 +587,18 @@ func (v *VersionEdit) Encode(w io.Writer) error {
e.writeUvarint(customTagNeedsCompaction)
e.writeBytes([]byte{1})
}
if x.Meta.Virtual {
e.writeUvarint(customTagVirtual)
e.writeUvarint(uint64(x.Meta.FileBacking.DiskFileNum.FileNum()))
}
e.writeUvarint(customTagTerminate)
}
}
_, err := w.Write(e.Bytes())
return err
}

// versionEditDecoder should be used to decode version edits. It embeds a
// byteReader, so the reader's methods are available directly on the decoder.
// NOTE(review): the readUvarint/readLevel/readBytes helpers called from
// VersionEdit.Decode are presumably methods on this type — confirm.
type versionEditDecoder struct {
byteReader
}
Expand Down Expand Up @@ -634,7 +697,9 @@ type BulkVersionEdit struct {
Added [NumLevels]map[base.FileNum]*FileMetadata
Deleted [NumLevels]map[base.FileNum]*FileMetadata

AddedFileBacking []*FileBacking
// AddedFileBacking is a map to support lookup so that we can populate the
// FileBacking of virtual sstables during manifest replay.
AddedFileBacking map[base.DiskFileNum]*FileBacking
RemovedFileBacking []base.DiskFileNum

// AddedByFileNum maps file number to file metadata for all added files
Expand Down Expand Up @@ -697,6 +762,16 @@ func (b *BulkVersionEdit) Accumulate(ve *VersionEdit) error {
}
}

// Generate state for Added backing files. Note that these must be generated
// before we loop through the NewFiles, because we need to populate the
// FileBackings which might be used by the NewFiles loop.
if b.AddedFileBacking == nil {
b.AddedFileBacking = make(map[base.DiskFileNum]*FileBacking)
}
for _, fb := range ve.CreatedBackingTables {
b.AddedFileBacking[fb.DiskFileNum] = fb
}

for _, nf := range ve.NewFiles {
// A new file should not have been deleted in this or a preceding
// VersionEdit at the same level (though files can move across levels).
Expand All @@ -705,6 +780,17 @@ func (b *BulkVersionEdit) Accumulate(ve *VersionEdit) error {
return base.CorruptionErrorf("pebble: file deleted L%d.%s before it was inserted", nf.Level, nf.Meta.FileNum)
}
}
if nf.Meta.Virtual && nf.Meta.FileBacking == nil {
// FileBacking for a virtual sstable must only be nil if we're performing
// manifest replay.
nf.Meta.FileBacking = b.AddedFileBacking[nf.BackingFileNum]
if nf.Meta.FileBacking == nil {
return errors.Errorf("FileBacking for virtual sstable must not be nil")
}
} else if nf.Meta.FileBacking == nil {
return errors.Errorf("Added file L%d.%s's has no FileBacking", nf.Level, nf.Meta.FileNum)
}

if b.Added[nf.Level] == nil {
b.Added[nf.Level] = make(map[base.FileNum]*FileMetadata)
}
Expand All @@ -717,9 +803,6 @@ func (b *BulkVersionEdit) Accumulate(ve *VersionEdit) error {
}
}

// Generate state for the backing files.
b.AddedFileBacking = append(b.AddedFileBacking, ve.CreatedBackingTables...)

// Since a file can be removed from backing files in exactly one version
// edit it is safe to just append without any de-duplication.
b.RemovedFileBacking = append(b.RemovedFileBacking, ve.RemovedBackingTables...)
Expand Down
109 changes: 109 additions & 0 deletions internal/manifest/version_edit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,77 @@ func checkRoundTrip(e0 VersionEdit) error {
return nil
}

// TestVERoundTripAndAccumulate checks that a version edit referencing a
// virtual sstable is unchanged after Encode -> Decode -> Accumulate.
//
// Version edits with virtual sstables will not be the same after a plain
// round trip, as the Decode function will not set the FileBacking for a
// virtual sstable. We therefore test round trip + bve accumulation here,
// after which the virtual sstable FileBacking should be set.
func TestVERoundTripAndAccumulate(t *testing.T) {
	cmp := base.DefaultComparer.Compare

	// m1 is a physical sstable; its FileBacking is the backing table that the
	// virtual sstable m2 refers to.
	m1 := (&FileMetadata{
		FileNum:        810,
		Size:           8090,
		CreationTime:   809060,
		SmallestSeqNum: 9,
		LargestSeqNum:  11,
	}).ExtendPointKeyBounds(
		cmp,
		base.MakeInternalKey([]byte("a"), 0, base.InternalKeyKindSet),
		base.MakeInternalKey([]byte("m"), 0, base.InternalKeyKindSet),
	).ExtendRangeKeyBounds(
		cmp,
		base.MakeInternalKey([]byte("l"), 0, base.InternalKeyKindRangeKeySet),
		base.MakeExclusiveSentinelKey(base.InternalKeyKindRangeKeySet, []byte("z")),
	)
	m1.InitPhysicalBacking()

	// m2 is the virtual sstable, sharing m1's FileBacking.
	m2 := (&FileMetadata{
		FileNum:        812,
		Size:           8090,
		CreationTime:   809060,
		SmallestSeqNum: 9,
		LargestSeqNum:  11,
		Virtual:        true,
		FileBacking:    m1.FileBacking,
	}).ExtendPointKeyBounds(
		cmp,
		base.MakeInternalKey([]byte("a"), 0, base.InternalKeyKindSet),
		base.MakeInternalKey([]byte("c"), 0, base.InternalKeyKindSet),
	)

	ve1 := VersionEdit{
		ComparerName:         "11",
		MinUnflushedLogNum:   22,
		ObsoletePrevLogNum:   33,
		NextFileNum:          44,
		LastSeqNum:           55,
		CreatedBackingTables: []*FileBacking{m1.FileBacking},
		NewFiles: []NewFileEntry{
			{
				Level: 4,
				Meta:  m2,
				// Only set for the test, so that ve1 compares equal to the
				// decoded edit, in which Decode populates BackingFileNum.
				BackingFileNum: m2.FileBacking.DiskFileNum,
			},
		},
	}

	// An Encode or Decode failure makes every later check meaningless, so stop
	// immediately instead of recording the error and carrying on with a bad
	// buffer or a partially decoded edit.
	buf := new(bytes.Buffer)
	require.NoError(t, ve1.Encode(buf))
	var ve2 VersionEdit
	require.NoError(t, ve2.Decode(buf))

	// Perform accumulation to set the FileBacking on the files in the Decoded
	// version edit.
	var bve BulkVersionEdit
	require.NoError(t, bve.Accumulate(&ve2))
	if diff := pretty.Diff(ve1, ve2); diff != nil {
		t.Error(errors.Errorf("%s", strings.Join(diff, "\n")))
	}
}

func TestVersionEditRoundTrip(t *testing.T) {
cmp := base.DefaultComparer.Compare
m1 := (&FileMetadata{
Expand Down Expand Up @@ -93,6 +164,40 @@ func TestVersionEditRoundTrip(t *testing.T) {
)
m4.InitPhysicalBacking()

m5 := (&FileMetadata{
FileNum: 810,
Size: 8090,
CreationTime: 809060,
SmallestSeqNum: 9,
LargestSeqNum: 11,
}).ExtendPointKeyBounds(
cmp,
base.MakeInternalKey([]byte("a"), 0, base.InternalKeyKindSet),
base.MakeInternalKey([]byte("m"), 0, base.InternalKeyKindSet),
).ExtendRangeKeyBounds(
cmp,
base.MakeInternalKey([]byte("l"), 0, base.InternalKeyKindRangeKeySet),
base.MakeExclusiveSentinelKey(base.InternalKeyKindRangeKeySet, []byte("z")),
)
m5.InitPhysicalBacking()

m6 := (&FileMetadata{
FileNum: 811,
Size: 8090,
CreationTime: 809060,
SmallestSeqNum: 9,
LargestSeqNum: 11,
}).ExtendPointKeyBounds(
cmp,
base.MakeInternalKey([]byte("a"), 0, base.InternalKeyKindSet),
base.MakeInternalKey([]byte("m"), 0, base.InternalKeyKindSet),
).ExtendRangeKeyBounds(
cmp,
base.MakeInternalKey([]byte("l"), 0, base.InternalKeyKindRangeKeySet),
base.MakeExclusiveSentinelKey(base.InternalKeyKindRangeKeySet, []byte("z")),
)
m6.InitPhysicalBacking()

testCases := []VersionEdit{
// An empty version edit.
{},
Expand All @@ -103,6 +208,10 @@ func TestVersionEditRoundTrip(t *testing.T) {
ObsoletePrevLogNum: 33,
NextFileNum: 44,
LastSeqNum: 55,
RemovedBackingTables: []base.DiskFileNum{
base.FileNum(10).DiskFileNum(), base.FileNum(11).DiskFileNum(),
},
CreatedBackingTables: []*FileBacking{m5.FileBacking, m6.FileBacking},
DeletedFiles: map[DeletedFileEntry]*FileMetadata{
{
Level: 3,
Expand Down
6 changes: 6 additions & 0 deletions table_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,12 @@ func (d *DB) scanReadStateTableStats(
for l, levelMetadata := range rs.current.Levels {
iter := levelMetadata.Iter()
for f := iter.First(); f != nil; f = iter.Next() {
if f.Virtual {
// TODO(bananabrick): Support stats collection for virtual
// sstables.
continue
}

// NB: We're not holding d.mu which protects f.Stats, but only the
// active stats collection job updates f.Stats for active files,
// and we ensure only one goroutine runs it at a time through
Expand Down
4 changes: 2 additions & 2 deletions tool/make_incorrect_manifests.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ func makeManifest1() {
ve.NextFileNum = 5
ve.LastSeqNum = 20
ve.NewFiles = []manifest.NewFileEntry{
{6, &manifest.FileMetadata{
{Level: 6, Meta: &manifest.FileMetadata{
FileNum: 1, SmallestSeqNum: 2, LargestSeqNum: 5}}}
writeVE(writer, &ve)

ve.MinUnflushedLogNum = 3
ve.NewFiles = []manifest.NewFileEntry{
{6, &manifest.FileMetadata{
{Level: 6, Meta: &manifest.FileMetadata{
FileNum: 2, SmallestSeqNum: 1, LargestSeqNum: 4}}}
writeVE(writer, &ve)

Expand Down
Loading

0 comments on commit ad14b30

Please sign in to comment.