Skip to content

Commit

Permalink
Merge branch 'master' into btkcodedev/instatusMigrate
Browse files Browse the repository at this point in the history
  • Loading branch information
DanyloGL authored Jan 20, 2025
2 parents 820330a + f3705c9 commit ea02ea2
Show file tree
Hide file tree
Showing 1,201 changed files with 41,524 additions and 51,139 deletions.
34 changes: 27 additions & 7 deletions .github/workflows/connectors_nightly_build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,38 @@ on:
# 0AM UTC is 2AM CEST, 3AM EEST, 5PM PDT.
- cron: "0 0 * * *"
workflow_dispatch:
inputs:
test-connectors-options:
default: --concurrency=5 --support-level=certified
required: true

run-name: "Test connectors: ${{ inputs.test-connectors-options || 'nightly build for Certified connectors' }}"

jobs:
# Builds the job matrix consumed by downstream jobs: lists the certified connectors,
# then splits them into batches so each matrix entry tests a bounded number of connectors.
generate_matrix:
name: Generate matrix
runs-on: ubuntu-24.04
outputs:
generated_matrix: ${{ steps.generate_matrix.outputs.generated_matrix }}
steps:
- name: Checkout Airbyte
uses: actions/checkout@v4
# Writes the selected connectors to selected_connectors.json, which the next step reads.
# NOTE(review): the "[SCHEDULED TRIGGER]" label notwithstanding, nothing visible here
# restricts this step to scheduled runs - confirm the name is still accurate.
- name: Run airbyte-ci connectors list [SCHEDULED TRIGGER]
id: airbyte-ci-connectors-list-scheduled
uses: ./.github/actions/run-airbyte-ci
with:
context: "master"
subcommand: "connectors --support-level=certified list --output=selected_connectors.json"
# The jq program prefixes every entry with "--name=", pairs each with its index
# (to_entries), groups by floor(index / 30), and joins each group with spaces. Result:
# {"include": [{"connector_names": "--name=a --name=b ..."}, ...]} with at most 30
# connectors per entry.
# Presumably selected_connectors.json is a JSON array of connector-name strings - verify
# against the airbyte-ci `list --output` format.
- name: Generate matrix - 30 connectors per job
id: generate_matrix
run: |
matrix=$(jq -c -r '{include: [.[] | "--name=" + .] | to_entries | group_by(.key / 30 | floor) | map(map(.value) | {"connector_names": join(" ")})}' selected_connectors.json)
echo "generated_matrix=$matrix" >> $GITHUB_OUTPUT
test_connectors:
needs: generate_matrix
name: "Test connectors: ${{ inputs.test-connectors-options || 'nightly build for Certified connectors' }}"
timeout-minutes: 720 # 12 hours
runs-on: connector-nightly-xlarge
continue-on-error: true
strategy:
matrix: ${{fromJson(needs.generate_matrix.outputs.generated_matrix)}}

steps:
- name: Checkout Airbyte
uses: actions/checkout@v3
Expand All @@ -32,7 +52,7 @@ jobs:
with:
context: "master"
ci_job_key: "nightly_builds"
dagger_cloud_token: ${{ secrets.DAGGER_CLOUD_TOKEN_CACHE_3 }}
# dagger_cloud_token: ${{ secrets.DAGGER_CLOUD_TOKEN_CACHE_3 }}
docker_hub_password: ${{ secrets.DOCKER_HUB_PASSWORD }}
docker_hub_username: ${{ secrets.DOCKER_HUB_USERNAME }}
gcp_gsm_credentials: ${{ secrets.GCP_GSM_CREDENTIALS }}
Expand All @@ -41,4 +61,4 @@ jobs:
github_token: ${{ secrets.GITHUB_TOKEN }}
s3_build_cache_access_key_id: ${{ secrets.SELF_RUNNER_AWS_ACCESS_KEY_ID }}
s3_build_cache_secret_key: ${{ secrets.SELF_RUNNER_AWS_SECRET_ACCESS_KEY }}
subcommand: "connectors ${{ inputs.test-connectors-options || '--concurrency=8 --support-level=certified' }} test"
subcommand: "connectors ${{ matrix.connector_names}} test"
6 changes: 6 additions & 0 deletions .github/workflows/regression_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ jobs:
runs-on: linux-20.04-large # Custom runner, defined in GitHub org settings
timeout-minutes: 360 # 6 hours
steps:
- name: Install Python
id: install_python
uses: actions/setup-python@v4
with:
python-version: "3.10"

