diff --git a/airbyte_cdk/sources/declarative/extractors/__init__.py b/airbyte_cdk/sources/declarative/extractors/__init__.py index aacac6654..261520881 100644 --- a/airbyte_cdk/sources/declarative/extractors/__init__.py +++ b/airbyte_cdk/sources/declarative/extractors/__init__.py @@ -9,8 +9,10 @@ from airbyte_cdk.sources.declarative.extractors.response_to_file_extractor import ( ResponseToFileExtractor, ) +from airbyte_cdk.sources.declarative.extractors.type_transformer import AbstractTypeTransformer __all__ = [ + "AbstractTypeTransformer", "HttpSelector", "DpathExtractor", "RecordFilter", diff --git a/airbyte_cdk/sources/declarative/extractors/record_selector.py b/airbyte_cdk/sources/declarative/extractors/record_selector.py index a2da0ccd2..2d9546878 100644 --- a/airbyte_cdk/sources/declarative/extractors/record_selector.py +++ b/airbyte_cdk/sources/declarative/extractors/record_selector.py @@ -10,6 +10,7 @@ from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor from airbyte_cdk.sources.declarative.extractors.record_filter import RecordFilter +from airbyte_cdk.sources.declarative.extractors.type_transformer import AbstractTypeTransformer from airbyte_cdk.sources.declarative.interpolation import InterpolatedString from airbyte_cdk.sources.declarative.models import SchemaNormalization from airbyte_cdk.sources.declarative.transformations import RecordTransformation @@ -33,7 +34,7 @@ class RecordSelector(HttpSelector): extractor: RecordExtractor config: Config parameters: InitVar[Mapping[str, Any]] - schema_normalization: TypeTransformer + schema_normalization: TypeTransformer | AbstractTypeTransformer name: str _name: Union[InterpolatedString, str] = field(init=False, repr=False, default="") record_filter: Optional[RecordFilter] = None diff --git a/airbyte_cdk/sources/declarative/extractors/type_transformer.py b/airbyte_cdk/sources/declarative/extractors/type_transformer.py new file mode 100644 index 000000000..f9e107fe4 --- /dev/null +++ b/airbyte_cdk/sources/declarative/extractors/type_transformer.py @@ -0,0 +1,55 @@ +# +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. +# + +from abc import ABC, abstractmethod +from dataclasses import dataclass +from typing import Any, Dict, Mapping + + +@dataclass +class AbstractTypeTransformer(ABC): + """ + Abstract base class for implementing type transformation logic. + + This class provides a blueprint for defining custom transformations + on data records based on a provided schema. Implementing classes + must override the `transform` method to specify the transformation + logic. + + Attributes: + None explicitly defined, as this is a dataclass intended to be + subclassed. + + Methods: + transform(record: Dict[str, Any], schema: Mapping[str, Any]) -> None: + Abstract method that must be implemented by subclasses. + It performs a transformation on a given data record based + on the provided schema. + + Usage: + To use this class, create a subclass that implements the + `transform` method with the desired transformation logic. + """ + + @abstractmethod + def transform( + self, + record: Dict[str, Any], + schema: Mapping[str, Any], + ) -> None: + """ + Perform a transformation on a data record based on a given schema. + + Args: + record (Dict[str, Any]): The data record to be transformed. + schema (Mapping[str, Any]): The schema that dictates how + the record should be transformed. + + Returns: + None + + Raises: + NotImplementedError: If the method is not implemented + by a subclass. + """