Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add CustomSchemaNormalization #194

Merged
merged 14 commits into from
Jan 9, 2025
Original file line number Diff line number Diff line change
Expand Up @@ -667,6 +667,28 @@ definitions:
$parameters:
type: object
additionalProperties: true
CustomSchemaNormalization:
title: Custom Schema Normalization
description: Schema normalization component whose behavior is derived from a custom code implementation of the connector.
type: object
additionalProperties: true
required:
- type
- class_name
properties:
type:
type: string
enum: [ CustomSchemaNormalization ]
class_name:
title: Class Name
description: Fully-qualified name of the class that will be implementing the custom normalization. The format is `source_<name>.<package>.<class_name>`.
type: string
additionalProperties: true
examples:
- "source_amazon_seller_partner.components.LedgerDetailedViewReportsTypeTransformer"
$parameters:
type: object
additionalProperties: true
CustomStateMigration:
title: Custom State Migration
description: Apply a custom transformation on the input state.
Expand Down Expand Up @@ -2600,7 +2622,11 @@ definitions:
- "$ref": "#/definitions/CustomRecordFilter"
- "$ref": "#/definitions/RecordFilter"
schema_normalization:
"$ref": "#/definitions/SchemaNormalization"
title: Schema Normalization
description: Responsible for normalization according to the schema.
anyOf:
- "$ref": "#/definitions/SchemaNormalization"
- "$ref": "#/definitions/CustomSchemaNormalization"
default: None
$parameters:
type: object
Expand Down
2 changes: 2 additions & 0 deletions airbyte_cdk/sources/declarative/extractors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@
from airbyte_cdk.sources.declarative.extractors.response_to_file_extractor import (
ResponseToFileExtractor,
)
from airbyte_cdk.sources.declarative.extractors.type_transformer import TypeTransformer

__all__ = [
"TypeTransformer",
"HttpSelector",
"DpathExtractor",
"RecordFilter",
Expand Down
12 changes: 5 additions & 7 deletions airbyte_cdk/sources/declarative/extractors/record_selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,14 @@
from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
from airbyte_cdk.sources.declarative.extractors.record_filter import RecordFilter
from airbyte_cdk.sources.declarative.extractors.type_transformer import (
TypeTransformer as DeclarativeTypeTransformer,
)
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
from airbyte_cdk.sources.declarative.models import SchemaNormalization
from airbyte_cdk.sources.declarative.transformations import RecordTransformation
from airbyte_cdk.sources.types import Config, Record, StreamSlice, StreamState
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer

SCHEMA_TRANSFORMER_TYPE_MAPPING = {
SchemaNormalization.None_: TransformConfig.NoTransform,
SchemaNormalization.Default: TransformConfig.DefaultSchemaNormalization,
}
from airbyte_cdk.sources.utils.transform import TypeTransformer


@dataclass
Expand All @@ -38,7 +36,7 @@ class RecordSelector(HttpSelector):
extractor: RecordExtractor
config: Config
parameters: InitVar[Mapping[str, Any]]
schema_normalization: TypeTransformer
schema_normalization: Union[TypeTransformer, DeclarativeTypeTransformer]
name: str
_name: Union[InterpolatedString, str] = field(init=False, repr=False, default="")
record_filter: Optional[RecordFilter] = None
Expand Down
55 changes: 55 additions & 0 deletions airbyte_cdk/sources/declarative/extractors/type_transformer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Dict, Mapping


@dataclass
class TypeTransformer(ABC):
"""
Abstract base class for implementing type transformation logic.

This class provides a blueprint for defining custom transformations
on data records based on a provided schema. Implementing classes
must override the `transform` method to specify the transformation
logic.

Attributes:
None explicitly defined, as this is a dataclass intended to be
subclassed.

Methods:
transform(record: Dict[str, Any], schema: Mapping[str, Any]) -> None:
Abstract method that must be implemented by subclasses.
It performs a transformation on a given data record based
on the provided schema.

Usage:
To use this class, create a subclass that implements the
`transform` method with the desired transformation logic.
"""

@abstractmethod
def transform(
self,
record: Dict[str, Any],
schema: Mapping[str, Any],
) -> None:
"""
Perform a transformation on a data record based on a given schema.

Args:
record (Dict[str, Any]): The data record to be transformed.
schema (Mapping[str, Any]): The schema that dictates how
the record should be transformed.

Returns:
None

Raises:
NotImplementedError: If the method is not implemented
by a subclass.
"""
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,22 @@ class Config:
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class CustomSchemaNormalization(BaseModel):
class Config:
extra = Extra.allow

type: Literal["CustomSchemaNormalization"]
class_name: str = Field(
...,
description="Fully-qualified name of the class that will be implementing the custom normalization. The format is `source_<name>.<package>.<class_name>`.",
examples=[
"source_amazon_seller_partner.components.LedgerDetailedViewReportsTypeTransformer"
],
title="Class Name",
)
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class CustomStateMigration(BaseModel):
class Config:
extra = Extra.allow
Expand Down Expand Up @@ -1530,7 +1546,11 @@ class RecordSelector(BaseModel):
description="Responsible for filtering records to be emitted by the Source.",
title="Record Filter",
)
schema_normalization: Optional[SchemaNormalization] = SchemaNormalization.None_
schema_normalization: Optional[Union[SchemaNormalization, CustomSchemaNormalization]] = Field(
SchemaNormalization.None_,
description="Responsible for normalization according to the schema.",
title="Schema Normalization",
)
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,6 @@
from airbyte_cdk.sources.declarative.extractors.record_filter import (
ClientSideIncrementalRecordFilterDecorator,
)
from airbyte_cdk.sources.declarative.extractors.record_selector import (
SCHEMA_TRANSFORMER_TYPE_MAPPING,
)
from airbyte_cdk.sources.declarative.incremental import (
ChildPartitionResumableFullRefreshCursor,
CursorFactory,
Expand All @@ -100,7 +97,9 @@
from airbyte_cdk.sources.declarative.migrations.legacy_to_per_partition_state_migration import (
LegacyToPerPartitionStateMigration,
)
from airbyte_cdk.sources.declarative.models import CustomStateMigration
from airbyte_cdk.sources.declarative.models import (
CustomStateMigration,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
AddedFieldDefinition as AddedFieldDefinitionModel,
)
Expand Down Expand Up @@ -185,6 +184,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
CustomSchemaLoader as CustomSchemaLoader,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
CustomSchemaNormalization as CustomSchemaNormalizationModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
CustomTransformation as CustomTransformationModel,
)
Expand Down Expand Up @@ -311,6 +313,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
ResponseToFileExtractor as ResponseToFileExtractorModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
SchemaNormalization as SchemaNormalizationModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
SchemaTypeIdentifier as SchemaTypeIdentifierModel,
)
Expand Down Expand Up @@ -445,6 +450,11 @@

