From ef51da23a6d194eb526b89e2f7991c6c5f397036 Mon Sep 17 00:00:00 2001
From: JASH-PATEL-6566
Date: Tue, 19 Nov 2024 22:11:49 -0400
Subject: [PATCH 1/8] solve-smells: Solve implementation smells in JdbcSourceTask.java
---
 .../connect/jdbc/source/JdbcSourceTask.java | 52 ++++++++++++-------
 1 file changed, 34 insertions(+), 18 deletions(-)
diff --git a/src/main/java/io/aiven/connect/jdbc/source/JdbcSourceTask.java b/src/main/java/io/aiven/connect/jdbc/source/JdbcSourceTask.java
index 7065f4fb..b8283689 100644
--- a/src/main/java/io/aiven/connect/jdbc/source/JdbcSourceTask.java
+++ b/src/main/java/io/aiven/connect/jdbc/source/JdbcSourceTask.java
@@ -75,6 +75,23 @@ public JdbcSourceTask(final Time time) { this.time = time; } + // Validation methods + private boolean isIncrementingOrTimestampIncrementing(String incrementalMode) { + return incrementalMode.equals(JdbcSourceConnectorConfig.MODE_INCREMENTING) + || incrementalMode.equals(JdbcSourceConnectorConfig.MODE_TIMESTAMP_INCREMENTING); + } + + private boolean isTimestampOrTimestampIncrementing(String incrementalMode) { + return incrementalMode.equals(JdbcSourceConnectorConfig.MODE_TIMESTAMP) + || incrementalMode.equals(JdbcSourceConnectorConfig.MODE_TIMESTAMP_INCREMENTING); + } + + private boolean isIncrementingMode(String mode) { + return mode.equals(JdbcSourceTaskConfig.MODE_INCREMENTING) + || mode.equals(JdbcSourceTaskConfig.MODE_TIMESTAMP) + || mode.equals(JdbcSourceTaskConfig.MODE_TIMESTAMP_INCREMENTING); + } + @Override public String version() { return Version.getVersion(); }
@@ -118,9 +135,8 @@ public void start(final Map<String, String> properties) { //used only in table mode final Map<String, List<Map<String, String>>> partitionsByTableFqn = new HashMap<>(); Map<Map<String, String>, Map<String, Object>> offsets = null; - if (mode.equals(JdbcSourceTaskConfig.MODE_INCREMENTING) - || mode.equals(JdbcSourceTaskConfig.MODE_TIMESTAMP) - || mode.equals(JdbcSourceTaskConfig.MODE_TIMESTAMP_INCREMENTING)) { + + if (isIncrementingMode(mode)) { final List<Map<String, String>> partitions = new ArrayList<>(tables.size()); switch (queryMode) { case TABLE:
@@ -390,7 +406,7 @@ private void validateNonNullable( } boolean incrementingOptional = false; - boolean atLeastOneTimestampNotOptional = false; + boolean timestampRequired = false; final Connection conn = cachedConnectionProvider.getConnection(); final Map<ColumnId, ColumnDefinition> defnsById = dialect.describeColumns(conn, table, null); for (final ColumnDefinition defn : defnsById.values()) {
@@ -399,7 +415,7 @@ private void validateNonNullable( incrementingOptional = defn.isOptional(); } else if (lowercaseTsColumns.contains(columnName.toLowerCase(Locale.getDefault()))) { if (!defn.isOptional()) { - atLeastOneTimestampNotOptional = true; + timestampRequired = true; } } }
@@ -407,20 +423,20 @@ // Validate that requested columns for offsets are NOT NULL. Currently this is only performed // for table-based copying because custom query mode doesn't allow this to be looked up // without a query or parsing the query since we don't have a table name. - if ((incrementalMode.equals(JdbcSourceConnectorConfig.MODE_INCREMENTING) - || incrementalMode.equals(JdbcSourceConnectorConfig.MODE_TIMESTAMP_INCREMENTING)) - && incrementingOptional) { - throw new ConnectException("Cannot make incremental queries using incrementing column " - + incrementingColumn + " on " + table + " because this column " - + "is nullable."); + if (isIncrementingOrTimestampIncrementing(incrementalMode) && incrementingOptional) { + String errorMessage = String.format( + "Cannot make incremental queries using incrementing column %s on %s because this column is nullable.", + incrementingColumn, table + ); + throw new ConnectException(errorMessage); } - if ((incrementalMode.equals(JdbcSourceConnectorConfig.MODE_TIMESTAMP) - || incrementalMode.equals(JdbcSourceConnectorConfig.MODE_TIMESTAMP_INCREMENTING)) - && !atLeastOneTimestampNotOptional) { - throw new ConnectException("Cannot make incremental queries using timestamp columns " - + timestampColumns + " on " + table + " because all of these " - + "columns " - + "nullable."); + + if (isTimestampOrTimestampIncrementing(incrementalMode) && !timestampRequired) { + String errorMessage = String.format( + "Cannot make incremental queries using timestamp columns %s on %s because all of these columns are nullable.", + timestampColumns, table + ); + throw new ConnectException(errorMessage); } } catch (final SQLException e) { throw new ConnectException("Failed trying to validate that columns used for offsets are NOT"
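Patch 1 applies the classic decompose-conditional refactoring: each compound boolean expression moves into a predicate method whose name states its intent, and string concatenation in the exception messages is replaced with String.format. A minimal, self-contained sketch of the same pattern (the mode strings follow the connector's documented mode values; the class and method names are illustrative, not the connector's API):

public class DecomposeConditionalSketch {

    // After: the compound condition hides behind an intention-revealing name.
    private static boolean usesIncrementingColumn(final String mode) {
        return mode.equals("incrementing") || mode.equals("timestamp+incrementing");
    }

    static void validate(final String mode, final boolean incrementingColumnNullable) {
        // Before: if ((mode.equals("incrementing") || mode.equals("timestamp+incrementing")) && incrementingColumnNullable)
        if (usesIncrementingColumn(mode) && incrementingColumnNullable) {
            throw new IllegalStateException(String.format(
                    "Cannot make incremental queries in mode %s because the incrementing column is nullable.", mode));
        }
    }

    public static void main(final String[] args) {
        validate("timestamp", true);              // passes: this mode never reads the incrementing column
        validate("timestamp+incrementing", true); // throws IllegalStateException
    }
}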
From 55bc771d37f3a08b3c3b977b4145d88aba68b102 Mon Sep 17 00:00:00 2001
From: JASH-PATEL-6566
Date: Thu, 28 Nov 2024 01:44:49 -0400
Subject: [PATCH 2/8] Refactor: Solve complex conditions by dividing them into smaller parts
---
 .../aiven/connect/jdbc/config/JdbcConfig.java | 4 +++-
 .../connect/jdbc/source/JdbcSourceTask.java | 20 +++++++++++++------
 2 files changed, 17 insertions(+), 7 deletions(-)
diff --git a/src/main/java/io/aiven/connect/jdbc/config/JdbcConfig.java b/src/main/java/io/aiven/connect/jdbc/config/JdbcConfig.java
index fc5438c6..89202e1d 100644
--- a/src/main/java/io/aiven/connect/jdbc/config/JdbcConfig.java
+++ b/src/main/java/io/aiven/connect/jdbc/config/JdbcConfig.java
@@ -50,7 +50,9 @@ public class JdbcConfig extends AbstractConfig { + "querying with time-based criteria. Defaults to UTC."; private static final String DB_TIMEZONE_CONFIG_DISPLAY = "DB time zone"; - public static final String DIALECT_NAME_CONFIG = "dialect.name"; + // Deficient encapsulation: this variable is not used outside of this class, yet it was declared public, + // so its access modifier is changed from public to private. + private static final String DIALECT_NAME_CONFIG = "dialect.name"; private static final String DIALECT_NAME_DISPLAY = "Database Dialect"; private static final String DIALECT_NAME_DEFAULT = ""; private static final String DIALECT_NAME_DOC =
diff --git a/src/main/java/io/aiven/connect/jdbc/source/JdbcSourceTask.java b/src/main/java/io/aiven/connect/jdbc/source/JdbcSourceTask.java
index b8283689..6bab77d3 100644
--- a/src/main/java/io/aiven/connect/jdbc/source/JdbcSourceTask.java
+++ b/src/main/java/io/aiven/connect/jdbc/source/JdbcSourceTask.java
@@ -76,19 +76,27 @@ public JdbcSourceTask(final Time time) { } // Validation methods + // Decompose complex conditions into smaller parts private boolean isIncrementingOrTimestampIncrementing(String incrementalMode) { - return incrementalMode.equals(JdbcSourceConnectorConfig.MODE_INCREMENTING) - || incrementalMode.equals(JdbcSourceConnectorConfig.MODE_TIMESTAMP_INCREMENTING); + return isIncrementingMode(incrementalMode) || isTimestampIncrementingMode(incrementalMode); } private boolean isTimestampOrTimestampIncrementing(String incrementalMode) { - return incrementalMode.equals(JdbcSourceConnectorConfig.MODE_TIMESTAMP) - || incrementalMode.equals(JdbcSourceConnectorConfig.MODE_TIMESTAMP_INCREMENTING); + return isTimestampMode(incrementalMode) || isTimestampIncrementingMode(incrementalMode); } private boolean isIncrementingMode(String mode) { - return mode.equals(JdbcSourceTaskConfig.MODE_INCREMENTING) - || mode.equals(JdbcSourceTaskConfig.MODE_TIMESTAMP) + return mode.equals(JdbcSourceConnectorConfig.MODE_INCREMENTING) + || mode.equals(JdbcSourceTaskConfig.MODE_INCREMENTING); + } + + private boolean isTimestampMode(String mode) { + return mode.equals(JdbcSourceConnectorConfig.MODE_TIMESTAMP) + || mode.equals(JdbcSourceTaskConfig.MODE_TIMESTAMP); + } + + private boolean isTimestampIncrementingMode(String mode) { + return mode.equals(JdbcSourceConnectorConfig.MODE_TIMESTAMP_INCREMENTING) || mode.equals(JdbcSourceTaskConfig.MODE_TIMESTAMP_INCREMENTING); }

From fd1c6798fa40f5c5e8eb75d4591ef884f1c6bc71 Mon Sep 17 00:00:00 2001
From: JASH-PATEL-6566
Date: Thu, 28 Nov 2024 02:35:20 -0400
Subject: [PATCH 3/8] improve: decompose conditions and update the conditions in the main logic
---
 .../connect/jdbc/source/JdbcSourceTask.java | 47 ++++++++++++------
 1 file changed, 29 insertions(+), 18 deletions(-)
diff --git a/src/main/java/io/aiven/connect/jdbc/source/JdbcSourceTask.java b/src/main/java/io/aiven/connect/jdbc/source/JdbcSourceTask.java
index 6bab77d3..5e6e742b 100644
--- a/src/main/java/io/aiven/connect/jdbc/source/JdbcSourceTask.java
+++ b/src/main/java/io/aiven/connect/jdbc/source/JdbcSourceTask.java
@@ -75,29 +75,40 @@ public JdbcSourceTask(final Time time) { this.time = time; } - // Validation methods - // Decompose complex conditions into smaller parts - private boolean isIncrementingOrTimestampIncrementing(String incrementalMode) { - return isIncrementingMode(incrementalMode) || isTimestampIncrementingMode(incrementalMode); + // Smaller conditions for readability + private boolean isModeIncrementing(String mode) { + return mode.equals(JdbcSourceTaskConfig.MODE_INCREMENTING); } - private boolean isTimestampOrTimestampIncrementing(String incrementalMode) { - return isTimestampMode(incrementalMode) || isTimestampIncrementingMode(incrementalMode); + private boolean isModeTimestamp(String mode) { + return mode.equals(JdbcSourceTaskConfig.MODE_TIMESTAMP); } - private boolean isIncrementingMode(String mode) { - return mode.equals(JdbcSourceConnectorConfig.MODE_INCREMENTING) - || mode.equals(JdbcSourceTaskConfig.MODE_INCREMENTING); + private boolean isModeTimestampIncrementing(String mode) { + return mode.equals(JdbcSourceTaskConfig.MODE_TIMESTAMP_INCREMENTING); } - private boolean isTimestampMode(String mode) { - return mode.equals(JdbcSourceConnectorConfig.MODE_TIMESTAMP) - || mode.equals(JdbcSourceTaskConfig.MODE_TIMESTAMP); + private boolean isIncrementingOptionalMode(String incrementalMode) { + return incrementalMode.equals(JdbcSourceConnectorConfig.MODE_INCREMENTING) + || incrementalMode.equals(JdbcSourceConnectorConfig.MODE_TIMESTAMP_INCREMENTING); } - private boolean isTimestampIncrementingMode(String mode) { - return mode.equals(JdbcSourceConnectorConfig.MODE_TIMESTAMP_INCREMENTING) - || mode.equals(JdbcSourceTaskConfig.MODE_TIMESTAMP_INCREMENTING); + private boolean isTimestampOptionalMode(String incrementalMode) { + return incrementalMode.equals(JdbcSourceConnectorConfig.MODE_TIMESTAMP) + || incrementalMode.equals(JdbcSourceConnectorConfig.MODE_TIMESTAMP_INCREMENTING); + } + + // Composite validation methods + private boolean isSupportedTaskMode(String mode) { + return isModeIncrementing(mode) || isModeTimestamp(mode) || isModeTimestampIncrementing(mode); + } + + private boolean isIncrementingModeOptional(String incrementalMode, boolean incrementingOptional) { + return isIncrementingOptionalMode(incrementalMode) && incrementingOptional; + } + + private boolean isTimestampModeNotOptional(String incrementalMode, boolean atLeastOneTimestampNotOptional) { + return isTimestampOptionalMode(incrementalMode) && !atLeastOneTimestampNotOptional; } @Override
@@ -144,7 +155,7 @@ public void start(final Map<String, String> properties) { final Map<String, List<Map<String, String>>> partitionsByTableFqn = new HashMap<>(); Map<Map<String, String>, Map<String, Object>> offsets = null; - if (isIncrementingMode(mode)) { + if (isSupportedTaskMode(mode)) { final List<Map<String, String>> partitions = new ArrayList<>(tables.size()); switch (queryMode) { case TABLE:
@@ -431,7 +442,7 @@ private void validateNonNullable( // Validate that requested columns for offsets are NOT NULL. Currently this is only performed // for table-based copying because custom query mode doesn't allow this to be looked up // without a query or parsing the query since we don't have a table name. - if (isIncrementingOrTimestampIncrementing(incrementalMode) && incrementingOptional) { + if (isIncrementingModeOptional(incrementalMode, incrementingOptional)) { String errorMessage = String.format( "Cannot make incremental queries using incrementing column %s on %s because this column is nullable.", incrementingColumn, table ); throw new ConnectException(errorMessage); } - if (isTimestampOrTimestampIncrementing(incrementalMode) && !timestampRequired) { + if (isTimestampModeNotOptional(incrementalMode, timestampRequired)) { String errorMessage = String.format( "Cannot make incremental queries using timestamp columns %s on %s because all of these columns are nullable.", timestampColumns, table ); throw new ConnectException(errorMessage); }
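The predicate names introduced in patches 2 and 3 encode which offset columns each incremental mode depends on, which is exactly what validateNonNullable enforces. A quick, self-contained illustration of that mapping (hypothetical class; the mode strings follow the connector's documented values):

public class ModeColumnRequirementsSketch {
    public static void main(final String[] args) {
        for (final String mode : new String[] {"bulk", "incrementing", "timestamp", "timestamp+incrementing"}) {
            // A nullable incrementing column only matters for the modes that read it ...
            final boolean needsIncrementingColumn =
                    mode.equals("incrementing") || mode.equals("timestamp+incrementing");
            // ... and at least one NOT NULL timestamp column is only required here.
            final boolean needsTimestampColumn =
                    mode.equals("timestamp") || mode.equals("timestamp+incrementing");
            System.out.printf("%-24s needs incrementing column: %-5b needs timestamp column: %b%n",
                    mode, needsIncrementingColumn, needsTimestampColumn);
        }
    }
}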
From 75e42394349b98db3e4e0f642ddb2bdd314d5a58 Mon Sep 17 00:00:00 2001
From: JASH-PATEL-6566
Date: Thu, 28 Nov 2024 16:47:55 -0400
Subject: [PATCH 4/8] resolve-smells: resolve implementation smell by extracting methods from the JdbcSourceTask.start method
---
 .../connect/jdbc/source/JdbcSourceTask.java | 310 ++++++++++--------
 1 file changed, 165 insertions(+), 145 deletions(-)
diff --git a/src/main/java/io/aiven/connect/jdbc/source/JdbcSourceTask.java b/src/main/java/io/aiven/connect/jdbc/source/JdbcSourceTask.java
index 5e6e742b..b9c12329 100644
--- a/src/main/java/io/aiven/connect/jdbc/source/JdbcSourceTask.java
+++ b/src/main/java/io/aiven/connect/jdbc/source/JdbcSourceTask.java
@@ -116,19 +116,88 @@ public String version() { return Version.getVersion(); } + // Extracted the methods validateConfig, initializeDialect, processQueryMode, getTablePartitionsToCheck, processOffsets, + // and addTableQuerier from the original start method. @Override public void start(final Map<String, String> properties) { log.info("Starting JDBC source task"); + + // Validate and initialize configuration + validateConfig(properties); + + // Initialize the JDBC dialect (specific database dialect) + initializeDialect(); + + // Create the connection provider with the required configurations + cachedConnectionProvider = new SourceConnectionProvider( + dialect, + config.getInt(JdbcSourceConnectorConfig.CONNECTION_ATTEMPTS_CONFIG), + config.getLong(JdbcSourceConnectorConfig.CONNECTION_BACKOFF_CONFIG) + ); + + // Retrieve table names and custom query from config + final List<String> tables = config.getList(JdbcSourceTaskConfig.TABLES_CONFIG); + final String query = config.getString(JdbcSourceTaskConfig.QUERY_CONFIG); + + // Determine the query mode (either TABLE mode or QUERY mode) + final TableQuerier.QueryMode queryMode = !query.isEmpty() ? TableQuerier.QueryMode.QUERY : TableQuerier.QueryMode.TABLE; + final List<String> tablesOrQuery = queryMode == TableQuerier.QueryMode.QUERY ? Collections.singletonList(query) : tables; + + // Handle partitions and offsets based on the query mode + Map<Map<String, String>, Map<String, Object>> offsets = null; + final String mode = config.getMode(); + if (isSupportedTaskMode(mode)) { + offsets = processQueryMode(tables, mode, queryMode, new HashMap<>()); + } + + // Get additional config values like columns for incrementing, timestamps, and validation + final String incrementingColumn = config.getString(JdbcSourceTaskConfig.INCREMENTING_COLUMN_NAME_CONFIG); + final List<String> timestampColumns = config.getList(JdbcSourceTaskConfig.TIMESTAMP_COLUMN_NAME_CONFIG); + final Long timestampDelayInterval = config.getLong(JdbcSourceTaskConfig.TIMESTAMP_DELAY_INTERVAL_MS_CONFIG); + final Long timestampInitialMs = config.getLong(JdbcSourceTaskConfig.TIMESTAMP_INITIAL_MS_CONFIG); + final Long incrementingOffsetInitial = config.getLong(JdbcSourceTaskConfig.INCREMENTING_INITIAL_VALUE_CONFIG); + final boolean validateNonNulls = config.getBoolean(JdbcSourceTaskConfig.VALIDATE_NON_NULL_CONFIG); + + // Iterate over each table or query and process them based on the query mode + for (final String tableOrQuery : tablesOrQuery) { + // Determine the partitions to check for the current table/query + final List<Map<String, String>> tablePartitionsToCheck = getTablePartitionsToCheck( + queryMode, tableOrQuery, validateNonNulls, incrementingColumn, timestampColumns + ); + + // Process offsets for the partitions + Map<String, Object> offset = processOffsets(offsets, tablePartitionsToCheck); + + // Add the appropriate TableQuerier to the queue based on the mode (bulk, incrementing, timestamp) + addTableQuerier( + queryMode, mode, tableOrQuery, offset, config.getString(JdbcSourceTaskConfig.TOPIC_PREFIX_CONFIG), + timestampColumns, incrementingColumn, timestampDelayInterval, timestampInitialMs, incrementingOffsetInitial + ); + } + + // Mark the task as running + running.set(true); + log.info("Started JDBC source task"); + } + + /** + * Validates the configuration for the task and initializes the config object. + * Throws a ConnectException if there is an issue with the configuration. + */ + private void validateConfig(Map<String, String> properties) { try { config = new JdbcSourceTaskConfig(properties); config.validate(); } catch (final ConfigException e) { throw new ConnectException("Couldn't start JdbcSourceTask due to configuration error", e); } + } - final int maxConnAttempts = config.getInt(JdbcSourceConnectorConfig.CONNECTION_ATTEMPTS_CONFIG); - final long retryBackoff = config.getLong(JdbcSourceConnectorConfig.CONNECTION_BACKOFF_CONFIG); - + /** + * Initializes the JDBC dialect based on the provided dialect name or connection URL. + * The dialect determines how SQL queries are formed for the specific database. + */ + private void initializeDialect() { final String dialectName = config.getDialectName(); if (dialectName != null && !dialectName.trim().isEmpty()) { dialect = DatabaseDialects.create(dialectName, config);
@@ -137,160 +206,111 @@ public void start(final Map<String, String> properties) { dialect = DatabaseDialects.findBestFor(connectionUrl, config); } log.info("Using JDBC dialect {}", dialect.name()); + } - cachedConnectionProvider = new SourceConnectionProvider(dialect, maxConnAttempts, retryBackoff); - final List<String> tables = config.getList(JdbcSourceTaskConfig.TABLES_CONFIG); - final String query = config.getString(JdbcSourceTaskConfig.QUERY_CONFIG); - final TableQuerier.QueryMode queryMode = !query.isEmpty() - ? TableQuerier.QueryMode.QUERY - : TableQuerier.QueryMode.TABLE; - final List<String> tablesOrQuery = queryMode == TableQuerier.QueryMode.QUERY - ? Collections.singletonList(query) - : tables; - - final String mode = config.getMode(); - //used only in table mode - final Map<String, List<Map<String, String>>> partitionsByTableFqn = new HashMap<>(); - Map<Map<String, String>, Map<String, Object>> offsets = null; + /** + * Processes the query mode (TABLE or QUERY) and determines the table partitions to query. + * Retrieves the offsets for each partition if the mode is supported. + */ + private Map<Map<String, String>, Map<String, Object>> processQueryMode( + List<String> tables, String mode, TableQuerier.QueryMode queryMode, + Map<String, List<Map<String, String>>> partitionsByTableFqn) { + + final List<Map<String, String>> partitions = new ArrayList<>(tables.size()); + switch (queryMode) { + case TABLE: + log.trace("Starting in TABLE mode"); + // Find partitions for each table in TABLE mode + for (final String table : tables) { + final List<Map<String, String>> tablePartitions = possibleTablePartitions(table); + partitions.addAll(tablePartitions); + partitionsByTableFqn.put(table, tablePartitions); + } + break; + case QUERY: + log.trace("Starting in QUERY mode"); + // In QUERY mode, we only have one partition to check + partitions.add(Collections.singletonMap( + JdbcSourceConnectorConstants.QUERY_NAME_KEY, + JdbcSourceConnectorConstants.QUERY_NAME_VALUE + )); + break; + default: + throw new ConnectException("Unknown query mode: " + queryMode); + } + return context.offsetStorageReader().offsets(partitions); + } - if (isSupportedTaskMode(mode)) { - final List<Map<String, String>> partitions = new ArrayList<>(tables.size()); - switch (queryMode) { - case TABLE: - log.trace("Starting in TABLE mode"); - for (final String table : tables) { - // Find possible partition maps for different offset protocols - // We need to search by all offset protocol partition keys to support compatibility - final List<Map<String, String>> tablePartitions = possibleTablePartitions(table); - partitions.addAll(tablePartitions); - partitionsByTableFqn.put(table, tablePartitions); - } - break; - case QUERY: - log.trace("Starting in QUERY mode"); - partitions.add(Collections.singletonMap(JdbcSourceConnectorConstants.QUERY_NAME_KEY, - JdbcSourceConnectorConstants.QUERY_NAME_VALUE)); - break; - default: - throw new ConnectException("Unknown query mode: " + queryMode); } - offsets = context.offsetStorageReader().offsets(partitions); - log.trace("The partition offsets are {}", offsets); } + /** + * Retrieves the list of table partitions to check based on the query mode. + * If validation of non-null values is enabled, it will validate the table. + */ + private List<Map<String, String>> getTablePartitionsToCheck( + TableQuerier.QueryMode queryMode, String tableOrQuery, boolean validateNonNulls, + String incrementingColumn, List<String> timestampColumns) { + + final List<Map<String, String>> tablePartitionsToCheck; + if (queryMode == TableQuerier.QueryMode.TABLE) { + if (validateNonNulls) { + validateNonNullable(config.getMode(), tableOrQuery, incrementingColumn, timestampColumns); } + tablePartitionsToCheck = possibleTablePartitions(tableOrQuery); + } else { + // In QUERY mode, we add a single partition for the query + tablePartitionsToCheck = Collections.singletonList(Collections.singletonMap( + JdbcSourceConnectorConstants.QUERY_NAME_KEY, JdbcSourceConnectorConstants.QUERY_NAME_VALUE + )); } + return tablePartitionsToCheck; + } - final String incrementingColumn - = config.getString(JdbcSourceTaskConfig.INCREMENTING_COLUMN_NAME_CONFIG); - final List<String> timestampColumns - = config.getList(JdbcSourceTaskConfig.TIMESTAMP_COLUMN_NAME_CONFIG); - final Long timestampDelayInterval - = config.getLong(JdbcSourceTaskConfig.TIMESTAMP_DELAY_INTERVAL_MS_CONFIG); - final Long timestampInitialMs - = config.getLong(JdbcSourceTaskConfig.TIMESTAMP_INITIAL_MS_CONFIG); - final Long incrementingOffsetInitial - = config.getLong(JdbcSourceTaskConfig.INCREMENTING_INITIAL_VALUE_CONFIG); - final boolean validateNonNulls - = config.getBoolean(JdbcSourceTaskConfig.VALIDATE_NON_NULL_CONFIG); - for (final String tableOrQuery : tablesOrQuery) { - final List<Map<String, String>> tablePartitionsToCheck; - final Map<String, String> partition; - switch (queryMode) { - case TABLE: - if (validateNonNulls) { - validateNonNullable( - mode, - tableOrQuery, - incrementingColumn, - timestampColumns - ); - } - tablePartitionsToCheck = partitionsByTableFqn.get(tableOrQuery); - break; - case QUERY: - partition = Collections.singletonMap( - JdbcSourceConnectorConstants.QUERY_NAME_KEY, - JdbcSourceConnectorConstants.QUERY_NAME_VALUE - ); - tablePartitionsToCheck = Collections.singletonList(partition); break; - default: - throw new ConnectException("Unexpected query mode: " + queryMode); - } - - // The partition map varies by offset protocol. Since we don't know which protocol each - // table's offsets are keyed by, we need to use the different possible partitions - // (newest protocol version first) to find the actual offsets for each table. - Map<String, Object> offset = null; - if (offsets != null) { - for (final Map<String, String> toCheckPartition : tablePartitionsToCheck) { - offset = offsets.get(toCheckPartition); - if (offset != null) { - log.info("Found offset {} for partition {}", offsets, toCheckPartition); - break; - } } } + /** + * Processes the offsets based on the provided partitions and returns the corresponding offset for the partition. + */ + private Map<String, Object> processOffsets(Map<Map<String, String>, Map<String, Object>> offsets, List<Map<String, String>> tablePartitionsToCheck) { + Map<String, Object> offset = null; + if (offsets != null) { + // Iterate over the partitions and retrieve the offset for the partition + for (final Map<String, String> toCheckPartition : tablePartitionsToCheck) { + offset = offsets.get(toCheckPartition); + if (offset != null) { + log.info("Found offset {} for partition {}", offset, toCheckPartition); break; } } } - final String topicPrefix = config.getString(JdbcSourceTaskConfig.TOPIC_PREFIX_CONFIG); - if (mode.equals(JdbcSourceTaskConfig.MODE_BULK)) { - tableQueue.add( - new BulkTableQuerier(dialect, queryMode, tableOrQuery, topicPrefix) - ); - } else if (mode.equals(JdbcSourceTaskConfig.MODE_INCREMENTING)) { - tableQueue.add( - new TimestampIncrementingTableQuerier( - dialect, - queryMode, - tableOrQuery, - topicPrefix, - null, - incrementingColumn, - offset, - timestampDelayInterval, - timestampInitialMs, - incrementingOffsetInitial, - config.getDBTimeZone()) - ); - } else if (mode.equals(JdbcSourceTaskConfig.MODE_TIMESTAMP)) { - tableQueue.add( - new TimestampIncrementingTableQuerier( - dialect, - queryMode, - tableOrQuery, - topicPrefix, - timestampColumns, - null, - offset, - timestampDelayInterval, - timestampInitialMs, - incrementingOffsetInitial, - config.getDBTimeZone()) - ); - } else if (mode.endsWith(JdbcSourceTaskConfig.MODE_TIMESTAMP_INCREMENTING)) { - tableQueue.add( - new TimestampIncrementingTableQuerier( - dialect, - queryMode, - tableOrQuery, - topicPrefix, - timestampColumns, - incrementingColumn, - offset, - timestampDelayInterval, - timestampInitialMs, - incrementingOffsetInitial, - config.getDBTimeZone()) - ); - } } + return offset; + } - running.set(true); - log.info("Started JDBC source task"); + /** + * Adds the appropriate TableQuerier (Bulk, Incrementing, or Timestamp-based) to the task queue. + * Depending on the mode, different queriers are added. + */ + private void addTableQuerier( + TableQuerier.QueryMode queryMode, String mode, String tableOrQuery, + Map<String, Object> offset, String topicPrefix, List<String> timestampColumns, + String incrementingColumn, Long timestampDelayInterval, Long timestampInitialMs, Long incrementingOffsetInitial) { + + // Add a BulkTableQuerier for BULK mode + if (mode.equals(JdbcSourceTaskConfig.MODE_BULK)) { + tableQueue.add(new BulkTableQuerier(dialect, queryMode, tableOrQuery, topicPrefix)); + } + // Add a TimestampIncrementingTableQuerier for INCREMENTING mode + else if (mode.equals(JdbcSourceTaskConfig.MODE_INCREMENTING)) { + tableQueue.add(new TimestampIncrementingTableQuerier(dialect, queryMode, tableOrQuery, topicPrefix, null, incrementingColumn, offset, timestampDelayInterval, timestampInitialMs, incrementingOffsetInitial, config.getDBTimeZone())); + } + // Add a TimestampIncrementingTableQuerier for TIMESTAMP mode + else if (mode.equals(JdbcSourceTaskConfig.MODE_TIMESTAMP)) { + tableQueue.add(new TimestampIncrementingTableQuerier(dialect, queryMode, tableOrQuery, topicPrefix, timestampColumns, null, offset, timestampDelayInterval, timestampInitialMs, incrementingOffsetInitial, config.getDBTimeZone())); + } + // Add a TimestampIncrementingTableQuerier for TIMESTAMP_INCREMENTING mode + else if (mode.endsWith(JdbcSourceTaskConfig.MODE_TIMESTAMP_INCREMENTING)) { + tableQueue.add(new TimestampIncrementingTableQuerier(dialect, queryMode, tableOrQuery, topicPrefix, timestampColumns, incrementingColumn, offset, timestampDelayInterval, timestampInitialMs, incrementingOffsetInitial, config.getDBTimeZone())); + } } + + //This method returns a list of possible partition maps for different offset protocols //This helps with the upgrades private List<Map<String, String>> possibleTablePartitions(final String table) {
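Patch 4 is a textbook extract-method refactoring: the long start method becomes a short narrative that delegates to named steps. The shape of the change, reduced to a runnable sketch (all names and config keys here are illustrative, not the connector's API):

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class ExtractMethodSketch {
    private final List<String> queue = new ArrayList<>();

    public void start(final Map<String, String> props) {
        validateConfig(props);                           // step 1: fail fast on bad config
        final String dialect = initializeDialect(props); // step 2: pick a dialect
        for (final String table : props.getOrDefault("tables", "").split(",")) {
            addTableQuerier(dialect, table);             // step 3: one querier per table
        }
    }

    private void validateConfig(final Map<String, String> props) {
        if (!props.containsKey("connection.url")) {
            throw new IllegalArgumentException("connection.url is required");
        }
    }

    private String initializeDialect(final Map<String, String> props) {
        return props.getOrDefault("dialect.name", "generic");
    }

    private void addTableQuerier(final String dialect, final String table) {
        queue.add(dialect + ":" + table);
    }

    public static void main(final String[] args) {
        final ExtractMethodSketch task = new ExtractMethodSketch();
        task.start(Map.of("connection.url", "jdbc:h2:mem:demo", "tables", "orders,customers"));
        System.out.println(task.queue); // [generic:orders, generic:customers]
    }
}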
From 0540c7c8ebda27403045d5502805fd0b560dfe Mon Sep 17 00:00:00 2001
From: JASH-PATEL-6566
Date: Thu, 28 Nov 2024 21:15:56 -0400
Subject: [PATCH 5/8] Solve-Smell: fix the feature envy smell in DbStructure; tableDefinitionFor is in the wrong place and belongs in TableDefinitions
---
 .../io/aiven/connect/jdbc/sink/BufferedRecords.java | 6 ++++--
 .../java/io/aiven/connect/jdbc/sink/DbStructure.java | 9 ---------
 .../io/aiven/connect/jdbc/util/TableDefinitions.java | 10 ++++++++++
 3 files changed, 14 insertions(+), 11 deletions(-)
diff --git a/src/main/java/io/aiven/connect/jdbc/sink/BufferedRecords.java b/src/main/java/io/aiven/connect/jdbc/sink/BufferedRecords.java
index 305db362..34a7852f 100644
--- a/src/main/java/io/aiven/connect/jdbc/sink/BufferedRecords.java
+++ b/src/main/java/io/aiven/connect/jdbc/sink/BufferedRecords.java
@@ -27,6 +27,7 @@ import java.util.List; import java.util.stream.Collectors; +import io.aiven.connect.jdbc.util.TableDefinitions; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.sink.SinkRecord;
@@ -51,12 +52,12 @@ public class BufferedRecords { private final DatabaseDialect dbDialect; private final DbStructure dbStructure; private final Connection connection; - private final List<SinkRecord> records = new ArrayList<>(); private final List<SinkRecord> tombstoneRecords = new ArrayList<>(); private SchemaPair currentSchemaPair; private FieldsMetadata fieldsMetadata; private TableDefinition tableDefinition; + private TableDefinitions tableDefinitions; private PreparedStatement preparedStatement; private StatementBinder preparedStatementBinder;
@@ -75,6 +76,7 @@ public BufferedRecords( this.dbDialect = dbDialect; this.dbStructure = dbStructure; this.connection = connection; + tableDefinitions = new TableDefinitions(dbDialect); } public List<SinkRecord> add(final SinkRecord record) throws SQLException {
@@ -167,7 +169,7 @@ private void reInitialize(final SchemaPair schemaPair) throws SQLException { fieldsMetadata ); - tableDefinition = dbStructure.tableDefinitionFor(tableId, connection); + tableDefinition = tableDefinitions.tableDefinitionFor(tableId, connection); } public List<SinkRecord> flush() throws SQLException {
diff --git a/src/main/java/io/aiven/connect/jdbc/sink/DbStructure.java b/src/main/java/io/aiven/connect/jdbc/sink/DbStructure.java
index 93c13f36..b276fc42 100644
--- a/src/main/java/io/aiven/connect/jdbc/sink/DbStructure.java
+++ b/src/main/java/io/aiven/connect/jdbc/sink/DbStructure.java
@@ -49,15 +49,6 @@ public DbStructure(final DatabaseDialect dbDialect) { this.tableDefns = new TableDefinitions(dbDialect); } - public TableDefinition tableDefinitionFor(final TableId tableId, final Connection connection) throws SQLException { - final var tblDefinition = tableDefns.get(connection, tableId); - if (Objects.nonNull(tblDefinition)) { - return tblDefinition; - } else { - return tableDefns.refresh(connection, tableId); - } - } - /** * @return whether a DDL operation was performed * @throws SQLException if a DDL operation was deemed necessary but failed
diff --git a/src/main/java/io/aiven/connect/jdbc/util/TableDefinitions.java b/src/main/java/io/aiven/connect/jdbc/util/TableDefinitions.java
index b352db0b..4df284e5 100644
--- a/src/main/java/io/aiven/connect/jdbc/util/TableDefinitions.java
+++ b/src/main/java/io/aiven/connect/jdbc/util/TableDefinitions.java
@@ -21,6 +21,7 @@ import java.sql.SQLException; import java.util.HashMap; import java.util.Map; +import java.util.Objects; import io.aiven.connect.jdbc.dialect.DatabaseDialect;
@@ -88,4 +89,13 @@ public TableDefinition refresh( cache.put(dbTable.id(), dbTable); return dbTable; } + + public TableDefinition tableDefinitionFor(final TableId tableId, final Connection connection) throws SQLException { + final var tblDefinition = this.get(connection, tableId); + if (Objects.nonNull(tblDefinition)) { + return tblDefinition; + } else { + return this.refresh(connection, tableId); + } + } }

From e9f306c6a7cdf1cffddb5cd5f0ad971238833bc1 Mon Sep 17 00:00:00 2001
From: JASH-PATEL-6566
Date: Fri, 29 Nov 2024 19:21:57 -0400
Subject: [PATCH 6/8] Solve-Smell: solve feature envy with a move method in DbStructure.java; moved the method from DbStructure.java to TableDefinitions.java
---
 .../connect/jdbc/sink/BufferedRecords.java | 3 +-
 .../aiven/connect/jdbc/sink/DbStructure.java | 4 ++
 .../connect/jdbc/source/JdbcSourceTask.java | 4 +-
 .../connect/jdbc/util/TableDefinitions.java | 47 ++++++++-----------
 4 files changed, 26 insertions(+), 32 deletions(-)
diff --git a/src/main/java/io/aiven/connect/jdbc/sink/BufferedRecords.java b/src/main/java/io/aiven/connect/jdbc/sink/BufferedRecords.java
index 34a7852f..5bd98c6e 100644
--- a/src/main/java/io/aiven/connect/jdbc/sink/BufferedRecords.java
+++ b/src/main/java/io/aiven/connect/jdbc/sink/BufferedRecords.java
@@ -76,7 +76,6 @@ public BufferedRecords( this.dbDialect = dbDialect; this.dbStructure = dbStructure; this.connection = connection; - tableDefinitions = new TableDefinitions(dbDialect); } public List<SinkRecord> add(final SinkRecord record) throws SQLException {
@@ -169,7 +168,7 @@ private void reInitialize(final SchemaPair schemaPair) throws SQLException { fieldsMetadata ); - tableDefinition = tableDefinitions.tableDefinitionFor(tableId, connection); + tableDefinition = dbStructure.tableDefinitionFor(tableId, connection); } public List<SinkRecord> flush() throws SQLException {
diff --git a/src/main/java/io/aiven/connect/jdbc/sink/DbStructure.java b/src/main/java/io/aiven/connect/jdbc/sink/DbStructure.java
index b276fc42..523938bc 100644
--- a/src/main/java/io/aiven/connect/jdbc/sink/DbStructure.java
+++ b/src/main/java/io/aiven/connect/jdbc/sink/DbStructure.java
@@ -78,6 +78,10 @@ public boolean createOrAmendIfNecessary( return amendIfNecessary(config, connection, tableId, fieldsMetadata, config.maxRetries); } + public TableDefinition tableDefinitionFor(final TableId tableId, final Connection connection) throws SQLException { + return tableDefns.tableDefinitionFor(connection, tableId); + } + /** * @throws SQLException if CREATE failed */
diff --git a/src/main/java/io/aiven/connect/jdbc/source/JdbcSourceTask.java b/src/main/java/io/aiven/connect/jdbc/source/JdbcSourceTask.java
index b9c12329..51c54f3d 100644
--- a/src/main/java/io/aiven/connect/jdbc/source/JdbcSourceTask.java
+++ b/src/main/java/io/aiven/connect/jdbc/source/JdbcSourceTask.java
@@ -309,8 +309,6 @@ else if (mode.endsWith(JdbcSourceTaskConfig.MODE_TIMESTAMP_INCREMENTING)) { } } - - //This method returns a list of possible partition maps for different offset protocols //This helps with the upgrades private List<Map<String, String>> possibleTablePartitions(final String table) {
@@ -482,4 +480,4 @@ private void validateNonNullable( + " NULL", e); } } -} +} \ No newline at end of file
diff --git a/src/main/java/io/aiven/connect/jdbc/util/TableDefinitions.java b/src/main/java/io/aiven/connect/jdbc/util/TableDefinitions.java
index 4df284e5..5879357e 100644
--- a/src/main/java/io/aiven/connect/jdbc/util/TableDefinitions.java
+++ b/src/main/java/io/aiven/connect/jdbc/util/TableDefinitions.java
@@ -1,20 +1,3 @@ -/* - * Copyright 2019 Aiven Oy and jdbc-connector-for-apache-kafka project contributors - * Copyright 2018 Confluent Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - package io.aiven.connect.jdbc.util; import java.sql.Connection;
@@ -56,8 +39,8 @@ public TableDefinitions(final DatabaseDialect dialect) { * @throws SQLException if there is any problem using the connection */ public TableDefinition get( - final Connection connection, - final TableId tableId + final Connection connection, + final TableId tableId ) throws SQLException { TableDefinition dbTable = cache.get(tableId); if (dbTable == null) {
@@ -81,8 +64,8 @@ public TableDefinition get( * @throws SQLException if there is any problem using the connection */ public TableDefinition refresh( - final Connection connection, - final TableId tableId + final Connection connection, + final TableId tableId ) throws SQLException { final TableDefinition dbTable = dialect.describeTable(connection, tableId); log.info("Refreshing metadata for table {} to {}", tableId, dbTable);
@@ -90,12 +73,22 @@ public TableDefinition refresh( return dbTable; } - public TableDefinition tableDefinitionFor(final TableId tableId, final Connection connection) throws SQLException { - final var tblDefinition = this.get(connection, tableId); - if (Objects.nonNull(tblDefinition)) { - return tblDefinition; - } else { - return this.refresh(connection, tableId); + /** + * Get the TableDefinition for a table, ensuring it is initialized in cache. + * + * @param connection the JDBC connection to use; may not be null + * @param tableId the table identifier; may not be null + * @return the {@link TableDefinition} for the table + * @throws SQLException if there is any problem using the connection + */ + public TableDefinition tableDefinitionFor( + final Connection connection, + final TableId tableId + ) throws SQLException { + TableDefinition tableDefn = get(connection, tableId); + if (tableDefn == null) { + tableDefn = refresh(connection, tableId); } + return tableDefn; } }
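Patches 5 and 6 together apply the move-method refactoring for feature envy: the lookup-or-refresh logic moves next to the cache it operates on, and the old owner keeps a thin delegate so existing callers stay untouched. A reduced, self-contained sketch of that shape (hypothetical classes standing in for TableDefinitions and DbStructure):

import java.util.HashMap;
import java.util.Map;

public class MoveMethodSketch {

    static class DefinitionCache { // stands in for TableDefinitions
        private final Map<String, String> cache = new HashMap<>();

        String get(final String tableId) {
            return cache.get(tableId);
        }

        String refresh(final String tableId) {
            final String defn = "definition-of-" + tableId; // stand-in for a database round trip
            cache.put(tableId, defn);
            return defn;
        }

        // Moved here: it only touches this class's own state.
        String definitionFor(final String tableId) {
            final String defn = get(tableId);
            return defn != null ? defn : refresh(tableId);
        }
    }

    static class Structure { // stands in for DbStructure
        private final DefinitionCache defns = new DefinitionCache();

        // Kept as a delegate so old call sites keep compiling.
        String definitionFor(final String tableId) {
            return defns.definitionFor(tableId);
        }
    }

    public static void main(final String[] args) {
        final Structure structure = new Structure();
        System.out.println(structure.definitionFor("orders")); // definition-of-orders (computed once, then cached)
    }
}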
From 0a308a74bf6260483b20eef698f8df2790d8150c Mon Sep 17 00:00:00 2001
From: JASH-PATEL-6566
Date: Fri, 29 Nov 2024 19:43:57 -0400
Subject: [PATCH 7/8] Solve-Smell: solve insufficient modularization of the JdbcDbWriter class; extract two more classes, BufferManager and TableNameGenerator
---
 .../connect/jdbc/sink/BufferManager.java | 48 ++++++++++++
 .../aiven/connect/jdbc/sink/JdbcDbWriter.java | 76 ++++---------------
 .../connect/jdbc/sink/TableNameGenerator.java | 32 ++++++++
 3 files changed, 95 insertions(+), 61 deletions(-)
 create mode 100644 src/main/java/io/aiven/connect/jdbc/sink/BufferManager.java
 create mode 100644 src/main/java/io/aiven/connect/jdbc/sink/TableNameGenerator.java
diff --git a/src/main/java/io/aiven/connect/jdbc/sink/BufferManager.java b/src/main/java/io/aiven/connect/jdbc/sink/BufferManager.java
new file mode 100644
index 00000000..cdfb138c
--- /dev/null
+++ b/src/main/java/io/aiven/connect/jdbc/sink/BufferManager.java
@@ -0,0 +1,48 @@ +package io.aiven.connect.jdbc.sink; + +import io.aiven.connect.jdbc.dialect.DatabaseDialect; +import io.aiven.connect.jdbc.util.TableId; +import org.apache.kafka.connect.sink.SinkRecord; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.Map; + +public class BufferManager { + private final JdbcSinkConfig config; + private final DatabaseDialect dbDialect; + private final DbStructure dbStructure; + private final Connection connection; + private final Map<TableId, BufferedRecords> bufferByTable = new HashMap<>(); + + public BufferManager(JdbcSinkConfig config, DatabaseDialect dbDialect, DbStructure dbStructure, Connection connection) { + this.config = config; + this.dbDialect = dbDialect; + this.dbStructure = dbStructure; + this.connection = connection; + } + + public void addRecord(SinkRecord record) throws SQLException { + final TableId tableId = destinationTable(record.topic()); + BufferedRecords buffer = bufferByTable.get(tableId); + if (buffer == null) { + buffer = new BufferedRecords(config, tableId, dbDialect, dbStructure, connection); + bufferByTable.put(tableId, buffer); + } + buffer.add(record); + } + + public void flushAndClose() throws SQLException { + for (Map.Entry<TableId, BufferedRecords> entry : bufferByTable.entrySet()) { + final BufferedRecords buffer = entry.getValue(); + buffer.flush(); + buffer.close(); + } + bufferByTable.clear(); + } + + private TableId destinationTable(final String topic) { + return dbDialect.parseTableIdentifier(TableNameGenerator.generateTableName(config, topic)); + } }
diff --git a/src/main/java/io/aiven/connect/jdbc/sink/JdbcDbWriter.java b/src/main/java/io/aiven/connect/jdbc/sink/JdbcDbWriter.java
index 13b3af91..04e5bb33 100644
--- a/src/main/java/io/aiven/connect/jdbc/sink/JdbcDbWriter.java
+++ b/src/main/java/io/aiven/connect/jdbc/sink/JdbcDbWriter.java
@@ -17,39 +17,27 @@ package io.aiven.connect.jdbc.sink; -import java.sql.Connection; -import java.sql.SQLException; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; -import java.util.regex.Pattern; -import java.util.stream.Collectors; - -import org.apache.kafka.connect.errors.ConnectException; -import org.apache.kafka.connect.sink.SinkRecord; - import io.aiven.connect.jdbc.dialect.DatabaseDialect; import io.aiven.connect.jdbc.util.CachedConnectionProvider; import io.aiven.connect.jdbc.util.TableId; - +import org.apache.kafka.connect.sink.SinkRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.Collection; + public class JdbcDbWriter { private static final Logger log = LoggerFactory.getLogger(JdbcDbWriter.class); - private static final Pattern NORMALIZE_TABLE_NAME_FOR_TOPIC = Pattern.compile("[^a-zA-Z0-9_]"); - private final JdbcSinkConfig config; - private final DatabaseDialect dbDialect; - private final DbStructure dbStructure; - final CachedConnectionProvider cachedConnectionProvider; - JdbcDbWriter(final JdbcSinkConfig config, final DatabaseDialect dbDialect, final DbStructure dbStructure) { + public JdbcDbWriter(final JdbcSinkConfig config, final DatabaseDialect dbDialect, final DbStructure dbStructure) { this.config = config; this.dbDialect = dbDialect; this.dbStructure = dbStructure;
@@ -63,62 +51,28 @@ protected void onConnect(final Connection connection) throws SQLException { }; } - void write(final Collection<SinkRecord> records) throws SQLException { + public void write(final Collection<SinkRecord> records) throws SQLException { final Connection connection = cachedConnectionProvider.getConnection(); + final BufferManager bufferManager = new BufferManager(config, dbDialect, dbStructure, connection); - final Map<TableId, BufferedRecords> bufferByTable = new HashMap<>(); - for (final SinkRecord record : records) { - final TableId tableId = destinationTable(record.topic()); - BufferedRecords buffer = bufferByTable.get(tableId); - if (buffer == null) { - buffer = new BufferedRecords(config, tableId, dbDialect, dbStructure, connection); - bufferByTable.put(tableId, buffer); - } - buffer.add(record); - } - for (final Map.Entry<TableId, BufferedRecords> entry : bufferByTable.entrySet()) { - final TableId tableId = entry.getKey(); - final BufferedRecords buffer = entry.getValue(); - log.debug("Flushing records in JDBC Writer for table ID: {}", tableId); - buffer.flush(); - buffer.close(); + for (SinkRecord record : records) { + bufferManager.addRecord(record); } + + bufferManager.flushAndClose(); connection.commit(); } - void closeQuietly() { + public void closeQuietly() { cachedConnectionProvider.close(); } TableId destinationTable(final String topic) { - final String tableName = generateTableNameFor(topic); + final String tableName = TableNameGenerator.generateTableName(config, topic); return dbDialect.parseTableIdentifier(tableName); } public String generateTableNameFor(final String topic) { - String tableName = config.tableNameFormat.replace("${topic}", topic); - if (config.tableNameNormalize) { - tableName = NORMALIZE_TABLE_NAME_FOR_TOPIC.matcher(tableName).replaceAll("_"); - } - if (!config.topicsToTablesMapping.isEmpty()) { - tableName = config.topicsToTablesMapping.getOrDefault(topic, ""); - } - if (tableName.isEmpty()) { - final String errorMessage = - String.format( - "Destination table for the topic: '%s' " - + "couldn't be found in the topics to tables mapping: '%s' " - + "and couldn't be generated for the format string '%s'", - topic, - config.topicsToTablesMapping - .entrySet() - .stream() - .map(e -> String.join("->", e.getKey(), e.getValue())) - .collect(Collectors.joining(",")), - config.tableNameFormat); - throw new ConnectException(errorMessage); - } - return tableName; + return TableNameGenerator.generateTableName(config, topic); } - }
diff --git a/src/main/java/io/aiven/connect/jdbc/sink/TableNameGenerator.java b/src/main/java/io/aiven/connect/jdbc/sink/TableNameGenerator.java
new file mode 100644
index 00000000..a4b55793
--- /dev/null
+++ b/src/main/java/io/aiven/connect/jdbc/sink/TableNameGenerator.java
@@ -0,0 +1,32 @@ +package io.aiven.connect.jdbc.sink; + +import org.apache.kafka.connect.errors.ConnectException; + +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +public class TableNameGenerator { + private static final Pattern NORMALIZE_TABLE_NAME_FOR_TOPIC = Pattern.compile("[^a-zA-Z0-9_]"); + + public static String generateTableName(JdbcSinkConfig config, String topic) { + String tableName = config.tableNameFormat.replace("${topic}", topic); + if (config.tableNameNormalize) { + tableName = NORMALIZE_TABLE_NAME_FOR_TOPIC.matcher(tableName).replaceAll("_"); + } + if (!config.topicsToTablesMapping.isEmpty()) { + tableName = config.topicsToTablesMapping.getOrDefault(topic, ""); + } + if (tableName.isEmpty()) { + throw new ConnectException(String.format( + "Destination table for the topic: '%s' couldn't be found in the topics to tables mapping: '%s' " + + "and couldn't be generated for the format string '%s'", + topic, + config.topicsToTablesMapping.entrySet().stream() + .map(e -> String.join("->", e.getKey(), e.getValue())) + .collect(Collectors.joining(",")), + config.tableNameFormat )); } + return tableName; } }
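The extracted TableNameGenerator makes the table-naming rules reusable outside JdbcDbWriter. The normalization step is worth seeing in isolation: with table.name.normalize enabled, every character of the formatted name outside [a-zA-Z0-9_] becomes an underscore. A small demo of that rule (the format string and topic here are made up):

import java.util.regex.Pattern;

public class NormalizeTableNameDemo {
    private static final Pattern NORMALIZE_TABLE_NAME_FOR_TOPIC = Pattern.compile("[^a-zA-Z0-9_]");

    public static void main(final String[] args) {
        final String tableName = "kafka_${topic}".replace("${topic}", "orders.v1-eu");
        System.out.println(NORMALIZE_TABLE_NAME_FOR_TOPIC.matcher(tableName).replaceAll("_"));
        // prints: kafka_orders_v1_eu
    }
}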
From f7ed7acb1a68fd106c80b3aa170d550794f0a942 Mon Sep 17 00:00:00 2001
From: JASH-PATEL-6566
Date: Fri, 29 Nov 2024 21:32:53 -0400
Subject: [PATCH 8/8] Solve-Smell: pull up the validateDeleteEnabled and validatePKModeAgainstPKFields methods from JdbcSinkConfig to JdbcConfig
---
 .../aiven/connect/jdbc/config/JdbcConfig.java | 72 +++++++++++++++++--
 .../connect/jdbc/sink/JdbcSinkConfig.java | 53 ++------------
 2 files changed, 72 insertions(+), 53 deletions(-)
diff --git a/src/main/java/io/aiven/connect/jdbc/config/JdbcConfig.java b/src/main/java/io/aiven/connect/jdbc/config/JdbcConfig.java
index 89202e1d..8bd63fb2 100644
--- a/src/main/java/io/aiven/connect/jdbc/config/JdbcConfig.java
+++ b/src/main/java/io/aiven/connect/jdbc/config/JdbcConfig.java
@@ -17,13 +17,13 @@ package io.aiven.connect.jdbc.config; import java.time.ZoneId; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Map; -import java.util.TimeZone; +import java.util.*; +import java.util.stream.Collectors; import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.Config; import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigValue; import org.apache.kafka.common.config.types.Password; import io.aiven.connect.jdbc.util.TimeZoneValidator;
@@ -186,4 +186,68 @@ protected static void defineSqlQuoteIdentifiers(final ConfigDef configDef, final JdbcConfig.SQL_QUOTE_IDENTIFIERS_DISPLAY ); } + + protected static void validatePKModeAgainstPKFields(final Config config, final String pkMode, final String pkFields) { + final Map<String, ConfigValue> configValues = config.configValues().stream() + .collect(Collectors.toMap(ConfigValue::name, v -> v)); + + final ConfigValue pkModeConfigValue = configValues.get(pkMode); + final ConfigValue pkFieldsConfigValue = configValues.get(pkFields); + + if (pkModeConfigValue == null || pkFieldsConfigValue == null) { + return; + } + + final String mode = (String) pkModeConfigValue.value(); + final List<String> fields = (List<String>) pkFieldsConfigValue.value(); + + if (mode == null) { + return; + } + + switch (mode.toLowerCase()) { + case "none": + if (fields != null && !fields.isEmpty()) { + pkFieldsConfigValue.addErrorMessage( + "Primary key fields should not be set when pkMode is 'none'." + ); + } + break; + case "kafka": + if (fields == null || fields.size() != 3) { + pkFieldsConfigValue.addErrorMessage( + "Primary key fields must be set with three fields " + + "(topic, partition, offset) when pkMode is 'kafka'." + ); + } + break; + case "record_key": + case "record_value": + if (fields == null || fields.isEmpty()) { + pkFieldsConfigValue.addErrorMessage( + "Primary key fields must be set when pkMode is 'record_key' or 'record_value'." + ); + } + break; + default: + pkFieldsConfigValue.addErrorMessage("Invalid pkMode value: " + mode); + break; + } + } + + protected static void validateDeleteEnabled(final Config config, final String deleteEnabledKey, final String pkModeKey) { + final Map<String, ConfigValue> configValues = config.configValues().stream() + .collect(Collectors.toMap(ConfigValue::name, v -> v)); + + final ConfigValue deleteEnabledConfigValue = configValues.get(deleteEnabledKey); + final boolean deleteEnabled = (boolean) deleteEnabledConfigValue.value(); + + final ConfigValue pkModeConfigValue = configValues.get(pkModeKey); + final String pkMode = (String) pkModeConfigValue.value(); + + if (deleteEnabled && !"record_key".equalsIgnoreCase(pkMode)) { + deleteEnabledConfigValue.addErrorMessage("Delete support only works with pk.mode=record_key"); + } + } + }
diff --git a/src/main/java/io/aiven/connect/jdbc/sink/JdbcSinkConfig.java b/src/main/java/io/aiven/connect/jdbc/sink/JdbcSinkConfig.java
index 03371b1c..435c5b8f 100644
--- a/src/main/java/io/aiven/connect/jdbc/sink/JdbcSinkConfig.java
+++ b/src/main/java/io/aiven/connect/jdbc/sink/JdbcSinkConfig.java
@@ -460,63 +460,17 @@ public String toString() { public static void main(final String... args) { System.out.println("========================================="); System.out.println("JDBC Sink connector Configuration Options"); System.out.println("========================================="); System.out.println(); System.out.println(CONFIG_DEF.toEnrichedRst()); } public static void validateDeleteEnabled(final Config config) { - // Collect all configuration values - final Map<String, ConfigValue> configValues = config.configValues().stream() - .collect(Collectors.toMap(ConfigValue::name, v -> v)); - - // Check if DELETE_ENABLED is true - final ConfigValue deleteEnabledConfigValue = configValues.get(JdbcSinkConfig.DELETE_ENABLED); - final boolean deleteEnabled = (boolean) deleteEnabledConfigValue.value(); - - // Check if PK_MODE is RECORD_KEY - final ConfigValue pkModeConfigValue = configValues.get(JdbcSinkConfig.PK_MODE); - final String pkMode = (String) pkModeConfigValue.value(); - - if (deleteEnabled && !JdbcSinkConfig.PrimaryKeyMode.RECORD_KEY.name().equalsIgnoreCase(pkMode)) { - deleteEnabledConfigValue.addErrorMessage("Delete support only works with pk.mode=record_key"); - } + JdbcConfig.validateDeleteEnabled(config, DELETE_ENABLED, PK_MODE); } public static void validatePKModeAgainstPKFields(final Config config) { - // Collect all configuration values - final Map<String, ConfigValue> configValues = config.configValues().stream() - .collect(Collectors.toMap(ConfigValue::name, v -> v)); - - final ConfigValue pkModeConfigValue = configValues.get(JdbcSinkConfig.PK_MODE); - final ConfigValue pkFieldsConfigValue = configValues.get(JdbcSinkConfig.PK_FIELDS); - - if (pkModeConfigValue == null || pkFieldsConfigValue == null) { - return; // If either pkMode or pkFields are not configured, there's nothing to validate - } - - final String pkMode = (String) pkModeConfigValue.value(); - final List<String> pkFields = (List<String>) pkFieldsConfigValue.value(); - - if (pkMode == null) { - return; // If pkMode is null, skip validation - } - - switch (pkMode.toLowerCase()) { - case "none": - validateNoPKFields(pkFieldsConfigValue, pkFields); - break; - case "kafka": - validateKafkaPKFields(pkFieldsConfigValue, pkFields); - break; - case "record_key": - case "record_value": - validatePKFieldsRequired(pkFieldsConfigValue, pkFields); - break; - default: - pkFieldsConfigValue.addErrorMessage("Invalid pkMode value: " + pkMode); - break; - } + JdbcConfig.validatePKModeAgainstPKFields(config, PK_MODE, PK_FIELDS); } private static void validateNoPKFields(final ConfigValue pkFieldsConfigValue, final List<String> pkFields) {
@@ -527,6 +481,7 @@ private static void validateNoPKFields(final ConfigValue pkFieldsConfigValue, fi } } + private static void validateKafkaPKFields(final ConfigValue pkFieldsConfigValue, final List<String> pkFields) { if (pkFields == null || pkFields.size() != 3) { pkFieldsConfigValue.addErrorMessage(
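Patch 8 is a pull-up-method refactoring: validation logic shared with future sibling configs moves into the parent JdbcConfig, while JdbcSinkConfig keeps thin wrappers that pass its own config keys. The shape of that change as a compact sketch (hypothetical classes and keys, not the connector's API):

import java.util.Map;

public class PullUpMethodSketch {

    abstract static class BaseConfig { // stands in for JdbcConfig
        // Pulled up: subclasses supply their own keys, the check lives in one place.
        protected static void requireNonEmpty(final Map<String, String> values, final String key) {
            final String value = values.get(key);
            if (value == null || value.isEmpty()) {
                throw new IllegalArgumentException(key + " must be set");
            }
        }
    }

    static class SinkConfig extends BaseConfig { // stands in for JdbcSinkConfig
        static final String PK_MODE = "pk.mode";

        static void validate(final Map<String, String> values) {
            requireNonEmpty(values, PK_MODE); // thin wrapper passing its own key
        }
    }

    public static void main(final String[] args) {
        SinkConfig.validate(Map.of("pk.mode", "record_key")); // passes
        SinkConfig.validate(Map.of());                        // throws IllegalArgumentException
    }
}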