diff --git a/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/command/iceberg/parquet/IcebergCatalogSpecifications.kt b/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/command/iceberg/parquet/IcebergCatalogSpecifications.kt index b257fc3564f0..4412d1918dbf 100644 --- a/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/command/iceberg/parquet/IcebergCatalogSpecifications.kt +++ b/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/command/iceberg/parquet/IcebergCatalogSpecifications.kt @@ -80,11 +80,13 @@ interface IcebergCatalogSpecifications { GlueCatalogConfiguration( (catalogType as GlueCatalogSpecification).glueId, (catalogType as GlueCatalogSpecification).toAWSArnRoleConfiguration(), + (catalogType as GlueCatalogSpecification).databaseName, ) is NessieCatalogSpecification -> NessieCatalogConfiguration( (catalogType as NessieCatalogSpecification).serverUri, - (catalogType as NessieCatalogSpecification).accessToken + (catalogType as NessieCatalogSpecification).accessToken, + (catalogType as NessieCatalogSpecification).namespace, ) } @@ -163,7 +165,23 @@ class NessieCatalogSpecification( "order":2 }""", ) - val accessToken: String? + val accessToken: String?, + + /** + * The namespace to be used when building the Table identifier + * + * This namespace will only be used if the stream namespace is null, meaning when the + * `Destination Namespace` setting for the connection is set to `Destination-defined` or + * `Source-defined` + */ + @get:JsonSchemaTitle("Namespace") + @get:JsonPropertyDescription( + """The Nessie namespace to be used in the Table identifier. + |This will ONLY be used if the `Destination Namespace` setting for the connection is set to + | `Destination-defined` or `Source-defined`""" + ) + @get:JsonProperty("namespace") + val namespace: String? 
) : CatalogType(catalogType) /** @@ -193,6 +211,20 @@ class GlueCatalogSpecification( @JsonSchemaInject(json = """{"order":1}""") val glueId: String, override val roleArn: String? = null, + + /** + * The name of the database to be used when building the Table identifier + * + * This database name will only be used if the stream namespace is null, meaning when the + * `Destination Namespace` setting for the connection is set to `Destination-defined` or + * `Source-defined` + */ + @get:JsonSchemaTitle("Database Name") + @get:JsonPropertyDescription( + """The Glue database name. This will ONLY be used if the `Destination Namespace` setting for the connection is set to `Destination-defined` or `Source-defined`""" + ) + @get:JsonProperty("database_name") + val databaseName: String? ) : CatalogType(catalogType), AWSArnRoleSpecification /** @@ -239,6 +271,11 @@ data class GlueCatalogConfiguration( @JsonPropertyDescription("The AWS Account ID associated with the Glue service.") val glueId: String, override val awsArnRoleConfiguration: AWSArnRoleConfiguration, + @get:JsonSchemaTitle("Database Name") + @get:JsonPropertyDescription( + """The Glue database name. This will ONLY be used if the `Destination Namespace` setting for the connection is set to `Destination-defined` or `Source-defined`""" + ) + val databaseName: String? ) : CatalogConfiguration, AWSArnRoleConfigurationProvider /** @@ -254,7 +291,14 @@ data class NessieCatalogConfiguration( val serverUri: String, @JsonSchemaTitle("Nessie Access Token") @JsonPropertyDescription("An optional token for authentication with the Nessie server.") - val accessToken: String? + val accessToken: String?, + @get:JsonSchemaTitle("Namespace") + @get:JsonPropertyDescription( + """The Nessie namespace to be used in the Table identifier. + |This will ONLY be used if the `Destination Namespace` setting for the connection is set to + | `Destination-defined` or `Source-defined`""" + ) + val namespace: String? 
) : CatalogConfiguration /** diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/metadata.yaml b/airbyte-integrations/connectors/destination-s3-data-lake/metadata.yaml index 2c864876d03f..0f664afdb0ae 100644 --- a/airbyte-integrations/connectors/destination-s3-data-lake/metadata.yaml +++ b/airbyte-integrations/connectors/destination-s3-data-lake/metadata.yaml @@ -26,7 +26,7 @@ data: alias: airbyte-connector-testing-secret-store connectorType: destination definitionId: 716ca874-520b-4902-9f80-9fad66754b89 - dockerImageTag: 0.2.19 + dockerImageTag: 0.2.20 dockerRepository: airbyte/destination-s3-data-lake documentationUrl: https://docs.airbyte.com/integrations/destinations/s3-data-lake githubIssueLabel: destination-s3-data-lake diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeSpecification.kt b/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeSpecification.kt index c0cbabbd855a..cffce9727291 100644 --- a/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeSpecification.kt +++ b/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeSpecification.kt @@ -66,7 +66,7 @@ class S3DataLakeSpecification : override val mainBranchName: String = "" @get:JsonSchemaInject(json = """{"always_show": true,"order":7}""") - override val catalogType: CatalogType = GlueCatalogSpecification(glueId = "") + override val catalogType: CatalogType = GlueCatalogSpecification(glueId = "", databaseName = "") } @Singleton diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/TableIdGenerator.kt 
b/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/TableIdGenerator.kt index d83db691315e..4bafebfb25e3 100644 --- a/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/TableIdGenerator.kt +++ b/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/TableIdGenerator.kt @@ -6,6 +6,7 @@ package io.airbyte.integrations.destination.s3_data_lake import io.airbyte.cdk.load.command.DestinationStream import io.airbyte.cdk.load.command.iceberg.parquet.GlueCatalogConfiguration +import io.airbyte.cdk.load.command.iceberg.parquet.NessieCatalogConfiguration import io.micronaut.context.annotation.Factory import javax.inject.Singleton import org.apache.iceberg.catalog.Namespace @@ -20,24 +21,39 @@ interface TableIdGenerator { fun toTableIdentifier(stream: DestinationStream.Descriptor): TableIdentifier } -class SimpleTableIdGenerator : TableIdGenerator { - override fun toTableIdentifier(stream: DestinationStream.Descriptor): TableIdentifier = - tableIdOf(stream.namespace!!, stream.name) +class SimpleTableIdGenerator(private val configNamespace: String? = "") : TableIdGenerator { + override fun toTableIdentifier(stream: DestinationStream.Descriptor): TableIdentifier { + val namespace = stream.namespace ?: configNamespace + return tableIdOf(namespace!!, stream.name) + } } /** AWS Glue requires lowercase database+table names. */ -class GlueTableIdGenerator : TableIdGenerator { - override fun toTableIdentifier(stream: DestinationStream.Descriptor): TableIdentifier = - tableIdOf(stream.namespace!!.lowercase(), stream.name.lowercase()) +class GlueTableIdGenerator(private val databaseName: String?) 
: TableIdGenerator { + override fun toTableIdentifier(stream: DestinationStream.Descriptor): TableIdentifier { + val namespace = (stream.namespace ?: databaseName)?.lowercase() + + return tableIdOf(namespace!!, stream.name.lowercase()) + } } @Factory -class TableIdGeneratorFactory(private val icebergConfiguration: S3DataLakeConfiguration) { +class TableIdGeneratorFactory(private val s3DataLakeConfiguration: S3DataLakeConfiguration) { @Singleton fun create() = - when (icebergConfiguration.icebergCatalogConfiguration.catalogConfiguration) { - is GlueCatalogConfiguration -> GlueTableIdGenerator() - else -> SimpleTableIdGenerator() + when (s3DataLakeConfiguration.icebergCatalogConfiguration.catalogConfiguration) { + is GlueCatalogConfiguration -> + GlueTableIdGenerator( + (s3DataLakeConfiguration.icebergCatalogConfiguration.catalogConfiguration + as GlueCatalogConfiguration) + .databaseName + ) + is NessieCatalogConfiguration -> + SimpleTableIdGenerator( + (s3DataLakeConfiguration.icebergCatalogConfiguration.catalogConfiguration + as NessieCatalogConfiguration) + .namespace + ) } } diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/resources/expected-spec-cloud.json b/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/resources/expected-spec-cloud.json index f4a4b5dbb739..5542031cf4d9 100644 --- a/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/resources/expected-spec-cloud.json +++ b/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/resources/expected-spec-cloud.json @@ -86,6 +86,11 @@ "examples" : [ "a012345678910ABCDEFGH/AbCdEfGhEXAMPLEKEY" ], "airbyte_secret" : true, "order" : 2 + }, + "namespace" : { + "type" : "string", + "description" : "The Nessie namespace to be used in the Table identifier. 
\n |This will ONLY be used if the `Destination Namespace` setting for the connection is set to\n | `Destination-defined` or `Source-defined`", + "title" : "Namespace" } }, "required" : [ "catalog_type", "server_uri" ] @@ -110,6 +115,11 @@ "type" : "string", "description" : "The Role ARN.", "title" : "Role ARN" + }, + "database_name" : { + "type" : "string", + "description" : "The Glue database name. This will ONLY be used if the `Destination Namespace` setting for the connection is set to `Destination-defined` or `Source-defined`", + "title" : "Database Name" } }, "required" : [ "catalog_type", "glue_id" ] diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/resources/expected-spec-oss.json b/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/resources/expected-spec-oss.json index f4a4b5dbb739..5542031cf4d9 100644 --- a/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/resources/expected-spec-oss.json +++ b/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/resources/expected-spec-oss.json @@ -86,6 +86,11 @@ "examples" : [ "a012345678910ABCDEFGH/AbCdEfGhEXAMPLEKEY" ], "airbyte_secret" : true, "order" : 2 + }, + "namespace" : { + "type" : "string", + "description" : "The Nessie namespace to be used in the Table identifier. \n |This will ONLY be used if the `Destination Namespace` setting for the connection is set to\n | `Destination-defined` or `Source-defined`", + "title" : "Namespace" } }, "required" : [ "catalog_type", "server_uri" ] @@ -110,6 +115,11 @@ "type" : "string", "description" : "The Role ARN.", "title" : "Role ARN" + }, + "database_name" : { + "type" : "string", + "description" : "The Glue database name. 
This will ONLY be used if the `Destination Namespace` setting for the connection is set to `Destination-defined` or `Source-defined`", + "title" : "Database Name" } }, "required" : [ "catalog_type", "glue_id" ] diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/src/test/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeWriterTest.kt b/airbyte-integrations/connectors/destination-s3-data-lake/src/test/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeWriterTest.kt index 31bfa0f06b2e..827705d0c6e9 100644 --- a/airbyte-integrations/connectors/destination-s3-data-lake/src/test/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeWriterTest.kt +++ b/airbyte-integrations/connectors/destination-s3-data-lake/src/test/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeWriterTest.kt @@ -113,7 +113,7 @@ internal class S3DataLakeWriterTest { every { mainBranchName } returns "main" every { warehouseLocation } returns "s3://bucket/" every { catalogConfiguration } returns - NessieCatalogConfiguration("http://localhost:8080/api/v1", "access-token") + NessieCatalogConfiguration("http://localhost:8080/api/v1", "access-token", "") } val icebergConfiguration: S3DataLakeConfiguration = mockk { every { awsAccessKeyConfiguration } returns awsConfiguration @@ -183,7 +183,7 @@ internal class S3DataLakeWriterTest { every { mainBranchName } returns "main" every { warehouseLocation } returns "s3://bucket/" every { catalogConfiguration } returns - NessieCatalogConfiguration("http://localhost:8080/api/v1", "access-token") + NessieCatalogConfiguration("http://localhost:8080/api/v1", "access-token", "") } val icebergConfiguration: S3DataLakeConfiguration = mockk { every { awsAccessKeyConfiguration } returns awsConfiguration @@ -320,7 +320,7 @@ internal class S3DataLakeWriterTest { every { mainBranchName } returns "main" every { warehouseLocation } returns "s3://bucket/" every { catalogConfiguration } returns - 
NessieCatalogConfiguration("http://localhost:8080/api/v1", "access-token") + NessieCatalogConfiguration("http://localhost:8080/api/v1", "access-token", "") } val icebergConfiguration: S3DataLakeConfiguration = mockk { every { awsAccessKeyConfiguration } returns awsConfiguration diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/src/test/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/S3DataLakeUtilTest.kt b/airbyte-integrations/connectors/destination-s3-data-lake/src/test/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/S3DataLakeUtilTest.kt index ce12721a7419..e87f00ed0684 100644 --- a/airbyte-integrations/connectors/destination-s3-data-lake/src/test/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/S3DataLakeUtilTest.kt +++ b/airbyte-integrations/connectors/destination-s3-data-lake/src/test/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/S3DataLakeUtilTest.kt @@ -334,6 +334,7 @@ internal class S3DataLakeUtilTest { val warehouseLocation = "s3://test/" val s3BucketName = "test" val s3Endpoint = "http://localhost:9000" + val databaseName = "" val awsAccessKeyConfiguration = AWSAccessKeyConfiguration( accessKeyId = awsAccessKey, @@ -349,7 +350,7 @@ internal class S3DataLakeUtilTest { IcebergCatalogConfiguration( warehouseLocation, "main", - NessieCatalogConfiguration(nessieServerUri, nessieAccessToken) + NessieCatalogConfiguration(nessieServerUri, nessieAccessToken, databaseName), ) val configuration = S3DataLakeConfiguration( diff --git a/docs/integrations/destinations/s3-data-lake.md b/docs/integrations/destinations/s3-data-lake.md index 1e42d26308c7..7e4da34b3f1c 100644 --- a/docs/integrations/destinations/s3-data-lake.md +++ b/docs/integrations/destinations/s3-data-lake.md @@ -17,6 +17,7 @@ for more information. 
| Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------| +| 0.2.20 | 2025-01-22 | [\#52068](https://github.com/airbytehq/airbyte/pull/52068) | Add support for default namespace (/database name) | | 0.2.19 | 2025-01-16 | [\#51595](https://github.com/airbytehq/airbyte/pull/51595) | Clarifications in connector config options | | 0.2.18 | 2025-01-15 | [\#51042](https://github.com/airbytehq/airbyte/pull/51042) | Write structs as JSON strings instead of Iceberg structs. | | 0.2.17 | 2025-01-14 | [\#51542](https://github.com/airbytehq/airbyte/pull/51542) | New identifier fields should be marked as required. |