ComponentDefinition = Mapping[str, Any]

SCHEMA_TRANSFORMER_TYPE_MAPPING = {
SchemaNormalizationModel.None_: TransformConfig.NoTransform,
SchemaNormalizationModel.Default: TransformConfig.DefaultSchemaNormalization,
}


class ModelToComponentFactory:
EPOCH_DATETIME_FORMAT = "%s"
Expand Down Expand Up @@ -493,6 +503,7 @@ def _init_mappings(self) -> None:
CustomRequesterModel: self.create_custom_component,
CustomRetrieverModel: self.create_custom_component,
CustomSchemaLoader: self.create_custom_component,
CustomSchemaNormalizationModel: self.create_custom_component,
CustomStateMigration: self.create_custom_component,
CustomPaginationStrategyModel: self.create_custom_component,
CustomPartitionRouterModel: self.create_custom_component,
Expand Down Expand Up @@ -2000,7 +2011,6 @@ def create_record_selector(
client_side_incremental_sync: Dict[str, Any] | None = None,
**kwargs: Any,
) -> RecordSelector:
assert model.schema_normalization is not None # for mypy
extractor = self._create_component_from_model(
model=model.extractor, decoder=decoder, config=config
)
Expand All @@ -2018,8 +2028,10 @@ def create_record_selector(
else None,
**client_side_incremental_sync,
)
schema_normalization = TypeTransformer(
SCHEMA_TRANSFORMER_TYPE_MAPPING[model.schema_normalization]
schema_normalization = (
TypeTransformer(SCHEMA_TRANSFORMER_TYPE_MAPPING[model.schema_normalization])
if isinstance(model.schema_normalization, SchemaNormalizationModel)
else self._create_component_from_model(model.schema_normalization, config=config) # type: ignore[arg-type] # custom normalization model expected here
artem1205 marked this conversation as resolved.
Show resolved Hide resolved
)

return RecordSelector(
Expand Down
Loading