Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

backupccl: add writeKey method to fileSSTSink #137565

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
303 changes: 286 additions & 17 deletions pkg/backup/file_sst_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,15 @@ type fileSSTSink struct {
// flush. This counter resets on each flush.
completedSpans int32

elideMode execinfrapb.ElidePrefix
elidePrefix roachpb.Key
elideMode execinfrapb.ElidePrefix

// elidedPrefix represents the elided prefix of the last exportSpan/key written to the sink.
// This resets on each flush.
elidedPrefix roachpb.Key

// prevKey represents the last key written using writeKey. When writing a new key, this helps determine
// if the last key written was mid-row. This resets on each flush.
prevKey roachpb.Key

// stats contain statistics about the actions of the fileSSTSink over its
// entire lifespan.
Expand Down Expand Up @@ -131,10 +138,20 @@ func (s *fileSSTSink) flushFile(ctx context.Context) error {
return nil
}

var lastFile *backuppb.BackupManifest_File
if len(s.flushedFiles) > 0 {
lastFile = &s.flushedFiles[len(s.flushedFiles)-1]
}
// If writeKey was used, it is possible that the last written key was mid-row, but because no keys were written
// after, s.midRow was not updated. To ensure this, we update midRow using the end of the span that last key belonged
// to
if err := s.setMidRowForPrevKey(lastFile.Span.EndKey); err != nil {
return err
}
if s.midRow {
var lastKey roachpb.Key
if len(s.flushedFiles) > 0 {
lastKey = s.flushedFiles[len(s.flushedFiles)-1].Span.EndKey
if lastFile != nil {
lastKey = lastFile.Span.EndKey
}
return errors.AssertionFailedf("backup closed file ending mid-key in %q", lastKey)
}
Expand Down Expand Up @@ -174,10 +191,11 @@ func (s *fileSSTSink) flushFile(ctx context.Context) error {
}

s.flushedFiles = nil
s.elidePrefix = s.elidePrefix[:0]
s.elidedPrefix = s.elidedPrefix[:0]
s.flushedSize = 0
s.flushedRevStart.Reset()
s.completedSpans = 0
s.prevKey = nil

return nil
}
Expand Down Expand Up @@ -234,7 +252,7 @@ func (s *fileSSTSink) write(ctx context.Context, resp exportedSpan) (roachpb.Key
// since it overlaps but SSTWriter demands writes in-order.
if len(s.flushedFiles) > 0 {
last := s.flushedFiles[len(s.flushedFiles)-1].Span.EndKey
if span.Key.Compare(last) < 0 || !bytes.Equal(spanPrefix, s.elidePrefix) {
if span.Key.Compare(last) < 0 || !bytes.Equal(spanPrefix, s.elidedPrefix) {
log.VEventf(ctx, 1, "flushing backup file %s of size %d because span %s cannot append before %s",
s.outName, s.flushedSize, span, last,
)
Expand All @@ -251,7 +269,7 @@ func (s *fileSSTSink) write(ctx context.Context, resp exportedSpan) (roachpb.Key
return nil, err
}
}
s.elidePrefix = append(s.elidePrefix[:0], spanPrefix...)
s.elidedPrefix = append(s.elidedPrefix[:0], spanPrefix...)

log.VEventf(ctx, 2, "writing %s to backup file %s", span, s.outName)

Expand Down Expand Up @@ -331,11 +349,217 @@ func (s *fileSSTSink) write(ctx context.Context, resp exportedSpan) (roachpb.Key
return resp.resumeKey, err
}

// adjustFileEndKey checks if the export respsonse end key can be used as a
// split point during restore. If the end key is not splitable (i.e. it splits
// writeKey writes a single key to the SST file. The key should be the full key. span refers to the span that the key
// belongs to. start and end are the time bounds of the span being backed up. The writing of each key could
// potentially split up the span of the previously written key (if both keys were part of the same span and the span
// had to be split). As a consequence the span that is recorded for the new key is returned, as it may be a subspan of
// the span passed in for the key.
//
// flush should be called after the last key is written to ensure that the SST is written to the destination.
Comment on lines +352 to +358
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One of the biggest hurdles for me writing this was determining some of the implicit assumptions we make about the data, which wasn't laid out clearly in write.

I think this could maybe use some additional elaborations? The behavior I have laid out mentally is as so:

writeKey in its most simplest form will write a key such that the previous key was part of the same span, in which case it just adds to that previous BackupManifest_File. In the event that the previous file has reached the fileSpanByteLimit, the previous file will have its span exclusive end updated to this new key, and then create a new file with the new key as its span start, and the end is left untouched.

Another scenario is writeKey receives a span that contiguously extends the last BackupManifest_File's span (i.e. span.Key == lastFile.Span.EndKey). In that case, we can extend the previous backup manifest file by updating its end key to the new span's end key and continue writing. In the event we are unable to do this because of size constraints, we don't do any extension and just treat the span as a new file.

These are the main cases and what I expect to encounter in a compaction. I did add some guardrails against edge cases, namely where the span precedes the previous BackupManifest_File or if the key being written precedes the last written key. The behavior here is to just flush. From a backup perspective, this could potentially result in BackupManifest_Files overlapping within the same backup. However, I think that's fine as this still creates valid BackupManifest_Files with the correct backing SSTs.

func (s *fileSSTSink) writeKey(
ctx context.Context,
key storage.MVCCKey,
value []byte,
span roachpb.Span,
startTime hlc.Timestamp,
endTime hlc.Timestamp,
) (roachpb.Span, error) {
if key.Key.Compare(span.Key) < 0 || key.Key.Compare(span.EndKey) >= 0 {
return roachpb.Span{}, errors.AssertionFailedf("key %s outside of span %v", key.Key, span)
}
if err := s.maybeFlushOOOSpans(ctx, span, startTime, endTime, true /* skipSameSpan */); err != nil {
return roachpb.Span{}, err
}
if err := s.maybeFlushOOOKey(ctx, key.Key); err != nil {
return roachpb.Span{}, err
}
if err := s.setMidRowForPrevKey(key.Key); err != nil {
return roachpb.Span{}, err
}

span, err := s.maybeDoSizeFlushWithNextKey(ctx, key.Key, span, startTime, endTime)
if err != nil {
return roachpb.Span{}, err
}

// At this point, we can make the following assumptions about the new key and the span it belongs to:
// - The new key comes after the previous key
// - The new span comes after the previous span, although it may not contiguously extend the previous span
if s.out == nil {
if err := s.open(ctx); err != nil {
return roachpb.Span{}, err
}
}

elidedKey, prefix, err := elideMVCCKeyPrefix(key, s.elideMode)
if err != nil {
return roachpb.Span{}, err
}
s.elidedPrefix = append(s.elidedPrefix[:0], prefix...)
if err := s.sst.PutRawMVCC(elidedKey, value); err != nil {
return roachpb.Span{}, err
}

extend := s.shouldExtendLastFile(span, startTime, endTime, true /* extendSameSpan */)
keyAsRowCount := roachpb.RowCount{
DataSize: int64(len(key.Key)) + int64(len(value)),
}

var lastFile *backuppb.BackupManifest_File
if len(s.flushedFiles) > 0 {
lastFile = &s.flushedFiles[len(s.flushedFiles)-1]
}

if extend {
lastFile.Span.EndKey = span.EndKey
span = lastFile.Span
lastFile.EntryCounts.Add(keyAsRowCount)
s.stats.spanGrows++
} else {
if lastFile != nil && sameAsFileSpan(span, endTime, startTime, lastFile) {
// The new span covers the same span as the last BackupManifest_File, yet we are not extending. That means we
// are splitting due to the fileSpanByteLimit. As such, the current key can serve as an exclusive end key for the
// last file and an inclusive start key for the new file.
lastFile.Span.EndKey = key.Key
span.Key = key.Key
}
f := backuppb.BackupManifest_File{
Span: span,
EntryCounts: roachpb.RowCount{
DataSize: int64(len(key.Key)) + int64(len(value)),
},
StartTime: startTime,
EndTime: endTime,
Path: s.outName,
}
s.flushedFiles = append(s.flushedFiles, f)
s.completedSpans++
}

s.flushedSize += keyAsRowCount.DataSize
s.prevKey = append(s.prevKey[:0], key.Key...)

return span, nil
}

// maybeFlushOOOSpans checks if the new span's startKey precedes the last spans' endKey. If it does, the file is
// flushed. If `skipSameSpan` is true, the file is not flushed if the new span is the same as the last span with the
// same time bounds.
func (s *fileSSTSink) maybeFlushOOOSpans(
ctx context.Context, newSpan roachpb.Span, startTime, endTime hlc.Timestamp, skipSameSpan bool,
) error {
if len(s.flushedFiles) == 0 {
return nil
}
lastFile := &s.flushedFiles[len(s.flushedFiles)-1]
if skipSameSpan && sameAsFileSpan(newSpan, startTime, endTime, lastFile) {
return nil
}
if newSpan.Key.Compare(lastFile.Span.EndKey) < 0 {
log.VEventf(
ctx, 1, "flushing backup file %s of size %d because span %v overlaps with %v",
s.outName, s.flushedSize, newSpan, lastFile.Span,
)
s.stats.oooFlushes++
if err := s.flushFile(s.ctx); err != nil {
return err
}
}
return nil
}

// maybeFlushOOOKey checks if the new key is out of order with respect to the last key written using writeKey
// to the SST file. If it is, the file is flushed and a new one is created.
func (s *fileSSTSink) maybeFlushOOOKey(ctx context.Context, newKey roachpb.Key) error {
if s.prevKey == nil {
return nil
}
keyPrefix, err := elidedPrefix(newKey, s.elideMode)
if err != nil {
return err
}
if newKey.Compare(s.prevKey) < 0 || !bytes.Equal(keyPrefix, s.elidedPrefix) {
log.VEventf(
ctx, 1, "flushing backup file %s of size %d because key %s cannot append before %s",
s.outName, s.flushedSize, newKey, s.prevKey,
)
s.stats.oooFlushes++
if err := s.flushFile(s.ctx); err != nil {
return err
}
}
return nil
}

// maybeDoSizeFlush checks if the size of the current backup file exceeds the targetFileSize and flushes the file
// if it does. This function should only be called after s.midRow has been updated for the last key/span written.
func (s *fileSSTSink) maybeDoSizeFlush(ctx context.Context) error {
if s.shouldDoSizeFlush() {
return s.doSizeFlush(ctx)
} else {
log.VEventf(ctx, 3, "continuing to write to backup file %s of size %d", s.outName, s.flushedSize)
return nil
}
}

// maybeDoSizeFlushWithNextKey is identical to maybeDoSizeFlush, except information about the next key to be written
// is provided. This is used to determine if the next key can act as an exclusive end key for the last
// BackupManifest_File. The updated span of the next key is returned, as it may be set to contiguously extend the last
// span. This function should only be called after s.midRow has been updated for the last key/span
// written.
func (s *fileSSTSink) maybeDoSizeFlushWithNextKey(
ctx context.Context, nextKey roachpb.Key, nextSpan roachpb.Span, startTime, endTime hlc.Timestamp,
) (roachpb.Span, error) {
if len(s.flushedFiles) == 0 || !s.shouldDoSizeFlush() {
log.VEventf(ctx, 3, "continuing to write to backup file %s of size %d", s.outName, s.flushedSize)
return nextSpan, nil
}
// If the span of the next key to be written covers the same span as the last BackupManifest_File, then the next key
// can act as an exclusive end key for the last file's span. Additionally, the span of the nextKey will be updated
// so that it contiguously extends with the last span.
lastFile := &s.flushedFiles[len(s.flushedFiles)-1]
if sameAsFileSpan(nextSpan, startTime, endTime, lastFile) {
lastFile.Span.EndKey = nextKey
nextSpan.Key = nextKey
}
return nextSpan, s.doSizeFlush(ctx)
}

// shouldDoSizeFlush returns true if the current backup file exceeds the targetFileSize and we are currently not mid-row.
// Should only be called after s.midRow has been updated for the last key/span written.
func (s *fileSSTSink) shouldDoSizeFlush() bool {
return s.flushedSize > targetFileSize.Get(s.conf.settings) && !s.midRow
}

// doSizeFlush flushes the buffered files with size exceeded as the reason.
func (s *fileSSTSink) doSizeFlush(ctx context.Context) error {
s.stats.sizeFlushes++
log.VEventf(ctx, 2, "flushing backup file %s with size %d", s.outName, s.flushedSize)
return s.flushFile(ctx)
}

// extendsLastSpan returns true if the new span can be added to the last span in the flushedFiles slice.
// If the last written key was mid-row, the new span is always considered to extend the last span.
// If the new span is a contiguous extension of the last span, it is also considered to extend the last span.
// extendSameSpan determines if an identical span should be considered an extension of the last span.
// This should only be called after s.midRow has been updated for the last key/span written.
func (s *fileSSTSink) shouldExtendLastFile(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: might just make this a regular non-receiver function like the rest of them below.

span roachpb.Span, startTime, endTime hlc.Timestamp, extendSameSpan bool,
) bool {
if len(s.flushedFiles) == 0 {
return false
}
lastFile := &s.flushedFiles[len(s.flushedFiles)-1]
return s.midRow ||
extendsFileSpan(span, startTime, endTime, lastFile) ||
(extendSameSpan && sameAsFileSpan(span, startTime, endTime, lastFile))
}

// adjustFileEndKey checks if the export response end key can be used as a
// split point during restore. If the end key is not splittable (i.e. it splits
// two column families in the same row), the function will attempt to adjust the
// endkey to become splitable. The function returns the potentially adjusted
// end key and whether this end key is mid row/unsplitable (i.e. splits a 2
// end key to become splittable. The function returns the potentially adjusted
// end key and whether this end key is mid-row/un-splittable (i.e. splits a 2
// column families or mvcc versions).
func adjustFileEndKey(endKey, maxPointKey, maxRangeEnd roachpb.Key) (roachpb.Key, bool) {
maxKey := maxPointKey
Expand Down Expand Up @@ -398,9 +622,9 @@ func (s *fileSSTSink) copyPointKeys(ctx context.Context, dataSST []byte) (roachp
break
}
k := iter.UnsafeKey()
suffix, ok := bytes.CutPrefix(k.Key, s.elidePrefix)
suffix, ok := bytes.CutPrefix(k.Key, s.elidedPrefix)
if !ok {
return nil, errors.AssertionFailedf("prefix mismatch %q does not have %q", k.Key, s.elidePrefix)
return nil, errors.AssertionFailedf("prefix mismatch %q does not have %q", k.Key, s.elidedPrefix)
}
k.Key = suffix

Expand Down Expand Up @@ -479,11 +703,11 @@ func (s *fileSSTSink) copyRangeKeys(dataSST []byte) (roachpb.Key, error) {
maxKey = append(maxKey[:0], rk.EndKey...)
}
var ok bool
if rk.StartKey, ok = bytes.CutPrefix(rk.StartKey, s.elidePrefix); !ok {
return nil, errors.AssertionFailedf("prefix mismatch %q does not have %q", rk.StartKey, s.elidePrefix)
if rk.StartKey, ok = bytes.CutPrefix(rk.StartKey, s.elidedPrefix); !ok {
return nil, errors.AssertionFailedf("prefix mismatch %q does not have %q", rk.StartKey, s.elidedPrefix)
}
if rk.EndKey, ok = bytes.CutPrefix(rk.EndKey, s.elidePrefix); !ok {
return nil, errors.AssertionFailedf("prefix mismatch %q does not have %q", rk.EndKey, s.elidePrefix)
if rk.EndKey, ok = bytes.CutPrefix(rk.EndKey, s.elidedPrefix); !ok {
return nil, errors.AssertionFailedf("prefix mismatch %q does not have %q", rk.EndKey, s.elidedPrefix)
}
if err := s.sst.PutRawMVCCRangeKey(rk, v.Value); err != nil {
return nil, err
Expand All @@ -493,6 +717,20 @@ func (s *fileSSTSink) copyRangeKeys(dataSST []byte) (roachpb.Key, error) {
return maxKey, nil
}

// setMidRowForPrevKey checks if the last key written using writeKey was mid-row by using the next key to be written or
// the end key of its span and sets s.midRow accordingly. If no key was previously written using writeKey,
// this is a no-op.
func (s *fileSSTSink) setMidRowForPrevKey(endKey roachpb.Key) error {
if s.prevKey != nil {
endRowKey, err := keys.EnsureSafeSplitKey(endKey)
if err != nil {
return err
}
s.midRow = s.prevKey.Compare(endRowKey) > 0
}
return nil
}

func generateUniqueSSTName(nodeID base.SQLInstanceID) string {
// The data/ prefix, including a /, is intended to group SSTs in most of the
// common file/bucket browse UIs.
Expand All @@ -518,3 +756,34 @@ func elidedPrefix(key roachpb.Key, mode execinfrapb.ElidePrefix) ([]byte, error)
}
return nil, nil
}

func elideMVCCKeyPrefix(
key storage.MVCCKey, mode execinfrapb.ElidePrefix,
) (storage.MVCCKey, []byte, error) {
prefix, err := elidedPrefix(key.Key, mode)
if err != nil {
return storage.MVCCKey{}, nil, err
}
cutKey, ok := bytes.CutPrefix(key.Key, prefix)
if !ok {
return storage.MVCCKey{}, nil, errors.AssertionFailedf("prefix mismatch %q does not have %q", key.Key, prefix)
}
key.Key = cutKey
return key, prefix, nil
}

// sameAsFileSpan returns true if the span, start, and end match the span, start, and end of the BackupManifest_File.
func sameAsFileSpan(
span roachpb.Span, start, end hlc.Timestamp, backupFile *backuppb.BackupManifest_File,
) bool {
return backupFile.Span.Equal(span) && backupFile.StartTime.Equal(start) && backupFile.EndTime.Equal(end)
}

// extendsFileSpan returns true if a span can successfully contiguously extend the span of a BackupManifest_File.
func extendsFileSpan(
span roachpb.Span, start, end hlc.Timestamp, backupFile *backuppb.BackupManifest_File,
) bool {
return backupFile.Span.EndKey.Equal(span.Key) &&
backupFile.StartTime.Equal(start) &&
backupFile.EndTime.Equal(end)
}
Loading
Loading