diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 12ca3b440..70e9e075d 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -1800,6 +1800,19 @@ definitions: $parameters: type: object additionalProperties: true + ComplexFieldType: + title: Schema Field Type + description: (This component is experimental. Use at your own risk.) Represents a complex field type. + type: object + required: + - field_type + properties: + field_type: + type: string + items: + anyOf: + - type: string + - "$ref": "#/definitions/ComplexFieldType" TypesMap: title: Types Map description: (This component is experimental. Use at your own risk.) Represents a mapping between a current type and its corresponding target type. @@ -1814,6 +1827,7 @@ definitions: - type: array items: type: string + - "$ref": "#/definitions/ComplexFieldType" current_type: anyOf: - type: string diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 5feb6b2c4..db8e80f56 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -736,8 +736,13 @@ class HttpResponseFilter(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") +class ComplexFieldType(BaseModel): + field_type: str + items: Optional[Union[str, ComplexFieldType]] = None + + class TypesMap(BaseModel): - target_type: Union[str, List[str]] + target_type: Union[str, List[str], ComplexFieldType] current_type: Union[str, List[str]] condition: Optional[str] = None @@ -2260,6 +2265,7 @@ class DynamicDeclarativeStream(BaseModel): ) +ComplexFieldType.update_forward_refs() CompositeErrorHandler.update_forward_refs() DeclarativeSource1.update_forward_refs() DeclarativeSource2.update_forward_refs() diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 3fd301947..172946f17 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -133,6 +133,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( CheckStream as CheckStreamModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + ComplexFieldType as ComplexFieldTypeModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( ComponentMappingDefinition as ComponentMappingDefinitionModel, ) @@ -429,6 +432,7 @@ SimpleRetrieverTestReadDecorator, ) from airbyte_cdk.sources.declarative.schema import ( + ComplexFieldType, DefaultSchemaLoader, DynamicSchemaLoader, InlineSchemaLoader, @@ -572,6 +576,7 @@ def _init_mappings(self) -> None: DynamicSchemaLoaderModel: self.create_dynamic_schema_loader, SchemaTypeIdentifierModel: self.create_schema_type_identifier, TypesMapModel: self.create_types_map, + ComplexFieldTypeModel: self.create_complex_field_type, JwtAuthenticatorModel: self.create_jwt_authenticator, LegacyToPerPartitionStateMigrationModel: self.create_legacy_to_per_partition_state_migration, ListPartitionRouterModel: self.create_list_partition_router, @@ -1894,10 +1899,26 @@ def create_inline_schema_loader( ) -> InlineSchemaLoader: return InlineSchemaLoader(schema=model.schema_ or {}, parameters={}) - @staticmethod - def create_types_map(model: TypesMapModel, **kwargs: Any) -> TypesMap: + def create_complex_field_type( + self, model: ComplexFieldTypeModel, config: Config, **kwargs: Any + ) -> ComplexFieldType: + items = ( + self._create_component_from_model(model=model.items, config=config) + if isinstance(model.items, ComplexFieldTypeModel) + else model.items + ) + + return ComplexFieldType(field_type=model.field_type, items=items) + + def create_types_map(self, model: TypesMapModel, config: Config, **kwargs: Any) -> TypesMap: + target_type = ( + self._create_component_from_model(model=model.target_type, config=config) + if isinstance(model.target_type, ComplexFieldTypeModel) + else model.target_type + ) + return TypesMap( - target_type=model.target_type, + target_type=target_type, current_type=model.current_type, condition=model.condition if model.condition is not None else "True", ) diff --git a/airbyte_cdk/sources/declarative/schema/__init__.py b/airbyte_cdk/sources/declarative/schema/__init__.py index b5b6a7d31..cad0c2f06 100644 --- a/airbyte_cdk/sources/declarative/schema/__init__.py +++ b/airbyte_cdk/sources/declarative/schema/__init__.py @@ -4,6 +4,7 @@ from airbyte_cdk.sources.declarative.schema.default_schema_loader import DefaultSchemaLoader from airbyte_cdk.sources.declarative.schema.dynamic_schema_loader import ( + ComplexFieldType, DynamicSchemaLoader, SchemaTypeIdentifier, TypesMap, @@ -18,6 +19,7 @@ "SchemaLoader", "InlineSchemaLoader", "DynamicSchemaLoader", + "ComplexFieldType", "TypesMap", "SchemaTypeIdentifier", ] diff --git a/airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py b/airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py index d65890b70..824d0f8f7 100644 --- a/airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py +++ b/airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py @@ -18,7 +18,7 @@ from airbyte_cdk.sources.source import ExperimentalClassWarning from airbyte_cdk.sources.types import Config, StreamSlice, StreamState -AIRBYTE_DATA_TYPES: Mapping[str, Mapping[str, Any]] = { +AIRBYTE_DATA_TYPES: Mapping[str, MutableMapping[str, Any]] = { "string": {"type": ["null", "string"]}, "boolean": {"type": ["null", "boolean"]}, "date": {"type": ["null", "string"], "format": "date"}, @@ -45,6 +45,25 @@ } +@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning) +@dataclass(frozen=True) +class ComplexFieldType: + """ + Identifies complex field type + """ + + field_type: str + items: Optional[Union[str, "ComplexFieldType"]] = None + + def __post_init__(self) -> None: + """ + Enforces that `items` is only used when `field_type` is a array + """ + # `items_type` is valid only for array target types + if self.items and self.field_type != "array": + raise ValueError("'items' can only be used when 'field_type' is an array.") + + @deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning) @dataclass(frozen=True) class TypesMap: @@ -52,7 +71,7 @@ class TypesMap: Represents a mapping between a current type and its corresponding target type. """ - target_type: Union[List[str], str] + target_type: Union[List[str], str, ComplexFieldType] current_type: Union[List[str], str] condition: Optional[str] @@ -135,8 +154,9 @@ def get_json_schema(self) -> Mapping[str, Any]: transformed_properties = self._transform(properties, {}) return { - "$schema": "http://json-schema.org/draft-07/schema#", + "$schema": "https://json-schema.org/draft-07/schema#", "type": "object", + "additionalProperties": True, "properties": transformed_properties, } @@ -188,18 +208,36 @@ def _get_type( first_type = self._get_airbyte_type(mapped_field_type[0]) second_type = self._get_airbyte_type(mapped_field_type[1]) return {"oneOf": [first_type, second_type]} + elif isinstance(mapped_field_type, str): return self._get_airbyte_type(mapped_field_type) + + elif isinstance(mapped_field_type, ComplexFieldType): + return self._resolve_complex_type(mapped_field_type) + else: raise ValueError( f"Invalid data type. Available string or two items list of string. Got {mapped_field_type}." ) + def _resolve_complex_type(self, complex_type: ComplexFieldType) -> Mapping[str, Any]: + if not complex_type.items: + return self._get_airbyte_type(complex_type.field_type) + + field_type = self._get_airbyte_type(complex_type.field_type) + field_type["items"] = ( + self._get_airbyte_type(complex_type.items) + if isinstance(complex_type.items, str) + else self._resolve_complex_type(complex_type.items) + ) + + return field_type + def _replace_type_if_not_valid( self, field_type: Union[List[str], str], raw_schema: MutableMapping[str, Any], - ) -> Union[List[str], str]: + ) -> Union[List[str], str, ComplexFieldType]: """ Replaces a field type if it matches a type mapping in `types_map`. """ @@ -216,7 +254,7 @@ def _replace_type_if_not_valid( return field_type @staticmethod - def _get_airbyte_type(field_type: str) -> Mapping[str, Any]: + def _get_airbyte_type(field_type: str) -> MutableMapping[str, Any]: """ Maps a field type to its corresponding Airbyte type definition. """ diff --git a/unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py b/unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py index 4860e3e1d..4d9af8667 100644 --- a/unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py +++ b/unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py @@ -84,7 +84,17 @@ "schema_pointer": ["fields"], "key_pointer": ["name"], "type_pointer": ["type"], - "types_mapping": [{"target_type": "string", "current_type": "singleLineText"}], + "types_mapping": [ + {"target_type": "string", "current_type": "singleLineText"}, + { + "target_type": { + "field_type": "array", + "items": {"field_type": "array", "items": "integer"}, + }, + "current_type": "formula", + "condition": "{{ raw_schema['result']['type'] == 'customInteger' }}", + }, + ], }, }, }, @@ -150,7 +160,8 @@ def dynamic_schema_loader(mock_retriever, mock_schema_type_identifier): ] ), { - "$schema": "http://json-schema.org/draft-07/schema#", + "$schema": "https://json-schema.org/draft-07/schema#", + "additionalProperties": True, "type": "object", "properties": { "name": {"type": ["null", "string"]}, @@ -171,7 +182,8 @@ def dynamic_schema_loader(mock_retriever, mock_schema_type_identifier): ] ), { - "$schema": "http://json-schema.org/draft-07/schema#", + "$schema": "https://json-schema.org/draft-07/schema#", + "additionalProperties": True, "type": "object", "properties": { "name": {"type": ["null", "string"]}, @@ -191,7 +203,8 @@ def dynamic_schema_loader(mock_retriever, mock_schema_type_identifier): ] ), { - "$schema": "http://json-schema.org/draft-07/schema#", + "$schema": "https://json-schema.org/draft-07/schema#", + "additionalProperties": True, "type": "object", "properties": { "address": { @@ -204,7 +217,8 @@ def dynamic_schema_loader(mock_retriever, mock_schema_type_identifier): # Test case: Empty record set iter([]), { - "$schema": "http://json-schema.org/draft-07/schema#", + "$schema": "https://json-schema.org/draft-07/schema#", + "additionalProperties": True, "type": "object", "properties": {}, }, @@ -242,7 +256,8 @@ def test_dynamic_schema_loader_invalid_type(dynamic_schema_loader): def test_dynamic_schema_loader_manifest_flow(): expected_schema = { - "$schema": "http://json-schema.org/draft-07/schema#", + "$schema": "https://json-schema.org/draft-07/schema#", + "additionalProperties": True, "type": "object", "properties": { "id": {"type": ["null", "integer"]}, @@ -314,7 +329,8 @@ def test_dynamic_schema_loader_with_type_conditions(): ]["types_mapping"].append({"target_type": "array", "current_type": "formula"}) expected_schema = { - "$schema": "http://json-schema.org/draft-07/schema#", + "$schema": "https://json-schema.org/draft-07/schema#", + "additionalProperties": True, "type": "object", "properties": { "id": {"type": ["null", "integer"]}, @@ -324,6 +340,10 @@ def test_dynamic_schema_loader_with_type_conditions(): "currency": {"type": ["null", "number"]}, "salary": {"type": ["null", "number"]}, "working_days": {"type": ["null", "array"]}, + "avg_salary": { + "type": ["null", "array"], + "items": {"type": ["null", "array"], "items": {"type": ["null", "integer"]}}, + }, }, } source = ConcurrentDeclarativeSource( @@ -365,6 +385,12 @@ def test_dynamic_schema_loader_with_type_conditions(): {"name": "FirstName", "type": "string"}, {"name": "Description", "type": "singleLineText"}, {"name": "Salary", "type": "formula", "result": {"type": "number"}}, + { + "name": "AvgSalary", + "type": "formula", + "result": {"type": "customInteger"}, + }, + {"name": "Currency", "type": "formula", "result": {"type": "currency"}}, {"name": "Currency", "type": "formula", "result": {"type": "currency"}}, {"name": "WorkingDays", "type": "formula"}, ]