Skip to content

Commit

Permalink
feat(low-code): added keys replace transformation (#183)
Browse files Browse the repository at this point in the history
Co-authored-by: octavia-squidington-iii <[email protected]>
  • Loading branch information
darynaishchenko and octavia-squidington-iii authored Jan 8, 2025
1 parent 3ee710d commit 3344441
Show file tree
Hide file tree
Showing 5 changed files with 251 additions and 0 deletions.
45 changes: 45 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1241,6 +1241,7 @@ definitions:
- "$ref": "#/definitions/KeysToLower"
- "$ref": "#/definitions/KeysToSnakeCase"
- "$ref": "#/definitions/FlattenFields"
- "$ref": "#/definitions/KeysReplace"
state_migrations:
title: State Migrations
description: Array of state migrations to be applied on the input state
Expand Down Expand Up @@ -1785,6 +1786,7 @@ definitions:
- "$ref": "#/definitions/KeysToLower"
- "$ref": "#/definitions/KeysToSnakeCase"
- "$ref": "#/definitions/FlattenFields"
- "$ref": "#/definitions/KeysReplace"
schema_type_identifier:
"$ref": "#/definitions/SchemaTypeIdentifier"
$parameters:
Expand Down Expand Up @@ -1883,6 +1885,49 @@ definitions:
$parameters:
type: object
additionalProperties: true
KeysReplace:
title: Keys Replace
description: A transformation that replaces symbols in keys.
type: object
required:
- type
- old
- new
properties:
type:
type: string
enum: [KeysReplace]
old:
type: string
title: Old value
description: Old value to replace.
examples:
- " "
- "{{ record.id }}"
- "{{ config['id'] }}"
- "{{ stream_slice['id'] }}"
interpolation_context:
- config
- record
- stream_state
- stream_slice
new:
type: string
title: New value
description: New value to set.
examples:
- "_"
- "{{ record.id }}"
- "{{ config['id'] }}"
- "{{ stream_slice['id'] }}"
interpolation_context:
- config
- record
- stream_state
- stream_slice
$parameters:
type: object
additionalProperties: true
IterableDecoder:
title: Iterable Decoder
description: Use this if the response consists of strings separated by new lines (`\n`). The Decoder will wrap each row into a JSON object with the `record` key.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -721,6 +721,23 @@ class KeysToSnakeCase(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class KeysReplace(BaseModel):
type: Literal["KeysReplace"]
old: str = Field(
...,
description="Old value to replace.",
examples=[" ", "{{ record.id }}", "{{ config['id'] }}", "{{ stream_slice['id'] }}"],
title="Old value",
)
new: str = Field(
...,
description="New value to set.",
examples=["_", "{{ record.id }}", "{{ config['id'] }}", "{{ stream_slice['id'] }}"],
title="New value",
)
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class FlattenFields(BaseModel):
type: Literal["FlattenFields"]
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
Expand Down Expand Up @@ -1701,6 +1718,7 @@ class Config:
KeysToLower,
KeysToSnakeCase,
FlattenFields,
KeysReplace,
]
]
] = Field(
Expand Down Expand Up @@ -1875,6 +1893,7 @@ class DynamicSchemaLoader(BaseModel):
KeysToLower,
KeysToSnakeCase,
FlattenFields,
KeysReplace,
]
]
] = Field(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
JwtPayload as JwtPayloadModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
KeysReplace as KeysReplaceModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
KeysToLower as KeysToLowerModel,
)
Expand Down Expand Up @@ -417,6 +420,9 @@
from airbyte_cdk.sources.declarative.transformations.flatten_fields import (
FlattenFields,
)
from airbyte_cdk.sources.declarative.transformations.keys_replace_transformation import (
KeysReplaceTransformation,
)
from airbyte_cdk.sources.declarative.transformations.keys_to_lower_transformation import (
KeysToLowerTransformation,
)
Expand Down Expand Up @@ -509,6 +515,7 @@ def _init_mappings(self) -> None:
GzipParserModel: self.create_gzip_parser,
KeysToLowerModel: self.create_keys_to_lower_transformation,
KeysToSnakeCaseModel: self.create_keys_to_snake_transformation,
KeysReplaceModel: self.create_keys_replace_transformation,
FlattenFieldsModel: self.create_flatten_fields,
IterableDecoderModel: self.create_iterable_decoder,
XmlDecoderModel: self.create_xml_decoder,
Expand Down Expand Up @@ -630,6 +637,13 @@ def create_keys_to_snake_transformation(
) -> KeysToSnakeCaseTransformation:
return KeysToSnakeCaseTransformation()

def create_keys_replace_transformation(
self, model: KeysReplaceModel, config: Config, **kwargs: Any
) -> KeysReplaceTransformation:
return KeysReplaceTransformation(
old=model.old, new=model.new, parameters=model.parameters or {}
)

