Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Iceberg] Add support for default namespace (database name) #52068

Merged
merged 14 commits into from
Jan 23, 2025
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,20 @@ interface IcebergCatalogSpecifications {
@get:JsonProperty("main_branch_name", defaultValue = "main")
val mainBranchName: String

/**
* The name of the database to be used when building the Table identifier
*
* This database name will only be used if the stream namespace is null, meaning when the
* `Destination Namespace` setting for the connection is set to `Destination-defined` or
* `Source-defined`
*/
@get:JsonSchemaTitle("Database Name")
@get:JsonPropertyDescription(
"""The database name. This will ONLY be used if the `Destination Namespace` setting for the connection is set to `Destination-defined` or `Source-defined`"""
)
@get:JsonProperty("database_name")
val databaseName: String?
frifriSF59 marked this conversation as resolved.
Show resolved Hide resolved

/**
* The catalog type.
*
Expand Down Expand Up @@ -88,7 +102,12 @@ interface IcebergCatalogSpecifications {
)
}

return IcebergCatalogConfiguration(warehouseLocation, mainBranchName, catalogConfiguration)
return IcebergCatalogConfiguration(
warehouseLocation,
mainBranchName,
databaseName,
catalogConfiguration
)
}
}

Expand Down Expand Up @@ -216,6 +235,7 @@ data class IcebergCatalogConfiguration(
@JsonPropertyDescription(
"The specific configuration details of the chosen Iceberg catalog type."
)
val databaseName: String? = null,
val catalogConfiguration: CatalogConfiguration
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ data:
alias: airbyte-connector-testing-secret-store
connectorType: destination
definitionId: 716ca874-520b-4902-9f80-9fad66754b89
dockerImageTag: 0.2.19
dockerImageTag: 0.2.20
dockerRepository: airbyte/destination-s3-data-lake
documentationUrl: https://docs.airbyte.com/integrations/destinations/s3-data-lake
githubIssueLabel: destination-s3-data-lake
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,33 @@ interface TableIdGenerator {
fun toTableIdentifier(stream: DestinationStream.Descriptor): TableIdentifier
}

class SimpleTableIdGenerator : TableIdGenerator {
override fun toTableIdentifier(stream: DestinationStream.Descriptor): TableIdentifier =
tableIdOf(stream.namespace!!, stream.name)
class SimpleTableIdGenerator(private val s3DataLakeConfiguration: S3DataLakeConfiguration) :
frifriSF59 marked this conversation as resolved.
Show resolved Hide resolved
TableIdGenerator {
override fun toTableIdentifier(stream: DestinationStream.Descriptor): TableIdentifier {
val namespace =
stream.namespace ?: s3DataLakeConfiguration.icebergCatalogConfiguration.databaseName
return tableIdOf(namespace!!, stream.name)
}
}

/** AWS Glue requires lowercase database+table names. */
class GlueTableIdGenerator : TableIdGenerator {
override fun toTableIdentifier(stream: DestinationStream.Descriptor): TableIdentifier =
tableIdOf(stream.namespace!!.lowercase(), stream.name.lowercase())
class GlueTableIdGenerator(private val s3DataLakeConfiguration: S3DataLakeConfiguration) :
TableIdGenerator {
override fun toTableIdentifier(stream: DestinationStream.Descriptor): TableIdentifier {
val fallbackNamespace = s3DataLakeConfiguration.icebergCatalogConfiguration.databaseName
val namespace = (stream.namespace ?: fallbackNamespace)?.lowercase()

return tableIdOf(namespace!!, stream.name.lowercase())
}
}

@Factory
class TableIdGeneratorFactory(private val icebergConfiguration: S3DataLakeConfiguration) {
class TableIdGeneratorFactory(private val s3DataLakeConfiguration: S3DataLakeConfiguration) {
@Singleton
fun create() =
when (icebergConfiguration.icebergCatalogConfiguration.catalogConfiguration) {
is GlueCatalogConfiguration -> GlueTableIdGenerator()
else -> SimpleTableIdGenerator()
when (s3DataLakeConfiguration.icebergCatalogConfiguration.catalogConfiguration) {
is GlueCatalogConfiguration -> GlueTableIdGenerator(s3DataLakeConfiguration)
else -> SimpleTableIdGenerator(s3DataLakeConfiguration)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,7 @@ internal class S3DataLakeUtilTest {
val warehouseLocation = "s3://test/"
val s3BucketName = "test"
val s3Endpoint = "http://localhost:9000"
val databasename = ""
val awsAccessKeyConfiguration =
AWSAccessKeyConfiguration(
accessKeyId = awsAccessKey,
Expand All @@ -349,6 +350,7 @@ internal class S3DataLakeUtilTest {
IcebergCatalogConfiguration(
warehouseLocation,
"main",
databasename,
NessieCatalogConfiguration(nessieServerUri, nessieAccessToken)
)
val configuration =
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/destinations/s3-data-lake.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ for more information.

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------|
| 0.2.20 | 2025-01-32 | [\#52068](https://github.com/airbytehq/airbyte/pull/52068) | Add support for default namespace (/database name) |
| 0.2.19 | 2025-01-16 | [\#51595](https://github.com/airbytehq/airbyte/pull/51595) | Clarifications in connector config options |
| 0.2.18 | 2025-01-15 | [\#51042](https://github.com/airbytehq/airbyte/pull/51042) | Write structs as JSON strings instead of Iceberg structs. |
| 0.2.17 | 2025-01-14 | [\#51542](https://github.com/airbytehq/airbyte/pull/51542) | New identifier fields should be marked as required. |
Expand Down
Loading