Skip to content

Commit

Permalink
Merge pull request #118 from aiven/fix-regression-pg-partitioned-tables
Browse files Browse the repository at this point in the history
Fix regression with partitioned tables in PostgreSQL
  • Loading branch information
AnatolyPopov authored Mar 3, 2022
2 parents e5b2b14 + 9fc5d7f commit 881e177
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import java.sql.Timestamp;
import java.sql.Types;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -166,7 +165,7 @@ protected GenericDatabaseDialect(
if (config instanceof JdbcSinkConfig) {
catalogPattern = JdbcSourceTaskConfig.CATALOG_PATTERN_DEFAULT;
schemaPattern = JdbcSourceTaskConfig.SCHEMA_PATTERN_DEFAULT;
tableTypes = new HashSet<>(Arrays.asList(JdbcSourceTaskConfig.TABLE_TYPE_DEFAULT));
tableTypes = new HashSet<>(getDefaultSinkTableTypes());
} else {
catalogPattern = config.getString(JdbcSourceTaskConfig.CATALOG_PATTERN_CONFIG);
schemaPattern = config.getString(JdbcSourceTaskConfig.SCHEMA_PATTERN_CONFIG);
Expand All @@ -182,6 +181,10 @@ protected GenericDatabaseDialect(
quoteIdentifiers = config.isQuoteSqlIdentifiers();
}

protected List<String> getDefaultSinkTableTypes() {
return List.of(JdbcSourceTaskConfig.TABLE_TYPE_DEFAULT);
}

@Override
public String name() {
return getClass().getSimpleName().replace("DatabaseDialect", "");
Expand Down Expand Up @@ -496,12 +499,14 @@ public boolean tableExists(
final Connection connection,
final TableId tableId
) throws SQLException {
final String[] tableTypes = tableTypes(connection.getMetaData(), this.tableTypes);

log.info("Checking {} dialect for existence of table {}", this, tableId);
try (final ResultSet rs = connection.getMetaData().getTables(
tableId.catalogName(),
tableId.schemaName(),
tableId.tableName(),
new String[]{"TABLE"}
tableTypes
)) {
final boolean exists = rs.next();
log.info("Using {} dialect table {} {}", this, tableId, exists ? "present" : "absent");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ public class PostgreSqlDatabaseDialect extends GenericDatabaseDialect {
Schema.Type.STRING, String.class
);

private static final List<String> SINK_TABLE_TYPE_DEFAULT = List.of("TABLE", "PARTITIONED TABLE");

/**
* The provider for {@link PostgreSqlDatabaseDialect}.
*/
Expand Down Expand Up @@ -414,4 +416,8 @@ private String cast(final TableDefinition tableDfn, final ColumnId columnId) {
return "";
}

@Override
protected List<String> getDefaultSinkTableTypes() {
return SINK_TABLE_TYPE_DEFAULT;
}
}

0 comments on commit 881e177

Please sign in to comment.