Skip to content

Commit

Permalink
cdk updates
Browse files Browse the repository at this point in the history
  • Loading branch information
edgao committed Jan 29, 2024
1 parent b37efe9 commit 08729b0
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@
import org.jooq.DSLContext;
import org.jooq.DataType;
import org.jooq.Field;
import org.jooq.InsertOnDuplicateStep;
import org.jooq.InsertReturningStep;
import org.jooq.InsertValuesStepN;
import org.jooq.Name;
import org.jooq.Record;
Expand Down Expand Up @@ -286,10 +288,14 @@ public Sql updateTable(final StreamConfig streamConfig,
@Override
public Sql overwriteFinalTable(final StreamId stream, final String finalSuffix) {
return transactionally(
dropTableIfExists(name(stream.finalNamespace(), stream.finalName())).getSQL(ParamType.INLINED),
alterTable(name(stream.finalNamespace(), stream.finalName() + finalSuffix))
.renameTo(name(stream.finalName()))
.getSQL());
getDslContext().dropTableIfExists(name(stream.finalNamespace(), stream.finalName())).getSQL(ParamType.INLINED),
renameTable(stream, finalSuffix));
}

protected String renameTable(final StreamId stream, final String finalSuffix) {
return getDslContext().alterTable(name(stream.finalNamespace(), stream.finalName() + finalSuffix))
.renameTo(name(stream.finalName()))
.getSQL();
}

@Override
Expand All @@ -299,7 +305,7 @@ public Sql migrateFromV1toV2(final StreamId streamId, final String namespace, fi
return transactionally(
dsl.createSchemaIfNotExists(streamId.rawNamespace()).getSQL(),
dsl.dropTableIfExists(rawTableName).getSQL(),
DSL.createTable(rawTableName)
getDslContext().createTable(rawTableName)
.column(COLUMN_NAME_AB_RAW_ID, SQLDataType.VARCHAR(36).nullable(false))
.column(COLUMN_NAME_AB_EXTRACTED_AT, getTimestampWithTimeZoneType().nullable(false))
.column(COLUMN_NAME_AB_LOADED_AT, getTimestampWithTimeZoneType().nullable(false))
Expand Down Expand Up @@ -377,16 +383,15 @@ private Sql insertAndDeleteTransaction(final StreamConfig streamConfig,
.from(filteredRows)
.where(field(name(ROW_NUMBER_COLUMN_NAME), Integer.class).eq(1)) // Can refer by CTE.field but no use since we don't strongly type
// them.
)
.getSQL(ParamType.INLINED);
).getSQL(ParamType.INLINED);

// Used for append and overwrite modes.
final String insertStmt =
insertIntoFinalTable(finalSchema, finalTable, streamConfig.columns(), getFinalTableMetaColumns(true))
.select(with(rawTableRowsWithCast)
.select(finalTableFields)
.from(rawTableRowsWithCast))
.getSQL(ParamType.INLINED);
.from(rawTableRowsWithCast)
).getSQL(ParamType.INLINED);
final String deleteStmt = deleteFromFinalTable(finalSchema, finalTable, streamConfig.primaryKey(), streamConfig.cursor());
final String deleteCdcDeletesStmt =
streamConfig.columns().containsKey(cdcDeletedAtColumn) ? deleteFromFinalTableCdcDeletes(finalSchema, finalTable) : "";
Expand Down Expand Up @@ -488,18 +493,17 @@ private String checkpointRawTable(final String schemaName, final String tableNam
protected Field<?> castedField(
final Field<?> field,
final AirbyteType type,
final String alias,
final boolean useExpensiveSaferCasting) {
if (type instanceof final AirbyteProtocolType airbyteProtocolType) {
return castedField(field, airbyteProtocolType, useExpensiveSaferCasting).as(quotedName(alias));
return castedField(field, airbyteProtocolType, useExpensiveSaferCasting);
}

// Redshift SUPER can silently cast an array type to struct and vice versa.
return switch (type.getTypeName()) {
case Struct.TYPE, UnsupportedOneOf.TYPE -> cast(field, getStructType()).as(quotedName(alias));
case Array.TYPE -> cast(field, getArrayType()).as(quotedName(alias));
case Struct.TYPE, UnsupportedOneOf.TYPE -> cast(field, getStructType());
case Array.TYPE -> cast(field, getArrayType());
// No nested Unions supported so this will definitely not result in infinite recursion.
case Union.TYPE -> castedField(field, ((Union) type).chooseType(), alias, useExpensiveSaferCasting);
case Union.TYPE -> castedField(field, ((Union) type).chooseType(), useExpensiveSaferCasting);
default -> throw new IllegalArgumentException("Unsupported AirbyteType: " + type);
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ private DataType<?> getTimestampWithTimeZoneType() {

protected abstract SQLDialect getSqlDialect();

private DSLContext getDslContext() {
protected DSLContext getDslContext() {
return DSL.using(getSqlDialect());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.airbyte.protocol.models.v0.DestinationSyncMode;
import io.airbyte.protocol.models.v0.StreamDescriptor;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.integrations.base.destination.typing_deduping;

import java.time.Instant;
import java.util.List;
import java.util.Optional;

public interface DestinationHandler<DialectTableDefinition> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.integrations.base.destination.typing_deduping;

import java.time.Instant;
import java.util.List;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down

0 comments on commit 08729b0

Please sign in to comment.