Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DV2 TyperDeduper: Extract migrations to separate method #35376

Merged
merged 5 commits into from
Feb 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ MavenLocal debugging steps:

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.23.7 | 2024-02-28 | [\#35376](https://github.com/airbytehq/airbyte/pull/35376) | Add a getNamespace into TestDataHolder |
| 0.23.6 | 2024-02-26 | [\#35647](https://github.com/airbytehq/airbyte/pull/35647) | Add a getNamespace into TestDataHolder |
| 0.23.5 | 2024-02-26 | [\#35512](https://github.com/airbytehq/airbyte/pull/35512) | Remove @DisplayName from all CDK tests. |
| 0.23.4 | 2024-02-26 | [\#35507](https://github.com/airbytehq/airbyte/pull/35507) | Add more logs into TestDatabase. |
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.23.6
version=0.23.7
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ private static OnStartFunction onStartFunction(final JdbcDatabase database,
final Collection<WriteConfig> writeConfigs,
final TyperDeduper typerDeduper) {
return () -> {
typerDeduper.prepareTables();
typerDeduper.prepareSchemasAndRunMigrations();
LOGGER.info("Preparing raw tables in destination started for {} streams", writeConfigs.size());
final List<String> queryList = new ArrayList<>();
for (final WriteConfig writeConfig : writeConfigs) {
Expand All @@ -181,6 +181,7 @@ private static OnStartFunction onStartFunction(final JdbcDatabase database,
}
sqlOperations.executeTransaction(database, queryList);
LOGGER.info("Preparing raw tables in destination completed.");
typerDeduper.prepareFinalTables();
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ public static OnStartFunction onStartFunction(final JdbcDatabase database,
final TyperDeduper typerDeduper) {
return () -> {
log.info("Preparing raw tables in destination started for {} streams", writeConfigs.size());
typerDeduper.prepareTables();

typerDeduper.prepareSchemasAndRunMigrations();

// Create raw tables
final List<String> queryList = new ArrayList<>();
for (final WriteConfig writeConfig : writeConfigs) {
final String schema = writeConfig.getOutputSchemaName();
Expand Down Expand Up @@ -69,6 +72,9 @@ public static OnStartFunction onStartFunction(final JdbcDatabase database,

log.info("Preparing staging area in destination completed for schema {} stream {}", schema, stream);
}

typerDeduper.prepareFinalTables();

log.info("Executing finalization of tables.");
stagingOperations.executeTransaction(database, queryList);
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import static io.airbyte.cdk.integrations.util.ConnectorExceptionUtil.getResultsOrLogAndThrowFirst;
import static io.airbyte.integrations.base.destination.typing_deduping.FutureUtils.*;
import static io.airbyte.integrations.base.destination.typing_deduping.FutureUtils.reduceExceptions;
import static io.airbyte.integrations.base.destination.typing_deduping.TyperDeduperUtilKt.prepareAllSchemas;
import static java.util.Collections.singleton;

import io.airbyte.cdk.integrations.destination.StreamSyncSummary;
Expand Down Expand Up @@ -43,7 +42,7 @@
* <p>
* In a typical sync, destinations should call the methods:
* <ol>
* <li>{@link #prepareTables()} once at the start of the sync</li>
* <li>{@link #prepareFinalTables()} once at the start of the sync</li>
* <li>{@link #typeAndDedupe(String, String, boolean)} as needed throughout the sync</li>
* <li>{@link #commitFinalTables()} once at the end of the sync</li>
* </ol>
Expand Down Expand Up @@ -104,27 +103,23 @@ public DefaultTyperDeduper(final SqlGenerator sqlGenerator,
this(sqlGenerator, destinationHandler, parsedCatalog, v1V2Migrator, new NoopV2TableMigrator());
}

private void prepareSchemas(final ParsedCatalog parsedCatalog) throws Exception {
prepareAllSchemas(parsedCatalog, sqlGenerator, destinationHandler);
@Override
public void prepareSchemasAndRunMigrations() {
// Technically kind of weird to call this here, but it's the best place we have.
// Ideally, we'd create just airbyte_internal here, and defer creating the final table schemas
// until prepareFinalTables... but it doesn't really matter.
TyperDeduperUtil.prepareSchemas(sqlGenerator, destinationHandler, parsedCatalog);
TyperDeduperUtil.executeRawTableMigrations(executorService, sqlGenerator, destinationHandler, v1V2Migrator, v2TableMigrator, parsedCatalog);
}

@Override
public void prepareTables() throws Exception {
public void prepareFinalTables() throws Exception {
if (overwriteStreamsWithTmpTable != null) {
throw new IllegalStateException("Tables were already prepared.");
}
overwriteStreamsWithTmpTable = ConcurrentHashMap.newKeySet();
LOGGER.info("Preparing tables");

// This is intentionally not done in parallel to avoid rate limits in some destinations.
prepareSchemas(parsedCatalog);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this call now happens in prepareSchemasAndRawTables


// TODO: Either the migrations run the soft reset and create v2 tables or the actual prepare tables.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this logic now happens in TyperDeduperUtil.executeRawTableMigrations

// unify the logic with current state of raw tables & final tables. This is done first before gather
// initial state to avoid recreating final tables later again.
final List<Either<? extends Exception, Void>> runMigrationsResult =
CompletableFutures.allOf(parsedCatalog.streams().stream().map(this::runMigrationsAsync).toList()).toCompletableFuture().join();
getResultsOrLogAndThrowFirst("The following exceptions were thrown attempting to run migrations:\n", runMigrationsResult);
final List<DestinationInitialState> initialStates = destinationHandler.gatherInitialState(parsedCatalog.streams());
final List<Either<? extends Exception, Void>> prepareTablesFutureResult = CompletableFutures.allOf(
initialStates.stream().map(this::prepareTablesFuture).toList()).toCompletableFuture().join();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,14 @@

import static io.airbyte.cdk.integrations.base.IntegrationRunner.TYPE_AND_DEDUPE_THREAD_NAME;
import static io.airbyte.integrations.base.destination.typing_deduping.FutureUtils.getCountOfTypeAndDedupeThreads;
import static io.airbyte.integrations.base.destination.typing_deduping.FutureUtils.reduceExceptions;
import static io.airbyte.integrations.base.destination.typing_deduping.TyperDeduperUtilKt.prepareAllSchemas;

import io.airbyte.cdk.integrations.destination.StreamSyncSummary;
import io.airbyte.protocol.models.v0.StreamDescriptor;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Lock;
import kotlin.NotImplementedError;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;

/**
Expand Down Expand Up @@ -54,31 +46,14 @@ public NoOpTyperDeduperWithV1V2Migrations(final SqlGenerator sqlGenerator,
}

@Override
public void prepareTables() throws Exception {
try {
log.info("Ensuring schemas exist for prepareTables with V1V2 migrations");
prepareAllSchemas(parsedCatalog, sqlGenerator, destinationHandler);
final Set<CompletableFuture<Optional<Exception>>> prepareTablesTasks = new HashSet<>();
for (final StreamConfig stream : parsedCatalog.streams()) {
prepareTablesTasks.add(CompletableFuture.supplyAsync(() -> {
// Migrate the Raw Tables if this is the first v2 sync after a v1 sync
try {
log.info("Migrating V1->V2 for stream {}", stream.id());
v1V2Migrator.migrateIfNecessary(sqlGenerator, destinationHandler, stream);
log.info("Migrating V2 legacy for stream {}", stream.id());
v2TableMigrator.migrateIfNecessary(stream);
return Optional.empty();
} catch (final Exception e) {
return Optional.of(e);
}
}, executorService));
}
CompletableFuture.allOf(prepareTablesTasks.toArray(CompletableFuture[]::new)).join();
reduceExceptions(prepareTablesTasks, "The following exceptions were thrown attempting to prepare tables:\n");
} catch (NotImplementedError | NotImplementedException e) {
log.warn(
"Could not prepare schemas or tables because this is not implemented for this destination, this should not be required for this destination to succeed");
}
public void prepareSchemasAndRunMigrations() {
TyperDeduperUtil.prepareSchemas(sqlGenerator, destinationHandler, parsedCatalog);
TyperDeduperUtil.executeRawTableMigrations(executorService, sqlGenerator, destinationHandler, v1V2Migrator, v2TableMigrator, parsedCatalog);
}

@Override
public void prepareFinalTables() {
log.info("Skipping prepareFinalTables");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,20 @@
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
* This class should be used while upgrading a destination from V1 to V2. V2 destinations should use
* {@link NoOpTyperDeduperWithV1V2Migrations} for disabling T+D, because it correctly handles
* various migration operations.
*/
public class NoopTyperDeduper implements TyperDeduper {

@Override
public void prepareTables() {
public void prepareSchemasAndRunMigrations() throws Exception {

}

@Override
public void prepareFinalTables() {

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,53 @@
import java.util.Map;
import java.util.concurrent.locks.Lock;

/*
* This class wants to do three separate things, but not all of them actually happen here right now:
* * A migration runner, which handles any changes in raw tables (#prepareSchemasAndRawTables) * A
* raw table creator, which creates any missing raw tables (currently handled in e.g.
* GeneralStagingFunctions.onStartFunction, BigQueryStagingConsumerFactory.onStartFunction, etc.) *
* A T+D runner, which manages the final tables (#prepareFinalTables, #typeAndDedupe, etc.)
*
* These would be injectable to the relevant locations, so that we can have: * DV2 destinations with
* T+D enabled (i.e. all three objects instantiated for real) * DV2 destinations with T+D disabled
* (i.e. noop T+D runner but the other two objects for real) * DV1 destinations (i.e. all three
* objects as noop)
*
* Even more ideally, we'd create an instance per stream, instead of having one instance for the
* entire sync. This would massively simplify all the state contained in our implementations - see
* DefaultTyperDeduper's pile of Sets and Maps.
*
* Unfortunately, it's just a pain to inject these objects to everywhere they need to be, and we'd
* need to refactor part of the async framework on top of that. There's an obvious overlap with the
* async framework's onStart function... which we should deal with eventually.
*/
public interface TyperDeduper {

/**
* Does two things: Set up the schemas for the sync (both airbyte_internal and final table schemas),
* and execute any raw table migrations. These migrations might include: Upgrading v1 raw tables to
* v2, adding a column to the raw tables, etc. In general, this method shouldn't actually create the
* raw tables; the only exception is in the V1 -> V2 migration.
* <p>
* This method should be called BEFORE creating raw tables, because the V1V2 migration might create
* the raw tables.
* <p>
* This method may affect the behavior of {@link #prepareFinalTables()}. For example, modifying a
* raw table may require us to run a soft reset. However, we should defer that soft reset until
* {@link #prepareFinalTables()}.
*/
void prepareSchemasAndRunMigrations() throws Exception;

/**
* Create the tables that T+D will write to during the sync. In OVERWRITE mode, these might not be
* the true final tables. Specifically, other than an initial sync (i.e. table does not exist, or is
* empty) we write to a temporary final table, and swap it into the true final table at the end of
* the sync. This is to prevent user downtime during a sync.
* <p>
* This method should be called AFTER creating the raw tables, because it may run a soft reset
* (which requires the raw tables to exist).
*/
void prepareTables() throws Exception;
void prepareFinalTables() throws Exception;

/**
* Suggest that we execute typing and deduping for a single stream (i.e. fetch new raw records into
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,72 @@
package io.airbyte.integrations.base.destination.typing_deduping

import com.google.common.collect.Streams
import io.airbyte.cdk.integrations.util.ConnectorExceptionUtil.getResultsOrLogAndThrowFirst
import io.airbyte.commons.concurrency.CompletableFutures
import java.util.*
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CompletionStage
import java.util.concurrent.ExecutorService

/**
* Extracts all the "raw" and "final" schemas identified in the [parsedCatalog] and ensures they
* exist in the Destination Database.
*/
fun prepareAllSchemas(parsedCatalog: ParsedCatalog, sqlGenerator: SqlGenerator, destinationHandler: DestinationHandler) {
val rawSchema = parsedCatalog.streams.mapNotNull { it.id.rawNamespace }
val finalSchema = parsedCatalog.streams.mapNotNull { it.id.finalNamespace }
val createAllSchemasSql = rawSchema.union(finalSchema)
.map { sqlGenerator.createSchema(it) }
.toList()
destinationHandler.execute(Sql.concat(createAllSchemasSql))

class TyperDeduperUtil {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this class is partly taken from @jbfbell 's https://github.com/airbytehq/airbyte/pull/34637/files#diff-c9e7bf1074e965f63b135422dd2a7ac6ba196046b226a0ffe3a855a4eb82ed3b

it'll have some merge conflicts b/c I reordered the arguments in prepareAllSchemas but 🤷 just wanted to be consistent across the methods, and I generally prefer infra-ish params (executorservice, destinationhandler, etc.) to come before functional-ish params (parsedcatalog, streamconfig, etc.)

companion object {

@JvmStatic
fun executeRawTableMigrations(
executorService: ExecutorService,
sqlGenerator: SqlGenerator,
destinationHandler: DestinationHandler,
v1V2Migrator: DestinationV1V2Migrator,
v2TableMigrator: V2TableMigrator,
parsedCatalog: ParsedCatalog
) {
// TODO: Either the migrations run the soft reset and create v2 tables or the actual prepare tables.
// unify the logic
// with current state of raw tables & final tables. This is done first before gather initial state
// to avoid recreating
// final tables later again.
val runMigrationsResult =
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as noted above - this code originally lived in DefaultTyperDeduper, I just extracted it to a separate class.

CompletableFutures.allOf(parsedCatalog.streams().stream()
.map { streamConfig -> runMigrationsAsync(executorService, sqlGenerator, destinationHandler, v1V2Migrator, v2TableMigrator, streamConfig) }
.toList()).toCompletableFuture().join()
getResultsOrLogAndThrowFirst("The following exceptions were thrown attempting to run migrations:\n", runMigrationsResult)
}

/**
* Extracts all the "raw" and "final" schemas identified in the [parsedCatalog] and ensures they
* exist in the Destination Database.
*/
@JvmStatic
fun prepareSchemas(
sqlGenerator: SqlGenerator,
destinationHandler: DestinationHandler,
parsedCatalog: ParsedCatalog) {
val rawSchema = parsedCatalog.streams.stream().map { it.id.rawNamespace }
val finalSchema = parsedCatalog.streams.stream().map { it.id.finalNamespace }
val createAllSchemasSql = Streams.concat<String>(rawSchema, finalSchema)
.filter(Objects::nonNull)
.distinct()
.map(sqlGenerator::createSchema)
.toList()
destinationHandler.execute(Sql.concat(createAllSchemasSql))
}

private fun runMigrationsAsync(
executorService: ExecutorService,
sqlGenerator: SqlGenerator,
destinationHandler: DestinationHandler,
v1V2Migrator: DestinationV1V2Migrator,
v2TableMigrator: V2TableMigrator,
streamConfig: StreamConfig): CompletionStage<Void> {
return CompletableFuture.runAsync({
try {
v1V2Migrator.migrateIfNecessary(sqlGenerator, destinationHandler, streamConfig)
v2TableMigrator.migrateIfNecessary(streamConfig)
} catch (e: java.lang.Exception) {
throw RuntimeException(e)
}
}, executorService)
}
}
}
Loading
Loading