Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Iceberg] Add support for default namespace (database name) #52068

Merged
merged 14 commits into from
Jan 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,13 @@ interface IcebergCatalogSpecifications {
GlueCatalogConfiguration(
(catalogType as GlueCatalogSpecification).glueId,
(catalogType as GlueCatalogSpecification).toAWSArnRoleConfiguration(),
(catalogType as GlueCatalogSpecification).databaseName,
)
is NessieCatalogSpecification ->
NessieCatalogConfiguration(
(catalogType as NessieCatalogSpecification).serverUri,
(catalogType as NessieCatalogSpecification).accessToken
(catalogType as NessieCatalogSpecification).accessToken,
(catalogType as NessieCatalogSpecification).namespace,
)
}

Expand Down Expand Up @@ -163,7 +165,23 @@ class NessieCatalogSpecification(
"order":2
}""",
)
val accessToken: String?
val accessToken: String?,

/**
* The namespace to be used when building the Table identifier
*
* This namespace will only be used if the stream namespace is null, meaning when the
* `Destination Namespace` setting for the connection is set to `Destination-defined` or
* `Source-defined`
*/
@get:JsonSchemaTitle("Namespace")
@get:JsonPropertyDescription(
"""The Nessie namespace to be used in the Table identifier.
|This will ONLY be used if the `Destination Namespace` setting for the connection is set to
| `Destination-defined` or `Source-defined`"""
)
@get:JsonProperty("namespace")
val namespace: String?
) : CatalogType(catalogType)

/**
Expand Down Expand Up @@ -193,6 +211,20 @@ class GlueCatalogSpecification(
@JsonSchemaInject(json = """{"order":1}""")
val glueId: String,
override val roleArn: String? = null,

/**
* The name of the database to be used when building the Table identifier
*
* This database name will only be used if the stream namespace is null, meaning when the
* `Destination Namespace` setting for the connection is set to `Destination-defined` or
* `Source-defined`
*/
@get:JsonSchemaTitle("Database Name")
@get:JsonPropertyDescription(
"""The Glue database name. This will ONLY be used if the `Destination Namespace` setting for the connection is set to `Destination-defined` or `Source-defined`"""
)
@get:JsonProperty("database_name")
val databaseName: String?
) : CatalogType(catalogType), AWSArnRoleSpecification

/**
Expand Down Expand Up @@ -239,6 +271,11 @@ data class GlueCatalogConfiguration(
@JsonPropertyDescription("The AWS Account ID associated with the Glue service.")
val glueId: String,
override val awsArnRoleConfiguration: AWSArnRoleConfiguration,
@get:JsonSchemaTitle("Database Name")
@get:JsonPropertyDescription(
"""The Glue database name. This will ONLY be used if the `Destination Namespace` setting for the connection is set to `Destination-defined` or `Source-defined`"""
)
val databaseName: String?
) : CatalogConfiguration, AWSArnRoleConfigurationProvider

/**
Expand All @@ -254,7 +291,14 @@ data class NessieCatalogConfiguration(
val serverUri: String,
@JsonSchemaTitle("Nessie Access Token")
@JsonPropertyDescription("An optional token for authentication with the Nessie server.")
val accessToken: String?
val accessToken: String?,
@get:JsonSchemaTitle("Namespace")
@get:JsonPropertyDescription(
"""The Nessie namespace to be used in the Table identifier.
|This will ONLY be used if the `Destination Namespace` setting for the connection is set to
| `Destination-defined` or `Source-defined`"""
)
val namespace: String?
) : CatalogConfiguration

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ data:
alias: airbyte-connector-testing-secret-store
connectorType: destination
definitionId: 716ca874-520b-4902-9f80-9fad66754b89
dockerImageTag: 0.2.19
dockerImageTag: 0.2.20
dockerRepository: airbyte/destination-s3-data-lake
documentationUrl: https://docs.airbyte.com/integrations/destinations/s3-data-lake
githubIssueLabel: destination-s3-data-lake
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class S3DataLakeSpecification :
override val mainBranchName: String = ""

@get:JsonSchemaInject(json = """{"always_show": true,"order":7}""")
override val catalogType: CatalogType = GlueCatalogSpecification(glueId = "")
override val catalogType: CatalogType = GlueCatalogSpecification(glueId = "", databaseName = "")
}

@Singleton
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package io.airbyte.integrations.destination.s3_data_lake

import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.command.iceberg.parquet.GlueCatalogConfiguration
import io.airbyte.cdk.load.command.iceberg.parquet.NessieCatalogConfiguration
import io.micronaut.context.annotation.Factory
import javax.inject.Singleton
import org.apache.iceberg.catalog.Namespace
Expand All @@ -20,24 +21,39 @@ interface TableIdGenerator {
fun toTableIdentifier(stream: DestinationStream.Descriptor): TableIdentifier
}

class SimpleTableIdGenerator : TableIdGenerator {
override fun toTableIdentifier(stream: DestinationStream.Descriptor): TableIdentifier =
tableIdOf(stream.namespace!!, stream.name)
class SimpleTableIdGenerator(private val configNamespace: String? = "") : TableIdGenerator {
override fun toTableIdentifier(stream: DestinationStream.Descriptor): TableIdentifier {
val namespace = stream.namespace ?: configNamespace
return tableIdOf(namespace!!, stream.name)
}
}

/** AWS Glue requires lowercase database+table names. */
class GlueTableIdGenerator : TableIdGenerator {
override fun toTableIdentifier(stream: DestinationStream.Descriptor): TableIdentifier =
tableIdOf(stream.namespace!!.lowercase(), stream.name.lowercase())
class GlueTableIdGenerator(private val databaseName: String?) : TableIdGenerator {
override fun toTableIdentifier(stream: DestinationStream.Descriptor): TableIdentifier {
val namespace = (stream.namespace ?: databaseName)?.lowercase()

return tableIdOf(namespace!!, stream.name.lowercase())
}
}

@Factory
class TableIdGeneratorFactory(private val icebergConfiguration: S3DataLakeConfiguration) {
class TableIdGeneratorFactory(private val s3DataLakeConfiguration: S3DataLakeConfiguration) {
@Singleton
fun create() =
when (icebergConfiguration.icebergCatalogConfiguration.catalogConfiguration) {
is GlueCatalogConfiguration -> GlueTableIdGenerator()
else -> SimpleTableIdGenerator()
when (s3DataLakeConfiguration.icebergCatalogConfiguration.catalogConfiguration) {
is GlueCatalogConfiguration ->
GlueTableIdGenerator(
(s3DataLakeConfiguration.icebergCatalogConfiguration.catalogConfiguration
as GlueCatalogConfiguration)
.databaseName
)
is NessieCatalogConfiguration ->
SimpleTableIdGenerator(
(s3DataLakeConfiguration.icebergCatalogConfiguration.catalogConfiguration
as NessieCatalogConfiguration)
.namespace
)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@
"examples" : [ "a012345678910ABCDEFGH/AbCdEfGhEXAMPLEKEY" ],
"airbyte_secret" : true,
"order" : 2
},
"namespace" : {
"type" : "string",
"description" : "The Nessie namespace to be used in the Table identifier. \n |This will ONLY be used if the `Destination Namespace` setting for the connection is set to\n | `Destination-defined` or `Source-defined`",
"title" : "Namespace"
}
},
"required" : [ "catalog_type", "server_uri" ]
Expand All @@ -110,6 +115,11 @@
"type" : "string",
"description" : "The Role ARN.",
"title" : "Role ARN"
},
"database_name" : {
"type" : "string",
"description" : "The Glue database name. This will ONLY be used if the `Destination Namespace` setting for the connection is set to `Destination-defined` or `Source-defined`",
"title" : "Database Name"
}
},
"required" : [ "catalog_type", "glue_id" ]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@
"examples" : [ "a012345678910ABCDEFGH/AbCdEfGhEXAMPLEKEY" ],
"airbyte_secret" : true,
"order" : 2
},
"namespace" : {
"type" : "string",
"description" : "The Nessie namespace to be used in the Table identifier. \n |This will ONLY be used if the `Destination Namespace` setting for the connection is set to\n | `Destination-defined` or `Source-defined`",
"title" : "Namespace"
}
},
"required" : [ "catalog_type", "server_uri" ]
Expand All @@ -110,6 +115,11 @@
"type" : "string",
"description" : "The Role ARN.",
"title" : "Role ARN"
},
"database_name" : {
"type" : "string",
"description" : "The Glue database name. This will ONLY be used if the `Destination Namespace` setting for the connection is set to `Destination-defined` or `Source-defined`",
"title" : "Database Name"
}
},
"required" : [ "catalog_type", "glue_id" ]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ internal class S3DataLakeWriterTest {
every { mainBranchName } returns "main"
every { warehouseLocation } returns "s3://bucket/"
every { catalogConfiguration } returns
NessieCatalogConfiguration("http://localhost:8080/api/v1", "access-token")
NessieCatalogConfiguration("http://localhost:8080/api/v1", "access-token", "")
}
val icebergConfiguration: S3DataLakeConfiguration = mockk {
every { awsAccessKeyConfiguration } returns awsConfiguration
Expand Down Expand Up @@ -183,7 +183,7 @@ internal class S3DataLakeWriterTest {
every { mainBranchName } returns "main"
every { warehouseLocation } returns "s3://bucket/"
every { catalogConfiguration } returns
NessieCatalogConfiguration("http://localhost:8080/api/v1", "access-token")
NessieCatalogConfiguration("http://localhost:8080/api/v1", "access-token", "")
}
val icebergConfiguration: S3DataLakeConfiguration = mockk {
every { awsAccessKeyConfiguration } returns awsConfiguration
Expand Down Expand Up @@ -320,7 +320,7 @@ internal class S3DataLakeWriterTest {
every { mainBranchName } returns "main"
every { warehouseLocation } returns "s3://bucket/"
every { catalogConfiguration } returns
NessieCatalogConfiguration("http://localhost:8080/api/v1", "access-token")
NessieCatalogConfiguration("http://localhost:8080/api/v1", "access-token", "")
}
val icebergConfiguration: S3DataLakeConfiguration = mockk {
every { awsAccessKeyConfiguration } returns awsConfiguration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,7 @@ internal class S3DataLakeUtilTest {
val warehouseLocation = "s3://test/"
val s3BucketName = "test"
val s3Endpoint = "http://localhost:9000"
val databaseName = ""
val awsAccessKeyConfiguration =
AWSAccessKeyConfiguration(
accessKeyId = awsAccessKey,
Expand All @@ -349,7 +350,7 @@ internal class S3DataLakeUtilTest {
IcebergCatalogConfiguration(
warehouseLocation,
"main",
NessieCatalogConfiguration(nessieServerUri, nessieAccessToken)
NessieCatalogConfiguration(nessieServerUri, nessieAccessToken, databaseName),
)
val configuration =
S3DataLakeConfiguration(
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/destinations/s3-data-lake.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ for more information.

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------|
| 0.2.20 | 2025-01-32 | [\#52068](https://github.com/airbytehq/airbyte/pull/52068) | Add support for default namespace (/database name) |
| 0.2.19 | 2025-01-16 | [\#51595](https://github.com/airbytehq/airbyte/pull/51595) | Clarifications in connector config options |
| 0.2.18 | 2025-01-15 | [\#51042](https://github.com/airbytehq/airbyte/pull/51042) | Write structs as JSON strings instead of Iceberg structs. |
| 0.2.17 | 2025-01-14 | [\#51542](https://github.com/airbytehq/airbyte/pull/51542) | New identifier fields should be marked as required. |
Expand Down
Loading