diff --git a/docs/generated/sql/bnf/stmt_block.bnf b/docs/generated/sql/bnf/stmt_block.bnf index 5d958694b7bc..33c347813313 100644 --- a/docs/generated/sql/bnf/stmt_block.bnf +++ b/docs/generated/sql/bnf/stmt_block.bnf @@ -1306,7 +1306,6 @@ unreserved_keyword ::= | 'NOWAIT' | 'NULLS' | 'IGNORE_FOREIGN_KEYS' - | 'IGNORE_CDC_IGNORED_TTL_DELETES' | 'INSENSITIVE' | 'OF' | 'OFF' @@ -3879,7 +3878,6 @@ bare_label_keywords ::= | 'IFERROR' | 'IFNULL' | 'IGNORE_FOREIGN_KEYS' - | 'IGNORE_CDC_IGNORED_TTL_DELETES' | 'ILIKE' | 'IMMEDIATE' | 'IMMEDIATELY' diff --git a/pkg/ccl/crosscluster/logical/create_logical_replication_stmt.go b/pkg/ccl/crosscluster/logical/create_logical_replication_stmt.go index dff57cd904ac..ada46ff9cd51 100644 --- a/pkg/ccl/crosscluster/logical/create_logical_replication_stmt.go +++ b/pkg/ccl/crosscluster/logical/create_logical_replication_stmt.go @@ -124,6 +124,16 @@ func createLogicalReplicationStreamPlanHook( mode = jobspb.LogicalReplicationDetails_Validated } + discard := jobspb.LogicalReplicationDetails_DiscardNothing + if m, ok := options.Discard(); ok { + switch m { + case "ttl-deletes": + discard = jobspb.LogicalReplicationDetails_DiscardCDCIgnoredTTLDeletes + default: + return pgerror.Newf(pgcode.InvalidParameterValue, "unknown discard option %q", m) + } + } + var ( targetsDescription string srcTableNames = make([]string, len(stmt.From.Tables)) @@ -262,16 +272,16 @@ func createLogicalReplicationStreamPlanHook( Description: fmt.Sprintf("LOGICAL REPLICATION STREAM into %s from %s", targetsDescription, cleanedURI), Username: p.User(), Details: jobspb.LogicalReplicationDetails{ - StreamID: uint64(spec.StreamID), - SourceClusterID: spec.SourceClusterID, - ReplicationStartTime: replicationStartTime, - SourceClusterConnStr: string(streamAddress), - ReplicationPairs: repPairs, - TableNames: srcTableNames, - DefaultConflictResolution: defaultConflictResolution, - IgnoreCDCIgnoredTTLDeletes: options.IgnoreCDCIgnoredTTLDeletes(), - Mode: mode, - MetricsLabel: options.metricsLabel, + StreamID: uint64(spec.StreamID), + SourceClusterID: spec.SourceClusterID, + ReplicationStartTime: replicationStartTime, + SourceClusterConnStr: string(streamAddress), + ReplicationPairs: repPairs, + TableNames: srcTableNames, + DefaultConflictResolution: defaultConflictResolution, + Discard: discard, + Mode: mode, + MetricsLabel: options.metricsLabel, }, Progress: progress, } @@ -313,9 +323,9 @@ func createLogicalReplicationStreamTypeCheck( stmt.Options.DefaultFunction, stmt.Options.Mode, stmt.Options.MetricsLabel, + stmt.Options.Discard, }, exprutil.Bools{ - stmt.Options.IgnoreCDCIgnoredTTLDeletes, stmt.Options.SkipSchemaCheck, }, } @@ -333,10 +343,10 @@ type resolvedLogicalReplicationOptions struct { mode string defaultFunction *jobspb.LogicalReplicationDetails_DefaultConflictResolution // Mapping of table name to function descriptor - userFunctions map[string]int32 - ignoreCDCIgnoredTTLDeletes bool - skipSchemaCheck bool - metricsLabel string + userFunctions map[string]int32 + discard string + skipSchemaCheck bool + metricsLabel string } func evalLogicalReplicationOptions( @@ -418,8 +428,12 @@ func evalLogicalReplicationOptions( } } - if options.IgnoreCDCIgnoredTTLDeletes == tree.DBoolTrue { - r.ignoreCDCIgnoredTTLDeletes = true + if options.Discard != nil { + discard, err := eval.String(ctx, options.Discard) + if err != nil { + return nil, err + } + r.discard = discard } if options.SkipSchemaCheck == tree.DBoolTrue { r.skipSchemaCheck = true @@ -476,11 +490,11 @@ func (r *resolvedLogicalReplicationOptions) GetUserFunctions() (map[string]int32 return r.userFunctions, true } -func (r *resolvedLogicalReplicationOptions) IgnoreCDCIgnoredTTLDeletes() bool { - if r == nil { - return false +func (r *resolvedLogicalReplicationOptions) Discard() (string, bool) { + if r == nil || r.discard == "" { + return "", false } - return r.ignoreCDCIgnoredTTLDeletes + return r.discard, true } func (r *resolvedLogicalReplicationOptions) SkipSchemaCheck() bool { diff --git a/pkg/ccl/crosscluster/logical/logical_replication_dist.go b/pkg/ccl/crosscluster/logical/logical_replication_dist.go index f4fbc54b2943..404c1f540c62 100644 --- a/pkg/ccl/crosscluster/logical/logical_replication_dist.go +++ b/pkg/ccl/crosscluster/logical/logical_replication_dist.go @@ -32,7 +32,7 @@ func constructLogicalReplicationWriterSpecs( tableMetadataByDestID map[int32]execinfrapb.TableReplicationMetadata, jobID jobspb.JobID, streamID streampb.StreamID, - ignoreCDCIgnoredTTLDeletes bool, + discard jobspb.LogicalReplicationDetails_Discard, mode jobspb.LogicalReplicationDetails_ApplyMode, metricsLabel string, ) (map[base.SQLInstanceID][]execinfrapb.LogicalReplicationWriterSpec, error) { @@ -45,7 +45,7 @@ func constructLogicalReplicationWriterSpecs( Checkpoint: checkpoint, // TODO: Only forward relevant checkpoint info StreamAddress: string(streamAddress), TableMetadataByDestID: tableMetadataByDestID, - IgnoreCDCIgnoredTTLDeletes: ignoreCDCIgnoredTTLDeletes, + Discard: discard, Mode: mode, MetricsLabel: metricsLabel, } diff --git a/pkg/ccl/crosscluster/logical/logical_replication_job.go b/pkg/ccl/crosscluster/logical/logical_replication_job.go index 2075dfa142df..9f29c615ff1a 100644 --- a/pkg/ccl/crosscluster/logical/logical_replication_job.go +++ b/pkg/ccl/crosscluster/logical/logical_replication_job.go @@ -430,7 +430,7 @@ func (p *logicalReplicationPlanner) generatePlanImpl( tableMetadataByDestID, p.job.ID(), streampb.StreamID(payload.StreamID), - payload.IgnoreCDCIgnoredTTLDeletes, + payload.Discard, payload.Mode, payload.MetricsLabel, ) diff --git a/pkg/ccl/crosscluster/logical/logical_replication_job_test.go b/pkg/ccl/crosscluster/logical/logical_replication_job_test.go index c70ef91e556f..3025172725c6 100644 --- a/pkg/ccl/crosscluster/logical/logical_replication_job_test.go +++ b/pkg/ccl/crosscluster/logical/logical_replication_job_test.go @@ -659,7 +659,7 @@ func TestFilterRangefeedInReplicationStream(t *testing.T) { ) dbA.QueryRow(t, "CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab", dbBURL.String()).Scan(&jobAID) - dbB.QueryRow(t, "CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab WITH IGNORE_CDC_IGNORED_TTL_DELETES", dbAURL.String()).Scan(&jobBID) + dbB.QueryRow(t, "CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab WITH DISCARD = 'ttl-deletes'", dbAURL.String()).Scan(&jobBID) now := server.Server(0).Clock().Now() t.Logf("waiting for replication job %d", jobAID) @@ -669,10 +669,10 @@ func TestFilterRangefeedInReplicationStream(t *testing.T) { // Verify that Job contains FilterRangeFeed details := jobutils.GetJobPayload(t, dbA, jobAID).GetLogicalReplicationDetails() - require.False(t, details.IgnoreCDCIgnoredTTLDeletes) + require.False(t, details.Discard == jobspb.LogicalReplicationDetails_DiscardCDCIgnoredTTLDeletes) details = jobutils.GetJobPayload(t, dbB, jobBID).GetLogicalReplicationDetails() - require.True(t, details.IgnoreCDCIgnoredTTLDeletes) + require.True(t, details.Discard == jobspb.LogicalReplicationDetails_DiscardCDCIgnoredTTLDeletes) require.Equal(t, len(filterVal), 2) diff --git a/pkg/ccl/crosscluster/logical/logical_replication_writer_processor.go b/pkg/ccl/crosscluster/logical/logical_replication_writer_processor.go index c944c275d826..1e380a3c4802 100644 --- a/pkg/ccl/crosscluster/logical/logical_replication_writer_processor.go +++ b/pkg/ccl/crosscluster/logical/logical_replication_writer_processor.go @@ -301,7 +301,7 @@ func (lrw *logicalReplicationWriterProcessor) Start(ctx context.Context) { if streamingKnobs, ok := lrw.FlowCtx.TestingKnobs().StreamingTestingKnobs.(*sql.StreamingTestingKnobs); ok { if streamingKnobs != nil && streamingKnobs.BeforeClientSubscribe != nil { - streamingKnobs.BeforeClientSubscribe(addr, string(token), lrw.frontier, lrw.spec.IgnoreCDCIgnoredTTLDeletes) + streamingKnobs.BeforeClientSubscribe(addr, string(token), lrw.frontier, lrw.spec.Discard == jobspb.LogicalReplicationDetails_DiscardCDCIgnoredTTLDeletes) } } sub, err := streamClient.Subscribe(ctx, @@ -309,7 +309,7 @@ func (lrw *logicalReplicationWriterProcessor) Start(ctx context.Context) { int32(lrw.FlowCtx.NodeID.SQLInstanceID()), lrw.ProcessorID, token, lrw.spec.InitialScanTimestamp, lrw.frontier, - streamclient.WithFiltering(lrw.spec.IgnoreCDCIgnoredTTLDeletes), + streamclient.WithFiltering(lrw.spec.Discard == jobspb.LogicalReplicationDetails_DiscardCDCIgnoredTTLDeletes), streamclient.WithDiff(true), ) if err != nil { diff --git a/pkg/jobs/jobspb/jobs.proto b/pkg/jobs/jobspb/jobs.proto index 2a3a8faea23f..5eea39726242 100644 --- a/pkg/jobs/jobspb/jobs.proto +++ b/pkg/jobs/jobspb/jobs.proto @@ -254,7 +254,7 @@ message LogicalReplicationDetails { } DefaultConflictResolution default_conflict_resolution = 7 [(gogoproto.nullable) = false]; - bool ignore_cdc_ignored_ttl_deletes = 8 [(gogoproto.customname) = "IgnoreCDCIgnoredTTLDeletes"]; + reserved 8; enum ApplyMode { Immediate = 0; @@ -265,7 +265,14 @@ message LogicalReplicationDetails { string metrics_label = 10; - // Next ID: 11. + enum Discard { + DiscardNothing = 0; + DiscardCDCIgnoredTTLDeletes = 1; + } + + Discard discard = 11; + + // Next ID: 12. } message LogicalReplicationProgress { diff --git a/pkg/sql/execinfrapb/processors_bulk_io.proto b/pkg/sql/execinfrapb/processors_bulk_io.proto index 4a5966a02c76..41c51217ca4c 100644 --- a/pkg/sql/execinfrapb/processors_bulk_io.proto +++ b/pkg/sql/execinfrapb/processors_bulk_io.proto @@ -527,13 +527,14 @@ message LogicalReplicationWriterSpec { // names. map table_metadata_by_dest_id = 8 [(gogoproto.nullable) = false, (gogoproto.customname) = "TableMetadataByDestID"]; - // IgnoreCDCIgnoredTTLDeletes is an option on whether to filter out 'OmitinRangefeed' events - // when processing changes in LDR - optional bool ignore_cdc_ignored_ttl_deletes = 9 [(gogoproto.nullable) = false, (gogoproto.customname) = "IgnoreCDCIgnoredTTLDeletes"]; + reserved 9; + + // Discard is an option on whether to filter some events. + optional jobs.jobspb.LogicalReplicationDetails.Discard discard = 12 [(gogoproto.nullable) = false]; optional jobs.jobspb.LogicalReplicationDetails.ApplyMode mode = 10 [(gogoproto.nullable) = false]; optional string metrics_label = 11 [(gogoproto.nullable) = false]; - // Next ID: 12. + // Next ID: 13. } diff --git a/pkg/sql/parser/sql.y b/pkg/sql/parser/sql.y index abcd6841aec2..7c3284f762f3 100644 --- a/pkg/sql/parser/sql.y +++ b/pkg/sql/parser/sql.y @@ -974,7 +974,7 @@ func (u *sqlSymUnion) triggerForEach() tree.TriggerForEach { %token HAVING HASH HEADER HIGH HISTOGRAM HOLD HOUR %token IDENTITY -%token IF IFERROR IFNULL IGNORE_FOREIGN_KEYS IGNORE_CDC_IGNORED_TTL_DELETES ILIKE IMMEDIATE IMMEDIATELY IMMUTABLE IMPORT IN INCLUDE +%token IF IFERROR IFNULL IGNORE_FOREIGN_KEYS ILIKE IMMEDIATE IMMEDIATELY IMMUTABLE IMPORT IN INCLUDE %token INCLUDING INCLUDE_ALL_SECONDARY_TENANTS INCLUDE_ALL_VIRTUAL_CLUSTERS INCREMENT INCREMENTAL INCREMENTAL_LOCATION %token INET INET_CONTAINED_BY_OR_EQUALS %token INET_CONTAINS_OR_EQUALS INDEX INDEXES INHERITS INJECT INITIALLY @@ -4635,7 +4635,7 @@ create_stmt: // < CURSOR = start_time > | // < DEFAULT FUNCTION = lww | dlq | udf // < FUNCTION 'udf' FOR TABLE local_name , ... > | -// < IGNORE_CDC_IGNORED_TTL_DELETES > +// < DISCARD = 'ttl-deletes' > // ] create_logical_replication_stream_stmt: CREATE LOGICAL REPLICATION STREAM FROM logical_replication_resources ON string_or_placeholder INTO logical_replication_resources opt_logical_replication_options @@ -4733,9 +4733,9 @@ logical_replication_options: { $$.val = &tree.LogicalReplicationOptions{UserFunctions: map[tree.UnresolvedName]tree.RoutineName{*$5.unresolvedObjectName().ToUnresolvedName():$2.unresolvedObjectName().ToRoutineName()}} } -| IGNORE_CDC_IGNORED_TTL_DELETES + | DISCARD '=' string_or_placeholder { - $$.val = &tree.LogicalReplicationOptions{IgnoreCDCIgnoredTTLDeletes: tree.MakeDBool(true)} + $$.val = &tree.LogicalReplicationOptions{Discard: $3.expr()} } | SKIP SCHEMA CHECK { @@ -17745,7 +17745,6 @@ unreserved_keyword: | NOWAIT | NULLS | IGNORE_FOREIGN_KEYS -| IGNORE_CDC_IGNORED_TTL_DELETES | INSENSITIVE | OF | OFF @@ -18173,7 +18172,6 @@ bare_label_keywords: | IFERROR | IFNULL | IGNORE_FOREIGN_KEYS -| IGNORE_CDC_IGNORED_TTL_DELETES | ILIKE | IMMEDIATE | IMMEDIATELY diff --git a/pkg/sql/parser/testdata/create_logical_replication b/pkg/sql/parser/testdata/create_logical_replication index d0c38da5cff9..54bf1ea8d825 100644 --- a/pkg/sql/parser/testdata/create_logical_replication +++ b/pkg/sql/parser/testdata/create_logical_replication @@ -55,12 +55,12 @@ CREATE LOGICAL REPLICATION STREAM FROM TABLE foo ON '_' INTO TABLE foo WITH OPTI CREATE LOGICAL REPLICATION STREAM FROM TABLE _ ON 'uri' INTO TABLE _ WITH OPTIONS (CURSOR = '1536242855577149065.0000000000', DEFAULT FUNCTION = 'lww', MODE = 'immediate', FUNCTION _ FOR TABLE _, FUNCTION _ FOR TABLE _) -- identifiers removed parse -CREATE LOGICAL REPLICATION STREAM FROM TABLE foo.bar ON 'uri' INTO TABLE foo.bar WITH MODE = 'immediate', IGNORE_CDC_IGNORED_TTL_DELETES; +CREATE LOGICAL REPLICATION STREAM FROM TABLE foo.bar ON 'uri' INTO TABLE foo.bar WITH MODE = 'immediate', DISCARD = 'ttl-deletes'; ---- -CREATE LOGICAL REPLICATION STREAM FROM TABLE foo.bar ON 'uri' INTO TABLE foo.bar WITH OPTIONS (MODE = 'immediate', IGNORE_CDC_IGNORED_TTL_DELETES) -- normalized! -CREATE LOGICAL REPLICATION STREAM FROM TABLE (foo.bar) ON ('uri') INTO TABLE (foo.bar) WITH OPTIONS (MODE = ('immediate'), IGNORE_CDC_IGNORED_TTL_DELETES) -- fully parenthesized -CREATE LOGICAL REPLICATION STREAM FROM TABLE foo.bar ON '_' INTO TABLE foo.bar WITH OPTIONS (MODE = '_', IGNORE_CDC_IGNORED_TTL_DELETES) -- literals removed -CREATE LOGICAL REPLICATION STREAM FROM TABLE _._ ON 'uri' INTO TABLE _._ WITH OPTIONS (MODE = 'immediate', IGNORE_CDC_IGNORED_TTL_DELETES) -- identifiers removed +CREATE LOGICAL REPLICATION STREAM FROM TABLE foo.bar ON 'uri' INTO TABLE foo.bar WITH OPTIONS (MODE = 'immediate', DISCARD = 'ttl-deletes') -- normalized! +CREATE LOGICAL REPLICATION STREAM FROM TABLE (foo.bar) ON ('uri') INTO TABLE (foo.bar) WITH OPTIONS (MODE = ('immediate'), DISCARD = ('ttl-deletes')) -- fully parenthesized +CREATE LOGICAL REPLICATION STREAM FROM TABLE foo.bar ON '_' INTO TABLE foo.bar WITH OPTIONS (MODE = '_', DISCARD = '_') -- literals removed +CREATE LOGICAL REPLICATION STREAM FROM TABLE _._ ON 'uri' INTO TABLE _._ WITH OPTIONS (MODE = 'immediate', DISCARD = 'ttl-deletes') -- identifiers removed error CREATE LOGICAL REPLICATION STREAM FROM TABLE foo, bar ON 'uri' INTO TABLE foo, bar; diff --git a/pkg/sql/sem/tree/create_logical_replication.go b/pkg/sql/sem/tree/create_logical_replication.go index 5a0f4a799476..c92235855957 100644 --- a/pkg/sql/sem/tree/create_logical_replication.go +++ b/pkg/sql/sem/tree/create_logical_replication.go @@ -25,13 +25,13 @@ type LogicalReplicationResources struct { type LogicalReplicationOptions struct { // Mapping of table name to UDF name - UserFunctions map[UnresolvedName]RoutineName - Cursor Expr - MetricsLabel Expr - Mode Expr - DefaultFunction Expr - IgnoreCDCIgnoredTTLDeletes *DBool - SkipSchemaCheck *DBool + UserFunctions map[UnresolvedName]RoutineName + Cursor Expr + MetricsLabel Expr + Mode Expr + DefaultFunction Expr + Discard Expr + SkipSchemaCheck *DBool } var _ Statement = &CreateLogicalReplicationStream{} @@ -122,10 +122,12 @@ func (lro *LogicalReplicationOptions) Format(ctx *FmtCtx) { ctx.FormatNode(&k) } } - if lro.IgnoreCDCIgnoredTTLDeletes != nil && *lro.IgnoreCDCIgnoredTTLDeletes { + if lro.Discard != nil { maybeAddSep() - ctx.WriteString("IGNORE_CDC_IGNORED_TTL_DELETES") + ctx.WriteString("DISCARD = ") + ctx.FormatNode(lro.Discard) } + if lro.SkipSchemaCheck != nil && *lro.SkipSchemaCheck { maybeAddSep() ctx.WriteString("SKIP SCHEMA CHECK") @@ -176,12 +178,12 @@ func (o *LogicalReplicationOptions) CombineWith(other *LogicalReplicationOptions } } - if o.IgnoreCDCIgnoredTTLDeletes != nil { - if other.IgnoreCDCIgnoredTTLDeletes != nil { - return errors.New("IGNORE_CDC_IGNORED_TTL_DELETES option specified multiple times") + if o.Discard != nil { + if other.Discard != nil { + return errors.New("DISCARD option specified multiple times") } } else { - o.IgnoreCDCIgnoredTTLDeletes = other.IgnoreCDCIgnoredTTLDeletes + o.Discard = other.Discard } if o.SkipSchemaCheck != nil { if other.SkipSchemaCheck != nil { @@ -209,7 +211,7 @@ func (o LogicalReplicationOptions) IsDefault() bool { o.Mode == options.Mode && o.DefaultFunction == options.DefaultFunction && o.UserFunctions == nil && - o.IgnoreCDCIgnoredTTLDeletes == options.IgnoreCDCIgnoredTTLDeletes && + o.Discard == options.Discard && o.SkipSchemaCheck == options.SkipSchemaCheck && o.MetricsLabel == options.MetricsLabel }