Skip to content

Commit

Permalink
Merge branch 'master' into tope/adjust/migrate-manifest-only
Browse files Browse the repository at this point in the history
  • Loading branch information
topefolorunso authored Oct 24, 2024
2 parents 03b00bd + cb1bfbe commit 19c0f37
Show file tree
Hide file tree
Showing 156 changed files with 26,776 additions and 1,907 deletions.
5 changes: 5 additions & 0 deletions airbyte-cdk/bulk/core/load/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ task integrationTest(type: Test) {
testClassesDirs = sourceSets.integrationTest.output.classesDirs
classpath = sourceSets.integrationTest.runtimeClasspath
useJUnitPlatform()

jvmArgs = project.test.jvmArgs
systemProperties = project.test.systemProperties
maxParallelForks = project.test.maxParallelForks
maxHeapSize = project.test.maxHeapSize
}
configurations {
integrationTestImplementation.extendsFrom testImplementation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ class MockBasicFunctionalityIntegrationTest :
MockDestinationDataDumper,
NoopDestinationCleaner,
NoopExpectedRecordMapper,
NoopNameMapper
NoopNameMapper,
isStreamSchemaRetroactive = false,
) {
@Test
override fun testBasicWrite() {
Expand All @@ -38,4 +39,19 @@ class MockBasicFunctionalityIntegrationTest :
override fun testFunkyStreamAndColumnNames() {
super.testFunkyStreamAndColumnNames()
}

@Test
override fun testTruncateRefresh() {
super.testTruncateRefresh()
}

@Test
override fun testAppend() {
super.testAppend()
}

@Test
override fun testAppendSchemaEvolution() {
super.testAppendSchemaEvolution()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ object MockDestinationBackend {
return getFile(filename)
}

fun deleteOldRecords(filename: String, minGenerationId: Long) {
getFile(filename).removeAll {
it.generationId == null || it.generationId!! < minGenerationId
}
}

private fun getFile(filename: String): MutableList<OutputRecord> {
return files.getOrPut(filename) { mutableListOf() }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import io.airbyte.cdk.load.data.ObjectValue
import io.airbyte.cdk.load.message.Batch
import io.airbyte.cdk.load.message.DestinationRecord
import io.airbyte.cdk.load.message.SimpleBatch
import io.airbyte.cdk.load.mock_integration_test.MockStreamLoader.Companion.getFilename
import io.airbyte.cdk.load.test.util.OutputRecord
import io.airbyte.cdk.load.write.DestinationWriter
import io.airbyte.cdk.load.write.StreamLoader
Expand All @@ -31,6 +32,13 @@ class MockStreamLoader(override val stream: DestinationStream) : StreamLoader {
override val state = Batch.State.PERSISTED
}

override suspend fun start() {
MockDestinationBackend.deleteOldRecords(
getFilename(stream.descriptor),
stream.minimumGenerationId
)
}

override suspend fun processRecords(
records: Iterator<DestinationRecord>,
totalSizeBytes: Long
Expand All @@ -50,7 +58,10 @@ class MockStreamLoader(override val stream: DestinationStream) : StreamLoader {
Instant.ofEpochMilli(System.currentTimeMillis()),
stream.generationId,
it.data as ObjectValue,
OutputRecord.Meta(changes = it.meta?.changes, syncId = stream.syncId),
OutputRecord.Meta(
changes = it.meta?.changes ?: mutableListOf(),
syncId = stream.syncId
),
)
)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.data

interface AirbyteSchemaMapper {
fun map(schema: AirbyteType): AirbyteType =
when (schema) {
is NullType -> mapNull(schema)
is StringType -> mapString(schema)
is BooleanType -> mapBoolean(schema)
is IntegerType -> mapInteger(schema)
is NumberType -> mapNumber(schema)
is ArrayType -> mapArray(schema)
is ArrayTypeWithoutSchema -> mapArrayWithoutSchema(schema)
is ObjectType -> mapObject(schema)
is ObjectTypeWithoutSchema -> mapObjectWithoutSchema(schema)
is ObjectTypeWithEmptySchema -> mapObjectWithEmptySchema(schema)
is UnionType -> mapUnion(schema)
is DateType -> mapDate(schema)
is TimeTypeWithTimezone -> mapTimeTypeWithTimezone(schema)
is TimeTypeWithoutTimezone -> mapTimeTypeWithoutTimezone(schema)
is TimestampTypeWithTimezone -> mapTimestampTypeWithTimezone(schema)
is TimestampTypeWithoutTimezone -> mapTimestampTypeWithoutTimezone(schema)
is UnknownType -> mapUnknown(schema)
}

fun mapField(field: FieldType): FieldType
fun mapNull(schema: NullType): AirbyteType
fun mapString(schema: StringType): AirbyteType
fun mapBoolean(schema: BooleanType): AirbyteType
fun mapInteger(schema: IntegerType): AirbyteType
fun mapNumber(schema: NumberType): AirbyteType
fun mapArray(schema: ArrayType): AirbyteType
fun mapArrayWithoutSchema(schema: ArrayTypeWithoutSchema): AirbyteType
fun mapObject(schema: ObjectType): AirbyteType
fun mapObjectWithoutSchema(schema: ObjectTypeWithoutSchema): AirbyteType
fun mapObjectWithEmptySchema(schema: ObjectTypeWithEmptySchema): AirbyteType
fun mapUnion(schema: UnionType): AirbyteType
fun mapDate(schema: DateType): AirbyteType
fun mapTimeTypeWithTimezone(schema: TimeTypeWithTimezone): AirbyteType
fun mapTimeTypeWithoutTimezone(schema: TimeTypeWithoutTimezone): AirbyteType
fun mapTimestampTypeWithTimezone(schema: TimestampTypeWithTimezone): AirbyteType
fun mapTimestampTypeWithoutTimezone(schema: TimestampTypeWithoutTimezone): AirbyteType
fun mapUnknown(schema: UnknownType): AirbyteType
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.data

import io.airbyte.cdk.load.message.DestinationRecord
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange.Change
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange.Reason

open class AirbyteValueIdentityMapper(
open val meta: DestinationRecord.Meta,
) {
private fun collectFailure(
path: List<String>,
reason: Reason = Reason.DESTINATION_SERIALIZATION_ERROR
) {
meta.changes.add(DestinationRecord.Change(path.joinToString("."), Change.NULLED, reason))
}

fun map(
value: AirbyteValue,
schema: AirbyteType,
path: List<String> = emptyList()
): AirbyteValue =
try {
when (schema) {
is ObjectType -> mapObject(value as ObjectValue, schema, path)
is ObjectTypeWithoutSchema ->
mapObjectWithoutSchema(value as ObjectValue, schema, path)
is ObjectTypeWithEmptySchema ->
mapObjectWithEmptySchema(value as ObjectValue, schema, path)
is ArrayType -> mapArray(value as ArrayValue, schema, path)
is ArrayTypeWithoutSchema ->
mapArrayWithoutSchema(value as ArrayValue, schema, path)
is UnionType -> mapUnion(value, schema, path)
is BooleanType -> mapBoolean(value as BooleanValue, path)
is NumberType -> mapNumber(value as NumberValue, path)
is StringType -> mapString(value as StringValue, path)
is IntegerType -> mapInteger(value as IntegerValue, path)
is DateType -> mapDate(value as DateValue, path)
is TimeTypeWithTimezone -> mapTimeWithTimezone(value as TimeValue, path)
is TimeTypeWithoutTimezone -> mapTimeWithoutTimezone(value as TimeValue, path)
is TimestampTypeWithTimezone ->
mapTimestampWithTimezone(value as TimestampValue, path)
is TimestampTypeWithoutTimezone ->
mapTimestampWithoutTimezone(value as TimestampValue, path)
is NullType -> mapNull(path)
is UnknownType -> mapUnknown(value as UnknownValue, path)
}
} catch (e: Exception) {
collectFailure(path)
mapNull(path)
}

open fun mapObject(value: ObjectValue, schema: ObjectType, path: List<String>): AirbyteValue {
val values = LinkedHashMap<String, AirbyteValue>()
schema.properties.forEach { (name, field) ->
values[name] = map(value.values[name] ?: NullValue, field.type, path + name)
}
return ObjectValue(values)
}

open fun mapObjectWithoutSchema(
value: ObjectValue,
schema: ObjectTypeWithoutSchema,
path: List<String>
): AirbyteValue = value

open fun mapObjectWithEmptySchema(
value: ObjectValue,
schema: ObjectTypeWithEmptySchema,
path: List<String>
): AirbyteValue = value

open fun mapArray(value: ArrayValue, schema: ArrayType, path: List<String>): AirbyteValue {
return ArrayValue(
value.values.mapIndexed { index, element ->
map(element, schema.items.type, path + "[$index]")
}
)
}

open fun mapArrayWithoutSchema(
value: ArrayValue,
schema: ArrayTypeWithoutSchema,
path: List<String>
): AirbyteValue = value

open fun mapUnion(value: AirbyteValue, schema: UnionType, path: List<String>): AirbyteValue =
value

open fun mapBoolean(value: BooleanValue, path: List<String>): AirbyteValue = value

open fun mapNumber(value: NumberValue, path: List<String>): AirbyteValue = value

open fun mapString(value: StringValue, path: List<String>): AirbyteValue = value

open fun mapInteger(value: IntegerValue, path: List<String>): AirbyteValue = value

open fun mapDate(value: DateValue, path: List<String>): AirbyteValue = value

open fun mapTimeWithTimezone(value: TimeValue, path: List<String>): AirbyteValue = value

open fun mapTimeWithoutTimezone(value: TimeValue, path: List<String>): AirbyteValue = value

open fun mapTimestampWithTimezone(value: TimestampValue, path: List<String>): AirbyteValue =
value

open fun mapTimestampWithoutTimezone(value: TimestampValue, path: List<String>): AirbyteValue =
value

open fun mapNull(path: List<String>): AirbyteValue = NullValue

open fun mapUnknown(value: UnknownValue, path: List<String>): AirbyteValue = value
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.data

interface AirbyteValueMapper {
fun collectFailure(path: List<String>)
fun map(
value: AirbyteValue,
schema: AirbyteType,
path: List<String> = emptyList()
): AirbyteValue =
try {
when (schema) {
is ObjectType -> mapObject(value as ObjectValue, schema, path)
is ObjectTypeWithoutSchema ->
mapObjectWithoutSchema(value as ObjectValue, schema, path)
is ObjectTypeWithEmptySchema ->
mapObjectWithEmptySchema(value as ObjectValue, schema, path)
is ArrayType -> mapArray(value as ArrayValue, schema, path)
is ArrayTypeWithoutSchema ->
mapArrayWithoutSchema(value as ArrayValue, schema, path)
is UnionType -> mapUnion(value, schema, path)
is BooleanType -> mapBoolean(value as BooleanValue, path)
is NumberType -> mapNumber(value as NumberValue, path)
is StringType -> mapString(value as StringValue, path)
is IntegerType -> mapInteger(value as IntegerValue, path)
is DateType -> mapDate(value as DateValue, path)
is TimeTypeWithTimezone -> mapTimeWithTimezone(value as TimeValue, path)
is TimeTypeWithoutTimezone -> mapTimeWithoutTimezone(value as TimeValue, path)
is TimestampTypeWithTimezone ->
mapTimestampWithTimezone(value as TimestampValue, path)
is TimestampTypeWithoutTimezone ->
mapTimestampWithoutTimezone(value as TimestampValue, path)
is NullType -> mapNull(path)
is UnknownType -> mapUnknown(value as UnknownValue, path)
}
} catch (e: Exception) {
collectFailure(path)
mapNull(path)
}

fun mapObject(value: ObjectValue, schema: ObjectType, path: List<String>): AirbyteValue
fun mapObjectWithoutSchema(
value: ObjectValue,
schema: ObjectTypeWithoutSchema,
path: List<String>
): AirbyteValue
fun mapObjectWithEmptySchema(
value: ObjectValue,
schema: ObjectTypeWithEmptySchema,
path: List<String>
): AirbyteValue
fun mapArray(value: ArrayValue, schema: ArrayType, path: List<String>): AirbyteValue
fun mapArrayWithoutSchema(
value: ArrayValue,
schema: ArrayTypeWithoutSchema,
path: List<String>
): AirbyteValue
fun mapUnion(value: AirbyteValue, schema: UnionType, path: List<String>): AirbyteValue
fun mapBoolean(value: BooleanValue, path: List<String>): AirbyteValue
fun mapNumber(value: NumberValue, path: List<String>): AirbyteValue
fun mapString(value: StringValue, path: List<String>): AirbyteValue
fun mapInteger(value: IntegerValue, path: List<String>): AirbyteValue
fun mapDate(value: DateValue, path: List<String>): AirbyteValue
fun mapTimeWithTimezone(value: TimeValue, path: List<String>): AirbyteValue
fun mapTimeWithoutTimezone(value: TimeValue, path: List<String>): AirbyteValue
fun mapTimestampWithTimezone(value: TimestampValue, path: List<String>): AirbyteValue
fun mapTimestampWithoutTimezone(value: TimestampValue, path: List<String>): AirbyteValue
fun mapNull(path: List<String>): AirbyteValue
fun mapUnknown(value: UnknownValue, path: List<String>): AirbyteValue
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.data

class MergeUnions : AirbyteSchemaIdentityMapper {
override fun mapUnion(schema: UnionType): AirbyteType {
// Map the options first so they're in their final form
val mappedOptions = schema.options.map { map(it) }
val mergedOptions = mergeOptions(mappedOptions)
if (mergedOptions.size == 1) {
return mergedOptions.first()
}
return UnionType(mergedOptions.toList())
}

private fun mergeOptions(options: List<AirbyteType>): Set<AirbyteType> {
val mergedOptions = mutableSetOf<AirbyteType>()
mergeOptions(mergedOptions, options)
return mergedOptions
}

private fun mergeOptions(into: MutableSet<AirbyteType>, from: List<AirbyteType>) {
for (option in from) {
if (option is UnionType) {
// If this is a union of a union, recursively merge the other union's options in
mergeOptions(into, option.options)
} else if (option is ObjectType) {
val existingObjOption: ObjectType? = into.find { it is ObjectType } as ObjectType?
if (existingObjOption == null) {
// No other object in the set, so just add this one
into.add(option)
continue
}

into.remove(existingObjOption)
val newProperties = existingObjOption.properties
for ((name, field) in option.properties) {
val existingField = newProperties[name]
newProperties[name] = field
if (existingField == null) {
// If no field exists with the same name, just adding this one is fine
continue
}

if (existingField != field) {
throw IllegalArgumentException(
"Cannot merge unions of objects with different types for the same field"
)
}

// If the fields are identical, we can just keep the existing field
}
into.add(ObjectType(newProperties))
} else {
into.add(option)
}
}
}
}
Loading

0 comments on commit 19c0f37

Please sign in to comment.