feat: add CustomSchemaNormalization #194

Open · wants to merge 9 commits into base: main
Changes from 5 commits
@@ -2555,7 +2555,9 @@ definitions:
- "$ref": "#/definitions/CustomRecordFilter"
- "$ref": "#/definitions/RecordFilter"
      schema_normalization:
-       "$ref": "#/definitions/SchemaNormalization"
+       anyOf:
+         - "$ref": "#/definitions/SchemaNormalization"
+         - "$ref": "#/definitions/CustomSchemaNormalization"
        default: None
      $parameters:
        type: object
@@ -2570,6 +2572,28 @@ definitions:
examples:
- None
- Default
  CustomSchemaNormalization:
    title: Custom Schema Normalization
    description: Use this to implement custom normalization according to the schema.
    type: object
    additionalProperties: true
    required:
      - type
      - class_name
    properties:
      type:
        type: string
        enum: [CustomSchemaNormalization]
      class_name:
        title: Class Name
        description: Fully-qualified name of the class that will be implementing the custom normalization. The format is `source_<name>.<package>.<class_name>`.
        type: string
        additionalProperties: true
        examples:
          - "source_amazon_seller_partner.components.LedgerDetailedViewReportsTypeTransformer"
      $parameters:
        type: object
        additionalProperties: true
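For illustration, a connector manifest could reference the new component along the lines of the sketch below. The source and class names are hypothetical placeholders, not taken from the PR; the small check simply mirrors the schema's `required: [type, class_name]` constraint.

```python
# Hypothetical RecordSelector fragment using the new component; the
# source/class names are illustrative placeholders, not from the PR.
record_selector = {
    "type": "RecordSelector",
    "extractor": {"type": "DpathExtractor", "field_path": ["data"]},
    "schema_normalization": {
        "type": "CustomSchemaNormalization",
        "class_name": "source_example.components.ExampleTypeTransformer",
    },
}


def validate_custom_schema_normalization(component: dict) -> bool:
    # Mirrors the YAML schema above: `type` and `class_name` are required.
    return (
        component.get("type") == "CustomSchemaNormalization"
        and isinstance(component.get("class_name"), str)
    )
```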
RemoveFields:
title: Remove Fields
description: A transformation which removes fields from a record. The fields removed are designated using FieldPointers. During transformation, if a field or any of its parents does not exist in the record, no error is thrown.
@@ -11,15 +11,9 @@
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
from airbyte_cdk.sources.declarative.extractors.record_filter import RecordFilter
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
-from airbyte_cdk.sources.declarative.models import SchemaNormalization
from airbyte_cdk.sources.declarative.transformations import RecordTransformation
from airbyte_cdk.sources.types import Config, Record, StreamSlice, StreamState
-from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer
-
-SCHEMA_TRANSFORMER_TYPE_MAPPING = {
-    SchemaNormalization.None_: TransformConfig.NoTransform,
-    SchemaNormalization.Default: TransformConfig.DefaultSchemaNormalization,
-}
+from airbyte_cdk.sources.utils.transform import TypeTransformer


@dataclass
@@ -1024,6 +1024,22 @@ class SchemaNormalization(Enum):
    Default = "Default"


class CustomSchemaNormalization(BaseModel):
    class Config:
        extra = Extra.allow

    type: Literal["CustomSchemaNormalization"]
    class_name: str = Field(
        ...,
        description="Fully-qualified name of the class that will be implementing the custom normalization. The format is `source_<name>.<package>.<class_name>`.",
        examples=[
            "source_amazon_seller_partner.components.LedgerDetailedViewReportsTypeTransformer"
        ],
        title="Class Name",
    )
    parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class RemoveFields(BaseModel):
    type: Literal["RemoveFields"]
    condition: Optional[str] = Field(
@@ -1513,7 +1529,9 @@ class RecordSelector(BaseModel):
        description="Responsible for filtering records to be emitted by the Source.",
        title="Record Filter",
    )
-    schema_normalization: Optional[SchemaNormalization] = SchemaNormalization.None_
+    schema_normalization: Optional[Union[SchemaNormalization, CustomSchemaNormalization]] = (
+        SchemaNormalization.None_
+    )
    parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


@@ -82,9 +82,6 @@
from airbyte_cdk.sources.declarative.extractors.record_filter import (
    ClientSideIncrementalRecordFilterDecorator,
)
-from airbyte_cdk.sources.declarative.extractors.record_selector import (
-    SCHEMA_TRANSFORMER_TYPE_MAPPING,
-)
from airbyte_cdk.sources.declarative.incremental import (
    ChildPartitionResumableFullRefreshCursor,
    CursorFactory,
@@ -100,7 +97,9 @@
from airbyte_cdk.sources.declarative.migrations.legacy_to_per_partition_state_migration import (
    LegacyToPerPartitionStateMigration,
)
-from airbyte_cdk.sources.declarative.models import CustomStateMigration
+from airbyte_cdk.sources.declarative.models import (
+    CustomStateMigration,
+)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    AddedFieldDefinition as AddedFieldDefinitionModel,
)
@@ -185,6 +184,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    CustomSchemaLoader as CustomSchemaLoader,
)
+from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
+    CustomSchemaNormalization as CustomSchemaNormalizationModel,
+)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    CustomTransformation as CustomTransformationModel,
)
@@ -308,6 +310,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    ResponseToFileExtractor as ResponseToFileExtractorModel,
)
+from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
+    SchemaNormalization as SchemaNormalizationModel,
+)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    SchemaTypeIdentifier as SchemaTypeIdentifierModel,
)
@@ -439,6 +444,11 @@

ComponentDefinition = Mapping[str, Any]

+SCHEMA_TRANSFORMER_TYPE_MAPPING = {
+    SchemaNormalizationModel.None_: TransformConfig.NoTransform,
+    SchemaNormalizationModel.Default: TransformConfig.DefaultSchemaNormalization,
+}


class ModelToComponentFactory:
    EPOCH_DATETIME_FORMAT = "%s"
@@ -487,6 +497,7 @@ def _init_mappings(self) -> None:
            CustomRequesterModel: self.create_custom_component,
            CustomRetrieverModel: self.create_custom_component,
            CustomSchemaLoader: self.create_custom_component,
+           CustomSchemaNormalizationModel: self.create_custom_component,
            CustomStateMigration: self.create_custom_component,
            CustomPaginationStrategyModel: self.create_custom_component,
            CustomPartitionRouterModel: self.create_custom_component,
@@ -1981,7 +1992,6 @@ def create_record_selector(
        client_side_incremental_sync: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> RecordSelector:
-        assert model.schema_normalization is not None  # for mypy
        extractor = self._create_component_from_model(
            model=model.extractor, decoder=decoder, config=config
        )
@@ -1999,8 +2009,10 @@
                else None,
                **client_side_incremental_sync,
            )
-        schema_normalization = TypeTransformer(
-            SCHEMA_TRANSFORMER_TYPE_MAPPING[model.schema_normalization]
+        schema_normalization = (
+            TypeTransformer(SCHEMA_TRANSFORMER_TYPE_MAPPING[model.schema_normalization])
+            if isinstance(model.schema_normalization, SchemaNormalizationModel)
+            else self._create_component_from_model(model.schema_normalization, config=config)  # type: ignore[arg-type]  # custom normalization model expected here
Contributor:
Presumably this CustomNormalization component would need to already adhere to or inherit from the TypeTransformer interface/class?

And that's why we don't need to invoke the TypeTransformer constructor?

Contributor Author (artem1205), Jan 2, 2025:
Yes, most of the time we use a custom TypeTransformer by registering some custom function, but generally speaking we only need the transform interface. I can add an abstract class for this.

        self, records: Iterable[Mapping[str, Any]], schema: Optional[Mapping[str, Any]]
    ) -> Iterable[Mapping[str, Any]]:
        if schema:
            # record has type Mapping[str, Any], but dict[str, Any] expected
            for record in records:
                normalized_record = dict(record)
                self.schema_normalization.transform(normalized_record, schema)
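The diff fragment above (its enclosing `def` line is cut off in this view) copies each record into a mutable dict, transforms it in place against the schema, and yields the result. A runnable sketch of the same pass follows; the function name and the stand-in transformer are assumptions for illustration, not code from the PR.

```python
from typing import Any, Dict, Iterable, Mapping, Optional


class StringifyTransformer:
    """Stand-in normalizer: casts values whose declared schema type is string."""

    def transform(self, record: Dict[str, Any], schema: Mapping[str, Any]) -> None:
        for key, spec in schema.get("properties", {}).items():
            if key in record and spec.get("type") == "string":
                record[key] = str(record[key])


def normalize_by_schema(
    records: Iterable[Mapping[str, Any]],
    schema: Optional[Mapping[str, Any]],
    transformer: StringifyTransformer,
) -> Iterable[Mapping[str, Any]]:
    if schema:
        for record in records:
            normalized_record = dict(record)  # Mapping -> mutable dict
            transformer.transform(normalized_record, schema)
            yield normalized_record
    else:
        # No schema: pass records through untouched.
        yield from records
```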

Contributor:
Yeah, I think we need to create an interface in the declarative package for this. I don't think we want users to be re-implementing TypeTransformer, for a couple of reasons:

  • It looks like it has a lot of methods we don't actually care about.
  • We want to avoid the pattern of customers re-implementing concrete classes, since it couples our internal implementation to customer usage with unexpected breaking changes.

So we should just create a new TypeTransformer interface in low-code (I'll leave naming to you), and then I think RecordSelector.schema_normalization will need to be changed to a union of these two interfaces. Once we have these added I will 👍 the PR. Thanks for making the other adjustments!
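An interface along the lines the reviewer suggests could be sketched as below; the class and method names are assumptions, not what the PR ultimately shipped.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, Mapping


class DeclarativeTypeTransformer(ABC):
    """Hypothetical low-code interface: custom normalizations implement only
    transform(), instead of subclassing the concrete TypeTransformer."""

    @abstractmethod
    def transform(self, record: Dict[str, Any], schema: Mapping[str, Any]) -> None:
        """Mutate `record` in place so its values conform to `schema`."""


class NoOpNormalization(DeclarativeTypeTransformer):
    def transform(self, record: Dict[str, Any], schema: Mapping[str, Any]) -> None:
        pass  # leave the record untouched
```

With this in place, `RecordSelector.schema_normalization` could accept either the built-in transformer or any implementation of the interface, without coupling connectors to the concrete class.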

        )

        return RecordSelector(