Skip to content

Commit

Permalink
Merge #132133
Browse files Browse the repository at this point in the history
132133: crosscluster/logical: rename IGNORE_CDC_IGNORED_TTL_DELETES to DISCARD r=dt a=dt

With more filtering modes expected or at least plausible in the future, this option name was too specific to one type of filtering to be reused. Instead we now accept a 'discard' option that can be passed different modes in the future.

Release note: none.
Epic: none.

Co-authored-by: David Taylor <[email protected]>
  • Loading branch information
craig[bot] and dt committed Oct 9, 2024
2 parents c104fb1 + ac6577c commit 16c9cfb
Show file tree
Hide file tree
Showing 11 changed files with 82 additions and 62 deletions.
2 changes: 0 additions & 2 deletions docs/generated/sql/bnf/stmt_block.bnf
Original file line number Diff line number Diff line change
Expand Up @@ -1306,7 +1306,6 @@ unreserved_keyword ::=
| 'NOWAIT'
| 'NULLS'
| 'IGNORE_FOREIGN_KEYS'
| 'IGNORE_CDC_IGNORED_TTL_DELETES'
| 'INSENSITIVE'
| 'OF'
| 'OFF'
Expand Down Expand Up @@ -3879,7 +3878,6 @@ bare_label_keywords ::=
| 'IFERROR'
| 'IFNULL'
| 'IGNORE_FOREIGN_KEYS'
| 'IGNORE_CDC_IGNORED_TTL_DELETES'
| 'ILIKE'
| 'IMMEDIATE'
| 'IMMEDIATELY'
Expand Down
56 changes: 35 additions & 21 deletions pkg/ccl/crosscluster/logical/create_logical_replication_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,16 @@ func createLogicalReplicationStreamPlanHook(
mode = jobspb.LogicalReplicationDetails_Validated
}

discard := jobspb.LogicalReplicationDetails_DiscardNothing
if m, ok := options.Discard(); ok {
switch m {
case "ttl-deletes":
discard = jobspb.LogicalReplicationDetails_DiscardCDCIgnoredTTLDeletes
default:
return pgerror.Newf(pgcode.InvalidParameterValue, "unknown discard option %q", m)
}
}

var (
targetsDescription string
srcTableNames = make([]string, len(stmt.From.Tables))
Expand Down Expand Up @@ -262,16 +272,16 @@ func createLogicalReplicationStreamPlanHook(
Description: fmt.Sprintf("LOGICAL REPLICATION STREAM into %s from %s", targetsDescription, cleanedURI),
Username: p.User(),
Details: jobspb.LogicalReplicationDetails{
StreamID: uint64(spec.StreamID),
SourceClusterID: spec.SourceClusterID,
ReplicationStartTime: replicationStartTime,
SourceClusterConnStr: string(streamAddress),
ReplicationPairs: repPairs,
TableNames: srcTableNames,
DefaultConflictResolution: defaultConflictResolution,
IgnoreCDCIgnoredTTLDeletes: options.IgnoreCDCIgnoredTTLDeletes(),
Mode: mode,
MetricsLabel: options.metricsLabel,
StreamID: uint64(spec.StreamID),
SourceClusterID: spec.SourceClusterID,
ReplicationStartTime: replicationStartTime,
SourceClusterConnStr: string(streamAddress),
ReplicationPairs: repPairs,
TableNames: srcTableNames,
DefaultConflictResolution: defaultConflictResolution,
Discard: discard,
Mode: mode,
MetricsLabel: options.metricsLabel,
},
Progress: progress,
}
Expand Down Expand Up @@ -313,9 +323,9 @@ func createLogicalReplicationStreamTypeCheck(
stmt.Options.DefaultFunction,
stmt.Options.Mode,
stmt.Options.MetricsLabel,
stmt.Options.Discard,
},
exprutil.Bools{
stmt.Options.IgnoreCDCIgnoredTTLDeletes,
stmt.Options.SkipSchemaCheck,
},
}
Expand All @@ -333,10 +343,10 @@ type resolvedLogicalReplicationOptions struct {
mode string
defaultFunction *jobspb.LogicalReplicationDetails_DefaultConflictResolution
// Mapping of table name to function descriptor
userFunctions map[string]int32
ignoreCDCIgnoredTTLDeletes bool
skipSchemaCheck bool
metricsLabel string
userFunctions map[string]int32
discard string
skipSchemaCheck bool
metricsLabel string
}

