From 3344441fb8f53d976539f0af1d8eef0729e83882 Mon Sep 17 00:00:00 2001 From: Daryna Ishchenko <80129833+darynaishchenko@users.noreply.github.com> Date: Wed, 8 Jan 2025 17:12:04 +0200 Subject: [PATCH] feat(low-code): added keys replace transformation (#183) Co-authored-by: octavia-squidington-iii --- .../declarative_component_schema.yaml | 45 +++++++ .../models/declarative_component_schema.py | 19 +++ .../parsers/model_to_component_factory.py | 14 +++ .../keys_replace_transformation.py | 61 ++++++++++ .../test_keys_replace_transformation.py | 112 ++++++++++++++++++ 5 files changed, 251 insertions(+) create mode 100644 airbyte_cdk/sources/declarative/transformations/keys_replace_transformation.py create mode 100644 unit_tests/sources/declarative/transformations/test_keys_replace_transformation.py diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 89c731075..a9d4f2558 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -1241,6 +1241,7 @@ definitions: - "$ref": "#/definitions/KeysToLower" - "$ref": "#/definitions/KeysToSnakeCase" - "$ref": "#/definitions/FlattenFields" + - "$ref": "#/definitions/KeysReplace" state_migrations: title: State Migrations description: Array of state migrations to be applied on the input state @@ -1785,6 +1786,7 @@ definitions: - "$ref": "#/definitions/KeysToLower" - "$ref": "#/definitions/KeysToSnakeCase" - "$ref": "#/definitions/FlattenFields" + - "$ref": "#/definitions/KeysReplace" schema_type_identifier: "$ref": "#/definitions/SchemaTypeIdentifier" $parameters: @@ -1883,6 +1885,49 @@ definitions: $parameters: type: object additionalProperties: true + KeysReplace: + title: Keys Replace + description: A transformation that replaces symbols in keys. 
+ type: object + required: + - type + - old + - new + properties: + type: + type: string + enum: [KeysReplace] + old: + type: string + title: Old value + description: Old value to replace. + examples: + - " " + - "{{ record.id }}" + - "{{ config['id'] }}" + - "{{ stream_slice['id'] }}" + interpolation_context: + - config + - record + - stream_state + - stream_slice + new: + type: string + title: New value + description: New value to set. + examples: + - "_" + - "{{ record.id }}" + - "{{ config['id'] }}" + - "{{ stream_slice['id'] }}" + interpolation_context: + - config + - record + - stream_state + - stream_slice + $parameters: + type: object + additionalProperties: true IterableDecoder: title: Iterable Decoder description: Use this if the response consists of strings separated by new lines (`\n`). The Decoder will wrap each row into a JSON object with the `record` key. diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 5823b34c1..6b70b3fdd 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -721,6 +721,23 @@ class KeysToSnakeCase(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") +class KeysReplace(BaseModel): + type: Literal["KeysReplace"] + old: str = Field( + ..., + description="Old value to replace.", + examples=[" ", "{{ record.id }}", "{{ config['id'] }}", "{{ stream_slice['id'] }}"], + title="Old value", + ) + new: str = Field( + ..., + description="New value to set.", + examples=["_", "{{ record.id }}", "{{ config['id'] }}", "{{ stream_slice['id'] }}"], + title="New value", + ) + parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + + class FlattenFields(BaseModel): type: Literal["FlattenFields"] parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") @@ -1701,6 
+1718,7 @@ class Config: KeysToLower, KeysToSnakeCase, FlattenFields, + KeysReplace, ] ] ] = Field( @@ -1875,6 +1893,7 @@ class DynamicSchemaLoader(BaseModel): KeysToLower, KeysToSnakeCase, FlattenFields, + KeysReplace, ] ] ] = Field( diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 694cb1042..7cb04c2a6 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -254,6 +254,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( JwtPayload as JwtPayloadModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + KeysReplace as KeysReplaceModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( KeysToLower as KeysToLowerModel, ) @@ -417,6 +420,9 @@ from airbyte_cdk.sources.declarative.transformations.flatten_fields import ( FlattenFields, ) +from airbyte_cdk.sources.declarative.transformations.keys_replace_transformation import ( + KeysReplaceTransformation, +) from airbyte_cdk.sources.declarative.transformations.keys_to_lower_transformation import ( KeysToLowerTransformation, ) @@ -509,6 +515,7 @@ def _init_mappings(self) -> None: GzipParserModel: self.create_gzip_parser, KeysToLowerModel: self.create_keys_to_lower_transformation, KeysToSnakeCaseModel: self.create_keys_to_snake_transformation, + KeysReplaceModel: self.create_keys_replace_transformation, FlattenFieldsModel: self.create_flatten_fields, IterableDecoderModel: self.create_iterable_decoder, XmlDecoderModel: self.create_xml_decoder, @@ -630,6 +637,13 @@ def create_keys_to_snake_transformation( ) -> KeysToSnakeCaseTransformation: return KeysToSnakeCaseTransformation() + def create_keys_replace_transformation( + self, model: KeysReplaceModel, config: Config, **kwargs: Any + ) -> 
KeysReplaceTransformation:
+        return KeysReplaceTransformation(
+            old=model.old, new=model.new, parameters=model.parameters or {}
+        )
+
     def create_flatten_fields(
         self, model: FlattenFieldsModel, config: Config, **kwargs: Any
     ) -> FlattenFields:
diff --git a/airbyte_cdk/sources/declarative/transformations/keys_replace_transformation.py b/airbyte_cdk/sources/declarative/transformations/keys_replace_transformation.py
new file mode 100644
index 000000000..8fe0bbffb
--- /dev/null
+++ b/airbyte_cdk/sources/declarative/transformations/keys_replace_transformation.py
@@ -0,0 +1,80 @@
+#
+# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+#
+
+from dataclasses import InitVar, dataclass
+from typing import Any, Dict, Mapping, Optional
+
+from airbyte_cdk import InterpolatedString
+from airbyte_cdk.sources.declarative.transformations import RecordTransformation
+from airbyte_cdk.sources.types import Config, StreamSlice, StreamState
+
+
+@dataclass
+class KeysReplaceTransformation(RecordTransformation):
+    """
+    Transformation that replaces a substring in the keys of a record.
+
+    `old` and `new` are interpolated for every record against `config`, `record`,
+    `stream_state` and `stream_slice`; each occurrence of the resulting `old` text in a key is
+    then replaced with the resulting `new` text. Keys of nested objects, including objects held
+    in lists, are rewritten as well. If two keys end up with the same name, the one that comes
+    later in the record wins.
+
+    Example usage:
+    - type: KeysReplace
+      old: " "
+      new: "_"
+    Result:
+    from: {"created time": ..., "customer id": ..., "user id": ...}
+    to: {"created_time": ..., "customer_id": ..., "user_id": ...}
+    """
+
+    old: str
+    new: str
+    parameters: InitVar[Mapping[str, Any]]
+
+    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
+        self._old = InterpolatedString.create(self.old, parameters=parameters)
+        self._new = InterpolatedString.create(self.new, parameters=parameters)
+
+    def transform(
+        self,
+        record: Dict[str, Any],
+        config: Optional[Config] = None,
+        stream_state: Optional[StreamState] = None,
+        stream_slice: Optional[StreamSlice] = None,
+    ) -> None:
+        """
+        Rename the keys of `record` in place.
+
+        :param record: record whose keys are rewritten; it is mutated, nothing is returned
+        :param config: connector config exposed to interpolation, defaults to an empty mapping
+        :param stream_state: stream state exposed to interpolation
+        :param stream_slice: stream slice exposed to interpolation
+        """
+        if config is None:
+            config = {}
+
+        kwargs = {"record": record, "stream_state": stream_state, "stream_slice": stream_slice}
+        old_key = str(self._old.eval(config, **kwargs))
+        new_key = str(self._new.eval(config, **kwargs))
+
+        if not old_key:
+            # str.replace("", new) matches between every character, which would mangle all keys.
+            return
+
+        def _transform(data: Any) -> Any:
+            if isinstance(data, dict):
+                return {
+                    key.replace(old_key, new_key): _transform(value)
+                    for key, value in data.items()
+                }
+            if isinstance(data, list):
+                # Objects inside arrays carry keys too, so descend into every element.
+                return [_transform(item) for item in data]
+            return data
+
+        transformed_record = _transform(record)
+        record.clear()
+        record.update(transformed_record)
diff --git a/unit_tests/sources/declarative/transformations/test_keys_replace_transformation.py b/unit_tests/sources/declarative/transformations/test_keys_replace_transformation.py
new file mode 100644
index 000000000..417d992af
--- /dev/null
+++ b/unit_tests/sources/declarative/transformations/test_keys_replace_transformation.py
@@ -0,0 +1,130 @@
+#
+# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+#
+import pytest
+
+from airbyte_cdk.sources.declarative.transformations.keys_replace_transformation import (
+    KeysReplaceTransformation,
+)
+
+_ANY_VALUE = -1
+
+
+@pytest.mark.parametrize(
+    [
+        "input_record",
+        "config",
+        "stream_state",
+        "stream_slice",
+        "keys_replace_config",
+        "expected_record",
+    ],
+    [
+        pytest.param(
+            {"date time": _ANY_VALUE, "customer id": _ANY_VALUE},
+            {},
+            {},
+            {},
+            {"old": " ", "new": "_"},
+            {"date_time": _ANY_VALUE, "customer_id": _ANY_VALUE},
+            id="simple keys replace config",
+        ),
+        pytest.param(
+            {
+                "customer_id": 111111,
+                "customer_name": "MainCustomer",
+                "field_1_111111": _ANY_VALUE,
+                "field_2_111111": _ANY_VALUE,
+            },
+            {},
+            {},
+            {},
+            {"old": '{{ record["customer_id"] }}', "new": '{{ record["customer_name"] }}'},
+            {
+                "customer_id": 111111,
+                "customer_name": "MainCustomer",
+                "field_1_MainCustomer": _ANY_VALUE,
+                "field_2_MainCustomer": _ANY_VALUE,
+            },
+            id="keys replace config uses values from record",
+        ),
+        pytest.param(
+            {"customer_id": 111111, "field_1_111111": _ANY_VALUE, "field_2_111111": _ANY_VALUE},
+            {},
+            {},
+            {"customer_name": "MainCustomer"},
+            {"old": '{{ record["customer_id"] }}', "new": '{{ stream_slice["customer_name"] }}'},
+            {
+                "customer_id": 111111,
+                "field_1_MainCustomer": _ANY_VALUE,
+                "field_2_MainCustomer": _ANY_VALUE,
+            },
+            id="keys replace config uses values from slice",
+        ),
+        pytest.param(
+            {"customer_id": 111111, "field_1_111111": _ANY_VALUE, "field_2_111111": _ANY_VALUE},
+            {"customer_name": "MainCustomer"},
+            {},
+            {},
+            {"old": '{{ record["customer_id"] }}', "new": '{{ config["customer_name"] }}'},
+            {
+                "customer_id": 111111,
+                "field_1_MainCustomer": _ANY_VALUE,
+                "field_2_MainCustomer": _ANY_VALUE,
+            },
+            id="keys replace config uses values from config",
+        ),
+        pytest.param(
+            {
+                "date time": _ANY_VALUE,
+                "user id": _ANY_VALUE,
+                "customer": {
+                    "customer name": _ANY_VALUE,
+                    "customer id": _ANY_VALUE,
+                    "contact info": {"email": _ANY_VALUE, "phone number": _ANY_VALUE},
+                },
+            },
+            {},
+            {},
+            {},
+            {"old": " ", "new": "_"},
+            {
+                "customer": {
+                    "contact_info": {"email": _ANY_VALUE, "phone_number": _ANY_VALUE},
+                    "customer_id": _ANY_VALUE,
+                    "customer_name": _ANY_VALUE,
+                },
+                "date_time": _ANY_VALUE,
+                "user_id": _ANY_VALUE,
+            },
+            id="simple keys replace config with nested fields in record",
+        ),
+        pytest.param(
+            {"line items": [{"item id": _ANY_VALUE}, "plain value"]},
+            {},
+            {},
+            {},
+            {"old": " ", "new": "_"},
+            {"line_items": [{"item_id": _ANY_VALUE}, "plain value"]},
+            id="keys replace config with objects nested in lists",
+        ),
+        pytest.param(
+            {"date time": _ANY_VALUE},
+            {},
+            {},
+            {},
+            {"old": "", "new": "_"},
+            {"date time": _ANY_VALUE},
+            id="empty old value leaves keys unchanged",
+        ),
+    ],
+)
+def test_transform(
+    input_record, config, stream_state, stream_slice, keys_replace_config, expected_record
+):
+    KeysReplaceTransformation(
+        old=keys_replace_config["old"], new=keys_replace_config["new"], parameters={}
+    ).transform(
+        record=input_record, config=config, stream_state=stream_state, stream_slice=stream_slice
+    )
+    assert input_record == expected_record