diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcSqlGenerator.java b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcSqlGenerator.java index 0feb56ca9a9e..5e04c615ad83 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcSqlGenerator.java +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcSqlGenerator.java @@ -57,6 +57,8 @@ import org.jooq.DSLContext; import org.jooq.DataType; import org.jooq.Field; +import org.jooq.InsertOnDuplicateStep; +import org.jooq.InsertReturningStep; import org.jooq.InsertValuesStepN; import org.jooq.Name; import org.jooq.Record; @@ -286,10 +288,14 @@ public Sql updateTable(final StreamConfig streamConfig, @Override public Sql overwriteFinalTable(final StreamId stream, final String finalSuffix) { return transactionally( - dropTableIfExists(name(stream.finalNamespace(), stream.finalName())).getSQL(ParamType.INLINED), - alterTable(name(stream.finalNamespace(), stream.finalName() + finalSuffix)) - .renameTo(name(stream.finalName())) - .getSQL()); + getDslContext().dropTableIfExists(name(stream.finalNamespace(), stream.finalName())).getSQL(ParamType.INLINED), + renameTable(stream, finalSuffix)); + } + + protected String renameTable(final StreamId stream, final String finalSuffix) { + return getDslContext().alterTable(name(stream.finalNamespace(), stream.finalName() + finalSuffix)) + .renameTo(name(stream.finalName())) + .getSQL(); } @Override @@ -299,7 +305,7 @@ public Sql migrateFromV1toV2(final StreamId streamId, final String namespace, fi return transactionally( dsl.createSchemaIfNotExists(streamId.rawNamespace()).getSQL(), dsl.dropTableIfExists(rawTableName).getSQL(), - DSL.createTable(rawTableName) + getDslContext().createTable(rawTableName) .column(COLUMN_NAME_AB_RAW_ID, SQLDataType.VARCHAR(36).nullable(false)) .column(COLUMN_NAME_AB_EXTRACTED_AT, getTimestampWithTimeZoneType().nullable(false)) .column(COLUMN_NAME_AB_LOADED_AT, getTimestampWithTimeZoneType().nullable(false)) @@ -377,16 +383,15 @@ private Sql insertAndDeleteTransaction(final StreamConfig streamConfig, .from(filteredRows) .where(field(name(ROW_NUMBER_COLUMN_NAME), Integer.class).eq(1)) // Can refer by CTE.field but no use since we don't strongly type // them. - ) - .getSQL(ParamType.INLINED); + ).getSQL(ParamType.INLINED); // Used for append and overwrite modes. final String insertStmt = insertIntoFinalTable(finalSchema, finalTable, streamConfig.columns(), getFinalTableMetaColumns(true)) .select(with(rawTableRowsWithCast) .select(finalTableFields) - .from(rawTableRowsWithCast)) - .getSQL(ParamType.INLINED); + .from(rawTableRowsWithCast) + ).getSQL(ParamType.INLINED); final String deleteStmt = deleteFromFinalTable(finalSchema, finalTable, streamConfig.primaryKey(), streamConfig.cursor()); final String deleteCdcDeletesStmt = streamConfig.columns().containsKey(cdcDeletedAtColumn) ? deleteFromFinalTableCdcDeletes(finalSchema, finalTable) : ""; @@ -488,18 +493,17 @@ private String checkpointRawTable(final String schemaName, final String tableNam protected Field castedField( final Field field, final AirbyteType type, - final String alias, final boolean useExpensiveSaferCasting) { if (type instanceof final AirbyteProtocolType airbyteProtocolType) { - return castedField(field, airbyteProtocolType, useExpensiveSaferCasting).as(quotedName(alias)); + return castedField(field, airbyteProtocolType, useExpensiveSaferCasting); } // Redshift SUPER can silently cast an array type to struct and vice versa. return switch (type.getTypeName()) { - case Struct.TYPE, UnsupportedOneOf.TYPE -> cast(field, getStructType()).as(quotedName(alias)); - case Array.TYPE -> cast(field, getArrayType()).as(quotedName(alias)); + case Struct.TYPE, UnsupportedOneOf.TYPE -> cast(field, getStructType()); + case Array.TYPE -> cast(field, getArrayType()); // No nested Unions supported so this will definitely not result in infinite recursion. - case Union.TYPE -> castedField(field, ((Union) type).chooseType(), alias, useExpensiveSaferCasting); + case Union.TYPE -> castedField(field, ((Union) type).chooseType(), useExpensiveSaferCasting); default -> throw new IllegalArgumentException("Unsupported AirbyteType: " + type); }; } diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/java/io/airbyte/cdk/integrations/standardtest/destination/typing_deduping/JdbcSqlGeneratorIntegrationTest.java b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/java/io/airbyte/cdk/integrations/standardtest/destination/typing_deduping/JdbcSqlGeneratorIntegrationTest.java index a71711cf17c4..a617ea64e94e 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/java/io/airbyte/cdk/integrations/standardtest/destination/typing_deduping/JdbcSqlGeneratorIntegrationTest.java +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/java/io/airbyte/cdk/integrations/standardtest/destination/typing_deduping/JdbcSqlGeneratorIntegrationTest.java @@ -55,7 +55,7 @@ private DataType getTimestampWithTimeZoneType() { protected abstract SQLDialect getSqlDialect(); - private DSLContext getDslContext() { + protected DSLContext getDslContext() { return DSL.using(getSqlDialect()); } diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduper.java b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduper.java index d01f47060ba4..1586008dc5e2 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduper.java +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduper.java @@ -14,6 +14,7 @@ import io.airbyte.protocol.models.v0.DestinationSyncMode; import io.airbyte.protocol.models.v0.StreamDescriptor; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/DestinationHandler.java b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/DestinationHandler.java index 0b20a1cb02d7..88f407f8f262 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/DestinationHandler.java +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/DestinationHandler.java @@ -5,6 +5,7 @@ package io.airbyte.integrations.base.destination.typing_deduping; import java.time.Instant; +import java.util.List; import java.util.Optional; public interface DestinationHandler { diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/TypeAndDedupeTransaction.java b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/TypeAndDedupeTransaction.java index a1c1f8cc1684..cc8e2aefb4af 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/TypeAndDedupeTransaction.java +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/TypeAndDedupeTransaction.java @@ -5,6 +5,7 @@ package io.airbyte.integrations.base.destination.typing_deduping; import java.time.Instant; +import java.util.List; import java.util.Optional; import org.slf4j.Logger; import org.slf4j.LoggerFactory;