Skip to content

Commit

Permalink
Destination Redshift: CDK T+D initial state refactor (airbytehq#35354)
Browse files Browse the repository at this point in the history
Signed-off-by: Gireesh Sreepathi <[email protected]>
  • Loading branch information
gisripa authored and jatinyadav-cc committed Feb 26, 2024
1 parent ec28fd1 commit ba7c3d7
Show file tree
Hide file tree
Showing 10 changed files with 145 additions and 203 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.20.0'
cdkVersionRequired = '0.23.2'
features = ['db-destinations', 's3-destinations', 'typing-deduping']
useLocalCdk = false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
dockerImageTag: 2.1.7
dockerImageTag: 2.1.8
dockerRepository: airbyte/destination-redshift
documentationUrl: https://docs.airbyte.com/integrations/destinations/redshift
githubIssueLabel: destination-redshift
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,13 +228,11 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN
final JdbcV1V2Migrator migrator = new JdbcV1V2Migrator(getNamingResolver(), database, databaseName);
final NoopV2TableMigrator v2TableMigrator = new NoopV2TableMigrator();
final boolean disableTypeDedupe = config.has(DISABLE_TYPE_DEDUPE) && config.get(DISABLE_TYPE_DEDUPE).asBoolean(false);
final int defaultThreadCount = 8;
if (disableTypeDedupe) {
typerDeduper = new NoOpTyperDeduperWithV1V2Migrations<>(sqlGenerator, redshiftDestinationHandler, parsedCatalog, migrator, v2TableMigrator,
defaultThreadCount);
typerDeduper = new NoOpTyperDeduperWithV1V2Migrations(sqlGenerator, redshiftDestinationHandler, parsedCatalog, migrator, v2TableMigrator);
} else {
typerDeduper =
new DefaultTyperDeduper<>(sqlGenerator, redshiftDestinationHandler, parsedCatalog, migrator, v2TableMigrator, defaultThreadCount);
new DefaultTyperDeduper(sqlGenerator, redshiftDestinationHandler, parsedCatalog, migrator, v2TableMigrator);
}
return StagingConsumerFactory.builder(
outputRecordCollector,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,17 @@

package io.airbyte.integrations.destination.redshift.typing_deduping;

import com.fasterxml.jackson.databind.JsonNode;
import static io.airbyte.cdk.integrations.base.JavaBaseConstants.*;

import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler;
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType;
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType;
import io.airbyte.integrations.base.destination.typing_deduping.Array;
import io.airbyte.integrations.base.destination.typing_deduping.Sql;
import io.airbyte.integrations.base.destination.typing_deduping.StreamId;
import io.airbyte.integrations.base.destination.typing_deduping.Struct;
import io.airbyte.integrations.base.destination.typing_deduping.Union;
import io.airbyte.integrations.base.destination.typing_deduping.UnsupportedOneOf;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -49,37 +55,36 @@ public void execute(final Sql sql) throws Exception {
}
}

/**
* Issuing a select 1 limit 1 query can be expensive, so relying on SVV_TABLE_INFO system table.
* EXPLAIN of the select 1 from table limit 1 query: (seq scan and then limit is applied, read from
* bottom to top) XN Lim it (co st=0. 0 .0.01 rows=1 width=0) -> XN Seq Scan on _airbyte_raw_ users
* (cost=0.00..1000.00 rows=100000 width=0)
*
* @param id
* @return
* @throws Exception
*/
@Override
public boolean isFinalTableEmpty(final StreamId id) throws Exception {
// Redshift doesn't have an information_schema.tables table, so we have to use SVV_TABLE_INFO.
// From https://docs.aws.amazon.com/redshift/latest/dg/r_SVV_TABLE_INFO.html:
// > The SVV_TABLE_INFO view doesn't return any information for empty tables.
// So we just query for our specific table, and if we get no rows back,
// then we assume the table is empty.
// Note that because the column names are reserved words (table, schema, database),
// we need to enquote them.
final List<JsonNode> query = jdbcDatabase.queryJsons(
"""
SELECT 1
FROM SVV_TABLE_INFO
WHERE "database" = ?
AND "schema" = ?
AND "table" = ?
""",
databaseName,
id.finalNamespace(),
id.finalName());
return query.isEmpty();
protected String toJdbcTypeName(AirbyteType airbyteType) {
// This is mostly identical to the postgres implementation, but swaps jsonb to super
if (airbyteType instanceof final AirbyteProtocolType airbyteProtocolType) {
return toJdbcTypeName(airbyteProtocolType);
}
return switch (airbyteType.getTypeName()) {
case Struct.TYPE, UnsupportedOneOf.TYPE, Array.TYPE -> "super";
// No nested Unions supported so this will definitely not result in infinite recursion.
case Union.TYPE -> toJdbcTypeName(((Union) airbyteType).chooseType());
default -> throw new IllegalArgumentException("Unsupported AirbyteType: " + airbyteType);
};
}

private String toJdbcTypeName(final AirbyteProtocolType airbyteProtocolType) {
return switch (airbyteProtocolType) {
case STRING -> "varchar";
case NUMBER -> "numeric";
case INTEGER -> "int8";
case BOOLEAN -> "bool";
case TIMESTAMP_WITH_TIMEZONE -> "timestamptz";
case TIMESTAMP_WITHOUT_TIMEZONE -> "timestamp";
case TIME_WITH_TIMEZONE -> "timetz";
case TIME_WITHOUT_TIMEZONE -> "time";
case DATE -> "date";
case UNKNOWN -> "super";
};
}

// Do not use SVV_TABLE_INFO to get isFinalTableEmpty.
// See https://github.com/airbytehq/airbyte/issues/34357

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,19 @@
import static org.jooq.impl.DSL.rowNumber;
import static org.jooq.impl.DSL.val;

import com.google.common.collect.ImmutableMap;
import io.airbyte.cdk.integrations.base.JavaBaseConstants;
import io.airbyte.cdk.integrations.destination.NamingConventionTransformer;
import io.airbyte.cdk.integrations.destination.jdbc.TableDefinition;
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator;
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType;
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType;
import io.airbyte.integrations.base.destination.typing_deduping.Array;
import io.airbyte.integrations.base.destination.typing_deduping.ColumnId;
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig;
import io.airbyte.integrations.base.destination.typing_deduping.Struct;
import io.airbyte.integrations.base.destination.typing_deduping.Union;
import io.airbyte.integrations.base.destination.typing_deduping.UnsupportedOneOf;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.jooq.Condition;
Expand All @@ -47,12 +42,6 @@ public class RedshiftSqlGenerator extends JdbcSqlGenerator {

public static final String CASE_STATEMENT_SQL_TEMPLATE = "CASE WHEN {0} THEN {1} ELSE {2} END ";
public static final String CASE_STATEMENT_NO_ELSE_SQL_TEMPLATE = "CASE WHEN {0} THEN {1} END ";
private static final Map<String, String> REDSHIFT_TYPE_NAME_TO_JDBC_TYPE = ImmutableMap.of(
"numeric", "decimal",
"int8", "bigint",
"bool", "boolean",
"timestamptz", "timestamp with time zone",
"timetz", "time with time zone");
private static final String COLUMN_ERROR_MESSAGE_FORMAT = "Problem with `%s`";
private static final String AIRBYTE_META_COLUMN_ERRORS_KEY = "errors";

Expand Down Expand Up @@ -168,7 +157,6 @@ Field<?> arrayConcatStmt(final List<Field<?>> arrays) {
}

Field<?> result = arrays.get(0);
String renderedSql = getDslContext().render(result);
for (int i = 1; i < arrays.size(); i++) {
// We lose some nice indentation but thats ok. Queryparts
// are intentionally rendered here to avoid deep stack for function sql rendering.
Expand Down Expand Up @@ -199,29 +187,6 @@ protected Field<?> buildAirbyteMetaColumn(final LinkedHashMap<ColumnId, AirbyteT

}

@Override
public boolean existingSchemaMatchesStreamConfig(final StreamConfig stream, final TableDefinition existingTable) {
// Check that the columns match, with special handling for the metadata columns.
// This is mostly identical to the redshift implementation, but swaps jsonb to super
final LinkedHashMap<String, String> intendedColumns = stream.columns().entrySet().stream()
.collect(LinkedHashMap::new,
(map, column) -> map.put(column.getKey().name(), toDialectType(column.getValue()).getTypeName()),
LinkedHashMap::putAll);
final LinkedHashMap<String, String> actualColumns = existingTable.columns().entrySet().stream()
.filter(column -> JavaBaseConstants.V2_FINAL_TABLE_METADATA_COLUMNS.stream()
.noneMatch(airbyteColumnName -> airbyteColumnName.equals(column.getKey())))
.collect(LinkedHashMap::new,
(map, column) -> map.put(column.getKey(), jdbcTypeNameFromRedshiftTypeName(column.getValue().type())),
LinkedHashMap::putAll);

final boolean sameColumns = actualColumns.equals(intendedColumns)
&& "varchar".equals(existingTable.columns().get(JavaBaseConstants.COLUMN_NAME_AB_RAW_ID).type())
&& "timestamptz".equals(existingTable.columns().get(JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT).type())
&& "super".equals(existingTable.columns().get(JavaBaseConstants.COLUMN_NAME_AB_META).type());

return sameColumns;
}

/**
* Return ROW_NUMBER() OVER (PARTITION BY primaryKeys ORDER BY cursor DESC NULLS LAST,
* _airbyte_extracted_at DESC)
Expand Down Expand Up @@ -265,8 +230,4 @@ public boolean shouldRetry(final Exception e) {
return false;
}

private static String jdbcTypeNameFromRedshiftTypeName(final String redshiftType) {
return REDSHIFT_TYPE_NAME_TO_JDBC_TYPE.getOrDefault(redshiftType, redshiftType);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -241,9 +241,4 @@ protected int getMaxRecordValueLimit() {
return RedshiftSqlOperations.REDSHIFT_VARCHAR_MAX_BYTE_SIZE;
}

@Override
protected int getGenerateBigStringAddExtraCharacters() {
return 1;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ protected JdbcCompatibleSourceOperations<?> getSourceOperations() {
}

@Override
protected SqlGenerator<?> getSqlGenerator() {
protected SqlGenerator getSqlGenerator() {
return new RedshiftSqlGenerator(new RedshiftSQLNameTransformer()) {

// Override only for tests to print formatted SQL. The actual implementation should use unformatted
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@

import static io.airbyte.cdk.db.jdbc.DateTimeConverter.putJavaSQLTime;
import static io.airbyte.integrations.destination.redshift.operations.RedshiftSqlOperations.escapeStringLiteral;
import static org.junit.jupiter.api.Assertions.assertAll;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.fasterxml.jackson.databind.JsonNode;
Expand All @@ -17,11 +17,11 @@
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.cdk.db.jdbc.JdbcSourceOperations;
import io.airbyte.cdk.db.jdbc.JdbcUtils;
import io.airbyte.cdk.integrations.destination.jdbc.TableDefinition;
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator;
import io.airbyte.cdk.integrations.standardtest.destination.typing_deduping.JdbcSqlGeneratorIntegrationTest;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler;
import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialState;
import io.airbyte.integrations.base.destination.typing_deduping.Sql;
import io.airbyte.integrations.destination.redshift.RedshiftInsertDestination;
import io.airbyte.integrations.destination.redshift.RedshiftSQLNameTransformer;
Expand All @@ -33,7 +33,7 @@
import java.time.LocalDateTime;
import java.time.OffsetTime;
import java.time.ZoneOffset;
import java.util.Optional;
import java.util.List;
import javax.sql.DataSource;
import org.jooq.DSLContext;
import org.jooq.DataType;
Expand Down Expand Up @@ -151,7 +151,7 @@ protected DSLContext getDslContext() {
}

@Override
protected DestinationHandler<TableDefinition> getDestinationHandler() {
protected DestinationHandler getDestinationHandler() {
return new RedshiftDestinationHandler(databaseName, database);
}

Expand Down Expand Up @@ -180,29 +180,11 @@ protected Field<?> toJsonValue(final String valueAsString) {
public void testCreateTableIncremental() throws Exception {
final Sql sql = generator.createTable(incrementalDedupStream, "", false);
destinationHandler.execute(sql);

final Optional<TableDefinition> existingTable = destinationHandler.findExistingTable(incrementalDedupStream.id());

assertTrue(existingTable.isPresent());
assertAll(
() -> assertEquals("varchar", existingTable.get().columns().get("_airbyte_raw_id").type()),
() -> assertEquals("timestamptz", existingTable.get().columns().get("_airbyte_extracted_at").type()),
() -> assertEquals("super", existingTable.get().columns().get("_airbyte_meta").type()),
() -> assertEquals("int8", existingTable.get().columns().get("id1").type()),
() -> assertEquals("int8", existingTable.get().columns().get("id2").type()),
() -> assertEquals("timestamptz", existingTable.get().columns().get("updated_at").type()),
() -> assertEquals("super", existingTable.get().columns().get("struct").type()),
() -> assertEquals("super", existingTable.get().columns().get("array").type()),
() -> assertEquals("varchar", existingTable.get().columns().get("string").type()),
() -> assertEquals("numeric", existingTable.get().columns().get("number").type()),
() -> assertEquals("int8", existingTable.get().columns().get("integer").type()),
() -> assertEquals("bool", existingTable.get().columns().get("boolean").type()),
() -> assertEquals("timestamptz", existingTable.get().columns().get("timestamp_with_timezone").type()),
() -> assertEquals("timestamp", existingTable.get().columns().get("timestamp_without_timezone").type()),
() -> assertEquals("timetz", existingTable.get().columns().get("time_with_timezone").type()),
() -> assertEquals("time", existingTable.get().columns().get("time_without_timezone").type()),
() -> assertEquals("date", existingTable.get().columns().get("date").type()),
() -> assertEquals("super", existingTable.get().columns().get("unknown").type()));
List<DestinationInitialState> initialStates = destinationHandler.gatherInitialState(List.of(incrementalDedupStream));
assertEquals(1, initialStates.size());
final DestinationInitialState initialState = initialStates.getFirst();
assertTrue(initialState.isFinalTablePresent());
assertFalse(initialState.isSchemaMismatch());
// TODO assert on table clustering, etc.
}

Expand Down
Loading

0 comments on commit ba7c3d7

Please sign in to comment.