diff --git a/airbyte-cdk/java/airbyte-cdk/README.md b/airbyte-cdk/java/airbyte-cdk/README.md index f7a2891b540e..fee7d0f96374 100644 --- a/airbyte-cdk/java/airbyte-cdk/README.md +++ b/airbyte-cdk/java/airbyte-cdk/README.md @@ -166,6 +166,8 @@ MavenLocal debugging steps: | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 0.23.2 | 2024-02-22 | [\#35385](https://github.com/airbytehq/airbyte/pull/35342) | Bugfix: inverted logic of disableTypeDedupe flag | +| 0.23.1 | 2024-02-22 | [\#35527](https://github.com/airbytehq/airbyte/pull/35527) | reduce shutdown timeouts | | 0.23.0 | 2024-02-22 | [\#35342](https://github.com/airbytehq/airbyte/pull/35342) | Consolidate and perform upfront gathering of DB metadata state | | 0.21.4 | 2024-02-21 | [\#35511](https://github.com/airbytehq/airbyte/pull/35511) | Reduce CDC state compression limit to 1MB | | 0.21.3 | 2024-02-20 | [\#35394](https://github.com/airbytehq/airbyte/pull/35394) | Add Junit progress information to the test logs | diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties index 4433e215f812..b0d83063013b 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties @@ -1 +1 @@ -version=0.23.0 +version=0.23.2 diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.java b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.java index c060133a5546..b12fd56c93f7 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.java +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.java @@ -317,7 +317,7 @@ private TyperDeduper getV2TyperDeduper(final JsonNode config, final ConfiguredAi final var migrator = new JdbcV1V2Migrator(namingResolver, database, databaseName); final NoopV2TableMigrator v2TableMigrator = new NoopV2TableMigrator(); final DestinationHandler destinationHandler = getDestinationHandler(databaseName, database); - final boolean disableTypeDedupe = !config.has(DISABLE_TYPE_DEDUPE) || config.get(DISABLE_TYPE_DEDUPE).asBoolean(false); + final boolean disableTypeDedupe = config.has(DISABLE_TYPE_DEDUPE) && config.get(DISABLE_TYPE_DEDUPE).asBoolean(false); final TyperDeduper typerDeduper; if (disableTypeDedupe) { typerDeduper = new NoOpTyperDeduperWithV1V2Migrations(sqlGenerator, destinationHandler, parsedCatalog, migrator, v2TableMigrator); diff --git a/airbyte-integrations/connectors/destination-postgres-strict-encrypt/build.gradle b/airbyte-integrations/connectors/destination-postgres-strict-encrypt/build.gradle index 87d5e2b99a77..60e06e23de2d 100644 --- a/airbyte-integrations/connectors/destination-postgres-strict-encrypt/build.gradle +++ b/airbyte-integrations/connectors/destination-postgres-strict-encrypt/build.gradle @@ -3,7 +3,7 @@ plugins { } airbyteJavaConnector { - cdkVersionRequired = '0.20.4' + cdkVersionRequired = '0.23.2' features = ['db-destinations', 'typing-deduping', 'datastore-postgres'] useLocalCdk = false } diff --git a/airbyte-integrations/connectors/destination-postgres-strict-encrypt/metadata.yaml b/airbyte-integrations/connectors/destination-postgres-strict-encrypt/metadata.yaml index fb475fa2995e..43e8aa23e427 100644 --- a/airbyte-integrations/connectors/destination-postgres-strict-encrypt/metadata.yaml +++ b/airbyte-integrations/connectors/destination-postgres-strict-encrypt/metadata.yaml @@ -2,7 +2,7 @@ data: connectorSubtype: database connectorType: destination definitionId: 25c5221d-dce2-4163-ade9-739ef790f503 - dockerImageTag: 2.0.0 + dockerImageTag: 2.0.1 dockerRepository: airbyte/destination-postgres-strict-encrypt documentationUrl: https://docs.airbyte.com/integrations/destinations/postgres githubIssueLabel: destination-postgres diff --git a/airbyte-integrations/connectors/destination-postgres/build.gradle b/airbyte-integrations/connectors/destination-postgres/build.gradle index 54398f71bd38..ab746b991351 100644 --- a/airbyte-integrations/connectors/destination-postgres/build.gradle +++ b/airbyte-integrations/connectors/destination-postgres/build.gradle @@ -3,7 +3,7 @@ plugins { } airbyteJavaConnector { - cdkVersionRequired = '0.20.4' + cdkVersionRequired = '0.23.2' features = ['db-destinations', 'datastore-postgres', 'typing-deduping'] useLocalCdk = false } diff --git a/airbyte-integrations/connectors/destination-postgres/metadata.yaml b/airbyte-integrations/connectors/destination-postgres/metadata.yaml index d380f44acd77..af88d008829f 100644 --- a/airbyte-integrations/connectors/destination-postgres/metadata.yaml +++ b/airbyte-integrations/connectors/destination-postgres/metadata.yaml @@ -5,7 +5,7 @@ data: connectorSubtype: database connectorType: destination definitionId: 25c5221d-dce2-4163-ade9-739ef790f503 - dockerImageTag: 2.0.0 + dockerImageTag: 2.0.1 dockerRepository: airbyte/destination-postgres documentationUrl: https://docs.airbyte.com/integrations/destinations/postgres githubIssueLabel: destination-postgres diff --git a/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/PostgresDestination.java b/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/PostgresDestination.java index 62e975dbee86..93c51df74259 100644 --- a/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/PostgresDestination.java +++ b/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/PostgresDestination.java @@ -14,13 +14,16 @@ import com.google.common.collect.ImmutableMap; import io.airbyte.cdk.db.factory.DataSourceFactory; import io.airbyte.cdk.db.factory.DatabaseDriver; +import io.airbyte.cdk.db.jdbc.JdbcDatabase; import io.airbyte.cdk.db.jdbc.JdbcUtils; import io.airbyte.cdk.integrations.base.Destination; import io.airbyte.cdk.integrations.base.IntegrationRunner; import io.airbyte.cdk.integrations.base.ssh.SshWrappedDestination; import io.airbyte.cdk.integrations.destination.jdbc.AbstractJdbcDestination; +import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler; import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator; import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.destination.postgres.typing_deduping.PostgresDestinationHandler; import io.airbyte.integrations.destination.postgres.typing_deduping.PostgresSqlGenerator; import java.io.UnsupportedEncodingException; import java.net.URLEncoder; @@ -127,6 +130,11 @@ protected JdbcSqlGenerator getSqlGenerator() { return new PostgresSqlGenerator(new PostgresSQLNameTransformer()); } + @Override + protected JdbcDestinationHandler getDestinationHandler(String databaseName, JdbcDatabase database) { + return new PostgresDestinationHandler(databaseName, database); + } + @Override public boolean isV2Destination() { return true; diff --git a/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresDestinationHandler.java b/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresDestinationHandler.java new file mode 100644 index 000000000000..21cc549b3d38 --- /dev/null +++ b/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresDestinationHandler.java @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.postgres.typing_deduping; + +import io.airbyte.cdk.db.jdbc.JdbcDatabase; +import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler; +import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType; +import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType; +import io.airbyte.integrations.base.destination.typing_deduping.Array; +import io.airbyte.integrations.base.destination.typing_deduping.Struct; +import io.airbyte.integrations.base.destination.typing_deduping.Union; +import io.airbyte.integrations.base.destination.typing_deduping.UnsupportedOneOf; + +public class PostgresDestinationHandler extends JdbcDestinationHandler { + + public PostgresDestinationHandler(String databaseName, JdbcDatabase jdbcDatabase) { + super(databaseName, jdbcDatabase); + } + + @Override + protected String toJdbcTypeName(AirbyteType airbyteType) { + // This is mostly identical to the postgres implementation, but swaps jsonb to super + if (airbyteType instanceof final AirbyteProtocolType airbyteProtocolType) { + return toJdbcTypeName(airbyteProtocolType); + } + return switch (airbyteType.getTypeName()) { + case Struct.TYPE, UnsupportedOneOf.TYPE, Array.TYPE -> "jsonb"; + // No nested Unions supported so this will definitely not result in infinite recursion. + case Union.TYPE -> toJdbcTypeName(((Union) airbyteType).chooseType()); + default -> throw new IllegalArgumentException("Unsupported AirbyteType: " + airbyteType); + }; + } + + private String toJdbcTypeName(final AirbyteProtocolType airbyteProtocolType) { + return switch (airbyteProtocolType) { + case STRING -> "varchar"; + case NUMBER -> "numeric"; + case INTEGER -> "int8"; + case BOOLEAN -> "bool"; + case TIMESTAMP_WITH_TIMEZONE -> "timestamptz"; + case TIMESTAMP_WITHOUT_TIMEZONE -> "timestamp"; + case TIME_WITH_TIMEZONE -> "timetz"; + case TIME_WITHOUT_TIMEZONE -> "time"; + case DATE -> "date"; + case UNKNOWN -> "jsonb"; + }; + } + +} diff --git a/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresSqlGenerator.java b/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresSqlGenerator.java index 0918226b3227..9d7217e3f826 100644 --- a/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresSqlGenerator.java +++ b/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresSqlGenerator.java @@ -20,10 +20,7 @@ import static org.jooq.impl.DSL.rowNumber; import static org.jooq.impl.DSL.val; -import com.google.common.collect.ImmutableMap; -import io.airbyte.cdk.integrations.base.JavaBaseConstants; import io.airbyte.cdk.integrations.destination.NamingConventionTransformer; -import io.airbyte.cdk.integrations.destination.jdbc.TableDefinition; import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator; import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType; import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType; @@ -37,7 +34,6 @@ import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.function.Function; import java.util.stream.Collectors; @@ -54,13 +50,6 @@ public class PostgresSqlGenerator extends JdbcSqlGenerator { public static final DataType JSONB_TYPE = new DefaultDataType<>(null, Object.class, "jsonb"); - private static final Map POSTGRES_TYPE_NAME_TO_JDBC_TYPE = ImmutableMap.of( - "numeric", "decimal", - "int8", "bigint", - "bool", "boolean", - "timestamptz", "timestamp with time zone", - "timetz", "time with time zone"); - public PostgresSqlGenerator(final NamingConventionTransformer namingTransformer) { super(namingTransformer); } @@ -309,29 +298,6 @@ protected Field getRowNumber(final List primaryKeys, final Op .orderBy(orderedFields).as(ROW_NUMBER_COLUMN_NAME); } - @Override - public boolean existingSchemaMatchesStreamConfig(final StreamConfig stream, final TableDefinition existingTable) { - // Check that the columns match, with special handling for the metadata columns. - // This is mostly identical to the redshift implementation, but swaps super to jsonb - final LinkedHashMap intendedColumns = stream.columns().entrySet().stream() - .collect(LinkedHashMap::new, - (map, column) -> map.put(column.getKey().name(), toDialectType(column.getValue()).getTypeName()), - LinkedHashMap::putAll); - final LinkedHashMap actualColumns = existingTable.columns().entrySet().stream() - .filter(column -> JavaBaseConstants.V2_FINAL_TABLE_METADATA_COLUMNS.stream() - .noneMatch(airbyteColumnName -> airbyteColumnName.equals(column.getKey()))) - .collect(LinkedHashMap::new, - (map, column) -> map.put(column.getKey(), jdbcTypeNameFromPostgresTypeName(column.getValue().type())), - LinkedHashMap::putAll); - - final boolean sameColumns = actualColumns.equals(intendedColumns) - && "varchar".equals(existingTable.columns().get(JavaBaseConstants.COLUMN_NAME_AB_RAW_ID).type()) - && "timestamptz".equals(existingTable.columns().get(JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT).type()) - && "jsonb".equals(existingTable.columns().get(JavaBaseConstants.COLUMN_NAME_AB_META).type()); - - return sameColumns; - } - /** * Extract a raw field, leaving it as jsonb */ @@ -343,8 +309,4 @@ private Field jsonTypeof(final Field field) { return function("JSONB_TYPEOF", SQLDataType.VARCHAR, field); } - private static String jdbcTypeNameFromPostgresTypeName(final String redshiftType) { - return POSTGRES_TYPE_NAME_TO_JDBC_TYPE.getOrDefault(redshiftType, redshiftType); - } - } diff --git a/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresSqlGeneratorIntegrationTest.java b/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresSqlGeneratorIntegrationTest.java index 3f744c846b08..6efac136e4c3 100644 --- a/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresSqlGeneratorIntegrationTest.java +++ b/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresSqlGeneratorIntegrationTest.java @@ -5,24 +5,23 @@ package io.airbyte.integrations.destination.postgres.typing_deduping; import static io.airbyte.integrations.destination.postgres.typing_deduping.PostgresSqlGenerator.JSONB_TYPE; -import static org.junit.jupiter.api.Assertions.assertAll; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.cdk.db.jdbc.DefaultJdbcDatabase; import io.airbyte.cdk.db.jdbc.JdbcDatabase; import io.airbyte.cdk.db.jdbc.JdbcUtils; -import io.airbyte.cdk.integrations.destination.jdbc.TableDefinition; -import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler; import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator; import io.airbyte.cdk.integrations.standardtest.destination.typing_deduping.JdbcSqlGeneratorIntegrationTest; import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler; +import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialState; import io.airbyte.integrations.base.destination.typing_deduping.Sql; import io.airbyte.integrations.destination.postgres.PostgresDestination; import io.airbyte.integrations.destination.postgres.PostgresSQLNameTransformer; import io.airbyte.integrations.destination.postgres.PostgresTestDatabase; -import java.util.Optional; +import java.util.List; import javax.sql.DataSource; import org.jooq.DataType; import org.jooq.Field; @@ -76,8 +75,8 @@ protected JdbcSqlGenerator getSqlGenerator() { } @Override - protected DestinationHandler getDestinationHandler() { - return new JdbcDestinationHandler(databaseName, database); + protected DestinationHandler getDestinationHandler() { + return new PostgresDestinationHandler(databaseName, database); } @Override @@ -96,29 +95,11 @@ public void testCreateTableIncremental() throws Exception { final Sql sql = generator.createTable(incrementalDedupStream, "", false); destinationHandler.execute(sql); - final Optional existingTable = destinationHandler.findExistingTable(incrementalDedupStream.id()); - - assertTrue(existingTable.isPresent()); - assertAll( - () -> assertEquals("varchar", existingTable.get().columns().get("_airbyte_raw_id").type()), - () -> assertEquals("timestamptz", existingTable.get().columns().get("_airbyte_extracted_at").type()), - () -> assertEquals("jsonb", existingTable.get().columns().get("_airbyte_meta").type()), - () -> assertEquals("int8", existingTable.get().columns().get("id1").type()), - () -> assertEquals("int8", existingTable.get().columns().get("id2").type()), - () -> assertEquals("timestamptz", existingTable.get().columns().get("updated_at").type()), - () -> assertEquals("jsonb", existingTable.get().columns().get("struct").type()), - () -> assertEquals("jsonb", existingTable.get().columns().get("array").type()), - () -> assertEquals("varchar", existingTable.get().columns().get("string").type()), - () -> assertEquals("numeric", existingTable.get().columns().get("number").type()), - () -> assertEquals("int8", existingTable.get().columns().get("integer").type()), - () -> assertEquals("bool", existingTable.get().columns().get("boolean").type()), - () -> assertEquals("timestamptz", existingTable.get().columns().get("timestamp_with_timezone").type()), - () -> assertEquals("timestamp", existingTable.get().columns().get("timestamp_without_timezone").type()), - () -> assertEquals("timetz", existingTable.get().columns().get("time_with_timezone").type()), - () -> assertEquals("time", existingTable.get().columns().get("time_without_timezone").type()), - () -> assertEquals("date", existingTable.get().columns().get("date").type()), - () -> assertEquals("jsonb", existingTable.get().columns().get("unknown").type())); - // TODO assert on table indexing, etc. + List initialStates = destinationHandler.gatherInitialState(List.of(incrementalDedupStream)); + assertEquals(1, initialStates.size()); + final DestinationInitialState initialState = initialStates.getFirst(); + assertTrue(initialState.isFinalTablePresent()); + assertFalse(initialState.isSchemaMismatch()); } } diff --git a/airbyte-integrations/connectors/destination-postgres/src/testFixtures/java/io/airbyte/integrations/destination/postgres/typing_deduping/AbstractPostgresTypingDedupingTest.java b/airbyte-integrations/connectors/destination-postgres/src/testFixtures/java/io/airbyte/integrations/destination/postgres/typing_deduping/AbstractPostgresTypingDedupingTest.java index 50b1da44fa6c..128d8d2de1cf 100644 --- a/airbyte-integrations/connectors/destination-postgres/src/testFixtures/java/io/airbyte/integrations/destination/postgres/typing_deduping/AbstractPostgresTypingDedupingTest.java +++ b/airbyte-integrations/connectors/destination-postgres/src/testFixtures/java/io/airbyte/integrations/destination/postgres/typing_deduping/AbstractPostgresTypingDedupingTest.java @@ -44,7 +44,7 @@ private String generateBigString() { } @Override - protected SqlGenerator getSqlGenerator() { + protected SqlGenerator getSqlGenerator() { return new PostgresSqlGenerator(new PostgresSQLNameTransformer()); } diff --git a/docs/integrations/destinations/postgres.md b/docs/integrations/destinations/postgres.md index eb07756183d4..50bd15cc864a 100644 --- a/docs/integrations/destinations/postgres.md +++ b/docs/integrations/destinations/postgres.md @@ -193,6 +193,7 @@ Now that you have set up the Postgres destination connector, check out the follo | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------| +| 2.0.1 | 2024-02-22 | [35385](https://github.com/airbytehq/airbyte/pull/35385) | Upgrade CDK to 0.23.0; Gathering required initial state upfront | | 2.0.0 | 2024-02-09 | [35042](https://github.com/airbytehq/airbyte/pull/35042) | GA release V2 destinations format. | | 0.6.3 | 2024-02-06 | [34891](https://github.com/airbytehq/airbyte/pull/34891) | Remove varchar limit, use system defaults | | 0.6.2 | 2024-01-30 | [34683](https://github.com/airbytehq/airbyte/pull/34683) | CDK Upgrade 0.16.3; Fix dependency mismatches in slf4j lib | @@ -220,4 +221,4 @@ Now that you have set up the Postgres destination connector, check out the follo | 0.3.13 | 2021-12-01 | [\#8371](https://github.com/airbytehq/airbyte/pull/8371) | Fixed incorrect handling "\n" in ssh key | | 0.3.12 | 2021-11-08 | [\#7719](https://github.com/airbytehq/airbyte/pull/7719) | Improve handling of wide rows by buffering records based on their byte size rather than their count | | 0.3.11 | 2021-09-07 | [\#5743](https://github.com/airbytehq/airbyte/pull/5743) | Add SSH Tunnel support | -| 0.3.10 | 2021-08-11 | [\#5336](https://github.com/airbytehq/airbyte/pull/5336) | Destination Postgres: fix \u0000\(NULL\) value processing | \ No newline at end of file +| 0.3.10 | 2021-08-11 | [\#5336](https://github.com/airbytehq/airbyte/pull/5336) | Destination Postgres: fix \u0000\(NULL\) value processing |