Skip to content

Commit

Permalink
changefeedccl: migrate pts records to include new tables
Browse files Browse the repository at this point in the history
Update the pts records of running feeds when the
set of targets changes, such as when we add new
system tables to protect.

Fixes: #133578
Part of: #128806

Release note (general change): The pts records of
running feeds are now updated when the set of
targets changes, such as when system tables
are added to the protected tables list.
  • Loading branch information
asg0451 committed Dec 19, 2024
1 parent 95460dd commit e0f0b53
Show file tree
Hide file tree
Showing 4 changed files with 224 additions and 63 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,7 @@ go_test(
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_redact//:redact",
"@com_github_dustin_go_humanize//:go-humanize",
"@com_github_gogo_protobuf//proto",
"@com_github_gogo_protobuf//types",
"@com_github_golang_mock//gomock",
"@com_github_ibm_sarama//:sarama",
Expand Down
74 changes: 50 additions & 24 deletions pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
Expand Down Expand Up @@ -1820,40 +1821,65 @@ func (cf *changeFrontier) manageProtectedTimestamps(
return false, err
}

if rec.Target != nil {
// Only update the PTS timestamp if it is lagging behind the high
// watermark. This is to prevent a rush of updates to the PTS if the
// changefeed restarts, which can cause contention and second order effects
// on system tables.
if !rec.Timestamp.AddDuration(ptsUpdateLag).Less(highWater) {
// If this changefeed was created in 22.1 or earlier, it may be using a deprecated pts record in which
// the target field is nil. If so, we "migrate" it to use the new style of pts records and delete the old one.
if rec.Target == nil {
if preserveDeprecatedPts := cf.knobs.PreserveDeprecatedPts != nil && cf.knobs.PreserveDeprecatedPts(); preserveDeprecatedPts {
return false, nil
}

log.VEventf(ctx, 2, "updating protected timestamp %v at %v", progress.ProtectedTimestampRecord, highWater)
return true, pts.UpdateTimestamp(ctx, progress.ProtectedTimestampRecord, highWater)
if err := cf.remakePTSRecord(ctx, pts, progress, highWater); err != nil {
return false, err
}
return true, nil
}

// If this changefeed was created in 22.1 or earlier, it may be using a deprecated pts record in which
// the target field is nil. If so, we "migrate" it to use the new style of pts records and delete the old one.
preserveDeprecatedPts := cf.knobs.PreserveDeprecatedPts != nil && cf.knobs.PreserveDeprecatedPts()
if !preserveDeprecatedPts {
prevRecordId := progress.ProtectedTimestampRecord
ptr := createProtectedTimestampRecord(
ctx, cf.FlowCtx.Codec(), cf.spec.JobID, AllTargets(cf.spec.Feed), highWater,
)
if err := pts.Protect(ctx, ptr); err != nil {
return false, err
// If we've identified more tables that need to be protected since this
// changefeed was created, it will be missing here. If so, we "migrate" it
// to include all the appropriate targets.
if targets := AllTargets(cf.spec.Feed); !makeTargetToProtect(targets).Equal(rec.Target) {
if preservePTSTargets := cf.knobs.PreservePTSTargets != nil && cf.knobs.PreservePTSTargets(); preservePTSTargets {
return false, nil
}
progress.ProtectedTimestampRecord = ptr.ID.GetUUID()
if err := pts.Release(ctx, prevRecordId); err != nil {
if err := cf.remakePTSRecord(ctx, pts, progress, highWater); err != nil {
return false, err
}
return true, nil
}

log.Eventf(ctx, "created new pts record %v to replace old pts record %v at %v",
progress.ProtectedTimestampRecord, prevRecordId, highWater)
// Only update the PTS timestamp if it is lagging behind the high
// watermark. This is to prevent a rush of updates to the PTS if the
// changefeed restarts, which can cause contention and second order effects
// on system tables.
if !rec.Timestamp.AddDuration(ptsUpdateLag).Less(highWater) {
return false, nil
}

return true, nil
log.VEventf(ctx, 2, "updating protected timestamp %v at %v", progress.ProtectedTimestampRecord, highWater)
return true, pts.UpdateTimestamp(ctx, progress.ProtectedTimestampRecord, highWater)
}

func (cf *changeFrontier) remakePTSRecord(
ctx context.Context,
pts protectedts.Storage,
progress *jobspb.ChangefeedProgress,
resolved hlc.Timestamp,
) error {
prevRecordId := progress.ProtectedTimestampRecord
ptr := createProtectedTimestampRecord(
ctx, cf.FlowCtx.Codec(), cf.spec.JobID, AllTargets(cf.spec.Feed), resolved,
)
if err := pts.Protect(ctx, ptr); err != nil {
return err
}
progress.ProtectedTimestampRecord = ptr.ID.GetUUID()
if err := pts.Release(ctx, prevRecordId); err != nil {
return err
}

log.Eventf(ctx, "created new pts record %v to replace old pts record %v at %v",
progress.ProtectedTimestampRecord, prevRecordId, resolved)

return nil
}

func (cf *changeFrontier) maybeEmitResolved(newResolved hlc.Timestamp) error {
Expand Down
208 changes: 169 additions & 39 deletions pkg/ccl/changefeedccl/protected_timestamps_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package changefeedccl
import (
"context"
"fmt"
"slices"
"sync/atomic"
"testing"
"time"
Expand All @@ -34,6 +35,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/distsql"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/sql/sem/catid"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/testutils"
Expand All @@ -42,6 +44,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -562,6 +565,121 @@ func TestPTSRecordProtectsTargetsAndSystemTables(t *testing.T) {

}

// TestChangefeedUpdateProtectedTimestampTargets tests that changefeeds will
// remake their PTS records if they detect that they lack required targets.
func TestChangefeedMigratesProtectedTimestampTargets(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testFn := func(t *testing.T, s TestServerWithSystem, f cdctest.TestFeedFactory) {
ctx := context.Background()

dontMigrate := atomic.Bool{}
dontMigrate.Store(true)
knobs := s.TestingKnobs.
DistSQL.(*execinfra.TestingKnobs).
Changefeed.(*TestingKnobs)
knobs.PreservePTSTargets = func() bool {
return dontMigrate.Load()
}

ptsInterval := 50 * time.Millisecond
changefeedbase.ProtectTimestampInterval.Override(
context.Background(), &s.Server.ClusterSettings().SV, ptsInterval)
changefeedbase.ProtectTimestampLag.Override(
context.Background(), &s.Server.ClusterSettings().SV, ptsInterval)

sqlDB := sqlutils.MakeSQLRunner(s.DB)
sysDB := sqlutils.MakeSQLRunner(s.SystemServer.SQLConn(t))

sysDB.Exec(t, "SET CLUSTER SETTING kv.protectedts.poll_interval = '10ms'")
sysDB.Exec(t, "SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'") // speeds up the test
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`)

foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH resolved = '20ms'`)
defer closeFeed(t, foo)

registry := s.Server.JobRegistry().(*jobs.Registry)
execCfg := s.Server.ExecutorConfig().(sql.ExecutorConfig)
ptp := s.Server.DistSQLServer().(*distsql.ServerImpl).ServerConfig.ProtectedTimestampProvider
fooDesc := desctestutils.TestingGetPublicTableDescriptor(s.SystemServer.DB(), s.Codec, "d", "foo")
fooID := fooDesc.GetID()

jobFeed := foo.(cdctest.EnterpriseTestFeed)

// removes table 3 from the target of the PTS record.
removeOnePTSTarget := func(recordID uuid.UUID) error {
return execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
s := `select target from system.protected_ts_records where id = $1`
datums, err := txn.QueryRowEx(ctx, "pts-test", txn.KV(), sessiondata.NodeUserSessionDataOverride, s, recordID)
require.NoError(t, err)
j := tree.MustBeDBytes(datums[0])

target := &ptpb.Target{}
require.NoError(t, protoutil.Unmarshal([]byte(j), target))

// remove '3' (system.descriptor) to simulate a missing system table
ids := target.GetSchemaObjects().IDs
idx := slices.Index(ids, catid.DescID(3))
target.GetSchemaObjects().IDs = slices.Delete(ids, idx, idx+1)

bs, err := protoutil.Marshal(target)
require.NoError(t, err)

_, err = txn.ExecEx(ctx, "pts-test", txn.KV(), sessiondata.NodeUserSessionDataOverride,
"UPDATE system.protected_ts_records SET target = $1 WHERE id = $2", bs, recordID,
)
require.NoError(t, err)
return nil
})
}

// Wipe out the targets from the changefeed PTS record, simulating an old-style PTS record.
oldRecordID := getPTSRecordID(ctx, t, registry, jobFeed)
require.NoError(t, removeOnePTSTarget(oldRecordID))

// Sanity check: make sure that it worked
oldRecord, err := readPTSRecord(ctx, t, execCfg, ptp, oldRecordID)
require.NoError(t, err)
targetIDs := oldRecord.Target.GetSchemaObjects().IDs
require.Contains(t, targetIDs, fooID)
require.NotSubset(t, targetIDs, systemTablesToProtect)

// Flip the knob so the changefeed migrates the record
dontMigrate.Store(false)

getNewPTSRecord := func() *ptpb.Record {
var recID uuid.UUID
var record *ptpb.Record
testutils.SucceedsSoon(t, func() error {
recID = getPTSRecordID(ctx, t, registry, jobFeed)
if recID.Equal(oldRecordID) {
return errors.New("waiting for new PTS record")
}
return nil
})
record, err := readPTSRecord(ctx, t, execCfg, ptp, recID)
require.NoError(t, err)
return record
}

// Read the new PTS record.
newRec := getNewPTSRecord()
require.NotNil(t, newRec.Target)

// Assert the new PTS record has the right targets.
targetIDs = newRec.Target.GetSchemaObjects().IDs
require.Contains(t, targetIDs, fooID)
require.Subset(t, targetIDs, systemTablesToProtect)

// Ensure the old pts record was deleted.
_, err = readPTSRecord(ctx, t, execCfg, ptp, oldRecordID)
require.ErrorContains(t, err, "does not exist")
}

cdcTestWithSystem(t, testFn, feedTestEnterpriseSinks)
}

// TestChangefeedUpdateProtectedTimestamp tests that changefeeds using the
// old style PTS records will migrate themselves to use the new style PTS
// records.
Expand Down Expand Up @@ -605,41 +723,7 @@ func TestChangefeedMigratesProtectedTimestamps(t *testing.T) {
descID := descpb.ID(keys.DescriptorTableID)

jobFeed := foo.(cdctest.EnterpriseTestFeed)
loadProgressErr := func() (jobspb.Progress, error) {
job, err := registry.LoadJob(ctx, jobFeed.JobID())
if err != nil {
return jobspb.Progress{}, err
}
return job.Progress(), nil
}

getPTSRecordID := func() uuid.UUID {
var recordID uuid.UUID
testutils.SucceedsSoon(t, func() error {
progress, err := loadProgressErr()
if err != nil {
return err
}
uid := progress.GetChangefeed().ProtectedTimestampRecord
if uid == uuid.Nil {
return errors.Newf("no pts record")
}
recordID = uid
return nil
})
return recordID
}

readPTSRecord := func(recID uuid.UUID) (rec *ptpb.Record, err error) {
err = execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
rec, err = ptp.WithTxn(txn).GetRecord(ctx, recID)
if err != nil {
return err
}
return nil
})
return
}
removePTSTarget := func(recordID uuid.UUID) error {
return execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
if _, err := txn.ExecEx(ctx, "pts-test", txn.KV(), sessiondata.NodeUserSessionDataOverride,
Expand All @@ -654,9 +738,9 @@ func TestChangefeedMigratesProtectedTimestamps(t *testing.T) {
}

// Wipe out the targets from the changefeed PTS record, simulating an old-style PTS record.
oldRecordID := getPTSRecordID()
oldRecordID := getPTSRecordID(ctx, t, registry, jobFeed)
require.NoError(t, removePTSTarget(oldRecordID))
rec, err := readPTSRecord(oldRecordID)
rec, err := readPTSRecord(ctx, t, execCfg, ptp, oldRecordID)
require.NoError(t, err)
require.NotNil(t, rec)
require.Nil(t, rec.Target)
Expand All @@ -668,14 +752,14 @@ func TestChangefeedMigratesProtectedTimestamps(t *testing.T) {
var recID uuid.UUID
var record *ptpb.Record
testutils.SucceedsSoon(t, func() error {
recID = getPTSRecordID()
recID = getPTSRecordID(ctx, t, registry, jobFeed)
if recID.Equal(oldRecordID) {
return errors.New("waiting for new PTS record")
}

return nil
})
record, err = readPTSRecord(recID)
record, err = readPTSRecord(ctx, t, execCfg, ptp, recID)
if err != nil {
t.Fatal(err)
}
Expand All @@ -692,7 +776,7 @@ func TestChangefeedMigratesProtectedTimestamps(t *testing.T) {
require.Contains(t, targetIDs, descID)

// Ensure the old pts record was deleted.
_, err = readPTSRecord(oldRecordID)
_, err = readPTSRecord(ctx, t, execCfg, ptp, oldRecordID)
require.ErrorContains(t, err, "does not exist")
}

Expand Down Expand Up @@ -768,3 +852,49 @@ func fetchUsersAndPasswords(
}
return users, nil
}

func getPTSRecordID(
ctx context.Context, t *testing.T, registry *jobs.Registry, jobFeed cdctest.EnterpriseTestFeed,
) uuid.UUID {
var recordID uuid.UUID
testutils.SucceedsSoon(t, func() error {
progress, err := loadProgressErr(ctx, registry, jobFeed)
if err != nil {
return err
}
uid := progress.GetChangefeed().ProtectedTimestampRecord
if uid == uuid.Nil {
return errors.Newf("no pts record")
}
recordID = uid
return nil
})
return recordID
}

func readPTSRecord(
ctx context.Context,
t *testing.T,
execCfg sql.ExecutorConfig,
ptp protectedts.Provider,
recID uuid.UUID,
) (rec *ptpb.Record, err error) {
err = execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
rec, err = ptp.WithTxn(txn).GetRecord(ctx, recID)
if err != nil {
return err
}
return nil
})
return
}

func loadProgressErr(
ctx context.Context, registry *jobs.Registry, jobFeed cdctest.EnterpriseTestFeed,
) (jobspb.Progress, error) {
job, err := registry.LoadJob(ctx, jobFeed.JobID())
if err != nil {
return jobspb.Progress{}, err
}
return job.Progress(), nil
}
4 changes: 4 additions & 0 deletions pkg/ccl/changefeedccl/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ type TestingKnobs struct {
// its PTS record from the deprecated style to the new style.
PreserveDeprecatedPts func() bool

// PreservePTSTargets is used to prevent a changefeed from upgrading
// its PTS record to include all required targets.
PreservePTSTargets func() bool

// PulsarClientSkipCreation skips creating the sink client when
// dialing.
PulsarClientSkipCreation bool
Expand Down

0 comments on commit e0f0b53

Please sign in to comment.