Skip to content

Commit

Permalink
update redshift to latest cdk
Browse files Browse the repository at this point in the history
  • Loading branch information
edgao committed Apr 26, 2024
1 parent 2067931 commit 9098002
Show file tree
Hide file tree
Showing 30 changed files with 55 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ plugins {
airbyteJavaConnector {
cdkVersionRequired = '0.29.12'
features = ['db-destinations', 's3-destinations', 'typing-deduping']
useLocalCdk = false
useLocalCdk = true
}

java {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public static JsonNode getJdbcConfig(final JsonNode redshiftConfig) {
}

@Override
protected JdbcSqlGenerator getSqlGenerator() {
protected JdbcSqlGenerator getSqlGenerator(final JsonNode config) {
return new RedshiftSqlGenerator(super.getNamingResolver());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ public JsonNode toJdbcConfig(final JsonNode config) {
}

@Override
protected JdbcSqlGenerator getSqlGenerator() {
protected JdbcSqlGenerator getSqlGenerator(final JsonNode config) {
return new RedshiftSqlGenerator(getNamingResolver());
}

Expand Down Expand Up @@ -271,7 +271,7 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN
typerDeduper,
parsedCatalog,
defaultNamespace,
true)
JavaBaseConstants.DestinationColumns.V2_WITH_META)
.setDataTransformer(getDataTransformer(parsedCatalog, defaultNamespace))
.build()
.createAsync();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public String uploadRecordsToStage(final JdbcDatabase database,

private String putManifest(final String manifestContents, final String stagingPath) {
final String manifestFilePath = stagingPath + String.format("%s.manifest", UUID.randomUUID());
s3StorageOperations.uploadManifest(s3Config.getBucketName(), manifestFilePath, manifestContents);
s3StorageOperations.uploadManifest(manifestFilePath, manifestContents);
return manifestFilePath;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class RedshiftRawTableAirbyteMetaMigration(
"Executing RawTableAirbyteMetaMigration for ${stream.id.originalNamespace}.${stream.id.originalName} for real"
)
destinationHandler.execute(
getRawTableMetaColumnAddDdl(stream.id.rawNamespace!!, stream.id.rawName!!)
getRawTableMetaColumnAddDdl(stream.id.rawNamespace, stream.id.rawName)
)

// Update the state. We didn't modify the table in a relevant way, so don't invalidate the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,17 +98,17 @@ protected SQLDialect getDialect() {
*/

@Override
protected Field<?> castedField(final Field<?> field, final AirbyteType type, final String alias, final boolean useExpensiveSaferCasting) {
protected Field<?> castedField(final Field<?> field, final AirbyteType type, final boolean useExpensiveSaferCasting) {
if (type instanceof final AirbyteProtocolType airbyteProtocolType) {
switch (airbyteProtocolType) {
case STRING -> {
return field(CASE_STATEMENT_SQL_TEMPLATE,
jsonTypeOf(field).ne("string").and(field.isNotNull()),
jsonSerialize(field),
castedField(field, airbyteProtocolType, useExpensiveSaferCasting)).as(quotedName(alias));
castedField(field, airbyteProtocolType, useExpensiveSaferCasting));
}
default -> {
return castedField(field, airbyteProtocolType, useExpensiveSaferCasting).as(quotedName(alias));
return castedField(field, airbyteProtocolType, useExpensiveSaferCasting);
}
}

Expand All @@ -117,12 +117,12 @@ protected Field<?> castedField(final Field<?> field, final AirbyteType type, fin
return switch (type.getTypeName()) {
case Struct.TYPE, UnsupportedOneOf.TYPE -> field(CASE_STATEMENT_NO_ELSE_SQL_TEMPLATE,
jsonTypeOf(field).eq("object"),
cast(field, getStructType())).as(quotedName(alias));
cast(field, getStructType()));
case Array.TYPE -> field(CASE_STATEMENT_NO_ELSE_SQL_TEMPLATE,
jsonTypeOf(field).eq("array"),
cast(field, getArrayType())).as(quotedName(alias));
cast(field, getArrayType()));
// No nested Unions supported so this will definitely not result in infinite recursion.
case Union.TYPE -> castedField(field, ((Union) type).chooseType(), alias, useExpensiveSaferCasting);
case Union.TYPE -> castedField(field, ((Union) type).chooseType(), useExpensiveSaferCasting);
default -> throw new IllegalArgumentException("Unsupported AirbyteType: " + type);
};
}
Expand All @@ -133,11 +133,11 @@ protected List<Field<?>> extractRawDataFields(final LinkedHashMap<ColumnId, Airb
.entrySet()
.stream()
.map(column -> castedField(
field(quotedName(COLUMN_NAME_DATA, column.getKey().getOriginalName())),
column.getValue(),
column.getKey().getName(),
useExpensiveSaferCasting))
.collect(Collectors.toList());
field(quotedName(COLUMN_NAME_DATA, column.getKey().getOriginalName())),
column.getValue(),
useExpensiveSaferCasting
).as(column.getKey().getName())
).collect(Collectors.toList());
}

private Field<String> jsonTypeOf(final Field<?> field) {
Expand Down Expand Up @@ -176,7 +176,7 @@ Field<?> toCastingErrorCaseStmt(final ColumnId column, final AirbyteType type) {
// TODO: Timestamp format issues can result in null values when cast, add regex check if destination
// supports regex functions.
return field(CASE_STATEMENT_SQL_TEMPLATE,
field.isNotNull().and(castedField(field, type, column.getName(), true).isNull()),
field.isNotNull().and(castedField(field, type, true).as(column.getName()).isNull()),
function("ARRAY", getSuperType(),
function("JSON_PARSE", getSuperType(), val(
"{\"field\": \"" + column.getName() + "\", "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler;
import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus;
import io.airbyte.integrations.base.destination.typing_deduping.Sql;
import io.airbyte.integrations.base.destination.typing_deduping.SqlGenerator;
import io.airbyte.integrations.destination.redshift.RedshiftInsertDestination;
import io.airbyte.integrations.destination.redshift.RedshiftSQLNameTransformer;
import java.nio.file.Files;
Expand All @@ -38,6 +39,9 @@
import org.jooq.DSLContext;
import org.jooq.DataType;
import org.jooq.Field;
import org.jooq.InsertValuesStepN;
import org.jooq.Name;
import org.jooq.Record;
import org.jooq.SQLDialect;
import org.jooq.conf.Settings;
import org.jooq.impl.DSL;
Expand All @@ -48,6 +52,11 @@

public class RedshiftSqlGeneratorIntegrationTest extends JdbcSqlGeneratorIntegrationTest<RedshiftState> {

@Override
protected boolean getSupportsSafeCast() {
return true;
}

/**
* Redshift's JDBC driver doesn't map certain data types onto {@link java.sql.JDBCType} usefully.
* This class adds special handling for those types.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"changes":[]}, "id1": 1, "id2": 200, "old_cursor": 1, "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"changes":[]}, "id1": 1, "id2": 201, "old_cursor": 2, "name": "Bob", "address": {"city": "Boston", "state": "MA"}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"changes":[{"field":"age","change":"NULLED","reason":"DESTINATION_TYPECAST_ERROR"},{"field":"registration_date","change":"NULLED","reason":"DESTINATION_TYPECAST_ERROR"}]}, "id1": 2, "id2": 200, "old_cursor": 3, "name": "Charlie"}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"changes":[]}, "id1": 2, "id2": 200, "old_cursor": 3, "name": "Charlie", "age": 42, "registration_date": "2023-12-23"}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 1, "id2": 200, "old_cursor": 0, "_ab_cdc_deleted_at": null, "name" :"Alice", "address": {"city": "San Francisco", "state": "CA"}}, "_airbyte_meta": {"changes": []}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 1, "id2": 200, "old_cursor": 1, "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}}, "_airbyte_meta": {"changes": []}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 1, "id2": 201, "old_cursor": 2, "name": "Bob", "address": {"city": "Boston", "state": "MA"}}, "_airbyte_meta": {"changes": []}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 2, "id2": 200, "old_cursor": 3, "name": "Charlie", "age": "this is not an integer", "registration_date": "this is not a date"}, "_airbyte_meta": {"changes": []}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 2, "id2": 200, "old_cursor": 3, "name": "Charlie", "age": 42, "registration_date": "2023-12-23"}, "_airbyte_meta": {"changes": []}}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// Keep the Alice record with more recent updated_at
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"changes":[]}, "id1": 1, "id2": 200, "updated_at": "2000-01-01T00:01:00.000000Z", "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"changes":[]}, "id1": 1, "id2": 201, "updated_at": "2000-01-01T00:02:00.000000Z", "name": "Bob", "address": {"city": "Boston", "state": "MA"}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"changes":[{"field":"age","change":"NULLED","reason":"DESTINATION_TYPECAST_ERROR"},{"field":"registration_date","change":"NULLED","reason":"DESTINATION_TYPECAST_ERROR"},{"field":"address","change":"NULLED","reason":"SOURCE_RETRIEVAL_ERROR"}]}, "id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00.000000Z", "name": "Charlie"}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"changes":[{"field":"address","change":"NULLED","reason":"SOURCE_RETRIEVAL_ERROR"}]}, "id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00.000000Z", "name": "Charlie", "age": 42, "registration_date": "2023-12-23"}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"changes":[]}, "id1": 3, "id2": 200, "updated_at": "2000-01-01T00:04:00.000000Z", "name": "a\bb\fc\nd\re\tf`~!@#$%^&*()_+-=[]\\{}|'\",./<>?"}
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"changes":[]}, "id1": 1, "id2": 200, "updated_at": "2000-01-01T00:01:00.000000Z", "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"changes":[]}, "id1": 1, "id2": 201, "updated_at": "2000-01-01T00:02:00.000000Z", "name": "Bob", "address": {"city": "Boston", "state": "MA"}}
// Invalid columns are nulled out (i.e. SQL null, not JSON null)
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"changes":[{"field":"age","change":"NULLED","reason":"DESTINATION_TYPECAST_ERROR"},{"field":"registration_date","change":"NULLED","reason":"DESTINATION_TYPECAST_ERROR"},{"field":"address","change":"NULLED","reason":"SOURCE_RETRIEVAL_ERROR"}]}, "id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00.000000Z", "name": "Charlie"}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"changes":[]}, "id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00.000000Z", "name": "Charlie", "age": 42, "registration_date": "2023-12-23"}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"changes":[]}, "id1": 3, "id2": 200, "updated_at": "2000-01-01T00:04:00.000000Z", "name": "a\bb\fc\nd\re\tf`~!@#$%^&*()_+-=[]\\{}|'\",./<>?"}
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-01T00:01:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}}, "_airbyte_meta": {"changes": []}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-01T00:02:00Z", "name": "Bob", "address": {"city": "Boston", "state": "MA"}}, "_airbyte_meta": {"changes": []}}
// Invalid data is still allowed in the raw table.
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00Z", "name": "Charlie", "age": "this is not an integer", "registration_date": "this is not a date"}, "_airbyte_meta": {"changes": [{"field": "address", "change": "NULLED", "reason": "SOURCE_RETRIEVAL_ERROR"}]}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00Z", "name": "Charlie", "age": 42, "registration_date": "2023-12-23"}, "_airbyte_meta": {"changes": [{"field": "address", "change": "NULLED", "reason": "SOURCE_RETRIEVAL_ERROR"}]}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 3, "id2": 200, "updated_at": "2000-01-01T00:04:00Z", "name": "a\bb\fc\nd\re\tf`~!@#$%^&*()_+-=[]\\{}|'\",./<>?"}, "_airbyte_meta": {"changes": []}}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,6 @@
// Emit a record with no _ab_cdc_deleted_at field. CDC sources typically emit an explicit null, but we should handle both cases.
{"type": "RECORD", "record": {"emitted_at": 1000, "data": {"id1": 1, "id2": 201, "updated_at": "2000-01-01T00:02:00Z", "name": "Bob", "address": {"city": "Boston", "state": "MA"}}}}
// Emit a record with an invalid age & address nulled at source.
{"type": "RECORD", "record": {"emitted_at": 1000, "data": {"id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00Z", "name": "Charlie", "age": "this is not an integer", "registration_date": "this is not a date"}, "meta": {"changes": [{"field": "address", "change": "NULLED", "reason": "SOURCE_RETRIEVAL_ERROR"}]}}}
{"type": "RECORD", "record": {"emitted_at": 1000, "data": {"id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00Z", "name": "Charlie", "age": 42, "registration_date": "2023-12-23"}, "meta": {"changes": [{"field": "address", "change": "NULLED", "reason": "SOURCE_RETRIEVAL_ERROR"}]}}}
// Emit a record with interesting characters in one of the values.
{"type": "RECORD", "record": {"emitted_at": 1000, "data": {"id1": 3, "id2": 200, "updated_at": "2000-01-01T00:04:00Z", "name": "a\bb\fc\nd\re\tf`~!@#$%^&*()_+-=[]\\{}|'\",./<>?"}}}
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000Z", "_airbyte_meta":{"changes":[]}, "id1": 1, "id2": 200, "updated_at": "2000-01-02T00:00:00.000000Z", "name": "Alice", "address": {"city": "Seattle", "state": "WA"}}
// Charlie wasn't re-emitted with updated_at, so it still has a null cursor
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"changes":[{"field":"age","change":"NULLED","reason":"DESTINATION_TYPECAST_ERROR"},{"field":"registration_date","change":"NULLED","reason":"DESTINATION_TYPECAST_ERROR"}]}, "id1": 2, "id2": 200, "name": "Charlie"}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"changes":[]}, "id1": 2, "id2": 200, "name": "Charlie", "age": 42, "registration_date": "2023-12-23"}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 1, "id2": 200, "old_cursor": 0, "_ab_cdc_deleted_at": null, "name" :"Alice", "address": {"city": "San Francisco", "state": "CA"}}, "_airbyte_meta": {"changes": []}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 1, "id2": 200, "old_cursor": 1, "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}}, "_airbyte_meta": {"changes": []}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 1, "id2": 201, "old_cursor": 2, "name": "Bob", "address": {"city": "Boston", "state": "MA"}}, "_airbyte_meta": {"changes": []}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 2, "id2": 200, "old_cursor": 3, "name": "Charlie", "age": "this is not an integer", "registration_date": "this is not a date"}, "_airbyte_meta": {"changes": []}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 2, "id2": 200, "old_cursor": 3, "name": "Charlie", "age": 42, "registration_date": "2023-12-23"}, "_airbyte_meta": {"changes": []}}
{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000Z", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Seattle", "state": "WA"}}, "_airbyte_meta": {"changes": []}}
{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Bob", "address": {"city": "New York", "state": "NY"}}, "_airbyte_meta": {"changes": []}}
{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:01:00Z", "_ab_cdc_deleted_at": "1970-01-01T00:00:00Z"}, "_airbyte_meta": {"changes": []}}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"changes":[]}, "id1": 1, "id2": 200, "updated_at": "2000-01-01T00:00:00.000000Z", "name": "Alice", "address": {"city": "San Francisco", "state": "CA"}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"changes":[]}, "id1": 1, "id2": 200, "updated_at": "2000-01-01T00:01:00.000000Z", "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"changes":[]}, "id1": 1, "id2": 201, "updated_at": "2000-01-01T00:02:00.000000Z", "name": "Bob", "address": {"city": "Boston", "state": "MA"}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"changes":[{"field":"age","change":"NULLED","reason":"DESTINATION_TYPECAST_ERROR"},{"field":"registration_date","change":"NULLED","reason":"DESTINATION_TYPECAST_ERROR"},{"field":"address","change":"NULLED","reason":"SOURCE_RETRIEVAL_ERROR"}]}, "id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00.000000Z", "name": "Charlie"}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"changes":[{"field":"address","change":"NULLED","reason":"SOURCE_RETRIEVAL_ERROR"}]}, "id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00.000000Z", "name": "Charlie", "age": 42, "registration_date": "2023-12-23"}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"changes":[]}, "id1": 3, "id2": 200, "updated_at": "2000-01-01T00:04:00.000000Z", "name": "a\bb\fc\nd\re\tf`~!@#$%^&*()_+-=[]\\{}|'\",./<>?"}

{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000Z", "_airbyte_meta":{"changes":[]}, "id1": 1, "id2": 200, "updated_at": "2000-01-02T00:00:00.000000Z", "name": "Alice", "address": {"city": "Seattle", "state": "WA"}}
Expand Down
Loading

0 comments on commit 9098002

Please sign in to comment.