backupccl: sst sink implementation for writing keys #137565

Merged (1 commit) on Jan 9, 2025
7 changes: 6 additions & 1 deletion pkg/backup/backupsink/BUILD.bazel
@@ -5,6 +5,7 @@ go_library(
srcs = [
"file_sst_sink.go",
"sink_utils.go",
"sst_sink_key_writer.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/backup/backupsink",
visibility = ["//visibility:public"],
@@ -31,7 +32,10 @@ go_library(

go_test(
name = "backupsink_test",
srcs = ["file_sst_sink_test.go"],
srcs = [
"file_sst_sink_test.go",
"sst_sink_key_writer_test.go",
],
embed = [":backupsink"],
deps = [
"//pkg/backup/backuppb",
@@ -44,6 +48,7 @@ go_test(
"//pkg/settings/cluster",
"//pkg/sql/execinfrapb",
"//pkg/storage",
"//pkg/testutils",
"//pkg/util/encoding",
"//pkg/util/hlc",
"//pkg/util/leaktest",
46 changes: 22 additions & 24 deletions pkg/backup/backupsink/file_sst_sink.go
@@ -37,11 +37,6 @@ var (
settings.WithPublic)
)

type FileSSTSinkWriter interface {
io.Closer
Flush(context.Context) error
}

type ExportedSpan struct {
Metadata backuppb.BackupManifest_File
DataSST []byte
@@ -84,7 +79,9 @@ type FileSSTSink struct {
// flush. This counter resets on each flush.
completedSpans int32

elidePrefix roachpb.Key
// elidedPrefix represents the elided prefix of the last exportSpan/key written to the sink.
// This resets on each flush.
elidedPrefix roachpb.Key

// stats contain statistics about the actions of the FileSSTSink over its
// entire lifespan.
@@ -98,12 +95,16 @@ type FileSSTSink struct {
}

// fileSpanByteLimit is the maximum size of a file span that can be extended.
const fileSpanByteLimit = 64 << 20
var fileSpanByteLimit int64 = 64 << 20

func MakeFileSSTSink(
conf SSTSinkConf, dest cloud.ExternalStorage, pacer *admission.Pacer,
) *FileSSTSink {
return &FileSSTSink{conf: conf, dest: dest, pacer: pacer}
return &FileSSTSink{
conf: conf,
dest: dest,
pacer: pacer,
}
}

func (s *FileSSTSink) Write(ctx context.Context, resp ExportedSpan) (roachpb.Key, error) {
@@ -120,12 +121,12 @@ func (s *FileSSTSink) Write(ctx context.Context, resp ExportedSpan) (roachpb.Key
// since it overlaps but SSTWriter demands writes in-order.
if len(s.flushedFiles) > 0 {
last := s.flushedFiles[len(s.flushedFiles)-1].Span.EndKey
if span.Key.Compare(last) < 0 || !bytes.Equal(spanPrefix, s.elidePrefix) {
if span.Key.Compare(last) < 0 || !bytes.Equal(spanPrefix, s.elidedPrefix) {
log.VEventf(ctx, 1, "flushing backup file %s of size %d because span %s cannot append before %s",
s.outName, s.flushedSize, span, last,
)
s.stats.oooFlushes++
if err := s.flushFile(ctx); err != nil {
if err := s.Flush(ctx); err != nil {
return nil, err
}
}
@@ -137,15 +138,15 @@ func (s *FileSSTSink) Write(ctx context.Context, resp ExportedSpan) (roachpb.Key
return nil, err
}
}
s.elidePrefix = append(s.elidePrefix[:0], spanPrefix...)
s.elidedPrefix = append(s.elidedPrefix[:0], spanPrefix...)

log.VEventf(ctx, 2, "writing %s to backup file %s", span, s.outName)

// To speed up SST reading, surface all the point keys first, flush,
// then surface all the range keys and flush.
//
// TODO(msbutler): investigate using single a single iterator that surfaces
// all point keys first and then all range keys
// all point keys first and then all range keys.
maxKey, err := s.copyPointKeys(ctx, resp.DataSST)
if err != nil {
return nil, err
@@ -208,7 +209,7 @@ func (s *FileSSTSink) Write(ctx context.Context, resp ExportedSpan) (roachpb.Key
if s.flushedSize > targetFileSize.Get(s.conf.Settings) && !s.midRow {
s.stats.sizeFlushes++
log.VEventf(ctx, 2, "flushing backup file %s with size %d", s.outName, s.flushedSize)
if err := s.flushFile(ctx); err != nil {
if err := s.Flush(ctx); err != nil {
return nil, err
}
} else {
@@ -238,10 +239,6 @@ func (s *FileSSTSink) Close() error {
}

func (s *FileSSTSink) Flush(ctx context.Context) error {
return s.flushFile(ctx)
}

func (s *FileSSTSink) flushFile(ctx context.Context) error {
if s.out == nil {
// If the writer was not initialized but the sink has reported completed
// spans then there were empty ExportRequests that were processed by the
@@ -310,7 +307,7 @@ func (s *FileSSTSink) flushFile(ctx context.Context) error {
}

s.flushedFiles = nil
s.elidePrefix = s.elidePrefix[:0]
s.elidedPrefix = s.elidedPrefix[:0]
s.flushedSize = 0
s.flushedRevStart.Reset()
s.completedSpans = 0
@@ -350,6 +347,7 @@ func (s *FileSSTSink) open(ctx context.Context) error {

return nil
}

func (s *FileSSTSink) copyPointKeys(ctx context.Context, dataSST []byte) (roachpb.Key, error) {
iterOpts := storage.IterOptions{
KeyTypes: storage.IterKeyTypePointsOnly,
@@ -376,9 +374,9 @@ func (s *FileSSTSink) copyPointKeys(ctx context.Context, dataSST []byte) (roachp
break
}
k := iter.UnsafeKey()
suffix, ok := bytes.CutPrefix(k.Key, s.elidePrefix)
suffix, ok := bytes.CutPrefix(k.Key, s.elidedPrefix)
if !ok {
return nil, errors.AssertionFailedf("prefix mismatch %q does not have %q", k.Key, s.elidePrefix)
return nil, errors.AssertionFailedf("prefix mismatch %q does not have %q", k.Key, s.elidedPrefix)
}
k.Key = suffix

@@ -457,11 +455,11 @@ func (s *FileSSTSink) copyRangeKeys(dataSST []byte) (roachpb.Key, error) {
maxKey = append(maxKey[:0], rk.EndKey...)
}
var ok bool
if rk.StartKey, ok = bytes.CutPrefix(rk.StartKey, s.elidePrefix); !ok {
return nil, errors.AssertionFailedf("prefix mismatch %q does not have %q", rk.StartKey, s.elidePrefix)
if rk.StartKey, ok = bytes.CutPrefix(rk.StartKey, s.elidedPrefix); !ok {
return nil, errors.AssertionFailedf("prefix mismatch %q does not have %q", rk.StartKey, s.elidedPrefix)
}
if rk.EndKey, ok = bytes.CutPrefix(rk.EndKey, s.elidePrefix); !ok {
return nil, errors.AssertionFailedf("prefix mismatch %q does not have %q", rk.EndKey, s.elidePrefix)
if rk.EndKey, ok = bytes.CutPrefix(rk.EndKey, s.elidedPrefix); !ok {
return nil, errors.AssertionFailedf("prefix mismatch %q does not have %q", rk.EndKey, s.elidedPrefix)
}
if err := s.sst.PutRawMVCCRangeKey(rk, v.Value); err != nil {
return nil, err
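
Note on usage: with this change the FileSSTSinkWriter interface is removed and flushFile is folded into the exported Flush, so callers drive the sink purely through Write, Flush, and Close. The sketch below is illustrative only and not part of this PR; it assumes the SSTSinkConf, storage handle, and ExportedSpans are built elsewhere (for example, the way the tests below build them).

package backupexample // hypothetical caller package, not in the PR

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/backup/backupsink"
	"github.com/cockroachdb/cockroach/pkg/cloud"
)

// writeAndFlush streams a batch of exported spans through a FileSSTSink and
// flushes whatever is still buffered at the end.
func writeAndFlush(
	ctx context.Context,
	conf backupsink.SSTSinkConf,
	store cloud.ExternalStorage,
	spans []backupsink.ExportedSpan,
) error {
	sink := backupsink.MakeFileSSTSink(conf, store, nil /* pacer */)
	defer func() { _ = sink.Close() }()
	for _, sp := range spans {
		// Write flushes internally when a span cannot be appended in order,
		// when its elided prefix differs from the previous span's, or when the
		// target file size is exceeded and the sink is not mid-row. The
		// returned resume key is ignored in this sketch.
		if _, err := sink.Write(ctx, sp); err != nil {
			return err
		}
	}
	// Flush (previously the unexported flushFile) writes the buffered SST to
	// external storage and reports completed spans as progress.
	return sink.Flush(ctx)
}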
59 changes: 39 additions & 20 deletions pkg/backup/backupsink/file_sst_sink_test.go
@@ -1,4 +1,4 @@
// Copyright 2023 The Cockroach Authors.
// Copyright 2024 The Cockroach Authors.
//
// Use of this software is governed by the CockroachDB Software License
// included in the /LICENSE file.
@@ -93,7 +93,7 @@ func TestFileSSTSinkExtendOneFile(t *testing.T) {

st := cluster.MakeTestingClusterSettings()
targetFileSize.Override(ctx, &st.SV, 20)
sink, _ := fileSSTSinkTestSetup(t, st)
sink, _ := fileSSTSinkTestSetup(t, st, execinfrapb.ElidePrefix_None)

resumeKey, err := sink.Write(ctx, exportResponse1)
require.NoError(t, err)
@@ -381,11 +381,10 @@ func TestFileSSTSinkWrite(t *testing.T) {
targetFileSize.Override(ctx, &st.SV, 10<<10)
}

sink, store := fileSSTSinkTestSetup(t, st)
sink, store := fileSSTSinkTestSetup(t, st, elide)
defer func() {
require.NoError(t, sink.Close())
}()
sink.conf.ElideMode = elide

var resumeKey roachpb.Key
var err error
@@ -461,6 +460,10 @@ func s2k0(s string) roachpb.Key {
return s2kWithColFamily(s, 0)
}

func s2k1(s string) roachpb.Key {
return s2kWithColFamily(s, 1)
}

// TestFileSSTSinkStats tests the internal counters and stats of the FileSSTSink under
// different write scenarios.
func TestFileSSTSinkStats(t *testing.T) {
@@ -472,7 +475,7 @@ func TestFileSSTSinkStats(t *testing.T) {
st := cluster.MakeTestingClusterSettings()
targetFileSize.Override(ctx, &st.SV, 10<<10)

sink, _ := fileSSTSinkTestSetup(t, st)
sink, _ := fileSSTSinkTestSetup(t, st, execinfrapb.ElidePrefix_None)

defer func() {
require.NoError(t, sink.Close())
@@ -915,6 +918,19 @@ type kvAndTS struct {
timestamp int64
}

func (k kvAndTS) toMVCCKV(enc func(string) roachpb.Key) mvccKV {
v := roachpb.Value{}
v.SetBytes(k.value)
v.InitChecksum(nil)
return mvccKV{
key: storage.MVCCKey{
Key: enc(k.key),
Timestamp: hlc.Timestamp{WallTime: k.timestamp},
},
value: v.RawBytes,
}
}

type rangeKeyAndTS struct {
key string
endKey string
@@ -1029,31 +1045,34 @@ func (b *exportedSpanBuilder) buildWithEncoding(stringToKey func(string) roachpb
}

func fileSSTSinkTestSetup(
t *testing.T, settings *cluster.Settings,
t *testing.T, settings *cluster.Settings, elideMode execinfrapb.ElidePrefix,
) (*FileSSTSink, cloud.ExternalStorage) {
conf, store := sinkTestSetup(t, settings, elideMode)
sink := MakeFileSSTSink(conf, store, nil /* pacer */)
return sink, store
}

func sinkTestSetup(
t *testing.T, settings *cluster.Settings, elideMode execinfrapb.ElidePrefix,
) (SSTSinkConf, cloud.ExternalStorage) {
store := nodelocal.TestingMakeNodelocalStorage(t.TempDir(), settings, cloudpb.ExternalStorage{})

// Never block.
progCh := make(chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress, 100)

sinkConf := SSTSinkConf{
ID: 1,
Enc: nil,
ProgCh: progCh,
Settings: &settings.SV,
ID: 1,
Enc: nil,
ProgCh: progCh,
Settings: &settings.SV,
ElideMode: elideMode,
}

sink := MakeFileSSTSink(sinkConf, store, nil /* pacer */)
return sink, store
return sinkConf, store
}

func checkFiles(
ctx context.Context,
store cloud.ExternalStorage,
files []backuppb.BackupManifest_File,
expectedFileSpans []roachpb.Spans,
elided bool,
eliding bool,
) error {
iterOpts := storage.IterOptions{
KeyTypes: storage.IterKeyTypePointsOnly,
@@ -1101,7 +1120,7 @@ func checkFiles(

key := iter.UnsafeKey()

if !endKeyInclusiveSpansContainsKey(spans, key.Key, elided) {
if !endKeyInclusiveSpansContainsKey(spans, key.Key, eliding) {
return errors.Newf("key %v in file %s not contained by its spans [%v]", key.Key, f, spans)
}
}
@@ -1111,9 +1130,9 @@
return nil
}

func endKeyInclusiveSpansContainsKey(spans roachpb.Spans, key roachpb.Key, elided bool) bool {
func endKeyInclusiveSpansContainsKey(spans roachpb.Spans, key roachpb.Key, eliding bool) bool {
for _, sp := range spans {
if elided {
if eliding {
sp.Key, _ = keys.StripTablePrefix(sp.Key)
sp.EndKey, _ = keys.StripTablePrefix(sp.EndKey)
}
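
The helper changes above (sinkTestSetup split out of fileSSTSinkTestSetup, the elide-mode parameter, and the new toMVCCKV and s2k1 helpers) let the added sst_sink_key_writer_test.go build an SSTSinkConf and storage-level KVs directly. A rough sketch of how they compose inside this test package; the fixture below is illustrative and not taken from the PR.

// buildKeyWriterFixture is illustrative only: it builds a sink config with the
// caller's elide mode and converts human-readable KVs into the
// MVCCKey/RawBytes pairs that the test SSTs are written from.
func buildKeyWriterFixture(
	t *testing.T, mode execinfrapb.ElidePrefix,
) (SSTSinkConf, cloud.ExternalStorage, []mvccKV) {
	st := cluster.MakeTestingClusterSettings()
	conf, store := sinkTestSetup(t, st, mode)
	kvs := []kvAndTS{
		{key: "a", value: []byte("value-a"), timestamp: 10},
		{key: "b", value: []byte("value-b"), timestamp: 10},
	}
	out := make([]mvccKV, 0, len(kvs))
	for _, kv := range kvs {
		// s2k0 encodes each key with column family 0; s2k1 is available when a
		// test needs a second column family on the same row.
		out = append(out, kv.toMVCCKV(s2k0))
	}
	return conf, store, out
}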
20 changes: 20 additions & 0 deletions pkg/backup/backupsink/sink_utils.go
@@ -6,6 +6,7 @@
package backupsink

import (
"bytes"
"fmt"

"github.com/cockroachdb/cockroach/pkg/base"
@@ -82,3 +83,22 @@ func generateUniqueSSTName(nodeID base.SQLInstanceID) string {
return fmt.Sprintf("data/%d.sst",
builtins.GenerateUniqueInt(builtins.ProcessUniqueID(nodeID)))
}

// isContiguousSpan returns true if the first span ends where the second span begins.
func isContiguousSpan(first, second roachpb.Span) bool {
return first.EndKey.Equal(second.Key)
}

// sameElidedPrefix returns true if the elided prefix of a and b are equal based
// on the given mode.
func sameElidedPrefix(a, b roachpb.Key, mode execinfrapb.ElidePrefix) (bool, error) {
prefixA, err := ElidedPrefix(a, mode)
if err != nil {
return false, err
}
prefixB, err := ElidedPrefix(b, mode)
if err != nil {
return false, err
}
return bytes.Equal(prefixA, prefixB), nil
}
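
These two helpers support the new sst_sink_key_writer.go added to the build above but not expanded in this view. A hedged sketch of the kind of check a key-oriented sink could perform with them; the function below is an illustrative assumption, not the PR's actual key-writer logic.

// canExtendLastFile is illustrative only. It reports whether a new span can
// extend the file span most recently written by the sink: the two spans must
// be contiguous and, when prefix elision is enabled, must share the same
// elided prefix so the stripped keys still sort correctly within one SST.
func canExtendLastFile(last, next roachpb.Span, mode execinfrapb.ElidePrefix) (bool, error) {
	if !isContiguousSpan(last, next) {
		return false, nil
	}
	return sameElidedPrefix(last.Key, next.Key, mode)
}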