Skip to content

Commit

Permalink
refactor(CatalogHelpers): remove catalog diff helper methods (#75)
Browse files Browse the repository at this point in the history
  • Loading branch information
pedroslopez authored May 7, 2024
1 parent b93c4fd commit 9d3d67d
Show file tree
Hide file tree
Showing 23 changed files with 0 additions and 1,170 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,11 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import io.airbyte.protocol.models.transform_models.FieldTransform;
import io.airbyte.protocol.models.transform_models.StreamTransform;
import io.airbyte.protocol.models.transform_models.UpdateFieldSchemaTransform;
import io.airbyte.protocol.models.transform_models.UpdateStreamTransform;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.Spliterator;
Expand Down Expand Up @@ -331,152 +324,4 @@ private static boolean isObjectWithSubFields(final Field field) {
return field.getType().equals(JsonSchemaType.OBJECT) && field.getSubFields() != null
&& !field.getSubFields().isEmpty();
}

/**
 * Builds the {@link StreamDescriptor} identifying the given stream.
 *
 * @param airbyteStream stream whose name and namespace are copied into the descriptor
 * @return a new descriptor carrying the stream's name and namespace
 */
public static StreamDescriptor extractStreamDescriptor(final AirbyteStream airbyteStream) {
  final StreamDescriptor descriptor = new StreamDescriptor();
  final StreamDescriptor named = descriptor.withName(airbyteStream.getName());
  return named.withNamespace(airbyteStream.getNamespace());
}

/**
 * Indexes every stream of the catalog by its stream descriptor (name + namespace).
 *
 * <p>Note: {@code Collectors.toMap} without a merge function throws
 * {@link IllegalStateException} when two streams map to the same descriptor.
 *
 * @param catalog catalog whose streams are indexed
 * @return map from stream descriptor to the corresponding stream
 */
private static Map<StreamDescriptor, AirbyteStream> streamDescriptorToMap(final AirbyteCatalog catalog) {
  return catalog.getStreams().stream()
      .collect(Collectors.toMap(
          stream -> extractStreamDescriptor(stream),
          stream -> stream));
}

/**
 * Returns difference between two provided catalogs.
 *
 * <p>Streams are matched by their stream descriptor (name + namespace):
 * <ul>
 * <li>a descriptor present only in the old catalog yields a "remove stream" transform;</li>
 * <li>a descriptor present only in the new catalog yields an "add stream" transform;</li>
 * <li>a descriptor present in both yields an "update stream" transform only when the two streams
 * differ, a matching stream exists in the configured catalog, and the field-level diff is
 * non-empty.</li>
 * </ul>
 *
 * @param oldCatalog - old catalog
 * @param newCatalog - new catalog
 * @param configuredCatalog - configured catalog; a stream present in both catalogs is only diffed
 *        when a configured stream with the same name and namespace exists here
 * @return difference between old and new catalogs
 */
public static Set<StreamTransform> getCatalogDiff(final AirbyteCatalog oldCatalog,
    final AirbyteCatalog newCatalog,
    final ConfiguredAirbyteCatalog configuredCatalog) {
  final Set<StreamTransform> streamTransforms = new HashSet<>();

  final Map<StreamDescriptor, AirbyteStream> descriptorToStreamOld = streamDescriptorToMap(oldCatalog);
  final Map<StreamDescriptor, AirbyteStream> descriptorToStreamNew = streamDescriptorToMap(newCatalog);

  Sets.difference(descriptorToStreamOld.keySet(), descriptorToStreamNew.keySet())
      .forEach(descriptor -> streamTransforms.add(
          StreamTransform.createRemoveStreamTransform(descriptor)));
  Sets.difference(descriptorToStreamNew.keySet(), descriptorToStreamOld.keySet())
      .forEach(descriptor -> streamTransforms.add(
          StreamTransform.createAddStreamTransform(descriptor)));
  Sets.intersection(descriptorToStreamOld.keySet(), descriptorToStreamNew.keySet())
      .forEach(descriptor -> {
        final AirbyteStream streamOld = descriptorToStreamOld.get(descriptor);
        final AirbyteStream streamNew = descriptorToStreamNew.get(descriptor);

        if (streamOld.equals(streamNew)) {
          // Identical streams can never produce an update, so skip the configured-stream scan.
          return;
        }

        final Optional<ConfiguredAirbyteStream> stream = configuredCatalog.getStreams().stream()
            .filter(s -> Objects.equals(s.getStream().getNamespace(), descriptor.getNamespace())
                && s.getStream().getName().equals(descriptor.getName()))
            .findFirst();
        if (stream.isEmpty()) {
          // Streams that are not part of the configured catalog are not reported as updates.
          return;
        }

        // getStreamDiff only checks for differences in the stream's field name or field type
        // but there are a number of reasons the streams might be different (such as a source-defined
        // primary key or cursor changing). These should not be expressed as "stream updates".
        final UpdateStreamTransform streamTransform = getStreamDiff(streamOld, streamNew, stream);
        if (!streamTransform.getFieldTransforms().isEmpty()) {
          streamTransforms.add(StreamTransform.createUpdateStreamTransform(descriptor, streamTransform));
        }
      });

  return streamTransforms;
}

/**
 * Computes the field-level differences between two versions of the same stream.
 *
 * <p>Fields are keyed by their fully qualified name. A field only in the old schema becomes a
 * "remove field" transform (flagged as breaking according to {@code transformBreaksConnection}),
 * a field only in the new schema becomes an "add field" transform, and a field present in both
 * with a different type becomes an "update field" transform.
 *
 * @param streamOld previous version of the stream
 * @param streamNew current version of the stream
 * @param configuredStream configured stream used to decide whether a removal is breaking
 * @return transform holding every detected field change (possibly none)
 */
private static UpdateStreamTransform getStreamDiff(final AirbyteStream streamOld,
    final AirbyteStream streamNew,
    final Optional<ConfiguredAirbyteStream> configuredStream) {
  final Map<List<String>, JsonNode> oldTypes = fieldTypesByQualifiedName(streamOld.getJsonSchema());
  final Map<List<String>, JsonNode> newTypes = fieldTypesByQualifiedName(streamNew.getJsonSchema());
  final Set<FieldTransform> changes = new HashSet<>();

  // Fields that disappeared.
  for (final List<String> fieldName : Sets.difference(oldTypes.keySet(), newTypes.keySet())) {
    final boolean breaking = transformBreaksConnection(configuredStream, fieldName);
    changes.add(FieldTransform.createRemoveFieldTransform(fieldName, oldTypes.get(fieldName), breaking));
  }

  // Fields that were introduced.
  for (final List<String> fieldName : Sets.difference(newTypes.keySet(), oldTypes.keySet())) {
    changes.add(FieldTransform.createAddFieldTransform(fieldName, newTypes.get(fieldName)));
  }

  // Fields present on both sides whose type changed.
  for (final List<String> fieldName : Sets.intersection(oldTypes.keySet(), newTypes.keySet())) {
    final JsonNode before = oldTypes.get(fieldName);
    final JsonNode after = newTypes.get(fieldName);
    if (before.equals(after)) {
      continue;
    }
    changes.add(FieldTransform.createUpdateFieldTransform(fieldName,
        new UpdateFieldSchemaTransform(before, after)));
  }

  return new UpdateStreamTransform(changes);
}

/**
 * Collects the fully qualified field names of a JSON schema into a map from name to type; names
 * seen more than once are mapped to {@code DUPLICATED_SCHEMA} by the accumulator functions.
 */
private static Map<List<String>, JsonNode> fieldTypesByQualifiedName(final JsonNode jsonSchema) {
  return getFullyQualifiedFieldNamesWithTypes(jsonSchema)
      .stream()
      .collect(
          HashMap::new,
          CatalogHelpers::collectInHashMap,
          CatalogHelpers::combineAccumulator);
}

/**
 * Marker value stored in place of a field's schema when the same key is encountered more than
 * once by {@code collectInHashMap} or {@code combineAccumulator}.
 */
@VisibleForTesting
static final JsonNode DUPLICATED_SCHEMA = Jsons.jsonNode("Duplicated Schema");

/**
 * Accumulator step: records one (field name, schema) pair in the map. If the field name is
 * already present, its entry is overwritten with {@code DUPLICATED_SCHEMA} instead.
 *
 * @param accumulator map being filled
 * @param value pair of fully qualified field name and its schema
 */
@VisibleForTesting
static void collectInHashMap(final Map<List<String>, JsonNode> accumulator,
    final Pair<List<String>, JsonNode> value) {
  final List<String> fieldPath = value.getKey();
  final JsonNode schema = accumulator.containsKey(fieldPath) ? DUPLICATED_SCHEMA : value.getValue();
  accumulator.put(fieldPath, schema);
}

/**
 * Combiner step: merges the right accumulator into the left one. Keys already present on the
 * left are overwritten with {@code DUPLICATED_SCHEMA}; all other entries are copied as-is.
 *
 * @param accumulatorLeft map receiving the merged entries (mutated in place)
 * @param accumulatorRight map whose entries are merged into the left one
 */
@VisibleForTesting
static void combineAccumulator(final Map<List<String>, JsonNode> accumulatorLeft,
    final Map<List<String>, JsonNode> accumulatorRight) {
  for (final Map.Entry<List<String>, JsonNode> entry : accumulatorRight.entrySet()) {
    final List<String> fieldPath = entry.getKey();
    final JsonNode merged = accumulatorLeft.containsKey(fieldPath) ? DUPLICATED_SCHEMA : entry.getValue();
    accumulatorLeft.put(fieldPath, merged);
  }
}

/**
 * Tells whether a change to the given field breaks the configured stream.
 *
 * <p>A change is considered breaking when either:
 * <ul>
 * <li>the stream syncs in {@code INCREMENTAL} mode and the field is its cursor field, or</li>
 * <li>the stream's destination sync mode is {@code APPEND_DEDUP} and the field is one of its
 * primary keys.</li>
 * </ul>
 *
 * @param configuredStream configured stream to check; when empty nothing can break
 * @param fieldName fully qualified name (path) of the affected field
 * @return {@code true} if the field is the cursor or a primary key under the modes above,
 *         {@code false} otherwise (including when no cursor field / primary key is configured)
 */
static boolean transformBreaksConnection(final Optional<ConfiguredAirbyteStream> configuredStream,
    final List<String> fieldName) {
  if (configuredStream.isEmpty()) {
    return false;
  }

  final ConfiguredAirbyteStream streamConfig = configuredStream.get();

  // Guard against a configured stream without a cursor field instead of failing with an NPE.
  if (SyncMode.INCREMENTAL == streamConfig.getSyncMode()
      && streamConfig.getCursorField() != null
      && streamConfig.getCursorField().equals(fieldName)) {
    return true;
  }

  // Same guard for a configured stream without a primary key list.
  return DestinationSyncMode.APPEND_DEDUP == streamConfig.getDestinationSyncMode()
      && streamConfig.getPrimaryKey() != null
      && streamConfig.getPrimaryKey().contains(fieldName);
}

}

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

Loading

0 comments on commit 9d3d67d

Please sign in to comment.