Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wip #50434

Draft
wants to merge 42 commits into
base: move/destination-mssql-v2
Choose a base branch
from
Draft

wip #50434

Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
7603a7a
feat: implement converters for airbyte type/value to SQL
jdpgrailsdev Dec 18, 2024
4bec6b3
Revert "feat: implement converters for airbyte type/value to SQL"
jdpgrailsdev Dec 18, 2024
7b0ad97
feat: implement converters for airbyte type/value to SQL (#49924)
jdpgrailsdev Dec 19, 2024
9ff3a97
fix: add missing import (#49945)
jdpgrailsdev Dec 19, 2024
0ac5be2
Remove duplicate class
jdpgrailsdev Dec 19, 2024
3cc03b7
chore: destination MSSQL v2 add spec (#49947)
gosusnp Dec 19, 2024
a4d8f18
Test data source configuration (#49951)
jdpgrailsdev Dec 19, 2024
bec38f2
chore: add ssl method to dest mssql v2 spec (#49956)
gosusnp Dec 20, 2024
65c659d
chore: destination mssqlv2 implement configuration and datasource (#4…
gosusnp Dec 20, 2024
7cf6ff8
chore: mssqlv2 implement check (#49965)
gosusnp Dec 26, 2024
18e4c57
add cehck integration tests
gosusnp Dec 26, 2024
72e57b9
add url building tests
gosusnp Dec 26, 2024
ff78ed0
add spec tests
gosusnp Dec 26, 2024
315fb6e
wip
gosusnp Dec 28, 2024
e0cdf1c
chore(destination-mssql-v2): add datasource url tests (#50426)
gosusnp Dec 30, 2024
1ff0ebc
Merge branch 'move/destination-mssql-v2' into gosusnp/msssql-v2-basic…
jdpgrailsdev Dec 30, 2024
140aeda
refactor: use Kotlin extension function
jdpgrailsdev Dec 30, 2024
ed0b3b0
Merge branch 'move/destination-mssql-v2' into gosusnp/msssql-v2-basic…
jdpgrailsdev Dec 30, 2024
b6091bb
Formatting
jdpgrailsdev Dec 30, 2024
ab2bd5e
feat: implement converters for airbyte type/value to SQL
jdpgrailsdev Dec 18, 2024
3ccac99
Revert "feat: implement converters for airbyte type/value to SQL"
jdpgrailsdev Dec 18, 2024
536cbbf
feat: implement converters for airbyte type/value to SQL (#49924)
jdpgrailsdev Dec 19, 2024
0724cdf
fix: add missing import (#49945)
jdpgrailsdev Dec 19, 2024
f0320df
Remove duplicate class
jdpgrailsdev Dec 19, 2024
ac8e3c4
chore: destination MSSQL v2 add spec (#49947)
gosusnp Dec 19, 2024
7e5d341
Test data source configuration (#49951)
jdpgrailsdev Dec 19, 2024
d3df1eb
chore: add ssl method to dest mssql v2 spec (#49956)
gosusnp Dec 20, 2024
c7b812e
chore: destination mssqlv2 implement configuration and datasource (#4…
gosusnp Dec 20, 2024
204d8f9
chore: mssqlv2 implement check (#49965)
gosusnp Dec 26, 2024
b56fa4e
chore(destination-mssql-v2): add datasource url tests (#50426)
gosusnp Dec 30, 2024
57bcf83
refactor: use Kotlin extension function
jdpgrailsdev Dec 30, 2024
fcc5bac
Merge branch 'move/destination-mssql-v2' into gosusnp/msssql-v2-basic…
gosusnp Jan 2, 2025
27741af
interface change
gosusnp Jan 2, 2025
7413d4a
fix time related handling
gosusnp Jan 2, 2025
fa67a49
fix basic types test
gosusnp Jan 2, 2025
57e82f6
extract result set to airbyte value code from query builder
gosusnp Jan 2, 2025
960af08
extract statement building details
gosusnp Jan 3, 2025
7b96bd7
handle airbyte extracted at as long for consistency
gosusnp Jan 3, 2025
704da0b
preserve meta changes from upstream
gosusnp Jan 3, 2025
9a062bf
fix container types support
gosusnp Jan 3, 2025
d338d5f
naive union handling
gosusnp Jan 3, 2025
f4b368e
Merge branch 'move/destination-mssql-v2' into gosusnp/msssql-v2-basic…
gosusnp Jan 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ application {
//applicationDefaultJvmArgs = listOf("-XX:+ExitOnOutOfMemoryError", "-XX:MaxRAMPercentage=75.0", "--add-opens", "java.base/sun.nio.ch=ALL-UNNAMED", "--add-opens", "java.base/sun.security.action=ALL-UNNAMED", "--add-opens", "java.base/java.lang=ALL-UNNAMED")
}

val junitVersion = "5.11.3"
val junitVersion = "5.11.4"

configurations.configureEach {
// Exclude additional SLF4J providers from all classpaths
Expand All @@ -39,7 +39,8 @@ dependencies {
implementation("io.github.oshai:kotlin-logging-jvm:7.0.0")
implementation("jakarta.inject:jakarta.inject-api:2.0.1")
implementation("com.github.spotbugs:spotbugs-annotations:4.8.6")
implementation("io.micronaut:micronaut-inject:4.6.1")
implementation("io.micronaut:micronaut-inject:4.7.9")
implementation("com.zaxxer:HikariCP:6.2.1")

testImplementation("io.mockk:mockk:1.13.13")
testImplementation("org.junit.jupiter:junit-jupiter-api:$junitVersion")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
services:
sql-server:
image: mcr.microsoft.com/mssql/server:2022-latest
ports:
- "1433:1433"
environment:
- ACCEPT_EULA=Y
- MSSQL_SA_PASSWORD=Averycomplicatedpassword1!
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package io.airbyte.integrations.destination.mssql.v2

import io.airbyte.cdk.load.check.DestinationChecker
import io.airbyte.integrations.destination.mssql.v2.config.MSSQLConfiguration
import io.airbyte.integrations.destination.mssql.v2.config.MSSQLDataSourceFactory
import jakarta.inject.Singleton
import java.util.UUID
import javax.sql.DataSource

@Singleton
class MSSQLChecker(private val dataSourceFactory: MSSQLDataSourceFactory) :
DestinationChecker<MSSQLConfiguration> {
override fun check(config: MSSQLConfiguration) {
val dataSource: DataSource = dataSourceFactory.getDataSource(config)
val testTableName = "check_test_${UUID.randomUUID()}"
val fullyQualifiedTableName = "[${config.rawDataSchema}].[${testTableName}]"
dataSource.connection.use { connection ->
connection.createStatement().use { statement ->
statement.executeUpdate(
"""
CREATE TABLE ${fullyQualifiedTableName} (test int);
DROP TABLE ${fullyQualifiedTableName};
""".trimIndent(),
)
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,329 @@
package io.airbyte.integrations.destination.mssql.v2

import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.data.AirbyteType
import io.airbyte.cdk.load.data.AirbyteValue
import io.airbyte.cdk.load.data.ArrayType
import io.airbyte.cdk.load.data.ArrayTypeWithoutSchema
import io.airbyte.cdk.load.data.ArrayValue
import io.airbyte.cdk.load.data.BooleanType
import io.airbyte.cdk.load.data.BooleanValue
import io.airbyte.cdk.load.data.DateType
import io.airbyte.cdk.load.data.DateValue
import io.airbyte.cdk.load.data.FieldType
import io.airbyte.cdk.load.data.IntegerType
import io.airbyte.cdk.load.data.IntegerValue
import io.airbyte.cdk.load.data.NullValue
import io.airbyte.cdk.load.data.NumberType
import io.airbyte.cdk.load.data.NumberValue
import io.airbyte.cdk.load.data.ObjectType
import io.airbyte.cdk.load.data.ObjectTypeWithEmptySchema
import io.airbyte.cdk.load.data.ObjectTypeWithoutSchema
import io.airbyte.cdk.load.data.ObjectValue
import io.airbyte.cdk.load.data.StringType
import io.airbyte.cdk.load.data.StringValue
import io.airbyte.cdk.load.data.TimeTypeWithTimezone
import io.airbyte.cdk.load.data.TimeTypeWithoutTimezone
import io.airbyte.cdk.load.data.TimeValue
import io.airbyte.cdk.load.data.TimestampTypeWithTimezone
import io.airbyte.cdk.load.data.TimestampTypeWithoutTimezone
import io.airbyte.cdk.load.data.TimestampValue
import io.airbyte.cdk.load.data.UnionType
import io.airbyte.cdk.load.data.UnknownType
import io.airbyte.cdk.load.data.UnknownValue
import io.airbyte.cdk.load.message.DestinationRecord
import io.airbyte.cdk.load.util.TimeStringUtility.toLocalDate
import io.airbyte.cdk.load.util.TimeStringUtility.toLocalDateTime
import io.airbyte.cdk.load.util.TimeStringUtility.toOffset
import io.airbyte.integrations.destination.mssql.v2.config.MSSQLConfiguration
import io.airbyte.integrations.destination.mssql.v2.convert.AirbyteTypeToSqlType
import io.airbyte.integrations.destination.mssql.v2.convert.SqlTypeToMssqlType
import io.airbyte.protocol.models.AirbyteRecordMessageMeta
import io.airbyte.protocol.models.AirbyteRecordMessageMetaChange
import io.airbyte.protocol.models.Jsons
import java.sql.Date
import java.sql.PreparedStatement
import java.sql.ResultSet
import java.sql.Time
import java.sql.Timestamp
import java.sql.Types
import java.time.Instant
import java.time.format.DateTimeParseException
import java.util.UUID

class MSSQLQueryBuilder(
config: MSSQLConfiguration,
private val stream: DestinationStream,
) {

companion object {
val AIRBYTE_RAW_ID = "_airbyte_raw_id"
val AIRBYTE_EXTRACTED_AT = "_airbyte_extracted_at"
val AIRBYTE_META = "_airbyte_meta"
val AIRBYTE_GENERATION_ID = "_airbyte_generation_id"

val airbyteFinalTableFields =
listOf(
NamedField(AIRBYTE_RAW_ID, FieldType(StringType, false)),
NamedField(AIRBYTE_EXTRACTED_AT, FieldType(TimestampTypeWithoutTimezone, false)),
NamedField(AIRBYTE_META, FieldType(ObjectTypeWithoutSchema, false)),
NamedField(AIRBYTE_GENERATION_ID, FieldType(IntegerType, false)),
)

val airbyteFields = airbyteFinalTableFields.map { it.name }.toSet()

inline fun <reified T> deserialize(value: String): T =
Jsons.deserialize(value, T::class.java)
}

data class NamedField(val name: String, val type: FieldType)
data class NamedValue(val name: String, val value: AirbyteValue)

private val internalSchema: String = config.rawDataSchema
private val outputSchema: String = stream.descriptor.namespace ?: config.schema
private val tableName: String = stream.descriptor.name
private val fqTableName = "$outputSchema.$tableName"

val finalTableSchema: List<NamedField> =
airbyteFinalTableFields + extractFinalTableSchema(stream.schema)

fun createFinalTableIfNotExists(): String =
createTableIfNotExists(fqTableName, finalTableSchema)

fun createFinalSchemaIfNotExists(): String = createSchemaIfNotExists(outputSchema)

fun getFinalTableInsertColumnHeader(): String =
getFinalTableInsertColumnHeader(fqTableName, finalTableSchema)

fun populateStatement(
statement: PreparedStatement,
record: DestinationRecord,
schema: List<NamedField>
) {
val toSqlType = AirbyteTypeToSqlType()
val recordObject = record.data as ObjectValue

var airbyteMetaStatementIndex: Int? = null
val airbyteMeta =
AirbyteRecordMessageMeta().apply {
changes = mutableListOf()
setAdditionalProperty("syncId", stream.syncId)
}
schema.forEachIndexed { index, field ->
val stmntIdx = index + 1
val value = recordObject.values[field.name]
val sqlType = toSqlType.convert(field.type.type)

if (value == null) {
if (field.name in airbyteFields) {
when (field.name) {
AIRBYTE_RAW_ID ->
statement.setString(stmntIdx, UUID.randomUUID().toString())
AIRBYTE_EXTRACTED_AT ->
statement.setTimestamp(
stmntIdx,
Timestamp.from(Instant.ofEpochMilli(record.emittedAtMs))
)
AIRBYTE_GENERATION_ID -> statement.setLong(stmntIdx, stream.generationId)
AIRBYTE_META -> airbyteMetaStatementIndex = stmntIdx
}
} else {
statement.setNull(stmntIdx, sqlType)
}
} else {
try {
when (value) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It may make this cleaner to encapsulate this logic in an extension function on the value -- something like value.setStatementValue(statement=statement). The extension function would encapsulate this when block, making this code easier to test separate from the type conversion expectations.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated

is ObjectValue ->
statement.setString(stmntIdx, Jsons.serialize(value.values))
is ArrayValue ->
statement.setString(stmntIdx, Jsons.serialize(value.values))
is BooleanValue -> statement.setBoolean(stmntIdx, value.value)
is DateValue ->
statement.setDate(stmntIdx, Date.valueOf(toLocalDate((value.value))))
is IntegerValue -> statement.setLong(stmntIdx, value.value.toLong())
NullValue -> statement.setNull(stmntIdx, sqlType)
is NumberValue -> statement.setDouble(stmntIdx, value.value.toDouble())
is StringValue ->
if (sqlType == Types.VARCHAR || sqlType == Types.LONGVARCHAR) {
statement.setString(stmntIdx, value.value)
} else {
throw IllegalArgumentException()
}
is TimeValue ->
statement.setTime(stmntIdx, Time.valueOf(toOffset(value.value)))
is TimestampValue ->
statement.setTimestamp(
stmntIdx,
Timestamp.valueOf(toLocalDateTime(value.value))
)
is UnknownValue ->
statement.setString(stmntIdx, Jsons.serialize(value.value))
}
} catch (e: DateTimeParseException) {
statement.setNull(stmntIdx, sqlType)
airbyteMeta.changes.add(
AirbyteRecordMessageMetaChange()
.withField(field.name)
.withChange(AirbyteRecordMessageMetaChange.Change.NULLED)
.withReason(
AirbyteRecordMessageMetaChange.Reason
.DESTINATION_SERIALIZATION_ERROR
)
)
} catch (e: IllegalArgumentException) {
statement.setNull(stmntIdx, sqlType)
airbyteMeta.changes.add(
AirbyteRecordMessageMetaChange()
.withField(field.name)
.withChange(AirbyteRecordMessageMetaChange.Change.NULLED)
.withReason(
AirbyteRecordMessageMetaChange.Reason
.DESTINATION_SERIALIZATION_ERROR
)
)
}
// when (sqlType) {
// Types.LONGVARCHAR -> statement.setString(statementIndex,
// Jsons.serialize(value))
// Types.BOOLEAN -> statement.setBoolean(statementIndex, value as
// Boolean)
// Types.DATE -> statement.setDate(statementIndex, value as Date)
// Types.BIGINT ->
// statement.setLong(statementIndex, (value as
// BigInteger).toLong())
// Types.DECIMAL ->
// statement.setDouble(statementIndex, (value as
// BigDecimal).toDouble())
// Types.VARCHAR -> statement.setString(statementIndex, value as
// String)
// Types.TIMESTAMP_WITH_TIMEZONE ->
// statement.setTimestamp(statementIndex, value as Timestamp)
// Types.TIME -> statement.setTime(statementIndex, value as Time)
// Types.TIMESTAMP -> statement.setTimestamp(statementIndex,
// value as Timestamp)
// }
}
}
airbyteMetaStatementIndex?.let { statementIndex ->
statement.setString(statementIndex, Jsons.serialize(airbyteMeta))
}
}

fun readResult(rs: ResultSet, schema: List<NamedField>): ObjectValue {
val valueMap =
schema
.filter { field -> field.name !in airbyteFields }
.map { field ->
try {
when (field.type.type) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment as above...this feels like another good use of an extension function on the type to encapsulate this conversion logic and make it easier to test.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated.

is StringType ->
NamedValue(field.name, StringValue(rs.getString(field.name)))
is ArrayType -> TODO()
ArrayTypeWithoutSchema -> TODO()
BooleanType ->
NamedValue(field.name, BooleanValue(rs.getBoolean(field.name)))
DateType ->
NamedValue(field.name, DateValue(rs.getDate(field.name).toString()))
IntegerType ->
NamedValue(field.name, IntegerValue(rs.getLong(field.name)))
NumberType ->
NamedValue(
field.name,
NumberValue(rs.getDouble(field.name).toBigDecimal())
)
is ObjectType ->
NamedValue(
field.name,
ObjectValue.from(
deserialize<Map<String, Any?>>(rs.getString(field.name))
)
)
ObjectTypeWithEmptySchema ->
NamedValue(
field.name,
ObjectValue.from(
deserialize<Map<String, Any?>>(rs.getString(field.name))
)
)
ObjectTypeWithoutSchema ->
NamedValue(
field.name,
ObjectValue.from(
deserialize<Map<String, Any?>>(rs.getString(field.name))
)
)
TimeTypeWithTimezone ->
NamedValue(field.name, TimeValue(rs.getString(field.name)))
TimeTypeWithoutTimezone ->
NamedValue(field.name, TimeValue(rs.getString(field.name)))
TimestampTypeWithTimezone ->
NamedValue(field.name, TimestampValue(rs.getString(field.name)))
TimestampTypeWithoutTimezone ->
NamedValue(field.name, TimestampValue(rs.getString(field.name)))
is UnionType -> TODO()
is UnknownType -> TODO()
}
} catch (e: NullPointerException) {
null
}
}
.filterNotNull()
.associate { it.name to it.value }
return ObjectValue.from(valueMap)
}

fun selectAllRecords(): String = "SELECT * FROM $fqTableName"

private fun createSchemaIfNotExists(schema: String): String =
"""
IF NOT EXISTS (SELECT name FROM sys.schemas WHERE name = '$schema')
BEGIN
EXEC ('CREATE SCHEMA $schema');
END
""".trimIndent()

private fun createTableIfNotExists(fqTableName: String, schema: List<NamedField>): String =
"""
IF OBJECT_ID('$fqTableName') IS NULL
BEGIN
CREATE TABLE $fqTableName
(
${airbyteTypeToSqlSchema(schema, separator = ",\n ")}
);
END
""".trimIndent()

private fun getFinalTableInsertColumnHeader(
fqTableName: String,
schema: List<NamedField>
): String {
return StringBuilder()
.apply {
append("INSERT INTO $fqTableName(")
append(schema.map { it.name }.joinToString(", "))
append(") VALUES (")
append(schema.map { "?" }.joinToString(", "))
append(")")
}
.toString()
}

private fun extractFinalTableSchema(schema: AirbyteType): List<NamedField> =
when (schema) {
is ObjectType -> {
(stream.schema as ObjectType)
.properties
.map { NamedField(name = it.key, type = it.value) }
.toList()
}
else -> TODO("most likely fail hard")
}

private fun airbyteTypeToSqlSchema(schema: List<NamedField>, separator: String): String {
val toSqlType = AirbyteTypeToSqlType()
val toMssqlType = SqlTypeToMssqlType()
return schema
.map { "${it.name} ${toMssqlType.convert(toSqlType.convert(it.type.type)).sqlString}" }
.joinToString(separator = separator)
}
}
Loading
Loading