release-24.1: changefeedccl: fix schema change boundary race condition #137704

Merged 1 commit on Dec 19, 2024

1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
@@ -25,6 +25,7 @@ go_library(
         "parquet.go",
         "parquet_sink_cloudstorage.go",
         "protected_timestamps.go",
+        "resolved_span_frontier.go",
         "retry.go",
         "scheduled_changefeed.go",
         "schema_registry.go",

62 changes: 41 additions & 21 deletions pkg/ccl/changefeedccl/changefeed_processors.go
@@ -840,7 +840,7 @@ func (ca *changeAggregator) noteResolvedSpan(resolved jobspb.ResolvedSpan) (retu
 		return nil
 	}
 
-	advanced, err := ca.frontier.ForwardResolvedSpan(resolved)
+	advanced, err := ca.frontier.ForwardResolvedSpan(ca.Ctx(), resolved)
 	if err != nil {
 		return err
 	}
@@ -880,7 +880,7 @@ func (ca *changeAggregator) noteResolvedSpan(resolved jobspb.ResolvedSpan) (retu
 	// At a lower frequency we checkpoint specific spans in the job progress
 	// either in backfills or if the highwater mark is excessively lagging behind
 	checkpointSpans := ca.spec.JobID != 0 && /* enterprise changefeed */
-		(resolved.Timestamp.Equal(ca.frontier.BackfillTS()) ||
+		(ca.frontier.InBackfill(resolved) ||
 			ca.frontier.hasLaggingSpans(ca.spec.Feed.StatementTime, sv)) &&
 		canCheckpointSpans(sv, ca.lastSpanFlush)

@@ -973,7 +973,7 @@ type changeFrontier struct
 
 	// frontier contains the current resolved timestamp high-water for the tracked
 	// span set.
-	frontier *schemaChangeFrontier
+	frontier *frontierResolvedSpanFrontier
 
 	// localState contains an in memory cache of progress updates.
 	// Used by core style changefeeds as well as regular changefeeds to make
@@ -1164,7 +1164,7 @@ func newChangeFrontierProcessor(
 	post *execinfrapb.PostProcessSpec,
 ) (execinfra.Processor, error) {
 	memMonitor := execinfra.NewMonitor(ctx, flowCtx.Mon, "changefntr-mem")
-	sf, err := makeSchemaChangeFrontier(hlc.Timestamp{}, spec.TrackedSpans...)
+	sf, err := newFrontierResolvedSpanFrontier(hlc.Timestamp{}, spec.TrackedSpans...)
 	if err != nil {
 		return nil, err
 	}
@@ -1603,7 +1603,7 @@ func (cf *changeFrontier) noteAggregatorProgress(d rowenc.EncDatum) error {
 }
 
 func (cf *changeFrontier) forwardFrontier(resolved jobspb.ResolvedSpan) error {
-	frontierChanged, err := cf.frontier.ForwardResolvedSpan(resolved)
+	frontierChanged, err := cf.frontier.ForwardResolvedSpan(cf.Ctx(), resolved)
 	if err != nil {
 		return err
 	}
@@ -1671,7 +1671,7 @@ func (cf *changeFrontier) maybeCheckpointJob(
 ) (bool, error) {
 	// When in a Backfill, the frontier remains unchanged at the backfill boundary
 	// as we receive spans from the scan request at the Backfill Timestamp
-	inBackfill := !frontierChanged && resolvedSpan.Timestamp.Equal(cf.frontier.BackfillTS())
+	inBackfill := !frontierChanged && cf.frontier.InBackfill(resolvedSpan)
 
 	// If we're not in a backfill, highwater progress and an empty checkpoint will
 	// be saved. This is throttled however we always persist progress to a schema
@@ -2011,15 +2011,15 @@ func makeSchemaChangeFrontier(
 
 // ForwardResolvedSpan advances the timestamp for a resolved span, taking care
 // of updating schema change boundary information.
-func (f *schemaChangeFrontier) ForwardResolvedSpan(r jobspb.ResolvedSpan) (bool, error) {
+func (f *schemaChangeFrontier) ForwardResolvedSpan(
+	ctx context.Context, r jobspb.ResolvedSpan,
+) (bool, error) {
 	if r.BoundaryType != jobspb.ResolvedSpan_NONE {
-		if !f.boundaryTime.IsEmpty() && r.Timestamp.Less(f.boundaryTime) {
-			// Boundary resolved events should be ingested from the schema feed
-			// serially, where the changefeed won't even observe a new schema change
-			// boundary until it has progressed past the current boundary
-			return false, errors.AssertionFailedf("received boundary timestamp %v < %v "+
-				"of type %v before reaching existing boundary of type %v",
-				r.Timestamp, f.boundaryTime, r.BoundaryType, f.boundaryType)
+		// Boundary resolved events should be ingested from the schema feed
+		// serially, where the changefeed won't even observe a new schema change
+		// boundary until it has progressed past the current boundary.
+		if err := f.assertBoundaryNotEarlier(ctx, r); err != nil {
+			return false, err
 		}
 		f.boundaryTime = r.Timestamp
 		f.boundaryType = r.BoundaryType
@@ -2078,16 +2078,15 @@ func (f *schemaChangeFrontier) getCheckpointSpans(
 	return getCheckpointSpans(f.Frontier(), f.Entries, maxBytes)
 }
 
-// BackfillTS returns the timestamp of the incoming spans for an ongoing
-// Backfill (either an Initial Scan backfill or a Schema Change backfill).
-// If no Backfill is occurring, an empty timestamp is returned.
-func (f *schemaChangeFrontier) BackfillTS() hlc.Timestamp {
+// InBackfill returns whether a resolved span is part of an ongoing backfill
+// (either an initial scan backfill or a schema change backfill).
+func (f *schemaChangeFrontier) InBackfill(r jobspb.ResolvedSpan) bool {
 	frontier := f.Frontier()
 
 	// The scan for the initial backfill results in spans sent at StatementTime,
 	// which is what initialHighWater is set to when performing an initial scan.
 	if frontier.IsEmpty() {
-		return f.initialHighWater
+		return r.Timestamp.Equal(f.initialHighWater)
 	}
 
 	// If the backfill is occurring after any initial scan (non-empty frontier),
@@ -2098,9 +2097,30 @@ func (f *schemaChangeFrontier) BackfillTS() hlc.Timestamp {
 	// is read from the job progress and is equal to the old BACKFILL boundary
 	restarted := frontier.Equal(f.initialHighWater)
 	if backfilling || restarted {
-		return frontier.Next()
+		return r.Timestamp.Equal(frontier.Next())
 	}
-	return hlc.Timestamp{}
+
+	return false
 }
 
+// assertBoundaryNotEarlier is a helper method provided to assert that a
+// resolved span does not have an earlier boundary than the existing one.
+func (f *schemaChangeFrontier) assertBoundaryNotEarlier(
+	ctx context.Context, r jobspb.ResolvedSpan,
+) error {
+	boundaryType := r.BoundaryType
+	if boundaryType == jobspb.ResolvedSpan_NONE {
+		return errors.AssertionFailedf("assertBoundaryNotEarlier should not be called for NONE boundary")
+	}
+	boundaryTS := r.Timestamp
+	if f.boundaryTime.After(boundaryTS) {
+		err := errors.AssertionFailedf("received resolved span for %s "+
+			"with %v boundary (%v), which is earlier than previously received %v boundary (%v)",
+			r.Span, r.BoundaryType, r.Timestamp, f.boundaryType, f.boundaryTime)
+		log.Errorf(ctx, "error while forwarding boundary resolved span: %v", err)
+		return err
+	}
+	return nil
+}
+
 // schemaChangeBoundaryReached returns true at the single moment when all spans
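
For illustration, a minimal sketch of the invariant assertBoundaryNotEarlier enforces at a single schemaChangeFrontier, assuming the package types above plus hypothetical ctx and tableSpan variables (this snippet is not part of the diff). Boundaries are ingested from the schema feed serially, so a boundary strictly earlier than one already recorded can only mean a bug:

	sf, err := makeSchemaChangeFrontier(hlc.Timestamp{}, tableSpan)
	if err != nil {
		return err
	}
	// Record an EXIT boundary at wall time 200.
	if _, err := sf.ForwardResolvedSpan(ctx, jobspb.ResolvedSpan{
		Span:         tableSpan,
		Timestamp:    hlc.Timestamp{WallTime: 200},
		BoundaryType: jobspb.ResolvedSpan_EXIT,
	}); err != nil {
		return err
	}
	// A boundary strictly earlier than the recorded one fails with an
	// assertion error ("received resolved span ... earlier than previously
	// received ... boundary") instead of silently regressing the boundary.
	if _, err := sf.ForwardResolvedSpan(ctx, jobspb.ResolvedSpan{
		Span:         tableSpan,
		Timestamp:    hlc.Timestamp{WallTime: 100},
		BoundaryType: jobspb.ResolvedSpan_EXIT,
	}); err != nil {
		return err // expected: non-nil assertion error
	}
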
115 changes: 115 additions & 0 deletions pkg/ccl/changefeedccl/resolved_span_frontier.go
@@ -0,0 +1,115 @@
// Copyright 2024 The Cockroach Authors.
//
// Use of this software is governed by the CockroachDB Software License
// included in the /LICENSE file.

package changefeedccl

import (
	"context"
	"slices"

	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
	"github.com/cockroachdb/errors"
)

// frontierResolvedSpanFrontier wraps a schemaChangeFrontier with additional
// checks specific to how the change frontier processes schema changes.
type frontierResolvedSpanFrontier struct {
	schemaChangeFrontier

	// backfills is a sorted list of timestamps for ongoing backfills.
	// Usually there will only be one, but since aggregators run
	// backfills in parallel without synchronization, there may be
	// multiple backfills happening at one time.
	backfills []hlc.Timestamp
}

// newFrontierResolvedSpanFrontier returns a new frontierResolvedSpanFrontier.
func newFrontierResolvedSpanFrontier(
	initialHighWater hlc.Timestamp, spans ...roachpb.Span,
) (*frontierResolvedSpanFrontier, error) {
	scf, err := makeSchemaChangeFrontier(initialHighWater, spans...)
	if err != nil {
		return nil, err
	}
	return &frontierResolvedSpanFrontier{
		schemaChangeFrontier: *scf,
	}, nil
}

// ForwardResolvedSpan forwards the progress of a resolved span and also does
// some boundary validation.
func (f *frontierResolvedSpanFrontier) ForwardResolvedSpan(
	ctx context.Context, r jobspb.ResolvedSpan,
) (bool, error) {
	switch boundaryType := r.BoundaryType; boundaryType {
	case jobspb.ResolvedSpan_NONE:
	case jobspb.ResolvedSpan_BACKFILL:
		// The change frontier collects resolved spans from all the change
		// aggregators. Since a BACKFILL schema change does not cause an
		// aggregator to shut down, an aggregator may encounter a second
		// schema change (and send resolved spans for that second schema
		// change) before the frontier has received resolved spans for the
		// first BACKFILL schema change from all aggregators. Thus, as long
		// as it is a BACKFILL we have already seen, it is fine for it to
		// have an earlier timestamp than the latest boundary.
		boundaryTS := r.Timestamp
		_, ok := slices.BinarySearchFunc(f.backfills, boundaryTS, func(elem hlc.Timestamp, ts hlc.Timestamp) int {
			return elem.Compare(ts)
		})
		if ok {
			break
		}
		if err := f.assertBoundaryNotEarlier(ctx, r); err != nil {
			return false, err
		}
		f.backfills = append(f.backfills, boundaryTS)
		f.boundaryTime = r.Timestamp
		f.boundaryType = r.BoundaryType
	case jobspb.ResolvedSpan_EXIT, jobspb.ResolvedSpan_RESTART:
		// EXIT and RESTART are final boundaries that cause the changefeed
		// processors to all move to draining and so should not be followed
		// by any other boundaries.
		if err := f.assertBoundaryNotEarlier(ctx, r); err != nil {
			return false, err
		}
		f.boundaryTime = r.Timestamp
		f.boundaryType = r.BoundaryType
	default:
		return false, errors.AssertionFailedf("unknown boundary type: %v", boundaryType)
	}
	f.latestTs.Forward(r.Timestamp)
	frontierChanged, err := f.Forward(r.Span, r.Timestamp)
	if err != nil {
		return false, err
	}
	// If the frontier changed, we check if the frontier has advanced past any known backfills.
	if frontierChanged {
		frontier := f.Frontier()
		i, _ := slices.BinarySearchFunc(f.backfills, frontier, func(elem hlc.Timestamp, ts hlc.Timestamp) int {
			return elem.Compare(ts)
		})
		f.backfills = f.backfills[i:]
	}
	return frontierChanged, nil
}

// InBackfill returns whether a resolved span is part of an ongoing backfill
// (either an initial scan backfill or a schema change backfill).
// NB: Since the frontierResolvedSpanFrontier consolidates the frontiers of
// multiple change aggregators, there may be more than one concurrent backfill
// happening at different timestamps.
func (f *frontierResolvedSpanFrontier) InBackfill(r jobspb.ResolvedSpan) bool {
	boundaryTS := r.Timestamp
	_, ok := slices.BinarySearchFunc(f.backfills, boundaryTS, func(elem hlc.Timestamp, ts hlc.Timestamp) int {
		return elem.Compare(ts)
	})
	if ok {
		return true
	}

	return f.schemaChangeFrontier.InBackfill(r)
}
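
Keeping the backfills slice sorted makes both the membership test and the pruning step a binary search. A standalone, runnable sketch of the same bookkeeping, with int64 wall times standing in for hlc.Timestamps (none of this is PR code):

	package main

	import (
		"cmp"
		"fmt"
		"slices"
	)

	func main() {
		var backfills []int64
		tsCmp := func(elem, ts int64) int { return cmp.Compare(elem, ts) }

		// Two aggregators hit a backfill at ts=100; one races ahead and hits
		// a second backfill at ts=200 before the other reports the first.
		for _, ts := range []int64{100, 200, 100} {
			if _, seen := slices.BinarySearchFunc(backfills, ts, tsCmp); !seen {
				// Appending keeps the slice sorted because an unseen boundary
				// earlier than the latest one is rejected before we get here.
				backfills = append(backfills, ts)
			}
		}
		fmt.Println(backfills) // [100 200]

		// Once the frontier passes a backfill timestamp, that backfill is
		// complete, so every entry at or below the frontier is pruned.
		frontier := int64(150)
		i, _ := slices.BinarySearchFunc(backfills, frontier, tsCmp)
		backfills = backfills[i:]
		fmt.Println(backfills) // [200]
	}
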
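And an end-to-end sketch of the race fixed here, again assuming the types above plus hypothetical ctx, spanA, and spanB variables. Before this change, the third call tripped the change frontier's monotonic-boundary assertion and failed the changefeed; with the backfills bookkeeping, it is recognized as a replay of a known backfill and accepted:

	f, err := newFrontierResolvedSpanFrontier(hlc.Timestamp{}, spanA, spanB)
	if err != nil {
		return err
	}
	t1 := hlc.Timestamp{WallTime: 100}
	t2 := hlc.Timestamp{WallTime: 200}
	// Aggregator 1 reaches the first BACKFILL schema change at t1.
	if _, err := f.ForwardResolvedSpan(ctx, jobspb.ResolvedSpan{
		Span: spanA, Timestamp: t1, BoundaryType: jobspb.ResolvedSpan_BACKFILL,
	}); err != nil {
		return err
	}
	// BACKFILLs don't shut aggregators down, so aggregator 1 can reach a
	// second schema change at t2 before aggregator 2 reports the first one.
	if _, err := f.ForwardResolvedSpan(ctx, jobspb.ResolvedSpan{
		Span: spanA, Timestamp: t2, BoundaryType: jobspb.ResolvedSpan_BACKFILL,
	}); err != nil {
		return err
	}
	// Aggregator 2 only now delivers its boundary for the first schema
	// change. t1 precedes the latest boundary t2, but t1 is in f.backfills,
	// so the span is accepted rather than triggering an assertion failure.
	if _, err := f.ForwardResolvedSpan(ctx, jobspb.ResolvedSpan{
		Span: spanB, Timestamp: t1, BoundaryType: jobspb.ResolvedSpan_BACKFILL,
	}); err != nil {
		return err
	}
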
69 changes: 69 additions & 0 deletions pkg/cmd/roachtest/tests/cdc.go
@@ -55,6 +55,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/retry"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/cockroachdb/errors"
+	"github.com/stretchr/testify/require"
 	"golang.org/x/oauth2/clientcredentials"
 )

@@ -1071,6 +1072,64 @@ func runCDCKafkaAuth(ctx context.Context, t test.Test, c cluster.Cluster) {
 	}
 }
 
+// runCDCMultipleSchemaChanges is a regression test for #136847.
+func runCDCMultipleSchemaChanges(ctx context.Context, t test.Test, c cluster.Cluster) {
+	c.Start(ctx, t.L(), option.DefaultStartOpts(), install.MakeClusterSettings())
+
+	db := c.Conn(ctx, t.L(), 1)
+	sqlDB := sqlutils.MakeSQLRunner(db)
+
+	sqlDB.Exec(t, "SET CLUSTER SETTING kv.rangefeed.enabled = true")
+
+	tableNames := []string{"a", "b", "c", "d", "e", "f", "g", "h"}
+	for _, tableName := range tableNames {
+		createStmt := fmt.Sprintf(`CREATE TABLE %s (
+			id UUID NOT NULL DEFAULT gen_random_uuid() PRIMARY KEY,
+			col TIMESTAMP(6) NULL
+		)`, tableName)
+		sqlDB.Exec(t, createStmt)
+	}
+
+	var jobID int
+	sqlDB.QueryRow(t,
+		fmt.Sprintf("CREATE CHANGEFEED FOR %s INTO 'null://'", strings.Join(tableNames, ", ")),
+	).Scan(&jobID)
+
+	alterStmts := []string{"SET sql_safe_updates = false"}
+	for _, tableName := range tableNames {
+		alterStmts = append(alterStmts, fmt.Sprintf(`ALTER TABLE %s DROP col`, tableName))
+	}
+	sqlDB.ExecMultiple(t, alterStmts...)
+	timeAfterSchemaChanges := timeutil.Now()
+
+	t.L().Printf("waiting for changefeed highwater to pass %s", timeAfterSchemaChanges)
+highwaterLoop:
+	for {
+		select {
+		case <-ctx.Done():
+			t.Fatal(ctx.Err())
+		case <-time.After(30 * time.Second):
+			info, err := getChangefeedInfo(db, jobID)
+			require.NoError(t, err)
+			status := info.GetStatus()
+			if status != "running" {
+				errorStr := info.GetError()
+				// Wait for job error to be populated.
+				if errorStr == "" {
+					t.L().Printf("changefeed status is %s instead of running, no error set yet", status)
+					continue
+				}
+				t.Fatalf("changefeed status is %s instead of running: %s", status, errorStr)
+			}
+			hw := info.GetHighWater()
+			if hw.After(timeAfterSchemaChanges) {
+				break highwaterLoop
+			}
+			t.L().Printf("changefeed highwater is %s <= %s", hw, timeAfterSchemaChanges)
+		}
+	}
+}
+
 func registerCDC(r registry.Registry) {
 	r.Add(registry.TestSpec{
 		Name: "cdc/initial-scan-only",
@@ -1795,6 +1854,16 @@ func registerCDC(r registry.Registry) {
 			runCDCSchemaRegistry(ctx, t, c)
 		},
 	})
+	r.Add(registry.TestSpec{
+		Name:             "cdc/multiple-schema-changes",
+		Owner:            registry.OwnerCDC,
+		Benchmark:        false,
+		Cluster:          r.MakeClusterSpec(3, spec.CPU(16)),
+		CompatibleClouds: registry.AllClouds,
+		Suites:           registry.Suites(registry.Nightly),
+		Timeout:          1 * time.Hour,
+		Run:              runCDCMultipleSchemaChanges,
+	})
 }
 
 const (