diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel index 0e24bc18cdfe..2ee917fb0a9f 100644 --- a/pkg/ccl/changefeedccl/BUILD.bazel +++ b/pkg/ccl/changefeedccl/BUILD.bazel @@ -25,6 +25,7 @@ go_library( "parquet.go", "parquet_sink_cloudstorage.go", "protected_timestamps.go", + "resolved_span_frontier.go", "retry.go", "scheduled_changefeed.go", "schema_registry.go", diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index 639933132bb6..6894098884cf 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -836,7 +836,7 @@ func (ca *changeAggregator) noteResolvedSpan(resolved jobspb.ResolvedSpan) (retu return nil } - advanced, err := ca.frontier.ForwardResolvedSpan(resolved) + advanced, err := ca.frontier.ForwardResolvedSpan(ca.Ctx(), resolved) if err != nil { return err } @@ -876,7 +876,7 @@ func (ca *changeAggregator) noteResolvedSpan(resolved jobspb.ResolvedSpan) (retu // At a lower frequency we checkpoint specific spans in the job progress // either in backfills or if the highwater mark is excessively lagging behind checkpointSpans := ca.spec.JobID != 0 && /* enterprise changefeed */ - (resolved.Timestamp.Equal(ca.frontier.BackfillTS()) || + (ca.frontier.InBackfill(resolved) || ca.frontier.hasLaggingSpans(ca.spec.Feed.StatementTime, sv)) && canCheckpointSpans(sv, ca.lastSpanFlush) @@ -969,7 +969,7 @@ type changeFrontier struct { // frontier contains the current resolved timestamp high-water for the tracked // span set. - frontier *schemaChangeFrontier + frontier *frontierResolvedSpanFrontier // localState contains an in memory cache of progress updates. // Used by core style changefeeds as well as regular changefeeds to make @@ -1160,7 +1160,7 @@ func newChangeFrontierProcessor( post *execinfrapb.PostProcessSpec, ) (execinfra.Processor, error) { memMonitor := execinfra.NewMonitor(ctx, flowCtx.Mon, "changefntr-mem") - sf, err := makeSchemaChangeFrontier(hlc.Timestamp{}, spec.TrackedSpans...) + sf, err := newFrontierResolvedSpanFrontier(hlc.Timestamp{}, spec.TrackedSpans...) if err != nil { return nil, err } @@ -1603,7 +1603,7 @@ func (cf *changeFrontier) noteAggregatorProgress(d rowenc.EncDatum) error { } func (cf *changeFrontier) forwardFrontier(resolved jobspb.ResolvedSpan) error { - frontierChanged, err := cf.frontier.ForwardResolvedSpan(resolved) + frontierChanged, err := cf.frontier.ForwardResolvedSpan(cf.Ctx(), resolved) if err != nil { return err } @@ -1666,7 +1666,7 @@ func (cf *changeFrontier) maybeCheckpointJob( ) (bool, error) { // When in a Backfill, the frontier remains unchanged at the backfill boundary // as we receive spans from the scan request at the Backfill Timestamp - inBackfill := !frontierChanged && resolvedSpan.Timestamp.Equal(cf.frontier.BackfillTS()) + inBackfill := !frontierChanged && cf.frontier.InBackfill(resolvedSpan) // If we're not in a backfill, highwater progress and an empty checkpoint will // be saved. This is throttled however we always persist progress to a schema @@ -2006,15 +2006,15 @@ func makeSchemaChangeFrontier( // ForwardResolvedSpan advances the timestamp for a resolved span, taking care // of updating schema change boundary information. 
-func (f *schemaChangeFrontier) ForwardResolvedSpan(r jobspb.ResolvedSpan) (bool, error) { +func (f *schemaChangeFrontier) ForwardResolvedSpan( + ctx context.Context, r jobspb.ResolvedSpan, +) (bool, error) { if r.BoundaryType != jobspb.ResolvedSpan_NONE { - if !f.boundaryTime.IsEmpty() && r.Timestamp.Less(f.boundaryTime) { - // Boundary resolved events should be ingested from the schema feed - // serially, where the changefeed won't even observe a new schema change - // boundary until it has progressed past the current boundary - return false, errors.AssertionFailedf("received boundary timestamp %v < %v "+ - "of type %v before reaching existing boundary of type %v", - r.Timestamp, f.boundaryTime, r.BoundaryType, f.boundaryType) + // Boundary resolved events should be ingested from the schema feed + // serially, where the changefeed won't even observe a new schema change + // boundary until it has progressed past the current boundary. + if err := f.assertBoundaryNotEarlier(ctx, r); err != nil { + return false, err } f.boundaryTime = r.Timestamp f.boundaryType = r.BoundaryType @@ -2073,16 +2073,15 @@ func (f *schemaChangeFrontier) getCheckpointSpans( return getCheckpointSpans(f.Frontier(), f.Entries, maxBytes) } -// BackfillTS returns the timestamp of the incoming spans for an ongoing -// Backfill (either an Initial Scan backfill or a Schema Change backfill). -// If no Backfill is occurring, an empty timestamp is returned. -func (f *schemaChangeFrontier) BackfillTS() hlc.Timestamp { +// InBackfill returns whether a resolved span is part of an ongoing backfill +// (either an initial scan backfill or a schema change backfill). +func (f *schemaChangeFrontier) InBackfill(r jobspb.ResolvedSpan) bool { frontier := f.Frontier() // The scan for the initial backfill results in spans sent at StatementTime, // which is what initialHighWater is set to when performing an initial scan. if frontier.IsEmpty() { - return f.initialHighWater + return r.Timestamp.Equal(f.initialHighWater) } // If the backfill is occurring after any initial scan (non-empty frontier), @@ -2093,9 +2092,30 @@ func (f *schemaChangeFrontier) BackfillTS() hlc.Timestamp { // is read from the job progress and is equal to the old BACKFILL boundary restarted := frontier.Equal(f.initialHighWater) if backfilling || restarted { - return frontier.Next() + return r.Timestamp.Equal(frontier.Next()) } - return hlc.Timestamp{} + + return false +} + +// assertBoundaryNotEarlier is a helper method provided to assert that a +// resolved span does not have an earlier boundary than the existing one. 
+func (f *schemaChangeFrontier) assertBoundaryNotEarlier( + ctx context.Context, r jobspb.ResolvedSpan, +) error { + boundaryType := r.BoundaryType + if boundaryType == jobspb.ResolvedSpan_NONE { + return errors.AssertionFailedf("assertBoundaryNotEarlier should not be called for NONE boundary") + } + boundaryTS := r.Timestamp + if f.boundaryTime.After(boundaryTS) { + err := errors.AssertionFailedf("received resolved span for %s "+ + "with %v boundary (%v), which is earlier than previously received %v boundary (%v)", + r.Span, r.BoundaryType, r.Timestamp, f.boundaryType, f.boundaryTime) + log.Errorf(ctx, "error while forwarding boundary resolved span: %v", err) + return err + } + return nil } // schemaChangeBoundaryReached returns true at the single moment when all spans diff --git a/pkg/ccl/changefeedccl/resolved_span_frontier.go b/pkg/ccl/changefeedccl/resolved_span_frontier.go new file mode 100644 index 000000000000..af4729d33f19 --- /dev/null +++ b/pkg/ccl/changefeedccl/resolved_span_frontier.go @@ -0,0 +1,115 @@ +// Copyright 2024 The Cockroach Authors. +// +// Use of this software is governed by the CockroachDB Software License +// included in the /LICENSE file. + +package changefeedccl + +import ( + "context" + "slices" + + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/errors" +) + +// frontierResolvedSpanFrontier wraps a schemaChangeFrontier with additional +// checks specific to how change frontiers processes schema changes. +type frontierResolvedSpanFrontier struct { + schemaChangeFrontier + + // backfills is a sorted list of timestamps for ongoing backfills. + // Usually there will only be one, but since aggregators run + // backfills in parallel without synchronization, there may be + // multiple backfills happening at one time. + backfills []hlc.Timestamp +} + +// newFrontierResolvedSpanFrontier returns a new frontierResolvedSpanFrontier. +func newFrontierResolvedSpanFrontier( + initialHighWater hlc.Timestamp, spans ...roachpb.Span, +) (*frontierResolvedSpanFrontier, error) { + scf, err := makeSchemaChangeFrontier(initialHighWater, spans...) + if err != nil { + return nil, err + } + return &frontierResolvedSpanFrontier{ + schemaChangeFrontier: *scf, + }, nil +} + +// ForwardResolvedSpan forwards the progress of a resolved span and also does +// some boundary validation. +func (f *frontierResolvedSpanFrontier) ForwardResolvedSpan( + ctx context.Context, r jobspb.ResolvedSpan, +) (bool, error) { + switch boundaryType := r.BoundaryType; boundaryType { + case jobspb.ResolvedSpan_NONE: + case jobspb.ResolvedSpan_BACKFILL: + // The change frontier collects resolved spans from all the change + // aggregators. Since a BACKFILL schema change does not cause an + // aggregator to shut down, an aggregator may encounter a second + // schema change (and send resolved spans for that second schema + // change) before the frontier has received resolved spans for the + // first BACKFILL schema change from all aggregators. Thus, as long as + // it is a BACKFILL we have already seen, then it is fine for it to be + // an earlier timestamp than the latest boundary. 
+ boundaryTS := r.Timestamp + _, ok := slices.BinarySearchFunc(f.backfills, boundaryTS, func(elem hlc.Timestamp, ts hlc.Timestamp) int { + return elem.Compare(ts) + }) + if ok { + break + } + if err := f.assertBoundaryNotEarlier(ctx, r); err != nil { + return false, err + } + f.backfills = append(f.backfills, boundaryTS) + f.boundaryTime = r.Timestamp + f.boundaryType = r.BoundaryType + case jobspb.ResolvedSpan_EXIT, jobspb.ResolvedSpan_RESTART: + // EXIT and RESTART are final boundaries that cause the changefeed + // processors to all move to draining and so should not be followed + // by any other boundaries. + if err := f.assertBoundaryNotEarlier(ctx, r); err != nil { + return false, err + } + f.boundaryTime = r.Timestamp + f.boundaryType = r.BoundaryType + default: + return false, errors.AssertionFailedf("unknown boundary type: %v", boundaryType) + } + f.latestTs.Forward(r.Timestamp) + frontierChanged, err := f.Forward(r.Span, r.Timestamp) + if err != nil { + return false, err + } + // If the frontier changed, we check if the frontier has advanced past any known backfills. + if frontierChanged { + frontier := f.Frontier() + i, _ := slices.BinarySearchFunc(f.backfills, frontier, func(elem hlc.Timestamp, ts hlc.Timestamp) int { + return elem.Compare(ts) + }) + f.backfills = f.backfills[i:] + } + return frontierChanged, nil +} + +// InBackfill returns whether a resolved span is part of an ongoing backfill +// (either an initial scan backfill or a schema change backfill). +// NB: Since the frontierResolvedSpanFrontier consolidates the frontiers of +// multiple change aggregators, there may be more than one concurrent backfill +// happening at different timestamps. +func (f *frontierResolvedSpanFrontier) InBackfill(r jobspb.ResolvedSpan) bool { + boundaryTS := r.Timestamp + _, ok := slices.BinarySearchFunc(f.backfills, boundaryTS, func(elem hlc.Timestamp, ts hlc.Timestamp) int { + return elem.Compare(ts) + }) + if ok { + return true + } + + return f.schemaChangeFrontier.InBackfill(r) +} diff --git a/pkg/cmd/roachtest/tests/cdc.go b/pkg/cmd/roachtest/tests/cdc.go index 27e3744428b9..eea3f8cec1af 100644 --- a/pkg/cmd/roachtest/tests/cdc.go +++ b/pkg/cmd/roachtest/tests/cdc.go @@ -57,6 +57,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/workload/debug" "github.com/cockroachdb/errors" + "github.com/stretchr/testify/require" "golang.org/x/oauth2/clientcredentials" ) @@ -1286,6 +1287,64 @@ func runCDCKafkaAuth(ctx context.Context, t test.Test, c cluster.Cluster) { } } +// runCDCMultipleSchemaChanges is a regression test for #136847. 
+func runCDCMultipleSchemaChanges(ctx context.Context, t test.Test, c cluster.Cluster) { + c.Start(ctx, t.L(), option.DefaultStartOpts(), install.MakeClusterSettings()) + + db := c.Conn(ctx, t.L(), 1) + sqlDB := sqlutils.MakeSQLRunner(db) + + sqlDB.Exec(t, "SET CLUSTER SETTING kv.rangefeed.enabled = true") + + tableNames := []string{"a", "b", "c", "d", "e", "f", "g", "h"} + for _, tableName := range tableNames { + createStmt := fmt.Sprintf(`CREATE TABLE %s ( + id UUID NOT NULL DEFAULT gen_random_uuid() PRIMARY KEY, + col TIMESTAMP(6) NULL +)`, tableName) + sqlDB.Exec(t, createStmt) + } + + var jobID int + sqlDB.QueryRow(t, + fmt.Sprintf("CREATE CHANGEFEED FOR %s INTO 'null://'", strings.Join(tableNames, ", ")), + ).Scan(&jobID) + + alterStmts := []string{"SET sql_safe_updates = false"} + for _, tableName := range tableNames { + alterStmts = append(alterStmts, fmt.Sprintf(`ALTER TABLE %s DROP col`, tableName)) + } + sqlDB.ExecMultiple(t, alterStmts...) + timeAfterSchemaChanges := timeutil.Now() + + t.L().Printf("waiting for changefeed highwater to pass %s", timeAfterSchemaChanges) +highwaterLoop: + for { + select { + case <-ctx.Done(): + t.Fatal(ctx.Err()) + case <-time.After(30 * time.Second): + info, err := getChangefeedInfo(db, jobID) + require.NoError(t, err) + status := info.GetStatus() + if status != "running" { + errorStr := info.GetError() + // Wait for job error to be populated. + if errorStr == "" { + t.L().Printf("changefeed status is %s instead of running, no error set yet", status) + continue + } + t.Fatalf("changefeed status is %s instead of running: %s", status, errorStr) + } + hw := info.GetHighWater() + if hw.After(timeAfterSchemaChanges) { + break highwaterLoop + } + t.L().Printf("changefeed highwater is %s <= %s", hw, timeAfterSchemaChanges) + } + } +} + func registerCDC(r registry.Registry) { r.Add(registry.TestSpec{ Name: "cdc/initial-scan-only", @@ -2127,6 +2186,16 @@ func registerCDC(r registry.Registry) { runCDCSchemaRegistry(ctx, t, c) }, }) + r.Add(registry.TestSpec{ + Name: "cdc/multiple-schema-changes", + Owner: registry.OwnerCDC, + Benchmark: false, + Cluster: r.MakeClusterSpec(3, spec.CPU(16)), + CompatibleClouds: registry.AllClouds, + Suites: registry.Suites(registry.Nightly), + Timeout: 1 * time.Hour, + Run: runCDCMultipleSchemaChanges, + }) } const (
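The core technique introduced in resolved_span_frontier.go is tracking the timestamps of concurrent backfills in a sorted slice: a BACKFILL boundary that has already been recorded may legitimately arrive again at an earlier timestamp than the latest boundary, and entries are pruned once the consolidated frontier advances past them. The sketch below illustrates that bookkeeping pattern in isolation. It is a simplified model, not the patch code: the backfillTracker type and plain int64 timestamps are hypothetical stand-ins for the hlc.Timestamp-based logic in frontierResolvedSpanFrontier, and it uses slices.Insert for generality, whereas the patch can simply append because assertBoundaryNotEarlier guarantees new boundaries are never earlier than ones already tracked.

```go
package main

import (
	"fmt"
	"slices"
)

// backfillTracker is a hypothetical, simplified stand-in for the sorted
// backfill-timestamp bookkeeping in frontierResolvedSpanFrontier. It keeps
// an ordered list of backfill timestamps and drops the ones the frontier
// has moved past.
type backfillTracker struct {
	backfills []int64 // sorted ascending; the real code uses hlc.Timestamp
}

// noteBackfill records a backfill timestamp if it is not already tracked,
// returning true if it was newly added.
func (t *backfillTracker) noteBackfill(ts int64) bool {
	i, ok := slices.BinarySearch(t.backfills, ts)
	if ok {
		return false // already tracking this backfill
	}
	t.backfills = slices.Insert(t.backfills, i, ts)
	return true
}

// inBackfill reports whether ts matches any tracked backfill timestamp.
func (t *backfillTracker) inBackfill(ts int64) bool {
	_, ok := slices.BinarySearch(t.backfills, ts)
	return ok
}

// frontierAdvanced prunes backfills the frontier has passed, i.e. all
// timestamps strictly less than the new frontier; timestamps at or above
// the frontier are kept.
func (t *backfillTracker) frontierAdvanced(frontier int64) {
	i, _ := slices.BinarySearch(t.backfills, frontier)
	t.backfills = t.backfills[i:]
}

func main() {
	var tr backfillTracker
	tr.noteBackfill(10) // first BACKFILL boundary
	tr.noteBackfill(20) // later BACKFILL reported by a faster aggregator
	fmt.Println(tr.inBackfill(10), tr.inBackfill(15)) // true false
	tr.frontierAdvanced(15)                           // frontier passed the first backfill
	fmt.Println(tr.backfills)                         // [20]
}
```

The binary-search-and-truncate pruning mirrors the patch's handling after a successful Forward: once the merged frontier moves beyond a backfill timestamp, no aggregator can still be emitting resolved spans for that backfill, so its entry can be discarded.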