Skip to content

Commit

Permalink
crosscluster/logical: rename IGNORE_CDC_IGNORED_TTL_DELETES to DISCARD
Browse files Browse the repository at this point in the history
With more filtering modes expected or at least plausible in the future, this
option name was too specific to one type of filtering to be reused. Instead we
now accept a 'discard' option that can be passed different modes in the future.

Release note: none.
Epic: none.
  • Loading branch information
dt committed Oct 7, 2024
1 parent aa06370 commit ac6577c
Show file tree
Hide file tree
Showing 11 changed files with 82 additions and 62 deletions.
2 changes: 0 additions & 2 deletions docs/generated/sql/bnf/stmt_block.bnf
Original file line number Diff line number Diff line change
Expand Up @@ -1306,7 +1306,6 @@ unreserved_keyword ::=
| 'NOWAIT'
| 'NULLS'
| 'IGNORE_FOREIGN_KEYS'
| 'IGNORE_CDC_IGNORED_TTL_DELETES'
| 'INSENSITIVE'
| 'OF'
| 'OFF'
Expand Down Expand Up @@ -3879,7 +3878,6 @@ bare_label_keywords ::=
| 'IFERROR'
| 'IFNULL'
| 'IGNORE_FOREIGN_KEYS'
| 'IGNORE_CDC_IGNORED_TTL_DELETES'
| 'ILIKE'
| 'IMMEDIATE'
| 'IMMEDIATELY'
Expand Down
56 changes: 35 additions & 21 deletions pkg/ccl/crosscluster/logical/create_logical_replication_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,16 @@ func createLogicalReplicationStreamPlanHook(
mode = jobspb.LogicalReplicationDetails_Validated
}

discard := jobspb.LogicalReplicationDetails_DiscardNothing
if m, ok := options.Discard(); ok {
switch m {
case "ttl-deletes":
discard = jobspb.LogicalReplicationDetails_DiscardCDCIgnoredTTLDeletes
default:
return pgerror.Newf(pgcode.InvalidParameterValue, "unknown discard option %q", m)
}
}

var (
targetsDescription string
srcTableNames = make([]string, len(stmt.From.Tables))
Expand Down Expand Up @@ -266,16 +276,16 @@ func createLogicalReplicationStreamPlanHook(
Description: fmt.Sprintf("LOGICAL REPLICATION STREAM into %s from %s", targetsDescription, cleanedURI),
Username: p.User(),
Details: jobspb.LogicalReplicationDetails{
StreamID: uint64(spec.StreamID),
SourceClusterID: spec.SourceClusterID,
ReplicationStartTime: replicationStartTime,
SourceClusterConnStr: string(streamAddress),
ReplicationPairs: repPairs,
TableNames: srcTableNames,
DefaultConflictResolution: defaultConflictResolution,
IgnoreCDCIgnoredTTLDeletes: options.IgnoreCDCIgnoredTTLDeletes(),
Mode: mode,
MetricsLabel: options.metricsLabel,
StreamID: uint64(spec.StreamID),
SourceClusterID: spec.SourceClusterID,
ReplicationStartTime: replicationStartTime,
SourceClusterConnStr: string(streamAddress),
ReplicationPairs: repPairs,
TableNames: srcTableNames,
DefaultConflictResolution: defaultConflictResolution,
Discard: discard,
Mode: mode,
MetricsLabel: options.metricsLabel,
},
Progress: progress,
}
Expand Down Expand Up @@ -317,9 +327,9 @@ func createLogicalReplicationStreamTypeCheck(
stmt.Options.DefaultFunction,
stmt.Options.Mode,
stmt.Options.MetricsLabel,
stmt.Options.Discard,
},
exprutil.Bools{
stmt.Options.IgnoreCDCIgnoredTTLDeletes,
stmt.Options.SkipSchemaCheck,
},
}
Expand All @@ -337,10 +347,10 @@ type resolvedLogicalReplicationOptions struct {
mode string
defaultFunction *jobspb.LogicalReplicationDetails_DefaultConflictResolution
// Mapping of table name to function descriptor
userFunctions map[string]int32
ignoreCDCIgnoredTTLDeletes bool
skipSchemaCheck bool
metricsLabel string
userFunctions map[string]int32
discard string
skipSchemaCheck bool
metricsLabel string
}

