Skip to content

Commit

Permalink
Formatting
Browse files Browse the repository at this point in the history
  • Loading branch information
jdpgrailsdev committed Jan 21, 2025
1 parent efb93a6 commit b74e040
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,24 +35,19 @@ import java.util.UUID

fun <T> String.executeQuery(connection: Connection, vararg args: String, f: (ResultSet) -> T): T {
connection.prepareStatement(this.trimIndent()).use { statement ->
args.forEachIndexed { index, arg ->
statement.setString(index + 1, arg)
}
args.forEachIndexed { index, arg -> statement.setString(index + 1, arg) }
return statement.executeQuery().use(f)
}
}

fun String.executeUpdate(connection: Connection, vararg args: String) {
connection.prepareStatement(this.trimIndent()).use { statement ->
args.forEachIndexed { index, arg ->
statement.setString(index + 1, arg)
}
args.forEachIndexed { index, arg -> statement.setString(index + 1, arg) }
statement.executeUpdate()
}
}

fun String.toQuery(vararg args: String): String =
this.trimIndent().replace("?", "%s").format(*args)
fun String.toQuery(vararg args: String): String = this.trimIndent().replace("?", "%s").format(*args)

const val GET_EXISTING_SCHEMA_QUERY =
"""
Expand All @@ -71,18 +66,15 @@ const val CREATE_SCHEMA_QUERY =
END
"""

const val ALTER_TABLE_ADD =
"""
const val ALTER_TABLE_ADD = """
ALTER TABLE ?
ADD ? ? NULL;
"""
const val ALTER_TABLE_DROP =
"""
const val ALTER_TABLE_DROP = """
ALTER TABLE ?
DROP COLUMN ?;
"""
const val ALTER_TABLE_MODIFY =
"""
const val ALTER_TABLE_MODIFY = """
ALTER TABLE ?
ALTER COLUMN ? ? NULL;
"""
Expand All @@ -93,14 +85,12 @@ const val DELETE_WHERE_COL_IS_NOT_NULL =
WHERE ? is not NULL
"""

const val DELETE_WHERE_COL_LESS_THAN =
"""
const val DELETE_WHERE_COL_LESS_THAN = """
DELETE FROM ?
WHERE ? < ?
"""

const val SELECT_FROM =
"""
const val SELECT_FROM = """
SELECT *
FROM ?
"""
Expand Down Expand Up @@ -201,19 +191,22 @@ class MSSQLQueryBuilder(
val toAlter =
expectedFields.filter { it.key in existingFields && it.value != existingFields[it.key] }

val query = StringBuilder()
.apply {
toDelete.entries.forEach {
appendLine(ALTER_TABLE_DROP.toQuery(fqTableName, it.key))
}
toAdd.entries.forEach {
appendLine(ALTER_TABLE_ADD.toQuery(fqTableName, it.key, it.value.sqlString))
}
toAlter.entries.forEach {
appendLine(ALTER_TABLE_MODIFY.toQuery(fqTableName, it.key, it.value.sqlString))
val query =
StringBuilder()
.apply {
toDelete.entries.forEach {
appendLine(ALTER_TABLE_DROP.toQuery(fqTableName, it.key))
}
toAdd.entries.forEach {
appendLine(ALTER_TABLE_ADD.toQuery(fqTableName, it.key, it.value.sqlString))
}
toAlter.entries.forEach {
appendLine(
ALTER_TABLE_MODIFY.toQuery(fqTableName, it.key, it.value.sqlString)
)
}
}
}
.toString()
.toString()

query.executeUpdate(connection)
}
Expand All @@ -230,13 +223,15 @@ class MSSQLQueryBuilder(
getFinalTableInsertColumnHeader(fqTableName, finalTableSchema)

fun deleteCdc(connection: Connection) =
DELETE_WHERE_COL_IS_NOT_NULL
.toQuery(fqTableName, AIRBYTE_CDC_DELETED_AT)
DELETE_WHERE_COL_IS_NOT_NULL.toQuery(fqTableName, AIRBYTE_CDC_DELETED_AT)
.executeUpdate(connection)

fun deletePreviousGenerations(connection: Connection, minGenerationId: Long) =
DELETE_WHERE_COL_LESS_THAN
.toQuery(fqTableName, AIRBYTE_GENERATION_ID, minGenerationId.toString())
DELETE_WHERE_COL_LESS_THAN.toQuery(
fqTableName,
AIRBYTE_GENERATION_ID,
minGenerationId.toString()
)
.executeUpdate(connection)

fun populateStatement(
Expand Down Expand Up @@ -307,7 +302,10 @@ class MSSQLQueryBuilder(
}

private fun createTableIfNotExists(fqTableName: String, schema: List<NamedField>): String {
val index = if (uniquenessKey.isNotEmpty()) createIndex(fqTableName, uniquenessKey, clustered = false) else ""
val index =
if (uniquenessKey.isNotEmpty())
createIndex(fqTableName, uniquenessKey, clustered = false)
else ""
val cdcIndex = if (hasCdc) createIndex(fqTableName, listOf(AIRBYTE_CDC_DELETED_AT)) else ""

return """
Expand All @@ -323,7 +321,11 @@ class MSSQLQueryBuilder(
""".trimIndent()
}

private fun createIndex(fqTableName: String, columns: List<String>, clustered: Boolean = false): String {
private fun createIndex(
fqTableName: String,
columns: List<String>,
clustered: Boolean = false
): String {
val name = "${fqTableName.replace('.', '_')}_${columns.hashCode()}"
val indexType = if (clustered) "CLUSTERED" else ""
return "CREATE $indexType INDEX $name ON $fqTableName (${columns.joinToString(", ")})"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,7 @@ class MSSQLDataDumper : DestinationDataDumper {
val dataSource = DataSourceFactory().dataSource(config)
val output = mutableListOf<OutputRecord>()
dataSource.connection.use { connection ->
SELECT_FROM
.toQuery(sqlBuilder.fqTableName)
.executeQuery(connection) { rs ->
SELECT_FROM.toQuery(sqlBuilder.fqTableName).executeQuery(connection) { rs ->
while (rs.next()) {
val objectValue = sqlBuilder.readResult(rs, sqlBuilder.finalTableSchema)
val record =
Expand Down

0 comments on commit b74e040

Please sign in to comment.