From ac6577ca2ebe8ef391fad3bb2c799705a9a741d0 Mon Sep 17 00:00:00 2001 From: David Taylor Date: Mon, 7 Oct 2024 22:13:50 +0000 Subject: [PATCH] crosscluster/logical: rename IGNORE_CDC_IGNORED_TTL_DELETES to DISCARD With more filtering modes expected or at least plausible in the future, this option name was too specific to one type of filtering to be reused. Instead we now accept a 'discard' option that can be passed different modes in the future. Release note: none. Epic: none. --- docs/generated/sql/bnf/stmt_block.bnf | 2 - .../create_logical_replication_stmt.go | 56 ++++++++++++------- .../logical/logical_replication_dist.go | 4 +- .../logical/logical_replication_job.go | 2 +- .../logical/logical_replication_job_test.go | 6 +- .../logical_replication_writer_processor.go | 4 +- pkg/jobs/jobspb/jobs.proto | 11 +++- pkg/sql/execinfrapb/processors_bulk_io.proto | 9 +-- pkg/sql/parser/sql.y | 10 ++-- .../testdata/create_logical_replication | 10 ++-- .../sem/tree/create_logical_replication.go | 30 +++++----- 11 files changed, 82 insertions(+), 62 deletions(-) diff --git a/docs/generated/sql/bnf/stmt_block.bnf b/docs/generated/sql/bnf/stmt_block.bnf index a3dd26e247d7..29a4aab3af77 100644 --- a/docs/generated/sql/bnf/stmt_block.bnf +++ b/docs/generated/sql/bnf/stmt_block.bnf @@ -1306,7 +1306,6 @@ unreserved_keyword ::= | 'NOWAIT' | 'NULLS' | 'IGNORE_FOREIGN_KEYS' - | 'IGNORE_CDC_IGNORED_TTL_DELETES' | 'INSENSITIVE' | 'OF' | 'OFF' @@ -3879,7 +3878,6 @@ bare_label_keywords ::= | 'IFERROR' | 'IFNULL' | 'IGNORE_FOREIGN_KEYS' - | 'IGNORE_CDC_IGNORED_TTL_DELETES' | 'ILIKE' | 'IMMEDIATE' | 'IMMEDIATELY' diff --git a/pkg/ccl/crosscluster/logical/create_logical_replication_stmt.go b/pkg/ccl/crosscluster/logical/create_logical_replication_stmt.go index 4182000f50cd..8961ee67d2bd 100644 --- a/pkg/ccl/crosscluster/logical/create_logical_replication_stmt.go +++ b/pkg/ccl/crosscluster/logical/create_logical_replication_stmt.go @@ -124,6 +124,16 @@ func createLogicalReplicationStreamPlanHook( mode = jobspb.LogicalReplicationDetails_Validated } + discard := jobspb.LogicalReplicationDetails_DiscardNothing + if m, ok := options.Discard(); ok { + switch m { + case "ttl-deletes": + discard = jobspb.LogicalReplicationDetails_DiscardCDCIgnoredTTLDeletes + default: + return pgerror.Newf(pgcode.InvalidParameterValue, "unknown discard option %q", m) + } + } + var ( targetsDescription string srcTableNames = make([]string, len(stmt.From.Tables)) @@ -266,16 +276,16 @@ func createLogicalReplicationStreamPlanHook( Description: fmt.Sprintf("LOGICAL REPLICATION STREAM into %s from %s", targetsDescription, cleanedURI), Username: p.User(), Details: jobspb.LogicalReplicationDetails{ - StreamID: uint64(spec.StreamID), - SourceClusterID: spec.SourceClusterID, - ReplicationStartTime: replicationStartTime, - SourceClusterConnStr: string(streamAddress), - ReplicationPairs: repPairs, - TableNames: srcTableNames, - DefaultConflictResolution: defaultConflictResolution, - IgnoreCDCIgnoredTTLDeletes: options.IgnoreCDCIgnoredTTLDeletes(), - Mode: mode, - MetricsLabel: options.metricsLabel, + StreamID: uint64(spec.StreamID), + SourceClusterID: spec.SourceClusterID, + ReplicationStartTime: replicationStartTime, + SourceClusterConnStr: string(streamAddress), + ReplicationPairs: repPairs, + TableNames: srcTableNames, + DefaultConflictResolution: defaultConflictResolution, + Discard: discard, + Mode: mode, + MetricsLabel: options.metricsLabel, }, Progress: progress, } @@ -317,9 +327,9 @@ func createLogicalReplicationStreamTypeCheck( stmt.Options.DefaultFunction, stmt.Options.Mode, stmt.Options.MetricsLabel, + stmt.Options.Discard, }, exprutil.Bools{ - stmt.Options.IgnoreCDCIgnoredTTLDeletes, stmt.Options.SkipSchemaCheck, }, } @@ -337,10 +347,10 @@ type resolvedLogicalReplicationOptions struct { mode string defaultFunction *jobspb.LogicalReplicationDetails_DefaultConflictResolution // Mapping of table name to function descriptor - userFunctions map[string]int32 - ignoreCDCIgnoredTTLDeletes bool - skipSchemaCheck bool - metricsLabel string + userFunctions map[string]int32 + discard string + skipSchemaCheck bool + metricsLabel string } func evalLogicalReplicationOptions( @@ -422,8 +432,12 @@ func evalLogicalReplicationOptions( } } - if options.IgnoreCDCIgnoredTTLDeletes == tree.DBoolTrue { - r.ignoreCDCIgnoredTTLDeletes = true + if options.Discard != nil { + discard, err := eval.String(ctx, options.Discard) + if err != nil { + return nil, err + } + r.discard = discard } if options.SkipSchemaCheck == tree.DBoolTrue { r.skipSchemaCheck = true @@ -480,11 +494,11 @@ func (r *resolvedLogicalReplicationOptions) GetUserFunctions() (map[string]int32 return r.userFunctions, true } -func (r *resolvedLogicalReplicationOptions) IgnoreCDCIgnoredTTLDeletes() bool { - if r == nil { - return false +func (r *resolvedLogicalReplicationOptions) Discard() (string, bool) { + if r == nil || r.discard == "" { + return "", false } - return r.ignoreCDCIgnoredTTLDeletes + return r.discard, true } func (r *resolvedLogicalReplicationOptions) SkipSchemaCheck() bool { diff --git a/pkg/ccl/crosscluster/logical/logical_replication_dist.go b/pkg/ccl/crosscluster/logical/logical_replication_dist.go index f4fbc54b2943..404c1f540c62 100644 --- a/pkg/ccl/crosscluster/logical/logical_replication_dist.go +++ b/pkg/ccl/crosscluster/logical/logical_replication_dist.go @@ -32,7 +32,7 @@ func constructLogicalReplicationWriterSpecs( tableMetadataByDestID map[int32]execinfrapb.TableReplicationMetadata, jobID jobspb.JobID, streamID streampb.StreamID, - ignoreCDCIgnoredTTLDeletes bool, + discard jobspb.LogicalReplicationDetails_Discard, mode jobspb.LogicalReplicationDetails_ApplyMode, metricsLabel string, ) (map[base.SQLInstanceID][]execinfrapb.LogicalReplicationWriterSpec, error) { @@ -45,7 +45,7 @@ func constructLogicalReplicationWriterSpecs( Checkpoint: checkpoint, // TODO: Only forward relevant checkpoint info StreamAddress: string(streamAddress), TableMetadataByDestID: tableMetadataByDestID, - IgnoreCDCIgnoredTTLDeletes: ignoreCDCIgnoredTTLDeletes, + Discard: discard, Mode: mode, MetricsLabel: metricsLabel, } diff --git a/pkg/ccl/crosscluster/logical/logical_replication_job.go b/pkg/ccl/crosscluster/logical/logical_replication_job.go index 4237e3e7b690..b90a733a1b1d 100644 --- a/pkg/ccl/crosscluster/logical/logical_replication_job.go +++ b/pkg/ccl/crosscluster/logical/logical_replication_job.go @@ -420,7 +420,7 @@ func (p *logicalReplicationPlanner) generatePlanImpl( tableMetadataByDestID, p.job.ID(), streampb.StreamID(payload.StreamID), - payload.IgnoreCDCIgnoredTTLDeletes, + payload.Discard, payload.Mode, payload.MetricsLabel, ) diff --git a/pkg/ccl/crosscluster/logical/logical_replication_job_test.go b/pkg/ccl/crosscluster/logical/logical_replication_job_test.go index ca22de328a57..0856f04bb92b 100644 --- a/pkg/ccl/crosscluster/logical/logical_replication_job_test.go +++ b/pkg/ccl/crosscluster/logical/logical_replication_job_test.go @@ -659,7 +659,7 @@ func TestFilterRangefeedInReplicationStream(t *testing.T) { ) dbA.QueryRow(t, "CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab", dbBURL.String()).Scan(&jobAID) - dbB.QueryRow(t, "CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab WITH IGNORE_CDC_IGNORED_TTL_DELETES", dbAURL.String()).Scan(&jobBID) + dbB.QueryRow(t, "CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab WITH DISCARD = 'ttl-deletes'", dbAURL.String()).Scan(&jobBID) now := server.Server(0).Clock().Now() t.Logf("waiting for replication job %d", jobAID) @@ -669,10 +669,10 @@ func TestFilterRangefeedInReplicationStream(t *testing.T) { // Verify that Job contains FilterRangeFeed details := jobutils.GetJobPayload(t, dbA, jobAID).GetLogicalReplicationDetails() - require.False(t, details.IgnoreCDCIgnoredTTLDeletes) + require.False(t, details.Discard == jobspb.LogicalReplicationDetails_DiscardCDCIgnoredTTLDeletes) details = jobutils.GetJobPayload(t, dbB, jobBID).GetLogicalReplicationDetails() - require.True(t, details.IgnoreCDCIgnoredTTLDeletes) + require.True(t, details.Discard == jobspb.LogicalReplicationDetails_DiscardCDCIgnoredTTLDeletes) require.Equal(t, len(filterVal), 2) diff --git a/pkg/ccl/crosscluster/logical/logical_replication_writer_processor.go b/pkg/ccl/crosscluster/logical/logical_replication_writer_processor.go index c944c275d826..1e380a3c4802 100644 --- a/pkg/ccl/crosscluster/logical/logical_replication_writer_processor.go +++ b/pkg/ccl/crosscluster/logical/logical_replication_writer_processor.go @@ -301,7 +301,7 @@ func (lrw *logicalReplicationWriterProcessor) Start(ctx context.Context) { if streamingKnobs, ok := lrw.FlowCtx.TestingKnobs().StreamingTestingKnobs.(*sql.StreamingTestingKnobs); ok { if streamingKnobs != nil && streamingKnobs.BeforeClientSubscribe != nil { - streamingKnobs.BeforeClientSubscribe(addr, string(token), lrw.frontier, lrw.spec.IgnoreCDCIgnoredTTLDeletes) + streamingKnobs.BeforeClientSubscribe(addr, string(token), lrw.frontier, lrw.spec.Discard == jobspb.LogicalReplicationDetails_DiscardCDCIgnoredTTLDeletes) } } sub, err := streamClient.Subscribe(ctx, @@ -309,7 +309,7 @@ func (lrw *logicalReplicationWriterProcessor) Start(ctx context.Context) { int32(lrw.FlowCtx.NodeID.SQLInstanceID()), lrw.ProcessorID, token, lrw.spec.InitialScanTimestamp, lrw.frontier, - streamclient.WithFiltering(lrw.spec.IgnoreCDCIgnoredTTLDeletes), + streamclient.WithFiltering(lrw.spec.Discard == jobspb.LogicalReplicationDetails_DiscardCDCIgnoredTTLDeletes), streamclient.WithDiff(true), ) if err != nil { diff --git a/pkg/jobs/jobspb/jobs.proto b/pkg/jobs/jobspb/jobs.proto index eed415de0b21..dc91819fd01f 100644 --- a/pkg/jobs/jobspb/jobs.proto +++ b/pkg/jobs/jobspb/jobs.proto @@ -252,7 +252,7 @@ message LogicalReplicationDetails { } DefaultConflictResolution default_conflict_resolution = 7 [(gogoproto.nullable) = false]; - bool ignore_cdc_ignored_ttl_deletes = 8 [(gogoproto.customname) = "IgnoreCDCIgnoredTTLDeletes"]; + reserved 8; enum ApplyMode { Immediate = 0; @@ -263,7 +263,14 @@ message LogicalReplicationDetails { string metrics_label = 10; - // Next ID: 11. + enum Discard { + DiscardNothing = 0; + DiscardCDCIgnoredTTLDeletes = 1; + } + + Discard discard = 11; + + // Next ID: 12. } message LogicalReplicationProgress { diff --git a/pkg/sql/execinfrapb/processors_bulk_io.proto b/pkg/sql/execinfrapb/processors_bulk_io.proto index 4a5966a02c76..41c51217ca4c 100644 --- a/pkg/sql/execinfrapb/processors_bulk_io.proto +++ b/pkg/sql/execinfrapb/processors_bulk_io.proto @@ -527,13 +527,14 @@ message LogicalReplicationWriterSpec { // names. map table_metadata_by_dest_id = 8 [(gogoproto.nullable) = false, (gogoproto.customname) = "TableMetadataByDestID"]; - // IgnoreCDCIgnoredTTLDeletes is an option on whether to filter out 'OmitinRangefeed' events - // when processing changes in LDR - optional bool ignore_cdc_ignored_ttl_deletes = 9 [(gogoproto.nullable) = false, (gogoproto.customname) = "IgnoreCDCIgnoredTTLDeletes"]; + reserved 9; + + // Discard is an option on whether to filter some events. + optional jobs.jobspb.LogicalReplicationDetails.Discard discard = 12 [(gogoproto.nullable) = false]; optional jobs.jobspb.LogicalReplicationDetails.ApplyMode mode = 10 [(gogoproto.nullable) = false]; optional string metrics_label = 11 [(gogoproto.nullable) = false]; - // Next ID: 12. + // Next ID: 13. } diff --git a/pkg/sql/parser/sql.y b/pkg/sql/parser/sql.y index c19f61e17f73..a78d12f2bd7b 100644 --- a/pkg/sql/parser/sql.y +++ b/pkg/sql/parser/sql.y @@ -974,7 +974,7 @@ func (u *sqlSymUnion) triggerForEach() tree.TriggerForEach { %token HAVING HASH HEADER HIGH HISTOGRAM HOLD HOUR %token IDENTITY -%token IF IFERROR IFNULL IGNORE_FOREIGN_KEYS IGNORE_CDC_IGNORED_TTL_DELETES ILIKE IMMEDIATE IMMEDIATELY IMMUTABLE IMPORT IN INCLUDE +%token IF IFERROR IFNULL IGNORE_FOREIGN_KEYS ILIKE IMMEDIATE IMMEDIATELY IMMUTABLE IMPORT IN INCLUDE %token INCLUDING INCLUDE_ALL_SECONDARY_TENANTS INCLUDE_ALL_VIRTUAL_CLUSTERS INCREMENT INCREMENTAL INCREMENTAL_LOCATION %token INET INET_CONTAINED_BY_OR_EQUALS %token INET_CONTAINS_OR_EQUALS INDEX INDEXES INHERITS INJECT INITIALLY @@ -4635,7 +4635,7 @@ create_stmt: // < CURSOR = start_time > | // < DEFAULT FUNCTION = lww | dlq | udf // < FUNCTION 'udf' FOR TABLE local_name , ... > | -// < IGNORE_CDC_IGNORED_TTL_DELETES > +// < DISCARD = 'ttl-deletes' > // ] create_logical_replication_stream_stmt: CREATE LOGICAL REPLICATION STREAM FROM logical_replication_resources ON string_or_placeholder INTO logical_replication_resources opt_logical_replication_options @@ -4733,9 +4733,9 @@ logical_replication_options: { $$.val = &tree.LogicalReplicationOptions{UserFunctions: map[tree.UnresolvedName]tree.RoutineName{*$5.unresolvedObjectName().ToUnresolvedName():$2.unresolvedObjectName().ToRoutineName()}} } -| IGNORE_CDC_IGNORED_TTL_DELETES + | DISCARD '=' string_or_placeholder { - $$.val = &tree.LogicalReplicationOptions{IgnoreCDCIgnoredTTLDeletes: tree.MakeDBool(true)} + $$.val = &tree.LogicalReplicationOptions{Discard: $3.expr()} } | SKIP SCHEMA CHECK { @@ -17745,7 +17745,6 @@ unreserved_keyword: | NOWAIT | NULLS | IGNORE_FOREIGN_KEYS -| IGNORE_CDC_IGNORED_TTL_DELETES | INSENSITIVE | OF | OFF @@ -18173,7 +18172,6 @@ bare_label_keywords: | IFERROR | IFNULL | IGNORE_FOREIGN_KEYS -| IGNORE_CDC_IGNORED_TTL_DELETES | ILIKE | IMMEDIATE | IMMEDIATELY diff --git a/pkg/sql/parser/testdata/create_logical_replication b/pkg/sql/parser/testdata/create_logical_replication index d0c38da5cff9..54bf1ea8d825 100644 --- a/pkg/sql/parser/testdata/create_logical_replication +++ b/pkg/sql/parser/testdata/create_logical_replication @@ -55,12 +55,12 @@ CREATE LOGICAL REPLICATION STREAM FROM TABLE foo ON '_' INTO TABLE foo WITH OPTI CREATE LOGICAL REPLICATION STREAM FROM TABLE _ ON 'uri' INTO TABLE _ WITH OPTIONS (CURSOR = '1536242855577149065.0000000000', DEFAULT FUNCTION = 'lww', MODE = 'immediate', FUNCTION _ FOR TABLE _, FUNCTION _ FOR TABLE _) -- identifiers removed parse -CREATE LOGICAL REPLICATION STREAM FROM TABLE foo.bar ON 'uri' INTO TABLE foo.bar WITH MODE = 'immediate', IGNORE_CDC_IGNORED_TTL_DELETES; +CREATE LOGICAL REPLICATION STREAM FROM TABLE foo.bar ON 'uri' INTO TABLE foo.bar WITH MODE = 'immediate', DISCARD = 'ttl-deletes'; ---- -CREATE LOGICAL REPLICATION STREAM FROM TABLE foo.bar ON 'uri' INTO TABLE foo.bar WITH OPTIONS (MODE = 'immediate', IGNORE_CDC_IGNORED_TTL_DELETES) -- normalized! -CREATE LOGICAL REPLICATION STREAM FROM TABLE (foo.bar) ON ('uri') INTO TABLE (foo.bar) WITH OPTIONS (MODE = ('immediate'), IGNORE_CDC_IGNORED_TTL_DELETES) -- fully parenthesized -CREATE LOGICAL REPLICATION STREAM FROM TABLE foo.bar ON '_' INTO TABLE foo.bar WITH OPTIONS (MODE = '_', IGNORE_CDC_IGNORED_TTL_DELETES) -- literals removed -CREATE LOGICAL REPLICATION STREAM FROM TABLE _._ ON 'uri' INTO TABLE _._ WITH OPTIONS (MODE = 'immediate', IGNORE_CDC_IGNORED_TTL_DELETES) -- identifiers removed +CREATE LOGICAL REPLICATION STREAM FROM TABLE foo.bar ON 'uri' INTO TABLE foo.bar WITH OPTIONS (MODE = 'immediate', DISCARD = 'ttl-deletes') -- normalized! +CREATE LOGICAL REPLICATION STREAM FROM TABLE (foo.bar) ON ('uri') INTO TABLE (foo.bar) WITH OPTIONS (MODE = ('immediate'), DISCARD = ('ttl-deletes')) -- fully parenthesized +CREATE LOGICAL REPLICATION STREAM FROM TABLE foo.bar ON '_' INTO TABLE foo.bar WITH OPTIONS (MODE = '_', DISCARD = '_') -- literals removed +CREATE LOGICAL REPLICATION STREAM FROM TABLE _._ ON 'uri' INTO TABLE _._ WITH OPTIONS (MODE = 'immediate', DISCARD = 'ttl-deletes') -- identifiers removed error CREATE LOGICAL REPLICATION STREAM FROM TABLE foo, bar ON 'uri' INTO TABLE foo, bar; diff --git a/pkg/sql/sem/tree/create_logical_replication.go b/pkg/sql/sem/tree/create_logical_replication.go index 5a0f4a799476..c92235855957 100644 --- a/pkg/sql/sem/tree/create_logical_replication.go +++ b/pkg/sql/sem/tree/create_logical_replication.go @@ -25,13 +25,13 @@ type LogicalReplicationResources struct { type LogicalReplicationOptions struct { // Mapping of table name to UDF name - UserFunctions map[UnresolvedName]RoutineName - Cursor Expr - MetricsLabel Expr - Mode Expr - DefaultFunction Expr - IgnoreCDCIgnoredTTLDeletes *DBool - SkipSchemaCheck *DBool + UserFunctions map[UnresolvedName]RoutineName + Cursor Expr + MetricsLabel Expr + Mode Expr + DefaultFunction Expr + Discard Expr + SkipSchemaCheck *DBool } var _ Statement = &CreateLogicalReplicationStream{} @@ -122,10 +122,12 @@ func (lro *LogicalReplicationOptions) Format(ctx *FmtCtx) { ctx.FormatNode(&k) } } - if lro.IgnoreCDCIgnoredTTLDeletes != nil && *lro.IgnoreCDCIgnoredTTLDeletes { + if lro.Discard != nil { maybeAddSep() - ctx.WriteString("IGNORE_CDC_IGNORED_TTL_DELETES") + ctx.WriteString("DISCARD = ") + ctx.FormatNode(lro.Discard) } + if lro.SkipSchemaCheck != nil && *lro.SkipSchemaCheck { maybeAddSep() ctx.WriteString("SKIP SCHEMA CHECK") @@ -176,12 +178,12 @@ func (o *LogicalReplicationOptions) CombineWith(other *LogicalReplicationOptions } } - if o.IgnoreCDCIgnoredTTLDeletes != nil { - if other.IgnoreCDCIgnoredTTLDeletes != nil { - return errors.New("IGNORE_CDC_IGNORED_TTL_DELETES option specified multiple times") + if o.Discard != nil { + if other.Discard != nil { + return errors.New("DISCARD option specified multiple times") } } else { - o.IgnoreCDCIgnoredTTLDeletes = other.IgnoreCDCIgnoredTTLDeletes + o.Discard = other.Discard } if o.SkipSchemaCheck != nil { if other.SkipSchemaCheck != nil { @@ -209,7 +211,7 @@ func (o LogicalReplicationOptions) IsDefault() bool { o.Mode == options.Mode && o.DefaultFunction == options.DefaultFunction && o.UserFunctions == nil && - o.IgnoreCDCIgnoredTTLDeletes == options.IgnoreCDCIgnoredTTLDeletes && + o.Discard == options.Discard && o.SkipSchemaCheck == options.SkipSchemaCheck && o.MetricsLabel == options.MetricsLabel }