From d6199c67998f6370791facee437d7dc4cb2a2bf2 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Fri, 16 Feb 2024 13:53:14 -0800 Subject: [PATCH 1/3] wild idea --- .../jdbc/JdbcBufferedConsumerFactory.java | 3 +- .../staging/GeneralStagingFunctions.java | 8 +- .../typing_deduping/DefaultTyperDeduper.java | 23 +++--- .../NoOpTyperDeduperWithV1V2Migrations.java | 41 ++-------- .../typing_deduping/NoopTyperDeduper.java | 12 ++- .../typing_deduping/TyperDeduper.java | 40 +++++++++- .../typing_deduping/TyperDeduperUtil.kt | 79 ++++++++++++++++--- .../DefaultTyperDeduperTest.java | 34 +++++--- 8 files changed, 167 insertions(+), 73 deletions(-) diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcBufferedConsumerFactory.java b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcBufferedConsumerFactory.java index d0d488c71284..ce4be3856c24 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcBufferedConsumerFactory.java +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcBufferedConsumerFactory.java @@ -161,7 +161,7 @@ private static OnStartFunction onStartFunction(final JdbcDatabase database, final Collection writeConfigs, final TyperDeduper typerDeduper) { return () -> { - typerDeduper.prepareTables(); + typerDeduper.prepareSchemasAndRawTables(); LOGGER.info("Preparing raw tables in destination started for {} streams", writeConfigs.size()); final List queryList = new ArrayList<>(); for (final WriteConfig writeConfig : writeConfigs) { @@ -181,6 +181,7 @@ private static OnStartFunction onStartFunction(final JdbcDatabase database, } sqlOperations.executeTransaction(database, queryList); LOGGER.info("Preparing raw tables in destination completed."); + typerDeduper.prepareFinalTables(); }; } diff --git 
a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/staging/GeneralStagingFunctions.java b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/staging/GeneralStagingFunctions.java index 0eef0c5343bf..dd59a80e7dd2 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/staging/GeneralStagingFunctions.java +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/staging/GeneralStagingFunctions.java @@ -39,7 +39,10 @@ public static OnStartFunction onStartFunction(final JdbcDatabase database, final TyperDeduper typerDeduper) { return () -> { log.info("Preparing raw tables in destination started for {} streams", writeConfigs.size()); - typerDeduper.prepareTables(); + + typerDeduper.prepareSchemasAndRawTables(); + + // Create raw tables final List queryList = new ArrayList<>(); for (final WriteConfig writeConfig : writeConfigs) { final String schema = writeConfig.getOutputSchemaName(); @@ -69,6 +72,9 @@ public static OnStartFunction onStartFunction(final JdbcDatabase database, log.info("Preparing staging area in destination completed for schema {} stream {}", schema, stream); } + + typerDeduper.prepareFinalTables(); + log.info("Executing finalization of tables."); stagingOperations.executeTransaction(database, queryList); }; diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduper.java b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduper.java index ec49be79cb57..4cee3375ec63 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduper.java +++ 
b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduper.java @@ -8,7 +8,6 @@ import static io.airbyte.cdk.integrations.util.ConnectorExceptionUtil.getResultsOrLogAndThrowFirst; import static io.airbyte.integrations.base.destination.typing_deduping.FutureUtils.*; import static io.airbyte.integrations.base.destination.typing_deduping.FutureUtils.reduceExceptions; -import static io.airbyte.integrations.base.destination.typing_deduping.TyperDeduperUtilKt.prepareAllSchemas; import static java.util.Collections.singleton; import io.airbyte.cdk.integrations.destination.StreamSyncSummary; @@ -43,7 +42,7 @@ *

* In a typical sync, destinations should call the methods: *

    - *
  1. {@link #prepareTables()} once at the start of the sync
  2. + *
  3. {@link #prepareFinalTables()} once at the start of the sync
  4. *
  5. {@link #typeAndDedupe(String, String, boolean)} as needed throughout the sync
  6. *
  7. {@link #commitFinalTables()} once at the end of the sync
  8. *
@@ -104,27 +103,23 @@ public DefaultTyperDeduper(final SqlGenerator sqlGenerator, this(sqlGenerator, destinationHandler, parsedCatalog, v1V2Migrator, new NoopV2TableMigrator()); } - private void prepareSchemas(final ParsedCatalog parsedCatalog) throws Exception { - prepareAllSchemas(parsedCatalog, sqlGenerator, destinationHandler); + @Override + public void prepareSchemasAndRawTables() { + // Technically kind of weird to call this here, but it's the best place we have. + // Ideally, we'd create just airbyte_internal here, and defer creating the final table schemas + // until prepareFinalTables... but it doesn't really matter. + TyperDeduperUtil.prepareSchemas(sqlGenerator, destinationHandler, parsedCatalog); + TyperDeduperUtil.executeRawTableMigrations(executorService, sqlGenerator, destinationHandler, v1V2Migrator, v2TableMigrator, parsedCatalog); } @Override - public void prepareTables() throws Exception { + public void prepareFinalTables() throws Exception { if (overwriteStreamsWithTmpTable != null) { throw new IllegalStateException("Tables were already prepared."); } overwriteStreamsWithTmpTable = ConcurrentHashMap.newKeySet(); LOGGER.info("Preparing tables"); - // This is intentionally not done in parallel to avoid rate limits in some destinations. - prepareSchemas(parsedCatalog); - - // TODO: Either the migrations run the soft reset and create v2 tables or the actual prepare tables. - // unify the logic with current state of raw tables & final tables. This is done first before gather - // initial state to avoid recreating final tables later again. 
- final List> runMigrationsResult = - CompletableFutures.allOf(parsedCatalog.streams().stream().map(this::runMigrationsAsync).toList()).toCompletableFuture().join(); - getResultsOrLogAndThrowFirst("The following exceptions were thrown attempting to run migrations:\n", runMigrationsResult); final List initialStates = destinationHandler.gatherInitialState(parsedCatalog.streams()); final List> prepareTablesFutureResult = CompletableFutures.allOf( initialStates.stream().map(this::prepareTablesFuture).toList()).toCompletableFuture().join(); diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/NoOpTyperDeduperWithV1V2Migrations.java b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/NoOpTyperDeduperWithV1V2Migrations.java index f76bd2e07019..2276def0de96 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/NoOpTyperDeduperWithV1V2Migrations.java +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/NoOpTyperDeduperWithV1V2Migrations.java @@ -6,22 +6,14 @@ import static io.airbyte.cdk.integrations.base.IntegrationRunner.TYPE_AND_DEDUPE_THREAD_NAME; import static io.airbyte.integrations.base.destination.typing_deduping.FutureUtils.getCountOfTypeAndDedupeThreads; -import static io.airbyte.integrations.base.destination.typing_deduping.FutureUtils.reduceExceptions; -import static io.airbyte.integrations.base.destination.typing_deduping.TyperDeduperUtilKt.prepareAllSchemas; import io.airbyte.cdk.integrations.destination.StreamSyncSummary; import io.airbyte.protocol.models.v0.StreamDescriptor; -import java.util.HashSet; import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import 
java.util.concurrent.Executors; import java.util.concurrent.locks.Lock; -import kotlin.NotImplementedError; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.NotImplementedException; import org.apache.commons.lang3.concurrent.BasicThreadFactory; /** @@ -54,31 +46,14 @@ public NoOpTyperDeduperWithV1V2Migrations(final SqlGenerator sqlGenerator, } @Override - public void prepareTables() throws Exception { - try { - log.info("Ensuring schemas exist for prepareTables with V1V2 migrations"); - prepareAllSchemas(parsedCatalog, sqlGenerator, destinationHandler); - final Set>> prepareTablesTasks = new HashSet<>(); - for (final StreamConfig stream : parsedCatalog.streams()) { - prepareTablesTasks.add(CompletableFuture.supplyAsync(() -> { - // Migrate the Raw Tables if this is the first v2 sync after a v1 sync - try { - log.info("Migrating V1->V2 for stream {}", stream.id()); - v1V2Migrator.migrateIfNecessary(sqlGenerator, destinationHandler, stream); - log.info("Migrating V2 legacy for stream {}", stream.id()); - v2TableMigrator.migrateIfNecessary(stream); - return Optional.empty(); - } catch (final Exception e) { - return Optional.of(e); - } - }, executorService)); - } - CompletableFuture.allOf(prepareTablesTasks.toArray(CompletableFuture[]::new)).join(); - reduceExceptions(prepareTablesTasks, "The following exceptions were thrown attempting to prepare tables:\n"); - } catch (NotImplementedError | NotImplementedException e) { - log.warn( - "Could not prepare schemas or tables because this is not implemented for this destination, this should not be required for this destination to succeed"); - } + public void prepareSchemasAndRawTables() { + TyperDeduperUtil.prepareSchemas(sqlGenerator, destinationHandler, parsedCatalog); + TyperDeduperUtil.executeRawTableMigrations(executorService, sqlGenerator, destinationHandler, v1V2Migrator, v2TableMigrator, parsedCatalog); + } + + @Override + public void prepareFinalTables() { + log.info("Skipping prepareFinalTables"); 
} @Override diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/NoopTyperDeduper.java b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/NoopTyperDeduper.java index af8529e3d2b2..88151485665a 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/NoopTyperDeduper.java +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/NoopTyperDeduper.java @@ -11,10 +11,20 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; +/** + * This class should be used while upgrading a destination from V1 to V2. V2 destinations should use + * {@link NoOpTyperDeduperWithV1V2Migrations} for disabling T+D, because it correctly handles + * various migration operations. + */ public class NoopTyperDeduper implements TyperDeduper { @Override - public void prepareTables() { + public void prepareSchemasAndRawTables() throws Exception { + + } + + @Override + public void prepareFinalTables() { } diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/TyperDeduper.java b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/TyperDeduper.java index 263c9a11742c..f31fabcc68c4 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/TyperDeduper.java +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/TyperDeduper.java @@ -9,15 +9,53 @@ import java.util.Map; import java.util.concurrent.locks.Lock; +/* + * This class wants to do three separate things, but not all of them actually happen here right now: + * * A 
migration runner, which handles any changes in raw tables (#prepareSchemasAndRawTables) * A + * raw table creator, which creates any missing raw tables (currently handled in e.g. + * GeneralStagingFunctions.onStartFunction, BigQueryStagingConsumerFactory.onStartFunction, etc.) * + * A T+D runner, which manages the final tables (#prepareFinalTables, #typeAndDedupe, etc.) + * + * These would be injectable to the relevant locations, so that we can have: * DV2 destinations with + * T+D enabled (i.e. all three objects instantiated for real) * DV2 destinations with T+D disabled + * (i.e. noop T+D runner but the other two objects for real) * DV1 destinations (i.e. all three + * objects as noop) + * + * Even more ideally, we'd create an instance per stream, instead of having one instance for the + * entire sync. This would massively simplify all the state contained in our implementations - see + * DefaultTyperDeduper's pile of Sets and Maps. + * + * Unfortunately, it's just a pain to inject these objects to everywhere they need to be, and we'd + * need to refactor part of the async framework on top of that. There's an obvious overlap with the + * async framework's onStart function... which we should deal with eventually. + */ public interface TyperDeduper { + /** + * Does two things: Set up the schemas for the sync (both airbyte_internal and final table schemas), + * and execute any raw table migrations. These migrations might include: Upgrading v1 raw tables to + * v2, adding a column to the raw tables, etc. In general, this method shouldn't actually create the + * raw tables; the only exception is in the V1 -> V2 migration. + *

+ * This method should be called BEFORE creating raw tables, because the V1V2 migration might create + * the raw tables. + *

+ * This method may affect the behavior of {@link #prepareFinalTables()}. For example, modifying a + * raw table may require us to run a soft reset. However, we should defer that soft reset until + * {@link #prepareFinalTables()}. + */ + void prepareSchemasAndRawTables() throws Exception; + /** * Create the tables that T+D will write to during the sync. In OVERWRITE mode, these might not be * the true final tables. Specifically, other than an initial sync (i.e. table does not exist, or is * empty) we write to a temporary final table, and swap it into the true final table at the end of * the sync. This is to prevent user downtime during a sync. + *

+ * This method should be called AFTER creating the raw tables, because it may run a soft reset + * (which requires the raw tables to exist). */ - void prepareTables() throws Exception; + void prepareFinalTables() throws Exception; /** * Suggest that we execute typing and deduping for a single stream (i.e. fetch new raw records into diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/TyperDeduperUtil.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/TyperDeduperUtil.kt index 8f56b1a81acb..59d829cb79b9 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/TyperDeduperUtil.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/TyperDeduperUtil.kt @@ -1,15 +1,72 @@ package io.airbyte.integrations.base.destination.typing_deduping +import com.google.common.collect.Streams +import io.airbyte.cdk.integrations.util.ConnectorExceptionUtil.getResultsOrLogAndThrowFirst +import io.airbyte.commons.concurrency.CompletableFutures +import java.util.* +import java.util.concurrent.CompletableFuture +import java.util.concurrent.CompletionStage +import java.util.concurrent.ExecutorService -/** - * Extracts all the "raw" and "final" schemas identified in the [parsedCatalog] and ensures they - * exist in the Destination Database. 
- */ -fun prepareAllSchemas(parsedCatalog: ParsedCatalog, sqlGenerator: SqlGenerator, destinationHandler: DestinationHandler) { - val rawSchema = parsedCatalog.streams.mapNotNull { it.id.rawNamespace } - val finalSchema = parsedCatalog.streams.mapNotNull { it.id.finalNamespace } - val createAllSchemasSql = rawSchema.union(finalSchema) - .map { sqlGenerator.createSchema(it) } - .toList() - destinationHandler.execute(Sql.concat(createAllSchemasSql)) + +class TyperDeduperUtil { + companion object { + + @JvmStatic + fun executeRawTableMigrations( + executorService: ExecutorService, + sqlGenerator: SqlGenerator, + destinationHandler: DestinationHandler, + v1V2Migrator: DestinationV1V2Migrator, + v2TableMigrator: V2TableMigrator, + parsedCatalog: ParsedCatalog + ) { + // TODO: Either the migrations run the soft reset and create v2 tables or the actual prepare tables. + // unify the logic + // with current state of raw tables & final tables. This is done first before gather initial state + // to avoid recreating + // final tables later again. + val runMigrationsResult = + CompletableFutures.allOf(parsedCatalog.streams().stream() + .map { streamConfig -> runMigrationsAsync(executorService, sqlGenerator, destinationHandler, v1V2Migrator, v2TableMigrator, streamConfig) } + .toList()).toCompletableFuture().join() + getResultsOrLogAndThrowFirst("The following exceptions were thrown attempting to run migrations:\n", runMigrationsResult) + } + + /** + * Extracts all the "raw" and "final" schemas identified in the [parsedCatalog] and ensures they + * exist in the Destination Database. 
+ */ + @JvmStatic + fun prepareSchemas( + sqlGenerator: SqlGenerator, + destinationHandler: DestinationHandler, + parsedCatalog: ParsedCatalog) { + val rawSchema = parsedCatalog.streams.stream().map { it.id.rawNamespace } + val finalSchema = parsedCatalog.streams.stream().map { it.id.finalNamespace } + val createAllSchemasSql = Streams.concat(rawSchema, finalSchema) + .filter(Objects::nonNull) + .distinct() + .map(sqlGenerator::createSchema) + .toList() + destinationHandler.execute(Sql.concat(createAllSchemasSql)) + } + + private fun runMigrationsAsync( + executorService: ExecutorService, + sqlGenerator: SqlGenerator, + destinationHandler: DestinationHandler, + v1V2Migrator: DestinationV1V2Migrator, + v2TableMigrator: V2TableMigrator, + streamConfig: StreamConfig): CompletionStage { + return CompletableFuture.runAsync({ + try { + v1V2Migrator.migrateIfNecessary(sqlGenerator, destinationHandler, streamConfig) + v2TableMigrator.migrateIfNecessary(streamConfig) + } catch (e: java.lang.Exception) { + throw RuntimeException(e) + } + }, executorService) + } + } } diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduperTest.java b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduperTest.java index 916c0235722d..0753bb7e5a8c 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduperTest.java +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduperTest.java @@ -92,10 +92,11 @@ void setup() throws Exception { @Test void emptyDestination() throws Exception { initialStates.forEach(initialState -> when(initialState.isFinalTablePresent()).thenReturn(false)); - // 
when(destinationHandler.findExistingTable(any())).thenReturn(Optional.empty()); - typerDeduper.prepareTables(); + typerDeduper.prepareSchemasAndRawTables(); verify(destinationHandler).execute(separately("CREATE SCHEMA overwrite_ns", "CREATE SCHEMA append_ns", "CREATE SCHEMA dedup_ns")); + + typerDeduper.prepareFinalTables(); verify(destinationHandler).execute(Sql.of("CREATE TABLE overwrite_ns.overwrite_stream")); verify(destinationHandler).execute(Sql.of("CREATE TABLE append_ns.append_stream")); verify(destinationHandler).execute(Sql.of("CREATE TABLE dedup_ns.dedup_stream")); @@ -126,8 +127,11 @@ void existingEmptyTable() throws Exception { when(initialState.isFinalTableEmpty()).thenReturn(true); when(initialState.isSchemaMismatch()).thenReturn(true); }); - typerDeduper.prepareTables(); + + typerDeduper.prepareSchemasAndRawTables(); verify(destinationHandler).execute(separately("CREATE SCHEMA overwrite_ns", "CREATE SCHEMA append_ns", "CREATE SCHEMA dedup_ns")); + + typerDeduper.prepareFinalTables(); verify(destinationHandler).execute(Sql.of("CREATE TABLE overwrite_ns.overwrite_stream_airbyte_tmp")); verify(destinationHandler).execute(Sql.of("PREPARE append_ns.append_stream FOR SOFT RESET")); verify(destinationHandler).execute(Sql.of("UPDATE TABLE append_ns.append_stream_ab_soft_reset WITHOUT SAFER CASTING")); @@ -161,12 +165,14 @@ void existingEmptyTableMatchingSchema() throws Exception { initialStates.forEach(initialState -> { when(initialState.isFinalTablePresent()).thenReturn(true); when(initialState.isFinalTableEmpty()).thenReturn(true); - when(initialState.isSchemaMismatch()).thenReturn(true); + when(initialState.isSchemaMismatch()).thenReturn(false); }); - typerDeduper.prepareTables(); + typerDeduper.prepareSchemasAndRawTables(); verify(destinationHandler).execute(separately("CREATE SCHEMA overwrite_ns", "CREATE SCHEMA append_ns", "CREATE SCHEMA dedup_ns")); clearInvocations(destinationHandler); + + typerDeduper.prepareFinalTables(); 
verify(destinationHandler, never()).execute(any()); } @@ -183,8 +189,10 @@ void existingNonemptyTable() throws Exception { when(initialState.initialRawTableState()).thenReturn(new InitialRawTableState(true, Optional.of(Instant.parse("2023-01-01T12:34:56Z")))); }); - typerDeduper.prepareTables(); + typerDeduper.prepareSchemasAndRawTables(); verify(destinationHandler).execute(separately("CREATE SCHEMA overwrite_ns", "CREATE SCHEMA append_ns", "CREATE SCHEMA dedup_ns")); + + typerDeduper.prepareFinalTables(); // NB: We only create a tmp table for the overwrite stream, and do _not_ soft reset the existing // overwrite stream's table. @@ -228,10 +236,12 @@ void existingNonemptyTableMatchingSchema() throws Exception { when(initialState.initialRawTableState()).thenReturn(new InitialRawTableState(true, Optional.of(Instant.now()))); }); - typerDeduper.prepareTables(); + typerDeduper.prepareSchemasAndRawTables(); + verify(destinationHandler).execute(separately("CREATE SCHEMA overwrite_ns", "CREATE SCHEMA append_ns", "CREATE SCHEMA dedup_ns")); + + typerDeduper.prepareFinalTables(); // NB: We only create one tmp table here. // Also, we need to alter the existing _real_ table, not the tmp table! 
- verify(destinationHandler).execute(separately("CREATE SCHEMA overwrite_ns", "CREATE SCHEMA append_ns", "CREATE SCHEMA dedup_ns")); verify(destinationHandler).execute(Sql.of("CREATE TABLE overwrite_ns.overwrite_stream_airbyte_tmp")); verifyNoMoreInteractions(ignoreStubs(destinationHandler)); } @@ -247,7 +257,7 @@ void nonexistentStream() { void failedSetup() throws Exception { doThrow(new RuntimeException("foo")).when(destinationHandler).execute(any()); - assertThrows(Exception.class, () -> typerDeduper.prepareTables()); + assertThrows(Exception.class, () -> typerDeduper.prepareFinalTables()); clearInvocations(destinationHandler); typerDeduper.typeAndDedupe("dedup_ns", "dedup_stream", false); @@ -263,7 +273,8 @@ void failedSetup() throws Exception { @Test void noUnprocessedRecords() throws Exception { initialStates.forEach(initialState -> when(initialState.initialRawTableState()).thenReturn(new InitialRawTableState(false, Optional.empty()))); - typerDeduper.prepareTables(); + + typerDeduper.prepareFinalTables(); clearInvocations(destinationHandler); typerDeduper.typeAndDedupe(Map.of( @@ -286,7 +297,8 @@ void noUnprocessedRecords() throws Exception { void unprocessedRecords() throws Exception { initialStates.forEach(initialState -> when(initialState.initialRawTableState()) .thenReturn(new InitialRawTableState(true, Optional.of(Instant.parse("2023-01-23T12:34:56Z"))))); - typerDeduper.prepareTables(); + + typerDeduper.prepareFinalTables(); clearInvocations(destinationHandler); typerDeduper.typeAndDedupe(Map.of( From f946fb93cf968f025a8606dab72ddb305cd14296 Mon Sep 17 00:00:00 2001 From: Gireesh Sreepathi Date: Wed, 28 Feb 2024 10:52:25 -0800 Subject: [PATCH 2/3] nit: rename method Signed-off-by: Gireesh Sreepathi --- .../destination/jdbc/JdbcBufferedConsumerFactory.java | 2 +- .../destination/staging/GeneralStagingFunctions.java | 2 +- .../typing_deduping/DefaultTyperDeduper.java | 2 +- .../NoOpTyperDeduperWithV1V2Migrations.java | 2 +- 
.../destination/typing_deduping/NoopTyperDeduper.java | 2 +- .../base/destination/typing_deduping/TyperDeduper.java | 2 +- .../typing_deduping/DefaultTyperDeduperTest.java | 10 +++++----- 7 files changed, 11 insertions(+), 11 deletions(-) diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcBufferedConsumerFactory.java b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcBufferedConsumerFactory.java index ce4be3856c24..8fd513423b0a 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcBufferedConsumerFactory.java +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcBufferedConsumerFactory.java @@ -161,7 +161,7 @@ private static OnStartFunction onStartFunction(final JdbcDatabase database, final Collection writeConfigs, final TyperDeduper typerDeduper) { return () -> { - typerDeduper.prepareSchemasAndRawTables(); + typerDeduper.prepareSchemasAndRunMigrations(); LOGGER.info("Preparing raw tables in destination started for {} streams", writeConfigs.size()); final List queryList = new ArrayList<>(); for (final WriteConfig writeConfig : writeConfigs) { diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/staging/GeneralStagingFunctions.java b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/staging/GeneralStagingFunctions.java index dd59a80e7dd2..ef88ca3743c9 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/staging/GeneralStagingFunctions.java +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/staging/GeneralStagingFunctions.java @@ -40,7 +40,7 @@ public static OnStartFunction onStartFunction(final 
JdbcDatabase database, return () -> { log.info("Preparing raw tables in destination started for {} streams", writeConfigs.size()); - typerDeduper.prepareSchemasAndRawTables(); + typerDeduper.prepareSchemasAndRunMigrations(); // Create raw tables final List queryList = new ArrayList<>(); diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduper.java b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduper.java index 4cee3375ec63..2dbd9f1e8498 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduper.java +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduper.java @@ -104,7 +104,7 @@ public DefaultTyperDeduper(final SqlGenerator sqlGenerator, } @Override - public void prepareSchemasAndRawTables() { + public void prepareSchemasAndRunMigrations() { // Technically kind of weird to call this here, but it's the best place we have. // Ideally, we'd create just airbyte_internal here, and defer creating the final table schemas // until prepareFinalTables... but it doesn't really matter. 
diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/NoOpTyperDeduperWithV1V2Migrations.java b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/NoOpTyperDeduperWithV1V2Migrations.java index 2276def0de96..1d06b9a49b61 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/NoOpTyperDeduperWithV1V2Migrations.java +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/NoOpTyperDeduperWithV1V2Migrations.java @@ -46,7 +46,7 @@ public NoOpTyperDeduperWithV1V2Migrations(final SqlGenerator sqlGenerator, } @Override - public void prepareSchemasAndRawTables() { + public void prepareSchemasAndRunMigrations() { TyperDeduperUtil.prepareSchemas(sqlGenerator, destinationHandler, parsedCatalog); TyperDeduperUtil.executeRawTableMigrations(executorService, sqlGenerator, destinationHandler, v1V2Migrator, v2TableMigrator, parsedCatalog); } diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/NoopTyperDeduper.java b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/NoopTyperDeduper.java index 88151485665a..6a312a72b515 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/NoopTyperDeduper.java +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/NoopTyperDeduper.java @@ -19,7 +19,7 @@ public class NoopTyperDeduper implements TyperDeduper { @Override - public void prepareSchemasAndRawTables() throws Exception { + public void prepareSchemasAndRunMigrations() throws Exception { } diff --git 
a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/TyperDeduper.java b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/TyperDeduper.java index f31fabcc68c4..37d34643b720 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/TyperDeduper.java +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/TyperDeduper.java @@ -44,7 +44,7 @@ public interface TyperDeduper { * raw table may require us to run a soft reset. However, we should defer that soft reset until * {@link #prepareFinalTables()}. */ - void prepareSchemasAndRawTables() throws Exception; + void prepareSchemasAndRunMigrations() throws Exception; /** * Create the tables that T+D will write to during the sync. In OVERWRITE mode, these might not be diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduperTest.java b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduperTest.java index 0753bb7e5a8c..65f2c127f26e 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduperTest.java +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduperTest.java @@ -93,7 +93,7 @@ void setup() throws Exception { void emptyDestination() throws Exception { initialStates.forEach(initialState -> when(initialState.isFinalTablePresent()).thenReturn(false)); - typerDeduper.prepareSchemasAndRawTables(); + typerDeduper.prepareSchemasAndRunMigrations(); verify(destinationHandler).execute(separately("CREATE SCHEMA overwrite_ns", 
"CREATE SCHEMA append_ns", "CREATE SCHEMA dedup_ns")); typerDeduper.prepareFinalTables(); @@ -128,7 +128,7 @@ void existingEmptyTable() throws Exception { when(initialState.isSchemaMismatch()).thenReturn(true); }); - typerDeduper.prepareSchemasAndRawTables(); + typerDeduper.prepareSchemasAndRunMigrations(); verify(destinationHandler).execute(separately("CREATE SCHEMA overwrite_ns", "CREATE SCHEMA append_ns", "CREATE SCHEMA dedup_ns")); typerDeduper.prepareFinalTables(); @@ -168,7 +168,7 @@ void existingEmptyTableMatchingSchema() throws Exception { when(initialState.isSchemaMismatch()).thenReturn(false); }); - typerDeduper.prepareSchemasAndRawTables(); + typerDeduper.prepareSchemasAndRunMigrations(); verify(destinationHandler).execute(separately("CREATE SCHEMA overwrite_ns", "CREATE SCHEMA append_ns", "CREATE SCHEMA dedup_ns")); clearInvocations(destinationHandler); @@ -189,7 +189,7 @@ void existingNonemptyTable() throws Exception { when(initialState.initialRawTableState()).thenReturn(new InitialRawTableState(true, Optional.of(Instant.parse("2023-01-01T12:34:56Z")))); }); - typerDeduper.prepareSchemasAndRawTables(); + typerDeduper.prepareSchemasAndRunMigrations(); verify(destinationHandler).execute(separately("CREATE SCHEMA overwrite_ns", "CREATE SCHEMA append_ns", "CREATE SCHEMA dedup_ns")); typerDeduper.prepareFinalTables(); @@ -236,7 +236,7 @@ void existingNonemptyTableMatchingSchema() throws Exception { when(initialState.initialRawTableState()).thenReturn(new InitialRawTableState(true, Optional.of(Instant.now()))); }); - typerDeduper.prepareSchemasAndRawTables(); + typerDeduper.prepareSchemasAndRunMigrations(); verify(destinationHandler).execute(separately("CREATE SCHEMA overwrite_ns", "CREATE SCHEMA append_ns", "CREATE SCHEMA dedup_ns")); typerDeduper.prepareFinalTables(); From 7a3503817bb94b3f3958c203258d9093898bed00 Mon Sep 17 00:00:00 2001 From: Gireesh Sreepathi Date: Wed, 28 Feb 2024 11:01:13 -0800 Subject: [PATCH 3/3] cdk version bump Signed-off-by: 
Gireesh Sreepathi --- airbyte-cdk/java/airbyte-cdk/README.md | 1 + .../java/airbyte-cdk/core/src/main/resources/version.properties | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/airbyte-cdk/java/airbyte-cdk/README.md b/airbyte-cdk/java/airbyte-cdk/README.md index f05d010ea701..ed366673ecff 100644 --- a/airbyte-cdk/java/airbyte-cdk/README.md +++ b/airbyte-cdk/java/airbyte-cdk/README.md @@ -166,6 +166,7 @@ MavenLocal debugging steps: | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 0.23.7 | 2024-02-28 | [\#35376](https://github.com/airbytehq/airbyte/pull/35376) | Split TyperDeduper table preparation into prepareSchemasAndRunMigrations and prepareFinalTables | | 0.23.6 | 2024-02-26 | [\#35647](https://github.com/airbytehq/airbyte/pull/35647) | Add a getNamespace into TestDataHolder | | 0.23.5 | 2024-02-26 | [\#35512](https://github.com/airbytehq/airbyte/pull/35512) | Remove @DisplayName from all CDK tests. | | 0.23.4 | 2024-02-26 | [\#35507](https://github.com/airbytehq/airbyte/pull/35507) | Add more logs into TestDatabase. | diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties index 8aef173dc819..c40a8721d426 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties @@ -1 +1 @@ -version=0.23.6 +version=0.23.7