From b156bf06c971f2e2eed8287fe8dc513d35be6e39 Mon Sep 17 00:00:00 2001 From: Steven Danna Date: Thu, 12 Sep 2024 22:22:26 +0000 Subject: [PATCH] changefeedccl: protect system.comments and system.zones The catalog reader reads from system.descriptor, system.comments, and system.zones when reading a table descriptor from disk. We were only protecting system.descriptors. Further, the test here previously wasn't testing anything because it was only forcing the GC-threshold to 1 second before read timestamp it was using. Informs #128806 Release note (bug fix): Fix bug that could prevent a CHANGEFEED from being able to resume after being paused for prolonged period of time. --- pkg/ccl/changefeedccl/protected_timestamps.go | 22 +++++--- .../protected_timestamps_test.go | 55 ++++++++++++++++++- .../v24_1_migrate_pts_records_test.go | 19 ++++--- 3 files changed, 78 insertions(+), 18 deletions(-) diff --git a/pkg/ccl/changefeedccl/protected_timestamps.go b/pkg/ccl/changefeedccl/protected_timestamps.go index 70def6e4a232..0ba31337db34 100644 --- a/pkg/ccl/changefeedccl/protected_timestamps.go +++ b/pkg/ccl/changefeedccl/protected_timestamps.go @@ -42,24 +42,30 @@ func createProtectedTimestampRecord( jobsprotectedts.Jobs, targetToProtect) } +// systemTablesToProtect holds the descriptor IDs of the system tables +// that need to be protected to ensure that a CHANGEFEED can do a +// historical read of a table descriptor. +var systemTablesToProtect = []descpb.ID{ + keys.DescriptorTableID, + keys.CommentsTableID, + keys.ZonesTableID, +} + func makeTargetToProtect(targets changefeedbase.Targets) *ptpb.Target { // NB: We add 1 because we're also going to protect system.descriptors. // We protect system.descriptors because a changefeed needs all of the history // of table descriptors to version data. - tablesToProtect := make(descpb.IDs, 0, targets.NumUniqueTables()+1) + tablesToProtect := make(descpb.IDs, 0, targets.NumUniqueTables()+len(systemTablesToProtect)) _ = targets.EachTableID(func(id descpb.ID) error { tablesToProtect = append(tablesToProtect, id) return nil }) - tablesToProtect = append(tablesToProtect, keys.DescriptorTableID) + tablesToProtect = append(tablesToProtect, systemTablesToProtect...) return ptpb.MakeSchemaObjectsTarget(tablesToProtect) } func makeSpansToProtect(codec keys.SQLCodec, targets changefeedbase.Targets) []roachpb.Span { - // NB: We add 1 because we're also going to protect system.descriptors. - // We protect system.descriptors because a changefeed needs all of the history - // of table descriptors to version data. - spansToProtect := make([]roachpb.Span, 0, targets.NumUniqueTables()+1) + spansToProtect := make([]roachpb.Span, 0, targets.NumUniqueTables()+len(systemTablesToProtect)) addTablePrefix := func(id uint32) { tablePrefix := codec.TablePrefix(id) spansToProtect = append(spansToProtect, roachpb.Span{ @@ -71,6 +77,8 @@ func makeSpansToProtect(codec keys.SQLCodec, targets changefeedbase.Targets) []r addTablePrefix(uint32(id)) return nil }) - addTablePrefix(keys.DescriptorTableID) + for _, id := range systemTablesToProtect { + addTablePrefix(uint32(id)) + } return spansToProtect } diff --git a/pkg/ccl/changefeedccl/protected_timestamps_test.go b/pkg/ccl/changefeedccl/protected_timestamps_test.go index e1348e8f26e0..33d61916a09d 100644 --- a/pkg/ccl/changefeedccl/protected_timestamps_test.go +++ b/pkg/ccl/changefeedccl/protected_timestamps_test.go @@ -27,6 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/spanconfig" "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigptsreader" "github.com/cockroachdb/cockroach/pkg/sql" @@ -456,7 +457,7 @@ func TestPTSRecordProtectsTargetsAndDescriptorTable(t *testing.T) { defer stopServer() execCfg := s.ExecutorConfig().(sql.ExecutorConfig) sqlDB := sqlutils.MakeSQLRunner(db) - + sqlDB.Exec(t, `ALTER DATABASE system CONFIGURE ZONE USING gc.ttlseconds = 1`) sqlDB.Exec(t, "CREATE TABLE foo (a INT, b STRING)") ts := s.Clock().Now() ctx := context.Background() @@ -473,10 +474,60 @@ func TestPTSRecordProtectsTargetsAndDescriptorTable(t *testing.T) { return execCfg.ProtectedTimestampProvider.WithTxn(txn).Protect(ctx, ptr) })) + // The following code was shameless stolen from + // TestShowTenantFingerprintsProtectsTimestamp which almost + // surely copied it from the 2-3 other tests that have + // something similar. We should put this in a helper. We have + // ForceTableGC, but in ad-hoc testing that appeared to bypass + // the PTS record making it useless for this test. + // + // TODO(ssd): Make a helper that does this. + refreshPTSReaderCache := func(asOf hlc.Timestamp, tableName, databaseName string) { + tableID, err := s.QueryTableID(ctx, username.RootUserName(), tableName, databaseName) + require.NoError(t, err) + tableKey := s.Codec().TablePrefix(uint32(tableID)) + store, err := s.StorageLayer().GetStores().(*kvserver.Stores).GetStore(s.GetFirstStoreID()) + require.NoError(t, err) + var repl *kvserver.Replica + testutils.SucceedsSoon(t, func() error { + repl = store.LookupReplica(roachpb.RKey(tableKey)) + if repl == nil { + return errors.New("could not find replica") + } + return nil + }) + ptsReader := store.GetStoreConfig().ProtectedTimestampReader + t.Logf("updating PTS reader cache to %s", asOf) + require.NoError( + t, + spanconfigptsreader.TestingRefreshPTSState(ctx, t, ptsReader, asOf), + ) + require.NoError(t, repl.ReadProtectedTimestampsForTesting(ctx)) + } + gcTestTableRange := func(tableName, databaseName string) { + row := sqlDB.QueryRow(t, fmt.Sprintf("SELECT range_id FROM [SHOW RANGES FROM TABLE %s.%s]", tableName, databaseName)) + var rangeID int64 + row.Scan(&rangeID) + refreshPTSReaderCache(s.Clock().Now(), tableName, databaseName) + t.Logf("enqueuing range %d for mvccGC", rangeID) + sqlDB.Exec(t, `SELECT crdb_internal.kv_enqueue_replica($1, 'mvccGC', true)`, rangeID) + } + // Alter foo few times, then force GC at ts-1. sqlDB.Exec(t, "ALTER TABLE foo ADD COLUMN c STRING") sqlDB.Exec(t, "ALTER TABLE foo ADD COLUMN d STRING") - require.NoError(t, s.ForceTableGC(ctx, "system", "descriptor", ts.Add(-1, 0))) + time.Sleep(2 * time.Second) + // If you want to GC all system tables: + // + // tabs := systemschema.MakeSystemTables() + // for _, t := range tabs { + // if t.IsPhysicalTable() && !t.IsSequence() { + // gcTestTableRange("system", t.GetName()) + // } + // } + gcTestTableRange("system", "descriptor") + gcTestTableRange("system", "zones") + gcTestTableRange("system", "comments") // We can still fetch table descriptors because of protected timestamp record. asOf := ts diff --git a/pkg/upgrade/upgrades/v24_1_migrate_pts_records_test.go b/pkg/upgrade/upgrades/v24_1_migrate_pts_records_test.go index 4df3bc28f5c0..00b0b3ee2b52 100644 --- a/pkg/upgrade/upgrades/v24_1_migrate_pts_records_test.go +++ b/pkg/upgrade/upgrades/v24_1_migrate_pts_records_test.go @@ -88,13 +88,17 @@ func TestMigrateOldStlePTSRecords(t *testing.T) { tableDesc := desctestutils.TestingGetTableDescriptor( s.DB(), execCfg.Codec, "defaultdb", "public", tbl, ) - allTargets = append(allTargets, []catid.DescID{keys.DescriptorTableID, tableDesc.GetID()}) + allTargets = append(allTargets, []catid.DescID{ + keys.DescriptorTableID, + keys.ZonesTableID, + keys.CommentsTableID, + tableDesc.GetID()}) descIDsArr = append(descIDsArr, tableDesc.GetID()) allTables = append(allTables, tbl) } _, err = sqlDB.Exec(fmt.Sprintf("create changefeed for %s INTO 'null://'", strings.Join(allTables, ","))) require.NoError(t, err) - descIDsArr = append(descIDsArr, keys.DescriptorTableID) + descIDsArr = append(descIDsArr, keys.DescriptorTableID, keys.ZonesTableID, keys.CommentsTableID) sort.Slice(descIDsArr, func(i int, j int) bool { return descIDsArr[i] < descIDsArr[j] }) @@ -141,13 +145,10 @@ func TestMigrateOldStlePTSRecords(t *testing.T) { if len(b) < len(a) { return false } - // Since each slice was sorted, the descriptor table will be at index - // 0 since it has the lowest descriptor ID. Then, order by the protected - // data table descriptor at index 1. - return a[1] < b[1] + // Since each slice was sorted, the data data should + // be last. Sort on its descriptor ID. + return a[len(a)-1] < b[len(b)-1] }) - t.Logf("%v", allTargets) - t.Logf("%v", seenTargets) - require.Equal(t, seenTargets, allTargets) + require.Equal(t, allTargets, seenTargets) }