diff --git a/pkg/backup/file_sst_sink.go b/pkg/backup/file_sst_sink.go
index fb6e53646044..4883046e1266 100644
--- a/pkg/backup/file_sst_sink.go
+++ b/pkg/backup/file_sst_sink.go
@@ -63,8 +63,15 @@ type fileSSTSink struct {
 	// flush. This counter resets on each flush.
 	completedSpans int32
 
-	elideMode   execinfrapb.ElidePrefix
-	elidePrefix roachpb.Key
+	elideMode execinfrapb.ElidePrefix
+
+	// elidedPrefix is the elided prefix of the last exportSpan/key written to the sink.
+	// This resets on each flush.
+	elidedPrefix roachpb.Key
+
+	// prevKey is the last key written using writeKey. When writing a new key, it helps determine
+	// whether the last key written was mid-row. This resets on each flush.
+	prevKey roachpb.Key
 
 	// stats contain statistics about the actions of the fileSSTSink over its
 	// entire lifespan.
@@ -131,10 +138,20 @@ func (s *fileSSTSink) flushFile(ctx context.Context) error {
 		return nil
 	}
 
+	var lastFile *backuppb.BackupManifest_File
+	if len(s.flushedFiles) > 0 {
+		lastFile = &s.flushedFiles[len(s.flushedFiles)-1]
+	}
+	// If writeKey was used, the last written key may have been mid-row, but because no keys were
+	// written after it, s.midRow was never updated. To account for this, update midRow using the
+	// end key of the span that the last key belonged to.
+	if err := s.setMidRowForPrevKey(lastFile.Span.EndKey); err != nil {
+		return err
+	}
 	if s.midRow {
 		var lastKey roachpb.Key
-		if len(s.flushedFiles) > 0 {
-			lastKey = s.flushedFiles[len(s.flushedFiles)-1].Span.EndKey
+		if lastFile != nil {
+			lastKey = lastFile.Span.EndKey
 		}
 		return errors.AssertionFailedf("backup closed file ending mid-key in %q", lastKey)
 	}
@@ -174,10 +191,11 @@ func (s *fileSSTSink) flushFile(ctx context.Context) error {
 	}
 
 	s.flushedFiles = nil
-	s.elidePrefix = s.elidePrefix[:0]
+	s.elidedPrefix = s.elidedPrefix[:0]
 	s.flushedSize = 0
 	s.flushedRevStart.Reset()
 	s.completedSpans = 0
+	s.prevKey = nil
 
 	return nil
 }
@@ -234,7 +252,7 @@ func (s *fileSSTSink) write(ctx context.Context, resp exportedSpan) (roachpb.Key
 	// since it overlaps but SSTWriter demands writes in-order.
 	if len(s.flushedFiles) > 0 {
 		last := s.flushedFiles[len(s.flushedFiles)-1].Span.EndKey
-		if span.Key.Compare(last) < 0 || !bytes.Equal(spanPrefix, s.elidePrefix) {
+		if span.Key.Compare(last) < 0 || !bytes.Equal(spanPrefix, s.elidedPrefix) {
 			log.VEventf(ctx, 1, "flushing backup file %s of size %d because span %s cannot append before %s",
 				s.outName, s.flushedSize, span, last,
 			)
@@ -251,7 +269,7 @@ func (s *fileSSTSink) write(ctx context.Context, resp exportedSpan) (roachpb.Key
 			return nil, err
 		}
 	}
-	s.elidePrefix = append(s.elidePrefix[:0], spanPrefix...)
+	s.elidedPrefix = append(s.elidedPrefix[:0], spanPrefix...)
 
 	log.VEventf(ctx, 2, "writing %s to backup file %s", span, s.outName)
 
@@ -331,11 +349,217 @@ func (s *fileSSTSink) write(ctx context.Context, resp exportedSpan) (roachpb.Key
 	return resp.resumeKey, err
 }
 
-// adjustFileEndKey checks if the export respsonse end key can be used as a
-// split point during restore. If the end key is not splitable (i.e. it splits
+// writeKey writes a single key to the SST file. The key should be the full key. span refers to the span that the
+// key belongs to, and startTime and endTime are the time bounds of the span being backed up. Writing a key may
+// split up the span of the previously written key (if both keys were part of the same span and that span had to be
+// split).
+// As a consequence, the span that is recorded for the new key is returned, as it may be a subspan of the span
+// passed in for the key.
+//
+// flush should be called after the last key is written to ensure that the SST is written to the destination.
+func (s *fileSSTSink) writeKey(
+	ctx context.Context,
+	key storage.MVCCKey,
+	value []byte,
+	span roachpb.Span,
+	startTime hlc.Timestamp,
+	endTime hlc.Timestamp,
+) (roachpb.Span, error) {
+	if key.Key.Compare(span.Key) < 0 || key.Key.Compare(span.EndKey) >= 0 {
+		return roachpb.Span{}, errors.AssertionFailedf("key %s outside of span %v", key.Key, span)
+	}
+	if err := s.maybeFlushOOOSpans(ctx, span, startTime, endTime, true /* skipSameSpan */); err != nil {
+		return roachpb.Span{}, err
+	}
+	if err := s.maybeFlushOOOKey(ctx, key.Key); err != nil {
+		return roachpb.Span{}, err
+	}
+	if err := s.setMidRowForPrevKey(key.Key); err != nil {
+		return roachpb.Span{}, err
+	}
+
+	span, err := s.maybeDoSizeFlushWithNextKey(ctx, key.Key, span, startTime, endTime)
+	if err != nil {
+		return roachpb.Span{}, err
+	}
+
+	// At this point, we can make the following assumptions about the new key and the span it belongs to:
+	// - The new key comes after the previous key.
+	// - The new span comes after the previous span, although it may not contiguously extend the previous span.
+	if s.out == nil {
+		if err := s.open(ctx); err != nil {
+			return roachpb.Span{}, err
+		}
+	}
+
+	elidedKey, prefix, err := elideMVCCKeyPrefix(key, s.elideMode)
+	if err != nil {
+		return roachpb.Span{}, err
+	}
+	s.elidedPrefix = append(s.elidedPrefix[:0], prefix...)
+	if err := s.sst.PutRawMVCC(elidedKey, value); err != nil {
+		return roachpb.Span{}, err
+	}
+
+	extend := s.shouldExtendLastFile(span, startTime, endTime, true /* extendSameSpan */)
+	keyAsRowCount := roachpb.RowCount{
+		DataSize: int64(len(key.Key)) + int64(len(value)),
+	}
+
+	var lastFile *backuppb.BackupManifest_File
+	if len(s.flushedFiles) > 0 {
+		lastFile = &s.flushedFiles[len(s.flushedFiles)-1]
+	}
+
+	if extend {
+		lastFile.Span.EndKey = span.EndKey
+		span = lastFile.Span
+		lastFile.EntryCounts.Add(keyAsRowCount)
+		s.stats.spanGrows++
+	} else {
+		if lastFile != nil && sameAsFileSpan(span, startTime, endTime, lastFile) {
+			// The new span covers the same span as the last BackupManifest_File, yet we are not extending. That
+			// means we are splitting due to the fileSpanByteLimit. As such, the current key can serve as an
+			// exclusive end key for the last file and an inclusive start key for the new file.
+			lastFile.Span.EndKey = key.Key
+			span.Key = key.Key
+		}
+		f := backuppb.BackupManifest_File{
+			Span: span,
+			EntryCounts: roachpb.RowCount{
+				DataSize: int64(len(key.Key)) + int64(len(value)),
+			},
+			StartTime: startTime,
+			EndTime:   endTime,
+			Path:      s.outName,
+		}
+		s.flushedFiles = append(s.flushedFiles, f)
+		s.completedSpans++
+	}
+
+	s.flushedSize += keyAsRowCount.DataSize
+	s.prevKey = append(s.prevKey[:0], key.Key...)
+
+	return span, nil
+}
+
+// maybeFlushOOOSpans checks if the new span's start key precedes the last flushed span's end key. If it does, the
+// file is flushed. If skipSameSpan is true, the file is not flushed when the new span is the same as the last span
+// and has the same time bounds.
+func (s *fileSSTSink) maybeFlushOOOSpans(
+	ctx context.Context, newSpan roachpb.Span, startTime, endTime hlc.Timestamp, skipSameSpan bool,
+) error {
+	if len(s.flushedFiles) == 0 {
+		return nil
+	}
+	lastFile := &s.flushedFiles[len(s.flushedFiles)-1]
+	if skipSameSpan && sameAsFileSpan(newSpan, startTime, endTime, lastFile) {
+		return nil
+	}
+	if newSpan.Key.Compare(lastFile.Span.EndKey) < 0 {
+		log.VEventf(
+			ctx, 1, "flushing backup file %s of size %d because span %v overlaps with %v",
+			s.outName, s.flushedSize, newSpan, lastFile.Span,
+		)
+		s.stats.oooFlushes++
+		if err := s.flushFile(ctx); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// maybeFlushOOOKey checks if the new key is out of order with respect to the last key written to the SST file using
+// writeKey. If it is, the file is flushed so that a new one can be started.
+func (s *fileSSTSink) maybeFlushOOOKey(ctx context.Context, newKey roachpb.Key) error {
+	if s.prevKey == nil {
+		return nil
+	}
+	keyPrefix, err := elidedPrefix(newKey, s.elideMode)
+	if err != nil {
+		return err
+	}
+	if newKey.Compare(s.prevKey) < 0 || !bytes.Equal(keyPrefix, s.elidedPrefix) {
+		log.VEventf(
+			ctx, 1, "flushing backup file %s of size %d because key %s cannot append before %s",
+			s.outName, s.flushedSize, newKey, s.prevKey,
+		)
+		s.stats.oooFlushes++
+		if err := s.flushFile(ctx); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// maybeDoSizeFlush checks if the size of the current backup file exceeds the targetFileSize and flushes the file if
+// it does. This function should only be called after s.midRow has been updated for the last key/span written.
+func (s *fileSSTSink) maybeDoSizeFlush(ctx context.Context) error {
+	if s.shouldDoSizeFlush() {
+		return s.doSizeFlush(ctx)
+	}
+	log.VEventf(ctx, 3, "continuing to write to backup file %s of size %d", s.outName, s.flushedSize)
+	return nil
+}
+
+// maybeDoSizeFlushWithNextKey is identical to maybeDoSizeFlush, except that information about the next key to be
+// written is provided. This is used to determine if the next key can act as an exclusive end key for the last
+// BackupManifest_File. The updated span of the next key is returned, as it may be set to contiguously extend the
+// last span. This function should only be called after s.midRow has been updated for the last key/span written.
+func (s *fileSSTSink) maybeDoSizeFlushWithNextKey(
+	ctx context.Context, nextKey roachpb.Key, nextSpan roachpb.Span, startTime, endTime hlc.Timestamp,
+) (roachpb.Span, error) {
+	if len(s.flushedFiles) == 0 || !s.shouldDoSizeFlush() {
+		log.VEventf(ctx, 3, "continuing to write to backup file %s of size %d", s.outName, s.flushedSize)
+		return nextSpan, nil
+	}
+	// If the span of the next key to be written covers the same span as the last BackupManifest_File, then the next
+	// key can act as an exclusive end key for the last file's span. Additionally, the span of the nextKey is updated
+	// so that it contiguously extends the last span.
+	lastFile := &s.flushedFiles[len(s.flushedFiles)-1]
+	if sameAsFileSpan(nextSpan, startTime, endTime, lastFile) {
+		lastFile.Span.EndKey = nextKey
+		nextSpan.Key = nextKey
+	}
+	return nextSpan, s.doSizeFlush(ctx)
+}
+
+// shouldDoSizeFlush returns true if the current backup file exceeds the targetFileSize and we are currently not
+// mid-row. Should only be called after s.midRow has been updated for the last key/span written.
+func (s *fileSSTSink) shouldDoSizeFlush() bool {
+	return s.flushedSize > targetFileSize.Get(s.conf.settings) && !s.midRow
+}
+
+// doSizeFlush flushes the buffered file because its size has exceeded targetFileSize.
+func (s *fileSSTSink) doSizeFlush(ctx context.Context) error {
+	s.stats.sizeFlushes++
+	log.VEventf(ctx, 2, "flushing backup file %s with size %d", s.outName, s.flushedSize)
+	return s.flushFile(ctx)
+}
+
+// shouldExtendLastFile returns true if the new span can be added to the span of the last file in the flushedFiles
+// slice. If the last written key was mid-row, the new span is always considered to extend the last span. If the new
+// span is a contiguous extension of the last span, it is also considered to extend it. extendSameSpan determines
+// whether an identical span should be considered an extension of the last span.
+// This should only be called after s.midRow has been updated for the last key/span written.
+func (s *fileSSTSink) shouldExtendLastFile(
+	span roachpb.Span, startTime, endTime hlc.Timestamp, extendSameSpan bool,
+) bool {
+	if len(s.flushedFiles) == 0 {
+		return false
+	}
+	lastFile := &s.flushedFiles[len(s.flushedFiles)-1]
+	return s.midRow ||
+		extendsFileSpan(span, startTime, endTime, lastFile) ||
+		(extendSameSpan && sameAsFileSpan(span, startTime, endTime, lastFile))
+}
+
+// adjustFileEndKey checks if the export response end key can be used as a
+// split point during restore. If the end key is not splittable (i.e. it splits
 // two column families in the same row), the function will attempt to adjust the
-// endkey to become splitable. The function returns the potentially adjusted
-// end key and whether this end key is mid row/unsplitable (i.e. splits a 2
+// end key to become splittable. The function returns the potentially adjusted
+// end key and whether this end key is mid-row/unsplittable (i.e. it splits two
 // column families or mvcc versions).
 func adjustFileEndKey(endKey, maxPointKey, maxRangeEnd roachpb.Key) (roachpb.Key, bool) {
 	maxKey := maxPointKey
@@ -398,9 +622,9 @@ func (s *fileSSTSink) copyPointKeys(ctx context.Context, dataSST []byte) (roachp
 			break
 		}
 		k := iter.UnsafeKey()
-		suffix, ok := bytes.CutPrefix(k.Key, s.elidePrefix)
+		suffix, ok := bytes.CutPrefix(k.Key, s.elidedPrefix)
 		if !ok {
-			return nil, errors.AssertionFailedf("prefix mismatch %q does not have %q", k.Key, s.elidePrefix)
+			return nil, errors.AssertionFailedf("prefix mismatch %q does not have %q", k.Key, s.elidedPrefix)
 		}
 		k.Key = suffix
 
@@ -479,11 +703,11 @@ func (s *fileSSTSink) copyRangeKeys(dataSST []byte) (roachpb.Key, error) {
 			maxKey = append(maxKey[:0], rk.EndKey...)
 		}
 		var ok bool
-		if rk.StartKey, ok = bytes.CutPrefix(rk.StartKey, s.elidePrefix); !ok {
-			return nil, errors.AssertionFailedf("prefix mismatch %q does not have %q", rk.StartKey, s.elidePrefix)
+		if rk.StartKey, ok = bytes.CutPrefix(rk.StartKey, s.elidedPrefix); !ok {
+			return nil, errors.AssertionFailedf("prefix mismatch %q does not have %q", rk.StartKey, s.elidedPrefix)
 		}
-		if rk.EndKey, ok = bytes.CutPrefix(rk.EndKey, s.elidePrefix); !ok {
-			return nil, errors.AssertionFailedf("prefix mismatch %q does not have %q", rk.EndKey, s.elidePrefix)
+		if rk.EndKey, ok = bytes.CutPrefix(rk.EndKey, s.elidedPrefix); !ok {
+			return nil, errors.AssertionFailedf("prefix mismatch %q does not have %q", rk.EndKey, s.elidedPrefix)
 		}
 		if err := s.sst.PutRawMVCCRangeKey(rk, v.Value); err != nil {
 			return nil, err
@@ -493,6 +717,20 @@ func (s *fileSSTSink) copyRangeKeys(dataSST []byte) (roachpb.Key, error) {
 	return maxKey, nil
 }
 
+// setMidRowForPrevKey determines whether the last key written using writeKey was mid-row, using either the next key
+// to be written or the end key of that key's span, and sets s.midRow accordingly. If no key was previously written
+// using writeKey, this is a no-op.
+func (s *fileSSTSink) setMidRowForPrevKey(endKey roachpb.Key) error {
+	if s.prevKey != nil {
+		endRowKey, err := keys.EnsureSafeSplitKey(endKey)
+		if err != nil {
+			return err
+		}
+		s.midRow = s.prevKey.Compare(endRowKey) > 0
+	}
+	return nil
+}
+
 func generateUniqueSSTName(nodeID base.SQLInstanceID) string {
 	// The data/ prefix, including a /, is intended to group SSTs in most of the
 	// common file/bucket browse UIs.
@@ -518,3 +756,34 @@ func elidedPrefix(key roachpb.Key, mode execinfrapb.ElidePrefix) ([]byte, error)
 	}
 	return nil, nil
 }
+
+func elideMVCCKeyPrefix(
+	key storage.MVCCKey, mode execinfrapb.ElidePrefix,
+) (storage.MVCCKey, []byte, error) {
+	prefix, err := elidedPrefix(key.Key, mode)
+	if err != nil {
+		return storage.MVCCKey{}, nil, err
+	}
+	cutKey, ok := bytes.CutPrefix(key.Key, prefix)
+	if !ok {
+		return storage.MVCCKey{}, nil, errors.AssertionFailedf("prefix mismatch %q does not have %q", key.Key, prefix)
+	}
+	key.Key = cutKey
+	return key, prefix, nil
+}
+
+// sameAsFileSpan returns true if the span, start, and end match the span, start, and end of the BackupManifest_File.
+func sameAsFileSpan(
+	span roachpb.Span, start, end hlc.Timestamp, backupFile *backuppb.BackupManifest_File,
+) bool {
+	return backupFile.Span.Equal(span) && backupFile.StartTime.Equal(start) && backupFile.EndTime.Equal(end)
+}
+
+// extendsFileSpan returns true if a span contiguously extends the span of a BackupManifest_File and has the same
+// time bounds.
+func extendsFileSpan(
+	span roachpb.Span, start, end hlc.Timestamp, backupFile *backuppb.BackupManifest_File,
+) bool {
+	return backupFile.Span.EndKey.Equal(span.Key) &&
+		backupFile.StartTime.Equal(start) &&
+		backupFile.EndTime.Equal(end)
+}
diff --git a/pkg/backup/file_sst_sink_test.go b/pkg/backup/file_sst_sink_test.go
index 5b705433d7c4..a2fef98064e9 100644
--- a/pkg/backup/file_sst_sink_test.go
+++ b/pkg/backup/file_sst_sink_test.go
@@ -909,6 +909,92 @@ func TestFileSSTSinkCopyRangeKeys(t *testing.T) {
 	}
 }
 
+func TestFileSSTSinkWriteKey(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	ctx := context.Background()
+	st := cluster.MakeTestingClusterSettings()
+	// Artificially set the target file size to 10KB for easy flushing
+	targetFileSize.Override(ctx, &st.SV, 10<<10)
+
+	type testCase struct {
+		name             string
+		exportKVs        []mvccKVSet
+		flushedSpans     []roachpb.Spans
+		unflushedSpans   []roachpb.Spans
+		manualFlushSpans []roachpb.Spans
+	}
+
+	for _, tt := range []testCase{
+		{
+			name: "single-exported-span",
+			exportKVs: []mvccKVSet{
+				newMVCCKeySetBuilder("a", "c").withKVs([]kvAndTS{{key: "a", timestamp: 10}, {key: "b", timestamp: 10}}).build(),
+			},
+			unflushedSpans: []roachpb.Spans{
+				{{Key: s2k0("a"), EndKey: s2k0("c")}},
+			},
+		},
+		{
+			name: "single-exported-span-size-flush",
+			exportKVs: []mvccKVSet{
+				newMVCCKeySetBuilder("a", "c").withKVs([]kvAndTS{{key: "a", timestamp: 10, value: make([]byte, 20<<20)}, {key: "b", timestamp: 10}}).build(),
+			},
+			flushedSpans: []roachpb.Spans{
+				{{Key: s2k0("a"), EndKey: s2k0("b")}},
+			},
+			unflushedSpans: []roachpb.Spans{
+				{{Key: s2k0("b"), EndKey: s2k0("c")}},
+			},
+		},
+	} {
+		for _, elide := range []execinfrapb.ElidePrefix{execinfrapb.ElidePrefix_None, execinfrapb.ElidePrefix_TenantAndTable} {
+			t.Run(fmt.Sprintf("%s/elide=%s", tt.name, elide), func(t *testing.T) {
+				sink, store := fileSSTSinkTestSetUp(ctx, t, st)
+				defer func() {
+					require.NoError(t, sink.Close())
+				}()
+				sink.elideMode = elide
+
+				for _, ek := range tt.exportKVs {
+					span := ek.span
+					for _, kv := range ek.kvs {
+						recordedSpan, err := sink.writeKey(ctx, kv.key, kv.value, span, ek.startTime, ek.endTime)
+						require.NoError(t, err)
+						span = recordedSpan
+					}
+				}
+
+				progress := make([]backuppb.BackupManifest_File, 0)
+			Loop:
+				for {
+					select {
+					case p := <-sink.conf.progCh:
+						var progDetails backuppb.BackupManifest_Progress
+						if err := types.UnmarshalAny(&p.ProgressDetails, &progDetails); err != nil {
+							t.Fatal(err)
+						}
+
+						progress = append(progress, progDetails.Files...)
+					default:
+						break Loop
+					}
+				}
+
+				eliding := sink.elideMode != execinfrapb.ElidePrefix_None
+				require.NoError(t, checkFiles(ctx, store, progress, tt.flushedSpans, eliding))
+
+				var actualUnflushedFiles []backuppb.BackupManifest_File
+				actualUnflushedFiles = append(actualUnflushedFiles, sink.flushedFiles...)
+				require.NoError(t, sink.flush(ctx))
+				require.NoError(t, checkFiles(ctx, store, actualUnflushedFiles, tt.unflushedSpans, eliding))
+				require.Empty(t, sink.flushedFiles)
+			})
+		}
+	}
+}
+
 type kvAndTS struct {
 	key   string
 	value []byte
@@ -1127,3 +1213,88 @@ func endKeyInclusiveSpansContainsKey(spans roachpb.Spans, key roachpb.Key, elide
 
 	return false
 }
+
+type mvccKVSet struct {
+	span roachpb.Span
+	kvs  []struct {
+		key   storage.MVCCKey
+		value []byte
+	}
+	startTime hlc.Timestamp
+	endTime   hlc.Timestamp
+}
+
+type mvccKVSetBuilder struct {
+	spanStart string
+	spanEnd   string
+	kvs       []kvAndTS
+	startTime hlc.Timestamp
+	endTime   hlc.Timestamp
+}
+
+func newMVCCKeySetBuilder(spanStart string, spanEnd string) *mvccKVSetBuilder {
+	return &mvccKVSetBuilder{
+		spanStart: spanStart,
+		spanEnd:   spanEnd,
+	}
+}
+
+func (b *mvccKVSetBuilder) withStartEndTS(startTime, endTime int64) *mvccKVSetBuilder {
+	b.startTime = hlc.Timestamp{WallTime: startTime}
+	b.endTime = hlc.Timestamp{WallTime: endTime}
+	return b
+}
+
+func (b *mvccKVSetBuilder) withKVs(kvs []kvAndTS) *mvccKVSetBuilder {
+	b.kvs = kvs
+	if !b.startTime.IsEmpty() && !b.endTime.IsEmpty() {
+		return b
+	}
+	var minTS, maxTS int64
+	for _, kv := range kvs {
+		if kv.timestamp < minTS || minTS == 0 {
+			minTS = kv.timestamp
+		}
+		if kv.timestamp > maxTS {
+			maxTS = kv.timestamp
+		}
+	}
+	b.startTime = hlc.Timestamp{WallTime: minTS}
+	b.endTime = hlc.Timestamp{WallTime: maxTS}
+	return b
+}
+
+func (b *mvccKVSetBuilder) build() mvccKVSet {
+	return b.buildWithEncoding(s2k0)
+}
+
+func (b *mvccKVSetBuilder) buildWithEncoding(stringToKey func(string) roachpb.Key) mvccKVSet {
+	var kvs []struct {
+		key   storage.MVCCKey
+		value []byte
+	}
+	for _, kv := range b.kvs {
+		v := roachpb.Value{}
+		v.SetBytes(kv.value)
+		v.InitChecksum(nil)
+		kvs = append(kvs, struct {
+			key   storage.MVCCKey
+			value []byte
+		}{
+			key: storage.MVCCKey{
+				Key:       stringToKey(kv.key),
+				Timestamp: hlc.Timestamp{WallTime: kv.timestamp},
+			},
+			value: v.RawBytes,
+		})
+	}
+	return mvccKVSet{
+		span: roachpb.Span{
+			Key:    stringToKey(b.spanStart),
+			EndKey: stringToKey(b.spanEnd),
+		},
+		kvs:       kvs,
+		startTime: b.startTime,
+		endTime:   b.endTime,
+	}
+}
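For reviewers, the sketch below (not part of the patch) illustrates the intended call pattern for the new writeKey API, mirroring TestFileSSTSinkWriteKey above: keys are written in sorted order within their span, the (possibly narrowed) span returned by each writeKey call is threaded into the next call, and flush is invoked once at the end. The helper writeSortedKVs and the kvToWrite type are hypothetical names introduced only for this illustration; only fileSSTSink.writeKey and fileSSTSink.flush come from the diff.

// Hypothetical illustration only; assumes it lives in package backup so it can
// reach the unexported fileSSTSink.
package backup

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/storage"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// kvToWrite is a hypothetical container pairing an MVCC key with its raw value.
type kvToWrite struct {
	key   storage.MVCCKey
	value []byte
}

// writeSortedKVs sketches how a caller is expected to drive writeKey: kvs must
// be sorted by key and fall within span, each returned span is fed back into
// the next call, and flush is called once at the end so the final SST reaches
// the destination.
func writeSortedKVs(
	ctx context.Context,
	sink *fileSSTSink,
	span roachpb.Span,
	startTime, endTime hlc.Timestamp,
	kvs []kvToWrite,
) error {
	for _, kv := range kvs {
		recordedSpan, err := sink.writeKey(ctx, kv.key, kv.value, span, startTime, endTime)
		if err != nil {
			return err
		}
		// writeKey may have split the previous file at this key, in which case
		// the recorded span starts at kv.key rather than at span.Key.
		span = recordedSpan
	}
	return sink.flush(ctx)
}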