Solve different smells in the code. #356

Open
wants to merge 8 commits into base: master
76 changes: 71 additions & 5 deletions src/main/java/io/aiven/connect/jdbc/config/JdbcConfig.java
@@ -17,13 +17,13 @@
package io.aiven.connect.jdbc.config;

import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.TimeZone;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.common.config.types.Password;

import io.aiven.connect.jdbc.util.TimeZoneValidator;
@@ -50,7 +50,9 @@ public class JdbcConfig extends AbstractConfig {
+ "querying with time-based criteria. Defaults to UTC.";
private static final String DB_TIMEZONE_CONFIG_DISPLAY = "DB time zone";

public static final String DIALECT_NAME_CONFIG = "dialect.name";
// Deficient Encapsulation: this constant is not used outside this class, yet it was declared public,
// so its access modifier is changed from public to private.
private static final String DIALECT_NAME_CONFIG = "dialect.name";
private static final String DIALECT_NAME_DISPLAY = "Database Dialect";
private static final String DIALECT_NAME_DEFAULT = "";
private static final String DIALECT_NAME_DOC =
@@ -184,4 +186,68 @@ protected static void defineSqlQuoteIdentifiers(final ConfigDef configDef, final
JdbcConfig.SQL_QUOTE_IDENTIFIERS_DISPLAY
);
}

@SuppressWarnings("unchecked") // pk.fields is declared as a LIST config, so the List cast below is safe
protected static void validatePKModeAgainstPKFields(final Config config, final String pkModeKey, final String pkFieldsKey) {
final Map<String, ConfigValue> configValues = config.configValues().stream()
.collect(Collectors.toMap(ConfigValue::name, v -> v));

final ConfigValue pkModeConfigValue = configValues.get(pkModeKey);
final ConfigValue pkFieldsConfigValue = configValues.get(pkFieldsKey);

if (pkModeConfigValue == null || pkFieldsConfigValue == null) {
return;
}

final String mode = (String) pkModeConfigValue.value();
final List<String> fields = (List<String>) pkFieldsConfigValue.value();

if (mode == null) {
return;
}

switch (mode.toLowerCase()) {
case "none":
if (fields != null && !fields.isEmpty()) {
pkFieldsConfigValue.addErrorMessage(
"Primary key fields should not be set when pkMode is 'none'."
);
}
break;
case "kafka":
if (fields == null || fields.size() != 3) {
pkFieldsConfigValue.addErrorMessage(
"Primary key fields must be set with three fields "
+ "(topic, partition, offset) when pkMode is 'kafka'."
);
}
break;
case "record_key":
case "record_value":
if (fields == null || fields.isEmpty()) {
pkFieldsConfigValue.addErrorMessage(
"Primary key fields must be set when pkMode is 'record_key' or 'record_value'."
);
}
break;
default:
pkFieldsConfigValue.addErrorMessage("Invalid pkMode value: " + mode);
break;
}
}

protected static void validateDeleteEnabled(final Config config, final String deleteEnabledKey, final String pkModeKey) {
final Map<String, ConfigValue> configValues = config.configValues().stream()
.collect(Collectors.toMap(ConfigValue::name, v -> v));

final ConfigValue deleteEnabledConfigValue = configValues.get(deleteEnabledKey);
final ConfigValue pkModeConfigValue = configValues.get(pkModeKey);

// Guard against missing entries, as the sibling pk.mode validator does.
if (deleteEnabledConfigValue == null || pkModeConfigValue == null) {
return;
}

final boolean deleteEnabled = Boolean.TRUE.equals(deleteEnabledConfigValue.value());
final String pkMode = (String) pkModeConfigValue.value();

if (deleteEnabled && !"record_key".equalsIgnoreCase(pkMode)) {
deleteEnabledConfigValue.addErrorMessage("Delete support only works with pk.mode=record_key");
}
}

}
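To make the shared validation concrete, here is a minimal, hypothetical sketch (not part of the PR) that builds a Kafka Config by hand and exercises the pk.mode/pk.fields check through the public JdbcSinkConfig entry point shown further down. The keys "pk.mode" and "pk.fields" match the sink's PK_MODE and PK_FIELDS constants; the class name and values are illustrative only:

import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigValue;

import io.aiven.connect.jdbc.sink.JdbcSinkConfig;

public class PkModeValidationSketch {
    public static void main(final String[] args) {
        // pk.mode=kafka requires exactly three pk.fields (topic, partition, offset);
        // supplying only one should make the validator attach an error message.
        final ConfigValue pkMode =
                new ConfigValue("pk.mode", "kafka", new ArrayList<>(), new ArrayList<>());
        final ConfigValue pkFields =
                new ConfigValue("pk.fields", List.of("kafka_topic"), new ArrayList<>(), new ArrayList<>());
        final Config config = new Config(List.of(pkMode, pkFields));

        JdbcSinkConfig.validatePKModeAgainstPKFields(config);

        // Prints the error message added by the shared JdbcConfig validator.
        System.out.println(pkFields.errorMessages());
    }
}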
48 changes: 48 additions & 0 deletions src/main/java/io/aiven/connect/jdbc/sink/BufferManager.java
@@ -0,0 +1,48 @@
package io.aiven.connect.jdbc.sink;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.connect.sink.SinkRecord;

import io.aiven.connect.jdbc.dialect.DatabaseDialect;
import io.aiven.connect.jdbc.util.TableId;

public class BufferManager {
private final JdbcSinkConfig config;
private final DatabaseDialect dbDialect;
private final DbStructure dbStructure;
private final Connection connection;
private final Map<TableId, BufferedRecords> bufferByTable = new HashMap<>();

public BufferManager(final JdbcSinkConfig config, final DatabaseDialect dbDialect, final DbStructure dbStructure, final Connection connection) {
this.config = config;
this.dbDialect = dbDialect;
this.dbStructure = dbStructure;
this.connection = connection;
}

public void addRecord(final SinkRecord record) throws SQLException {
final TableId tableId = destinationTable(record.topic());
// Lazily create one buffer per destination table.
final BufferedRecords buffer = bufferByTable.computeIfAbsent(tableId,
id -> new BufferedRecords(config, id, dbDialect, dbStructure, connection));
buffer.add(record);
}

public void flushAndClose() throws SQLException {
for (final BufferedRecords buffer : bufferByTable.values()) {
buffer.flush();
buffer.close();
}
bufferByTable.clear();
}

private TableId destinationTable(final String topic) {
return dbDialect.parseTableIdentifier(TableNameGenerator.generateTableName(config, topic));
}
}
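Usage mirrors the rewritten JdbcDbWriter.write(...) below: the writer creates one BufferManager per write cycle, feeds it record by record, and flushes everything before committing the transaction. Condensed from that method (connection, config, dbDialect, dbStructure, and records all come from the surrounding writer):

    final BufferManager bufferManager = new BufferManager(config, dbDialect, dbStructure, connection);
    for (final SinkRecord record : records) {
        bufferManager.addRecord(record);
    }
    bufferManager.flushAndClose();
    connection.commit();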
src/main/java/io/aiven/connect/jdbc/sink/BufferedRecords.java
@@ -27,6 +27,7 @@
import java.util.List;
import java.util.stream.Collectors;

import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;

import io.aiven.connect.jdbc.util.TableDefinitions;

@@ -51,12 +52,12 @@ public class BufferedRecords {
private final DatabaseDialect dbDialect;
private final DbStructure dbStructure;
private final Connection connection;

private final List<SinkRecord> records = new ArrayList<>();
private final List<SinkRecord> tombstoneRecords = new ArrayList<>();
private SchemaPair currentSchemaPair;
private FieldsMetadata fieldsMetadata;
private TableDefinition tableDefinition;
private TableDefinitions tableDefinitions;
private PreparedStatement preparedStatement;
private StatementBinder preparedStatementBinder;

13 changes: 4 additions & 9 deletions src/main/java/io/aiven/connect/jdbc/sink/DbStructure.java
@@ -49,15 +49,6 @@ public DbStructure(final DatabaseDialect dbDialect) {
this.tableDefns = new TableDefinitions(dbDialect);
}

public TableDefinition tableDefinitionFor(final TableId tableId, final Connection connection) throws SQLException {
final var tblDefinition = tableDefns.get(connection, tableId);
if (Objects.nonNull(tblDefinition)) {
return tblDefinition;
} else {
return tableDefns.refresh(connection, tableId);
}
}

/**
* @return whether a DDL operation was performed
* @throws SQLException if a DDL operation was deemed necessary but failed
@@ -87,6 +78,10 @@ public boolean createOrAmendIfNecessary(
return amendIfNecessary(config, connection, tableId, fieldsMetadata, config.maxRetries);
}

public TableDefinition tableDefinitionFor(final TableId tableId, final Connection connection) throws SQLException {
return tableDefns.tableDefinitionFor(connection, tableId);
}

/**
* @throws SQLException if CREATE failed
*/
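The TableDefinitions.tableDefinitionFor(connection, tableId) method that DbStructure now delegates to is not visible in this diff. Reconstructed from the get-then-refresh logic deleted above, the moved method plausibly looks like this (a sketch assuming the existing get and refresh signatures on TableDefinitions; the actual implementation is not shown in this excerpt):

    // In io.aiven.connect.jdbc.util.TableDefinitions (assumed placement):
    public TableDefinition tableDefinitionFor(final Connection connection, final TableId tableId)
            throws SQLException {
        final TableDefinition tableDefinition = get(connection, tableId);
        // Fall back to a metadata refresh when the definition is not cached yet.
        return tableDefinition != null ? tableDefinition : refresh(connection, tableId);
    }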
76 changes: 15 additions & 61 deletions src/main/java/io/aiven/connect/jdbc/sink/JdbcDbWriter.java
@@ -17,39 +17,27 @@

package io.aiven.connect.jdbc.sink;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;

import io.aiven.connect.jdbc.dialect.DatabaseDialect;
import io.aiven.connect.jdbc.util.CachedConnectionProvider;
import io.aiven.connect.jdbc.util.TableId;

import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.Collection;

public class JdbcDbWriter {

private static final Logger log = LoggerFactory.getLogger(JdbcDbWriter.class);

private static final Pattern NORMALIZE_TABLE_NAME_FOR_TOPIC = Pattern.compile("[^a-zA-Z0-9_]");

private final JdbcSinkConfig config;

private final DatabaseDialect dbDialect;

private final DbStructure dbStructure;

final CachedConnectionProvider cachedConnectionProvider;

JdbcDbWriter(final JdbcSinkConfig config, final DatabaseDialect dbDialect, final DbStructure dbStructure) {
public JdbcDbWriter(final JdbcSinkConfig config, final DatabaseDialect dbDialect, final DbStructure dbStructure) {
this.config = config;
this.dbDialect = dbDialect;
this.dbStructure = dbStructure;
@@ -63,62 +51,28 @@ protected void onConnect(final Connection connection) throws SQLException {
};
}

void write(final Collection<SinkRecord> records) throws SQLException {
public void write(final Collection<SinkRecord> records) throws SQLException {
final Connection connection = cachedConnectionProvider.getConnection();
final BufferManager bufferManager = new BufferManager(config, dbDialect, dbStructure, connection);

final Map<TableId, BufferedRecords> bufferByTable = new HashMap<>();
for (final SinkRecord record : records) {
final TableId tableId = destinationTable(record.topic());
BufferedRecords buffer = bufferByTable.get(tableId);
if (buffer == null) {
buffer = new BufferedRecords(config, tableId, dbDialect, dbStructure, connection);
bufferByTable.put(tableId, buffer);
}
buffer.add(record);
}
for (final Map.Entry<TableId, BufferedRecords> entry : bufferByTable.entrySet()) {
final TableId tableId = entry.getKey();
final BufferedRecords buffer = entry.getValue();
log.debug("Flushing records in JDBC Writer for table ID: {}", tableId);
buffer.flush();
buffer.close();
for (final SinkRecord record : records) {
bufferManager.addRecord(record);
}

bufferManager.flushAndClose();
connection.commit();
}

void closeQuietly() {
public void closeQuietly() {
cachedConnectionProvider.close();
}

TableId destinationTable(final String topic) {
final String tableName = generateTableNameFor(topic);
final String tableName = TableNameGenerator.generateTableName(config, topic);
return dbDialect.parseTableIdentifier(tableName);
}

public String generateTableNameFor(final String topic) {
String tableName = config.tableNameFormat.replace("${topic}", topic);
if (config.tableNameNormalize) {
tableName = NORMALIZE_TABLE_NAME_FOR_TOPIC.matcher(tableName).replaceAll("_");
}
if (!config.topicsToTablesMapping.isEmpty()) {
tableName = config.topicsToTablesMapping.getOrDefault(topic, "");
}
if (tableName.isEmpty()) {
final String errorMessage =
String.format(
"Destination table for the topic: '%s' "
+ "couldn't be found in the topics to tables mapping: '%s' "
+ "and couldn't be generated for the format string '%s'",
topic,
config.topicsToTablesMapping
.entrySet()
.stream()
.map(e -> String.join("->", e.getKey(), e.getValue()))
.collect(Collectors.joining(",")),
config.tableNameFormat);
throw new ConnectException(errorMessage);
}
return tableName;
return TableNameGenerator.generateTableName(config, topic);
}

}
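TableNameGenerator is another new class in this PR whose source does not appear in this excerpt. Reconstructing it from the logic removed from generateTableNameFor above and from its call sites in BufferManager and JdbcDbWriter, it plausibly looks like the sketch below (class placement and access modifiers are assumptions; the body mirrors the deleted code):

    package io.aiven.connect.jdbc.sink;

    import java.util.regex.Pattern;
    import java.util.stream.Collectors;

    import org.apache.kafka.connect.errors.ConnectException;

    final class TableNameGenerator {

        private static final Pattern NORMALIZE_TABLE_NAME_FOR_TOPIC = Pattern.compile("[^a-zA-Z0-9_]");

        private TableNameGenerator() {
        }

        static String generateTableName(final JdbcSinkConfig config, final String topic) {
            String tableName = config.tableNameFormat.replace("${topic}", topic);
            if (config.tableNameNormalize) {
                tableName = NORMALIZE_TABLE_NAME_FOR_TOPIC.matcher(tableName).replaceAll("_");
            }
            if (!config.topicsToTablesMapping.isEmpty()) {
                tableName = config.topicsToTablesMapping.getOrDefault(topic, "");
            }
            if (tableName.isEmpty()) {
                // Same failure mode as the deleted generateTableNameFor: no usable table name.
                throw new ConnectException(String.format(
                        "Destination table for the topic: '%s' couldn't be found in the topics "
                                + "to tables mapping: '%s' and couldn't be generated for the "
                                + "format string '%s'",
                        topic,
                        config.topicsToTablesMapping.entrySet().stream()
                                .map(e -> String.join("->", e.getKey(), e.getValue()))
                                .collect(Collectors.joining(",")),
                        config.tableNameFormat));
            }
            return tableName;
        }
    }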
53 changes: 4 additions & 49 deletions src/main/java/io/aiven/connect/jdbc/sink/JdbcSinkConfig.java
@@ -460,63 +460,17 @@ public String toString() {
public static void main(final String... args) {
System.out.println("=========================================");
System.out.println("JDBC Sink connector Configuration Options");
System.out.println("=========================================");
System.out.println("============================ =============");
System.out.println();
System.out.println(CONFIG_DEF.toEnrichedRst());
}

public static void validateDeleteEnabled(final Config config) {
// Collect all configuration values
final Map<String, ConfigValue> configValues = config.configValues().stream()
.collect(Collectors.toMap(ConfigValue::name, v -> v));

// Check if DELETE_ENABLED is true
final ConfigValue deleteEnabledConfigValue = configValues.get(JdbcSinkConfig.DELETE_ENABLED);
final boolean deleteEnabled = (boolean) deleteEnabledConfigValue.value();

// Check if PK_MODE is RECORD_KEY
final ConfigValue pkModeConfigValue = configValues.get(JdbcSinkConfig.PK_MODE);
final String pkMode = (String) pkModeConfigValue.value();

if (deleteEnabled && !JdbcSinkConfig.PrimaryKeyMode.RECORD_KEY.name().equalsIgnoreCase(pkMode)) {
deleteEnabledConfigValue.addErrorMessage("Delete support only works with pk.mode=record_key");
}
JdbcConfig.validateDeleteEnabled(config, DELETE_ENABLED, PK_MODE);
}

public static void validatePKModeAgainstPKFields(final Config config) {
// Collect all configuration values
final Map<String, ConfigValue> configValues = config.configValues().stream()
.collect(Collectors.toMap(ConfigValue::name, v -> v));

final ConfigValue pkModeConfigValue = configValues.get(JdbcSinkConfig.PK_MODE);
final ConfigValue pkFieldsConfigValue = configValues.get(JdbcSinkConfig.PK_FIELDS);

if (pkModeConfigValue == null || pkFieldsConfigValue == null) {
return; // If either pkMode or pkFields are not configured, there's nothing to validate
}

final String pkMode = (String) pkModeConfigValue.value();
final List<String> pkFields = (List<String>) pkFieldsConfigValue.value();

if (pkMode == null) {
return; // If pkMode is null, skip validation
}

switch (pkMode.toLowerCase()) {
case "none":
validateNoPKFields(pkFieldsConfigValue, pkFields);
break;
case "kafka":
validateKafkaPKFields(pkFieldsConfigValue, pkFields);
break;
case "record_key":
case "record_value":
validatePKFieldsRequired(pkFieldsConfigValue, pkFields);
break;
default:
pkFieldsConfigValue.addErrorMessage("Invalid pkMode value: " + pkMode);
break;
}
JdbcConfig.validatePKModeAgainstPKFields(config, PK_MODE, PK_FIELDS);
}

private static void validateNoPKFields(final ConfigValue pkFieldsConfigValue, final List<String> pkFields) {
Expand All @@ -527,6 +481,7 @@ private static void validateNoPKFields(final ConfigValue pkFieldsConfigValue, fi
}
}


private static void validateKafkaPKFields(final ConfigValue pkFieldsConfigValue, final List<String> pkFields) {
if (pkFields == null || pkFields.size() != 3) {
pkFieldsConfigValue.addErrorMessage(