From 3984559c436fd7db2f73ac1f999518de355d282b Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Tue, 31 Dec 2024 14:56:03 +0100 Subject: [PATCH 01/12] CDK: add custom type transformer Signed-off-by: Artem Inzhyyants --- .../declarative_component_schema.yaml | 45 +++++++++++++++---- .../declarative/extractors/record_selector.py | 8 +--- .../models/declarative_component_schema.py | 20 ++++++++- .../parsers/model_to_component_factory.py | 40 ++++++++++++++--- 4 files changed, 88 insertions(+), 25 deletions(-) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 89c73107..f30a63a9 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -2555,21 +2555,48 @@ definitions: - "$ref": "#/definitions/CustomRecordFilter" - "$ref": "#/definitions/RecordFilter" schema_normalization: - "$ref": "#/definitions/SchemaNormalization" - default: None + anyOf: + - "$ref": "#/definitions/SchemaNormalization" + - "$ref": "#/definitions/CustomSchemaNormalization" +# default: None $parameters: type: object additionalProperties: true SchemaNormalization: title: Schema Normalization description: Responsible for normalization according to the schema. - type: string - enum: - - None - - Default - examples: - - None - - Default + type: object + required: + - type + properties: + type: + type: string + enum: [ SchemaNormalization ] + transform_config: + type: string + enum: + - None + - Default + CustomSchemaNormalization: + title: Schema Normalization + description: Responsible for normalization according to the schema. + type: object + required: + - type + properties: + type: + type: string + enum: [ CustomSchemaNormalization ] + class_name: + title: Class Name + description: Fully-qualified name of the class that will be implementing the custom decoding. Has to be a sub class of Decoder. 
The format is `source_..`. + type: string + additionalProperties: true + examples: + - "source_amazon_ads.components.LedgerDetailedViewReportsTypeTransformer" + $parameters: + type: object + additionalProperties: true RemoveFields: title: Remove Fields description: A transformation which removes fields from a record. The fields removed are designated using FieldPointers. During transformation, if a field or any of its parents does not exist in the record, no error is thrown. diff --git a/airbyte_cdk/sources/declarative/extractors/record_selector.py b/airbyte_cdk/sources/declarative/extractors/record_selector.py index b2eed93b..99da6bb9 100644 --- a/airbyte_cdk/sources/declarative/extractors/record_selector.py +++ b/airbyte_cdk/sources/declarative/extractors/record_selector.py @@ -11,15 +11,9 @@ from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor from airbyte_cdk.sources.declarative.extractors.record_filter import RecordFilter from airbyte_cdk.sources.declarative.interpolation import InterpolatedString -from airbyte_cdk.sources.declarative.models import SchemaNormalization from airbyte_cdk.sources.declarative.transformations import RecordTransformation from airbyte_cdk.sources.types import Config, Record, StreamSlice, StreamState -from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer - -SCHEMA_TRANSFORMER_TYPE_MAPPING = { - SchemaNormalization.None_: TransformConfig.NoTransform, - SchemaNormalization.Default: TransformConfig.DefaultSchemaNormalization, -} +from airbyte_cdk.sources.utils.transform import TypeTransformer @dataclass diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 5823b34c..b33c71ab 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -1019,11 +1019,27 @@ class 
RecordFilter(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") -class SchemaNormalization(Enum): +class TransformConfig(Enum): None_ = "None" Default = "Default" +class SchemaNormalization(BaseModel): + type: Literal["SchemaNormalization"] + transform_config: Optional[TransformConfig] = None + + +class CustomSchemaNormalization(BaseModel): + type: Literal["CustomSchemaNormalization"] + class_name: Optional[str] = Field( + None, + description="Fully-qualified name of the class that will be implementing the custom decoding. Has to be a sub class of Decoder. The format is `source_..`.", + examples=["source_amazon_ads.components.LedgerDetailedViewReportsTypeTransformer"], + title="Class Name", + ) + parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + + class RemoveFields(BaseModel): type: Literal["RemoveFields"] condition: Optional[str] = Field( @@ -1513,7 +1529,7 @@ class RecordSelector(BaseModel): description="Responsible for filtering records to be emitted by the Source.", title="Record Filter", ) - schema_normalization: Optional[SchemaNormalization] = SchemaNormalization.None_ + schema_normalization: Optional[Union[SchemaNormalization, CustomSchemaNormalization]] = None parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 694cb104..ef52757a 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -82,9 +82,6 @@ from airbyte_cdk.sources.declarative.extractors.record_filter import ( ClientSideIncrementalRecordFilterDecorator, ) -from airbyte_cdk.sources.declarative.extractors.record_selector import ( - SCHEMA_TRANSFORMER_TYPE_MAPPING, -) from airbyte_cdk.sources.declarative.incremental import ( 
ChildPartitionResumableFullRefreshCursor, CursorFactory, @@ -100,7 +97,12 @@ from airbyte_cdk.sources.declarative.migrations.legacy_to_per_partition_state_migration import ( LegacyToPerPartitionStateMigration, ) -from airbyte_cdk.sources.declarative.models import CustomStateMigration +from airbyte_cdk.sources.declarative.models import ( + CustomStateMigration, +) +from airbyte_cdk.sources.declarative.models import ( + TransformConfig as TransformConfigModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( AddedFieldDefinition as AddedFieldDefinitionModel, ) @@ -185,6 +187,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( CustomSchemaLoader as CustomSchemaLoader, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + CustomSchemaNormalization as CustomSchemaNormalizationModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( CustomTransformation as CustomTransformationModel, ) @@ -308,6 +313,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( ResponseToFileExtractor as ResponseToFileExtractorModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + SchemaNormalization as SchemaNormalizationModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( SchemaTypeIdentifier as SchemaTypeIdentifierModel, ) @@ -439,6 +447,11 @@ ComponentDefinition = Mapping[str, Any] +SCHEMA_TRANSFORMER_TYPE_MAPPING = { + TransformConfigModel.None_: TransformConfig.NoTransform, + TransformConfigModel.Default: TransformConfig.DefaultSchemaNormalization, +} + class ModelToComponentFactory: EPOCH_DATETIME_FORMAT = "%s" @@ -487,6 +500,7 @@ def _init_mappings(self) -> None: CustomRequesterModel: self.create_custom_component, CustomRetrieverModel: self.create_custom_component, CustomSchemaLoader: self.create_custom_component, + 
CustomSchemaNormalizationModel: self.create_custom_component, CustomStateMigration: self.create_custom_component, CustomPaginationStrategyModel: self.create_custom_component, CustomPartitionRouterModel: self.create_custom_component, @@ -532,6 +546,7 @@ def _init_mappings(self) -> None: RequestPathModel: self.create_request_path, RequestOptionModel: self.create_request_option, LegacySessionTokenAuthenticatorModel: self.create_legacy_session_token_authenticator, + SchemaNormalizationModel: self.create_schema_normalization, SelectiveAuthenticatorModel: self.create_selective_authenticator, SimpleRetrieverModel: self.create_simple_retriever, SpecModel: self.create_spec, @@ -1981,7 +1996,6 @@ def create_record_selector( client_side_incremental_sync: Optional[Dict[str, Any]] = None, **kwargs: Any, ) -> RecordSelector: - assert model.schema_normalization is not None # for mypy extractor = self._create_component_from_model( model=model.extractor, decoder=decoder, config=config ) @@ -1999,8 +2013,10 @@ def create_record_selector( else None, **client_side_incremental_sync, ) - schema_normalization = TypeTransformer( - SCHEMA_TRANSFORMER_TYPE_MAPPING[model.schema_normalization] + schema_normalization = ( + self._create_component_from_model(model.schema_normalization, config=config) + if model.schema_normalization + else TypeTransformer(TransformConfig.NoTransform) ) return RecordSelector( @@ -2021,6 +2037,16 @@ def create_remove_fields( field_pointers=model.field_pointers, condition=model.condition or "", parameters={} ) + @staticmethod + def create_schema_normalization( + model: SchemaNormalizationModel, config: Config, **kwargs: Any + ) -> TypeTransformer: + schema_normalization = TypeTransformer( + SCHEMA_TRANSFORMER_TYPE_MAPPING[model.transform_config] + ) + + return schema_normalization + def create_selective_authenticator( self, model: SelectiveAuthenticatorModel, config: Config, **kwargs: Any ) -> DeclarativeAuthenticator: From 24333407bf1c708ffe5d6c2981f1c745e0b1188d 
Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Tue, 31 Dec 2024 15:12:00 +0100 Subject: [PATCH 02/12] CDK: ref Signed-off-by: Artem Inzhyyants --- .../declarative_component_schema.yaml | 21 +++++++--------- .../models/declarative_component_schema.py | 11 ++++----- .../parsers/model_to_component_factory.py | 24 ++++--------------- 3 files changed, 17 insertions(+), 39 deletions(-) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index f30a63a9..6b96367e 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -2558,25 +2558,20 @@ definitions: anyOf: - "$ref": "#/definitions/SchemaNormalization" - "$ref": "#/definitions/CustomSchemaNormalization" -# default: None + default: None $parameters: type: object additionalProperties: true SchemaNormalization: title: Schema Normalization description: Responsible for normalization according to the schema. - type: object - required: - - type - properties: - type: - type: string - enum: [ SchemaNormalization ] - transform_config: - type: string - enum: - - None - - Default + type: string + enum: + - None + - Default + examples: + - None + - Default CustomSchemaNormalization: title: Schema Normalization description: Responsible for normalization according to the schema. 
diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index b33c71ab..bbe013b1 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -1019,16 +1019,11 @@ class RecordFilter(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") -class TransformConfig(Enum): +class SchemaNormalization(Enum): None_ = "None" Default = "Default" -class SchemaNormalization(BaseModel): - type: Literal["SchemaNormalization"] - transform_config: Optional[TransformConfig] = None - - class CustomSchemaNormalization(BaseModel): type: Literal["CustomSchemaNormalization"] class_name: Optional[str] = Field( @@ -1529,7 +1524,9 @@ class RecordSelector(BaseModel): description="Responsible for filtering records to be emitted by the Source.", title="Record Filter", ) - schema_normalization: Optional[Union[SchemaNormalization, CustomSchemaNormalization]] = None + schema_normalization: Optional[Union[SchemaNormalization, CustomSchemaNormalization]] = ( + SchemaNormalization.None_ + ) parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index ef52757a..50de9495 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -100,9 +100,6 @@ from airbyte_cdk.sources.declarative.models import ( CustomStateMigration, ) -from airbyte_cdk.sources.declarative.models import ( - TransformConfig as TransformConfigModel, -) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( AddedFieldDefinition as AddedFieldDefinitionModel, ) @@ -448,8 +445,8 @@ ComponentDefinition = 
Mapping[str, Any] SCHEMA_TRANSFORMER_TYPE_MAPPING = { - TransformConfigModel.None_: TransformConfig.NoTransform, - TransformConfigModel.Default: TransformConfig.DefaultSchemaNormalization, + SchemaNormalizationModel.None_: TransformConfig.NoTransform, + SchemaNormalizationModel.Default: TransformConfig.DefaultSchemaNormalization, } @@ -546,7 +543,6 @@ def _init_mappings(self) -> None: RequestPathModel: self.create_request_path, RequestOptionModel: self.create_request_option, LegacySessionTokenAuthenticatorModel: self.create_legacy_session_token_authenticator, - SchemaNormalizationModel: self.create_schema_normalization, SelectiveAuthenticatorModel: self.create_selective_authenticator, SimpleRetrieverModel: self.create_simple_retriever, SpecModel: self.create_spec, @@ -2014,9 +2010,9 @@ def create_record_selector( **client_side_incremental_sync, ) schema_normalization = ( - self._create_component_from_model(model.schema_normalization, config=config) - if model.schema_normalization - else TypeTransformer(TransformConfig.NoTransform) + TypeTransformer(SCHEMA_TRANSFORMER_TYPE_MAPPING[model.schema_normalization]) + if isinstance(model.schema_normalization, SchemaNormalizationModel) + else self._create_component_from_model(model.schema_normalization, config=config) ) return RecordSelector( @@ -2037,16 +2033,6 @@ def create_remove_fields( field_pointers=model.field_pointers, condition=model.condition or "", parameters={} ) - @staticmethod - def create_schema_normalization( - model: SchemaNormalizationModel, config: Config, **kwargs: Any - ) -> TypeTransformer: - schema_normalization = TypeTransformer( - SCHEMA_TRANSFORMER_TYPE_MAPPING[model.transform_config] - ) - - return schema_normalization - def create_selective_authenticator( self, model: SelectiveAuthenticatorModel, config: Config, **kwargs: Any ) -> DeclarativeAuthenticator: From 9bdfb7392682e5f35a40f9c2a35c4908a2bf8cb4 Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Tue, 31 Dec 2024 15:15:39 +0100 Subject: 
[PATCH 03/12] CDK: fix Signed-off-by: Artem Inzhyyants --- .../sources/declarative/parsers/model_to_component_factory.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 50de9495..88cdc8e5 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -2012,7 +2012,7 @@ def create_record_selector( schema_normalization = ( TypeTransformer(SCHEMA_TRANSFORMER_TYPE_MAPPING[model.schema_normalization]) if isinstance(model.schema_normalization, SchemaNormalizationModel) - else self._create_component_from_model(model.schema_normalization, config=config) + else self._create_component_from_model(model.schema_normalization, config=config) # type: ignore[arg-type] # custom normalization model expected here ) return RecordSelector( From c9202e5988bb049cd3cd839798acf16665744600 Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Tue, 31 Dec 2024 15:50:39 +0100 Subject: [PATCH 04/12] CDK: fix typo Signed-off-by: Artem Inzhyyants --- .../declarative/declarative_component_schema.yaml | 14 +++++++------- .../models/declarative_component_schema.py | 6 ++++-- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 6b96367e..81e1d89a 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -2556,8 +2556,8 @@ definitions: - "$ref": "#/definitions/RecordFilter" schema_normalization: anyOf: - - "$ref": "#/definitions/SchemaNormalization" - - "$ref": "#/definitions/CustomSchemaNormalization" + - "$ref": "#/definitions/SchemaNormalization" + - "$ref": 
"#/definitions/CustomSchemaNormalization" default: None $parameters: type: object @@ -2573,22 +2573,22 @@ definitions: - None - Default CustomSchemaNormalization: - title: Schema Normalization - description: Responsible for normalization according to the schema. + title: Custom Schema Normalization + description: Use this to implement custom normalization according to the schema. type: object required: - type properties: type: type: string - enum: [ CustomSchemaNormalization ] + enum: [CustomSchemaNormalization] class_name: title: Class Name - description: Fully-qualified name of the class that will be implementing the custom decoding. Has to be a sub class of Decoder. The format is `source_..`. + description: Fully-qualified name of the class that will be implementing the custom normalization. The format is `source_..`. type: string additionalProperties: true examples: - - "source_amazon_ads.components.LedgerDetailedViewReportsTypeTransformer" + - "source_amazon_seller_partner.components.LedgerDetailedViewReportsTypeTransformer" $parameters: type: object additionalProperties: true diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index bbe013b1..826e07ac 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -1028,8 +1028,10 @@ class CustomSchemaNormalization(BaseModel): type: Literal["CustomSchemaNormalization"] class_name: Optional[str] = Field( None, - description="Fully-qualified name of the class that will be implementing the custom decoding. Has to be a sub class of Decoder. The format is `source_..`.", - examples=["source_amazon_ads.components.LedgerDetailedViewReportsTypeTransformer"], + description="Fully-qualified name of the class that will be implementing the custom normalization. 
The format is `source_..`.", + examples=[ + "source_amazon_seller_partner.components.LedgerDetailedViewReportsTypeTransformer" + ], title="Class Name", ) parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") From 26b99bfb365c29c0b72d663b0cbc9ff1b8c5e1a0 Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Tue, 31 Dec 2024 16:02:36 +0100 Subject: [PATCH 05/12] CDK: fix schema Signed-off-by: Artem Inzhyyants --- .../sources/declarative/declarative_component_schema.yaml | 2 ++ .../declarative/models/declarative_component_schema.py | 7 +++++-- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 81e1d89a..b047baaa 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -2576,8 +2576,10 @@ definitions: title: Custom Schema Normalization description: Use this to implement custom normalization according to the schema. type: object + additionalProperties: true required: - type + - class_name properties: type: type: string diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 826e07ac..eef840fe 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -1025,9 +1025,12 @@ class SchemaNormalization(Enum): class CustomSchemaNormalization(BaseModel): + class Config: + extra = Extra.allow + type: Literal["CustomSchemaNormalization"] - class_name: Optional[str] = Field( - None, + class_name: str = Field( + ..., description="Fully-qualified name of the class that will be implementing the custom normalization. 
The format is `source_..`.", examples=[ "source_amazon_seller_partner.components.LedgerDetailedViewReportsTypeTransformer" From 8db550a8e7a0bf89bf0b7af86af3f790c823b41a Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Wed, 1 Jan 2025 12:52:11 +0100 Subject: [PATCH 06/12] CDK: reorder definition declaration Signed-off-by: Artem Inzhyyants --- .../declarative_component_schema.yaml | 44 +++++++++---------- .../models/declarative_component_schema.py | 32 +++++++------- 2 files changed, 38 insertions(+), 38 deletions(-) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index b047baaa..75d62139 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -667,6 +667,28 @@ definitions: $parameters: type: object additionalProperties: true + CustomSchemaNormalization: + title: Custom Schema Normalization + description: Use this to implement custom normalization according to the schema. + type: object + additionalProperties: true + required: + - type + - class_name + properties: + type: + type: string + enum: [ CustomSchemaNormalization ] + class_name: + title: Class Name + description: Fully-qualified name of the class that will be implementing the custom normalization. The format is `source_..`. + type: string + additionalProperties: true + examples: + - "source_amazon_seller_partner.components.LedgerDetailedViewReportsTypeTransformer" + $parameters: + type: object + additionalProperties: true CustomStateMigration: title: Custom State Migration description: Apply a custom transformation on the input state. @@ -2572,28 +2594,6 @@ definitions: examples: - None - Default - CustomSchemaNormalization: - title: Custom Schema Normalization - description: Use this to implement custom normalization according to the schema. 
- type: object - additionalProperties: true - required: - - type - - class_name - properties: - type: - type: string - enum: [CustomSchemaNormalization] - class_name: - title: Class Name - description: Fully-qualified name of the class that will be implementing the custom normalization. The format is `source_..`. - type: string - additionalProperties: true - examples: - - "source_amazon_seller_partner.components.LedgerDetailedViewReportsTypeTransformer" - $parameters: - type: object - additionalProperties: true RemoveFields: title: Remove Fields description: A transformation which removes fields from a record. The fields removed are designated using FieldPointers. During transformation, if a field or any of its parents does not exist in the record, no error is thrown. diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index eef840fe..6f37b763 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -268,6 +268,22 @@ class Config: parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") +class CustomSchemaNormalization(BaseModel): + class Config: + extra = Extra.allow + + type: Literal["CustomSchemaNormalization"] + class_name: str = Field( + ..., + description="Fully-qualified name of the class that will be implementing the custom normalization. 
The format is `source_..`.", + examples=[ + "source_amazon_seller_partner.components.LedgerDetailedViewReportsTypeTransformer" + ], + title="Class Name", + ) + parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + + class CustomStateMigration(BaseModel): class Config: extra = Extra.allow @@ -1024,22 +1040,6 @@ class SchemaNormalization(Enum): Default = "Default" -class CustomSchemaNormalization(BaseModel): - class Config: - extra = Extra.allow - - type: Literal["CustomSchemaNormalization"] - class_name: str = Field( - ..., - description="Fully-qualified name of the class that will be implementing the custom normalization. The format is `source_..`.", - examples=[ - "source_amazon_seller_partner.components.LedgerDetailedViewReportsTypeTransformer" - ], - title="Class Name", - ) - parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") - - class RemoveFields(BaseModel): type: Literal["RemoveFields"] condition: Optional[str] = Field( From 4f64a45a729814ac1ed908b205a047ba80172c93 Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Thu, 2 Jan 2025 13:20:26 +0100 Subject: [PATCH 07/12] CDK: ref Signed-off-by: Artem Inzhyyants --- .../sources/declarative/declarative_component_schema.yaml | 4 +++- .../declarative/models/declarative_component_schema.py | 6 ++++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 75d62139..2a83c234 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -669,7 +669,7 @@ definitions: additionalProperties: true CustomSchemaNormalization: title: Custom Schema Normalization - description: Use this to implement custom normalization according to the schema. 
+ description: Schema normalization component whose behavior is derived from a custom code implementation of the connector. type: object additionalProperties: true required: @@ -2577,6 +2577,8 @@ definitions: - "$ref": "#/definitions/CustomRecordFilter" - "$ref": "#/definitions/RecordFilter" schema_normalization: + title: Schema Normalization + description: Responsible for normalization according to the schema. anyOf: - "$ref": "#/definitions/SchemaNormalization" - "$ref": "#/definitions/CustomSchemaNormalization" diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 6f37b763..aed4476f 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -1529,8 +1529,10 @@ class RecordSelector(BaseModel): description="Responsible for filtering records to be emitted by the Source.", title="Record Filter", ) - schema_normalization: Optional[Union[SchemaNormalization, CustomSchemaNormalization]] = ( - SchemaNormalization.None_ + schema_normalization: Optional[Union[SchemaNormalization, CustomSchemaNormalization]] = Field( + SchemaNormalization.None_, + description="Responsible for normalization according to the schema.", + title="Schema Normalization", ) parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") From e7a6c0c3dfcc41051c443e7e5ead7af3e168358c Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Thu, 2 Jan 2025 14:55:30 +0100 Subject: [PATCH 08/12] CDK: fix tests Signed-off-by: Artem Inzhyyants --- airbyte_cdk/sources/declarative/extractors/record_selector.py | 1 + 1 file changed, 1 insertion(+) diff --git a/airbyte_cdk/sources/declarative/extractors/record_selector.py b/airbyte_cdk/sources/declarative/extractors/record_selector.py index 99da6bb9..a2da0ccd 100644 --- a/airbyte_cdk/sources/declarative/extractors/record_selector.py +++ 
b/airbyte_cdk/sources/declarative/extractors/record_selector.py @@ -11,6 +11,7 @@ from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor from airbyte_cdk.sources.declarative.extractors.record_filter import RecordFilter from airbyte_cdk.sources.declarative.interpolation import InterpolatedString +from airbyte_cdk.sources.declarative.models import SchemaNormalization from airbyte_cdk.sources.declarative.transformations import RecordTransformation from airbyte_cdk.sources.types import Config, Record, StreamSlice, StreamState from airbyte_cdk.sources.utils.transform import TypeTransformer From f787b6d126b709c8c6432106440c8a07447de482 Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Thu, 2 Jan 2025 23:19:00 +0100 Subject: [PATCH 09/12] CDK: add AbstractTypeTransformer Signed-off-by: Artem Inzhyyants --- .../declarative/extractors/__init__.py | 2 + .../declarative/extractors/record_selector.py | 3 +- .../extractors/type_transformer.py | 55 +++++++++++++++++++ 3 files changed, 59 insertions(+), 1 deletion(-) create mode 100644 airbyte_cdk/sources/declarative/extractors/type_transformer.py diff --git a/airbyte_cdk/sources/declarative/extractors/__init__.py b/airbyte_cdk/sources/declarative/extractors/__init__.py index aacac665..26152088 100644 --- a/airbyte_cdk/sources/declarative/extractors/__init__.py +++ b/airbyte_cdk/sources/declarative/extractors/__init__.py @@ -9,8 +9,10 @@ from airbyte_cdk.sources.declarative.extractors.response_to_file_extractor import ( ResponseToFileExtractor, ) +from airbyte_cdk.sources.declarative.extractors.type_transformer import AbstractTypeTransformer __all__ = [ + "AbstractTypeTransformer", "HttpSelector", "DpathExtractor", "RecordFilter", diff --git a/airbyte_cdk/sources/declarative/extractors/record_selector.py b/airbyte_cdk/sources/declarative/extractors/record_selector.py index a2da0ccd..2d954687 100644 --- a/airbyte_cdk/sources/declarative/extractors/record_selector.py +++ 
b/airbyte_cdk/sources/declarative/extractors/record_selector.py @@ -10,6 +10,7 @@ from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor from airbyte_cdk.sources.declarative.extractors.record_filter import RecordFilter +from airbyte_cdk.sources.declarative.extractors.type_transformer import AbstractTypeTransformer from airbyte_cdk.sources.declarative.interpolation import InterpolatedString from airbyte_cdk.sources.declarative.models import SchemaNormalization from airbyte_cdk.sources.declarative.transformations import RecordTransformation @@ -33,7 +34,7 @@ class RecordSelector(HttpSelector): extractor: RecordExtractor config: Config parameters: InitVar[Mapping[str, Any]] - schema_normalization: TypeTransformer + schema_normalization: TypeTransformer | AbstractTypeTransformer name: str _name: Union[InterpolatedString, str] = field(init=False, repr=False, default="") record_filter: Optional[RecordFilter] = None diff --git a/airbyte_cdk/sources/declarative/extractors/type_transformer.py b/airbyte_cdk/sources/declarative/extractors/type_transformer.py new file mode 100644 index 00000000..f9e107fe --- /dev/null +++ b/airbyte_cdk/sources/declarative/extractors/type_transformer.py @@ -0,0 +1,55 @@ +# +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. +# + +from abc import ABC, abstractmethod +from dataclasses import dataclass +from typing import Any, Dict, Mapping + + +@dataclass +class AbstractTypeTransformer(ABC): + """ + Abstract base class for implementing type transformation logic. + + This class provides a blueprint for defining custom transformations + on data records based on a provided schema. Implementing classes + must override the `transform` method to specify the transformation + logic. + + Attributes: + None explicitly defined, as this is a dataclass intended to be + subclassed. 
+ + Methods: + transform(record: Dict[str, Any], schema: Mapping[str, Any]) -> None: + Abstract method that must be implemented by subclasses. + It performs a transformation on a given data record based + on the provided schema. + + Usage: + To use this class, create a subclass that implements the + `transform` method with the desired transformation logic. + """ + + @abstractmethod + def transform( + self, + record: Dict[str, Any], + schema: Mapping[str, Any], + ) -> None: + """ + Perform a transformation on a data record based on a given schema. + + Args: + record (Dict[str, Any]): The data record to be transformed. + schema (Mapping[str, Any]): The schema that dictates how + the record should be transformed. + + Returns: + None + + Note: + Subclasses must override this method; a subclass that does + not cannot be instantiated (TypeError). + """ From eeba5cc933ffd0cb70111747fd07249ba208c1d6 Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Wed, 8 Jan 2025 21:40:09 +0100 Subject: [PATCH 10/12] CDK: ref Signed-off-by: Artem Inzhyyants --- airbyte_cdk/sources/declarative/extractors/record_selector.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/declarative/extractors/record_selector.py b/airbyte_cdk/sources/declarative/extractors/record_selector.py index 2d954687..9b4f90d0 100644 --- a/airbyte_cdk/sources/declarative/extractors/record_selector.py +++ b/airbyte_cdk/sources/declarative/extractors/record_selector.py @@ -34,7 +34,7 @@ class RecordSelector(HttpSelector): extractor: RecordExtractor config: Config parameters: InitVar[Mapping[str, Any]] - schema_normalization: TypeTransformer | AbstractTypeTransformer + schema_normalization: Union[TypeTransformer, AbstractTypeTransformer] name: str _name: Union[InterpolatedString, str] = field(init=False, repr=False, default="") record_filter: Optional[RecordFilter] = None From e82682ad7c4a43f1a514f23955d84a23e50a69ac Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Wed, 8 Jan 2025 21:47:22 +0100
Subject: [PATCH 11/12] CDK: rename Signed-off-by: Artem Inzhyyants --- airbyte_cdk/sources/declarative/extractors/__init__.py | 4 ++-- airbyte_cdk/sources/declarative/extractors/record_selector.py | 4 ++-- .../sources/declarative/extractors/type_transformer.py | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/airbyte_cdk/sources/declarative/extractors/__init__.py b/airbyte_cdk/sources/declarative/extractors/__init__.py index 26152088..8f1d18d1 100644 --- a/airbyte_cdk/sources/declarative/extractors/__init__.py +++ b/airbyte_cdk/sources/declarative/extractors/__init__.py @@ -9,10 +9,10 @@ from airbyte_cdk.sources.declarative.extractors.response_to_file_extractor import ( ResponseToFileExtractor, ) -from airbyte_cdk.sources.declarative.extractors.type_transformer import AbstractTypeTransformer +from airbyte_cdk.sources.declarative.extractors.type_transformer import TypeTransformer __all__ = [ - "AbstractTypeTransformer", + "TypeTransformer", "HttpSelector", "DpathExtractor", "RecordFilter", diff --git a/airbyte_cdk/sources/declarative/extractors/record_selector.py b/airbyte_cdk/sources/declarative/extractors/record_selector.py index 9b4f90d0..17232a70 100644 --- a/airbyte_cdk/sources/declarative/extractors/record_selector.py +++ b/airbyte_cdk/sources/declarative/extractors/record_selector.py @@ -10,7 +10,7 @@ from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor from airbyte_cdk.sources.declarative.extractors.record_filter import RecordFilter -from airbyte_cdk.sources.declarative.extractors.type_transformer import AbstractTypeTransformer +from airbyte_cdk.sources.declarative.extractors.type_transformer import TypeTransformer as DeclarativeTypeTransformer from airbyte_cdk.sources.declarative.interpolation import InterpolatedString from airbyte_cdk.sources.declarative.models import SchemaNormalization from 
airbyte_cdk.sources.declarative.transformations import RecordTransformation @@ -34,7 +34,7 @@ class RecordSelector(HttpSelector): extractor: RecordExtractor config: Config parameters: InitVar[Mapping[str, Any]] - schema_normalization: Union[TypeTransformer, AbstractTypeTransformer] + schema_normalization: Union[TypeTransformer, DeclarativeTypeTransformer] name: str _name: Union[InterpolatedString, str] = field(init=False, repr=False, default="") record_filter: Optional[RecordFilter] = None diff --git a/airbyte_cdk/sources/declarative/extractors/type_transformer.py b/airbyte_cdk/sources/declarative/extractors/type_transformer.py index f9e107fe..fe307684 100644 --- a/airbyte_cdk/sources/declarative/extractors/type_transformer.py +++ b/airbyte_cdk/sources/declarative/extractors/type_transformer.py @@ -8,7 +8,7 @@ @dataclass -class AbstractTypeTransformer(ABC): +class TypeTransformer(ABC): """ Abstract base class for implementing type transformation logic. From f9d22124ad6ec2318a56af37ceedbabdc073c088 Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Wed, 8 Jan 2025 21:58:58 +0100 Subject: [PATCH 12/12] CDK: ref Signed-off-by: Artem Inzhyyants --- airbyte_cdk/sources/declarative/extractors/record_selector.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/declarative/extractors/record_selector.py b/airbyte_cdk/sources/declarative/extractors/record_selector.py index 17232a70..f29b8a75 100644 --- a/airbyte_cdk/sources/declarative/extractors/record_selector.py +++ b/airbyte_cdk/sources/declarative/extractors/record_selector.py @@ -10,7 +10,9 @@ from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor from airbyte_cdk.sources.declarative.extractors.record_filter import RecordFilter -from airbyte_cdk.sources.declarative.extractors.type_transformer import TypeTransformer as DeclarativeTypeTransformer +from 
airbyte_cdk.sources.declarative.extractors.type_transformer import ( + TypeTransformer as DeclarativeTypeTransformer, +) from airbyte_cdk.sources.declarative.interpolation import InterpolatedString from airbyte_cdk.sources.declarative.models import SchemaNormalization from airbyte_cdk.sources.declarative.transformations import RecordTransformation