Skip to content

Commit

Permalink
Merge branch 'master' into dlecocq/source-datadog-slo-history
Browse files Browse the repository at this point in the history
  • Loading branch information
marcosmarxm authored Jun 14, 2024
2 parents c68e773 + 3c3a80a commit 1a03975
Show file tree
Hide file tree
Showing 282 changed files with 16,036 additions and 5,611 deletions.
1 change: 1 addition & 0 deletions .github/workflows/airbyte-ci-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ jobs:
- airbyte-ci/connectors/pipelines/**
- airbyte-ci/connectors/base_images/**
- airbyte-ci/connectors/common_utils/**
- airbyte-ci/connectors/connectors_insights/**
- airbyte-ci/connectors/connector_ops/**
- airbyte-ci/connectors/connectors_qa/**
- airbyte-ci/connectors/ci_credentials/**
Expand Down
49 changes: 49 additions & 0 deletions .github/workflows/connectors_insights.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Generates connector insights on a schedule (or on manual dispatch) and uploads them to GCS.
name: Connectors Insights

on:
  schedule:
    - cron: "0 0,12 * * *" # Run every 12 hours UTC
  workflow_dispatch:

jobs:
  connectors_insights:
    name: Connectors Insights generation
    runs-on: connector-nightly-xlarge
    timeout-minutes: 1440 # 24 hours
    steps:
      - name: Checkout Airbyte
        uses: actions/checkout@v4
      - name: Get Dagger Engine Image
        uses: ./.github/actions/get-dagger-engine-image
        with:
          dagger_engine_image: "registry.dagger.io/engine:v0.9.6"
      - name: Set up Python
        # The id is required: the venv cache key below reads
        # steps.setup-python.outputs.python-version, which resolves to an empty
        # string when no step carries this id.
        id: setup-python
        uses: actions/setup-python@v5
        with:
          python-version: "3.11"
      - name: Install Poetry
        uses: snok/install-poetry@v1
        with:
          virtualenvs-create: true
          virtualenvs-in-project: true
          installer-parallel: true
      - name: Load cached venv
        id: cached-poetry-dependencies
        uses: actions/cache@v3
        with:
          # NOTE(review): Poetry is invoked with `-C airbyte-ci/connectors/connectors_insights`
          # and `virtualenvs-in-project: true`, so the venv is presumably created under that
          # directory rather than at the repository root - confirm `.venv` is the right path.
          path: .venv
          key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}
      - name: Install dependencies
        # Skipped when the cached venv was restored.
        if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true'
        run: poetry -C airbyte-ci/connectors/connectors_insights install --no-interaction --no-root
      - name: Install project
        run: poetry -C airbyte-ci/connectors/connectors_insights install --no-interaction
      - name: Write Google service account key to file
        run: echo "$GCP_SA_KEY" > $HOME/gcp-sa-key.json
        env:
          GCP_SA_KEY: ${{ secrets.METADATA_SERVICE_PROD_GCS_CREDENTIALS }}
      - name: Set GOOGLE_APPLICATION_CREDENTIALS
        # Exported through GITHUB_ENV so that later steps see the variable.
        run: echo "GOOGLE_APPLICATION_CREDENTIALS=$HOME/gcp-sa-key.json" >> $GITHUB_ENV
      - name: Run connectors insights
        run: |
          poetry -C airbyte-ci/connectors/connectors_insights run connectors-insights generate --gcs-uri=gs://prod-airbyte-cloud-connector-metadata-service/connector_insights --connector-directory airbyte-integrations/connectors/ --concurrency 10
10 changes: 6 additions & 4 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -174,10 +174,12 @@ corresponds to that version.

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.37.1 | 2024-06-10 | [\#38075](https://github.com/airbytehq/airbyte/pull/38075) | Destinations: Track stream statuses in async framework |
| 0.37.0 | 2024-06-10 | [\#38121](https://github.com/airbytehq/airbyte/pull/38121) | Destinations: Set default namespace via CatalogParser |
| 0.36.8 | 2024-06-07 | [\#38763](https://github.com/airbytehq/airbyte/pull/38763) | Increase Jackson message length limit |
| 0.36.7 | 2024-06-06 | [\#39220](https://github.com/airbytehq/airbyte/pull/39220) | Handle null messages in ConnectorExceptionUtil |
| 0.38.1 | 2024-06-13 | [\#39445](https://github.com/airbytehq/airbyte/pull/39445) | Sources: More CDK changes to handle big initial snapshots. |
| 0.38.0 | 2024-06-11 | [\#39405](https://github.com/airbytehq/airbyte/pull/39405) | Sources: Debezium properties manager interface changed to accept a list of streams to scope to |
| 0.37.1 | 2024-06-10 | [\#38075](https://github.com/airbytehq/airbyte/pull/38075) | Destinations: Track stream statuses in async framework |
| 0.37.0 | 2024-06-10 | [\#38121](https://github.com/airbytehq/airbyte/pull/38121) | Destinations: Set default namespace via CatalogParser |
| 0.36.8 | 2024-06-07 | [\#38763](https://github.com/airbytehq/airbyte/pull/38763) | Increase Jackson message length limit |
| 0.36.7 | 2024-06-06 | [\#39220](https://github.com/airbytehq/airbyte/pull/39220) | Handle null messages in ConnectorExceptionUtil |
| 0.36.6 | 2024-06-05 | [\#39106](https://github.com/airbytehq/airbyte/pull/39106) | Skip write to storage with 0 byte file |
| 0.36.5 | 2024-06-01 | [\#38792](https://github.com/airbytehq/airbyte/pull/38792) | Throw config exception if no selectable table exists in user provided schemas |
| 0.36.4 | 2024-05-31 | [\#38824](https://github.com/airbytehq/airbyte/pull/38824) | Param marked as non-null to nullable in JdbcDestinationHandler for NPE fix |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import io.airbyte.protocol.models.v0.AirbyteAnalyticsTraceMessage
object DbAnalyticsUtils {
const val CDC_CURSOR_INVALID_KEY: String = "db-sources-cdc-cursor-invalid"
const val DATA_TYPES_SERIALIZATION_ERROR_KEY = "db-sources-data-serialization-error"
const val CDC_SNAPSHOT_FORCE_SHUTDOWN_KEY = "db-sources-snapshot-force-shutdown"

@JvmStatic
fun cdcCursorInvalidMessage(): AirbyteAnalyticsTraceMessage {
Expand All @@ -25,4 +26,11 @@ object DbAnalyticsUtils {
.withType(DATA_TYPES_SERIALIZATION_ERROR_KEY)
.withValue("1")
}

/**
 * Builds the analytics trace message whose type is [CDC_SNAPSHOT_FORCE_SHUTDOWN_KEY] and whose
 * value is the string "1".
 *
 * @return a freshly created [AirbyteAnalyticsTraceMessage] carrying that type and value.
 */