func evalLogicalReplicationOptions(
Expand Down Expand Up @@ -422,8 +432,12 @@ func evalLogicalReplicationOptions(
}
}

if options.IgnoreCDCIgnoredTTLDeletes == tree.DBoolTrue {
r.ignoreCDCIgnoredTTLDeletes = true
if options.Discard != nil {
discard, err := eval.String(ctx, options.Discard)
if err != nil {
return nil, err
}
r.discard = discard
}
if options.SkipSchemaCheck == tree.DBoolTrue {
r.skipSchemaCheck = true
Expand Down Expand Up @@ -480,11 +494,11 @@ func (r *resolvedLogicalReplicationOptions) GetUserFunctions() (map[string]int32
return r.userFunctions, true
}

func (r *resolvedLogicalReplicationOptions) IgnoreCDCIgnoredTTLDeletes() bool {
if r == nil {
return false
func (r *resolvedLogicalReplicationOptions) Discard() (string, bool) {
if r == nil || r.discard == "" {
return "", false
}
return r.ignoreCDCIgnoredTTLDeletes
return r.discard, true
}

func (r *resolvedLogicalReplicationOptions) SkipSchemaCheck() bool {
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/crosscluster/logical/logical_replication_dist.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func constructLogicalReplicationWriterSpecs(
tableMetadataByDestID map[int32]execinfrapb.TableReplicationMetadata,
jobID jobspb.JobID,
streamID streampb.StreamID,
ignoreCDCIgnoredTTLDeletes bool,
discard jobspb.LogicalReplicationDetails_Discard,
mode jobspb.LogicalReplicationDetails_ApplyMode,
metricsLabel string,
) (map[base.SQLInstanceID][]execinfrapb.LogicalReplicationWriterSpec, error) {
Expand All @@ -45,7 +45,7 @@ func constructLogicalReplicationWriterSpecs(
Checkpoint: checkpoint, // TODO: Only forward relevant checkpoint info
StreamAddress: string(streamAddress),
TableMetadataByDestID: tableMetadataByDestID,
IgnoreCDCIgnoredTTLDeletes: ignoreCDCIgnoredTTLDeletes,
Discard: discard,
Mode: mode,
MetricsLabel: metricsLabel,
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/crosscluster/logical/logical_replication_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ func (p *logicalReplicationPlanner) generatePlanImpl(
tableMetadataByDestID,
p.job.ID(),
streampb.StreamID(payload.StreamID),
payload.IgnoreCDCIgnoredTTLDeletes,
payload.Discard,
payload.Mode,
payload.MetricsLabel,
)
Expand Down
6 changes: 3 additions & 3 deletions pkg/ccl/crosscluster/logical/logical_replication_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -659,7 +659,7 @@ func TestFilterRangefeedInReplicationStream(t *testing.T) {
)

dbA.QueryRow(t, "CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab", dbBURL.String()).Scan(&jobAID)
dbB.QueryRow(t, "CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab WITH IGNORE_CDC_IGNORED_TTL_DELETES", dbAURL.String()).Scan(&jobBID)
dbB.QueryRow(t, "CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab WITH DISCARD = 'ttl-deletes'", dbAURL.String()).Scan(&jobBID)

now := server.Server(0).Clock().Now()
t.Logf("waiting for replication job %d", jobAID)
Expand All @@ -669,10 +669,10 @@ func TestFilterRangefeedInReplicationStream(t *testing.T) {

// Verify that Job contains FilterRangeFeed
details := jobutils.GetJobPayload(t, dbA, jobAID).GetLogicalReplicationDetails()
require.False(t, details.IgnoreCDCIgnoredTTLDeletes)
require.False(t, details.Discard == jobspb.LogicalReplicationDetails_DiscardCDCIgnoredTTLDeletes)

details = jobutils.GetJobPayload(t, dbB, jobBID).GetLogicalReplicationDetails()
require.True(t, details.IgnoreCDCIgnoredTTLDeletes)
require.True(t, details.Discard == jobspb.LogicalReplicationDetails_DiscardCDCIgnoredTTLDeletes)

require.Equal(t, len(filterVal), 2)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,15 +301,15 @@ func (lrw *logicalReplicationWriterProcessor) Start(ctx context.Context) {

if streamingKnobs, ok := lrw.FlowCtx.TestingKnobs().StreamingTestingKnobs.(*sql.StreamingTestingKnobs); ok {
if streamingKnobs != nil && streamingKnobs.BeforeClientSubscribe != nil {
streamingKnobs.BeforeClientSubscribe(addr, string(token), lrw.frontier, lrw.spec.IgnoreCDCIgnoredTTLDeletes)
streamingKnobs.BeforeClientSubscribe(addr, string(token), lrw.frontier, lrw.spec.Discard == jobspb.LogicalReplicationDetails_DiscardCDCIgnoredTTLDeletes)
}
}
sub, err := streamClient.Subscribe(ctx,
streampb.StreamID(lrw.spec.StreamID),
int32(lrw.FlowCtx.NodeID.SQLInstanceID()), lrw.ProcessorID,
token,
lrw.spec.InitialScanTimestamp, lrw.frontier,
streamclient.WithFiltering(lrw.spec.IgnoreCDCIgnoredTTLDeletes),
streamclient.WithFiltering(lrw.spec.Discard == jobspb.LogicalReplicationDetails_DiscardCDCIgnoredTTLDeletes),
streamclient.WithDiff(true),
)
if err != nil {
Expand Down
11 changes: 9 additions & 2 deletions pkg/jobs/jobspb/jobs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ message LogicalReplicationDetails {
}
DefaultConflictResolution default_conflict_resolution = 7 [(gogoproto.nullable) = false];

bool ignore_cdc_ignored_ttl_deletes = 8 [(gogoproto.customname) = "IgnoreCDCIgnoredTTLDeletes"];
reserved 8;

enum ApplyMode {
Immediate = 0;
Expand All @@ -263,7 +263,14 @@ message LogicalReplicationDetails {

string metrics_label = 10;

// Next ID: 11.
enum Discard {
DiscardNothing = 0;
DiscardCDCIgnoredTTLDeletes = 1;
}

Discard discard = 11;

// Next ID: 12.
}

message LogicalReplicationProgress {
Expand Down
9 changes: 5 additions & 4 deletions pkg/sql/execinfrapb/processors_bulk_io.proto
Original file line number Diff line number Diff line change
Expand Up @@ -527,13 +527,14 @@ message LogicalReplicationWriterSpec {
// names.
map<int32, TableReplicationMetadata> table_metadata_by_dest_id = 8 [(gogoproto.nullable) = false, (gogoproto.customname) = "TableMetadataByDestID"];

// IgnoreCDCIgnoredTTLDeletes is an option on whether to filter out 'OmitinRangefeed' events
// when processing changes in LDR
optional bool ignore_cdc_ignored_ttl_deletes = 9 [(gogoproto.nullable) = false, (gogoproto.customname) = "IgnoreCDCIgnoredTTLDeletes"];
reserved 9;

// Discard is an option on whether to filter some events.
optional jobs.jobspb.LogicalReplicationDetails.Discard discard = 12 [(gogoproto.nullable) = false];

optional jobs.jobspb.LogicalReplicationDetails.ApplyMode mode = 10 [(gogoproto.nullable) = false];

optional string metrics_label = 11 [(gogoproto.nullable) = false];

// Next ID: 12.
// Next ID: 13.
}
10 changes: 4 additions & 6 deletions pkg/sql/parser/sql.y
Original file line number Diff line number Diff line change
Expand Up @@ -974,7 +974,7 @@ func (u *sqlSymUnion) triggerForEach() tree.TriggerForEach {
%token <str> HAVING HASH HEADER HIGH HISTOGRAM HOLD HOUR

%token <str> IDENTITY
%token <str> IF IFERROR IFNULL IGNORE_FOREIGN_KEYS IGNORE_CDC_IGNORED_TTL_DELETES ILIKE IMMEDIATE IMMEDIATELY IMMUTABLE IMPORT IN INCLUDE
%token <str> IF IFERROR IFNULL IGNORE_FOREIGN_KEYS ILIKE IMMEDIATE IMMEDIATELY IMMUTABLE IMPORT IN INCLUDE
%token <str> INCLUDING INCLUDE_ALL_SECONDARY_TENANTS INCLUDE_ALL_VIRTUAL_CLUSTERS INCREMENT INCREMENTAL INCREMENTAL_LOCATION
%token <str> INET INET_CONTAINED_BY_OR_EQUALS
%token <str> INET_CONTAINS_OR_EQUALS INDEX INDEXES INHERITS INJECT INITIALLY
Expand Down Expand Up @@ -4635,7 +4635,7 @@ create_stmt:
// < CURSOR = start_time > |
// < DEFAULT FUNCTION = lww | dlq | udf
// < FUNCTION 'udf' FOR TABLE local_name , ... > |
// < IGNORE_CDC_IGNORED_TTL_DELETES >
// < DISCARD = 'ttl-deletes' >
// ]
create_logical_replication_stream_stmt:
CREATE LOGICAL REPLICATION STREAM FROM logical_replication_resources ON string_or_placeholder INTO logical_replication_resources opt_logical_replication_options
Expand Down Expand Up @@ -4733,9 +4733,9 @@ logical_replication_options:
{
$$.val = &tree.LogicalReplicationOptions{UserFunctions: map[tree.UnresolvedName]tree.RoutineName{*$5.unresolvedObjectName().ToUnresolvedName():$2.unresolvedObjectName().ToRoutineName()}}
}
| IGNORE_CDC_IGNORED_TTL_DELETES
| DISCARD '=' string_or_placeholder
{
$$.val = &tree.LogicalReplicationOptions{IgnoreCDCIgnoredTTLDeletes: tree.MakeDBool(true)}
$$.val = &tree.LogicalReplicationOptions{Discard: $3.expr()}
}
| SKIP SCHEMA CHECK
{
Expand Down Expand Up @@ -17745,7 +17745,6 @@ unreserved_keyword:
| NOWAIT
| NULLS
| IGNORE_FOREIGN_KEYS
| IGNORE_CDC_IGNORED_TTL_DELETES
| INSENSITIVE
| OF
| OFF
Expand Down Expand Up @@ -18173,7 +18172,6 @@ bare_label_keywords:
| IFERROR
| IFNULL
| IGNORE_FOREIGN_KEYS
| IGNORE_CDC_IGNORED_TTL_DELETES
| ILIKE
| IMMEDIATE
| IMMEDIATELY
Expand Down
10 changes: 5 additions & 5 deletions pkg/sql/parser/testdata/create_logical_replication
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,12 @@ CREATE LOGICAL REPLICATION STREAM FROM TABLE foo ON '_' INTO TABLE foo WITH OPTI
CREATE LOGICAL REPLICATION STREAM FROM TABLE _ ON 'uri' INTO TABLE _ WITH OPTIONS (CURSOR = '1536242855577149065.0000000000', DEFAULT FUNCTION = 'lww', MODE = 'immediate', FUNCTION _ FOR TABLE _, FUNCTION _ FOR TABLE _) -- identifiers removed

parse
CREATE LOGICAL REPLICATION STREAM FROM TABLE foo.bar ON 'uri' INTO TABLE foo.bar WITH MODE = 'immediate', IGNORE_CDC_IGNORED_TTL_DELETES;
CREATE LOGICAL REPLICATION STREAM FROM TABLE foo.bar ON 'uri' INTO TABLE foo.bar WITH MODE = 'immediate', DISCARD = 'ttl-deletes';
----
CREATE LOGICAL REPLICATION STREAM FROM TABLE foo.bar ON 'uri' INTO TABLE foo.bar WITH OPTIONS (MODE = 'immediate', IGNORE_CDC_IGNORED_TTL_DELETES) -- normalized!
CREATE LOGICAL REPLICATION STREAM FROM TABLE (foo.bar) ON ('uri') INTO TABLE (foo.bar) WITH OPTIONS (MODE = ('immediate'), IGNORE_CDC_IGNORED_TTL_DELETES) -- fully parenthesized
CREATE LOGICAL REPLICATION STREAM FROM TABLE foo.bar ON '_' INTO TABLE foo.bar WITH OPTIONS (MODE = '_', IGNORE_CDC_IGNORED_TTL_DELETES) -- literals removed
CREATE LOGICAL REPLICATION STREAM FROM TABLE _._ ON 'uri' INTO TABLE _._ WITH OPTIONS (MODE = 'immediate', IGNORE_CDC_IGNORED_TTL_DELETES) -- identifiers removed
CREATE LOGICAL REPLICATION STREAM FROM TABLE foo.bar ON 'uri' INTO TABLE foo.bar WITH OPTIONS (MODE = 'immediate', DISCARD = 'ttl-deletes') -- normalized!
CREATE LOGICAL REPLICATION STREAM FROM TABLE (foo.bar) ON ('uri') INTO TABLE (foo.bar) WITH OPTIONS (MODE = ('immediate'), DISCARD = ('ttl-deletes')) -- fully parenthesized
CREATE LOGICAL REPLICATION STREAM FROM TABLE foo.bar ON '_' INTO TABLE foo.bar WITH OPTIONS (MODE = '_', DISCARD = '_') -- literals removed
CREATE LOGICAL REPLICATION STREAM FROM TABLE _._ ON 'uri' INTO TABLE _._ WITH OPTIONS (MODE = 'immediate', DISCARD = 'ttl-deletes') -- identifiers removed

error
CREATE LOGICAL REPLICATION STREAM FROM TABLE foo, bar ON 'uri' INTO TABLE foo, bar;
Expand Down
30 changes: 16 additions & 14 deletions pkg/sql/sem/tree/create_logical_replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ type LogicalReplicationResources struct {

type LogicalReplicationOptions struct {
// Mapping of table name to UDF name
UserFunctions map[UnresolvedName]RoutineName
Cursor Expr
MetricsLabel Expr
Mode Expr
DefaultFunction Expr
IgnoreCDCIgnoredTTLDeletes *DBool
SkipSchemaCheck *DBool
UserFunctions map[UnresolvedName]RoutineName
Cursor Expr
MetricsLabel Expr
Mode Expr
DefaultFunction Expr
Discard Expr
SkipSchemaCheck *DBool
}

var _ Statement = &CreateLogicalReplicationStream{}
Expand Down Expand Up @@ -122,10 +122,12 @@ func (lro *LogicalReplicationOptions) Format(ctx *FmtCtx) {
ctx.FormatNode(&k)
}
}
if lro.IgnoreCDCIgnoredTTLDeletes != nil && *lro.IgnoreCDCIgnoredTTLDeletes {
if lro.Discard != nil {
maybeAddSep()
ctx.WriteString("IGNORE_CDC_IGNORED_TTL_DELETES")
ctx.WriteString("DISCARD = ")
ctx.FormatNode(lro.Discard)
}

if lro.SkipSchemaCheck != nil && *lro.SkipSchemaCheck {
maybeAddSep()
ctx.WriteString("SKIP SCHEMA CHECK")
Expand Down Expand Up @@ -176,12 +178,12 @@ func (o *LogicalReplicationOptions) CombineWith(other *LogicalReplicationOptions
}
}

if o.IgnoreCDCIgnoredTTLDeletes != nil {
if other.IgnoreCDCIgnoredTTLDeletes != nil {
return errors.New("IGNORE_CDC_IGNORED_TTL_DELETES option specified multiple times")
if o.Discard != nil {
if other.Discard != nil {
return errors.New("DISCARD option specified multiple times")
}
} else {
o.IgnoreCDCIgnoredTTLDeletes = other.IgnoreCDCIgnoredTTLDeletes
o.Discard = other.Discard
}
if o.SkipSchemaCheck != nil {
if other.SkipSchemaCheck != nil {
Expand Down Expand Up @@ -209,7 +211,7 @@ func (o LogicalReplicationOptions) IsDefault() bool {
o.Mode == options.Mode &&
o.DefaultFunction == options.DefaultFunction &&
o.UserFunctions == nil &&
o.IgnoreCDCIgnoredTTLDeletes == options.IgnoreCDCIgnoredTTLDeletes &&
o.Discard == options.Discard &&
o.SkipSchemaCheck == options.SkipSchemaCheck &&
o.MetricsLabel == options.MetricsLabel
}

0 comments on commit ac6577c

Please sign in to comment.