- name: Checkout Airbyte
uses: actions/checkout@v4
- name: Check PAT rate limits
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,18 @@ import java.util.function.Consumer

/** Emits the [AirbyteMessage] instances produced by the connector. */
@DefaultImplementation(StdoutOutputConsumer::class)
interface OutputConsumer : Consumer<AirbyteMessage>, AutoCloseable {
val emittedAt: Instant
abstract class OutputConsumer(private val clock: Clock) : Consumer<AirbyteMessage>, AutoCloseable {
/**
* The constant emittedAt timestamp we use for record timestamps.
*
* TODO: use the correct emittedAt time for each record. Ryan: not changing this now as it could
* have performance implications for sources given the delicate serialization logic in place
* here.
*/
val recordEmittedAt: Instant = Instant.ofEpochMilli(clock.millis())

fun accept(record: AirbyteRecordMessage) {
record.emittedAt = emittedAt.toEpochMilli()
open fun accept(record: AirbyteRecordMessage) {
record.emittedAt = recordEmittedAt.toEpochMilli()
accept(AirbyteMessage().withType(AirbyteMessage.Type.RECORD).withRecord(record))
}

Expand Down Expand Up @@ -66,7 +73,9 @@ interface OutputConsumer : Consumer<AirbyteMessage>, AutoCloseable {
}

fun accept(trace: AirbyteTraceMessage) {
trace.emittedAt = emittedAt.toEpochMilli().toDouble()
// Use the correct emittedAt timestamp for trace messages. This allows platform and other
// downstream consumers to take emission time into account for error classification.
trace.emittedAt = clock.millis().toDouble()
accept(AirbyteMessage().withType(AirbyteMessage.Type.TRACE).withTrace(trace))
}

Expand Down Expand Up @@ -107,7 +116,7 @@ const val CONNECTOR_OUTPUT_PREFIX = "airbyte.connector.output"
@Secondary
private class StdoutOutputConsumer(
val stdout: PrintStream,
clock: Clock,
private val clock: Clock,
/**
* [bufferByteSizeThresholdForFlush] triggers flushing the record buffer to stdout once the
* buffer's size (in bytes) grows past this value.
Expand All @@ -132,9 +141,7 @@ private class StdoutOutputConsumer(
*/
@Value("\${$CONNECTOR_OUTPUT_PREFIX.buffer-byte-size-threshold-for-flush:4096}")
val bufferByteSizeThresholdForFlush: Int,
) : OutputConsumer {
override val emittedAt: Instant = Instant.now(clock)

) : OutputConsumer(clock) {
private val buffer = ByteArrayOutputStream() // TODO: replace this with a StringWriter?
private val jsonGenerator: JsonGenerator = Jsons.createGenerator(buffer)
private val sequenceWriter: SequenceWriter = Jsons.writer().writeValues(jsonGenerator)
Expand Down Expand Up @@ -233,7 +240,7 @@ private class StdoutOutputConsumer(
namespacedTemplates.getOrPut(namespace) { StreamToTemplateMap() }
}
return streamToTemplateMap.getOrPut(stream) {
RecordTemplate.create(stream, namespace, emittedAt)
RecordTemplate.create(stream, namespace, recordEmittedAt)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,14 @@ import io.micronaut.context.annotation.Requires
import io.micronaut.context.env.Environment
import jakarta.inject.Singleton
import java.time.Clock
import java.time.Instant

/** [OutputConsumer] implementation for unit tests. Collects everything into thread-safe buffers. */
@Singleton
@Requires(notEnv = [Environment.CLI])
@Replaces(OutputConsumer::class)
class BufferingOutputConsumer(
clock: Clock,
) : OutputConsumer {
override val emittedAt: Instant = Instant.now(clock)
) : OutputConsumer(clock) {

private val records = mutableListOf<AirbyteRecordMessage>()
private val states = mutableListOf<AirbyteStateMessage>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
package io.airbyte.cdk.discover

import io.airbyte.cdk.data.AirbyteSchemaType
import io.airbyte.cdk.data.DoubleCodec
import io.airbyte.cdk.data.JsonDecoder
import io.airbyte.cdk.data.JsonEncoder
import io.airbyte.cdk.data.JsonStringCodec
Expand Down Expand Up @@ -63,8 +64,8 @@ interface MetaField : FieldOrMetaField {
enum class CommonMetaField(
override val type: FieldType,
) : MetaField {
CDC_UPDATED_AT(CdcOffsetDateTimeMetaFieldType),
CDC_DELETED_AT(CdcOffsetDateTimeMetaFieldType),
CDC_UPDATED_AT(CdcStringMetaFieldType),
CDC_DELETED_AT(CdcStringMetaFieldType),
;

override val id: String
Expand All @@ -89,3 +90,9 @@ data object CdcOffsetDateTimeMetaFieldType : LosslessFieldType {
override val jsonEncoder: JsonEncoder<OffsetDateTime> = OffsetDateTimeCodec
override val jsonDecoder: JsonDecoder<OffsetDateTime> = OffsetDateTimeCodec
}

/**
 * [LosslessFieldType] for CDC meta-fields whose values are JSON numbers.
 *
 * Declares the Airbyte schema type as [LeafAirbyteSchemaType.NUMBER] and uses [DoubleCodec] for
 * both encoding and decoding, so values are handled as [Double].
 */
data object CdcNumberMetaFieldType : LosslessFieldType {
override val airbyteSchemaType: AirbyteSchemaType = LeafAirbyteSchemaType.NUMBER
override val jsonEncoder: JsonEncoder<Double> = DoubleCodec
override val jsonDecoder: JsonDecoder<Double> = DoubleCodec
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ sealed class FeedBootstrap<T : Feed>(
stream.schema.forEach { recordData.putNull(it.id) }
if (feed is Stream && precedingGlobalFeed != null) {
metaFieldDecorator.decorateRecordData(
timestamp = outputConsumer.emittedAt.atOffset(ZoneOffset.UTC),
timestamp = outputConsumer.recordEmittedAt.atOffset(ZoneOffset.UTC),
globalStateValue = stateQuerier.current(precedingGlobalFeed),
stream,
recordData,
Expand All @@ -125,7 +125,7 @@ sealed class FeedBootstrap<T : Feed>(
AirbyteRecordMessage()
.withStream(stream.name)
.withNamespace(stream.namespace)
.withEmittedAt(outputConsumer.emittedAt.toEpochMilli())
.withEmittedAt(outputConsumer.recordEmittedAt.toEpochMilli())
.withData(reusedRecordData)
)

Expand All @@ -138,7 +138,7 @@ sealed class FeedBootstrap<T : Feed>(
AirbyteRecordMessage()
.withStream(stream.name)
.withNamespace(stream.namespace)
.withEmittedAt(outputConsumer.emittedAt.toEpochMilli())
.withEmittedAt(outputConsumer.recordEmittedAt.toEpochMilli())
.withData(reusedRecordData)
.withMeta(reusedRecordMeta)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class FeedBootstrapTest {
FeedBootstrap.create(outputConsumer, metaFieldDecorator, stateQuerier, this)

fun expected(vararg data: String): List<String> {
val ts = outputConsumer.emittedAt.toEpochMilli()
val ts = outputConsumer.recordEmittedAt.toEpochMilli()
return data.map { """{"namespace":"ns","stream":"tbl","data":$it,"emitted_at":$ts}""" }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import io.airbyte.cdk.load.test.util.NoopDestinationCleaner
import io.airbyte.cdk.load.test.util.NoopNameMapper
import io.airbyte.cdk.load.test.util.UncoercedExpectedRecordMapper
import io.airbyte.cdk.load.write.BasicFunctionalityIntegrationTest
import io.airbyte.cdk.load.write.SchematizedNestedValueBehavior
import io.airbyte.cdk.load.write.UnionBehavior
import io.airbyte.cdk.load.write.Untyped
import org.junit.jupiter.api.Disabled
import org.junit.jupiter.api.Test
Expand All @@ -23,7 +25,9 @@ class MockBasicFunctionalityIntegrationTest :
isStreamSchemaRetroactive = false,
supportsDedup = true,
stringifySchemalessObjects = false,
promoteUnionToObject = false,
unionBehavior = UnionBehavior.PASS_THROUGH,
schematizedObjectBehavior = SchematizedNestedValueBehavior.PASS_THROUGH,
schematizedArrayBehavior = SchematizedNestedValueBehavior.PASS_THROUGH,
preserveUndeclaredFields = true,
commitDataIncrementally = false,
allTypesBehavior = Untyped,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,13 @@

package io.airbyte.cdk.load.data

import com.fasterxml.jackson.annotation.JsonValue
import com.fasterxml.jackson.core.JsonGenerator
import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.JsonSerializer
import com.fasterxml.jackson.databind.SerializerProvider
import com.fasterxml.jackson.databind.annotation.JsonSerialize
import com.fasterxml.jackson.databind.node.NullNode
import java.math.BigDecimal
import java.math.BigInteger
import java.time.LocalDate
Expand Down Expand Up @@ -48,6 +54,11 @@ sealed interface AirbyteValue {
// (mostly the date/timestamp/time types - everything else is fine)
/**
 * Singleton [AirbyteValue] representing an explicit null.
 *
 * Serialized by Jackson as a JSON `null` via [toJson].
 */
data object NullValue : AirbyteValue, Comparable<NullValue> {
// There is only one instance, so every comparison is between equals.
override fun compareTo(other: NullValue): Int = 0

// make sure that we serialize this as a NullNode, rather than an empty object.
// We can't just return `null`, because jackson treats that as an error
// and falls back to its normal serialization behavior.
@JsonValue fun toJson(): NullNode = NullNode.instance
}

@JvmInline
Expand Down Expand Up @@ -75,34 +86,39 @@ value class NumberValue(val value: BigDecimal) : AirbyteValue, Comparable<Number
// AirbyteValue wrapping a calendar date (no time-of-day, no zone).
value class DateValue(val value: LocalDate) : AirbyteValue, Comparable<DateValue> {
    /**
     * Parses [date] with [LocalDate.parse] (ISO-8601 local date, e.g. `2024-01-31`).
     *
     * @throws java.time.format.DateTimeParseException if [date] cannot be parsed.
     */
    constructor(date: String) : this(LocalDate.parse(date))

    /** Orders values chronologically by delegating to [LocalDate.compareTo]. */
    override fun compareTo(other: DateValue): Int = value.compareTo(other.value)

    /**
     * JSON representation used by Jackson: the ISO-8601 `uuuu-MM-dd` string from
     * [LocalDate.toString].
     */
    @JsonValue fun toJson(): String = value.toString()
}

/** [AirbyteValue] wrapping a date-time with a UTC offset. */
@JvmInline
value class TimestampWithTimezoneValue(val value: OffsetDateTime) :
    AirbyteValue, Comparable<TimestampWithTimezoneValue> {
    /**
     * Parses [timestamp] with [OffsetDateTime.parse] (ISO-8601 offset date-time, e.g.
     * `2024-01-31T10:15:30+01:00`).
     *
     * @throws java.time.format.DateTimeParseException if [timestamp] cannot be parsed.
     */
    constructor(timestamp: String) : this(OffsetDateTime.parse(timestamp))

    /** Delegates to [OffsetDateTime.compareTo]. */
    override fun compareTo(other: TimestampWithTimezoneValue): Int = value.compareTo(other.value)

    /**
     * JSON representation used by Jackson: the ISO-8601 string from [OffsetDateTime.toString].
     *
     * Note that `toString` emits the shortest form that preserves the value, so zero seconds
     * and zero fractional seconds are omitted (e.g. `2024-01-31T10:15+01:00`).
     */
    @JsonValue fun toJson(): String = value.toString()
}

/** [AirbyteValue] wrapping a date-time that carries no offset or zone. */
@JvmInline
value class TimestampWithoutTimezoneValue(val value: LocalDateTime) :
    AirbyteValue, Comparable<TimestampWithoutTimezoneValue> {
    /**
     * Parses [timestamp] with [LocalDateTime.parse] (ISO-8601 local date-time, e.g.
     * `2024-01-31T10:15:30`).
     *
     * @throws java.time.format.DateTimeParseException if [timestamp] cannot be parsed.
     */
    constructor(timestamp: String) : this(LocalDateTime.parse(timestamp))

    /** Delegates to [LocalDateTime.compareTo]. */
    override fun compareTo(other: TimestampWithoutTimezoneValue): Int =
        value.compareTo(other.value)

    /**
     * JSON representation used by Jackson: the ISO-8601 string from [LocalDateTime.toString].
     *
     * Note that `toString` emits the shortest form that preserves the value, so zero seconds
     * and zero fractional seconds are omitted (e.g. `2024-01-31T10:15`).
     */
    @JsonValue fun toJson(): String = value.toString()
}

/** [AirbyteValue] wrapping a time-of-day with a UTC offset. */
@JvmInline
value class TimeWithTimezoneValue(val value: OffsetTime) :
    AirbyteValue, Comparable<TimeWithTimezoneValue> {
    /**
     * Parses [time] with [OffsetTime.parse] (ISO-8601 offset time, e.g. `10:15:30+01:00`).
     *
     * @throws java.time.format.DateTimeParseException if [time] cannot be parsed.
     */
    constructor(time: String) : this(OffsetTime.parse(time))

    /** Delegates to [OffsetTime.compareTo]. */
    override fun compareTo(other: TimeWithTimezoneValue): Int = value.compareTo(other.value)

    /**
     * JSON representation used by Jackson: the ISO-8601 string from [OffsetTime.toString].
     *
     * Note that `toString` emits the shortest form that preserves the value, so zero seconds
     * and zero fractional seconds are omitted (e.g. `10:15+01:00`).
     */
    @JsonValue fun toJson(): String = value.toString()
}

/** [AirbyteValue] wrapping a time-of-day that carries no offset or zone. */
@JvmInline
value class TimeWithoutTimezoneValue(val value: LocalTime) :
    AirbyteValue, Comparable<TimeWithoutTimezoneValue> {
    /**
     * Parses [time] with [LocalTime.parse] (ISO-8601 local time, e.g. `10:15:30`).
     *
     * @throws java.time.format.DateTimeParseException if [time] cannot be parsed.
     */
    constructor(time: String) : this(LocalTime.parse(time))

    /** Delegates to [LocalTime.compareTo]. */
    override fun compareTo(other: TimeWithoutTimezoneValue): Int = value.compareTo(other.value)

    /**
     * JSON representation used by Jackson: the ISO-8601 string from [LocalTime.toString].
     *
     * Note that `toString` emits the shortest form that preserves the value, so zero seconds
     * and zero fractional seconds are omitted (e.g. `10:15`).
     */
    @JsonValue fun toJson(): String = value.toString()
}

@JvmInline
Expand All @@ -112,12 +128,28 @@ value class ArrayValue(val values: List<AirbyteValue>) : AirbyteValue {
}
}

// jackson can't figure out how to serialize this class,
// so write a custom serializer that just serializes the map directly.
// For some reason, the more obvious `@JsonValue fun toJson() = values`
// doesn't work either.
@JsonSerialize(using = ObjectValueSerializer::class)
@JvmInline
value class ObjectValue(val values: LinkedHashMap<String, AirbyteValue>) : AirbyteValue {
// NOTE(review): this is the same `@JsonValue` accessor that the comment above reports as not
// working; it now coexists with the class-level @JsonSerialize. Confirm whether it is still
// needed or is a leftover that can be removed.
@JsonValue fun toJson() = values
companion object {
/**
 * Builds an [ObjectValue] from a plain map, converting every entry's value with
 * [AirbyteValue.from]. Entries are copied into a linked map in the iteration order of [map].
 */
fun from(map: Map<String, Any?>): ObjectValue =
ObjectValue(map.mapValuesTo(linkedMapOf()) { (_, v) -> AirbyteValue.from(v) })
}
}

/**
 * Jackson serializer for [ObjectValue].
 *
 * Writes the wrapped `values` map itself instead of the wrapper, handing it to
 * [JsonGenerator.writePOJO] so the entries are serialized by the generator's codec.
 */
private class ObjectValueSerializer : JsonSerializer<ObjectValue>() {
    override fun serialize(
        value: ObjectValue,
        gen: JsonGenerator,
        serializers: SerializerProvider,
    ) = gen.writePOJO(value.values)
}

/** [AirbyteValue] that wraps a raw Jackson [JsonNode] as-is. */
@JvmInline value class UnknownValue(val value: JsonNode) : AirbyteValue
Loading

0 comments on commit ea02ea2

Please sign in to comment.