@JvmStatic
fun cdcSnapshotForceShutdownMessage(): AirbyteAnalyticsTraceMessage =
    AirbyteAnalyticsTraceMessage()
        .withType(CDC_SNAPSHOT_FORCE_SHUTDOWN_KEY)
        .withValue("1")
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ object AirbyteTraceMessageUtility {
outputRecordCollector.accept(message)
}

private fun makeErrorTraceAirbyteMessage(
fun makeErrorTraceAirbyteMessage(
e: Throwable,
displayMessage: String?,
failureType: AirbyteErrorTraceMessage.FailureType
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,10 @@ object ConnectorExceptionUtil {
"temporary file size exceeds temp_file_limit"
)
private val TRANSIENT_EOF_EXCEPTION_MESSAGE: Array<String> =
arrayOf("connection was unexpectedly lost")
arrayOf(
"connection was unexpectedly lost",
"can not read response from server. expected to read"
)
private val RECOVERY_CONNECTION_EXCEPTION_MESSAGE: Array<String> =
arrayOf("due to conflict with recovery")

Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.37.2
version=0.38.1
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ import java.util.*
abstract class DebeziumPropertiesManager(
private val properties: Properties,
private val config: JsonNode,
private val catalog: ConfiguredAirbyteCatalog
private val catalog: ConfiguredAirbyteCatalog,
private val streamNames: List<String>
) {
fun getDebeziumProperties(offsetManager: AirbyteFileOffsetBackingStore): Properties {
return getDebeziumProperties(offsetManager, Optional.empty())
Expand Down Expand Up @@ -73,7 +74,7 @@ abstract class DebeziumPropertiesManager(
// following
props.setProperty("value.converter.replace.null.with.default", "false")
// includes
props.putAll(getIncludeConfiguration(catalog, config))
props.putAll(getIncludeConfiguration(catalog, config, streamNames))

return props
}
Expand All @@ -84,7 +85,8 @@ abstract class DebeziumPropertiesManager(

protected abstract fun getIncludeConfiguration(
catalog: ConfiguredAirbyteCatalog,
config: JsonNode?
config: JsonNode?,
streamNames: List<String>
): Properties

companion object {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ import org.codehaus.plexus.util.StringUtils
class RelationalDbDebeziumPropertiesManager(
properties: Properties,
config: JsonNode,
catalog: ConfiguredAirbyteCatalog
) : DebeziumPropertiesManager(properties, config, catalog) {
catalog: ConfiguredAirbyteCatalog,
completedStreamNames: List<String>
) : DebeziumPropertiesManager(properties, config, catalog, completedStreamNames) {
override fun getConnectionConfiguration(config: JsonNode): Properties {
val properties = Properties()

Expand All @@ -42,20 +43,24 @@ class RelationalDbDebeziumPropertiesManager(

override fun getIncludeConfiguration(
catalog: ConfiguredAirbyteCatalog,
config: JsonNode?
config: JsonNode?,
streamNames: List<String>
): Properties {
val properties = Properties()

// table selection
properties.setProperty("table.include.list", getTableIncludelist(catalog))
properties.setProperty("table.include.list", getTableIncludelist(catalog, streamNames))
// column selection
properties.setProperty("column.include.list", getColumnIncludeList(catalog))
properties.setProperty("column.include.list", getColumnIncludeList(catalog, streamNames))

return properties
}

companion object {
fun getTableIncludelist(catalog: ConfiguredAirbyteCatalog): String {
fun getTableIncludelist(
catalog: ConfiguredAirbyteCatalog,
completedStreamNames: List<String>
): String {
// Turn "stream": {
// "namespace": "schema1"
// "name": "table1
Expand All @@ -69,13 +74,17 @@ class RelationalDbDebeziumPropertiesManager(
.filter { s: ConfiguredAirbyteStream -> s.syncMode == SyncMode.INCREMENTAL }
.map { obj: ConfiguredAirbyteStream -> obj.stream }
.map { stream: AirbyteStream -> stream.namespace + "." + stream.name }
.filter { streamName: String -> completedStreamNames.contains(streamName) }
// debezium needs commas escaped to split properly
.joinToString(",") { x: String ->
StringUtils.escape(Pattern.quote(x), ",".toCharArray(), "\\,")
}
}

fun getColumnIncludeList(catalog: ConfiguredAirbyteCatalog): String {
fun getColumnIncludeList(
catalog: ConfiguredAirbyteCatalog,
completedStreamNames: List<String>
): String {
// Turn "stream": {
// "namespace": "schema1"
// "name": "table1"
Expand All @@ -92,6 +101,9 @@ class RelationalDbDebeziumPropertiesManager(
return catalog.streams
.filter { s: ConfiguredAirbyteStream -> s.syncMode == SyncMode.INCREMENTAL }
.map { obj: ConfiguredAirbyteStream -> obj.stream }
.filter { stream: AirbyteStream ->
completedStreamNames.contains(stream.namespace + "." + stream.name)
}
.map { s: AirbyteStream ->
val fields = parseFields(s.jsonSchema["properties"].fieldNames())
Pattern.quote(s.namespace + "." + s.name) +
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.integrations.source.relationaldb.streamstatus

import io.airbyte.cdk.integrations.base.AirbyteTraceMessageUtility.makeErrorTraceAirbyteMessage
import io.airbyte.commons.util.AutoCloseableIterator
import io.airbyte.protocol.models.v0.AirbyteErrorTraceMessage
import io.airbyte.protocol.models.v0.AirbyteMessage
import org.slf4j.Logger
import org.slf4j.LoggerFactory

/**
 * Single-use iterator that yields exactly one error trace message built from [e] and is then
 * exhausted.
 *
 * The message is created by [makeErrorTraceAirbyteMessage] with the throwable's own message as
 * the display message and a failure type of
 * [AirbyteErrorTraceMessage.FailureType.TRANSIENT_ERROR].
 *
 * @param e the throwable to report in the emitted trace message.
 */
class TransientErrorTraceEmitterIterator(private val e: Throwable) :
    AutoCloseableIterator<AirbyteMessage?> {
    // Flipped to true once the single trace message has been handed out.
    private var emitted = false

    /** Returns `true` until the trace message has been returned by [next]. */
    override fun hasNext(): Boolean = !emitted

    /**
     * Returns the transient-error trace message on the first call.
     *
     * @throws NoSuchElementException if the message has already been emitted, as required by the
     * [Iterator] contract.
     */
    override fun next(): AirbyteMessage {
        // Guard against over-iteration instead of silently emitting the same error twice.
        if (emitted) {
            throw NoSuchElementException("Transient error trace message was already emitted")
        }
        emitted = true
        return makeErrorTraceAirbyteMessage(
            e,
            e.message,
            AirbyteErrorTraceMessage.FailureType.TRANSIENT_ERROR
        )
    }

    /** Does nothing; this iterator has no cleanup work of its own. */
    @Throws(Exception::class)
    override fun close() {
        // no-op
    }

    /** Does nothing; this implementation ignores removal requests. */
    override fun remove() {
        // no-op
    }

    companion object {
        private val LOGGER: Logger =
            LoggerFactory.getLogger(TransientErrorTraceEmitterIterator::class.java)
    }
}
Loading

0 comments on commit 1a03975

Please sign in to comment.