diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/test/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2WriterTest.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/test/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2WriterTest.kt index ac7f3a2d6878..751a150c91d0 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/test/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2WriterTest.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/test/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2WriterTest.kt @@ -27,175 +27,17 @@ import io.airbyte.integrations.destination.iceberg.v2.io.IcebergTableWriterFacto import io.airbyte.integrations.destination.iceberg.v2.io.IcebergUtil import io.mockk.every import io.mockk.mockk -import java.time.Duration -import org.apache.hadoop.conf.Configuration -import org.apache.iceberg.CatalogUtil import org.apache.iceberg.Schema import org.apache.iceberg.Table -import org.apache.iceberg.aws.AwsProperties import org.apache.iceberg.catalog.Catalog -import org.apache.iceberg.catalog.Namespace -import org.apache.iceberg.catalog.TableIdentifier import org.apache.iceberg.types.Types import org.junit.jupiter.api.Assertions.assertNotNull import org.junit.jupiter.api.Assertions.assertTrue import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows -import org.slf4j.LoggerFactory internal class IcebergV2WriterTest { - class IcebergCatalogFactory( - private val warehousePath: String, - private val region: String, - private val roleArn: String, - private val baseAccessKeyId: String, - private val baseSecretKey: String - ) { - private val logger = LoggerFactory.getLogger(IcebergCatalogFactory::class.java) - - fun createCatalog(catalogName: String): Catalog { - logger.info("Creating catalog with name: $catalogName, region: $region, role: $roleArn") - - // System.setProperty("aws.accessKeyId", baseAccessKeyId) - // System.setProperty("aws.secretAccessKey", baseSecretKey) - - val properties = buildCatalogProperties() - logProperties(properties) - - val hadoopConfig = Configuration() - // hadoopConfig.set("aws.region", region) - - return CatalogUtil.buildIcebergCatalog(catalogName, properties, hadoopConfig) - } - - private fun buildCatalogProperties(): Map { - return mapOf( - "catalog-impl" to "org.apache.iceberg.aws.glue.GlueCatalog", - "warehouse" to warehousePath, - "aws.region" to region, - "io-impl" to "org.apache.iceberg.aws.s3.S3FileIO", - - // Base credentials - "rest.access-key-id" to baseAccessKeyId, - "rest.secret-access-key" to baseSecretKey, - AwsProperties.CLIENT_FACTORY to "org.apache.iceberg.aws.AssumeRoleAwsClientFactory", - AwsProperties.CLIENT_ASSUME_ROLE_ARN to roleArn, - AwsProperties.CLIENT_ASSUME_ROLE_REGION to region, - AwsProperties.CLIENT_ASSUME_ROLE_TIMEOUT_SEC to - Duration.ofHours(1).toSeconds().toString(), - AwsProperties.CLIENT_ASSUME_ROLE_EXTERNAL_ID to "8eee93517501b43a" - ) - } - - private fun logProperties(properties: Map) { - logger.info("Catalog properties:") - properties.forEach { (key, value) -> - val sanitizedValue = - if (key.contains("secret", ignoreCase = true)) "****" else value - logger.info(" $key: $sanitizedValue") - } - } - } - - class CatalogTestHelper(private val catalog: Catalog, private val databaseName: String) { - private val logger = LoggerFactory.getLogger(CatalogTestHelper::class.java) - - fun validateCatalogConnection(): Boolean { - return try { - val namespace = Namespace.of(databaseName) - // List tables in the namespace - logger.info("Listing tables in namespace: $databaseName") - catalog.listTables(namespace) - logger.info("Successfully connected to catalog and listed tables") - true - } catch (e: Exception) { - logger.error("Failed to validate catalog connection", e) - false - } - } - - fun testTableOperations(tableName: String): Boolean { - return try { - val tableId = TableIdentifier.of(databaseName, tableName) - val schema = Schema() // Empty schema for test - - logger.info("Creating test table: $tableName in database: $databaseName") - catalog.createTable(tableId, schema) - - logger.info("Loading test table: $tableName") - val table = catalog.loadTable(tableId) - logger.info("Loaded test table: $table") - - logger.info("Dropping test table: $tableName") - catalog.dropTable(tableId) - - logger.info("Table operations completed successfully") - true - } catch (e: Exception) { - logger.error("Failed during table operations test", e) - false - } - } - } - - // Usage example - @Test - fun mainTest() { - val region = "us-east-2" - // System.setProperty("aws.region", region) - - try { - val catalog = - createAuthenticatedCatalog( - catalogName = "frifri-test", - warehousePath = "s3://airbyte-integration-test-destination-s3/frifri-test", - region = region, - roleArn = - "arn:aws:iam::317283927606:role/s3_acceptance_test_iam_assume_role_role", - baseAccessKeyId = "", - baseSecretKey = "" - ) - - val databaseName = "frifri_test_db" // Specify your database name - val helper = CatalogTestHelper(catalog, databaseName) - - // Test connection - assert(helper.validateCatalogConnection()) { "Failed to validate catalog connection" } - - // Test table operations - assert(helper.testTableOperations("test_table")) { - "Failed to perform table operations" - } - - println("All tests passed successfully!") - } catch (e: Exception) { - println("Test failed with exception: ${e.message}") - e.printStackTrace() - throw e - } - } - - fun createAuthenticatedCatalog( - catalogName: String, - warehousePath: String, - region: String, - roleArn: String, - baseAccessKeyId: String, - baseSecretKey: String - ): Catalog { - val factory = - IcebergCatalogFactory( - warehousePath = warehousePath, - region = region, - roleArn = roleArn, - baseAccessKeyId = baseAccessKeyId, - baseSecretKey = baseSecretKey - ) - - return factory.createCatalog(catalogName) - } - @Test fun testCreateStreamLoader() { val streamDescriptor = DestinationStream.Descriptor(namespace = "namespace", name = "name")