diff --git a/pkg/backup/backupsink/BUILD.bazel b/pkg/backup/backupsink/BUILD.bazel index af804ba9f115..828eedea6e22 100644 --- a/pkg/backup/backupsink/BUILD.bazel +++ b/pkg/backup/backupsink/BUILD.bazel @@ -5,6 +5,7 @@ go_library( srcs = [ "file_sst_sink.go", "sink_utils.go", + "sst_sink_key_writer.go", ], importpath = "github.com/cockroachdb/cockroach/pkg/backup/backupsink", visibility = ["//visibility:public"], @@ -44,6 +45,7 @@ go_test( "//pkg/settings/cluster", "//pkg/sql/execinfrapb", "//pkg/storage", + "//pkg/testutils", "//pkg/util/encoding", "//pkg/util/hlc", "//pkg/util/leaktest", diff --git a/pkg/backup/backupsink/file_sst_sink.go b/pkg/backup/backupsink/file_sst_sink.go index d9db62e6a133..a8750ede843d 100644 --- a/pkg/backup/backupsink/file_sst_sink.go +++ b/pkg/backup/backupsink/file_sst_sink.go @@ -37,11 +37,6 @@ var ( settings.WithPublic) ) -type FileSSTSinkWriter interface { - io.Closer - Flush(context.Context) error -} - type ExportedSpan struct { Metadata backuppb.BackupManifest_File DataSST []byte @@ -84,7 +79,9 @@ type FileSSTSink struct { // flush. This counter resets on each flush. completedSpans int32 - elidePrefix roachpb.Key + // elidedPrefix represents the elided prefix of the last exportSpan/key written to the sink. + // This resets on each flush. + elidedPrefix roachpb.Key // stats contain statistics about the actions of the FileSSTSink over its // entire lifespan. @@ -98,7 +95,7 @@ type FileSSTSink struct { } // fileSpanByteLimit is the maximum size of a file span that can be extended. -const fileSpanByteLimit = 64 << 20 +var fileSpanByteLimit int64 = 64 << 20 func MakeFileSSTSink( conf SSTSinkConf, dest cloud.ExternalStorage, pacer *admission.Pacer, @@ -120,12 +117,12 @@ func (s *FileSSTSink) Write(ctx context.Context, resp ExportedSpan) (roachpb.Key // since it overlaps but SSTWriter demands writes in-order. if len(s.flushedFiles) > 0 { last := s.flushedFiles[len(s.flushedFiles)-1].Span.EndKey - if span.Key.Compare(last) < 0 || !bytes.Equal(spanPrefix, s.elidePrefix) { + if span.Key.Compare(last) < 0 || !bytes.Equal(spanPrefix, s.elidedPrefix) { log.VEventf(ctx, 1, "flushing backup file %s of size %d because span %s cannot append before %s", s.outName, s.flushedSize, span, last, ) s.stats.oooFlushes++ - if err := s.flushFile(ctx); err != nil { + if err := s.Flush(ctx); err != nil { return nil, err } } @@ -137,7 +134,7 @@ func (s *FileSSTSink) Write(ctx context.Context, resp ExportedSpan) (roachpb.Key return nil, err } } - s.elidePrefix = append(s.elidePrefix[:0], spanPrefix...) + s.elidedPrefix = append(s.elidedPrefix[:0], spanPrefix...) log.VEventf(ctx, 2, "writing %s to backup file %s", span, s.outName) @@ -145,7 +142,7 @@ func (s *FileSSTSink) Write(ctx context.Context, resp ExportedSpan) (roachpb.Key // then surface all the range keys and flush. // // TODO(msbutler): investigate using single a single iterator that surfaces - // all point keys first and then all range keys + // all point keys first and then all range keys. maxKey, err := s.copyPointKeys(ctx, resp.DataSST) if err != nil { return nil, err @@ -208,7 +205,7 @@ func (s *FileSSTSink) Write(ctx context.Context, resp ExportedSpan) (roachpb.Key if s.flushedSize > targetFileSize.Get(s.conf.Settings) && !s.midRow { s.stats.sizeFlushes++ log.VEventf(ctx, 2, "flushing backup file %s with size %d", s.outName, s.flushedSize) - if err := s.flushFile(ctx); err != nil { + if err := s.Flush(ctx); err != nil { return nil, err } } else { @@ -238,10 +235,6 @@ func (s *FileSSTSink) Close() error { } func (s *FileSSTSink) Flush(ctx context.Context) error { - return s.flushFile(ctx) -} - -func (s *FileSSTSink) flushFile(ctx context.Context) error { if s.out == nil { // If the writer was not initialized but the sink has reported completed // spans then there were empty ExportRequests that were processed by the @@ -310,7 +303,7 @@ func (s *FileSSTSink) flushFile(ctx context.Context) error { } s.flushedFiles = nil - s.elidePrefix = s.elidePrefix[:0] + s.elidedPrefix = s.elidedPrefix[:0] s.flushedSize = 0 s.flushedRevStart.Reset() s.completedSpans = 0 @@ -350,6 +343,7 @@ func (s *FileSSTSink) open(ctx context.Context) error { return nil } + func (s *FileSSTSink) copyPointKeys(ctx context.Context, dataSST []byte) (roachpb.Key, error) { iterOpts := storage.IterOptions{ KeyTypes: storage.IterKeyTypePointsOnly, @@ -376,9 +370,9 @@ func (s *FileSSTSink) copyPointKeys(ctx context.Context, dataSST []byte) (roachp break } k := iter.UnsafeKey() - suffix, ok := bytes.CutPrefix(k.Key, s.elidePrefix) + suffix, ok := bytes.CutPrefix(k.Key, s.elidedPrefix) if !ok { - return nil, errors.AssertionFailedf("prefix mismatch %q does not have %q", k.Key, s.elidePrefix) + return nil, errors.AssertionFailedf("prefix mismatch %q does not have %q", k.Key, s.elidedPrefix) } k.Key = suffix @@ -457,11 +451,11 @@ func (s *FileSSTSink) copyRangeKeys(dataSST []byte) (roachpb.Key, error) { maxKey = append(maxKey[:0], rk.EndKey...) } var ok bool - if rk.StartKey, ok = bytes.CutPrefix(rk.StartKey, s.elidePrefix); !ok { - return nil, errors.AssertionFailedf("prefix mismatch %q does not have %q", rk.StartKey, s.elidePrefix) + if rk.StartKey, ok = bytes.CutPrefix(rk.StartKey, s.elidedPrefix); !ok { + return nil, errors.AssertionFailedf("prefix mismatch %q does not have %q", rk.StartKey, s.elidedPrefix) } - if rk.EndKey, ok = bytes.CutPrefix(rk.EndKey, s.elidePrefix); !ok { - return nil, errors.AssertionFailedf("prefix mismatch %q does not have %q", rk.EndKey, s.elidePrefix) + if rk.EndKey, ok = bytes.CutPrefix(rk.EndKey, s.elidedPrefix); !ok { + return nil, errors.AssertionFailedf("prefix mismatch %q does not have %q", rk.EndKey, s.elidedPrefix) } if err := s.sst.PutRawMVCCRangeKey(rk, v.Value); err != nil { return nil, err diff --git a/pkg/backup/backupsink/file_sst_sink_test.go b/pkg/backup/backupsink/file_sst_sink_test.go index 042f690229c9..e323a13a5bb6 100644 --- a/pkg/backup/backupsink/file_sst_sink_test.go +++ b/pkg/backup/backupsink/file_sst_sink_test.go @@ -1,4 +1,4 @@ -// Copyright 2023 The Cockroach Authors. +// Copyright 2024 The Cockroach Authors. // // Use of this software is governed by the CockroachDB Software License // included in the /LICENSE file. @@ -24,6 +24,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -461,6 +462,10 @@ func s2k0(s string) roachpb.Key { return s2kWithColFamily(s, 0) } +func s2k1(s string) roachpb.Key { + return s2kWithColFamily(s, 1) +} + // TestFileSSTSinkStats tests the internal counters and stats of the FileSSTSink under // different write scenarios. func TestFileSSTSinkStats(t *testing.T) { @@ -909,12 +914,377 @@ func TestFileSSTSinkCopyRangeKeys(t *testing.T) { } } +func TestFileSSTSinkWriteKey(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + st := cluster.MakeTestingClusterSettings() + // Artificially set file size limits for testing. + defer testutils.HookGlobal(&fileSpanByteLimit, 8<<10)() + targetFileSize.Override(ctx, &st.SV, 16<<10) + + gtFileSizeVal := make([]byte, 10<<10) + gtSSTSizeVal := make([]byte, 20<<10) + + type testCase struct { + name string + exportKVs []*mvccKVSet + // spans of files that should have been flushed during WriteKey and before + // the final manual flush. + flushedSpans []roachpb.Spans + // any spans of files that will be flushed out by the manual flush. + unflushedSpans []roachpb.Spans + // If test requires specific elide modes only -- nil to use default elide modes. + elideModes []execinfrapb.ElidePrefix + } + + for _, tt := range []testCase{ + { + name: "single-span", + exportKVs: []*mvccKVSet{ + newMVCCKeySet("a", "c").withKVs([]kvAndTS{{key: "a", timestamp: 10}, {key: "b", timestamp: 10}}), + }, + unflushedSpans: []roachpb.Spans{ + {{Key: s2k0("a"), EndKey: s2k0("c")}}, + }, + }, + { + name: "single-span-size-flush-last-key", + exportKVs: []*mvccKVSet{ + newMVCCKeySet("a", "c").withKVs([]kvAndTS{{key: "a", timestamp: 10}, {key: "b", timestamp: 10, value: gtSSTSizeVal}}), + }, + unflushedSpans: []roachpb.Spans{ + {{Key: s2k0("a"), EndKey: s2k0("c")}}, + }, + }, + // Ensure that if the size is exceeded mid-span, a flush occurs. + { + name: "single-span-size-flush-mid-span", + exportKVs: []*mvccKVSet{ + newMVCCKeySet("a", "c").withKVs([]kvAndTS{{key: "a", timestamp: 10, value: gtSSTSizeVal}, {key: "b", timestamp: 10}}), + }, + flushedSpans: []roachpb.Spans{ + {{Key: s2k0("a"), EndKey: s2k0("b")}}, + }, + unflushedSpans: []roachpb.Spans{ + {{Key: s2k0("b"), EndKey: s2k0("c")}}, + }, + }, + { + name: "double-size-flush-single-span", + exportKVs: []*mvccKVSet{ + newMVCCKeySet("a", "e").withKVs( + []kvAndTS{ + {key: "a", timestamp: 10}, {key: "b", timestamp: 10, value: gtSSTSizeVal}, + {key: "c", timestamp: 10, value: gtSSTSizeVal}, {key: "d", timestamp: 10}, + }, + ), + }, + flushedSpans: []roachpb.Spans{ + {{Key: s2k0("a"), EndKey: s2k0("c")}}, + {{Key: s2k0("c"), EndKey: s2k0("d")}}, + }, + unflushedSpans: []roachpb.Spans{ + {{Key: s2k0("d"), EndKey: s2k0("e")}}, + }, + }, + { + name: "double-ooo-span-flush", + exportKVs: []*mvccKVSet{ + newMVCCKeySet("a", "c").withKVs([]kvAndTS{{key: "a", timestamp: 10}}), + newMVCCKeySet("b", "d").withKVs([]kvAndTS{{key: "b", timestamp: 10}}), + newMVCCKeySet("c", "e").withKVs([]kvAndTS{{key: "c", timestamp: 10}}), + }, + flushedSpans: []roachpb.Spans{ + {{Key: s2k0("a"), EndKey: s2k0("c")}}, + {{Key: s2k0("b"), EndKey: s2k0("d")}}, + }, + unflushedSpans: []roachpb.Spans{ + {{Key: s2k0("c"), EndKey: s2k0("e")}}, + }, + }, + { + name: "size-ooo-span-and-ooo-key-flush", + exportKVs: []*mvccKVSet{ + newMVCCKeySet("a", "d").withKVs([]kvAndTS{ // size + {key: "a", timestamp: 10, value: gtSSTSizeVal}, + {key: "c", timestamp: 10}, + }).withStartEndTS(10, 15), + newMVCCKeySet("a", "d"). // ooo-key + withKVs([]kvAndTS{{key: "b", timestamp: 10}}). + withStartEndTS(10, 15), + newMVCCKeySet("c", "f"). // ooo-span + withKVs([]kvAndTS{{key: "e", timestamp: 10}}). + withStartEndTS(10, 15), + }, + flushedSpans: []roachpb.Spans{ + {{Key: s2k0("a"), EndKey: s2k0("c")}}, + {{Key: s2k0("c"), EndKey: s2k0("d")}}, + {{Key: s2k0("a"), EndKey: s2k0("d")}}, + }, + unflushedSpans: []roachpb.Spans{ + {{Key: s2k0("c"), EndKey: s2k0("f")}}, + }, + }, + // Ensure that when keys are written out of order, flushes occur to avoid + // writing keys to SST out of order. + { + name: "single-span-ooo-key", + exportKVs: []*mvccKVSet{ + newMVCCKeySet("a", "c").withKVs([]kvAndTS{{key: "b", timestamp: 10}, {key: "a", timestamp: 10}}), + }, + flushedSpans: []roachpb.Spans{ + {{Key: s2k0("a"), EndKey: s2k0("c")}}, + }, + unflushedSpans: []roachpb.Spans{ + {{Key: s2k0("a"), EndKey: s2k0("c")}}, + }, + }, + // Two spans that are contiguous with each other and have the same timestamp + // should be merged into one span. + { + name: "extend-metadata", + exportKVs: []*mvccKVSet{ + newMVCCKeySet("a", "c").withKVs([]kvAndTS{{key: "a", timestamp: 10}, {key: "b", timestamp: 10}}).withStartEndTS(5, 10), + newMVCCKeySet("c", "e").withKVs([]kvAndTS{{key: "c", timestamp: 10}, {key: "d", timestamp: 10}}).withStartEndTS(5, 10), + }, + unflushedSpans: []roachpb.Spans{ + {{Key: s2k0("a"), EndKey: s2k0("e")}}, + }, + }, + // Two spans that are contiguous with each other but don't have the same + // timestamp should not be merged. + { + name: "no-extend-metadata-diff-timestamp", + exportKVs: []*mvccKVSet{ + newMVCCKeySet("a", "c").withKVs([]kvAndTS{{key: "a", timestamp: 10}, {key: "b", timestamp: 10}}).withStartEndTS(5, 10), + newMVCCKeySet("c", "e").withKVs([]kvAndTS{{key: "c", timestamp: 10}, {key: "d", timestamp: 10}}).withStartEndTS(5, 11), + }, + unflushedSpans: []roachpb.Spans{ + {{Key: s2k0("a"), EndKey: s2k0("c")}, {Key: s2k0("c"), EndKey: s2k0("e")}}, + }, + }, + // Two spans that have the same timestamp but are not contiguous should + // not be merged. + { + name: "no-extend-metadata-same-timestamp-not-contiguous", + exportKVs: []*mvccKVSet{ + newMVCCKeySet("a", "c").withKVs([]kvAndTS{{key: "a", timestamp: 10}, {key: "b", timestamp: 10}}).withStartEndTS(5, 10), + newMVCCKeySet("d", "f").withKVs([]kvAndTS{{key: "d", timestamp: 10}, {key: "e", timestamp: 10}}).withStartEndTS(5, 10), + }, + unflushedSpans: []roachpb.Spans{ + {{Key: s2k0("a"), EndKey: s2k0("c")}, {Key: s2k0("d"), EndKey: s2k0("f")}}, + }, + }, + // If a span overlaps its previous span, a flush must occur first. + { + name: "flush-from-overlapping-ooo-spans", + exportKVs: []*mvccKVSet{ + newMVCCKeySet("a", "c").withKVs([]kvAndTS{{key: "a", timestamp: 10}, {key: "b", timestamp: 10}}), + newMVCCKeySet("b", "e").withKVs([]kvAndTS{{key: "b", timestamp: 10}, {key: "c", timestamp: 10}, {key: "d", timestamp: 10}}), + }, + flushedSpans: []roachpb.Spans{ + {{Key: s2k0("a"), EndKey: s2k0("c")}}, + }, + unflushedSpans: []roachpb.Spans{ + {{Key: s2k0("b"), EndKey: s2k0("e")}}, + }, + }, + // If a span precedes the previous span but does not overlap, a flush must + // occur first. + { + name: "flush-from-non-overlapping-ooo-spans", + exportKVs: []*mvccKVSet{ + newMVCCKeySet("c", "e").withKVs([]kvAndTS{{key: "c", timestamp: 10}, {key: "d", timestamp: 10}}), + newMVCCKeySet("a", "c").withKVs([]kvAndTS{{key: "a", timestamp: 10}, {key: "b", timestamp: 10}}), + }, + flushedSpans: []roachpb.Spans{ + {{Key: s2k0("c"), EndKey: s2k0("e")}}, + }, + unflushedSpans: []roachpb.Spans{ + {{Key: s2k0("a"), EndKey: s2k0("c")}}, + }, + }, + // Writing keys with different prefixes should flush the previous span. + { + name: "prefixes-differ-with-eliding", + exportKVs: []*mvccKVSet{ + newMVCCKeySet("2/a", "2/c").withKVs([]kvAndTS{{key: "2/a", timestamp: 10}, {key: "2/b", timestamp: 10}}), + newMVCCKeySet("2/c", "2/e").withKVs([]kvAndTS{{key: "2/c", timestamp: 10}, {key: "2/d", timestamp: 10}}), + newMVCCKeySet("3/e", "3/g").withKVs([]kvAndTS{{key: "3/e", timestamp: 10}, {key: "3/f", timestamp: 10}}), + }, + flushedSpans: []roachpb.Spans{ + {{Key: s2k0("2/a"), EndKey: s2k0("2/e")}}, + }, + unflushedSpans: []roachpb.Spans{ + {{Key: s2k0("3/e"), EndKey: s2k0("3/g")}}, + }, + elideModes: []execinfrapb.ElidePrefix{execinfrapb.ElidePrefix_TenantAndTable}, + }, + // In-order different prefixes shouldn't force a flush if no eliding is done. + { + name: "prefixes-differ-with-no-eliding", + exportKVs: []*mvccKVSet{ + newMVCCKeySet("2/a", "2/c").withKVs([]kvAndTS{{key: "2/a", timestamp: 10}, {key: "2/b", timestamp: 10}}), + newMVCCKeySet("2/c", "2/e").withKVs([]kvAndTS{{key: "2/c", timestamp: 10}, {key: "2/d", timestamp: 10}}), + newMVCCKeySet("3/e", "3/g").withKVs([]kvAndTS{{key: "3/e", timestamp: 10}, {key: "3/f", timestamp: 10}}), + }, + unflushedSpans: []roachpb.Spans{ + {{Key: s2k0("2/a"), EndKey: s2k0("2/e")}, {Key: s2k0("3/e"), EndKey: s2k0("3/g")}}, + }, + elideModes: []execinfrapb.ElidePrefix{execinfrapb.ElidePrefix_None}, + }, + // Flush does not occur if last key written is mid-row even if size exceeded. + { + name: "no-size-flush-if-mid-row", + exportKVs: []*mvccKVSet{ + newRawMVCCKeySet(s2k0("a"), s2kWithColFamily("b", 1)). + withKVs([]kvAndTS{{key: "a", timestamp: 10}, {key: "b", timestamp: 10, value: gtSSTSizeVal}}). + withStartEndTS(10, 15), + newRawMVCCKeySet(s2kWithColFamily("b", 1), s2k0("d")). + withRawKVs([]mvccKV{ + kvAndTS{key: "b", timestamp: 10}.toMvccKV(s2k1), + }). + withStartEndTS(10, 15), + }, + unflushedSpans: []roachpb.Spans{ + {{Key: s2k0("a"), EndKey: s2k0("d")}}, + }, + }, + // If size flush is blocked by mid-row key, the next key should cause a flush. + { + name: "size-flush-postponed-till-after-mid-row", + exportKVs: []*mvccKVSet{ + newRawMVCCKeySet(s2k0("a"), s2kWithColFamily("b", 1)). + withKVs([]kvAndTS{{key: "a", timestamp: 10}, {key: "b", timestamp: 10, value: gtSSTSizeVal}}). + withStartEndTS(10, 15), + newRawMVCCKeySet(s2kWithColFamily("b", 1), s2k0("d")). + withRawKVs([]mvccKV{ + kvAndTS{key: "b", timestamp: 10}.toMvccKV(s2k1), + kvAndTS{key: "c", timestamp: 10}.toMvccKV(s2k0), + }). + withStartEndTS(10, 15), + }, + flushedSpans: []roachpb.Spans{ + {{Key: s2k0("a"), EndKey: s2k0("c")}}, + }, + unflushedSpans: []roachpb.Spans{ + {{Key: s2k0("c"), EndKey: s2k0("d")}}, + }, + }, + // It is safe to flush at the range boundary. + { + name: "size-flush-at-range-boundary", + exportKVs: []*mvccKVSet{ + newRawMVCCKeySet(s2k0("a"), s2k("c")). + withKVs([]kvAndTS{{key: "a", timestamp: 10}, {key: "b", timestamp: 10, value: gtSSTSizeVal}}), + }, + unflushedSpans: []roachpb.Spans{ + {{Key: s2k0("a"), EndKey: s2k("c")}}, + }, + }, + // If fileSpanByteLimit is reached, extending the file should be prevented + // and a new file created. + { + name: "no-extend-due-to-manifest-size-limit", + exportKVs: []*mvccKVSet{ + newMVCCKeySet("a", "d"). + withKVs([]kvAndTS{{key: "a", timestamp: 10}, {key: "b", timestamp: 10, value: gtFileSizeVal}, {key: "c", timestamp: 10}}), + }, + unflushedSpans: []roachpb.Spans{ + {{Key: s2k0("a"), EndKey: s2k0("c")}, {Key: s2k0("c"), EndKey: s2k0("d")}}, + }, + }, + // If fileSpanByteLimit is reached but the last key written was mid-row, + // the file must be extended. + { + name: "extend-mid-row-despite-manifest-size-limit", + exportKVs: []*mvccKVSet{ + newRawMVCCKeySet(s2k0("a"), s2kWithColFamily("b", 1)). + withKVs([]kvAndTS{{key: "a", timestamp: 10}, {key: "b", timestamp: 10, value: gtFileSizeVal}}). + withStartEndTS(10, 15), + newRawMVCCKeySet(s2kWithColFamily("b", 1), s2k0("d")). + withRawKVs([]mvccKV{ + kvAndTS{key: "b", timestamp: 10}.toMvccKV(s2k1), + kvAndTS{key: "c", timestamp: 10}.toMvccKV(s2k0), + }). + withStartEndTS(10, 15), + }, + unflushedSpans: []roachpb.Spans{ + {{Key: s2k0("a"), EndKey: s2k0("c")}, {Key: s2k0("c"), EndKey: s2k0("d")}}, + }, + }, + } { + elideModes := []execinfrapb.ElidePrefix{execinfrapb.ElidePrefix_None, execinfrapb.ElidePrefix_TenantAndTable} + if tt.elideModes != nil { + elideModes = tt.elideModes + } + for _, elide := range elideModes { + t.Run(fmt.Sprintf("%s/elide=%s", tt.name, elide), func(t *testing.T) { + sink, store := sstSinkKeyWriterTestSetup(t, st) + defer func() { + require.NoError(t, sink.Close()) + }() + sink.conf.ElideMode = elide + + for _, ek := range tt.exportKVs { + span := ek.span + for _, kv := range ek.kvs { + recordedSpan, err := sink.WriteKey(ctx, kv.key, kv.value, span, ek.startTime, ek.endTime) + require.NoError(t, err) + span = recordedSpan + } + } + + progress := make([]backuppb.BackupManifest_File, 0) + Loop: + for { + select { + case p := <-sink.conf.ProgCh: + var progDetails backuppb.BackupManifest_Progress + if err := types.UnmarshalAny(&p.ProgressDetails, &progDetails); err != nil { + t.Fatal(err) + } + + progress = append(progress, progDetails.Files...) + default: + break Loop + } + } + + eliding := sink.conf.ElideMode != execinfrapb.ElidePrefix_None + require.NoError(t, checkFiles(ctx, store, progress, tt.flushedSpans, eliding)) + + var actualUnflushedFiles []backuppb.BackupManifest_File + actualUnflushedFiles = append(actualUnflushedFiles, sink.flushedFiles...) + require.NoError(t, sink.Flush(ctx)) + require.NoError(t, checkFiles(ctx, store, actualUnflushedFiles, tt.unflushedSpans, eliding)) + require.Empty(t, sink.flushedFiles) + }) + } + } +} + type kvAndTS struct { key string value []byte timestamp int64 } +func (k kvAndTS) toMvccKV(enc func(string) roachpb.Key) mvccKV { + v := roachpb.Value{} + v.SetBytes(k.value) + v.InitChecksum(nil) + return mvccKV{ + key: storage.MVCCKey{ + Key: enc(k.key), + Timestamp: hlc.Timestamp{WallTime: k.timestamp}, + }, + value: v.RawBytes, + } +} + type rangeKeyAndTS struct { key string endKey string @@ -1031,21 +1401,29 @@ func (b *exportedSpanBuilder) buildWithEncoding(stringToKey func(string) roachpb func fileSSTSinkTestSetup( t *testing.T, settings *cluster.Settings, ) (*FileSSTSink, cloud.ExternalStorage) { + conf, store := sinkTestSetup(t, settings) + sink := MakeFileSSTSink(conf, store, nil /* pacer */) + return sink, store +} - store := nodelocal.TestingMakeNodelocalStorage(t.TempDir(), settings, cloudpb.ExternalStorage{}) +func sstSinkKeyWriterTestSetup( + t *testing.T, settings *cluster.Settings, +) (*SSTSinkKeyWriter, cloud.ExternalStorage) { + conf, store := sinkTestSetup(t, settings) + sink := MakeSSTSinkKeyWriter(conf, store, nil /* pacer */) + return sink, store +} - // Never block. +func sinkTestSetup(t *testing.T, settings *cluster.Settings) (SSTSinkConf, cloud.ExternalStorage) { + store := nodelocal.TestingMakeNodelocalStorage(t.TempDir(), settings, cloudpb.ExternalStorage{}) progCh := make(chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress, 100) - sinkConf := SSTSinkConf{ ID: 1, Enc: nil, ProgCh: progCh, Settings: &settings.SV, } - - sink := MakeFileSSTSink(sinkConf, store, nil /* pacer */) - return sink, store + return sinkConf, store } func checkFiles( @@ -1127,3 +1505,76 @@ func endKeyInclusiveSpansContainsKey(spans roachpb.Spans, key roachpb.Key, elide return false } + +type mvccKV struct { + key storage.MVCCKey + value []byte +} + +type mvccKVSet struct { + span roachpb.Span + kvs []mvccKV + startTime hlc.Timestamp + endTime hlc.Timestamp +} + +func newMVCCKeySet(spanStart string, spanEnd string) *mvccKVSet { + return &mvccKVSet{ + span: roachpb.Span{ + Key: s2k0(spanStart), + EndKey: s2k0(spanEnd), + }, + } +} + +func newRawMVCCKeySet(spanStart roachpb.Key, spanEnd roachpb.Key) *mvccKVSet { + return &mvccKVSet{ + span: roachpb.Span{ + Key: spanStart, + EndKey: spanEnd, + }, + } +} + +func (b *mvccKVSet) withStartEndTS(startTime, endTime int64) *mvccKVSet { + b.startTime = hlc.Timestamp{WallTime: startTime} + b.endTime = hlc.Timestamp{WallTime: endTime} + return b +} + +func (b *mvccKVSet) withKVs(kvs []kvAndTS) *mvccKVSet { + return b.withKVsAndEncoding(kvs, s2k0) +} + +func (b *mvccKVSet) withKVsAndEncoding(kvs []kvAndTS, enc func(string) roachpb.Key) *mvccKVSet { + rawKVs := make([]mvccKV, 0, len(kvs)) + for _, kv := range kvs { + v := roachpb.Value{} + v.SetBytes(kv.value) + v.InitChecksum(nil) + rawKVs = append(rawKVs, mvccKV{ + key: storage.MVCCKey{ + Key: enc(kv.key), + Timestamp: hlc.Timestamp{WallTime: kv.timestamp}, + }, + value: v.RawBytes, + }) + } + return b.withRawKVs(rawKVs) +} + +func (b *mvccKVSet) withRawKVs(kvs []mvccKV) *mvccKVSet { + b.kvs = kvs + var minTS, maxTS hlc.Timestamp + for _, kv := range kvs { + if minTS.IsEmpty() || kv.key.Timestamp.Less(minTS) { + minTS = kv.key.Timestamp + } + if maxTS.IsEmpty() || maxTS.Less(kv.key.Timestamp) { + maxTS = kv.key.Timestamp + } + } + b.startTime = minTS + b.endTime = maxTS + return b +} diff --git a/pkg/backup/backupsink/sink_utils.go b/pkg/backup/backupsink/sink_utils.go index fb483486fc6b..411e8fa91cf2 100644 --- a/pkg/backup/backupsink/sink_utils.go +++ b/pkg/backup/backupsink/sink_utils.go @@ -6,13 +6,18 @@ package backupsink import ( + "bytes" "fmt" + "github.com/cockroachdb/cockroach/pkg/backup/backuppb" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/sem/builtins" + "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/errors" ) // ElidedPrefix returns the prefix of the key that is elided by the given mode. @@ -35,6 +40,21 @@ func ElidedPrefix(key roachpb.Key, mode execinfrapb.ElidePrefix) ([]byte, error) return nil, nil } +func elideMVCCKeyPrefix( + key storage.MVCCKey, mode execinfrapb.ElidePrefix, +) (storage.MVCCKey, []byte, error) { + prefix, err := ElidedPrefix(key.Key, mode) + if err != nil { + return storage.MVCCKey{}, nil, err + } + cutKey, ok := bytes.CutPrefix(key.Key, prefix) + if !ok { + return storage.MVCCKey{}, nil, errors.AssertionFailedf("prefix mismatch %q does not have %q", key.Key, prefix) + } + key.Key = cutKey + return key, prefix, nil +} + // adjustFileEndKey checks if the export respsonse end key can be used as a // split point during restore. If the end key is not splitable (i.e. it splits // two column families in the same row), the function will attempt to adjust the @@ -82,3 +102,19 @@ func generateUniqueSSTName(nodeID base.SQLInstanceID) string { return fmt.Sprintf("data/%d.sst", builtins.GenerateUniqueInt(builtins.ProcessUniqueID(nodeID))) } + +// sameAsFileSpan returns true if the span, start, and end match the span, start, and end of the BackupManifest_File. +func sameAsFileSpan( + span roachpb.Span, start, end hlc.Timestamp, backupFile *backuppb.BackupManifest_File, +) bool { + return backupFile.Span.Equal(span) && backupFile.StartTime.Equal(start) && backupFile.EndTime.Equal(end) +} + +// extendsFileSpan returns true if a span can successfully contiguously extend the span of a BackupManifest_File. +func extendsFileSpan( + span roachpb.Span, start, end hlc.Timestamp, backupFile *backuppb.BackupManifest_File, +) bool { + return backupFile.Span.EndKey.Equal(span.Key) && + backupFile.StartTime.Equal(start) && + backupFile.EndTime.Equal(end) +} diff --git a/pkg/backup/backupsink/sst_sink_key_writer.go b/pkg/backup/backupsink/sst_sink_key_writer.go new file mode 100644 index 000000000000..859a6a4aa65e --- /dev/null +++ b/pkg/backup/backupsink/sst_sink_key_writer.go @@ -0,0 +1,301 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the CockroachDB Software License +// included in the /LICENSE file. + +package backupsink + +import ( + "bytes" + "context" + + "github.com/cockroachdb/cockroach/pkg/backup/backuppb" + "github.com/cockroachdb/cockroach/pkg/cloud" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/util/admission" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/errors" +) + +type SSTSinkKeyWriter struct { + FileSSTSink + // prevKey represents the last key written using WriteKey. When writing a new + // key, this helps determine if the last key written was mid-row. This resets + // on each flush. + prevKey roachpb.Key +} + +func MakeSSTSinkKeyWriter( + conf SSTSinkConf, dest cloud.ExternalStorage, pacer *admission.Pacer, +) *SSTSinkKeyWriter { + return &SSTSinkKeyWriter{ + FileSSTSink: *MakeFileSSTSink(conf, dest, pacer), + } +} + +// WriteKey writes a single key to the SST file. The key should be the full key. +// span refers to the span that the key belongs to. start and end are the time +// bounds of the span being backed up. The writing of each key could +// potentially split up the span of the previously written key (if both keys +// were part of the same span and the span had to be split). As a consequence +// the span that is recorded for the new key is returned, as it may be a subspan +// of the span passed in for the key. +// +// flush should be called after the last key is written to ensure that the SST +// is written to the destination. +func (s *SSTSinkKeyWriter) WriteKey( + ctx context.Context, + key storage.MVCCKey, + value []byte, + span roachpb.Span, + startTime hlc.Timestamp, + endTime hlc.Timestamp, +) (roachpb.Span, error) { + if key.Key.Compare(span.Key) < 0 || key.Key.Compare(span.EndKey) >= 0 { + return roachpb.Span{}, errors.AssertionFailedf("key %s outside of span %v", key.Key, span) + } + if err := s.maybeFlushOOOSpans(ctx, span, startTime, endTime, true /* skipSameSpan */); err != nil { + return roachpb.Span{}, err + } + if err := s.maybeFlushOOOKey(ctx, key.Key); err != nil { + return roachpb.Span{}, err + } + s.setMidRowForPrevKey(key.Key) + + span, err := s.maybeDoSizeFlushWithNextKey(ctx, key.Key, span, startTime, endTime) + if err != nil { + return roachpb.Span{}, err + } + + // At this point, we can make the following assumptions about the new key and + // the span it belongs to: + // - The new key comes after the previous key. + // - The new span comes after the previous span, although it may not + // contiguously extend the previous span. + if s.out == nil { + if err := s.open(ctx); err != nil { + return roachpb.Span{}, err + } + } + + elidedKey, prefix, err := elideMVCCKeyPrefix(key, s.conf.ElideMode) + if err != nil { + return roachpb.Span{}, err + } + s.elidedPrefix = append(s.elidedPrefix[:0], prefix...) + if err := s.sst.PutRawMVCC(elidedKey, value); err != nil { + return roachpb.Span{}, err + } + + extend, err := s.shouldExtendLastFile(span, startTime, endTime, true /* extendSameSpan */) + if err != nil { + return roachpb.Span{}, err + } + keyAsRowCount := roachpb.RowCount{ + DataSize: int64(len(key.Key)) + int64(len(value)), + } + + var lastFile *backuppb.BackupManifest_File + if len(s.flushedFiles) > 0 { + lastFile = &s.flushedFiles[len(s.flushedFiles)-1] + } + + if extend { + lastFile.Span.EndKey = span.EndKey + span = lastFile.Span + lastFile.EntryCounts.Add(keyAsRowCount) + s.stats.spanGrows++ + } else { + if lastFile != nil && sameAsFileSpan(span, startTime, endTime, lastFile) { + // The new span covers the same span as the last BackupManifest_File, yet + // we are not extending. That means we are splitting due to the + // fileSpanByteLimit. As such, the current key can serve as an exclusive + // end key for the last file and an inclusive start key for the new file. + lastFile.Span.EndKey = key.Key + span.Key = key.Key + } + f := backuppb.BackupManifest_File{ + Span: span, + EntryCounts: roachpb.RowCount{ + DataSize: int64(len(key.Key)) + int64(len(value)), + }, + StartTime: startTime, + EndTime: endTime, + Path: s.outName, + } + s.flushedFiles = append(s.flushedFiles, f) + s.completedSpans++ + } + + s.flushedSize += keyAsRowCount.DataSize + s.prevKey = append(s.prevKey[:0], key.Key...) + + return span, nil +} + +func (s *SSTSinkKeyWriter) Flush(ctx context.Context) error { + if len(s.flushedFiles) > 0 { + lastFile := s.flushedFiles[len(s.flushedFiles)-1] + // If WriteKey was used, it is possible that the last written key was + // mid-row, but because no keys were written after, s.midRow was not updated. + // To ensure this, we update midRow using the end of the span that the last + // key belonged to. + s.setMidRowForPrevKey(lastFile.Span.EndKey) + } + err := s.FileSSTSink.Flush(ctx) + s.prevKey = nil + return err +} + +// TODO (kev-cao): Most of these `maybeFlushX` functions can be generalized to +// other SSTSink implementations. When we get around to refactoring out the +// other implementation, we should consider moving these functions into the base +// FileSSTSink implementation. + +// maybeFlushOOOSpans checks if the new span's startKey precedes the last spans' +// endKey. If it does, the files are flushed as the underlying SST writer +// expects keys in order. If `skipSameSpan` is true, the files are not flushed +// if the new span is the same as the last span with the same time bounds. +func (s *SSTSinkKeyWriter) maybeFlushOOOSpans( + ctx context.Context, newSpan roachpb.Span, startTime, endTime hlc.Timestamp, skipSameSpan bool, +) error { + if len(s.flushedFiles) == 0 { + return nil + } + lastFile := &s.flushedFiles[len(s.flushedFiles)-1] + if skipSameSpan && sameAsFileSpan(newSpan, startTime, endTime, lastFile) { + return nil + } + if newSpan.Key.Compare(lastFile.Span.EndKey) < 0 { + log.VEventf( + ctx, 1, "flushing backup file %s of size %d because span %v overlaps with %v", + s.outName, s.flushedSize, newSpan, lastFile.Span, + ) + s.stats.oooFlushes++ + if err := s.Flush(s.ctx); err != nil { + return err + } + } + return nil +} + +// maybeFlushOOOKey checks if the new key is out of order with respect to the +// last key written using WriteKey to the SST file. If it is, the file is +// flushed and a new one is created. +func (s *SSTSinkKeyWriter) maybeFlushOOOKey(ctx context.Context, newKey roachpb.Key) error { + if s.prevKey == nil { + return nil + } + keyPrefix, err := ElidedPrefix(newKey, s.conf.ElideMode) + if err != nil { + return err + } + if newKey.Compare(s.prevKey) < 0 || !bytes.Equal(keyPrefix, s.elidedPrefix) { + log.VEventf( + ctx, 1, "flushing backup file %s of size %d because key %s cannot append before %s", + s.outName, s.flushedSize, newKey, s.prevKey, + ) + s.stats.oooFlushes++ + if err := s.Flush(s.ctx); err != nil { + return err + } + } + return nil +} + +// maybeDoSizeFlush checks if the size of the current backup file exceeds the +// targetFileSize and flushes the file if it does. This function should only be +// called after s.midRow has been updated for the last key/span written. +func (s *SSTSinkKeyWriter) maybeDoSizeFlush(ctx context.Context) error { + if s.shouldDoSizeFlush() { + return s.doSizeFlush(ctx) + } else { + log.VEventf(ctx, 3, "continuing to write to backup file %s of size %d", s.outName, s.flushedSize) + return nil + } +} + +// maybeDoSizeFlushWithNextKey is identical to maybeDoSizeFlush, except info +// about the next key to be written is provided. This is used to determine if +// the next key can act as an exclusive end key for the last BackupManifest_File. +// The updated span of the next key is returned, as it may be set to contiguously +// extend the last span. This function should only be called after s.midRow has +// been updated for the last key/span written. +func (s *SSTSinkKeyWriter) maybeDoSizeFlushWithNextKey( + ctx context.Context, nextKey roachpb.Key, nextSpan roachpb.Span, startTime, endTime hlc.Timestamp, +) (roachpb.Span, error) { + if len(s.flushedFiles) == 0 || !s.shouldDoSizeFlush() { + log.VEventf(ctx, 3, "continuing to write to backup file %s of size %d", s.outName, s.flushedSize) + return nextSpan, nil + } + // If the span of the next key to be written covers the same span as the last + // BackupManifest_File, then the next key can act as an exclusive end key for + // the last file's span. Additionally, the span of the nextKey will be updated + // so that it contiguously extends with the last span. + lastFile := &s.flushedFiles[len(s.flushedFiles)-1] + if sameAsFileSpan(nextSpan, startTime, endTime, lastFile) { + lastFile.Span.EndKey = nextKey + nextSpan.Key = nextKey + } + return nextSpan, s.doSizeFlush(ctx) +} + +// shouldDoSizeFlush returns true if the current backup file exceeds the +// targetFileSize and we are currently not mid-row. Should only be called after +// s.midRow has been updated for the last key/span written. +func (s *SSTSinkKeyWriter) shouldDoSizeFlush() bool { + return s.flushedSize > targetFileSize.Get(s.conf.Settings) && !s.midRow +} + +// doSizeFlush flushes the buffered files with size exceeded as the reason. +func (s *SSTSinkKeyWriter) doSizeFlush(ctx context.Context) error { + s.stats.sizeFlushes++ + log.VEventf(ctx, 2, "flushing backup file %s with size %d", s.outName, s.flushedSize) + return s.Flush(ctx) +} + +// shouldExtendLastFile returns true if the new span can be added to the last +// span in the flushedFiles slice. If the last written key was mid-row, the new +// span is always considered to extend the last span. If the new span is a +// contiguous extension of the last span, it is also considered to extend the +// last span. extendSameSpan determines if an identical span should be considered +// an extension of the last span. This should only be called after s.midRow has +// been updated for the last key/span written. +func (s *SSTSinkKeyWriter) shouldExtendLastFile( + span roachpb.Span, startTime, endTime hlc.Timestamp, extendSameSpan bool, +) (bool, error) { + if len(s.flushedFiles) == 0 { + return false, nil + } + lastFile := &s.flushedFiles[len(s.flushedFiles)-1] + canExtendFile := extendsFileSpan(span, startTime, endTime, lastFile) || + (extendSameSpan && sameAsFileSpan(span, startTime, endTime, lastFile)) + if s.midRow { + if !canExtendFile { + return false, errors.AssertionFailedf( + "last key was mid-row, but new span %v does not extend last span %v", span, lastFile.Span, + ) + } + return true, nil + } + return canExtendFile && lastFile.EntryCounts.DataSize < fileSpanByteLimit, nil +} + +// setMidRowForPrevKey checks if the last key written using WriteKey was mid-row +// by using the next key to be written or e end key of its span and sets s.midRow +// accordingly. If no key was previously written using WriteKey, this is a no-op. +func (s *SSTSinkKeyWriter) setMidRowForPrevKey(endKey roachpb.Key) { + if s.prevKey == nil { + return + } + endRowKey, err := keys.EnsureSafeSplitKey(endKey) + if err != nil { + // If the key does not parse a family key, it must be from reaching the end + // of a range and be a range boundary. + return + } + s.midRow = s.prevKey.Compare(endRowKey) > 0 +} diff --git a/pkg/testutils/hook.go b/pkg/testutils/hook.go index b3fa4d575dae..ac2e50d1306e 100644 --- a/pkg/testutils/hook.go +++ b/pkg/testutils/hook.go @@ -17,3 +17,12 @@ func TestingHook(ptr, val interface{}) func() { global.Set(reflect.ValueOf(val)) return func() { global.Set(orig) } } + +// HookGlobal provides a way to temporarily set a package-global variable to a +// new value for the duration of a test. It returns a closure that restores the +// original value. +func HookGlobal[T any](ptr *T, val T) func() { + orig := *ptr + *ptr = val + return func() { *ptr = orig } +}