changefeedccl: fix schema change boundary race condition
This patch fixes an issue where a schema change could incorrectly
cause a changefeed to fail with an assertion error like `received
boundary timestamp ... of type ... before reaching existing boundary
of type ...` due to a race condition between aggregators sending
resolved spans for multiple schema change descriptor updates to
the change frontier.

Release note (bug fix): An issue where a schema change could
incorrectly cause a changefeed to fail with an assertion error
like `received boundary timestamp ... of type ... before reaching
existing boundary of type ...` has now been fixed.
andyyang890 committed Dec 16, 2024
1 parent 975f089 commit 041cca5
Showing 4 changed files with 225 additions and 21 deletions.
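
The race, sketched as a toy model for orientation (not code from this patch; the int timestamps and recv closure are illustrative stand-ins): each aggregator ingests schema change boundaries serially, but the change frontier merges resolved spans from many aggregators, so a fast aggregator can report a second BACKFILL boundary before a slow one has finished reporting the first. Under the old frontier-side check, the late arrival tripped the assertion:

package main

import "fmt"

// boundary is a toy stand-in for a resolved span's schema change boundary.
type boundary struct {
	ts  int    // simplified stand-in for an hlc.Timestamp
	typ string // simplified stand-in for a jobspb boundary type
}

func main() {
	var cur boundary
	// recv mimics the old frontier-side check: any boundary older than
	// the current one was treated as an assertion failure.
	recv := func(b boundary) error {
		if cur.ts != 0 && b.ts < cur.ts {
			return fmt.Errorf("received boundary timestamp %d of type %s "+
				"before reaching existing boundary of type %s", b.ts, b.typ, cur.typ)
		}
		cur = b
		return nil
	}
	fmt.Println(recv(boundary{1, "BACKFILL"})) // aggregator A, first schema change: <nil>
	fmt.Println(recv(boundary{2, "BACKFILL"})) // aggregator A, second schema change: <nil>
	fmt.Println(recv(boundary{1, "BACKFILL"})) // aggregator B, still on the first: error
}

The fix below makes the frontier remember BACKFILL timestamps it has already seen, so the third event above is accepted rather than fatal; EXIT and RESTART boundaries, which are final, keep the strict check.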
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
@@ -25,6 +25,7 @@ go_library(
     "parquet.go",
     "parquet_sink_cloudstorage.go",
     "protected_timestamps.go",
+    "resolved_span_frontier.go",
     "retry.go",
     "scheduled_changefeed.go",
     "schema_registry.go",
62 changes: 41 additions & 21 deletions pkg/ccl/changefeedccl/changefeed_processors.go
@@ -836,7 +836,7 @@ func (ca *changeAggregator) noteResolvedSpan(resolved jobspb.ResolvedSpan) (retu
 		return nil
 	}
 
-	advanced, err := ca.frontier.ForwardResolvedSpan(resolved)
+	advanced, err := ca.frontier.ForwardResolvedSpan(ca.Ctx(), resolved)
 	if err != nil {
 		return err
 	}
@@ -876,7 +876,7 @@ func (ca *changeAggregator) noteResolvedSpan(resolved jobspb.ResolvedSpan) (retu
 	// At a lower frequency we checkpoint specific spans in the job progress
 	// either in backfills or if the highwater mark is excessively lagging behind
 	checkpointSpans := ca.spec.JobID != 0 && /* enterprise changefeed */
-		(resolved.Timestamp.Equal(ca.frontier.BackfillTS()) ||
+		(ca.frontier.InBackfill(resolved) ||
 			ca.frontier.hasLaggingSpans(ca.spec.Feed.StatementTime, sv)) &&
 		canCheckpointSpans(sv, ca.lastSpanFlush)

@@ -969,7 +969,7 @@ type changeFrontier struct {
 
 	// frontier contains the current resolved timestamp high-water for the tracked
 	// span set.
-	frontier *schemaChangeFrontier
+	frontier *frontierResolvedSpanFrontier
 
 	// localState contains an in memory cache of progress updates.
 	// Used by core style changefeeds as well as regular changefeeds to make
@@ -1160,7 +1160,7 @@ func newChangeFrontierProcessor(
 	post *execinfrapb.PostProcessSpec,
 ) (execinfra.Processor, error) {
 	memMonitor := execinfra.NewMonitor(ctx, flowCtx.Mon, "changefntr-mem")
-	sf, err := makeSchemaChangeFrontier(hlc.Timestamp{}, spec.TrackedSpans...)
+	sf, err := newFrontierResolvedSpanFrontier(hlc.Timestamp{}, spec.TrackedSpans...)
 	if err != nil {
 		return nil, err
 	}
@@ -1603,7 +1603,7 @@ func (cf *changeFrontier) noteAggregatorProgress(d rowenc.EncDatum) error {
 }
 
 func (cf *changeFrontier) forwardFrontier(resolved jobspb.ResolvedSpan) error {
-	frontierChanged, err := cf.frontier.ForwardResolvedSpan(resolved)
+	frontierChanged, err := cf.frontier.ForwardResolvedSpan(cf.Ctx(), resolved)
 	if err != nil {
 		return err
 	}
@@ -1666,7 +1666,7 @@ func (cf *changeFrontier) maybeCheckpointJob(
 ) (bool, error) {
 	// When in a Backfill, the frontier remains unchanged at the backfill boundary
 	// as we receive spans from the scan request at the Backfill Timestamp
-	inBackfill := !frontierChanged && resolvedSpan.Timestamp.Equal(cf.frontier.BackfillTS())
+	inBackfill := !frontierChanged && cf.frontier.InBackfill(resolvedSpan)
 
 	// If we're not in a backfill, highwater progress and an empty checkpoint will
 	// be saved. This is throttled however we always persist progress to a schema
@@ -2006,15 +2006,15 @@ func makeSchemaChangeFrontier(
 
 // ForwardResolvedSpan advances the timestamp for a resolved span, taking care
 // of updating schema change boundary information.
-func (f *schemaChangeFrontier) ForwardResolvedSpan(r jobspb.ResolvedSpan) (bool, error) {
+func (f *schemaChangeFrontier) ForwardResolvedSpan(
+	ctx context.Context, r jobspb.ResolvedSpan,
+) (bool, error) {
 	if r.BoundaryType != jobspb.ResolvedSpan_NONE {
-		if !f.boundaryTime.IsEmpty() && r.Timestamp.Less(f.boundaryTime) {
-			// Boundary resolved events should be ingested from the schema feed
-			// serially, where the changefeed won't even observe a new schema change
-			// boundary until it has progressed past the current boundary
-			return false, errors.AssertionFailedf("received boundary timestamp %v < %v "+
-				"of type %v before reaching existing boundary of type %v",
-				r.Timestamp, f.boundaryTime, r.BoundaryType, f.boundaryType)
+		// Boundary resolved events should be ingested from the schema feed
+		// serially, where the changefeed won't even observe a new schema change
+		// boundary until it has progressed past the current boundary.
+		if err := f.assertBoundaryNotEarlier(ctx, r); err != nil {
+			return false, err
 		}
 		f.boundaryTime = r.Timestamp
 		f.boundaryType = r.BoundaryType
@@ -2073,16 +2073,15 @@ func (f *schemaChangeFrontier) getCheckpointSpans(
 	return getCheckpointSpans(f.Frontier(), f.Entries, maxBytes)
 }
 
-// BackfillTS returns the timestamp of the incoming spans for an ongoing
-// Backfill (either an Initial Scan backfill or a Schema Change backfill).
-// If no Backfill is occurring, an empty timestamp is returned.
-func (f *schemaChangeFrontier) BackfillTS() hlc.Timestamp {
+// InBackfill returns whether a resolved span is part of an ongoing backfill
+// (either an initial scan backfill or a schema change backfill).
+func (f *schemaChangeFrontier) InBackfill(r jobspb.ResolvedSpan) bool {
 	frontier := f.Frontier()
 
 	// The scan for the initial backfill results in spans sent at StatementTime,
 	// which is what initialHighWater is set to when performing an initial scan.
 	if frontier.IsEmpty() {
-		return f.initialHighWater
+		return r.Timestamp.Equal(f.initialHighWater)
 	}
 
 	// If the backfill is occurring after any initial scan (non-empty frontier),
@@ -2093,9 +2092,30 @@ func (f *schemaChangeFrontier) BackfillTS() hlc.Timestamp {
 	// is read from the job progress and is equal to the old BACKFILL boundary
 	restarted := frontier.Equal(f.initialHighWater)
 	if backfilling || restarted {
-		return frontier.Next()
+		return r.Timestamp.Equal(frontier.Next())
 	}
-	return hlc.Timestamp{}
+
+	return false
 }
 
+// assertBoundaryNotEarlier is a helper method that asserts that a
+// resolved span does not have an earlier boundary than the existing one.
+func (f *schemaChangeFrontier) assertBoundaryNotEarlier(
+	ctx context.Context, r jobspb.ResolvedSpan,
+) error {
+	boundaryType := r.BoundaryType
+	if boundaryType == jobspb.ResolvedSpan_NONE {
+		return errors.AssertionFailedf("assertBoundaryNotEarlier should not be called for NONE boundary")
+	}
+	boundaryTS := r.Timestamp
+	if f.boundaryTime.After(boundaryTS) {
+		err := errors.AssertionFailedf("received resolved span for %s "+
+			"with %v boundary (%v), which is earlier than previously received %v boundary (%v)",
+			r.Span, r.BoundaryType, r.Timestamp, f.boundaryType, f.boundaryTime)
+		log.Errorf(ctx, "error while forwarding boundary resolved span: %v", err)
+		return err
+	}
+	return nil
+}
+
 // schemaChangeBoundaryReached returns true at the single moment when all spans
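
A side note on the API change above: BackfillTS() exposed a single backfill timestamp for callers to compare against, which by construction could represent only one backfill at a time; InBackfill() lets the frontier answer the membership question itself. A toy contrast of the two call shapes (hypothetical types; the patch stores backfill timestamps in a sorted slice, a map is used here only for brevity):

package main

import "fmt"

type ts int

type frontier struct {
	backfillTS ts          // old world: the single representable backfill
	backfills  map[ts]bool // new world: every backfill still in flight
}

// BackfillTS mimics the removed API: callers compare against one value.
func (f *frontier) BackfillTS() ts { return f.backfillTS }

// InBackfill mimics the new API: the frontier answers membership itself.
func (f *frontier) InBackfill(resolved ts) bool { return f.backfills[resolved] }

func main() {
	f := &frontier{backfillTS: 2, backfills: map[ts]bool{1: true, 2: true}}
	for _, r := range []ts{1, 2} {
		// The old comparison misses the concurrent backfill at t=1;
		// the new membership check recognizes both.
		fmt.Println(r == f.BackfillTS(), f.InBackfill(r))
	}
}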
115 changes: 115 additions & 0 deletions pkg/ccl/changefeedccl/resolved_span_frontier.go
@@ -0,0 +1,115 @@
+// Copyright 2024 The Cockroach Authors.
+//
+// Use of this software is governed by the CockroachDB Software License
+// included in the /LICENSE file.
+
+package changefeedccl
+
+import (
+	"context"
+	"slices"
+
+	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/util/hlc"
+	"github.com/cockroachdb/errors"
+)
+
+// frontierResolvedSpanFrontier wraps a schemaChangeFrontier with additional
+// checks specific to how the change frontier processes schema changes.
+type frontierResolvedSpanFrontier struct {
+	schemaChangeFrontier
+
+	// backfills is a sorted list of timestamps for ongoing backfills.
+	// Usually there will only be one, but since aggregators run
+	// backfills in parallel without synchronization, there may be
+	// multiple backfills happening at one time.
+	backfills []hlc.Timestamp
+}
+
+// newFrontierResolvedSpanFrontier returns a new frontierResolvedSpanFrontier.
+func newFrontierResolvedSpanFrontier(
+	initialHighWater hlc.Timestamp, spans ...roachpb.Span,
+) (*frontierResolvedSpanFrontier, error) {
+	scf, err := makeSchemaChangeFrontier(initialHighWater, spans...)
+	if err != nil {
+		return nil, err
+	}
+	return &frontierResolvedSpanFrontier{
+		schemaChangeFrontier: *scf,
+	}, nil
+}
+
+// ForwardResolvedSpan forwards the progress of a resolved span and also does
+// some boundary validation.
+func (f *frontierResolvedSpanFrontier) ForwardResolvedSpan(
+	ctx context.Context, r jobspb.ResolvedSpan,
+) (bool, error) {
+	switch boundaryType := r.BoundaryType; boundaryType {
+	case jobspb.ResolvedSpan_NONE:
+	case jobspb.ResolvedSpan_BACKFILL:
+		// The change frontier collects resolved spans from all the change
+		// aggregators. Since a BACKFILL schema change does not cause an
+		// aggregator to shut down, an aggregator may encounter a second
+		// schema change (and send resolved spans for that second schema
+		// change) before the frontier has received resolved spans for the
+		// first BACKFILL schema change from all aggregators. Thus, as long
+		// as it is a BACKFILL we have already seen, it is fine for it to
+		// have an earlier timestamp than the latest boundary.
+		boundaryTS := r.Timestamp
+		_, ok := slices.BinarySearchFunc(f.backfills, boundaryTS, func(elem hlc.Timestamp, ts hlc.Timestamp) int {
+			return elem.Compare(ts)
+		})
+		if ok {
+			break
+		}
+		if err := f.assertBoundaryNotEarlier(ctx, r); err != nil {
+			return false, err
+		}
+		f.backfills = append(f.backfills, boundaryTS)
+		f.boundaryTime = r.Timestamp
+		f.boundaryType = r.BoundaryType
+	case jobspb.ResolvedSpan_EXIT, jobspb.ResolvedSpan_RESTART:
+		// EXIT and RESTART are final boundaries that cause the changefeed
+		// processors to all move to draining and so should not be followed
+		// by any other boundaries.
+		if err := f.assertBoundaryNotEarlier(ctx, r); err != nil {
+			return false, err
+		}
+		f.boundaryTime = r.Timestamp
+		f.boundaryType = r.BoundaryType
+	default:
+		return false, errors.AssertionFailedf("unknown boundary type: %v", boundaryType)
+	}
+	f.latestTs.Forward(r.Timestamp)
+	frontierChanged, err := f.Forward(r.Span, r.Timestamp)
+	if err != nil {
+		return false, err
+	}
+	// If the frontier changed, check whether it has advanced past any known backfills.
+	if frontierChanged {
+		frontier := f.Frontier()
+		i, _ := slices.BinarySearchFunc(f.backfills, frontier, func(elem hlc.Timestamp, ts hlc.Timestamp) int {
+			return elem.Compare(ts)
+		})
+		f.backfills = f.backfills[i:]
+	}
+	return frontierChanged, nil
+}
+
+// InBackfill returns whether a resolved span is part of an ongoing backfill
+// (either an initial scan backfill or a schema change backfill).
+// NB: Since the frontierResolvedSpanFrontier consolidates the frontiers of
+// multiple change aggregators, there may be more than one concurrent backfill
+// happening at different timestamps.
+func (f *frontierResolvedSpanFrontier) InBackfill(r jobspb.ResolvedSpan) bool {
+	boundaryTS := r.Timestamp
+	_, ok := slices.BinarySearchFunc(f.backfills, boundaryTS, func(elem hlc.Timestamp, ts hlc.Timestamp) int {
+		return elem.Compare(ts)
+	})
+	if ok {
+		return true
+	}
+
+	return f.schemaChangeFrontier.InBackfill(r)
+}
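
The sorted backfills slice above is managed with two slices.BinarySearchFunc calls: a membership probe ("have we already recorded this BACKFILL timestamp?") and a prune once the frontier passes a backfill. A minimal, runnable illustration of both operations, with ints standing in for hlc.Timestamps:

package main

import (
	"cmp"
	"fmt"
	"slices"
)

func main() {
	backfills := []int{3, 7, 9} // sorted timestamps of ongoing backfills

	// Membership: is 7 a BACKFILL boundary we have already recorded?
	_, ok := slices.BinarySearchFunc(backfills, 7, cmp.Compare)
	fmt.Println(ok) // true

	// Pruning: once the frontier reaches 8, every backfill strictly below
	// it is finished; BinarySearchFunc returns the insertion index, so
	// keeping backfills[i:] drops exactly the completed ones.
	i, _ := slices.BinarySearchFunc(backfills, 8, cmp.Compare)
	fmt.Println(backfills[i:]) // [9]
}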
68 changes: 68 additions & 0 deletions pkg/cmd/roachtest/tests/cdc.go
@@ -1286,6 +1286,64 @@ func runCDCKafkaAuth(ctx context.Context, t test.Test, c cluster.Cluster) {
 	}
 }
 
+// runCDCMultipleSchemaChanges is a regression test for #136847.
+func runCDCMultipleSchemaChanges(ctx context.Context, t test.Test, c cluster.Cluster) {
+	c.Start(ctx, t.L(), option.DefaultStartOpts(), install.MakeClusterSettings())
+
+	db := c.Conn(ctx, t.L(), 1)
+	sqlDB := sqlutils.MakeSQLRunner(db)
+
+	sqlDB.Exec(t, "SET CLUSTER SETTING kv.rangefeed.enabled = true")
+
+	tableNames := []string{"a", "b", "c", "d", "e", "f", "g", "h"}
+	for _, tableName := range tableNames {
+		createStmt := fmt.Sprintf(`CREATE TABLE %s (
+			id UUID NOT NULL DEFAULT gen_random_uuid() PRIMARY KEY,
+			col TIMESTAMP(6) NULL
+		)`, tableName)
+		sqlDB.Exec(t, createStmt)
+	}
+
+	var jobID int
+	sqlDB.QueryRow(t,
+		fmt.Sprintf("CREATE CHANGEFEED FOR %s INTO 'null://'", strings.Join(tableNames, ", ")),
+	).Scan(&jobID)
+
+	alterStmts := []string{"SET sql_safe_updates = false"}
+	for _, tableName := range tableNames {
+		alterStmts = append(alterStmts, fmt.Sprintf(`ALTER TABLE %s DROP col`, tableName))
+	}
+	sqlDB.ExecMultiple(t, alterStmts...)
+	timeAfterSchemaChanges := timeutil.Now()
+
+	t.L().Printf("waiting for changefeed highwater to pass %s", timeAfterSchemaChanges)
+highwaterLoop:
+	for {
+		select {
+		case <-ctx.Done():
+			t.Fatal(ctx.Err())
+		case <-time.After(30 * time.Second):
+			info, err := getChangefeedInfo(db, jobID)
+			require.NoError(t, err)
+			status := info.GetStatus()
+			if status != "running" {
+				errorStr := info.GetError()
+				// Wait for job error to be populated.
+				if errorStr == "" {
+					t.L().Printf("changefeed status is %s instead of running, no error set yet", status)
+					continue
+				}
+				t.Fatalf("changefeed status is %s instead of running: %s", status, errorStr)
+			}
+			hw := info.GetHighWater()
+			if hw.After(timeAfterSchemaChanges) {
+				break highwaterLoop
+			}
+			t.L().Printf("changefeed highwater is %s <= %s", hw, timeAfterSchemaChanges)
+		}
+	}
+}
+
 func registerCDC(r registry.Registry) {
 	r.Add(registry.TestSpec{
 		Name: "cdc/initial-scan-only",
@@ -2127,6 +2185,16 @@ func registerCDC(r registry.Registry) {
 			runCDCSchemaRegistry(ctx, t, c)
 		},
 	})
+	r.Add(registry.TestSpec{
+		Name:             "cdc/multiple-schema-changes",
+		Owner:            registry.OwnerCDC,
+		Benchmark:        false,
+		Cluster:          r.MakeClusterSpec(3, spec.CPU(16)),
+		CompatibleClouds: registry.AllClouds,
+		Suites:           registry.Suites(registry.Nightly),
+		Timeout:          1 * time.Hour,
+		Run:              runCDCMultipleSchemaChanges,
+	})
 }
 
 const (
