diff --git a/airbyte-cdk/java/airbyte-cdk/README.md b/airbyte-cdk/java/airbyte-cdk/README.md index d982ff531bd3..e4e8a8f01c44 100644 --- a/airbyte-cdk/java/airbyte-cdk/README.md +++ b/airbyte-cdk/java/airbyte-cdk/README.md @@ -166,6 +166,7 @@ MavenLocal debugging steps: | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 0.21.0 | 2024-02-16 | [\#35314](https://github.com/airbytehq/airbyte/pull/35314) | Delete S3StreamCopier classes. These have been superseded by the async destinations framework. | | 0.20.9 | 2024-02-15 | [\#35240](https://github.com/airbytehq/airbyte/pull/35240) | Make state emission to platform inside state manager itself. | | 0.20.8 | 2024-02-15 | [\#35285](https://github.com/airbytehq/airbyte/pull/35285) | Improve blobstore module structure. | | 0.20.7 | 2024-02-13 | [\#35236](https://github.com/airbytehq/airbyte/pull/35236) | output logs to files in addition to stdout when running tests | diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties index 55d88e2da2a2..9af4b1f631da 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties @@ -1 +1 @@ -version=0.20.9 \ No newline at end of file +version=0.21.0 diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/copy/s3/S3StreamCopier.java b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/copy/s3/S3StreamCopier.java deleted file mode 100644 index bdf669194a91..000000000000 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/copy/s3/S3StreamCopier.java +++ /dev/null @@ -1,241 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.cdk.integrations.destination.jdbc.copy.s3; - -import com.amazonaws.services.s3.AmazonS3; -import com.google.common.annotations.VisibleForTesting; -import io.airbyte.cdk.db.jdbc.JdbcDatabase; -import io.airbyte.cdk.integrations.destination.StandardNameTransformer; -import io.airbyte.cdk.integrations.destination.jdbc.SqlOperations; -import io.airbyte.cdk.integrations.destination.jdbc.copy.StreamCopier; -import io.airbyte.cdk.integrations.destination.s3.S3DestinationConfig; -import io.airbyte.cdk.integrations.destination.s3.S3FormatConfig; -import io.airbyte.cdk.integrations.destination.s3.csv.S3CsvFormatConfig; -import io.airbyte.cdk.integrations.destination.s3.csv.S3CsvWriter; -import io.airbyte.cdk.integrations.destination.s3.csv.StagingDatabaseCsvSheetGenerator; -import io.airbyte.cdk.integrations.destination.s3.util.CompressionType; -import io.airbyte.cdk.integrations.destination.s3.writer.DestinationFileWriter; -import io.airbyte.protocol.models.v0.AirbyteRecordMessage; -import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream; -import io.airbyte.protocol.models.v0.DestinationSyncMode; -import java.io.IOException; -import java.sql.SQLException; -import java.sql.Timestamp; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashSet; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import org.apache.commons.csv.CSVFormat; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public abstract class S3StreamCopier implements StreamCopier { - - private static final Logger LOGGER = LoggerFactory.getLogger(S3StreamCopier.class); - - private static final int DEFAULT_UPLOAD_THREADS = 10; // The S3 cli uses 10 threads by default. - private static final int DEFAULT_QUEUE_CAPACITY = DEFAULT_UPLOAD_THREADS; - - protected final AmazonS3 s3Client; - protected final S3DestinationConfig s3Config; - protected final String tmpTableName; - protected final String schemaName; - protected final String streamName; - protected final JdbcDatabase db; - protected final ConfiguredAirbyteStream configuredAirbyteStream; - protected final String stagingFolder; - protected final Map stagingWritersByFile = new HashMap<>(); - private final DestinationSyncMode destSyncMode; - private final StandardNameTransformer nameTransformer; - private final SqlOperations sqlOperations; - private final Timestamp uploadTime; - protected final Set activeStagingWriterFileNames = new HashSet<>(); - protected final Set stagingFileNames = new LinkedHashSet<>(); - private final boolean purgeStagingData; - - // The number of batches of records that will be inserted into each file. - private final int maxPartsPerFile; - // The number of batches inserted into the current file. - private int partsAddedToCurrentFile; - private String currentFile; - - public S3StreamCopier(final String stagingFolder, - final String schema, - final AmazonS3 client, - final JdbcDatabase db, - final S3CopyConfig config, - final StandardNameTransformer nameTransformer, - final SqlOperations sqlOperations, - final ConfiguredAirbyteStream configuredAirbyteStream, - final Timestamp uploadTime, - final int maxPartsPerFile) { - this.destSyncMode = configuredAirbyteStream.getDestinationSyncMode(); - this.schemaName = schema; - this.streamName = configuredAirbyteStream.getStream().getName(); - this.stagingFolder = stagingFolder; - this.db = db; - this.nameTransformer = nameTransformer; - this.sqlOperations = sqlOperations; - this.configuredAirbyteStream = configuredAirbyteStream; - this.uploadTime = uploadTime; - this.tmpTableName = nameTransformer.getTmpTableName(this.streamName); - this.s3Client = client; - this.s3Config = config.s3Config(); - this.purgeStagingData = config.purgeStagingData(); - - this.maxPartsPerFile = maxPartsPerFile; - this.partsAddedToCurrentFile = 0; - } - - @Override - public String prepareStagingFile() { - if (partsAddedToCurrentFile == 0) { - - try { - // The Flattening value is actually ignored, because we pass an explicit CsvSheetGenerator. So just - // pass in null. - final S3FormatConfig csvFormatConfig = new S3CsvFormatConfig(null, CompressionType.NO_COMPRESSION); - final S3DestinationConfig writerS3Config = S3DestinationConfig.create(s3Config).withFormatConfig(csvFormatConfig).get(); - final S3CsvWriter writer = new S3CsvWriter.Builder( - writerS3Config, - s3Client, - configuredAirbyteStream, - uploadTime) - .uploadThreads(DEFAULT_UPLOAD_THREADS) - .queueCapacity(DEFAULT_QUEUE_CAPACITY) - .csvSettings(CSVFormat.DEFAULT) - .withHeader(false) - .csvSheetGenerator(new StagingDatabaseCsvSheetGenerator()) - .build(); - currentFile = writer.getOutputPath(); - stagingWritersByFile.put(currentFile, writer); - activeStagingWriterFileNames.add(currentFile); - stagingFileNames.add(currentFile); - } catch (final IOException e) { - throw new RuntimeException(e); - } - } - partsAddedToCurrentFile = (partsAddedToCurrentFile + 1) % maxPartsPerFile; - return currentFile; - } - - @Override - public void write(final UUID id, final AirbyteRecordMessage recordMessage, final String filename) throws Exception { - if (stagingWritersByFile.containsKey(filename)) { - stagingWritersByFile.get(filename).write(id, recordMessage); - } - } - - @Override - public void closeNonCurrentStagingFileWriters() throws Exception { - final Set removedKeys = new HashSet<>(); - for (final String key : activeStagingWriterFileNames) { - if (!key.equals(currentFile)) { - stagingWritersByFile.get(key).close(false); - stagingWritersByFile.remove(key); - removedKeys.add(key); - } - } - activeStagingWriterFileNames.removeAll(removedKeys); - } - - @Override - public void closeStagingUploader(final boolean hasFailed) throws Exception { - for (final DestinationFileWriter writer : stagingWritersByFile.values()) { - writer.close(hasFailed); - } - } - - @Override - public void createDestinationSchema() throws Exception { - LOGGER.info("Creating schema in destination if it doesn't exist: {}", schemaName); - sqlOperations.createSchemaIfNotExists(db, schemaName); - } - - @Override - public void createTemporaryTable() throws Exception { - LOGGER.info("Preparing tmp table in destination for stream: {}, schema: {}, tmp table name: {}.", streamName, schemaName, tmpTableName); - sqlOperations.createTableIfNotExists(db, schemaName, tmpTableName); - } - - @Override - public void copyStagingFileToTemporaryTable() throws Exception { - LOGGER.info("Starting copy to tmp table: {} in destination for stream: {}, schema: {}, .", tmpTableName, streamName, schemaName); - for (final String fileName : stagingFileNames) { - copyS3CsvFileIntoTable(db, getFullS3Path(s3Config.getBucketName(), fileName), schemaName, tmpTableName, s3Config); - } - LOGGER.info("Copy to tmp table {} in destination for stream {} complete.", tmpTableName, streamName); - } - - @Override - public String createDestinationTable() throws Exception { - final var destTableName = nameTransformer.getRawTableName(streamName); - LOGGER.info("Preparing table {} in destination.", destTableName); - sqlOperations.createTableIfNotExists(db, schemaName, destTableName); - LOGGER.info("Table {} in destination prepared.", tmpTableName); - - return destTableName; - } - - @Override - public String generateMergeStatement(final String destTableName) { - LOGGER.info("Preparing to merge tmp table {} to dest table: {}, schema: {}, in destination.", tmpTableName, destTableName, schemaName); - final var queries = new StringBuilder(); - if (destSyncMode.equals(DestinationSyncMode.OVERWRITE)) { - queries.append(sqlOperations.truncateTableQuery(db, schemaName, destTableName)); - LOGGER.info("Destination OVERWRITE mode detected. Dest table: {}, schema: {}, truncated.", destTableName, schemaName); - } - queries.append(sqlOperations.insertTableQuery(db, schemaName, tmpTableName, destTableName)); - return queries.toString(); - } - - @Override - public void removeFileAndDropTmpTable() throws Exception { - if (purgeStagingData) { - for (final String fileName : stagingFileNames) { - s3Client.deleteObject(s3Config.getBucketName(), fileName); - LOGGER.info("S3 staging file {} cleaned.", fileName); - } - } - - LOGGER.info("Begin cleaning {} tmp table in destination.", tmpTableName); - sqlOperations.dropTableIfExists(db, schemaName, tmpTableName); - LOGGER.info("{} tmp table in destination cleaned.", tmpTableName); - } - - @Override - public String getCurrentFile() { - return currentFile; - } - - protected static String getFullS3Path(final String s3BucketName, final String s3StagingFile) { - return String.join("/", "s3:/", s3BucketName, s3StagingFile); - } - - @VisibleForTesting - public String getTmpTableName() { - return tmpTableName; - } - - @VisibleForTesting - public Map getStagingWritersByFile() { - return stagingWritersByFile; - } - - @VisibleForTesting - public Set getStagingFiles() { - return stagingFileNames; - } - - public abstract void copyS3CsvFileIntoTable(JdbcDatabase database, - String s3FileLocation, - String schema, - String tableName, - S3DestinationConfig s3Config) - throws SQLException; - -} diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/copy/s3/S3StreamCopierFactory.java b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/copy/s3/S3StreamCopierFactory.java deleted file mode 100644 index b9b94c72c329..000000000000 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/copy/s3/S3StreamCopierFactory.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.cdk.integrations.destination.jdbc.copy.s3; - -import com.amazonaws.services.s3.AmazonS3; -import io.airbyte.cdk.db.jdbc.JdbcDatabase; -import io.airbyte.cdk.integrations.destination.StandardNameTransformer; -import io.airbyte.cdk.integrations.destination.jdbc.SqlOperations; -import io.airbyte.cdk.integrations.destination.jdbc.copy.StreamCopier; -import io.airbyte.cdk.integrations.destination.jdbc.copy.StreamCopierFactory; -import io.airbyte.protocol.models.v0.AirbyteStream; -import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream; - -public abstract class S3StreamCopierFactory implements StreamCopierFactory { - - /** - * Used by the copy consumer. - */ - @Override - public StreamCopier create(final String configuredSchema, - final S3CopyConfig config, - final String stagingFolder, - final ConfiguredAirbyteStream configuredStream, - final StandardNameTransformer nameTransformer, - final JdbcDatabase db, - final SqlOperations sqlOperations) { - try { - final AirbyteStream stream = configuredStream.getStream(); - final String schema = StreamCopierFactory.getSchema(stream.getNamespace(), configuredSchema, nameTransformer); - final AmazonS3 s3Client = config.s3Config().getS3Client(); - - return create(stagingFolder, schema, s3Client, db, config, nameTransformer, sqlOperations, configuredStream); - } catch (final Exception e) { - throw new RuntimeException(e); - } - } - - /** - * For specific copier suppliers to implement. - */ - protected abstract StreamCopier create(String stagingFolder, - String schema, - AmazonS3 s3Client, - JdbcDatabase db, - S3CopyConfig config, - StandardNameTransformer nameTransformer, - SqlOperations sqlOperations, - ConfiguredAirbyteStream configuredStream) - throws Exception; - -} diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/test/java/io/airbyte/cdk/integrations/destination/jdbc/copy/s3/S3StreamCopierTest.java b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/test/java/io/airbyte/cdk/integrations/destination/jdbc/copy/s3/S3StreamCopierTest.java deleted file mode 100644 index 770643e875e4..000000000000 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/test/java/io/airbyte/cdk/integrations/destination/jdbc/copy/s3/S3StreamCopierTest.java +++ /dev/null @@ -1,290 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.cdk.integrations.destination.jdbc.copy.s3; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.mockConstruction; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.verify; - -import com.amazonaws.services.s3.AmazonS3Client; -import com.google.common.collect.Lists; -import io.airbyte.cdk.db.jdbc.JdbcDatabase; -import io.airbyte.cdk.integrations.base.DestinationConfig; -import io.airbyte.cdk.integrations.destination.StandardNameTransformer; -import io.airbyte.cdk.integrations.destination.jdbc.SqlOperations; -import io.airbyte.cdk.integrations.destination.s3.S3DestinationConfig; -import io.airbyte.cdk.integrations.destination.s3.csv.CsvSheetGenerator; -import io.airbyte.cdk.integrations.destination.s3.csv.S3CsvFormatConfig; -import io.airbyte.cdk.integrations.destination.s3.csv.S3CsvWriter; -import io.airbyte.cdk.integrations.destination.s3.csv.StagingDatabaseCsvSheetGenerator; -import io.airbyte.cdk.integrations.destination.s3.util.CompressionType; -import io.airbyte.commons.json.Jsons; -import io.airbyte.protocol.models.v0.AirbyteStream; -import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream; -import io.airbyte.protocol.models.v0.DestinationSyncMode; -import io.airbyte.protocol.models.v0.SyncMode; -import java.sql.Timestamp; -import java.time.Instant; -import java.util.ArrayList; -import java.util.Comparator; -import java.util.List; -import org.apache.commons.csv.CSVFormat; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.mockito.MockedConstruction; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class S3StreamCopierTest { - - private static final Logger LOGGER = LoggerFactory.getLogger(S3StreamCopierTest.class); - - private static final S3DestinationConfig S3_CONFIG = S3DestinationConfig.create( - "fake-bucket", - "fake-bucketPath", - "fake-region") - .withEndpoint("fake-endpoint") - .withAccessKeyCredential("fake-access-key-id", "fake-secret-access-key") - .get(); - private static final ConfiguredAirbyteStream CONFIGURED_STREAM = new ConfiguredAirbyteStream() - .withDestinationSyncMode(DestinationSyncMode.APPEND) - .withStream(new AirbyteStream() - .withName("fake-stream") - .withNamespace("fake-namespace") - .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH))); - private static final int UPLOAD_THREADS = 10; - private static final int QUEUE_CAPACITY = 10; - // equivalent to Thu, 09 Dec 2021 19:17:54 GMT - private static final Timestamp UPLOAD_TIME = Timestamp.from(Instant.ofEpochMilli(1639077474000L)); - private static final int MAX_PARTS_PER_FILE = 42; - - private AmazonS3Client s3Client; - private JdbcDatabase db; - private SqlOperations sqlOperations; - private S3StreamCopier copier; - - private MockedConstruction csvWriterMockedConstruction; - private List csvWriterConstructorArguments; - - private List copyArguments; - - private record S3CsvWriterArguments(S3DestinationConfig config, - ConfiguredAirbyteStream stream, - Timestamp uploadTime, - int uploadThreads, - int queueCapacity, - boolean writeHeader, - CSVFormat csvSettings, - CsvSheetGenerator csvSheetGenerator) { - - } - - private record CopyArguments(JdbcDatabase database, - String s3FileLocation, - String schema, - String tableName, - S3DestinationConfig s3Config) { - - } - - @BeforeEach - public void setup() { - DestinationConfig.initialize(Jsons.emptyObject()); - - s3Client = mock(AmazonS3Client.class); - db = mock(JdbcDatabase.class); - sqlOperations = mock(SqlOperations.class); - - csvWriterConstructorArguments = new ArrayList<>(); - copyArguments = new ArrayList<>(); - - // This is basically RETURNS_SELF, except with getMultiPartOutputStreams configured correctly. - // Other non-void methods (e.g. toString()) will return null. - csvWriterMockedConstruction = mockConstruction( - S3CsvWriter.class, - (mock, context) -> { - // Normally, the S3CsvWriter would return a path that ends in a UUID, but this mock will generate an - // int ID to make our asserts easier. - doReturn(String.format("fakeOutputPath-%05d", csvWriterConstructorArguments.size())).when(mock).getOutputPath(); - - // Mockito doesn't seem to provide an easy way to actually retrieve these arguments later on, so - // manually store them on construction. - // _PowerMockito_ does, but I didn't want to set up that additional dependency. - final List arguments = context.arguments(); - csvWriterConstructorArguments.add(new S3CsvWriterArguments( - (S3DestinationConfig) arguments.get(0), - (ConfiguredAirbyteStream) arguments.get(2), - (Timestamp) arguments.get(3), - (int) arguments.get(4), - (int) arguments.get(5), - (boolean) arguments.get(6), - (CSVFormat) arguments.get(7), - (CsvSheetGenerator) arguments.get(8))); - }); - - copier = new S3StreamCopier( - // In reality, this is normally a UUID - see CopyConsumerFactory#createWriteConfigs - "fake-staging-folder", - "fake-schema", - s3Client, - db, - new S3CopyConfig(true, S3_CONFIG), - new StandardNameTransformer(), - sqlOperations, - CONFIGURED_STREAM, - UPLOAD_TIME, - MAX_PARTS_PER_FILE) { - - @Override - public void copyS3CsvFileIntoTable( - final JdbcDatabase database, - final String s3FileLocation, - final String schema, - final String tableName, - final S3DestinationConfig s3Config) { - copyArguments.add(new CopyArguments(database, s3FileLocation, schema, tableName, s3Config)); - } - - }; - } - - @AfterEach - public void teardown() { - csvWriterMockedConstruction.close(); - } - - @Test - public void createSequentialStagingFiles_when_multipleFilesRequested() { - // When we call prepareStagingFile() the first time, it should create exactly one S3CsvWriter. The - // next (MAX_PARTS_PER_FILE - 1) invocations - // should reuse that same writer. - for (var i = 0; i < MAX_PARTS_PER_FILE; i++) { - final String file = copier.prepareStagingFile(); - assertEquals("fakeOutputPath-00000", file, "preparing file number " + i); - assertEquals(1, csvWriterMockedConstruction.constructed().size()); - checkCsvWriterArgs(csvWriterConstructorArguments.get(0)); - } - - // Now that we've hit the MAX_PARTS_PER_FILE, we should start a new writer - final String secondFile = copier.prepareStagingFile(); - assertEquals("fakeOutputPath-00001", secondFile); - final List secondManagers = csvWriterMockedConstruction.constructed(); - assertEquals(2, secondManagers.size()); - checkCsvWriterArgs(csvWriterConstructorArguments.get(1)); - } - - private void checkCsvWriterArgs(final S3CsvWriterArguments args) { - final S3DestinationConfig s3Config = S3DestinationConfig.create(S3_CONFIG) - .withFormatConfig(new S3CsvFormatConfig(null, CompressionType.NO_COMPRESSION)) - .get(); - assertEquals(s3Config, args.config); - assertEquals(CONFIGURED_STREAM, args.stream); - assertEquals(UPLOAD_TIME, args.uploadTime); - assertEquals(UPLOAD_THREADS, args.uploadThreads); - assertEquals(QUEUE_CAPACITY, args.queueCapacity); - assertFalse(args.writeHeader); - assertEquals(CSVFormat.DEFAULT, args.csvSettings); - assertTrue( - args.csvSheetGenerator instanceof StagingDatabaseCsvSheetGenerator, - "Sheet generator was actually a " + args.csvSheetGenerator.getClass()); - } - - @Test - public void closesS3Upload_when_stagingUploaderClosedSuccessfully() throws Exception { - copier.prepareStagingFile(); - - copier.closeStagingUploader(false); - - final List managers = csvWriterMockedConstruction.constructed(); - final S3CsvWriter manager = managers.get(0); - verify(manager).close(false); - } - - @Test - public void closesS3Upload_when_stagingUploaderClosedFailingly() throws Exception { - copier.prepareStagingFile(); - - copier.closeStagingUploader(true); - - final List managers = csvWriterMockedConstruction.constructed(); - final S3CsvWriter manager = managers.get(0); - verify(manager).close(true); - } - - @Test - public void deletesStagingFiles() throws Exception { - copier.prepareStagingFile(); - doReturn(true).when(s3Client).doesObjectExist("fake-bucket", "fakeOutputPath-00000"); - - copier.removeFileAndDropTmpTable(); - - verify(s3Client).deleteObject("fake-bucket", "fakeOutputPath-00000"); - } - - @Test - public void doesNotDeleteStagingFiles_if_purgeStagingDataDisabled() throws Exception { - copier = new S3StreamCopier( - "fake-staging-folder", - "fake-schema", - s3Client, - db, - // Explicitly disable purgeStagingData - new S3CopyConfig(false, S3_CONFIG), - new StandardNameTransformer(), - sqlOperations, - CONFIGURED_STREAM, - UPLOAD_TIME, - MAX_PARTS_PER_FILE) { - - @Override - public void copyS3CsvFileIntoTable( - final JdbcDatabase database, - final String s3FileLocation, - final String schema, - final String tableName, - final S3DestinationConfig s3Config) { - copyArguments.add(new CopyArguments(database, s3FileLocation, schema, tableName, s3Config)); - } - - }; - - copier.prepareStagingFile(); - doReturn(true).when(s3Client).doesObjectExist("fake-bucket", "fakeOutputPath-00000"); - - copier.removeFileAndDropTmpTable(); - - verify(s3Client, never()).deleteObject("fake-bucket", "fakeOutputPath-00000"); - } - - @Test - public void copiesCorrectFilesToTable() throws Exception { - // Generate two files - for (int i = 0; i < MAX_PARTS_PER_FILE + 1; i++) { - copier.prepareStagingFile(); - } - - copier.copyStagingFileToTemporaryTable(); - - assertEquals(2, copyArguments.size(), "Number of invocations was actually " + copyArguments.size() + ". Arguments were " + copyArguments); - - // S3StreamCopier operates on these from a HashMap, so need to sort them in order to assert in a - // sane way. - final List sortedArgs = copyArguments.stream().sorted(Comparator.comparing(arg -> arg.s3FileLocation)).toList(); - for (int i = 0; i < sortedArgs.size(); i++) { - LOGGER.info("Checking arguments for index {}", i); - final CopyArguments args = sortedArgs.get(i); - assertEquals(String.format("s3://fake-bucket/fakeOutputPath-%05d", i), args.s3FileLocation); - assertEquals("fake-schema", args.schema); - assertTrue(args.tableName.endsWith("fake_stream"), "Table name was actually " + args.tableName); - } - } - -} diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/copiers/RedshiftStreamCopier.java b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/copiers/RedshiftStreamCopier.java deleted file mode 100644 index 3a8f801c4689..000000000000 --- a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/copiers/RedshiftStreamCopier.java +++ /dev/null @@ -1,171 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.destination.redshift.copiers; - -import com.amazonaws.services.s3.AmazonS3; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.annotations.VisibleForTesting; -import io.airbyte.cdk.db.jdbc.JdbcDatabase; -import io.airbyte.cdk.integrations.destination.StandardNameTransformer; -import io.airbyte.cdk.integrations.destination.jdbc.SqlOperations; -import io.airbyte.cdk.integrations.destination.jdbc.copy.s3.S3CopyConfig; -import io.airbyte.cdk.integrations.destination.jdbc.copy.s3.S3StreamCopier; -import io.airbyte.cdk.integrations.destination.s3.S3DestinationConfig; -import io.airbyte.cdk.integrations.destination.s3.credential.S3AccessKeyCredentialConfig; -import io.airbyte.commons.lang.Exceptions; -import io.airbyte.integrations.destination.redshift.manifest.Entry; -import io.airbyte.integrations.destination.redshift.manifest.Manifest; -import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream; -import java.sql.Timestamp; -import java.time.Instant; -import java.util.Optional; -import java.util.UUID; -import java.util.stream.Collectors; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class RedshiftStreamCopier extends S3StreamCopier { - - private static final Logger LOGGER = LoggerFactory.getLogger(RedshiftStreamCopier.class); - // From https://docs.aws.amazon.com/redshift/latest/dg/t_loading-tables-from-s3.html - // "Split your load data files so that the files are about equal size, between 1 MB and 1 GB after - // compression" - public static final int MAX_PARTS_PER_FILE = 4; - - private final ObjectMapper objectMapper; - private String manifestFilePath = null; - - public RedshiftStreamCopier(final String stagingFolder, - final String schema, - final AmazonS3 client, - final JdbcDatabase db, - final S3CopyConfig config, - final StandardNameTransformer nameTransformer, - final SqlOperations sqlOperations, - final ConfiguredAirbyteStream configuredAirbyteStream) { - this( - stagingFolder, - schema, - client, - db, - config, - nameTransformer, - sqlOperations, - Timestamp.from(Instant.now()), - configuredAirbyteStream); - } - - @VisibleForTesting - RedshiftStreamCopier(final String stagingFolder, - final String schema, - final AmazonS3 client, - final JdbcDatabase db, - final S3CopyConfig config, - final StandardNameTransformer nameTransformer, - final SqlOperations sqlOperations, - final Timestamp uploadTime, - final ConfiguredAirbyteStream configuredAirbyteStream) { - super(stagingFolder, - schema, - client, - db, - config, - nameTransformer, - sqlOperations, - configuredAirbyteStream, - uploadTime, - MAX_PARTS_PER_FILE); - objectMapper = new ObjectMapper(); - } - - @Override - public void copyStagingFileToTemporaryTable() { - final var possibleManifest = Optional.ofNullable(createManifest()); - LOGGER.info("Starting copy to tmp table: {} in destination for stream: {}, schema: {}, .", tmpTableName, streamName, schemaName); - possibleManifest.stream() - .map(this::putManifest) - .forEach(this::executeCopy); - LOGGER.info("Copy to tmp table {} in destination for stream {} complete.", tmpTableName, streamName); - } - - @Override - public void copyS3CsvFileIntoTable(final JdbcDatabase database, - final String s3FileLocation, - final String schema, - final String tableName, - final S3DestinationConfig s3Config) { - throw new RuntimeException("Redshift Stream Copier should not copy individual files without use of a manifest"); - } - - @Override - public void removeFileAndDropTmpTable() throws Exception { - super.removeFileAndDropTmpTable(); - if (manifestFilePath != null) { - LOGGER.info("Begin cleaning s3 manifest file {}.", manifestFilePath); - if (s3Client.doesObjectExist(s3Config.getBucketName(), manifestFilePath)) { - s3Client.deleteObject(s3Config.getBucketName(), manifestFilePath); - } - LOGGER.info("S3 manifest file {} cleaned.", manifestFilePath); - } - } - - /** - * Creates the contents of a manifest file given the `s3StagingFiles`. There must be at least one - * entry in a manifest file otherwise it is not considered valid for the COPY command. - * - * @return null if no stagingFiles exist otherwise the manifest body String - */ - private String createManifest() { - if (getStagingFiles().isEmpty()) { - return null; - } - - final var s3FileEntries = getStagingFiles().stream() - .map(filePath -> new Entry(getFullS3Path(s3Config.getBucketName(), filePath))) - .collect(Collectors.toList()); - final var manifest = new Manifest(s3FileEntries); - - return Exceptions.toRuntime(() -> objectMapper.writeValueAsString(manifest)); - } - - /** - * Upload the supplied manifest file to S3 - * - * @param manifestContents the manifest contents, never null - * @return the path where the manifest file was placed in S3 - */ - private String putManifest(final String manifestContents) { - manifestFilePath = - String.join("/", s3Config.getBucketPath(), stagingFolder, schemaName, String.format("%s.manifest", UUID.randomUUID())); - - s3Client.putObject(s3Config.getBucketName(), manifestFilePath, manifestContents); - - return manifestFilePath; - } - - /** - * Run Redshift COPY command with the given manifest file - * - * @param manifestPath the path in S3 to the manifest file - */ - private void executeCopy(final String manifestPath) { - final S3AccessKeyCredentialConfig credentialConfig = (S3AccessKeyCredentialConfig) s3Config.getS3CredentialConfig(); - final var copyQuery = String.format( - "COPY %s.%s FROM '%s'\n" - + "CREDENTIALS 'aws_access_key_id=%s;aws_secret_access_key=%s'\n" - + "CSV REGION '%s' TIMEFORMAT 'auto'\n" - + "STATUPDATE OFF\n" - + "MANIFEST;", - schemaName, - tmpTableName, - getFullS3Path(s3Config.getBucketName(), manifestPath), - credentialConfig.getAccessKeyId(), - credentialConfig.getSecretAccessKey(), - s3Config.getBucketRegion()); - - Exceptions.toRuntime(() -> db.execute(copyQuery)); - } - -} diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/copiers/RedshiftStreamCopierFactory.java b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/copiers/RedshiftStreamCopierFactory.java deleted file mode 100644 index 5527002288bc..000000000000 --- a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/copiers/RedshiftStreamCopierFactory.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.destination.redshift.copiers; - -import com.amazonaws.services.s3.AmazonS3; -import io.airbyte.cdk.db.jdbc.JdbcDatabase; -import io.airbyte.cdk.integrations.destination.StandardNameTransformer; -import io.airbyte.cdk.integrations.destination.jdbc.SqlOperations; -import io.airbyte.cdk.integrations.destination.jdbc.copy.StreamCopier; -import io.airbyte.cdk.integrations.destination.jdbc.copy.s3.S3CopyConfig; -import io.airbyte.cdk.integrations.destination.jdbc.copy.s3.S3StreamCopierFactory; -import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream; - -/** - * Very similar to the {@link S3StreamCopierFactory}, but we need some additional - */ -public class RedshiftStreamCopierFactory extends S3StreamCopierFactory { - - @Override - public StreamCopier create(final String stagingFolder, - final String schema, - final AmazonS3 s3Client, - final JdbcDatabase db, - final S3CopyConfig config, - final StandardNameTransformer nameTransformer, - final SqlOperations sqlOperations, - final ConfiguredAirbyteStream configuredStream) { - return new RedshiftStreamCopier(stagingFolder, schema, s3Client, db, config, nameTransformer, sqlOperations, configuredStream); - } - -} diff --git a/airbyte-integrations/connectors/destination-redshift/src/test/java/io/airbyte/integrations/destination/redshift/copiers/RedshiftStreamCopierTest.java b/airbyte-integrations/connectors/destination-redshift/src/test/java/io/airbyte/integrations/destination/redshift/copiers/RedshiftStreamCopierTest.java deleted file mode 100644 index 5c029abc5e58..000000000000 --- a/airbyte-integrations/connectors/destination-redshift/src/test/java/io/airbyte/integrations/destination/redshift/copiers/RedshiftStreamCopierTest.java +++ /dev/null @@ -1,161 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.destination.redshift.copiers; - -import static java.util.Comparator.comparing; -import static org.mockito.ArgumentMatchers.argThat; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.RETURNS_DEEP_STUBS; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; - -import com.amazonaws.services.s3.AmazonS3Client; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.Lists; -import io.airbyte.cdk.db.jdbc.JdbcDatabase; -import io.airbyte.cdk.integrations.base.DestinationConfig; -import io.airbyte.cdk.integrations.destination.StandardNameTransformer; -import io.airbyte.cdk.integrations.destination.jdbc.SqlOperations; -import io.airbyte.cdk.integrations.destination.jdbc.copy.s3.S3CopyConfig; -import io.airbyte.cdk.integrations.destination.s3.S3DestinationConfig; -import io.airbyte.commons.json.Jsons; -import io.airbyte.protocol.models.v0.AirbyteStream; -import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream; -import io.airbyte.protocol.models.v0.DestinationSyncMode; -import io.airbyte.protocol.models.v0.SyncMode; -import java.sql.SQLException; -import java.sql.Timestamp; -import java.time.Instant; -import java.util.List; -import java.util.UUID; -import java.util.concurrent.atomic.AtomicReference; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -class RedshiftStreamCopierTest { - - private static final Logger LOGGER = LoggerFactory.getLogger(RedshiftStreamCopierTest.class); - - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - - // The full path would be something like - // "fake-namespace/fake_stream/2021_12_09_1639077474000_e549e712-b89c-4272-9496-9690ba7f973e.csv" - // The namespace and stream have their hyphens replaced by underscores. Not super clear that that's - // actually required. - // 2021_12_09_1639077474000 is generated from the timestamp. It's followed by a random UUID, in case - // we need to create multiple files. - private static final String EXPECTED_OBJECT_BEGINNING = "fake-bucketPath/fake_namespace/fake_stream/2021_12_09_1639077474000_"; - private static final String EXPECTED_OBJECT_ENDING = ".csv"; - - // equivalent to Thu, 09 Dec 2021 19:17:54 GMT - private static final Timestamp UPLOAD_TIME = Timestamp.from(Instant.ofEpochMilli(1639077474000L)); - - private AmazonS3Client s3Client; - private JdbcDatabase db; - private SqlOperations sqlOperations; - private RedshiftStreamCopier copier; - - @BeforeEach - public void setup() { - DestinationConfig.initialize(Jsons.emptyObject()); - s3Client = mock(AmazonS3Client.class, RETURNS_DEEP_STUBS); - db = mock(JdbcDatabase.class); - sqlOperations = mock(SqlOperations.class); - - final S3DestinationConfig s3Config = S3DestinationConfig.create( - "fake-bucket", - "fake-bucketPath", - "fake-region") - .withEndpoint("fake-endpoint") - .withAccessKeyCredential("fake-access-key-id", "fake-secret-access-key") - .get(); - - copier = new RedshiftStreamCopier( - // In reality, this is normally a UUID - see CopyConsumerFactory#createWriteConfigs - "fake-staging-folder", - "fake-schema", - s3Client, - db, - new S3CopyConfig(true, s3Config), - new StandardNameTransformer(), - sqlOperations, - UPLOAD_TIME, - new ConfiguredAirbyteStream() - .withDestinationSyncMode(DestinationSyncMode.APPEND) - .withStream(new AirbyteStream() - .withName("fake-stream") - .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH)) - .withNamespace("fake-namespace"))); - } - - @Test - public void copiesCorrectFilesToTable() throws SQLException { - // Generate two files - final String file1 = copier.prepareStagingFile(); - for (int i = 0; i < RedshiftStreamCopier.MAX_PARTS_PER_FILE - 1; i++) { - copier.prepareStagingFile(); - } - final String file2 = copier.prepareStagingFile(); - final List expectedFiles = List.of(file1, file2).stream().sorted().toList(); - - copier.copyStagingFileToTemporaryTable(); - - final AtomicReference manifestUuid = new AtomicReference<>(); - verify(s3Client).putObject( - eq("fake-bucket"), - argThat(path -> { - final boolean startsCorrectly = path.startsWith("fake-bucketPath/fake-staging-folder/fake-schema/"); - final boolean endsCorrectly = path.endsWith(".manifest"); - // Make sure that we have a valid UUID - manifestUuid.set(path.replaceFirst("^fake-bucketPath/fake-staging-folder/fake-schema/", "").replaceFirst(".manifest$", "")); - UUID.fromString(manifestUuid.get()); - - return startsCorrectly && endsCorrectly; - }), - (String) argThat(manifestStr -> { - try { - final JsonNode manifest = OBJECT_MAPPER.readTree((String) manifestStr); - final List entries = Lists.newArrayList(manifest.get("entries").elements()).stream() - .sorted(comparing(entry -> entry.get("url").asText())).toList(); - - boolean entriesAreCorrect = true; - for (int i = 0; i < 2; i++) { - final String expectedFilename = expectedFiles.get(i); - final JsonNode manifestEntry = entries.get(i); - entriesAreCorrect &= isManifestEntryCorrect(manifestEntry, expectedFilename); - if (!entriesAreCorrect) { - LOGGER.error("Invalid entry: {}", manifestEntry); - } - } - - return entriesAreCorrect && entries.size() == 2; - } catch (final JsonProcessingException e) { - throw new RuntimeException(e); - } - })); - - verify(db).execute(String.format( - """ - COPY fake-schema.%s FROM 's3://fake-bucket/fake-bucketPath/fake-staging-folder/fake-schema/%s.manifest' - CREDENTIALS 'aws_access_key_id=fake-access-key-id;aws_secret_access_key=fake-secret-access-key' - CSV REGION 'fake-region' TIMEFORMAT 'auto' - STATUPDATE OFF - MANIFEST;""", - copier.getTmpTableName(), - manifestUuid.get())); - } - - private static boolean isManifestEntryCorrect(final JsonNode entry, final String expectedFilename) { - final String url = entry.get("url").asText(); - final boolean mandatory = entry.get("mandatory").asBoolean(); - - return ("s3://fake-bucket/" + expectedFilename).equals(url) && mandatory; - } - -}