From 7a0359e4bc7d8500a77be8be9b7a7c0b20578714 Mon Sep 17 00:00:00 2001 From: mkhokh Date: Tue, 30 Nov 2021 13:41:57 +0200 Subject: [PATCH 1/8] Source-MySql: do not check cdc required param binlog_row_image for standard replication --- .../source/mysql/MySqlSource.java | 28 +++++++++---------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java index 0ed372d7971e6..1e943c1e65baa 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java @@ -130,24 +130,24 @@ public List> getCheckOperations(final J throw new RuntimeException("The variable binlog_format should be set to ROW, but it is : " + binlogFormat); } }); - } - checkOperations.add(database -> { - final List image = database.resultSetQuery(connection -> { - final String sql = "show variables where Variable_name = 'binlog_row_image'"; + checkOperations.add(database -> { + final List image = database.resultSetQuery(connection -> { + final String sql = "show variables where Variable_name = 'binlog_row_image'"; - return connection.createStatement().executeQuery(sql); - }, resultSet -> resultSet.getString("Value")).collect(toList()); + return connection.createStatement().executeQuery(sql); + }, resultSet -> resultSet.getString("Value")).collect(toList()); - if (image.size() != 1) { - throw new RuntimeException("Could not query the variable binlog_row_image"); - } + if (image.size() != 1) { + throw new RuntimeException("Could not query the variable binlog_row_image"); + } - final String binlogRowImage = image.get(0); - if (!binlogRowImage.equalsIgnoreCase("FULL")) { - throw new RuntimeException("The variable binlog_row_image should be set to FULL, but it is : " + binlogRowImage); - } - }); + final String binlogRowImage = image.get(0); + if (!binlogRowImage.equalsIgnoreCase("FULL")) { + throw new RuntimeException("The variable binlog_row_image should be set to FULL, but it is : " + binlogRowImage); + } + }); + } return checkOperations; } From 5a94474a52ac935b6d9b41fb22998ba338f365f3 Mon Sep 17 00:00:00 2001 From: mkhokh Date: Tue, 30 Nov 2021 13:46:43 +0200 Subject: [PATCH 2/8] Source-MySql: fix formatting --- .../source_mixpanel/schemas/export.json | 4 +- .../source-mixpanel/source_mixpanel/source.py | 2 +- .../source/postgres/PostgresSource.java | 14 +++---- .../postgres/PostgresSourceOperations.java | 4 ++ .../sources/PostresSourceDatatypeTest.java | 9 +++-- .../PostgresSourceOperationsTest.java | 4 ++ .../source/postgres/PostgresSourceTest.java | 40 +++++++++---------- .../source-trello/source_trello/spec.json | 2 +- 8 files changed, 45 insertions(+), 34 deletions(-) diff --git a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/schemas/export.json b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/schemas/export.json index 3c34db892269b..fadad1c88582d 100644 --- a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/schemas/export.json +++ b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/schemas/export.json @@ -16,8 +16,8 @@ "labels": { "type": ["null", "array"], "items": { - "type": ["null", "string"] - } + "type": ["null", "string"] + } }, "sampling_factor": { "type": ["null", "integer"] diff --git a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/source.py b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/source.py index eaa83bbe0a73d..e24a32ced486d 100644 --- a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/source.py +++ b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/source.py @@ -16,10 +16,10 @@ from airbyte_cdk.logger import AirbyteLogger from airbyte_cdk.models import SyncMode from airbyte_cdk.sources import AbstractSource -from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer from airbyte_cdk.sources.streams import Stream from airbyte_cdk.sources.streams.http import HttpStream from airbyte_cdk.sources.streams.http.auth import HttpAuthenticator, TokenAuthenticator +from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer class MixpanelStream(HttpStream, ABC): diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java index 1a34b0320cb13..e5ec80be7d129 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java @@ -230,13 +230,13 @@ private static AirbyteStream removeIncrementalWithoutPk(final AirbyteStream stre @Override public Set getPrivilegesTableForCurrentUser(JdbcDatabase database, String schema) throws SQLException { return database.query(connection -> { - final PreparedStatement ps = connection.prepareStatement( - "SELECT DISTINCT table_catalog, table_schema, table_name, privilege_type\n" - + "FROM information_schema.table_privileges\n" - + "WHERE grantee = ? AND privilege_type = 'SELECT'"); - ps.setString(1, database.getDatabaseConfig().get("username").asText()); - return ps; - }, sourceOperations::rowToJson) + final PreparedStatement ps = connection.prepareStatement( + "SELECT DISTINCT table_catalog, table_schema, table_name, privilege_type\n" + + "FROM information_schema.table_privileges\n" + + "WHERE grantee = ? AND privilege_type = 'SELECT'"); + ps.setString(1, database.getDatabaseConfig().get("username").asText()); + return ps; + }, sourceOperations::rowToJson) .collect(toSet()) .stream() .map(e -> JdbcPrivilegeDto.builder() diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java index e8197830e3d9e..9b84d1ee4c050 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + package io.airbyte.integrations.source.postgres; import com.fasterxml.jackson.databind.JsonNode; diff --git a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostresSourceDatatypeTest.java b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostresSourceDatatypeTest.java index 87789e365d530..e55a958b31de7 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostresSourceDatatypeTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostresSourceDatatypeTest.java @@ -58,9 +58,12 @@ protected Database setupDatabase() throws SQLException { ctx.execute("CREATE SCHEMA TEST;"); ctx.execute("CREATE TYPE mood AS ENUM ('sad', 'ok', 'happy');"); ctx.execute("CREATE TYPE inventory_item AS (name text, supplier_id integer, price numeric);"); - // In one of the test case, we have some money values with currency symbol. Postgres can only understand - // those money values if the symbol corresponds to the monetary locale setting. For example, if the locale - // is 'en_GB', '£100' is valid, but '$100' is not. So setting the monetary locate is necessary here to + // In one of the test case, we have some money values with currency symbol. Postgres can only + // understand + // those money values if the symbol corresponds to the monetary locale setting. For example, if the + // locale + // is 'en_GB', '£100' is valid, but '$100' is not. So setting the monetary locate is necessary here + // to // make sure the unit test can pass, no matter what the locale the runner VM has. ctx.execute("SET lc_monetary TO 'en_US.utf8';"); return null; diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceOperationsTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceOperationsTest.java index 6ac88fb3cfb42..da961cb5ff951 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceOperationsTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceOperationsTest.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + package io.airbyte.integrations.source.postgres; import static org.junit.jupiter.api.Assertions.*; diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java index 3248c29518e32..32148908b31a5 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java @@ -52,40 +52,40 @@ class PostgresSourceTest { private static final String STREAM_NAME_PRIVILEGES_TEST_CASE_VIEW = "id_and_name_3_view"; private static final AirbyteCatalog CATALOG = new AirbyteCatalog().withStreams(List.of( CatalogHelpers.createAirbyteStream( - STREAM_NAME, - SCHEMA_NAME, - Field.of("id", JsonSchemaPrimitive.NUMBER), - Field.of("name", JsonSchemaPrimitive.STRING), - Field.of("power", JsonSchemaPrimitive.NUMBER)) + STREAM_NAME, + SCHEMA_NAME, + Field.of("id", JsonSchemaPrimitive.NUMBER), + Field.of("name", JsonSchemaPrimitive.STRING), + Field.of("power", JsonSchemaPrimitive.NUMBER)) .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) .withSourceDefinedPrimaryKey(List.of(List.of("id"))), CatalogHelpers.createAirbyteStream( STREAM_NAME + "2", SCHEMA_NAME, Field.of("id", JsonSchemaPrimitive.NUMBER), - Field.of("name", JsonSchemaPrimitive.STRING), - Field.of("power", JsonSchemaPrimitive.NUMBER)) + Field.of("name", JsonSchemaPrimitive.STRING), + Field.of("power", JsonSchemaPrimitive.NUMBER)) .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)), CatalogHelpers.createAirbyteStream( - "names", - SCHEMA_NAME, - Field.of("first_name", JsonSchemaPrimitive.STRING), - Field.of("last_name", JsonSchemaPrimitive.STRING), - Field.of("power", JsonSchemaPrimitive.NUMBER)) + "names", + SCHEMA_NAME, + Field.of("first_name", JsonSchemaPrimitive.STRING), + Field.of("last_name", JsonSchemaPrimitive.STRING), + Field.of("power", JsonSchemaPrimitive.NUMBER)) .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) .withSourceDefinedPrimaryKey(List.of(List.of("first_name"), List.of("last_name"))), CatalogHelpers.createAirbyteStream( - STREAM_NAME_PRIVILEGES_TEST_CASE, - SCHEMA_NAME, - Field.of("id", JsonSchemaPrimitive.NUMBER), - Field.of("name", JsonSchemaPrimitive.STRING)) + STREAM_NAME_PRIVILEGES_TEST_CASE, + SCHEMA_NAME, + Field.of("id", JsonSchemaPrimitive.NUMBER), + Field.of("name", JsonSchemaPrimitive.STRING)) .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) .withSourceDefinedPrimaryKey(List.of(List.of("id"))), CatalogHelpers.createAirbyteStream( - STREAM_NAME_PRIVILEGES_TEST_CASE_VIEW, - SCHEMA_NAME, - Field.of("id", JsonSchemaPrimitive.NUMBER), - Field.of("name", JsonSchemaPrimitive.STRING)) + STREAM_NAME_PRIVILEGES_TEST_CASE_VIEW, + SCHEMA_NAME, + Field.of("id", JsonSchemaPrimitive.NUMBER), + Field.of("name", JsonSchemaPrimitive.STRING)) .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) .withSourceDefinedPrimaryKey(List.of(List.of("id"))))); private static final ConfiguredAirbyteCatalog CONFIGURED_CATALOG = CatalogHelpers.toDefaultConfiguredCatalog(CATALOG); diff --git a/airbyte-integrations/connectors/source-trello/source_trello/spec.json b/airbyte-integrations/connectors/source-trello/source_trello/spec.json index 1dfea36939f83..3aedc8467fcd1 100644 --- a/airbyte-integrations/connectors/source-trello/source_trello/spec.json +++ b/airbyte-integrations/connectors/source-trello/source_trello/spec.json @@ -28,7 +28,7 @@ }, "board_ids": { "type": "array", - "items":{ + "items": { "type": "string", "pattern": "^[0-9a-fA-F]{24}$" }, From 769e2d5cf1e580fa027a65670c608002f08cf41b Mon Sep 17 00:00:00 2001 From: mkhokh Date: Tue, 30 Nov 2021 15:20:39 +0200 Subject: [PATCH 3/8] Revert "Source-MySql: fix formatting" This reverts commit 5a94474a52ac935b6d9b41fb22998ba338f365f3. --- .../source_mixpanel/schemas/export.json | 4 +- .../source-mixpanel/source_mixpanel/source.py | 2 +- .../source/postgres/PostgresSource.java | 14 +++---- .../postgres/PostgresSourceOperations.java | 4 -- .../sources/PostresSourceDatatypeTest.java | 9 ++--- .../PostgresSourceOperationsTest.java | 4 -- .../source/postgres/PostgresSourceTest.java | 40 +++++++++---------- .../source-trello/source_trello/spec.json | 2 +- 8 files changed, 34 insertions(+), 45 deletions(-) diff --git a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/schemas/export.json b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/schemas/export.json index fadad1c88582d..3c34db892269b 100644 --- a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/schemas/export.json +++ b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/schemas/export.json @@ -16,8 +16,8 @@ "labels": { "type": ["null", "array"], "items": { - "type": ["null", "string"] - } + "type": ["null", "string"] + } }, "sampling_factor": { "type": ["null", "integer"] diff --git a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/source.py b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/source.py index e24a32ced486d..eaa83bbe0a73d 100644 --- a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/source.py +++ b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/source.py @@ -16,10 +16,10 @@ from airbyte_cdk.logger import AirbyteLogger from airbyte_cdk.models import SyncMode from airbyte_cdk.sources import AbstractSource +from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer from airbyte_cdk.sources.streams import Stream from airbyte_cdk.sources.streams.http import HttpStream from airbyte_cdk.sources.streams.http.auth import HttpAuthenticator, TokenAuthenticator -from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer class MixpanelStream(HttpStream, ABC): diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java index e5ec80be7d129..1a34b0320cb13 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java @@ -230,13 +230,13 @@ private static AirbyteStream removeIncrementalWithoutPk(final AirbyteStream stre @Override public Set getPrivilegesTableForCurrentUser(JdbcDatabase database, String schema) throws SQLException { return database.query(connection -> { - final PreparedStatement ps = connection.prepareStatement( - "SELECT DISTINCT table_catalog, table_schema, table_name, privilege_type\n" - + "FROM information_schema.table_privileges\n" - + "WHERE grantee = ? AND privilege_type = 'SELECT'"); - ps.setString(1, database.getDatabaseConfig().get("username").asText()); - return ps; - }, sourceOperations::rowToJson) + final PreparedStatement ps = connection.prepareStatement( + "SELECT DISTINCT table_catalog, table_schema, table_name, privilege_type\n" + + "FROM information_schema.table_privileges\n" + + "WHERE grantee = ? AND privilege_type = 'SELECT'"); + ps.setString(1, database.getDatabaseConfig().get("username").asText()); + return ps; + }, sourceOperations::rowToJson) .collect(toSet()) .stream() .map(e -> JdbcPrivilegeDto.builder() diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java index 9b84d1ee4c050..e8197830e3d9e 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java @@ -1,7 +1,3 @@ -/* - * Copyright (c) 2021 Airbyte, Inc., all rights reserved. - */ - package io.airbyte.integrations.source.postgres; import com.fasterxml.jackson.databind.JsonNode; diff --git a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostresSourceDatatypeTest.java b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostresSourceDatatypeTest.java index e55a958b31de7..87789e365d530 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostresSourceDatatypeTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostresSourceDatatypeTest.java @@ -58,12 +58,9 @@ protected Database setupDatabase() throws SQLException { ctx.execute("CREATE SCHEMA TEST;"); ctx.execute("CREATE TYPE mood AS ENUM ('sad', 'ok', 'happy');"); ctx.execute("CREATE TYPE inventory_item AS (name text, supplier_id integer, price numeric);"); - // In one of the test case, we have some money values with currency symbol. Postgres can only - // understand - // those money values if the symbol corresponds to the monetary locale setting. For example, if the - // locale - // is 'en_GB', '£100' is valid, but '$100' is not. So setting the monetary locate is necessary here - // to + // In one of the test case, we have some money values with currency symbol. Postgres can only understand + // those money values if the symbol corresponds to the monetary locale setting. For example, if the locale + // is 'en_GB', '£100' is valid, but '$100' is not. So setting the monetary locate is necessary here to // make sure the unit test can pass, no matter what the locale the runner VM has. ctx.execute("SET lc_monetary TO 'en_US.utf8';"); return null; diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceOperationsTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceOperationsTest.java index da961cb5ff951..6ac88fb3cfb42 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceOperationsTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceOperationsTest.java @@ -1,7 +1,3 @@ -/* - * Copyright (c) 2021 Airbyte, Inc., all rights reserved. - */ - package io.airbyte.integrations.source.postgres; import static org.junit.jupiter.api.Assertions.*; diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java index 32148908b31a5..3248c29518e32 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java @@ -52,40 +52,40 @@ class PostgresSourceTest { private static final String STREAM_NAME_PRIVILEGES_TEST_CASE_VIEW = "id_and_name_3_view"; private static final AirbyteCatalog CATALOG = new AirbyteCatalog().withStreams(List.of( CatalogHelpers.createAirbyteStream( - STREAM_NAME, - SCHEMA_NAME, - Field.of("id", JsonSchemaPrimitive.NUMBER), - Field.of("name", JsonSchemaPrimitive.STRING), - Field.of("power", JsonSchemaPrimitive.NUMBER)) + STREAM_NAME, + SCHEMA_NAME, + Field.of("id", JsonSchemaPrimitive.NUMBER), + Field.of("name", JsonSchemaPrimitive.STRING), + Field.of("power", JsonSchemaPrimitive.NUMBER)) .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) .withSourceDefinedPrimaryKey(List.of(List.of("id"))), CatalogHelpers.createAirbyteStream( STREAM_NAME + "2", SCHEMA_NAME, Field.of("id", JsonSchemaPrimitive.NUMBER), - Field.of("name", JsonSchemaPrimitive.STRING), - Field.of("power", JsonSchemaPrimitive.NUMBER)) + Field.of("name", JsonSchemaPrimitive.STRING), + Field.of("power", JsonSchemaPrimitive.NUMBER)) .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)), CatalogHelpers.createAirbyteStream( - "names", - SCHEMA_NAME, - Field.of("first_name", JsonSchemaPrimitive.STRING), - Field.of("last_name", JsonSchemaPrimitive.STRING), - Field.of("power", JsonSchemaPrimitive.NUMBER)) + "names", + SCHEMA_NAME, + Field.of("first_name", JsonSchemaPrimitive.STRING), + Field.of("last_name", JsonSchemaPrimitive.STRING), + Field.of("power", JsonSchemaPrimitive.NUMBER)) .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) .withSourceDefinedPrimaryKey(List.of(List.of("first_name"), List.of("last_name"))), CatalogHelpers.createAirbyteStream( - STREAM_NAME_PRIVILEGES_TEST_CASE, - SCHEMA_NAME, - Field.of("id", JsonSchemaPrimitive.NUMBER), - Field.of("name", JsonSchemaPrimitive.STRING)) + STREAM_NAME_PRIVILEGES_TEST_CASE, + SCHEMA_NAME, + Field.of("id", JsonSchemaPrimitive.NUMBER), + Field.of("name", JsonSchemaPrimitive.STRING)) .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) .withSourceDefinedPrimaryKey(List.of(List.of("id"))), CatalogHelpers.createAirbyteStream( - STREAM_NAME_PRIVILEGES_TEST_CASE_VIEW, - SCHEMA_NAME, - Field.of("id", JsonSchemaPrimitive.NUMBER), - Field.of("name", JsonSchemaPrimitive.STRING)) + STREAM_NAME_PRIVILEGES_TEST_CASE_VIEW, + SCHEMA_NAME, + Field.of("id", JsonSchemaPrimitive.NUMBER), + Field.of("name", JsonSchemaPrimitive.STRING)) .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) .withSourceDefinedPrimaryKey(List.of(List.of("id"))))); private static final ConfiguredAirbyteCatalog CONFIGURED_CATALOG = CatalogHelpers.toDefaultConfiguredCatalog(CATALOG); diff --git a/airbyte-integrations/connectors/source-trello/source_trello/spec.json b/airbyte-integrations/connectors/source-trello/source_trello/spec.json index 3aedc8467fcd1..1dfea36939f83 100644 --- a/airbyte-integrations/connectors/source-trello/source_trello/spec.json +++ b/airbyte-integrations/connectors/source-trello/source_trello/spec.json @@ -28,7 +28,7 @@ }, "board_ids": { "type": "array", - "items": { + "items":{ "type": "string", "pattern": "^[0-9a-fA-F]{24}$" }, From 74b833a90c12ffe922f1362e730e490ae67788fe Mon Sep 17 00:00:00 2001 From: mkhokh Date: Wed, 1 Dec 2021 13:28:21 +0200 Subject: [PATCH 4/8] Source-MySql: made a code improvement --- .../source/mysql/MySqlSource.java | 73 ++++++------------- 1 file changed, 22 insertions(+), 51 deletions(-) diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java index 1e943c1e65baa..f2babe7eebca2 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java @@ -97,58 +97,10 @@ private static AirbyteStream addCdcMetadataColumns(final AirbyteStream stream) { public List> getCheckOperations(final JsonNode config) throws Exception { final List> checkOperations = new ArrayList<>(super.getCheckOperations(config)); if (isCdc(config)) { - checkOperations.add(database -> { - final List log = database.resultSetQuery(connection -> { - final String sql = "show variables where Variable_name = 'log_bin'"; - - return connection.createStatement().executeQuery(sql); - }, resultSet -> resultSet.getString("Value")).collect(toList()); - - if (log.size() != 1) { - throw new RuntimeException("Could not query the variable log_bin"); - } - - final String logBin = log.get(0); - if (!logBin.equalsIgnoreCase("ON")) { - throw new RuntimeException("The variable log_bin should be set to ON, but it is : " + logBin); - } - }); - - checkOperations.add(database -> { - final List format = database.resultSetQuery(connection -> { - final String sql = "show variables where Variable_name = 'binlog_format'"; - - return connection.createStatement().executeQuery(sql); - }, resultSet -> resultSet.getString("Value")).collect(toList()); - - if (format.size() != 1) { - throw new RuntimeException("Could not query the variable binlog_format"); - } - - final String binlogFormat = format.get(0); - if (!binlogFormat.equalsIgnoreCase("ROW")) { - throw new RuntimeException("The variable binlog_format should be set to ROW, but it is : " + binlogFormat); - } - }); - - checkOperations.add(database -> { - final List image = database.resultSetQuery(connection -> { - final String sql = "show variables where Variable_name = 'binlog_row_image'"; - - return connection.createStatement().executeQuery(sql); - }, resultSet -> resultSet.getString("Value")).collect(toList()); - - if (image.size() != 1) { - throw new RuntimeException("Could not query the variable binlog_row_image"); - } - - final String binlogRowImage = image.get(0); - if (!binlogRowImage.equalsIgnoreCase("FULL")) { - throw new RuntimeException("The variable binlog_row_image should be set to FULL, but it is : " + binlogRowImage); - } - }); + checkOperations.addAll(List.of(getCheckOperation("log_bin", "ON"), + getCheckOperation("binlog_format", "ROW"), + getCheckOperation("binlog_row_image", "FULL"))); } - return checkOperations; } @@ -253,4 +205,23 @@ public enum ReplicationMethod { CDC } + private CheckedConsumer getCheckOperation(String name, String value) { + return database -> { + final List result = database.resultSetQuery(connection -> { + final String sql = String.format("show variables where Variable_name = '%s'", name); + + return connection.createStatement().executeQuery(sql); + }, resultSet -> resultSet.getString("Value")).collect(toList()); + + if (result.size() != 1) { + throw new RuntimeException(String.format("Could not query the variable %s", name)); + } + + final String resultValue = result.get(0); + if (!resultValue.equalsIgnoreCase(value)) { + throw new RuntimeException(String.format("The variable %s should be set to %s, but it is : %s", name, value, resultValue)); + } + }; + } + } From 9d17ab569312b73f02f8bd2e940011ce3dee4168 Mon Sep 17 00:00:00 2001 From: mkhokh Date: Fri, 3 Dec 2021 16:22:17 +0200 Subject: [PATCH 5/8] Source-MySql: bump versions --- .../435bb9a5-7887-4809-aa58-28c27df0d7ad.json | 2 +- .../init/src/main/resources/seed/source_definitions.yaml | 2 +- .../connectors/source-mysql-strict-encrypt/Dockerfile | 2 +- airbyte-integrations/connectors/source-mysql/Dockerfile | 2 +- docs/integrations/sources/mysql.md | 1 + 5 files changed, 5 insertions(+), 4 deletions(-) diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/435bb9a5-7887-4809-aa58-28c27df0d7ad.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/435bb9a5-7887-4809-aa58-28c27df0d7ad.json index ed5e4e6ebf4d9..6844826ada84d 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/435bb9a5-7887-4809-aa58-28c27df0d7ad.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/435bb9a5-7887-4809-aa58-28c27df0d7ad.json @@ -2,7 +2,7 @@ "sourceDefinitionId": "435bb9a5-7887-4809-aa58-28c27df0d7ad", "name": "MySQL", "dockerRepository": "airbyte/source-mysql", - "dockerImageTag": "0.4.11", + "dockerImageTag": "0.4.12", "documentationUrl": "https://docs.airbyte.io/integrations/sources/mysql", "icon": "mysql.svg" } diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index cfbb4d5c7eff0..1ffdd6607628a 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -414,7 +414,7 @@ - name: MySQL sourceDefinitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad dockerRepository: airbyte/source-mysql - dockerImageTag: 0.4.11 + dockerImageTag: 0.4.12 documentationUrl: https://docs.airbyte.io/integrations/sources/mysql icon: mysql.svg sourceType: database diff --git a/airbyte-integrations/connectors/source-mysql-strict-encrypt/Dockerfile b/airbyte-integrations/connectors/source-mysql-strict-encrypt/Dockerfile index 327da6f4d45e6..6bfc288e9c919 100644 --- a/airbyte-integrations/connectors/source-mysql-strict-encrypt/Dockerfile +++ b/airbyte-integrations/connectors/source-mysql-strict-encrypt/Dockerfile @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.1.3 +LABEL io.airbyte.version=0.1.4 LABEL io.airbyte.name=airbyte/source-mysql-strict-encrypt diff --git a/airbyte-integrations/connectors/source-mysql/Dockerfile b/airbyte-integrations/connectors/source-mysql/Dockerfile index 9e42efb5467e7..0cf768cb9af6d 100644 --- a/airbyte-integrations/connectors/source-mysql/Dockerfile +++ b/airbyte-integrations/connectors/source-mysql/Dockerfile @@ -8,6 +8,6 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.4.11 +LABEL io.airbyte.version=0.4.12 LABEL io.airbyte.name=airbyte/source-mysql diff --git a/docs/integrations/sources/mysql.md b/docs/integrations/sources/mysql.md index 9a797c62e646e..9b03f7290359b 100644 --- a/docs/integrations/sources/mysql.md +++ b/docs/integrations/sources/mysql.md @@ -180,6 +180,7 @@ If you do not see a type in this list, assume that it is coerced into a string. | Version | Date | Pull Request | Subject | | :--- | :--- | :--- | :--- | +| 0.4.12| 2021-12-03 | [8335](https://github.com/airbytehq/airbyte/pull/8335) | Source-MySql: do not check cdc required param binlog_row_image for standard replication | | 0.4.11| 2021-11-19 | [8047](https://github.com/airbytehq/airbyte/pull/8047) | Source MySQL: transform binary data base64 format | | 0.4.10| 2021-11-15 | [7820](https://github.com/airbytehq/airbyte/pull/7820) | Added basic performance test | | 0.4.9 | 2021-11-02 | [7559](https://github.com/airbytehq/airbyte/pull/7559) | Correctly process large unsigned short integer values which may fall outside java's `Short` data type capability | From da3136536572380f08f7b4a26dfefd20182c7ac1 Mon Sep 17 00:00:00 2001 From: mkhokh Date: Fri, 3 Dec 2021 19:51:43 +0200 Subject: [PATCH 6/8] Source-MySql: fix version in source_specs.yaml --- airbyte-config/init/src/main/resources/seed/source_specs.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index e1ef8e250319b..3dc0d0fdfa563 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -3924,7 +3924,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-mysql:0.4.11" +- dockerImage: "airbyte/source-mysql:0.4.12" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/mysql" connectionSpecification: From 119c4c3f84c179440b8a132c468bc72506d0114b Mon Sep 17 00:00:00 2001 From: mkhokh Date: Mon, 6 Dec 2021 12:55:18 +0200 Subject: [PATCH 7/8] Source-MySql: bump versions --- .../435bb9a5-7887-4809-aa58-28c27df0d7ad.json | 2 +- airbyte-config/init/src/main/resources/seed/source_specs.yaml | 2 +- .../connectors/source-mysql-strict-encrypt/Dockerfile | 2 +- airbyte-integrations/connectors/source-mysql/Dockerfile | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/435bb9a5-7887-4809-aa58-28c27df0d7ad.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/435bb9a5-7887-4809-aa58-28c27df0d7ad.json index 6844826ada84d..6959c593b842b 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/435bb9a5-7887-4809-aa58-28c27df0d7ad.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/435bb9a5-7887-4809-aa58-28c27df0d7ad.json @@ -2,7 +2,7 @@ "sourceDefinitionId": "435bb9a5-7887-4809-aa58-28c27df0d7ad", "name": "MySQL", "dockerRepository": "airbyte/source-mysql", - "dockerImageTag": "0.4.12", + "dockerImageTag": "0.4.13", "documentationUrl": "https://docs.airbyte.io/integrations/sources/mysql", "icon": "mysql.svg" } diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 225c2f29c5fbd..c53a1fdd91c5a 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -3963,7 +3963,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-mysql:0.4.12" +- dockerImage: "airbyte/source-mysql:0.4.13" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/mysql" connectionSpecification: diff --git a/airbyte-integrations/connectors/source-mysql-strict-encrypt/Dockerfile b/airbyte-integrations/connectors/source-mysql-strict-encrypt/Dockerfile index 6bfc288e9c919..851d901bd9632 100644 --- a/airbyte-integrations/connectors/source-mysql-strict-encrypt/Dockerfile +++ b/airbyte-integrations/connectors/source-mysql-strict-encrypt/Dockerfile @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.1.4 +LABEL io.airbyte.version=0.1.5 LABEL io.airbyte.name=airbyte/source-mysql-strict-encrypt diff --git a/airbyte-integrations/connectors/source-mysql/Dockerfile b/airbyte-integrations/connectors/source-mysql/Dockerfile index 0cf768cb9af6d..8a2c756c61761 100644 --- a/airbyte-integrations/connectors/source-mysql/Dockerfile +++ b/airbyte-integrations/connectors/source-mysql/Dockerfile @@ -8,6 +8,6 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.4.12 +LABEL io.airbyte.version=0.4.13 LABEL io.airbyte.name=airbyte/source-mysql From ec79f76897f585eec58e148da798a846532c6a20 Mon Sep 17 00:00:00 2001 From: mkhokh Date: Mon, 6 Dec 2021 16:59:46 +0200 Subject: [PATCH 8/8] Source-MySql:update source_definitions.yaml with new version --- .../init/src/main/resources/seed/source_definitions.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 3149c2da77a2a..c2ac7675341fc 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -414,7 +414,7 @@ - name: MySQL sourceDefinitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad dockerRepository: airbyte/source-mysql - dockerImageTag: 0.4.12 + dockerImageTag: 0.4.13 documentationUrl: https://docs.airbyte.io/integrations/sources/mysql icon: mysql.svg sourceType: database