func evalLogicalReplicationOptions(
Expand Down Expand Up @@ -418,8 +428,12 @@ func evalLogicalReplicationOptions(
}
}

if options.IgnoreCDCIgnoredTTLDeletes == tree.DBoolTrue {
r.ignoreCDCIgnoredTTLDeletes = true
if options.Discard != nil {
discard, err := eval.String(ctx, options.Discard)
if err != nil {
return nil, err
}
r.discard = discard
}
if options.SkipSchemaCheck == tree.DBoolTrue {
r.skipSchemaCheck = true
Expand Down Expand Up @@ -476,11 +490,11 @@ func (r *resolvedLogicalReplicationOptions) GetUserFunctions() (map[string]int32
return r.userFunctions, true
}

func (r *resolvedLogicalReplicationOptions) IgnoreCDCIgnoredTTLDeletes() bool {
if r == nil {
return false
func (r *resolvedLogicalReplicationOptions) Discard() (string, bool) {
if r == nil || r.discard == "" {
return "", false
}
return r.ignoreCDCIgnoredTTLDeletes
return r.discard, true
}

func (r *resolvedLogicalReplicationOptions) SkipSchemaCheck() bool {
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/crosscluster/logical/logical_replication_dist.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func constructLogicalReplicationWriterSpecs(
tableMetadataByDestID map[int32]execinfrapb.TableReplicationMetadata,
jobID jobspb.JobID,
streamID streampb.StreamID,
ignoreCDCIgnoredTTLDeletes bool,
discard jobspb.LogicalReplicationDetails_Discard,
mode jobspb.LogicalReplicationDetails_ApplyMode,
metricsLabel string,
) (map[base.SQLInstanceID][]execinfrapb.LogicalReplicationWriterSpec, error) {
Expand All @@ -45,7 +45,7 @@ func constructLogicalReplicationWriterSpecs(
Checkpoint: checkpoint, // TODO: Only forward relevant checkpoint info
StreamAddress: string(streamAddress),
TableMetadataByDestID: tableMetadataByDestID,
IgnoreCDCIgnoredTTLDeletes: ignoreCDCIgnoredTTLDeletes,
Discard: discard,
Mode: mode,
MetricsLabel: metricsLabel,
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/crosscluster/logical/logical_replication_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ func (p *logicalReplicationPlanner) generatePlanImpl(
tableMetadataByDestID,
p.job.ID(),
streampb.StreamID(payload.StreamID),
payload.IgnoreCDCIgnoredTTLDeletes,
payload.Discard,
payload.Mode,
payload.MetricsLabel,
)
Expand Down
6 changes: 3 additions & 3 deletions pkg/ccl/crosscluster/logical/logical_replication_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -659,7 +659,7 @@ func TestFilterRangefeedInReplicationStream(t *testing.T) {
)

dbA.QueryRow(t, "CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab", dbBURL.String()).Scan(&jobAID)
dbB.QueryRow(t, "CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab WITH IGNORE_CDC_IGNORED_TTL_DELETES", dbAURL.String()).Scan(&jobBID)
dbB.QueryRow(t, "CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab WITH DISCARD = 'ttl-deletes'", dbAURL.String()).Scan(&jobBID)

now := server.Server(0).Clock().Now()
t.Logf("waiting for replication job %d", jobAID)
Expand All @@ -669,10 +669,10 @@ func TestFilterRangefeedInReplicationStream(t *testing.T) {

// Verify that Job contains FilterRangeFeed
details := jobutils.GetJobPayload(t, dbA, jobAID).GetLogicalReplicationDetails()
require.False(t, details.IgnoreCDCIgnoredTTLDeletes)
require.False(t, details.Discard == jobspb.LogicalReplicationDetails_DiscardCDCIgnoredTTLDeletes)

details = jobutils.GetJobPayload(t, dbB, jobBID).GetLogicalReplicationDetails()
require.True(t, details.IgnoreCDCIgnoredTTLDeletes)
require.True(t, details.Discard == jobspb.LogicalReplicationDetails_DiscardCDCIgnoredTTLDeletes)

require.Equal(t, len(filterVal), 2)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,15 +301,15 @@ func (lrw *logicalReplicationWriterProcessor) Start(ctx context.Context) {

if streamingKnobs, ok := lrw.FlowCtx.TestingKnobs().StreamingTestingKnobs.(*sql.StreamingTestingKnobs); ok {
if streamingKnobs != nil && streamingKnobs.BeforeClientSubscribe != nil {
streamingKnobs.BeforeClientSubscribe(addr, string(token), lrw.frontier, lrw.spec.IgnoreCDCIgnoredTTLDeletes)
streamingKnobs.BeforeClientSubscribe(addr, string(token), lrw.frontier, lrw.spec.Discard == jobspb.LogicalReplicationDetails_DiscardCDCIgnoredTTLDeletes)
}
}
sub, err := streamClient.Subscribe(ctx,
streampb.StreamID(lrw.spec.StreamID),
int32(lrw.FlowCtx.NodeID.SQLInstanceID()), lrw.ProcessorID,
token,
lrw.spec.InitialScanTimestamp, lrw.frontier,
streamclient.WithFiltering(lrw.spec.IgnoreCDCIgnoredTTLDeletes),
streamclient.WithFiltering(lrw.spec.Discard == jobspb.LogicalReplicationDetails_DiscardCDCIgnoredTTLDeletes),
streamclient.WithDiff(true),
)
if err != nil {
Expand Down
11 changes: 9 additions & 2 deletions pkg/jobs/jobspb/jobs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ message LogicalReplicationDetails {
}
DefaultConflictResolution default_conflict_resolution = 7 [(gogoproto.nullable) = false];

bool ignore_cdc_ignored_ttl_deletes = 8 [(gogoproto.customname) = "IgnoreCDCIgnoredTTLDeletes"];
reserved 8;

enum ApplyMode {
Immediate = 0;
Expand All @@ -265,7 +265,14 @@ message LogicalReplicationDetails {

string metrics_label = 10;

// Next ID: 11.
enum Discard {
DiscardNothing = 0;
DiscardCDCIgnoredTTLDeletes = 1;
}

Discard discard = 11;

// Next ID: 12.
}

message LogicalReplicationProgress {
Expand Down
9 changes: 5 additions & 4 deletions pkg/sql/execinfrapb/processors_bulk_io.proto
Original file line number Diff line number Diff line change
Expand Up @@ -527,13 +527,14 @@ message LogicalReplicationWriterSpec {
// names.
map<int32, TableReplicationMetadata> table_metadata_by_dest_id = 8 [(gogoproto.nullable) = false, (gogoproto.customname) = "TableMetadataByDestID"];

// IgnoreCDCIgnoredTTLDeletes is an option on whether to filter out 'OmitinRangefeed' events
// when processing changes in LDR
optional bool ignore_cdc_ignored_ttl_deletes = 9 [(gogoproto.nullable) = false, (gogoproto.customname) = "IgnoreCDCIgnoredTTLDeletes"];
reserved 9;

// Discard is an option on whether to filter some events.
optional jobs.jobspb.LogicalReplicationDetails.Discard discard = 12 [(gogoproto.nullable) = false];

optional jobs.jobspb.LogicalReplicationDetails.ApplyMode mode = 10 [(gogoproto.nullable) = false];

optional string metrics_label = 11 [(gogoproto.nullable) = false];

// Next ID: 12.
// Next ID: 13.
}
10 changes: 4 additions & 6 deletions pkg/sql/parser/sql.y
Original file line number Diff line number Diff line change
Expand Up @@ -974,7 +974,7 @@ func (u *sqlSymUnion) triggerForEach() tree.TriggerForEach {
%token <str> HAVING HASH HEADER HIGH HISTOGRAM HOLD HOUR

%token <str> IDENTITY
%token <str> IF IFERROR IFNULL IGNORE_FOREIGN_KEYS IGNORE_CDC_IGNORED_TTL_DELETES ILIKE IMMEDIATE IMMEDIATELY IMMUTABLE IMPORT IN INCLUDE
%token <str> IF IFERROR IFNULL IGNORE_FOREIGN_KEYS ILIKE IMMEDIATE IMMEDIATELY IMMUTABLE IMPORT IN INCLUDE
%token <str> INCLUDING INCLUDE_ALL_SECONDARY_TENANTS INCLUDE_ALL_VIRTUAL_CLUSTERS INCREMENT INCREMENTAL INCREMENTAL_LOCATION
%token <str> INET INET_CONTAINED_BY_OR_EQUALS
%token <str> INET_CONTAINS_OR_EQUALS INDEX INDEXES INHERITS INJECT INITIALLY
Expand Down Expand Up @@ -4635,7 +4635,7 @@ create_stmt:
// < CURSOR = start_time > |
// < DEFAULT FUNCTION = lww | dlq | udf
// < FUNCTION 'udf' FOR TABLE local_name , ... > |
// < IGNORE_CDC_IGNORED_TTL_DELETES >
// < DISCARD = 'ttl-deletes' >
// ]
create_logical_replication_stream_stmt:
CREATE LOGICAL REPLICATION STREAM FROM logical_replication_resources ON string_or_placeholder INTO logical_replication_resources opt_logical_replication_options
Expand Down Expand Up @@ -4733,9 +4733,9 @@ logical_replication_options:
{
$$.val = &tree.LogicalReplicationOptions{UserFunctions: map[tree.UnresolvedName]tree.RoutineName{*$5.unresolvedObjectName().ToUnresolvedName():$2.unresolvedObjectName().ToRoutineName()}}
}
| IGNORE_CDC_IGNORED_TTL_DELETES
| DISCARD '=' string_or_placeholder
{
$$.val = &tree.LogicalReplicationOptions{IgnoreCDCIgnoredTTLDeletes: tree.MakeDBool(true)}
$$.val = &tree.LogicalReplicationOptions{Discard: $3.expr()}
}
| SKIP SCHEMA CHECK
{
Expand Down Expand Up @@ -17745,7 +17745,6 @@ unreserved_keyword:
| NOWAIT
| NULLS
| IGNORE_FOREIGN_KEYS
| IGNORE_CDC_IGNORED_TTL_DELETES
| INSENSITIVE
| OF
| OFF
Expand Down Expand Up @@ -18173,7 +18172,6 @@ bare_label_keywords:
| IFERROR
| IFNULL
| IGNORE_FOREIGN_KEYS
| IGNORE_CDC_IGNORED_TTL_DELETES
| ILIKE
| IMMEDIATE
| IMMEDIATELY
Expand Down
10 changes: 5 additions & 5 deletions pkg/sql/parser/testdata/create_logical_replication
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,12 @@ CREATE LOGICAL REPLICATION STREAM FROM TABLE foo ON '_' INTO TABLE foo WITH OPTI
CREATE LOGICAL REPLICATION STREAM FROM TABLE _ ON 'uri' INTO TABLE _ WITH OPTIONS (CURSOR = '1536242855577149065.0000000000', DEFAULT FUNCTION = 'lww', MODE = 'immediate', FUNCTION _ FOR TABLE _, FUNCTION _ FOR TABLE _) -- identifiers removed

parse
CREATE LOGICAL REPLICATION STREAM FROM TABLE foo.bar ON 'uri' INTO TABLE foo.bar WITH MODE = 'immediate', IGNORE_CDC_IGNORED_TTL_DELETES;
CREATE LOGICAL REPLICATION STREAM FROM TABLE foo.bar ON 'uri' INTO TABLE foo.bar WITH MODE = 'immediate', DISCARD = 'ttl-deletes';
----
CREATE LOGICAL REPLICATION STREAM FROM TABLE foo.bar ON 'uri' INTO TABLE foo.bar WITH OPTIONS (MODE = 'immediate', IGNORE_CDC_IGNORED_TTL_DELETES) -- normalized!
CREATE LOGICAL REPLICATION STREAM FROM TABLE (foo.bar) ON ('uri') INTO TABLE (foo.bar) WITH OPTIONS (MODE = ('immediate'), IGNORE_CDC_IGNORED_TTL_DELETES) -- fully parenthesized
CREATE LOGICAL REPLICATION STREAM FROM TABLE foo.bar ON '_' INTO TABLE foo.bar WITH OPTIONS (MODE = '_', IGNORE_CDC_IGNORED_TTL_DELETES) -- literals removed
CREATE LOGICAL REPLICATION STREAM FROM TABLE _._ ON 'uri' INTO TABLE _._ WITH OPTIONS (MODE = 'immediate', IGNORE_CDC_IGNORED_TTL_DELETES) -- identifiers removed
CREATE LOGICAL REPLICATION STREAM FROM TABLE foo.bar ON 'uri' INTO TABLE foo.bar WITH OPTIONS (MODE = 'immediate', DISCARD = 'ttl-deletes') -- normalized!
CREATE LOGICAL REPLICATION STREAM FROM TABLE (foo.bar) ON ('uri') INTO TABLE (foo.bar) WITH OPTIONS (MODE = ('immediate'), DISCARD = ('ttl-deletes')) -- fully parenthesized
CREATE LOGICAL REPLICATION STREAM FROM TABLE foo.bar ON '_' INTO TABLE foo.bar WITH OPTIONS (MODE = '_', DISCARD = '_') -- literals removed
CREATE LOGICAL REPLICATION STREAM FROM TABLE _._ ON 'uri' INTO TABLE _._ WITH OPTIONS (MODE = 'immediate', DISCARD = 'ttl-deletes') -- identifiers removed

error
CREATE LOGICAL REPLICATION STREAM FROM TABLE foo, bar ON 'uri' INTO TABLE foo, bar;
Expand Down
30 changes: 16 additions & 14 deletions pkg/sql/sem/tree/create_logical_replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ type LogicalReplicationResources struct {

type LogicalReplicationOptions struct {
// Mapping of table name to UDF name
UserFunctions map[UnresolvedName]RoutineName
Cursor Expr
MetricsLabel Expr
Mode Expr
DefaultFunction Expr
IgnoreCDCIgnoredTTLDeletes *DBool
SkipSchemaCheck *DBool
UserFunctions map[UnresolvedName]RoutineName
Cursor Expr
MetricsLabel Expr
Mode Expr
DefaultFunction Expr
Discard Expr
SkipSchemaCheck *DBool
}

var _ Statement = &CreateLogicalReplicationStream{}
Expand Down Expand Up @@ -122,10 +122,12 @@ func (lro *LogicalReplicationOptions) Format(ctx *FmtCtx) {
ctx.FormatNode(&k)
}
}
if lro.IgnoreCDCIgnoredTTLDeletes != nil && *lro.IgnoreCDCIgnoredTTLDeletes {
if lro.Discard != nil {
maybeAddSep()
ctx.WriteString("IGNORE_CDC_IGNORED_TTL_DELETES")
ctx.WriteString("DISCARD = ")
ctx.FormatNode(lro.Discard)
}

if lro.SkipSchemaCheck != nil && *lro.SkipSchemaCheck {
maybeAddSep()
ctx.WriteString("SKIP SCHEMA CHECK")
Expand Down Expand Up @@ -176,12 +178,12 @@ func (o *LogicalReplicationOptions) CombineWith(other *LogicalReplicationOptions
}
}

if o.IgnoreCDCIgnoredTTLDeletes != nil {
if other.IgnoreCDCIgnoredTTLDeletes != nil {
return errors.New("IGNORE_CDC_IGNORED_TTL_DELETES option specified multiple times")
if o.Discard != nil {
if other.Discard != nil {
return errors.New("DISCARD option specified multiple times")
}
} else {
o.IgnoreCDCIgnoredTTLDeletes = other.IgnoreCDCIgnoredTTLDeletes
o.Discard = other.Discard
}
if o.SkipSchemaCheck != nil {
if other.SkipSchemaCheck != nil {
Expand Down Expand Up @@ -209,7 +211,7 @@ func (o LogicalReplicationOptions) IsDefault() bool {
o.Mode == options.Mode &&
o.DefaultFunction == options.DefaultFunction &&
o.UserFunctions == nil &&
o.IgnoreCDCIgnoredTTLDeletes == options.IgnoreCDCIgnoredTTLDeletes &&
o.Discard == options.Discard &&
o.SkipSchemaCheck == options.SkipSchemaCheck &&
o.MetricsLabel == options.MetricsLabel
}

0 comments on commit 16c9cfb

Please sign in to comment.