Skip to content

Commit

Permalink
backupccl: add writeKey method to fileSSTSink
Browse files Browse the repository at this point in the history
To support backup compactions, we need to be able to write MVCC keys to
a file sink one key at a time. The current `fileSinkSST` only supports
writing KV export responses, or essentially one span at a time. This
commit adds support for writing key by key.

Epic: none

Release note: None
  • Loading branch information
kev-cao committed Dec 19, 2024
1 parent 026d669 commit 4921ba7
Show file tree
Hide file tree
Showing 2 changed files with 457 additions and 17 deletions.
303 changes: 286 additions & 17 deletions pkg/backup/file_sst_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,15 @@ type fileSSTSink struct {
// flush. This counter resets on each flush.
completedSpans int32

elideMode execinfrapb.ElidePrefix
elidePrefix roachpb.Key
elideMode execinfrapb.ElidePrefix

// elidedPrefix represents the elided prefix of the last exportSpan/key written to the sink.
// This resets on each flush.
elidedPrefix roachpb.Key

// prevKey represents the last key written using writeKey. When writing a new key, this helps determine
// if the last key written was mid-row. This resets on each flush.
prevKey roachpb.Key

// stats contain statistics about the actions of the fileSSTSink over its
// entire lifespan.
Expand Down Expand Up @@ -131,10 +138,20 @@ func (s *fileSSTSink) flushFile(ctx context.Context) error {
return nil
}

var lastFile *backuppb.BackupManifest_File
if len(s.flushedFiles) > 0 {
lastFile = &s.flushedFiles[len(s.flushedFiles)-1]
}
// If writeKey was used, it is possible that the last written key was mid-row, but because no keys were written
// after, s.midRow was not updated. To ensure this, we update midRow using the end of the span that last key belonged
// to
if err := s.setMidRowForPrevKey(lastFile.Span.EndKey); err != nil {
return err
}
if s.midRow {
var lastKey roachpb.Key
if len(s.flushedFiles) > 0 {
lastKey = s.flushedFiles[len(s.flushedFiles)-1].Span.EndKey
if lastFile != nil {
lastKey = lastFile.Span.EndKey
}
return errors.AssertionFailedf("backup closed file ending mid-key in %q", lastKey)
}
Expand Down Expand Up @@ -174,10 +191,11 @@ func (s *fileSSTSink) flushFile(ctx context.Context) error {
}

s.flushedFiles = nil
s.elidePrefix = s.elidePrefix[:0]
s.elidedPrefix = s.elidedPrefix[:0]
s.flushedSize = 0
s.flushedRevStart.Reset()
s.completedSpans = 0
s.prevKey = nil

return nil
}
Expand Down Expand Up @@ -234,7 +252,7 @@ func (s *fileSSTSink) write(ctx context.Context, resp exportedSpan) (roachpb.Key
// since it overlaps but SSTWriter demands writes in-order.
if len(s.flushedFiles) > 0 {
last := s.flushedFiles[len(s.flushedFiles)-1].Span.EndKey
if span.Key.Compare(last) < 0 || !bytes.Equal(spanPrefix, s.elidePrefix) {
if span.Key.Compare(last) < 0 || !bytes.Equal(spanPrefix, s.elidedPrefix) {
log.VEventf(ctx, 1, "flushing backup file %s of size %d because span %s cannot append before %s",
s.outName, s.flushedSize, span, last,
)
Expand All @@ -251,7 +269,7 @@ func (s *fileSSTSink) write(ctx context.Context, resp exportedSpan) (roachpb.Key
return nil, err
}
}
s.elidePrefix = append(s.elidePrefix[:0], spanPrefix...)
s.elidedPrefix = append(s.elidedPrefix[:0], spanPrefix...)

log.VEventf(ctx, 2, "writing %s to backup file %s", span, s.outName)

Expand Down Expand Up @@ -331,11 +349,217 @@ func (s *fileSSTSink) write(ctx context.Context, resp exportedSpan) (roachpb.Key
return resp.resumeKey, err
}

// adjustFileEndKey checks if the export respsonse end key can be used as a
// split point during restore. If the end key is not splitable (i.e. it splits
// writeKey writes a single key to the SST file. The key should be the full key. span refers to the span that the key
// belongs to. start and end are the time bounds of the span being backed up. The writing of each key could
// potentially split up the span of the previously written key (if both keys were part of the same span and the span
// had to be split). As a consequence the span that is recorded for the new key is returned, as it may be a subspan of
// the span passed in for the key.
//
// flush should be called after the last key is written to ensure that the SST is written to the destination.
func (s *fileSSTSink) writeKey(
ctx context.Context,
key storage.MVCCKey,
value []byte,
span roachpb.Span,
startTime hlc.Timestamp,
endTime hlc.Timestamp,
) (roachpb.Span, error) {
if key.Key.Compare(span.Key) < 0 || key.Key.Compare(span.EndKey) >= 0 {
return roachpb.Span{}, errors.AssertionFailedf("key %s outside of span %v", key.Key, span)
}
if err := s.maybeFlushOOOSpans(ctx, span, startTime, endTime, true /* skipSameSpan */); err != nil {
return roachpb.Span{}, err
}
if err := s.maybeFlushOOOKey(ctx, key.Key); err != nil {
return roachpb.Span{}, err
}
if err := s.setMidRowForPrevKey(key.Key); err != nil {
return roachpb.Span{}, err
}

span, err := s.maybeDoSizeFlushWithNextKey(ctx, key.Key, span, startTime, endTime)
if err != nil {
return roachpb.Span{}, err
}

// At this point, we can make the following assumptions about the new key and the span it belongs to:
// - The new key comes after the previous key
// - The new span comes after the previous span, although it may not contiguously extend the previous span
if s.out == nil {
if err := s.open(ctx); err != nil {
return roachpb.Span{}, err
}
}

elidedKey, prefix, err := elideMVCCKeyPrefix(key, s.elideMode)
if err != nil {
return roachpb.Span{}, err
}
s.elidedPrefix = append(s.elidedPrefix[:0], prefix...)
if err := s.sst.PutRawMVCC(elidedKey, value); err != nil {
return roachpb.Span{}, err
}

extend := s.shouldExtendLastFile(span, startTime, endTime, true /* extendSameSpan */)
keyAsRowCount := roachpb.RowCount{
DataSize: int64(len(key.Key)) + int64(len(value)),
}

var lastFile *backuppb.BackupManifest_File
if len(s.flushedFiles) > 0 {
lastFile = &s.flushedFiles[len(s.flushedFiles)-1]
}

if extend {
lastFile.Span.EndKey = span.EndKey
span = lastFile.Span
lastFile.EntryCounts.Add(keyAsRowCount)
s.stats.spanGrows++
} else {
if lastFile != nil && sameAsFileSpan(span, endTime, startTime, lastFile) {
// The new span covers the same span as the last BackupManifest_File, yet we are not extending. That means we
// are splitting due to the fileSpanByteLimit. As such, the current key can serve as an exclusive end key for the
// last file and an inclusive start key for the new file.
lastFile.Span.EndKey = key.Key
span.Key = key.Key
}
f := backuppb.BackupManifest_File{
Span: span,
EntryCounts: roachpb.RowCount{
DataSize: int64(len(key.Key)) + int64(len(value)),
},
StartTime: startTime,
EndTime: endTime,
Path: s.outName,
}
s.flushedFiles = append(s.flushedFiles, f)
s.completedSpans++
}

s.flushedSize += keyAsRowCount.DataSize
s.prevKey = append(s.prevKey[:0], key.Key...)

return span, nil
}

// maybeFlushOOOSpans checks if the new span's startKey precedes the last spans' endKey. If it does, the file is
// flushed. If `skipSameSpan` is true, the file is not flushed if the new span is the same as the last span with the
// same time bounds.
func (s *fileSSTSink) maybeFlushOOOSpans(
ctx context.Context, newSpan roachpb.Span, startTime, endTime hlc.Timestamp, skipSameSpan bool,
) error {
if len(s.flushedFiles) == 0 {
return nil
}
lastFile := &s.flushedFiles[len(s.flushedFiles)-1]
if skipSameSpan && sameAsFileSpan(newSpan, startTime, endTime, lastFile) {
return nil
}
if newSpan.Key.Compare(lastFile.Span.EndKey) < 0 {
log.VEventf(
ctx, 1, "flushing backup file %s of size %d because span %v overlaps with %v",
s.outName, s.flushedSize, newSpan, lastFile.Span,
)
s.stats.oooFlushes++
if err := s.flushFile(s.ctx); err != nil {
return err
}
}
return nil
}

// maybeFlushOOOKey checks if the new key is out of order with respect to the last key written using writeKey
// to the SST file. If it is, the file is flushed and a new one is created.
func (s *fileSSTSink) maybeFlushOOOKey(ctx context.Context, newKey roachpb.Key) error {
if s.prevKey == nil {
return nil
}
keyPrefix, err := elidedPrefix(newKey, s.elideMode)
if err != nil {
return err
}
if newKey.Compare(s.prevKey) < 0 || !bytes.Equal(keyPrefix, s.elidedPrefix) {
log.VEventf(
ctx, 1, "flushing backup file %s of size %d because key %s cannot append before %s",
s.outName, s.flushedSize, newKey, s.prevKey,
)
s.stats.oooFlushes++
if err := s.flushFile(s.ctx); err != nil {
return err
}
}
return nil
}

// maybeDoSizeFlush checks if the size of the current backup file exceeds the targetFileSize and flushes the file
// if it does. This function should only be called after s.midRow has been updated for the last key/span written.
func (s *fileSSTSink) maybeDoSizeFlush(ctx context.Context) error {
if s.shouldDoSizeFlush() {
return s.doSizeFlush(ctx)
} else {
log.VEventf(ctx, 3, "continuing to write to backup file %s of size %d", s.outName, s.flushedSize)
return nil
}
}

// maybeDoSizeFlushWithNextKey is identical to maybeDoSizeFlush, except information about the next key to be written
// is provided. This is used to determine if the next key can act as an exclusive end key for the last
// BackupManifest_File. The updated span of the next key is returned, as it may be set to contiguously extend the last
// span. This function should only be called after s.midRow has been updated for the last key/span
// written.
func (s *fileSSTSink) maybeDoSizeFlushWithNextKey(
ctx context.Context, nextKey roachpb.Key, nextSpan roachpb.Span, startTime, endTime hlc.Timestamp,
) (roachpb.Span, error) {
if len(s.flushedFiles) == 0 || !s.shouldDoSizeFlush() {
log.VEventf(ctx, 3, "continuing to write to backup file %s of size %d", s.outName, s.flushedSize)
return nextSpan, nil
}
// If the span of the next key to be written covers the same span as the last BackupManifest_File, then the next key
// can act as an exclusive end key for the last file's span. Additionally, the span of the nextKey will be updated
// so that it contiguously extends with the last span.
lastFile := &s.flushedFiles[len(s.flushedFiles)-1]
if sameAsFileSpan(nextSpan, startTime, endTime, lastFile) {
lastFile.Span.EndKey = nextKey
nextSpan.Key = nextKey
}
return nextSpan, s.doSizeFlush(ctx)
}

// shouldDoSizeFlush returns true if the current backup file exceeds the targetFileSize and we are currently not mid-row.
// Should only be called after s.midRow has been updated for the last key/span written.
func (s *fileSSTSink) shouldDoSizeFlush() bool {
return s.flushedSize > targetFileSize.Get(s.conf.settings) && !s.midRow
}

// doSizeFlush flushes the buffered files with size exceeded as the reason.
func (s *fileSSTSink) doSizeFlush(ctx context.Context) error {
s.stats.sizeFlushes++
log.VEventf(ctx, 2, "flushing backup file %s with size %d", s.outName, s.flushedSize)
return s.flushFile(ctx)
}

// extendsLastSpan returns true if the new span can be added to the last span in the flushedFiles slice.
// If the last written key was mid-row, the new span is always considered to extend the last span.
// If the new span is a contiguous extension of the last span, it is also considered to extend the last span.
// extendSameSpan determines if an identical span should be considered an extension of the last span.
// This should only be called after s.midRow has been updated for the last key/span written.
func (s *fileSSTSink) shouldExtendLastFile(
span roachpb.Span, startTime, endTime hlc.Timestamp, extendSameSpan bool,
) bool {
if len(s.flushedFiles) == 0 {
return false
}
lastFile := &s.flushedFiles[len(s.flushedFiles)-1]
return s.midRow ||
extendsFileSpan(span, startTime, endTime, lastFile) ||
(extendSameSpan && sameAsFileSpan(span, startTime, endTime, lastFile))
}

// adjustFileEndKey checks if the export response end key can be used as a
// split point during restore. If the end key is not splittable (i.e. it splits
// two column families in the same row), the function will attempt to adjust the
// endkey to become splitable. The function returns the potentially adjusted
// end key and whether this end key is mid row/unsplitable (i.e. splits a 2
// end key to become splittable. The function returns the potentially adjusted
// end key and whether this end key is mid-row/un-splittable (i.e. splits a 2
// column families or mvcc versions).
func adjustFileEndKey(endKey, maxPointKey, maxRangeEnd roachpb.Key) (roachpb.Key, bool) {
maxKey := maxPointKey
Expand Down Expand Up @@ -398,9 +622,9 @@ func (s *fileSSTSink) copyPointKeys(ctx context.Context, dataSST []byte) (roachp
break
}
k := iter.UnsafeKey()
suffix, ok := bytes.CutPrefix(k.Key, s.elidePrefix)
suffix, ok := bytes.CutPrefix(k.Key, s.elidedPrefix)
if !ok {
return nil, errors.AssertionFailedf("prefix mismatch %q does not have %q", k.Key, s.elidePrefix)
return nil, errors.AssertionFailedf("prefix mismatch %q does not have %q", k.Key, s.elidedPrefix)
}
k.Key = suffix

Expand Down Expand Up @@ -479,11 +703,11 @@ func (s *fileSSTSink) copyRangeKeys(dataSST []byte) (roachpb.Key, error) {
maxKey = append(maxKey[:0], rk.EndKey...)
}
var ok bool
if rk.StartKey, ok = bytes.CutPrefix(rk.StartKey, s.elidePrefix); !ok {
return nil, errors.AssertionFailedf("prefix mismatch %q does not have %q", rk.StartKey, s.elidePrefix)
if rk.StartKey, ok = bytes.CutPrefix(rk.StartKey, s.elidedPrefix); !ok {
return nil, errors.AssertionFailedf("prefix mismatch %q does not have %q", rk.StartKey, s.elidedPrefix)
}
if rk.EndKey, ok = bytes.CutPrefix(rk.EndKey, s.elidePrefix); !ok {
return nil, errors.AssertionFailedf("prefix mismatch %q does not have %q", rk.EndKey, s.elidePrefix)
if rk.EndKey, ok = bytes.CutPrefix(rk.EndKey, s.elidedPrefix); !ok {
return nil, errors.AssertionFailedf("prefix mismatch %q does not have %q", rk.EndKey, s.elidedPrefix)
}
if err := s.sst.PutRawMVCCRangeKey(rk, v.Value); err != nil {
return nil, err
Expand All @@ -493,6 +717,20 @@ func (s *fileSSTSink) copyRangeKeys(dataSST []byte) (roachpb.Key, error) {
return maxKey, nil
}

// setMidRowForPrevKey checks if the last key written using writeKey was mid-row by using the next key to be written or
// the end key of its span and sets s.midRow accordingly. If no key was previously written using writeKey,
// this is a no-op.
func (s *fileSSTSink) setMidRowForPrevKey(endKey roachpb.Key) error {
if s.prevKey != nil {
endRowKey, err := keys.EnsureSafeSplitKey(endKey)
if err != nil {
return err
}
s.midRow = s.prevKey.Compare(endRowKey) > 0
}
return nil
}

func generateUniqueSSTName(nodeID base.SQLInstanceID) string {
// The data/ prefix, including a /, is intended to group SSTs in most of the
// common file/bucket browse UIs.
Expand All @@ -518,3 +756,34 @@ func elidedPrefix(key roachpb.Key, mode execinfrapb.ElidePrefix) ([]byte, error)
}
return nil, nil
}

func elideMVCCKeyPrefix(
key storage.MVCCKey, mode execinfrapb.ElidePrefix,
) (storage.MVCCKey, []byte, error) {
prefix, err := elidedPrefix(key.Key, mode)
if err != nil {
return storage.MVCCKey{}, nil, err
}
cutKey, ok := bytes.CutPrefix(key.Key, prefix)
if !ok {
return storage.MVCCKey{}, nil, errors.AssertionFailedf("prefix mismatch %q does not have %q", key.Key, prefix)
}
key.Key = cutKey
return key, prefix, nil
}

// sameAsFileSpan returns true if the span, start, and end match the span, start, and end of the BackupManifest_File.
func sameAsFileSpan(
span roachpb.Span, start, end hlc.Timestamp, backupFile *backuppb.BackupManifest_File,
) bool {
return backupFile.Span.Equal(span) && backupFile.StartTime.Equal(start) && backupFile.EndTime.Equal(end)
}

// extendsFileSpan returns true if a span can successfully contiguously extend the span of a BackupManifest_File.
func extendsFileSpan(
span roachpb.Span, start, end hlc.Timestamp, backupFile *backuppb.BackupManifest_File,
) bool {
return backupFile.Span.EndKey.Equal(span.Key) &&
backupFile.StartTime.Equal(start) &&
backupFile.EndTime.Equal(end)
}
Loading

0 comments on commit 4921ba7

Please sign in to comment.