From e6acca7b651e8b41c8ff7d60b97b0644f7ebfc1d Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Mon, 3 Jun 2024 14:52:08 -0700 Subject: [PATCH] delete insert-specific stuff --- .../destination-redshift/build.gradle | 2 +- .../redshift/RedshiftDestination.kt | 89 --------- .../redshift/RedshiftInsertDestination.kt | 181 ------------------ .../redshift/RedshiftStagingS3Destination.kt | 96 ++++++++-- .../src/main/resources/spec.json | 11 -- .../redshift/RedshiftConnectionTest.java | 4 +- .../RedshiftDestinationAcceptanceTest.java | 2 +- ...dshiftInsertDestinationAcceptanceTest.java | 24 --- ...dshiftInsertDestinationAcceptanceTest.java | 33 ---- ...RedshiftDestinationBaseAcceptanceTest.java | 2 +- .../AbstractRedshiftTypingDedupingTest.java | 6 +- .../RedshiftSqlGeneratorIntegrationTest.java | 4 +- ...hemaOverrideDisableTypingDedupingTest.java | 36 ---- ...hiftStandardInsertsTypingDedupingTest.java | 88 --------- .../redshift/RedshiftDestinationTest.java | 62 ------ .../redshift/RedshiftSpecTest.java | 2 +- 16 files changed, 96 insertions(+), 546 deletions(-) delete mode 100644 airbyte-integrations/connectors/destination-redshift/src/main/kotlin/io/airbyte/integrations/destination/redshift/RedshiftDestination.kt delete mode 100644 airbyte-integrations/connectors/destination-redshift/src/main/kotlin/io/airbyte/integrations/destination/redshift/RedshiftInsertDestination.kt delete mode 100644 airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftInsertDestinationAcceptanceTest.java delete mode 100644 airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/SshKeyRedshiftInsertDestinationAcceptanceTest.java delete mode 100644 airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftStandardInsertsRawSchemaOverrideDisableTypingDedupingTest.java delete mode 100644 airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftStandardInsertsTypingDedupingTest.java delete mode 100644 airbyte-integrations/connectors/destination-redshift/src/test/java/io/airbyte/integrations/destination/redshift/RedshiftDestinationTest.java diff --git a/airbyte-integrations/connectors/destination-redshift/build.gradle b/airbyte-integrations/connectors/destination-redshift/build.gradle index 251d71c4b9f13..2cad829cae467 100644 --- a/airbyte-integrations/connectors/destination-redshift/build.gradle +++ b/airbyte-integrations/connectors/destination-redshift/build.gradle @@ -24,7 +24,7 @@ compileKotlin { airbyteJavaConnector.addCdkDependencies() application { - mainClass = 'io.airbyte.integrations.destination.redshift.RedshiftDestination' + mainClass = 'io.airbyte.integrations.destination.redshift.RedshiftStagingS3Destination' applicationDefaultJvmArgs = ['-XX:+ExitOnOutOfMemoryError', '-XX:MaxRAMPercentage=75.0', '-XX:NativeMemoryTracking=detail', '-XX:+UnlockDiagnosticVMOptions', '-XX:GCLockerRetryAllocationCount=100',] diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/kotlin/io/airbyte/integrations/destination/redshift/RedshiftDestination.kt b/airbyte-integrations/connectors/destination-redshift/src/main/kotlin/io/airbyte/integrations/destination/redshift/RedshiftDestination.kt deleted file mode 100644 index dcef6414ea748..0000000000000 --- a/airbyte-integrations/connectors/destination-redshift/src/main/kotlin/io/airbyte/integrations/destination/redshift/RedshiftDestination.kt +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ -package io.airbyte.integrations.destination.redshift - -import com.fasterxml.jackson.databind.JsonNode -import com.fasterxml.jackson.databind.node.ObjectNode -import io.airbyte.cdk.integrations.base.Destination -import io.airbyte.cdk.integrations.base.IntegrationRunner -import io.airbyte.cdk.integrations.destination.jdbc.copy.SwitchingDestination -import io.airbyte.commons.json.Jsons.deserialize -import io.airbyte.commons.resources.MoreResources.readResource -import io.airbyte.integrations.destination.redshift.util.RedshiftUtil -import io.airbyte.protocol.models.v0.ConnectorSpecification -import java.util.function.Function -import org.slf4j.Logger -import org.slf4j.LoggerFactory - -/** - * The Redshift Destination offers two replication strategies. The first inserts via a typical SQL - * Insert statement. Although less efficient, this requires less user set up. See - * [RedshiftInsertDestination] for more detail. The second inserts via streaming the data to an S3 - * bucket, and Cop-ing the date into Redshift. This is more efficient, and recommended for - * production workloads, but does require users to set up an S3 bucket and pass in additional - * credentials. See [RedshiftStagingS3Destination] for more detail. This class inspect the given - * arguments to determine which strategy to use. - */ -class RedshiftDestination : - SwitchingDestination( - DestinationType::class.java, - Function { config: JsonNode -> getTypeFromConfig(config) }, - destinationMap - ) { - enum class DestinationType { - STANDARD, - COPY_S3 - } - - @Throws(Exception::class) - override fun spec(): ConnectorSpecification { - // inject the standard ssh configuration into the spec. - val originalSpec = super.spec() - val propNode = originalSpec.connectionSpecification["properties"] as ObjectNode - propNode.set("tunnel_method", deserialize(readResource("ssh-tunnel-spec.json"))) - return originalSpec - } - - override val isV2Destination: Boolean - get() = true - - companion object { - private val LOGGER: Logger = LoggerFactory.getLogger(RedshiftDestination::class.java) - - private val destinationMap: Map = - java.util.Map.of( - DestinationType.STANDARD, - RedshiftInsertDestination.Companion.sshWrappedDestination(), - DestinationType.COPY_S3, - RedshiftStagingS3Destination.Companion.sshWrappedDestination() - ) - - private fun getTypeFromConfig(config: JsonNode): DestinationType { - return determineUploadMode(config) - } - - @JvmStatic - fun determineUploadMode(config: JsonNode): DestinationType { - val jsonNode = RedshiftUtil.findS3Options(config) - - if (RedshiftUtil.anyOfS3FieldsAreNullOrEmpty(jsonNode)) { - LOGGER.warn( - "The \"standard\" upload mode is not performant, and is not recommended for production. " + - "Please use the Amazon S3 upload mode if you are syncing a large amount of data." - ) - return DestinationType.STANDARD - } - return DestinationType.COPY_S3 - } - - @Throws(Exception::class) - @JvmStatic - fun main(args: Array) { - val destination: Destination = RedshiftDestination() - LOGGER.info("starting destination: {}", RedshiftDestination::class.java) - IntegrationRunner(destination).run(args) - LOGGER.info("completed destination: {}", RedshiftDestination::class.java) - } - } -} diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/kotlin/io/airbyte/integrations/destination/redshift/RedshiftInsertDestination.kt b/airbyte-integrations/connectors/destination-redshift/src/main/kotlin/io/airbyte/integrations/destination/redshift/RedshiftInsertDestination.kt deleted file mode 100644 index 2510e545fbab8..0000000000000 --- a/airbyte-integrations/connectors/destination-redshift/src/main/kotlin/io/airbyte/integrations/destination/redshift/RedshiftInsertDestination.kt +++ /dev/null @@ -1,181 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ -package io.airbyte.integrations.destination.redshift - -import com.fasterxml.jackson.databind.JsonNode -import com.google.common.collect.ImmutableMap -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings -import io.airbyte.cdk.db.factory.DataSourceFactory.create -import io.airbyte.cdk.db.factory.DatabaseDriver -import io.airbyte.cdk.db.jdbc.DefaultJdbcDatabase -import io.airbyte.cdk.db.jdbc.JdbcDatabase -import io.airbyte.cdk.db.jdbc.JdbcSourceOperations -import io.airbyte.cdk.db.jdbc.JdbcUtils -import io.airbyte.cdk.integrations.base.Destination -import io.airbyte.cdk.integrations.base.ssh.SshWrappedDestination -import io.airbyte.cdk.integrations.destination.async.deser.StreamAwareDataTransformer -import io.airbyte.cdk.integrations.destination.jdbc.AbstractJdbcDestination -import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler -import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator -import io.airbyte.commons.json.Jsons.jsonNode -import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler -import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog -import io.airbyte.integrations.base.destination.typing_deduping.SqlGenerator -import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration -import io.airbyte.integrations.destination.redshift.operations.RedshiftSqlOperations -import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftDestinationHandler -import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftRawTableAirbyteMetaMigration -import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftSqlGenerator -import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftState -import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftSuperLimitationTransformer -import io.airbyte.integrations.destination.redshift.util.RedshiftUtil -import java.time.Duration -import java.util.* -import javax.sql.DataSource - -class RedshiftInsertDestination : - AbstractJdbcDestination( - DRIVER_CLASS, - REDSHIFT_OPTIMAL_BATCH_SIZE_FOR_FLUSH, - RedshiftSQLNameTransformer(), - RedshiftSqlOperations() - ) { - override fun toJdbcConfig(redshiftConfig: JsonNode): JsonNode { - return getJdbcConfig(redshiftConfig) - } - - override fun getDataSource(config: JsonNode): DataSource { - val jdbcConfig = getJdbcConfig(config) - return create( - jdbcConfig[JdbcUtils.USERNAME_KEY].asText(), - if (jdbcConfig.has(JdbcUtils.PASSWORD_KEY)) jdbcConfig[JdbcUtils.PASSWORD_KEY].asText() - else null, - DRIVER_CLASS, - jdbcConfig[JdbcUtils.JDBC_URL_KEY].asText(), - getDefaultConnectionProperties(config), - Duration.ofMinutes(2) - ) - } - - @SuppressFBWarnings("NP_PARAMETER_MUST_BE_NONNULL_BUT_MARKED_AS_NULLABLE") - @Throws(Exception::class) - override fun destinationSpecificTableOperations(database: JdbcDatabase?) { - RedshiftUtil.checkSvvTableAccess(database!!) - } - - override fun getDatabase(dataSource: DataSource): JdbcDatabase { - return DefaultJdbcDatabase(dataSource) - } - - fun getDatabase(dataSource: DataSource, sourceOperations: JdbcSourceOperations?): JdbcDatabase { - return DefaultJdbcDatabase(dataSource, sourceOperations) - } - - override fun getDefaultConnectionProperties(config: JsonNode): Map { - // The following properties can be overriden through jdbcUrlParameters in the config. - val connectionOptions: MutableMap = HashMap() - // Redshift properties - // https://docs.aws.amazon.com/redshift/latest/mgmt/jdbc20-configuration-options.html#jdbc20-connecttimeout-option - // connectTimeout is different from Hikari pool's connectionTimout, driver defaults to - // 10seconds so - // increase it to match hikari's default - connectionOptions["connectTimeout"] = "120" - // See RedshiftProperty.LOG_SERVER_ERROR_DETAIL, defaults to true - connectionOptions["logservererrordetail"] = "false" - // HikariPool properties - // https://github.com/brettwooldridge/HikariCP?tab=readme-ov-file#frequently-used - // TODO: Change data source factory to configure these properties - connectionOptions.putAll(SSL_JDBC_PARAMETERS) - return connectionOptions - } - - override fun getSqlGenerator(config: JsonNode): JdbcSqlGenerator { - return RedshiftSqlGenerator(super.namingResolver, config) - } - - override fun getDestinationHandler( - databaseName: String, - database: JdbcDatabase, - rawTableSchema: String - ): JdbcDestinationHandler { - return RedshiftDestinationHandler(databaseName, database, rawTableSchema) - } - - override fun getMigrations( - database: JdbcDatabase, - databaseName: String, - sqlGenerator: SqlGenerator, - destinationHandler: DestinationHandler - ): List> { - return java.util.List.of>( - RedshiftRawTableAirbyteMetaMigration(database, databaseName) - ) - } - - @SuppressFBWarnings("NP_PARAMETER_MUST_BE_NONNULL_BUT_MARKED_AS_NULLABLE") - override fun getDataTransformer( - parsedCatalog: ParsedCatalog?, - defaultNamespace: String? - ): StreamAwareDataTransformer { - return RedshiftSuperLimitationTransformer(parsedCatalog, defaultNamespace!!) - } - - companion object { - val DRIVER_CLASS: String = DatabaseDriver.REDSHIFT.driverClassName - @JvmField - val SSL_JDBC_PARAMETERS: Map = - ImmutableMap.of( - JdbcUtils.SSL_KEY, - "true", - "sslfactory", - "com.amazon.redshift.ssl.NonValidatingFactory" - ) - - // insert into stmt has ~200 bytes - // Per record overhead of ~150 bytes for strings in statement like JSON_PARSE.. uuid etc - // If the flush size allows the max batch of 10k records, then net overhead is ~1.5MB. - // Lets round it to 2MB for wiggle room and keep a max buffer of 14MB per flush. - // This will allow not sending record set larger than 14M limiting the batch insert - // statement. - private const val REDSHIFT_OPTIMAL_BATCH_SIZE_FOR_FLUSH = 14 * 1024 * 1024L - - fun sshWrappedDestination(): Destination { - return SshWrappedDestination( - RedshiftInsertDestination(), - JdbcUtils.HOST_LIST_KEY, - JdbcUtils.PORT_LIST_KEY - ) - } - - fun getJdbcConfig(redshiftConfig: JsonNode): JsonNode { - val schema = - Optional.ofNullable(redshiftConfig[JdbcUtils.SCHEMA_KEY]) - .map { obj: JsonNode -> obj.asText() } - .orElse("public") - val configBuilder = - ImmutableMap.builder() - .put(JdbcUtils.USERNAME_KEY, redshiftConfig[JdbcUtils.USERNAME_KEY].asText()) - .put(JdbcUtils.PASSWORD_KEY, redshiftConfig[JdbcUtils.PASSWORD_KEY].asText()) - .put( - JdbcUtils.JDBC_URL_KEY, - String.format( - "jdbc:redshift://%s:%s/%s", - redshiftConfig[JdbcUtils.HOST_KEY].asText(), - redshiftConfig[JdbcUtils.PORT_KEY].asText(), - redshiftConfig[JdbcUtils.DATABASE_KEY].asText() - ) - ) - .put(JdbcUtils.SCHEMA_KEY, schema) - - if (redshiftConfig.has(JdbcUtils.JDBC_URL_PARAMS_KEY)) { - configBuilder.put( - JdbcUtils.JDBC_URL_PARAMS_KEY, - redshiftConfig[JdbcUtils.JDBC_URL_PARAMS_KEY] - ) - } - - return jsonNode(configBuilder.build()) - } - } -} diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/kotlin/io/airbyte/integrations/destination/redshift/RedshiftStagingS3Destination.kt b/airbyte-integrations/connectors/destination-redshift/src/main/kotlin/io/airbyte/integrations/destination/redshift/RedshiftStagingS3Destination.kt index c59c8969c57e3..633eb767a4815 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/main/kotlin/io/airbyte/integrations/destination/redshift/RedshiftStagingS3Destination.kt +++ b/airbyte-integrations/connectors/destination-redshift/src/main/kotlin/io/airbyte/integrations/destination/redshift/RedshiftStagingS3Destination.kt @@ -4,15 +4,20 @@ package io.airbyte.integrations.destination.redshift import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.node.ObjectNode +import com.google.common.collect.ImmutableMap import edu.umd.cs.findbugs.annotations.SuppressFBWarnings import io.airbyte.cdk.db.factory.DataSourceFactory.close import io.airbyte.cdk.db.factory.DataSourceFactory.create +import io.airbyte.cdk.db.factory.DatabaseDriver import io.airbyte.cdk.db.jdbc.DefaultJdbcDatabase import io.airbyte.cdk.db.jdbc.JdbcDatabase +import io.airbyte.cdk.db.jdbc.JdbcSourceOperations import io.airbyte.cdk.db.jdbc.JdbcUtils import io.airbyte.cdk.integrations.base.AirbyteMessageConsumer import io.airbyte.cdk.integrations.base.AirbyteTraceMessageUtility.emitConfigErrorTrace import io.airbyte.cdk.integrations.base.Destination +import io.airbyte.cdk.integrations.base.IntegrationRunner import io.airbyte.cdk.integrations.base.JavaBaseConstants import io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer import io.airbyte.cdk.integrations.base.TypingAndDedupingFlag.getRawNamespaceOverride @@ -33,7 +38,10 @@ import io.airbyte.cdk.integrations.destination.s3.S3DestinationConfig import io.airbyte.cdk.integrations.destination.s3.S3StorageOperations import io.airbyte.cdk.integrations.destination.staging.StagingConsumerFactory.Companion.builder import io.airbyte.commons.exceptions.ConnectionErrorException +import io.airbyte.commons.json.Jsons.deserialize import io.airbyte.commons.json.Jsons.emptyObject +import io.airbyte.commons.json.Jsons.jsonNode +import io.airbyte.commons.resources.MoreResources.readResource import io.airbyte.integrations.base.destination.typing_deduping.CatalogParser import io.airbyte.integrations.base.destination.typing_deduping.DefaultTyperDeduper import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler @@ -55,7 +63,9 @@ import io.airbyte.integrations.destination.redshift.util.RedshiftUtil import io.airbyte.protocol.models.v0.AirbyteConnectionStatus import io.airbyte.protocol.models.v0.AirbyteMessage import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog +import io.airbyte.protocol.models.v0.ConnectorSpecification import java.time.Duration +import java.util.Optional import java.util.function.Consumer import javax.sql.DataSource import org.apache.commons.lang3.NotImplementedException @@ -65,7 +75,7 @@ import org.slf4j.LoggerFactory class RedshiftStagingS3Destination : AbstractJdbcDestination( - RedshiftInsertDestination.DRIVER_CLASS, + DRIVER_CLASS, RedshiftSQLNameTransformer(), RedshiftSqlOperations() ), @@ -79,6 +89,15 @@ class RedshiftStagingS3Destination : encryptionConfig.keyType == AesCbcEnvelopeEncryption.KeyType.EPHEMERAL } + @Throws(Exception::class) + override fun spec(): ConnectorSpecification { + // inject the standard ssh configuration into the spec. + val originalSpec = super.spec() + val propNode = originalSpec.connectionSpecification["properties"] as ObjectNode + propNode.set("tunnel_method", deserialize(readResource("ssh-tunnel-spec.json"))) + return originalSpec + } + override fun check(config: JsonNode): AirbyteConnectionStatus? { val s3Config: S3DestinationConfig = S3DestinationConfig.getS3DestinationConfig(RedshiftUtil.findS3Options(config)) @@ -148,26 +167,33 @@ class RedshiftStagingS3Destination : } } + override val isV2Destination: Boolean = true + override fun getDataSource(config: JsonNode): DataSource { - val jdbcConfig: JsonNode = RedshiftInsertDestination.Companion.getJdbcConfig(config) + val jdbcConfig: JsonNode = getJdbcConfig(config) return create( jdbcConfig[JdbcUtils.USERNAME_KEY].asText(), if (jdbcConfig.has(JdbcUtils.PASSWORD_KEY)) jdbcConfig[JdbcUtils.PASSWORD_KEY].asText() else null, - RedshiftInsertDestination.Companion.DRIVER_CLASS, + DRIVER_CLASS, jdbcConfig[JdbcUtils.JDBC_URL_KEY].asText(), getDefaultConnectionProperties(config), Duration.ofMinutes(2) ) } + override fun getDatabase(dataSource: DataSource): JdbcDatabase { + return DefaultJdbcDatabase(dataSource) + } + + fun getDatabase(dataSource: DataSource, sourceOperations: JdbcSourceOperations?): JdbcDatabase { + return DefaultJdbcDatabase(dataSource, sourceOperations) + } + override val namingResolver: NamingConventionTransformer get() = RedshiftSQLNameTransformer() override fun getDefaultConnectionProperties(config: JsonNode): Map { - // TODO: Pull common code from RedshiftInsertDestination and RedshiftStagingS3Destination - // into a - // base class. // The following properties can be overriden through jdbcUrlParameters in the config. val connectionOptions: MutableMap = HashMap() // Redshift properties @@ -183,7 +209,7 @@ class RedshiftStagingS3Destination : // sitting idle // in the pool. connectionOptions["keepaliveTime"] = Duration.ofSeconds(30).toMillis().toString() - connectionOptions.putAll(RedshiftInsertDestination.Companion.SSL_JDBC_PARAMETERS) + connectionOptions.putAll(SSL_JDBC_PARAMETERS) return connectionOptions } @@ -204,13 +230,13 @@ class RedshiftStagingS3Destination : return RedshiftDestinationHandler(databaseName, database, rawTableSchema) } - protected override fun getMigrations( + override fun getMigrations( database: JdbcDatabase, databaseName: String, sqlGenerator: SqlGenerator, destinationHandler: DestinationHandler ): List> { - return java.util.List.of>( + return listOf>( RedshiftRawTableAirbyteMetaMigration(database, databaseName) ) } @@ -240,7 +266,7 @@ class RedshiftStagingS3Destination : config: JsonNode, catalog: ConfiguredAirbyteCatalog, outputRecordCollector: Consumer - ): SerializedAirbyteMessageConsumer? { + ): SerializedAirbyteMessageConsumer { val encryptionConfig = if (config.has(RedshiftDestinationConstants.UPLOADING_METHOD)) fromJson( @@ -332,12 +358,60 @@ class RedshiftStagingS3Destination : private val LOGGER: Logger = LoggerFactory.getLogger(RedshiftStagingS3Destination::class.java) - fun sshWrappedDestination(): Destination { + val DRIVER_CLASS: String = DatabaseDriver.REDSHIFT.driverClassName + val SSL_JDBC_PARAMETERS: Map = + ImmutableMap.of( + JdbcUtils.SSL_KEY, + "true", + "sslfactory", + "com.amazon.redshift.ssl.NonValidatingFactory" + ) + + private fun sshWrappedDestination(): Destination { return SshWrappedDestination( RedshiftStagingS3Destination(), JdbcUtils.HOST_LIST_KEY, JdbcUtils.PORT_LIST_KEY ) } + + fun getJdbcConfig(redshiftConfig: JsonNode): JsonNode { + val schema = + Optional.ofNullable(redshiftConfig[JdbcUtils.SCHEMA_KEY]) + .map { obj: JsonNode -> obj.asText() } + .orElse("public") + val configBuilder = + ImmutableMap.builder() + .put(JdbcUtils.USERNAME_KEY, redshiftConfig[JdbcUtils.USERNAME_KEY].asText()) + .put(JdbcUtils.PASSWORD_KEY, redshiftConfig[JdbcUtils.PASSWORD_KEY].asText()) + .put( + JdbcUtils.JDBC_URL_KEY, + String.format( + "jdbc:redshift://%s:%s/%s", + redshiftConfig[JdbcUtils.HOST_KEY].asText(), + redshiftConfig[JdbcUtils.PORT_KEY].asText(), + redshiftConfig[JdbcUtils.DATABASE_KEY].asText() + ) + ) + .put(JdbcUtils.SCHEMA_KEY, schema) + + if (redshiftConfig.has(JdbcUtils.JDBC_URL_PARAMS_KEY)) { + configBuilder.put( + JdbcUtils.JDBC_URL_PARAMS_KEY, + redshiftConfig[JdbcUtils.JDBC_URL_PARAMS_KEY] + ) + } + + return jsonNode(configBuilder.build()) + } + + @Throws(Exception::class) + @JvmStatic + fun main(args: Array) { + val destination: Destination = sshWrappedDestination() + LOGGER.info("starting destination: {}", RedshiftStagingS3Destination::class.java) + IntegrationRunner(destination).run(args) + LOGGER.info("completed destination: {}", RedshiftStagingS3Destination::class.java) + } } } diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/resources/spec.json b/airbyte-integrations/connectors/destination-redshift/src/main/resources/spec.json index 6372072375c4e..7f291ffc3ba50 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/destination-redshift/src/main/resources/spec.json @@ -225,17 +225,6 @@ "order": 7 } } - }, - { - "title": "Standard", - "required": ["method"], - "description": "(not recommended) Direct loading using SQL INSERT statements. This method is extremely inefficient and provided only for quick testing. In all other cases, you should use S3 uploading.", - "properties": { - "method": { - "type": "string", - "const": "Standard" - } - } } ] }, diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftConnectionTest.java b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftConnectionTest.java index dfefbf0c0f100..72b6bcf43af3a 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftConnectionTest.java +++ b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftConnectionTest.java @@ -17,8 +17,8 @@ public class RedshiftConnectionTest { - private final JsonNode config = Jsons.deserialize(IOs.readFile(Path.of("secrets/config.json"))); - private final RedshiftDestination destination = new RedshiftDestination(); + private final JsonNode config = Jsons.deserialize(IOs.readFile(Path.of("secrets/config_staging.json"))); + private final RedshiftStagingS3Destination destination = new RedshiftStagingS3Destination(); private AirbyteConnectionStatus status; @Test diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftDestinationAcceptanceTest.java index 4ea666f811dac..195bd54a52f0a 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftDestinationAcceptanceTest.java @@ -223,7 +223,7 @@ protected void tearDown(final TestDestinationEnv testEnv) throws Exception { protected Database createDatabase() { connection = ConnectionFactory.create(baseConfig.get(JdbcUtils.USERNAME_KEY).asText(), baseConfig.get(JdbcUtils.PASSWORD_KEY).asText(), - RedshiftInsertDestination.SSL_JDBC_PARAMETERS, + RedshiftStagingS3Destination.Companion.getSSL_JDBC_PARAMETERS(), String.format(DatabaseDriver.REDSHIFT.getUrlFormatString(), baseConfig.get(JdbcUtils.HOST_KEY).asText(), baseConfig.get(JdbcUtils.PORT_KEY).asInt(), diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftInsertDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftInsertDestinationAcceptanceTest.java deleted file mode 100644 index 59e9130af79fd..0000000000000 --- a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftInsertDestinationAcceptanceTest.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.destination.redshift; - -import com.fasterxml.jackson.databind.JsonNode; -import io.airbyte.commons.json.Jsons; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import org.junit.jupiter.api.Disabled; - -/** - * Integration test testing the {@link RedshiftInsertDestination}. - */ -@Disabled -public class RedshiftInsertDestinationAcceptanceTest extends RedshiftDestinationAcceptanceTest { - - public JsonNode getStaticConfig() throws IOException { - return Jsons.deserialize(Files.readString(Path.of("secrets/config.json"))); - } - -} diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/SshKeyRedshiftInsertDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/SshKeyRedshiftInsertDestinationAcceptanceTest.java deleted file mode 100644 index 4c2da0a04ce77..0000000000000 --- a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/SshKeyRedshiftInsertDestinationAcceptanceTest.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.destination.redshift; - -import com.fasterxml.jackson.databind.JsonNode; -import io.airbyte.cdk.integrations.base.ssh.SshTunnel.TunnelMethod; -import io.airbyte.commons.io.IOs; -import io.airbyte.commons.json.Jsons; -import java.io.IOException; -import java.nio.file.Path; -import org.junit.jupiter.api.Disabled; - -/* - * SshKeyRedshiftInsertDestinationAcceptanceTest runs basic Redshift Destination Tests using the SQL - * Insert mechanism for upload of data and "key" authentication for the SSH bastion configuration. - */ -@Disabled -public class SshKeyRedshiftInsertDestinationAcceptanceTest extends SshRedshiftDestinationBaseAcceptanceTest { - - @Override - public TunnelMethod getTunnelMethod() { - return TunnelMethod.SSH_KEY_AUTH; - } - - public JsonNode getStaticConfig() throws IOException { - final Path configPath = Path.of("secrets/config.json"); - final String configAsString = IOs.readFile(configPath); - return Jsons.deserialize(configAsString); - } - -} diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/SshRedshiftDestinationBaseAcceptanceTest.java b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/SshRedshiftDestinationBaseAcceptanceTest.java index 22ef33b5b0988..1bd18c2eae949 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/SshRedshiftDestinationBaseAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/SshRedshiftDestinationBaseAcceptanceTest.java @@ -146,7 +146,7 @@ protected TestDataComparator getTestDataComparator() { private Database createDatabaseFromConfig(final JsonNode config) { connection = ConnectionFactory.create(config.get(JdbcUtils.USERNAME_KEY).asText(), config.get(JdbcUtils.PASSWORD_KEY).asText(), - RedshiftInsertDestination.SSL_JDBC_PARAMETERS, + RedshiftStagingS3Destination.Companion.getSSL_JDBC_PARAMETERS(), String.format(DatabaseDriver.REDSHIFT.getUrlFormatString(), config.get(JdbcUtils.HOST_KEY).asText(), config.get(JdbcUtils.PORT_KEY).asInt(), diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/AbstractRedshiftTypingDedupingTest.java b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/AbstractRedshiftTypingDedupingTest.java index 729a1c90245ce..a4c11560a591a 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/AbstractRedshiftTypingDedupingTest.java +++ b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/AbstractRedshiftTypingDedupingTest.java @@ -4,7 +4,7 @@ package io.airbyte.integrations.destination.redshift.typing_deduping; -import static io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftSuperLimitationTransformer.*; +import static io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftSuperLimitationTransformer.REDSHIFT_VARCHAR_MAX_BYTE_SIZE; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; @@ -12,7 +12,7 @@ import io.airbyte.cdk.integrations.standardtest.destination.typing_deduping.JdbcTypingDedupingTest; import io.airbyte.commons.json.Jsons; import io.airbyte.integrations.base.destination.typing_deduping.SqlGenerator; -import io.airbyte.integrations.destination.redshift.RedshiftInsertDestination; +import io.airbyte.integrations.destination.redshift.RedshiftStagingS3Destination; import io.airbyte.integrations.destination.redshift.RedshiftSQLNameTransformer; import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftSqlGeneratorIntegrationTest.RedshiftSourceOperations; import io.airbyte.protocol.models.v0.AirbyteMessage; @@ -41,7 +41,7 @@ protected String getImageName() { @Override protected DataSource getDataSource(final JsonNode config) { - return new RedshiftInsertDestination().getDataSource(config); + return new RedshiftStagingS3Destination().getDataSource(config); } @Override diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftSqlGeneratorIntegrationTest.java b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftSqlGeneratorIntegrationTest.java index ddcf82773bd37..aeba929cf24ad 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftSqlGeneratorIntegrationTest.java +++ b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftSqlGeneratorIntegrationTest.java @@ -29,7 +29,7 @@ import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler; import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus; import io.airbyte.integrations.base.destination.typing_deduping.Sql; -import io.airbyte.integrations.destination.redshift.RedshiftInsertDestination; +import io.airbyte.integrations.destination.redshift.RedshiftStagingS3Destination; import io.airbyte.integrations.destination.redshift.RedshiftSQLNameTransformer; import java.nio.file.Files; import java.nio.file.Path; @@ -138,7 +138,7 @@ public static void setupJdbcDatasource() throws Exception { databaseName = config.get(JdbcUtils.DATABASE_KEY).asText(); // TODO: Its sad to instantiate unneeded dependency to construct database and datsources. pull it to // static methods. - final RedshiftInsertDestination insertDestination = new RedshiftInsertDestination(); + final RedshiftStagingS3Destination insertDestination = new RedshiftStagingS3Destination(); dataSource = insertDestination.getDataSource(config); database = insertDestination.getDatabase(dataSource, new RedshiftSourceOperations()); } diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftStandardInsertsRawSchemaOverrideDisableTypingDedupingTest.java b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftStandardInsertsRawSchemaOverrideDisableTypingDedupingTest.java deleted file mode 100644 index b7c78a4cec8e0..0000000000000 --- a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftStandardInsertsRawSchemaOverrideDisableTypingDedupingTest.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.destination.redshift.typing_deduping; - -import com.fasterxml.jackson.databind.node.ObjectNode; -import io.airbyte.commons.io.IOs; -import io.airbyte.commons.json.Jsons; -import java.nio.file.Path; -import org.junit.jupiter.api.Disabled; -import org.junit.jupiter.api.Test; - -public class RedshiftStandardInsertsRawSchemaOverrideDisableTypingDedupingTest extends AbstractRedshiftTypingDedupingTest { - - @Override - protected ObjectNode getBaseConfig() { - return (ObjectNode) Jsons.deserialize(IOs.readFile(Path.of("secrets/1s1t_config_raw_schema_override.json"))); - } - - @Override - protected String getRawSchema() { - return "overridden_raw_dataset"; - } - - @Override - protected boolean disableFinalTableComparison() { - return true; - } - - @Disabled - @Test - @Override - public void identicalNameSimultaneousSync() {} - -} diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftStandardInsertsTypingDedupingTest.java b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftStandardInsertsTypingDedupingTest.java deleted file mode 100644 index 7f99362777dde..0000000000000 --- a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftStandardInsertsTypingDedupingTest.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.destination.redshift.typing_deduping; - -import static org.junit.jupiter.api.Assertions.*; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ObjectNode; -import io.airbyte.commons.io.IOs; -import io.airbyte.commons.json.Jsons; -import io.airbyte.protocol.models.v0.AirbyteMessage; -import io.airbyte.protocol.models.v0.AirbyteStream; -import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; -import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream; -import io.airbyte.protocol.models.v0.DestinationSyncMode; -import io.airbyte.protocol.models.v0.SyncMode; -import java.nio.file.Path; -import java.util.ArrayList; -import java.util.List; -import org.junit.jupiter.api.Test; - -public class RedshiftStandardInsertsTypingDedupingTest extends AbstractRedshiftTypingDedupingTest { - - @Override - protected ObjectNode getBaseConfig() { - return (ObjectNode) Jsons.deserialize(IOs.readFile(Path.of("secrets/1s1t_config.json"))); - } - - @Test - public void testStandardInsertBatchSizeGtThan16Mb() throws Exception { - final String placeholderRecord = """ - {"type": "RECORD", - "record":{ - "emitted_at": 1000, - "data": { - "id1": 1, - "id2": 200, - "updated_at": "2000-01-01T00:00:00Z", - "_ab_cdc_deleted_at": null, - "name": "PLACE_HOLDER", - "address": {"city": "San Francisco", "state": "CA"}} - } - } - """; - final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(List.of( - new ConfiguredAirbyteStream() - .withSyncMode(SyncMode.FULL_REFRESH) - .withDestinationSyncMode(DestinationSyncMode.OVERWRITE) - .withStream(new AirbyteStream() - .withNamespace(getStreamNamespace()) - .withName(getStreamName()) - .withJsonSchema(getSchema())))); - List messages = new ArrayList<>(); - final int numberOfRecords = 1000; - for (int i = 0; i < numberOfRecords; ++i) { - // Stuff the record with 40Kb string, making the total record size to 41233 bytes - // Total sync generates ~39MB in 1000 records. - // Standard insert should not fail and chunk it into smaller inserts < 16MB statement length - final AirbyteMessage placeHolderMessage = Jsons.deserialize(placeholderRecord, AirbyteMessage.class); - placeHolderMessage.getRecord().setNamespace(getStreamNamespace()); - placeHolderMessage.getRecord().setStream(getStreamName()); - ((ObjectNode) placeHolderMessage.getRecord().getData()).put("id1", i); - ((ObjectNode) placeHolderMessage.getRecord().getData()).put("id2", 200 + i); - ((ObjectNode) placeHolderMessage.getRecord().getData()).put("name", generateRandomString(40 * 1024)); - messages.add(placeHolderMessage); - } - runSync(catalog, messages); - // we just need to iterate over final tables to verify the count and confirm they are inserted - // properly. - List finalTableResults = dumpFinalTableRecords(getStreamNamespace(), getStreamName()); - assertEquals(1000, finalTableResults.size()); - // getJsons query doesn't have order by clause, so using sum of n-numbers math to assert all IDs are - // inserted - int id1sum = 0; - int id2sum = 0; - int id1ExpectedSum = ((numberOfRecords - 1) * (numberOfRecords)) / 2; // n(n+1)/2 - int id2ExpectedSum = (200 * numberOfRecords) + id1ExpectedSum; // 200*n + id1Sum - for (JsonNode record : finalTableResults) { - id1sum += record.get("id1").asInt(); - id2sum += record.get("id2").asInt(); - } - assertEquals(id1ExpectedSum, id1sum); - assertEquals(id2ExpectedSum, id2sum); - } - -} diff --git a/airbyte-integrations/connectors/destination-redshift/src/test/java/io/airbyte/integrations/destination/redshift/RedshiftDestinationTest.java b/airbyte-integrations/connectors/destination-redshift/src/test/java/io/airbyte/integrations/destination/redshift/RedshiftDestinationTest.java deleted file mode 100644 index 8467d6a65fb6e..0000000000000 --- a/airbyte-integrations/connectors/destination-redshift/src/test/java/io/airbyte/integrations/destination/redshift/RedshiftDestinationTest.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.destination.redshift; - -import static org.junit.jupiter.api.Assertions.assertEquals; - -import com.fasterxml.jackson.databind.ObjectMapper; -import io.airbyte.commons.jackson.MoreMappers; -import io.airbyte.integrations.destination.redshift.RedshiftDestination.DestinationType; -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Test; - -@DisplayName("RedshiftDestination") -public class RedshiftDestinationTest { - - private static final ObjectMapper mapper = MoreMappers.initMapper(); - - @Test - @DisplayName("When not given S3 credentials should use INSERT") - public void useStandardInsert() { - final var standardInsertConfigStub = mapper.createObjectNode(); - standardInsertConfigStub.put("method", "Standard"); - final var uploadingMethodStub = mapper.createObjectNode(); - uploadingMethodStub.set("uploading_method", standardInsertConfigStub); - assertEquals(DestinationType.STANDARD, RedshiftDestination.determineUploadMode(uploadingMethodStub)); - } - - @Test - @DisplayName("When given standard backward compatibility test") - public void useStandardInsertBackwardCompatibility() { - final var standardInsertConfigStub = mapper.createObjectNode(); - assertEquals(DestinationType.STANDARD, RedshiftDestination.determineUploadMode(standardInsertConfigStub)); - } - - @Test - @DisplayName("When given S3 credentials should use COPY") - public void useS3Staging() { - final var s3StagingStub = mapper.createObjectNode(); - final var uploadingMethodStub = mapper.createObjectNode(); - s3StagingStub.put("s3_bucket_name", "fake-bucket"); - s3StagingStub.put("s3_bucket_region", "fake-region"); - s3StagingStub.put("access_key_id", "test"); - s3StagingStub.put("secret_access_key", "test key"); - s3StagingStub.put("method", "S3 Staging"); - uploadingMethodStub.set("uploading_method", s3StagingStub); - assertEquals(DestinationType.COPY_S3, RedshiftDestination.determineUploadMode(uploadingMethodStub)); - } - - @Test - @DisplayName("When given S3 backward compatibility test") - public void useS3StagingBackwardCompatibility() { - final var s3StagingStub = mapper.createObjectNode(); - s3StagingStub.put("s3_bucket_name", "fake-bucket"); - s3StagingStub.put("s3_bucket_region", "fake-region"); - s3StagingStub.put("access_key_id", "test"); - s3StagingStub.put("secret_access_key", "test key"); - assertEquals(DestinationType.COPY_S3, RedshiftDestination.determineUploadMode(s3StagingStub)); - } - -} diff --git a/airbyte-integrations/connectors/destination-redshift/src/test/java/io/airbyte/integrations/destination/redshift/RedshiftSpecTest.java b/airbyte-integrations/connectors/destination-redshift/src/test/java/io/airbyte/integrations/destination/redshift/RedshiftSpecTest.java index 308232c05cb6a..697fbba777d17 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/test/java/io/airbyte/integrations/destination/redshift/RedshiftSpecTest.java +++ b/airbyte-integrations/connectors/destination-redshift/src/test/java/io/airbyte/integrations/destination/redshift/RedshiftSpecTest.java @@ -97,7 +97,7 @@ void testWithJdbcAdditionalProperty() { @Test void testJdbcAdditionalProperty() throws Exception { - final ConnectorSpecification spec = new RedshiftDestination().spec(); + final ConnectorSpecification spec = new RedshiftStagingS3Destination().spec(); assertNotNull(spec.getConnectionSpecification().get("properties").get("jdbc_url_params")); }