feat: add CustomSchemaNormalization #194

Open · wants to merge 9 commits into base: main
@@ -667,6 +667,28 @@ definitions:
$parameters:
type: object
additionalProperties: true
CustomSchemaNormalization:
title: Custom Schema Normalization
description: Schema normalization component whose behavior is derived from a custom code implementation of the connector.
type: object
additionalProperties: true
required:
- type
- class_name
properties:
type:
type: string
enum: [ CustomSchemaNormalization ]
class_name:
title: Class Name
description: Fully-qualified name of the class that will be implementing the custom normalization. The format is `source_<name>.<package>.<class_name>`.
type: string
additionalProperties: true
examples:
- "source_amazon_seller_partner.components.LedgerDetailedViewReportsTypeTransformer"
$parameters:
type: object
additionalProperties: true
CustomStateMigration:
title: Custom State Migration
description: Apply a custom transformation on the input state.
@@ -2555,7 +2577,11 @@ definitions:
- "$ref": "#/definitions/CustomRecordFilter"
- "$ref": "#/definitions/RecordFilter"
schema_normalization:
"$ref": "#/definitions/SchemaNormalization"
title: Schema Normalization
description: Responsible for normalization according to the schema.
anyOf:
- "$ref": "#/definitions/SchemaNormalization"
- "$ref": "#/definitions/CustomSchemaNormalization"
default: None
$parameters:
type: object
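The `class_name` field declared above is a fully-qualified dotted path that the factory resolves at runtime by importing the module and looking up the class. A minimal, hypothetical sketch of that resolution (the real `create_custom_component` in the factory handles parameters and nested components as well):

```python
import importlib
from typing import Any


def resolve_class_name(class_name: str) -> Any:
    """Resolve a fully-qualified name like `source_<name>.<package>.<class_name>`
    into the class object it refers to."""
    module_path, _, cls_name = class_name.rpartition(".")
    module = importlib.import_module(module_path)
    return getattr(module, cls_name)


# For illustration, resolve a stdlib class the same way a connector's
# components module would be resolved:
ordered_dict_cls = resolve_class_name("collections.OrderedDict")
```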
2 changes: 2 additions & 0 deletions airbyte_cdk/sources/declarative/extractors/__init__.py
@@ -9,8 +9,10 @@
from airbyte_cdk.sources.declarative.extractors.response_to_file_extractor import (
ResponseToFileExtractor,
)
from airbyte_cdk.sources.declarative.extractors.type_transformer import AbstractTypeTransformer

__all__ = [
"AbstractTypeTransformer",
"HttpSelector",
"DpathExtractor",
"RecordFilter",
10 changes: 3 additions & 7 deletions airbyte_cdk/sources/declarative/extractors/record_selector.py
@@ -10,16 +10,12 @@
from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
from airbyte_cdk.sources.declarative.extractors.record_filter import RecordFilter
from airbyte_cdk.sources.declarative.extractors.type_transformer import AbstractTypeTransformer
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
from airbyte_cdk.sources.declarative.models import SchemaNormalization
from airbyte_cdk.sources.declarative.transformations import RecordTransformation
from airbyte_cdk.sources.types import Config, Record, StreamSlice, StreamState
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer

SCHEMA_TRANSFORMER_TYPE_MAPPING = {
SchemaNormalization.None_: TransformConfig.NoTransform,
SchemaNormalization.Default: TransformConfig.DefaultSchemaNormalization,
}
from airbyte_cdk.sources.utils.transform import TypeTransformer


@dataclass
@@ -38,7 +34,7 @@ class RecordSelector(HttpSelector):
extractor: RecordExtractor
config: Config
parameters: InitVar[Mapping[str, Any]]
schema_normalization: TypeTransformer
schema_normalization: TypeTransformer | AbstractTypeTransformer
name: str
_name: Union[InterpolatedString, str] = field(init=False, repr=False, default="")
record_filter: Optional[RecordFilter] = None
55 changes: 55 additions & 0 deletions airbyte_cdk/sources/declarative/extractors/type_transformer.py
@@ -0,0 +1,55 @@
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Dict, Mapping


@dataclass
class AbstractTypeTransformer(ABC):
"""
Abstract base class for implementing type transformation logic.

This class provides a blueprint for defining custom transformations
on data records based on a provided schema. Implementing classes
must override the `transform` method to specify the transformation
logic.

Attributes:
None explicitly defined, as this is a dataclass intended to be
subclassed.

Methods:
transform(record: Dict[str, Any], schema: Mapping[str, Any]) -> None:
Abstract method that must be implemented by subclasses.
It performs a transformation on a given data record based
on the provided schema.

Usage:
To use this class, create a subclass that implements the
`transform` method with the desired transformation logic.
"""

@abstractmethod
def transform(
self,
record: Dict[str, Any],
schema: Mapping[str, Any],
) -> None:
"""
Perform a transformation on a data record based on a given schema.

Args:
record (Dict[str, Any]): The data record to be transformed.
schema (Mapping[str, Any]): The schema that dictates how
the record should be transformed.

Returns:
None

Raises:
NotImplementedError: If the method is not implemented
by a subclass.
"""
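A concrete implementation only needs to override `transform` and mutate the record in place. A hypothetical subclass mirroring `AbstractTypeTransformer`'s interface (the class and its casting rule are illustrative, not from the PR):

```python
from typing import Any, Dict, Mapping


class StringCastingTransformer:
    """Hypothetical transformer: casts record values to str wherever the
    schema declares a string-typed field. Mirrors the `transform` signature
    of AbstractTypeTransformer."""

    def transform(
        self,
        record: Dict[str, Any],
        schema: Mapping[str, Any],
    ) -> None:
        # Mutate the record in place and return None, as the interface requires.
        for field_name, field_schema in schema.get("properties", {}).items():
            if field_schema.get("type") == "string" and field_name in record:
                record[field_name] = str(record[field_name])


record = {"id": 123, "name": "widget"}
schema = {"properties": {"id": {"type": "string"}, "name": {"type": "string"}}}
StringCastingTransformer().transform(record, schema)
# record["id"] is now "123"
```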
@@ -268,6 +268,22 @@ class Config:
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class CustomSchemaNormalization(BaseModel):
class Config:
extra = Extra.allow

type: Literal["CustomSchemaNormalization"]
class_name: str = Field(
...,
description="Fully-qualified name of the class that will be implementing the custom normalization. The format is `source_<name>.<package>.<class_name>`.",
examples=[
"source_amazon_seller_partner.components.LedgerDetailedViewReportsTypeTransformer"
],
title="Class Name",
)
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class CustomStateMigration(BaseModel):
class Config:
extra = Extra.allow
@@ -1513,7 +1529,11 @@ class RecordSelector(BaseModel):
description="Responsible for filtering records to be emitted by the Source.",
title="Record Filter",
)
schema_normalization: Optional[SchemaNormalization] = SchemaNormalization.None_
schema_normalization: Optional[Union[SchemaNormalization, CustomSchemaNormalization]] = Field(
SchemaNormalization.None_,
description="Responsible for normalization according to the schema.",
title="Schema Normalization",
)
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


@@ -82,9 +82,6 @@
from airbyte_cdk.sources.declarative.extractors.record_filter import (
ClientSideIncrementalRecordFilterDecorator,
)
from airbyte_cdk.sources.declarative.extractors.record_selector import (
SCHEMA_TRANSFORMER_TYPE_MAPPING,
)
from airbyte_cdk.sources.declarative.incremental import (
ChildPartitionResumableFullRefreshCursor,
CursorFactory,
@@ -100,7 +97,9 @@
from airbyte_cdk.sources.declarative.migrations.legacy_to_per_partition_state_migration import (
LegacyToPerPartitionStateMigration,
)
from airbyte_cdk.sources.declarative.models import CustomStateMigration
from airbyte_cdk.sources.declarative.models import (
CustomStateMigration,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
AddedFieldDefinition as AddedFieldDefinitionModel,
)
@@ -185,6 +184,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
CustomSchemaLoader as CustomSchemaLoader,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
CustomSchemaNormalization as CustomSchemaNormalizationModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
CustomTransformation as CustomTransformationModel,
)
@@ -308,6 +310,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
ResponseToFileExtractor as ResponseToFileExtractorModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
SchemaNormalization as SchemaNormalizationModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
SchemaTypeIdentifier as SchemaTypeIdentifierModel,
)
@@ -439,6 +444,11 @@

ComponentDefinition = Mapping[str, Any]

SCHEMA_TRANSFORMER_TYPE_MAPPING = {
SchemaNormalizationModel.None_: TransformConfig.NoTransform,
SchemaNormalizationModel.Default: TransformConfig.DefaultSchemaNormalization,
}


class ModelToComponentFactory:
EPOCH_DATETIME_FORMAT = "%s"
@@ -487,6 +497,7 @@ def _init_mappings(self) -> None:
CustomRequesterModel: self.create_custom_component,
CustomRetrieverModel: self.create_custom_component,
CustomSchemaLoader: self.create_custom_component,
CustomSchemaNormalizationModel: self.create_custom_component,
CustomStateMigration: self.create_custom_component,
CustomPaginationStrategyModel: self.create_custom_component,
CustomPartitionRouterModel: self.create_custom_component,
@@ -1981,7 +1992,6 @@ def create_record_selector(
client_side_incremental_sync: Optional[Dict[str, Any]] = None,
**kwargs: Any,
) -> RecordSelector:
assert model.schema_normalization is not None # for mypy
extractor = self._create_component_from_model(
model=model.extractor, decoder=decoder, config=config
)
@@ -1999,8 +2009,10 @@
else None,
**client_side_incremental_sync,
)
schema_normalization = TypeTransformer(
SCHEMA_TRANSFORMER_TYPE_MAPPING[model.schema_normalization]
schema_normalization = (
TypeTransformer(SCHEMA_TRANSFORMER_TYPE_MAPPING[model.schema_normalization])
if isinstance(model.schema_normalization, SchemaNormalizationModel)
else self._create_component_from_model(model.schema_normalization, config=config) # type: ignore[arg-type] # custom normalization model expected here
Contributor:

Presumably this CustomNormalization component would need to already adhere to or inherit from the TypeTransformer interface/class?

And that's why we don't need to invoke the TypeTransformer constructor?

Contributor Author (artem1205, Jan 2, 2025):
Yes, most of the time we use a custom TypeTransformer by registering some custom function, but generally speaking we only need the `transform` interface. I can add an abstract class for this.

def _normalize_by_schema(
    self, records: Iterable[Mapping[str, Any]], schema: Optional[Mapping[str, Any]]
) -> Iterable[Mapping[str, Any]]:
    if schema:
        # record has type Mapping[str, Any], but dict[str, Any] expected
        for record in records:
            normalized_record = dict(record)
            self.schema_normalization.transform(normalized_record, schema)
            yield normalized_record
    else:
        yield from records

Contributor:
Yeah, I think we need to create an interface in the declarative package for this. I don't think we want users to be re-implementing TypeTransformer, for a couple of reasons:

  • It looks like it has a lot of methods we don't actually care about
  • We want to avoid the pattern of customers re-implementing concrete classes, since it couples our internal implementation to customer usage with unexpected breaking changes

So we should just create a new TypeTransformer interface in low-code (I'll leave naming to you), and then I think RecordSelector.schema_normalization will need to be changed to a union of these two interfaces. Once we have these added I will 👍 the PR. Thanks for making the other adjustments!


)

return RecordSelector(
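The branch added in `create_record_selector` can be sketched in isolation: enum values go through the static `SCHEMA_TRANSFORMER_TYPE_MAPPING`, while anything else is treated as a custom component. A simplified stand-in (the string results stand in for `TransformConfig` flags and the custom-component path; names are illustrative):

```python
from enum import Enum
from typing import Any, Union


class SchemaNormalization(str, Enum):
    """Stand-in for the generated SchemaNormalization enum."""

    None_ = "None"
    Default = "Default"


# Stand-ins for the TransformConfig flags used by the real mapping:
SCHEMA_TRANSFORMER_TYPE_MAPPING = {
    SchemaNormalization.None_: "NoTransform",
    SchemaNormalization.Default: "DefaultSchemaNormalization",
}


def pick_normalization(model: Union[SchemaNormalization, Any]) -> str:
    # Enum members resolve through the static mapping; any other model
    # would be handed to create_custom_component in the real factory.
    if isinstance(model, SchemaNormalization):
        return SCHEMA_TRANSFORMER_TYPE_MAPPING[model]
    return f"custom:{model.__class__.__name__}"


class CustomSchemaNormalizationStub:
    """Stand-in for a CustomSchemaNormalization model instance."""


builtin_choice = pick_normalization(SchemaNormalization.Default)
custom_choice = pick_normalization(CustomSchemaNormalizationStub())
```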