def create_flatten_fields(
self, model: FlattenFieldsModel, config: Config, **kwargs: Any
) -> FlattenFields:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

from dataclasses import InitVar, dataclass
from typing import Any, Dict, Mapping, Optional

from airbyte_cdk import InterpolatedString
from airbyte_cdk.sources.declarative.transformations import RecordTransformation
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState


@dataclass
class KeysReplaceTransformation(RecordTransformation):
"""
Transformation that applies keys names replacement.
Example usage:
- type: KeysReplace
old: " "
new: "_"
Result:
from: {"created time": ..., "customer id": ..., "user id": ...}
to: {"created_time": ..., "customer_id": ..., "user_id": ...}
"""

old: str
new: str
parameters: InitVar[Mapping[str, Any]]

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self._old = InterpolatedString.create(self.old, parameters=parameters)
self._new = InterpolatedString.create(self.new, parameters=parameters)

def transform(
self,
record: Dict[str, Any],
config: Optional[Config] = None,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
) -> None:
if config is None:
config = {}

kwargs = {"record": record, "stream_state": stream_state, "stream_slice": stream_slice}
old_key = str(self._old.eval(config, **kwargs))
new_key = str(self._new.eval(config, **kwargs))

def _transform(data: Dict[str, Any]) -> Dict[str, Any]:
result = {}
for key, value in data.items():
updated_key = key.replace(old_key, new_key)
if isinstance(value, dict):
result[updated_key] = _transform(value)
else:
result[updated_key] = value
return result

transformed_record = _transform(record)
record.clear()
record.update(transformed_record)
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#
import pytest

from airbyte_cdk.sources.declarative.transformations.keys_replace_transformation import (
KeysReplaceTransformation,
)

_ANY_VALUE = -1


@pytest.mark.parametrize(
[
"input_record",
"config",
"stream_state",
"stream_slice",
"keys_replace_config",
"expected_record",
],
[
pytest.param(
{"date time": _ANY_VALUE, "customer id": _ANY_VALUE},
{},
{},
{},
{"old": " ", "new": "_"},
{"date_time": _ANY_VALUE, "customer_id": _ANY_VALUE},
id="simple keys replace config",
),
pytest.param(
{
"customer_id": 111111,
"customer_name": "MainCustomer",
"field_1_111111": _ANY_VALUE,
"field_2_111111": _ANY_VALUE,
},
{},
{},
{},
{"old": '{{ record["customer_id"] }}', "new": '{{ record["customer_name"] }}'},
{
"customer_id": 111111,
"customer_name": "MainCustomer",
"field_1_MainCustomer": _ANY_VALUE,
"field_2_MainCustomer": _ANY_VALUE,
},
id="keys replace config uses values from record",
),
pytest.param(
{"customer_id": 111111, "field_1_111111": _ANY_VALUE, "field_2_111111": _ANY_VALUE},
{},
{},
{"customer_name": "MainCustomer"},
{"old": '{{ record["customer_id"] }}', "new": '{{ stream_slice["customer_name"] }}'},
{
"customer_id": 111111,
"field_1_MainCustomer": _ANY_VALUE,
"field_2_MainCustomer": _ANY_VALUE,
},
id="keys replace config uses values from slice",
),
pytest.param(
{"customer_id": 111111, "field_1_111111": _ANY_VALUE, "field_2_111111": _ANY_VALUE},
{"customer_name": "MainCustomer"},
{},
{},
{"old": '{{ record["customer_id"] }}', "new": '{{ config["customer_name"] }}'},
{
"customer_id": 111111,
"field_1_MainCustomer": _ANY_VALUE,
"field_2_MainCustomer": _ANY_VALUE,
},
id="keys replace config uses values from config",
),
pytest.param(
{
"date time": _ANY_VALUE,
"user id": _ANY_VALUE,
"customer": {
"customer name": _ANY_VALUE,
"customer id": _ANY_VALUE,
"contact info": {"email": _ANY_VALUE, "phone number": _ANY_VALUE},
},
},
{},
{},
{},
{"old": " ", "new": "_"},
{
"customer": {
"contact_info": {"email": _ANY_VALUE, "phone_number": _ANY_VALUE},
"customer_id": _ANY_VALUE,
"customer_name": _ANY_VALUE,
},
"date_time": _ANY_VALUE,
"user_id": _ANY_VALUE,
},
id="simple keys replace config with nested fields in record",
),
],
)
def test_transform(
input_record, config, stream_state, stream_slice, keys_replace_config, expected_record
):
KeysReplaceTransformation(
old=keys_replace_config["old"], new=keys_replace_config["new"], parameters={}
).transform(
record=input_record, config=config, stream_state=stream_state, stream_slice=stream_slice
)
assert input_record == expected_record

0 comments on commit 3344441

Please sign in to comment.