From 9b3d406f2b7d0db88f900d0e60b76f771c3a05e2 Mon Sep 17 00:00:00 2001 From: Francis Genet Date: Mon, 6 Jan 2025 16:10:41 -0800 Subject: [PATCH] Something that passes the tests? --- .../base/src/main/resources/application.yaml | 6 ++-- .../NonDockerizedDestination.kt | 6 ++-- .../destination-iceberg-v2/build.gradle | 1 + .../destination/iceberg/v2/io/IcebergUtil.kt | 36 ++++++++++++++----- .../iceberg/v2/IcebergV2DataDumper.kt | 2 +- .../iceberg/v2/IcebergV2TestUtil.kt | 30 ++++++++++++++-- .../iceberg/v2/IcebergV2WriteTest.kt | 19 +++++----- 7 files changed, 73 insertions(+), 27 deletions(-) diff --git a/airbyte-cdk/bulk/core/base/src/main/resources/application.yaml b/airbyte-cdk/bulk/core/base/src/main/resources/application.yaml index 7daaaba91f2f..0e26a0414b6c 100644 --- a/airbyte-cdk/bulk/core/base/src/main/resources/application.yaml +++ b/airbyte-cdk/bulk/core/base/src/main/resources/application.yaml @@ -11,6 +11,6 @@ airbyte: destination: record-batch-size-override: ${AIRBYTE_DESTINATION_RECORD_BATCH_SIZE_OVERRIDE:null} aws: - external-id: ${AWS_ASSUME_ROLE_EXTERNAL_ID:extid} - access-key-id: ${AWS_ACCESS_KEY_ID:keyid} - secret-access-key: ${AWS_SECRET_ACCESS_KEY:accesskey} \ No newline at end of file + external-id: ${AWS_ASSUME_ROLE_EXTERNAL_ID} + access-key-id: ${AWS_ACCESS_KEY_ID} + secret-access-key: ${AWS_SECRET_ACCESS_KEY} diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/destination_process/NonDockerizedDestination.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/destination_process/NonDockerizedDestination.kt index 7c2050855dfc..b229ebf827a1 100644 --- a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/destination_process/NonDockerizedDestination.kt +++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/destination_process/NonDockerizedDestination.kt @@ -45,8 +45,10 @@ class NonDockerizedDestination( private val file = File("/tmp/test_file") init { - envVars.forEach { (key, value) -> IntegrationTest.nonDockerMockEnvVars.set(key, value) } - logger.info { "Env vars: $envVars loaded" } + envVars.forEach { (key, value) -> + IntegrationTest.nonDockerMockEnvVars.set(key, value) + logger.info { "Env vars: $key loaded" } + } if (useFileTransfer) { IntegrationTest.nonDockerMockEnvVars.set("USE_FILE_TRANSFER", "true") diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/build.gradle b/airbyte-integrations/connectors/destination-iceberg-v2/build.gradle index 69cdecf20932..e98b6e5392a6 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/build.gradle +++ b/airbyte-integrations/connectors/destination-iceberg-v2/build.gradle @@ -60,6 +60,7 @@ dependencies { integrationTestImplementation("org.apache.iceberg:iceberg-data:${project.ext.apacheIcebergVersion}") integrationTestImplementation("com.squareup.okhttp3:okhttp:4.12.0") integrationTestImplementation("org.projectnessie.nessie:nessie-minio-testcontainer:0.101.1") + integrationTestImplementation("com.fasterxml.jackson.module:jackson-module-kotlin:2.15.2") } test { diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtil.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtil.kt index 689b84a6d0fd..d376b04031d7 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtil.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtil.kt @@ -4,6 +4,7 @@ package io.airbyte.integrations.destination.iceberg.v2.io +import com.fasterxml.jackson.annotation.JsonProperty import io.airbyte.cdk.load.command.Dedupe import io.airbyte.cdk.load.command.DestinationStream import io.airbyte.cdk.load.command.ImportType @@ -23,7 +24,6 @@ import io.airbyte.integrations.destination.iceberg.v2.IcebergV2Configuration import io.airbyte.integrations.destination.iceberg.v2.SECRET_ACCESS_KEY import io.airbyte.integrations.destination.iceberg.v2.TableIdGenerator import io.github.oshai.kotlinlogging.KotlinLogging -import io.micronaut.context.annotation.Value import jakarta.inject.Singleton import java.time.Duration import org.apache.hadoop.conf.Configuration @@ -52,17 +52,22 @@ import software.amazon.awssdk.services.glue.model.ConcurrentModificationExceptio private val logger = KotlinLogging.logger {} const val AIRBYTE_CDC_DELETE_COLUMN = "_ab_cdc_deleted_at" +const val EXTERNAL_ID = "AWS_ASSUME_ROLE_EXTERNAL_ID" +const val AWS_ACCESS_KEY_ID = "AWS_ACCESS_KEY_ID" +const val AWS_SECRET_ACCESS_KEY = "AWS_SECRET_ACCESS_KEY" + +data class AWSSystemCredentials( + @get:JsonProperty("AWS_ACCESS_KEY_ID") + val AWS_ACCESS_KEY_ID: String, + @get:JsonProperty("AWS_SECRET_ACCESS_KEY") + val AWS_SECRET_ACCESS_KEY: String, + @get:JsonProperty("AWS_ASSUME_ROLE_EXTERNAL_ID") + val AWS_ASSUME_ROLE_EXTERNAL_ID: String +) /** Collection of Iceberg related utilities. */ @Singleton -class IcebergUtil(private val tableIdGenerator: TableIdGenerator, - @Value("\${airbyte.aws.external-id}") val externalId: String, - @Value("\${airbyte.aws.access-key-id}") val accessKeyId: String, - @Value("\${airbyte.aws.secret-access-key}") val secretAccessKey: String) { - - init { - println(externalId) - } +class IcebergUtil(private val tableIdGenerator: TableIdGenerator, val awsSystemCredentials: AWSSystemCredentials? = null) { internal class InvalidFormatException(message: String) : Exception(message) @@ -283,6 +288,19 @@ class IcebergUtil(private val tableIdGenerator: TableIdGenerator, config: IcebergV2Configuration ): Map { val region = config.s3BucketConfiguration.s3BucketRegion.region + val (accessKeyId, secretAccessKey, externalId) = if (awsSystemCredentials != null) { + Triple( + awsSystemCredentials.AWS_ACCESS_KEY_ID, + awsSystemCredentials.AWS_SECRET_ACCESS_KEY, + awsSystemCredentials.AWS_ASSUME_ROLE_EXTERNAL_ID + ) + } else { + Triple( + System.getenv(AWS_ACCESS_KEY_ID), + System.getenv(AWS_SECRET_ACCESS_KEY), + System.getenv(EXTERNAL_ID) + ) + } return mapOf( AwsProperties.REST_ACCESS_KEY_ID to accessKeyId, diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2DataDumper.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2DataDumper.kt index 012533314939..3aac18532163 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2DataDumper.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2DataDumper.kt @@ -72,7 +72,7 @@ object IcebergV2DataDumper : DestinationDataDumper { stream: DestinationStream ): List { val config = IcebergV2TestUtil.getConfig(spec) - val catalog = IcebergV2TestUtil.getCatalog(config) + val catalog = IcebergV2TestUtil.getCatalog(config, IcebergV2TestUtil.getAWSSystemCredentials()) val table = catalog.loadTable( TableIdGeneratorFactory(config).create().toTableIdentifier(stream.descriptor) diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2TestUtil.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2TestUtil.kt index 55c6897fdb42..dbe45a51ca61 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2TestUtil.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2TestUtil.kt @@ -4,8 +4,12 @@ package io.airbyte.integrations.destination.iceberg.v2 +import com.fasterxml.jackson.core.type.TypeReference +import com.fasterxml.jackson.databind.ObjectMapper import io.airbyte.cdk.command.ConfigurationSpecification import io.airbyte.cdk.command.ValidatedJsonUtils +import com.fasterxml.jackson.module.kotlin.KotlinModule +import io.airbyte.integrations.destination.iceberg.v2.io.AWSSystemCredentials import io.airbyte.integrations.destination.iceberg.v2.io.IcebergUtil import java.nio.file.Files import java.nio.file.Path @@ -13,18 +17,38 @@ import java.nio.file.Path object IcebergV2TestUtil { val GLUE_CONFIG_PATH: Path = Path.of("secrets/glue.json") val GLUE_ASSUME_ROLE_CONFIG_PATH: Path = Path.of("secrets/glue_assume_role.json") + private val GLUE_AWS_ASSUME_ROLE_CONFIG_PATH: Path = Path.of("secrets/glue_aws_assume_role.json") fun parseConfig(path: Path) = getConfig( ValidatedJsonUtils.parseOne(IcebergV2Specification::class.java, Files.readString(path)) ) + fun getAWSSystemCredentials(): AWSSystemCredentials { + val mapper = ObjectMapper().registerModule( + KotlinModule.Builder().build() + ) + val configFile = GLUE_AWS_ASSUME_ROLE_CONFIG_PATH.toFile() + return mapper.readValue(configFile, AWSSystemCredentials::class.java) + } + + private val mapper = ObjectMapper().registerModule( + KotlinModule.Builder().build() + ) + + fun getAWSSystemCredentialsAsMap(): Map { + val credentials = getAWSSystemCredentials() + // Convert the data class to a Map using Jackson + return mapper.convertValue(credentials, object : TypeReference>() {}) + } + fun getConfig(spec: ConfigurationSpecification) = IcebergV2ConfigurationFactory().makeWithoutExceptionHandling(spec as IcebergV2Specification) - fun getCatalog(config: IcebergV2Configuration) = - IcebergUtil(SimpleTableIdGenerator()).let { icebergUtil -> + fun getCatalog(config: IcebergV2Configuration, awsSystemCredentials: AWSSystemCredentials) = + IcebergUtil(SimpleTableIdGenerator(), awsSystemCredentials).let { icebergUtil -> val props = icebergUtil.toCatalogProperties(config) icebergUtil.createCatalog(DEFAULT_CATALOG_NAME, props) - } + } + } diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2WriteTest.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2WriteTest.kt index 6d4354385ea2..79b8ae7f5cff 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2WriteTest.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2WriteTest.kt @@ -10,6 +10,9 @@ import io.airbyte.cdk.load.test.util.DestinationCleaner import io.airbyte.cdk.load.test.util.NoopDestinationCleaner import io.airbyte.cdk.load.write.BasicFunctionalityIntegrationTest import io.airbyte.cdk.load.write.StronglyTyped +import io.airbyte.integrations.destination.iceberg.v2.io.EXTERNAL_ID +import io.airbyte.integrations.destination.iceberg.v2.io.AWS_ACCESS_KEY_ID +import io.airbyte.integrations.destination.iceberg.v2.io.AWS_SECRET_ACCESS_KEY import java.nio.file.Files import java.util.Base64 import okhttp3.FormBody @@ -88,7 +91,8 @@ class IcebergGlueWriteTest : Files.readString(IcebergV2TestUtil.GLUE_CONFIG_PATH), IcebergDestinationCleaner( IcebergV2TestUtil.getCatalog( - IcebergV2TestUtil.parseConfig(IcebergV2TestUtil.GLUE_CONFIG_PATH) + IcebergV2TestUtil.parseConfig(IcebergV2TestUtil.GLUE_CONFIG_PATH), + IcebergV2TestUtil.getAWSSystemCredentials() ) ) ) { @@ -107,19 +111,16 @@ class IcebergGlueAssumeRoleWriteTest : Files.readString(IcebergV2TestUtil.GLUE_ASSUME_ROLE_CONFIG_PATH), IcebergDestinationCleaner( IcebergV2TestUtil.getCatalog( - IcebergV2TestUtil.parseConfig(IcebergV2TestUtil.GLUE_ASSUME_ROLE_CONFIG_PATH) + IcebergV2TestUtil.parseConfig(IcebergV2TestUtil.GLUE_ASSUME_ROLE_CONFIG_PATH), + IcebergV2TestUtil.getAWSSystemCredentials() ) ), + IcebergV2TestUtil.getAWSSystemCredentialsAsMap() ) { @Test - override fun testBasicWrite() { - super.testBasicWrite() - } - - @Test - @Disabled("dest iceberge-v2 doesn't support unknown types") + @Disabled("dest iceberg-v2 doesn't support unknown types") override fun testUnknownTypes() {} - } +} @Disabled( "This is currently disabled until we are able to make it run via airbyte-ci. It works as expected locally"