Skip to content

Commit

Permalink
Destination redshift: Upgrade cdk (#35316)
Browse files Browse the repository at this point in the history
Signed-off-by: Gireesh Sreepathi <[email protected]>
Co-authored-by: Gireesh Sreepathi <[email protected]>
  • Loading branch information
edgao and gisripa authored Mar 5, 2024
1 parent a090088 commit b254a64
Show file tree
Hide file tree
Showing 8 changed files with 63 additions and 25 deletions.
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
plugins {
id 'application'
id 'airbyte-java-connector'
id 'org.jetbrains.kotlin.jvm' version '1.9.22'
}

airbyteJavaConnector {
cdkVersionRequired = '0.23.2'
cdkVersionRequired = '0.23.11'
features = ['db-destinations', 's3-destinations', 'typing-deduping']
useLocalCdk = false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
dockerImageTag: 2.1.8
dockerImageTag: 2.1.9
dockerRepository: airbyte/destination-redshift
documentationUrl: https://docs.airbyte.com/integrations/destinations/redshift
githubIssueLabel: destination-redshift
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.airbyte.integrations.destination.redshift.operations.RedshiftSqlOperations;
import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftDestinationHandler;
import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftSqlGenerator;
import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftState;
import io.airbyte.integrations.destination.redshift.util.RedshiftUtil;
import java.time.Duration;
import java.util.HashMap;
Expand Down Expand Up @@ -115,8 +116,10 @@ protected JdbcSqlGenerator getSqlGenerator() {
}

@Override
protected JdbcDestinationHandler getDestinationHandler(final String databaseName, final JdbcDatabase database) {
return new RedshiftDestinationHandler(databaseName, database);
protected JdbcDestinationHandler<RedshiftState> getDestinationHandler(final String databaseName,
final JdbcDatabase database,
String rawTableSchema) {
return new RedshiftDestinationHandler(databaseName, database, rawTableSchema);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.airbyte.cdk.integrations.base.AirbyteMessageConsumer;
import io.airbyte.cdk.integrations.base.AirbyteTraceMessageUtility;
import io.airbyte.cdk.integrations.base.Destination;
import io.airbyte.cdk.integrations.base.JavaBaseConstants;
import io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer;
import io.airbyte.cdk.integrations.base.TypingAndDedupingFlag;
import io.airbyte.cdk.integrations.base.ssh.SshWrappedDestination;
Expand Down Expand Up @@ -50,6 +51,7 @@
import io.airbyte.integrations.destination.redshift.operations.RedshiftSqlOperations;
import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftDestinationHandler;
import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftSqlGenerator;
import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftState;
import io.airbyte.integrations.destination.redshift.util.RedshiftUtil;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus.Status;
Expand All @@ -58,6 +60,7 @@
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import javax.sql.DataSource;
Expand Down Expand Up @@ -176,8 +179,10 @@ protected JdbcSqlGenerator getSqlGenerator() {
}

@Override
protected JdbcDestinationHandler getDestinationHandler(final String databaseName, final JdbcDatabase database) {
return new RedshiftDestinationHandler(databaseName, database);
protected JdbcDestinationHandler<RedshiftState> getDestinationHandler(final String databaseName,
final JdbcDatabase database,
String rawTableSchema) {
return new RedshiftDestinationHandler(databaseName, database, rawTableSchema);
}

@Override
Expand Down Expand Up @@ -217,22 +222,26 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN
final TyperDeduper typerDeduper;
final JdbcDatabase database = getDatabase(getDataSource(config));
final String databaseName = config.get(JdbcUtils.DATABASE_KEY).asText();
final RedshiftDestinationHandler redshiftDestinationHandler = new RedshiftDestinationHandler(databaseName, database);
final CatalogParser catalogParser;
final String rawNamespace;
if (TypingAndDedupingFlag.getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE).isPresent()) {
catalogParser = new CatalogParser(sqlGenerator, TypingAndDedupingFlag.getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE).get());
rawNamespace = TypingAndDedupingFlag.getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE).get();
catalogParser = new CatalogParser(sqlGenerator, rawNamespace);
} else {
rawNamespace = JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE;
catalogParser = new CatalogParser(sqlGenerator);
}
final RedshiftDestinationHandler redshiftDestinationHandler = new RedshiftDestinationHandler(databaseName, database, rawNamespace);
parsedCatalog = catalogParser.parseCatalog(catalog);
final JdbcV1V2Migrator migrator = new JdbcV1V2Migrator(getNamingResolver(), database, databaseName);
final NoopV2TableMigrator v2TableMigrator = new NoopV2TableMigrator();
final boolean disableTypeDedupe = config.has(DISABLE_TYPE_DEDUPE) && config.get(DISABLE_TYPE_DEDUPE).asBoolean(false);
if (disableTypeDedupe) {
typerDeduper = new NoOpTyperDeduperWithV1V2Migrations(sqlGenerator, redshiftDestinationHandler, parsedCatalog, migrator, v2TableMigrator);
typerDeduper =
new NoOpTyperDeduperWithV1V2Migrations<>(sqlGenerator, redshiftDestinationHandler, parsedCatalog, migrator, v2TableMigrator, List.of());
} else {
typerDeduper =
new DefaultTyperDeduper(sqlGenerator, redshiftDestinationHandler, parsedCatalog, migrator, v2TableMigrator);
new DefaultTyperDeduper<>(sqlGenerator, redshiftDestinationHandler, parsedCatalog, migrator, v2TableMigrator, List.of());
}
return StagingConsumerFactory.builder(
outputRecordCollector,
Expand All @@ -252,7 +261,7 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN
/**
* Retrieves the user-configured file buffer amount so long as it doesn't exceed the maximum number of
* file buffers, and sets the minimum number to the default
*
* <p>
* NOTE: If Out Of Memory Exceptions (OOME) occur, this can be a likely cause as this hard limit has
* not been thoroughly load tested across all instance sizes
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@

package io.airbyte.integrations.destination.redshift.typing_deduping;

import static io.airbyte.cdk.integrations.base.JavaBaseConstants.*;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler;
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType;
Expand All @@ -20,12 +19,14 @@
import java.util.List;
import java.util.UUID;
import lombok.extern.slf4j.Slf4j;
import org.jooq.SQLDialect;

@Slf4j
public class RedshiftDestinationHandler extends JdbcDestinationHandler {
public class RedshiftDestinationHandler extends JdbcDestinationHandler<RedshiftState> {

public RedshiftDestinationHandler(final String databaseName, final JdbcDatabase jdbcDatabase) {
super(databaseName, jdbcDatabase);
public RedshiftDestinationHandler(final String databaseName, final JdbcDatabase jdbcDatabase, String rawNamespace) {
// :shrug: apparently this works better than using POSTGRES
super(databaseName, jdbcDatabase, rawNamespace, SQLDialect.DEFAULT);
}

@Override
Expand Down Expand Up @@ -69,6 +70,12 @@ protected String toJdbcTypeName(AirbyteType airbyteType) {
};
}

/**
 * Builds a {@link RedshiftState} from its JSON form.
 *
 * <p>
 * The soft-reset flag is read from the {@code needsSoftReset} field; when that field is absent or
 * JSON null the flag is {@code false}.
 *
 * @param json the JSON to read the flag from
 * @return the state carrying the parsed flag
 */
@Override
protected RedshiftState toDestinationState(JsonNode json) {
  final String softResetKey = "needsSoftReset";
  if (!json.hasNonNull(softResetKey)) {
    // Field missing or explicitly null: no soft reset requested.
    return new RedshiftState(false);
  }
  return new RedshiftState(json.get(softResetKey).asBoolean());
}

private String toJdbcTypeName(final AirbyteProtocolType airbyteProtocolType) {
return switch (airbyteProtocolType) {
case STRING -> "varchar";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.redshift.typing_deduping

import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState

/**
 * Redshift implementation of [MinimumDestinationState], carrying a single soft-reset flag.
 *
 * @property needsSoftReset the soft-reset flag held by this state
 */
data class RedshiftState(val needsSoftReset: Boolean) : MinimumDestinationState {
    /** Returns the soft-reset flag held by this state. */
    override fun needsSoftReset(): Boolean {
        return needsSoftReset
    }

    /**
     * Returns a copy of this state with the soft-reset flag replaced by [needsSoftReset].
     *
     * The returned object is always a [RedshiftState]. The cast to `T` is unchecked (type
     * parameters are erased), so it only holds when the caller's `T` is [RedshiftState] or one
     * of its supertypes.
     */
    @Suppress("UNCHECKED_CAST")
    override fun <T : MinimumDestinationState> withSoftReset(needsSoftReset: Boolean): T {
        return copy(needsSoftReset = needsSoftReset) as T
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import io.airbyte.cdk.integrations.standardtest.destination.typing_deduping.JdbcSqlGeneratorIntegrationTest;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler;
import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialState;
import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus;
import io.airbyte.integrations.base.destination.typing_deduping.Sql;
import io.airbyte.integrations.destination.redshift.RedshiftInsertDestination;
import io.airbyte.integrations.destination.redshift.RedshiftSQLNameTransformer;
Expand All @@ -46,7 +46,7 @@
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

public class RedshiftSqlGeneratorIntegrationTest extends JdbcSqlGeneratorIntegrationTest {
public class RedshiftSqlGeneratorIntegrationTest extends JdbcSqlGeneratorIntegrationTest<RedshiftState> {

/**
* Redshift's JDBC driver doesn't map certain data types onto {@link java.sql.JDBCType} usefully.
Expand Down Expand Up @@ -151,8 +151,8 @@ protected DSLContext getDslContext() {
}

@Override
protected DestinationHandler getDestinationHandler() {
return new RedshiftDestinationHandler(databaseName, database);
protected DestinationHandler<RedshiftState> getDestinationHandler() {
return new RedshiftDestinationHandler(databaseName, database, namespace);
}

@Override
Expand Down Expand Up @@ -180,11 +180,11 @@ protected Field<?> toJsonValue(final String valueAsString) {
public void testCreateTableIncremental() throws Exception {
final Sql sql = generator.createTable(incrementalDedupStream, "", false);
destinationHandler.execute(sql);
List<DestinationInitialState> initialStates = destinationHandler.gatherInitialState(List.of(incrementalDedupStream));
assertEquals(1, initialStates.size());
final DestinationInitialState initialState = initialStates.getFirst();
assertTrue(initialState.isFinalTablePresent());
assertFalse(initialState.isSchemaMismatch());
List<DestinationInitialStatus<RedshiftState>> initialStatuses = destinationHandler.gatherInitialState(List.of(incrementalDedupStream));
assertEquals(1, initialStatuses.size());
final DestinationInitialStatus<RedshiftState> initialStatus = initialStatuses.getFirst();
assertTrue(initialStatus.isFinalTablePresent());
assertFalse(initialStatus.isSchemaMismatch());
// TODO assert on table clustering, etc.
}

Expand Down
1 change: 1 addition & 0 deletions docs/integrations/destinations/redshift.md
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ Each stream will be output into its own raw table in Redshift. Each table will c

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 2.1.9 | 2024-03-04 | [\#35316](https://github.com/airbytehq/airbyte/pull/35316) | Update to CDK 0.23.11; Adopt migration framework |
| 2.1.8 | 2024-02-09 | [\#35354](https://github.com/airbytehq/airbyte/pull/35354) | Update to CDK 0.23.0; Gather required initial state upfront, remove dependency on svv_table_info for table empty check |
| 2.1.7 | 2024-02-09 | [\#34562](https://github.com/airbytehq/airbyte/pull/34562) | Switch back to jooq-based sql execution for standard insert |
| 2.1.6 | 2024-02-08 | [\#34502](https://github.com/airbytehq/airbyte/pull/34502) | Update to CDK version 0.17.0 |
Expand Down

0 comments on commit b254a64

Please sign in to comment.