Skip to content

Commit

Permalink
Something that passes the tests?
Browse files Browse the repository at this point in the history
  • Loading branch information
frifriSF59 committed Jan 7, 2025
1 parent 5acf361 commit 9b3d406
Show file tree
Hide file tree
Showing 7 changed files with 73 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@ airbyte:
destination:
record-batch-size-override: ${AIRBYTE_DESTINATION_RECORD_BATCH_SIZE_OVERRIDE:null}
aws:
external-id: ${AWS_ASSUME_ROLE_EXTERNAL_ID:extid}
access-key-id: ${AWS_ACCESS_KEY_ID:keyid}
secret-access-key: ${AWS_SECRET_ACCESS_KEY:accesskey}
external-id: ${AWS_ASSUME_ROLE_EXTERNAL_ID}
access-key-id: ${AWS_ACCESS_KEY_ID}
secret-access-key: ${AWS_SECRET_ACCESS_KEY}
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,10 @@ class NonDockerizedDestination(
private val file = File("/tmp/test_file")

init {
envVars.forEach { (key, value) -> IntegrationTest.nonDockerMockEnvVars.set(key, value) }
logger.info { "Env vars: $envVars loaded" }
envVars.forEach { (key, value) ->
IntegrationTest.nonDockerMockEnvVars.set(key, value)
logger.info { "Env vars: $key loaded" }
}

if (useFileTransfer) {
IntegrationTest.nonDockerMockEnvVars.set("USE_FILE_TRANSFER", "true")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ dependencies {
integrationTestImplementation("org.apache.iceberg:iceberg-data:${project.ext.apacheIcebergVersion}")
integrationTestImplementation("com.squareup.okhttp3:okhttp:4.12.0")
integrationTestImplementation("org.projectnessie.nessie:nessie-minio-testcontainer:0.101.1")
integrationTestImplementation("com.fasterxml.jackson.module:jackson-module-kotlin:2.15.2")
}

test {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.integrations.destination.iceberg.v2.io

import com.fasterxml.jackson.annotation.JsonProperty
import io.airbyte.cdk.load.command.Dedupe
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.command.ImportType
Expand All @@ -23,7 +24,6 @@ import io.airbyte.integrations.destination.iceberg.v2.IcebergV2Configuration
import io.airbyte.integrations.destination.iceberg.v2.SECRET_ACCESS_KEY
import io.airbyte.integrations.destination.iceberg.v2.TableIdGenerator
import io.github.oshai.kotlinlogging.KotlinLogging
import io.micronaut.context.annotation.Value
import jakarta.inject.Singleton
import java.time.Duration
import org.apache.hadoop.conf.Configuration
Expand Down Expand Up @@ -52,17 +52,22 @@ import software.amazon.awssdk.services.glue.model.ConcurrentModificationExceptio
private val logger = KotlinLogging.logger {}

const val AIRBYTE_CDC_DELETE_COLUMN = "_ab_cdc_deleted_at"
const val EXTERNAL_ID = "AWS_ASSUME_ROLE_EXTERNAL_ID"
const val AWS_ACCESS_KEY_ID = "AWS_ACCESS_KEY_ID"
const val AWS_SECRET_ACCESS_KEY = "AWS_SECRET_ACCESS_KEY"

data class AWSSystemCredentials(
@get:JsonProperty("AWS_ACCESS_KEY_ID")
val AWS_ACCESS_KEY_ID: String,
@get:JsonProperty("AWS_SECRET_ACCESS_KEY")
val AWS_SECRET_ACCESS_KEY: String,
@get:JsonProperty("AWS_ASSUME_ROLE_EXTERNAL_ID")
val AWS_ASSUME_ROLE_EXTERNAL_ID: String
)

/** Collection of Iceberg related utilities. */
@Singleton
class IcebergUtil(private val tableIdGenerator: TableIdGenerator,
@Value("\${airbyte.aws.external-id}") val externalId: String,
@Value("\${airbyte.aws.access-key-id}") val accessKeyId: String,
@Value("\${airbyte.aws.secret-access-key}") val secretAccessKey: String) {

init {
println(externalId)
}
class IcebergUtil(private val tableIdGenerator: TableIdGenerator, val awsSystemCredentials: AWSSystemCredentials? = null) {

internal class InvalidFormatException(message: String) : Exception(message)

Expand Down Expand Up @@ -283,6 +288,19 @@ class IcebergUtil(private val tableIdGenerator: TableIdGenerator,
config: IcebergV2Configuration
): Map<String, String> {
val region = config.s3BucketConfiguration.s3BucketRegion.region
val (accessKeyId, secretAccessKey, externalId) = if (awsSystemCredentials != null) {
Triple(
awsSystemCredentials.AWS_ACCESS_KEY_ID,
awsSystemCredentials.AWS_SECRET_ACCESS_KEY,
awsSystemCredentials.AWS_ASSUME_ROLE_EXTERNAL_ID
)
} else {
Triple(
System.getenv(AWS_ACCESS_KEY_ID),
System.getenv(AWS_SECRET_ACCESS_KEY),
System.getenv(EXTERNAL_ID)
)
}

return mapOf(
AwsProperties.REST_ACCESS_KEY_ID to accessKeyId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ object IcebergV2DataDumper : DestinationDataDumper {
stream: DestinationStream
): List<OutputRecord> {
val config = IcebergV2TestUtil.getConfig(spec)
val catalog = IcebergV2TestUtil.getCatalog(config)
val catalog = IcebergV2TestUtil.getCatalog(config, IcebergV2TestUtil.getAWSSystemCredentials())
val table =
catalog.loadTable(
TableIdGeneratorFactory(config).create().toTableIdentifier(stream.descriptor)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,51 @@

package io.airbyte.integrations.destination.iceberg.v2

import com.fasterxml.jackson.core.type.TypeReference
import com.fasterxml.jackson.databind.ObjectMapper
import io.airbyte.cdk.command.ConfigurationSpecification
import io.airbyte.cdk.command.ValidatedJsonUtils
import com.fasterxml.jackson.module.kotlin.KotlinModule
import io.airbyte.integrations.destination.iceberg.v2.io.AWSSystemCredentials
import io.airbyte.integrations.destination.iceberg.v2.io.IcebergUtil
import java.nio.file.Files
import java.nio.file.Path

object IcebergV2TestUtil {
val GLUE_CONFIG_PATH: Path = Path.of("secrets/glue.json")
val GLUE_ASSUME_ROLE_CONFIG_PATH: Path = Path.of("secrets/glue_assume_role.json")
private val GLUE_AWS_ASSUME_ROLE_CONFIG_PATH: Path = Path.of("secrets/glue_aws_assume_role.json")

fun parseConfig(path: Path) =
getConfig(
ValidatedJsonUtils.parseOne(IcebergV2Specification::class.java, Files.readString(path))
)

fun getAWSSystemCredentials(): AWSSystemCredentials {
val mapper = ObjectMapper().registerModule(
KotlinModule.Builder().build()
)
val configFile = GLUE_AWS_ASSUME_ROLE_CONFIG_PATH.toFile()
return mapper.readValue(configFile, AWSSystemCredentials::class.java)
}

private val mapper = ObjectMapper().registerModule(
KotlinModule.Builder().build()
)

fun getAWSSystemCredentialsAsMap(): Map<String, String> {
val credentials = getAWSSystemCredentials()
// Convert the data class to a Map using Jackson
return mapper.convertValue(credentials, object : TypeReference<Map<String, String>>() {})
}

fun getConfig(spec: ConfigurationSpecification) =
IcebergV2ConfigurationFactory().makeWithoutExceptionHandling(spec as IcebergV2Specification)

fun getCatalog(config: IcebergV2Configuration) =
IcebergUtil(SimpleTableIdGenerator()).let { icebergUtil ->
fun getCatalog(config: IcebergV2Configuration, awsSystemCredentials: AWSSystemCredentials) =
IcebergUtil(SimpleTableIdGenerator(), awsSystemCredentials).let { icebergUtil ->
val props = icebergUtil.toCatalogProperties(config)
icebergUtil.createCatalog(DEFAULT_CATALOG_NAME, props)
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ import io.airbyte.cdk.load.test.util.DestinationCleaner
import io.airbyte.cdk.load.test.util.NoopDestinationCleaner
import io.airbyte.cdk.load.write.BasicFunctionalityIntegrationTest
import io.airbyte.cdk.load.write.StronglyTyped
import io.airbyte.integrations.destination.iceberg.v2.io.EXTERNAL_ID
import io.airbyte.integrations.destination.iceberg.v2.io.AWS_ACCESS_KEY_ID
import io.airbyte.integrations.destination.iceberg.v2.io.AWS_SECRET_ACCESS_KEY
import java.nio.file.Files
import java.util.Base64
import okhttp3.FormBody
Expand Down Expand Up @@ -88,7 +91,8 @@ class IcebergGlueWriteTest :
Files.readString(IcebergV2TestUtil.GLUE_CONFIG_PATH),
IcebergDestinationCleaner(
IcebergV2TestUtil.getCatalog(
IcebergV2TestUtil.parseConfig(IcebergV2TestUtil.GLUE_CONFIG_PATH)
IcebergV2TestUtil.parseConfig(IcebergV2TestUtil.GLUE_CONFIG_PATH),
IcebergV2TestUtil.getAWSSystemCredentials()
)
)
) {
Expand All @@ -107,19 +111,16 @@ class IcebergGlueAssumeRoleWriteTest :
Files.readString(IcebergV2TestUtil.GLUE_ASSUME_ROLE_CONFIG_PATH),
IcebergDestinationCleaner(
IcebergV2TestUtil.getCatalog(
IcebergV2TestUtil.parseConfig(IcebergV2TestUtil.GLUE_ASSUME_ROLE_CONFIG_PATH)
IcebergV2TestUtil.parseConfig(IcebergV2TestUtil.GLUE_ASSUME_ROLE_CONFIG_PATH),
IcebergV2TestUtil.getAWSSystemCredentials()
)
),
IcebergV2TestUtil.getAWSSystemCredentialsAsMap()
) {
@Test
override fun testBasicWrite() {
super.testBasicWrite()
}

@Test
@Disabled("dest iceberge-v2 doesn't support unknown types")
@Disabled("dest iceberg-v2 doesn't support unknown types")
override fun testUnknownTypes() {}
}
}

@Disabled(
"This is currently disabled until we are able to make it run via airbyte-ci. It works as expected locally"
Expand Down

0 comments on commit 9b3d406

Please sign in to comment.