From 1953fba34c04488c1103068f3d5149f0bb29010c Mon Sep 17 00:00:00 2001 From: Serhii Lazebnyi Date: Thu, 23 Jan 2025 17:50:27 +0100 Subject: [PATCH 01/15] Added items handling to dynamic schemas --- .../declarative_component_schema.yaml | 44 +++++ .../models/declarative_component_schema.py | 168 ++++++++++++------ .../parsers/model_to_component_factory.py | 47 ++++- .../sources/declarative/schema/__init__.py | 4 + .../schema/dynamic_schema_loader.py | 80 ++++++++- .../schema/test_dynamic_schema_loader.py | 36 +++- 6 files changed, 309 insertions(+), 70 deletions(-) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 82706ae92..c70c58fa2 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -1793,6 +1793,44 @@ definitions: $parameters: type: object additionalProperties: true + + PropertyTypesMap: + title: Types Map + description: (This component is experimental. Use at your own risk.) Represents a mapping between a current type and its corresponding target type for property. + type: object + required: + - property_name + - property_type_pinter + - type_mapping + properties: + property_name: + type: string + property_type_pointer: + title: Key Path + description: List of potentially nested fields describing the full path of the property type to extract. + type: array + items: + - type: string + type_mapping: + "$ref": "#/definitions/TypesMap" + ItemsTypeMap: + title: Types Map + description: (This component is experimental. Use at your own risk.) Represents a mapping between a current type and its corresponding target type for property. + type: object + required: + - items_type_pinter + - type_mapping + properties: + property_name: + type: string + items_type_pointer: + title: Items Type Path + description: List of potentially nested fields describing the full path of the items type to extract. + type: array + items: + - type: string + type_mapping: + "$ref": "#/definitions/TypesMap" TypesMap: title: Types Map description: (This component is experimental. Use at your own risk.) Represents a mapping between a current type and its corresponding target type. @@ -1817,6 +1855,12 @@ definitions: type: string interpolation_context: - raw_schema + items_type: + "$ref": "#/definitions/ItemsTypeMap" + properties_types: + type: array + items: + - "$ref": "#/definitions/PropertyTypesMap" SchemaTypeIdentifier: title: Schema Type Identifier description: (This component is experimental. Use at your own risk.) Identifies schema details for dynamic schema extraction and processing. diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index d0664aa2a..3a1d24a0d 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -604,7 +604,9 @@ class OAuthAuthenticator(BaseModel): scopes: Optional[List[str]] = Field( None, description="List of scopes that should be granted to the access token.", - examples=[["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"]], + examples=[ + ["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"] + ], title="Scopes", ) token_expiry_date: Optional[str] = Field( @@ -726,33 +728,6 @@ class HttpResponseFilter(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") -class TypesMap(BaseModel): - target_type: Union[str, List[str]] - current_type: Union[str, List[str]] - condition: Optional[str] = None - - -class SchemaTypeIdentifier(BaseModel): - type: Optional[Literal["SchemaTypeIdentifier"]] = None - schema_pointer: Optional[List[str]] = Field( - [], - description="List of nested fields defining the schema field path to extract. Defaults to [].", - title="Schema Path", - ) - key_pointer: List[str] = Field( - ..., - description="List of potentially nested fields describing the full path of the field key to extract.", - title="Key Path", - ) - type_pointer: Optional[List[str]] = Field( - None, - description="List of potentially nested fields describing the full path of the field type to extract.", - title="Type Path", - ) - types_mapping: Optional[List[TypesMap]] = None - parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") - - class InlineSchemaLoader(BaseModel): type: Literal["InlineSchemaLoader"] schema_: Optional[Dict[str, Any]] = Field( @@ -1025,24 +1000,28 @@ class OAuthConfigSpecification(BaseModel): class Config: extra = Extra.allow - oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = Field( - None, - description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", - examples=[ - {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, - { - "app_id": { - "type": "string", - "path_in_connector_config": ["info", "app_id"], - } - }, - ], - title="OAuth user input", + oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = ( + Field( + None, + description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", + examples=[ + {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, + { + "app_id": { + "type": "string", + "path_in_connector_config": ["info", "app_id"], + } + }, + ], + title="OAuth user input", + ) ) - oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = Field( - None, - description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n + base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n + urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{{ auth_code_key }}": "{{ auth_code_value }}",\n "{{ client_id_key }}": "{{ client_id_value }}",\n "{{ client_secret_key }}": "{{ client_secret_value }}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }', - title="DeclarativeOAuth Connector Specification", + oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = ( + Field( + None, + description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n + base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n + urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{{ auth_code_key }}": "{{ auth_code_value }}",\n "{{ client_id_key }}": "{{ client_id_value }}",\n "{{ client_secret_key }}": "{{ client_secret_value }}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }', + title="DeclarativeOAuth Connector Specification", + ) ) complete_oauth_output_specification: Optional[Dict[str, Any]] = Field( None, @@ -1060,7 +1039,9 @@ class Config: complete_oauth_server_input_specification: Optional[Dict[str, Any]] = Field( None, description="OAuth specific blob. This is a Json Schema used to validate Json configurations persisted as Airbyte Server configurations.\nMust be a valid non-nested JSON describing additional fields configured by the Airbyte Instance or Workspace Admins to be used by the\nserver when completing an OAuth flow (typically exchanging an auth code for refresh token).\nExamples:\n complete_oauth_server_input_specification={\n client_id: {\n type: string\n },\n client_secret: {\n type: string\n }\n }", - examples=[{"client_id": {"type": "string"}, "client_secret": {"type": "string"}}], + examples=[ + {"client_id": {"type": "string"}, "client_secret": {"type": "string"}} + ], title="OAuth input specification", ) complete_oauth_server_output_specification: Optional[Dict[str, Any]] = Field( @@ -1646,7 +1627,9 @@ class RecordSelector(BaseModel): description="Responsible for filtering records to be emitted by the Source.", title="Record Filter", ) - schema_normalization: Optional[Union[SchemaNormalization, CustomSchemaNormalization]] = Field( + schema_normalization: Optional[ + Union[SchemaNormalization, CustomSchemaNormalization] + ] = Field( SchemaNormalization.None_, description="Responsible for normalization according to the schema.", title="Schema Normalization", @@ -1820,12 +1803,16 @@ class Config: description="Component used to coordinate how records are extracted across stream slices and request pages.", title="Retriever", ) - incremental_sync: Optional[Union[CustomIncrementalSync, DatetimeBasedCursor]] = Field( - None, - description="Component used to fetch data incrementally based on a time field in the data.", - title="Incremental Sync", + incremental_sync: Optional[Union[CustomIncrementalSync, DatetimeBasedCursor]] = ( + Field( + None, + description="Component used to fetch data incrementally based on a time field in the data.", + title="Incremental Sync", + ) + ) + name: Optional[str] = Field( + "", description="The stream name.", example=["Users"], title="Name" ) - name: Optional[str] = Field("", description="The stream name.", example=["Users"], title="Name") primary_key: Optional[PrimaryKey] = Field( "", description="The primary key of the stream.", title="Primary Key" ) @@ -2010,6 +1997,55 @@ class HttpRequester(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") +class PropertyTypesMap(BaseModel): + property_name: str + property_type_pointer: Optional[List[str]] = Field( + None, + description="List of potentially nested fields describing the full path of the property type to extract.", + title="Key Path", + ) + type_mapping: TypesMap + + +class ItemsTypeMap(BaseModel): + property_name: Optional[str] = None + items_type_pointer: Optional[List[str]] = Field( + None, + description="List of potentially nested fields describing the full path of the items type to extract.", + title="Items Type Path", + ) + type_mapping: TypesMap + + +class TypesMap(BaseModel): + target_type: Union[str, List[str]] + current_type: Union[str, List[str]] + condition: Optional[str] = None + items_type: Optional[ItemsTypeMap] = None + properties_types: Optional[List[PropertyTypesMap]] = None + + +class SchemaTypeIdentifier(BaseModel): + type: Optional[Literal["SchemaTypeIdentifier"]] = None + schema_pointer: Optional[List[str]] = Field( + [], + description="List of nested fields defining the schema field path to extract. Defaults to [].", + title="Schema Path", + ) + key_pointer: List[str] = Field( + ..., + description="List of potentially nested fields describing the full path of the field key to extract.", + title="Key Path", + ) + type_pointer: Optional[List[str]] = Field( + None, + description="List of potentially nested fields describing the full path of the field type to extract.", + title="Type Path", + ) + types_mapping: Optional[List[TypesMap]] = None + parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + + class DynamicSchemaLoader(BaseModel): type: Literal["DynamicSchemaLoader"] retriever: Union[AsyncRetriever, CustomRetriever, SimpleRetriever] = Field( @@ -2097,7 +2133,11 @@ class SimpleRetriever(BaseModel): CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter, - List[Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter]], + List[ + Union[ + CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter + ] + ], ] ] = Field( [], @@ -2141,7 +2181,9 @@ class AsyncRetriever(BaseModel): ) download_extractor: Optional[ Union[CustomRecordExtractor, DpathExtractor, ResponseToFileExtractor] - ] = Field(None, description="Responsible for fetching the records from provided urls.") + ] = Field( + None, description="Responsible for fetching the records from provided urls." + ) creation_requester: Union[CustomRequester, HttpRequester] = Field( ..., description="Requester component that describes how to prepare HTTP requests to send to the source API to create the async server-side job.", @@ -2175,7 +2217,11 @@ class AsyncRetriever(BaseModel): CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter, - List[Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter]], + List[ + Union[ + CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter + ] + ], ] ] = Field( [], @@ -2243,10 +2289,12 @@ class DynamicDeclarativeStream(BaseModel): stream_template: DeclarativeStream = Field( ..., description="Reference to the stream template.", title="Stream Template" ) - components_resolver: Union[HttpComponentsResolver, ConfigComponentsResolver] = Field( - ..., - description="Component resolve and populates stream templates with components values.", - title="Components Resolver", + components_resolver: Union[HttpComponentsResolver, ConfigComponentsResolver] = ( + Field( + ..., + description="Component resolve and populates stream templates with components values.", + title="Components Resolver", + ) ) @@ -2256,6 +2304,8 @@ class DynamicDeclarativeStream(BaseModel): SelectiveAuthenticator.update_forward_refs() DeclarativeStream.update_forward_refs() SessionTokenAuthenticator.update_forward_refs() +PropertyTypesMap.update_forward_refs() +ItemsTypeMap.update_forward_refs() DynamicSchemaLoader.update_forward_refs() SimpleRetriever.update_forward_refs() AsyncRetriever.update_forward_refs() diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 4a17a95c3..97014e210 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -353,6 +353,12 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( TypesMap as TypesMapModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + PropertyTypesMap as PropertyTypesMapModel, +) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + ItemsTypeMap as ItemsTypeMapModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ValueType from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( WaitTimeFromHeader as WaitTimeFromHeaderModel, @@ -435,6 +441,8 @@ JsonFileSchemaLoader, SchemaTypeIdentifier, TypesMap, + PropertyTypesMap, + ItemsTypeMap, ) from airbyte_cdk.sources.declarative.spec import Spec from airbyte_cdk.sources.declarative.stream_slicers import StreamSlicer @@ -572,6 +580,8 @@ def _init_mappings(self) -> None: DynamicSchemaLoaderModel: self.create_dynamic_schema_loader, SchemaTypeIdentifierModel: self.create_schema_type_identifier, TypesMapModel: self.create_types_map, + PropertyTypesMapModel: self.create_property_types_map, + ItemsTypeMapModel: self.create_items_type_map, JwtAuthenticatorModel: self.create_jwt_authenticator, LegacyToPerPartitionStateMigrationModel: self.create_legacy_to_per_partition_state_migration, ListPartitionRouterModel: self.create_list_partition_router, @@ -1894,12 +1904,45 @@ def create_inline_schema_loader( ) -> InlineSchemaLoader: return InlineSchemaLoader(schema=model.schema_ or {}, parameters={}) - @staticmethod - def create_types_map(model: TypesMapModel, **kwargs: Any) -> TypesMap: + def create_property_types_map(self, model: PropertyTypesMapModel, config: Config, **kwargs: Any) -> PropertyTypesMap: + type_mapping = self._create_component_from_model(model=model.type_mapping, config=config) + model_property_type_pointer: List[Union[InterpolatedString, str]] = ( + [x for x in model.property_type_pointer] if model.property_type_pointer else [] + ) + return PropertyTypesMap( + property_name=model.property_name, + property_type_pointer=model_property_type_pointer, + type_mapping=type_mapping + ) + + def create_items_type_map(self, model: ItemsTypeMapModel, config: Config, **kwargs: Any) -> ItemsTypeMap: + type_mapping = self._create_component_from_model(model=model.type_mapping, config=config) + model_items_type_pointer: List[Union[InterpolatedString, str]] = ( + [x for x in model.items_type_pointer] if model.items_type_pointer else [] + ) + return ItemsTypeMap( + items_type_pointer=model_items_type_pointer, + type_mapping=type_mapping + ) + + def create_types_map(self, model: TypesMapModel, config: Config, **kwargs: Any) -> TypesMap: + items_type = self._create_component_from_model(model=model.items_type, config=config) if model.items_type else model.items_type + + properties_types = [] + if model.properties_types: + properties_types.extend( + [ + self._create_component_from_model(property_type, config=config) + for property_type in model.properties_types + ] + ) + return TypesMap( target_type=model.target_type, current_type=model.current_type, condition=model.condition if model.condition is not None else "True", + items_type=items_type, + properties_types=properties_types, ) def create_schema_type_identifier( diff --git a/airbyte_cdk/sources/declarative/schema/__init__.py b/airbyte_cdk/sources/declarative/schema/__init__.py index b5b6a7d31..4bd284e95 100644 --- a/airbyte_cdk/sources/declarative/schema/__init__.py +++ b/airbyte_cdk/sources/declarative/schema/__init__.py @@ -7,6 +7,8 @@ DynamicSchemaLoader, SchemaTypeIdentifier, TypesMap, + PropertyTypesMap, + ItemsTypeMap, ) from airbyte_cdk.sources.declarative.schema.inline_schema_loader import InlineSchemaLoader from airbyte_cdk.sources.declarative.schema.json_file_schema_loader import JsonFileSchemaLoader @@ -19,5 +21,7 @@ "InlineSchemaLoader", "DynamicSchemaLoader", "TypesMap", + "PropertyTypesMap", + "ItemsTypeMap", "SchemaTypeIdentifier", ] diff --git a/airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py b/airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py index d65890b70..af9c67d4d 100644 --- a/airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py +++ b/airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py @@ -5,7 +5,7 @@ from copy import deepcopy from dataclasses import InitVar, dataclass, field -from typing import Any, List, Mapping, MutableMapping, Optional, Union +from typing import Any, List, Mapping, MutableMapping, Optional, Union, Tuple import dpath from typing_extensions import deprecated @@ -45,6 +45,29 @@ } +@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning) +@dataclass(frozen=True) +class PropertyTypesMap: + """ + Represents a mapping between a current type and its corresponding target type for property. + """ + + property_name: str + property_type_pointer: List[Union[InterpolatedString, str]] + type_mapping: "TypesMap" + + +@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning) +@dataclass(frozen=True) +class ItemsTypeMap: + """ + Represents a mapping between a current type and its corresponding target type for item. + """ + + items_type_pointer: List[Union[InterpolatedString, str]] + type_mapping: "TypesMap" + + @deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning) @dataclass(frozen=True) class TypesMap: @@ -55,6 +78,26 @@ class TypesMap: target_type: Union[List[str], str] current_type: Union[List[str], str] condition: Optional[str] + items_type: Optional[ItemsTypeMap] = None + properties_types: Optional[List[PropertyTypesMap]] = None + + def __post_init__(self): + """ + Ensures that only one of `items_type` or `properties_types` can be set. + Additionally, enforces that `items_type` is only used when `target_type` is a list, + and `properties_types` is only used when `target_type` is an object. + """ + # Ensure only one of `items_type` or `properties_types` is set + if self.items_type and self.properties_types: + raise ValueError("Cannot specify both 'items_type' and 'properties_types' at the same time.") + + # `items_type` is valid only for array target types + if self.items_type and self.target_type != "array": + raise ValueError("'items_type' can only be used when 'target_type' is an array.") + + # `properties_types` is valid only for object target types + if self.properties_types and self.target_type != "object": + raise ValueError("'properties_types' can only be used when 'target_type' is an object.") @deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning) @@ -179,7 +222,8 @@ def _get_type( if field_type_path else "string" ) - mapped_field_type = self._replace_type_if_not_valid(raw_field_type, raw_schema) + mapped_field_type, mapped_additional_types = self._replace_type_if_not_valid(raw_field_type, raw_schema) + if ( isinstance(mapped_field_type, list) and len(mapped_field_type) == 2 @@ -189,7 +233,7 @@ def _get_type( second_type = self._get_airbyte_type(mapped_field_type[1]) return {"oneOf": [first_type, second_type]} elif isinstance(mapped_field_type, str): - return self._get_airbyte_type(mapped_field_type) + return self._get_airbyte_type(mapped_field_type, mapped_additional_types) else: raise ValueError( f"Invalid data type. Available string or two items list of string. Got {mapped_field_type}." @@ -199,10 +243,11 @@ def _replace_type_if_not_valid( self, field_type: Union[List[str], str], raw_schema: MutableMapping[str, Any], - ) -> Union[List[str], str]: + ) -> Tuple[Union[List[str], str], List]: """ Replaces a field type if it matches a type mapping in `types_map`. """ + additional_types = [] if self.schema_type_identifier.types_mapping: for types_map in self.schema_type_identifier.types_mapping: # conditional is optional param, setting to true if not provided @@ -212,18 +257,37 @@ def _replace_type_if_not_valid( ).eval(config=self.config, raw_schema=raw_schema) if field_type == types_map.current_type and condition: - return types_map.target_type - return field_type + if types_map.items_type: + items_type = self._extract_data(raw_schema, types_map.items_type.items_type_pointer) + items_type_condition = InterpolatedBoolean( + condition=types_map.items_type.type_mapping.condition if types_map.items_type.type_mapping.condition is not None else "True", + parameters={}, + ).eval(config=self.config, raw_schema=raw_schema) + + if items_type == types_map.items_type.type_mapping.current_type and items_type_condition: + additional_types = [types_map.items_type.type_mapping.target_type] + + return types_map.target_type, additional_types + return field_type, additional_types @staticmethod - def _get_airbyte_type(field_type: str) -> Mapping[str, Any]: + def _get_airbyte_type(field_type: str, additional_types: Optional[List] = None) -> Mapping[str, Any]: """ Maps a field type to its corresponding Airbyte type definition. """ + print(additional_types) + if additional_types is None: + additional_types = [] + if field_type not in AIRBYTE_DATA_TYPES: raise ValueError(f"Invalid Airbyte data type: {field_type}") - return deepcopy(AIRBYTE_DATA_TYPES[field_type]) + airbyte_type = deepcopy(AIRBYTE_DATA_TYPES[field_type]) + + if field_type == "array" and additional_types: + airbyte_type["items"] = deepcopy(AIRBYTE_DATA_TYPES[additional_types[0]]) + + return airbyte_type def _extract_data( self, diff --git a/unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py b/unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py index 4860e3e1d..21fa9c73d 100644 --- a/unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py +++ b/unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py @@ -84,7 +84,37 @@ "schema_pointer": ["fields"], "key_pointer": ["name"], "type_pointer": ["type"], - "types_mapping": [{"target_type": "string", "current_type": "singleLineText"}], + "types_mapping": [ + { + "target_type": "string", + "current_type": "singleLineText" + }, + { + "target_type": "array", + "current_type": "formula", + "items_type": + { + "items_type_pointer": ["result", "type"], + "type_mapping": {"target_type": "integer", "current_type": "customInteger"} + }, + "condition": "{{ raw_schema['result']['type'] == 'customInteger' }}", + }, + { + "target_type": "object", + "current_type": "customObjectType", + "properties_types": [ + { + "property_name": "property_1", + "type_mapping": {"target_type": "string", "current_type": "singleLineText"} + }, + { + "property_name": "property_2", + "type_mapping": {"target_type": "integer", "current_type": "customInteger"} + }, + ] + }, + + ], }, }, }, @@ -324,6 +354,7 @@ def test_dynamic_schema_loader_with_type_conditions(): "currency": {"type": ["null", "number"]}, "salary": {"type": ["null", "number"]}, "working_days": {"type": ["null", "array"]}, + "avg_salary": {"type": ["null", "array"], "items": {"type": ["null", "integer"]}} }, } source = ConcurrentDeclarativeSource( @@ -365,6 +396,8 @@ def test_dynamic_schema_loader_with_type_conditions(): {"name": "FirstName", "type": "string"}, {"name": "Description", "type": "singleLineText"}, {"name": "Salary", "type": "formula", "result": {"type": "number"}}, + {"name": "AvgSalary", "type": "formula", "result": {"type": "customInteger"}}, + {"name": "Currency", "type": "formula", "result": {"type": "currency"}}, {"name": "Currency", "type": "formula", "result": {"type": "currency"}}, {"name": "WorkingDays", "type": "formula"}, ] @@ -376,4 +409,5 @@ def test_dynamic_schema_loader_with_type_conditions(): actual_catalog = source.discover(logger=source.logger, config=_CONFIG) assert len(actual_catalog.streams) == 1 + breakpoint() assert actual_catalog.streams[0].json_schema == expected_schema From 2a34b814b031d299aa7ed52d85aa72a17282e8e7 Mon Sep 17 00:00:00 2001 From: octavia-squidington-iii Date: Thu, 23 Jan 2025 16:56:22 +0000 Subject: [PATCH 02/15] Auto-fix lint and format issues --- .../models/declarative_component_schema.py | 90 +++++++------------ .../parsers/model_to_component_factory.py | 37 ++++---- .../sources/declarative/schema/__init__.py | 4 +- .../schema/dynamic_schema_loader.py | 27 ++++-- .../schema/test_dynamic_schema_loader.py | 36 +++++--- 5 files changed, 97 insertions(+), 97 deletions(-) diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 3a1d24a0d..106a8ff78 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -604,9 +604,7 @@ class OAuthAuthenticator(BaseModel): scopes: Optional[List[str]] = Field( None, description="List of scopes that should be granted to the access token.", - examples=[ - ["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"] - ], + examples=[["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"]], title="Scopes", ) token_expiry_date: Optional[str] = Field( @@ -1000,28 +998,24 @@ class OAuthConfigSpecification(BaseModel): class Config: extra = Extra.allow - oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = ( - Field( - None, - description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", - examples=[ - {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, - { - "app_id": { - "type": "string", - "path_in_connector_config": ["info", "app_id"], - } - }, - ], - title="OAuth user input", - ) + oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = Field( + None, + description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", + examples=[ + {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, + { + "app_id": { + "type": "string", + "path_in_connector_config": ["info", "app_id"], + } + }, + ], + title="OAuth user input", ) - oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = ( - Field( - None, - description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n + base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n + urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{{ auth_code_key }}": "{{ auth_code_value }}",\n "{{ client_id_key }}": "{{ client_id_value }}",\n "{{ client_secret_key }}": "{{ client_secret_value }}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }', - title="DeclarativeOAuth Connector Specification", - ) + oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = Field( + None, + description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n + base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n + urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{{ auth_code_key }}": "{{ auth_code_value }}",\n "{{ client_id_key }}": "{{ client_id_value }}",\n "{{ client_secret_key }}": "{{ client_secret_value }}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }', + title="DeclarativeOAuth Connector Specification", ) complete_oauth_output_specification: Optional[Dict[str, Any]] = Field( None, @@ -1039,9 +1033,7 @@ class Config: complete_oauth_server_input_specification: Optional[Dict[str, Any]] = Field( None, description="OAuth specific blob. This is a Json Schema used to validate Json configurations persisted as Airbyte Server configurations.\nMust be a valid non-nested JSON describing additional fields configured by the Airbyte Instance or Workspace Admins to be used by the\nserver when completing an OAuth flow (typically exchanging an auth code for refresh token).\nExamples:\n complete_oauth_server_input_specification={\n client_id: {\n type: string\n },\n client_secret: {\n type: string\n }\n }", - examples=[ - {"client_id": {"type": "string"}, "client_secret": {"type": "string"}} - ], + examples=[{"client_id": {"type": "string"}, "client_secret": {"type": "string"}}], title="OAuth input specification", ) complete_oauth_server_output_specification: Optional[Dict[str, Any]] = Field( @@ -1627,9 +1619,7 @@ class RecordSelector(BaseModel): description="Responsible for filtering records to be emitted by the Source.", title="Record Filter", ) - schema_normalization: Optional[ - Union[SchemaNormalization, CustomSchemaNormalization] - ] = Field( + schema_normalization: Optional[Union[SchemaNormalization, CustomSchemaNormalization]] = Field( SchemaNormalization.None_, description="Responsible for normalization according to the schema.", title="Schema Normalization", @@ -1803,16 +1793,12 @@ class Config: description="Component used to coordinate how records are extracted across stream slices and request pages.", title="Retriever", ) - incremental_sync: Optional[Union[CustomIncrementalSync, DatetimeBasedCursor]] = ( - Field( - None, - description="Component used to fetch data incrementally based on a time field in the data.", - title="Incremental Sync", - ) - ) - name: Optional[str] = Field( - "", description="The stream name.", example=["Users"], title="Name" + incremental_sync: Optional[Union[CustomIncrementalSync, DatetimeBasedCursor]] = Field( + None, + description="Component used to fetch data incrementally based on a time field in the data.", + title="Incremental Sync", ) + name: Optional[str] = Field("", description="The stream name.", example=["Users"], title="Name") primary_key: Optional[PrimaryKey] = Field( "", description="The primary key of the stream.", title="Primary Key" ) @@ -2133,11 +2119,7 @@ class SimpleRetriever(BaseModel): CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter, - List[ - Union[ - CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter - ] - ], + List[Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter]], ] ] = Field( [], @@ -2181,9 +2163,7 @@ class AsyncRetriever(BaseModel): ) download_extractor: Optional[ Union[CustomRecordExtractor, DpathExtractor, ResponseToFileExtractor] - ] = Field( - None, description="Responsible for fetching the records from provided urls." - ) + ] = Field(None, description="Responsible for fetching the records from provided urls.") creation_requester: Union[CustomRequester, HttpRequester] = Field( ..., description="Requester component that describes how to prepare HTTP requests to send to the source API to create the async server-side job.", @@ -2217,11 +2197,7 @@ class AsyncRetriever(BaseModel): CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter, - List[ - Union[ - CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter - ] - ], + List[Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter]], ] ] = Field( [], @@ -2289,12 +2265,10 @@ class DynamicDeclarativeStream(BaseModel): stream_template: DeclarativeStream = Field( ..., description="Reference to the stream template.", title="Stream Template" ) - components_resolver: Union[HttpComponentsResolver, ConfigComponentsResolver] = ( - Field( - ..., - description="Component resolve and populates stream templates with components values.", - title="Components Resolver", - ) + components_resolver: Union[HttpComponentsResolver, ConfigComponentsResolver] = Field( + ..., + description="Component resolve and populates stream templates with components values.", + title="Components Resolver", ) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 97014e210..8d7569239 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -244,6 +244,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( InlineSchemaLoader as InlineSchemaLoaderModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + ItemsTypeMap as ItemsTypeMapModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( IterableDecoder as IterableDecoderModel, ) @@ -310,6 +313,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( ParentStreamConfig as ParentStreamConfigModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + PropertyTypesMap as PropertyTypesMapModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( RecordFilter as RecordFilterModel, ) @@ -353,12 +359,6 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( TypesMap as TypesMapModel, ) -from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( - PropertyTypesMap as PropertyTypesMapModel, -) -from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( - ItemsTypeMap as ItemsTypeMapModel, -) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ValueType from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( WaitTimeFromHeader as WaitTimeFromHeaderModel, @@ -438,11 +438,11 @@ DefaultSchemaLoader, DynamicSchemaLoader, InlineSchemaLoader, + ItemsTypeMap, JsonFileSchemaLoader, + PropertyTypesMap, SchemaTypeIdentifier, TypesMap, - PropertyTypesMap, - ItemsTypeMap, ) from airbyte_cdk.sources.declarative.spec import Spec from airbyte_cdk.sources.declarative.stream_slicers import StreamSlicer @@ -1904,7 +1904,9 @@ def create_inline_schema_loader( ) -> InlineSchemaLoader: return InlineSchemaLoader(schema=model.schema_ or {}, parameters={}) - def create_property_types_map(self, model: PropertyTypesMapModel, config: Config, **kwargs: Any) -> PropertyTypesMap: + def create_property_types_map( + self, model: PropertyTypesMapModel, config: Config, **kwargs: Any + ) -> PropertyTypesMap: type_mapping = self._create_component_from_model(model=model.type_mapping, config=config) model_property_type_pointer: List[Union[InterpolatedString, str]] = ( [x for x in model.property_type_pointer] if model.property_type_pointer else [] @@ -1912,21 +1914,24 @@ def create_property_types_map(self, model: PropertyTypesMapModel, config: Config return PropertyTypesMap( property_name=model.property_name, property_type_pointer=model_property_type_pointer, - type_mapping=type_mapping + type_mapping=type_mapping, ) - def create_items_type_map(self, model: ItemsTypeMapModel, config: Config, **kwargs: Any) -> ItemsTypeMap: + def create_items_type_map( + self, model: ItemsTypeMapModel, config: Config, **kwargs: Any + ) -> ItemsTypeMap: type_mapping = self._create_component_from_model(model=model.type_mapping, config=config) model_items_type_pointer: List[Union[InterpolatedString, str]] = ( [x for x in model.items_type_pointer] if model.items_type_pointer else [] ) - return ItemsTypeMap( - items_type_pointer=model_items_type_pointer, - type_mapping=type_mapping - ) + return ItemsTypeMap(items_type_pointer=model_items_type_pointer, type_mapping=type_mapping) def create_types_map(self, model: TypesMapModel, config: Config, **kwargs: Any) -> TypesMap: - items_type = self._create_component_from_model(model=model.items_type, config=config) if model.items_type else model.items_type + items_type = ( + self._create_component_from_model(model=model.items_type, config=config) + if model.items_type + else model.items_type + ) properties_types = [] if model.properties_types: diff --git a/airbyte_cdk/sources/declarative/schema/__init__.py b/airbyte_cdk/sources/declarative/schema/__init__.py index 4bd284e95..d0621e4fb 100644 --- a/airbyte_cdk/sources/declarative/schema/__init__.py +++ b/airbyte_cdk/sources/declarative/schema/__init__.py @@ -5,10 +5,10 @@ from airbyte_cdk.sources.declarative.schema.default_schema_loader import DefaultSchemaLoader from airbyte_cdk.sources.declarative.schema.dynamic_schema_loader import ( DynamicSchemaLoader, + ItemsTypeMap, + PropertyTypesMap, SchemaTypeIdentifier, TypesMap, - PropertyTypesMap, - ItemsTypeMap, ) from airbyte_cdk.sources.declarative.schema.inline_schema_loader import InlineSchemaLoader from airbyte_cdk.sources.declarative.schema.json_file_schema_loader import JsonFileSchemaLoader diff --git a/airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py b/airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py index af9c67d4d..de83a2aa5 100644 --- a/airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py +++ b/airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py @@ -5,7 +5,7 @@ from copy import deepcopy from dataclasses import InitVar, dataclass, field -from typing import Any, List, Mapping, MutableMapping, Optional, Union, Tuple +from typing import Any, List, Mapping, MutableMapping, Optional, Tuple, Union import dpath from typing_extensions import deprecated @@ -89,7 +89,9 @@ def __post_init__(self): """ # Ensure only one of `items_type` or `properties_types` is set if self.items_type and self.properties_types: - raise ValueError("Cannot specify both 'items_type' and 'properties_types' at the same time.") + raise ValueError( + "Cannot specify both 'items_type' and 'properties_types' at the same time." + ) # `items_type` is valid only for array target types if self.items_type and self.target_type != "array": @@ -222,7 +224,9 @@ def _get_type( if field_type_path else "string" ) - mapped_field_type, mapped_additional_types = self._replace_type_if_not_valid(raw_field_type, raw_schema) + mapped_field_type, mapped_additional_types = self._replace_type_if_not_valid( + raw_field_type, raw_schema + ) if ( isinstance(mapped_field_type, list) @@ -258,20 +262,29 @@ def _replace_type_if_not_valid( if field_type == types_map.current_type and condition: if types_map.items_type: - items_type = self._extract_data(raw_schema, types_map.items_type.items_type_pointer) + items_type = self._extract_data( + raw_schema, types_map.items_type.items_type_pointer + ) items_type_condition = InterpolatedBoolean( - condition=types_map.items_type.type_mapping.condition if types_map.items_type.type_mapping.condition is not None else "True", + condition=types_map.items_type.type_mapping.condition + if types_map.items_type.type_mapping.condition is not None + else "True", parameters={}, ).eval(config=self.config, raw_schema=raw_schema) - if items_type == types_map.items_type.type_mapping.current_type and items_type_condition: + if ( + items_type == types_map.items_type.type_mapping.current_type + and items_type_condition + ): additional_types = [types_map.items_type.type_mapping.target_type] return types_map.target_type, additional_types return field_type, additional_types @staticmethod - def _get_airbyte_type(field_type: str, additional_types: Optional[List] = None) -> Mapping[str, Any]: + def _get_airbyte_type( + field_type: str, additional_types: Optional[List] = None + ) -> Mapping[str, Any]: """ Maps a field type to its corresponding Airbyte type definition. """ diff --git a/unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py b/unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py index 21fa9c73d..545928afa 100644 --- a/unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py +++ b/unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py @@ -85,18 +85,17 @@ "key_pointer": ["name"], "type_pointer": ["type"], "types_mapping": [ - { - "target_type": "string", - "current_type": "singleLineText" - }, + {"target_type": "string", "current_type": "singleLineText"}, { "target_type": "array", "current_type": "formula", - "items_type": - { - "items_type_pointer": ["result", "type"], - "type_mapping": {"target_type": "integer", "current_type": "customInteger"} + "items_type": { + "items_type_pointer": ["result", "type"], + "type_mapping": { + "target_type": "integer", + "current_type": "customInteger", }, + }, "condition": "{{ raw_schema['result']['type'] == 'customInteger' }}", }, { @@ -105,15 +104,20 @@ "properties_types": [ { "property_name": "property_1", - "type_mapping": {"target_type": "string", "current_type": "singleLineText"} + "type_mapping": { + "target_type": "string", + "current_type": "singleLineText", + }, }, { "property_name": "property_2", - "type_mapping": {"target_type": "integer", "current_type": "customInteger"} + "type_mapping": { + "target_type": "integer", + "current_type": "customInteger", + }, }, - ] + ], }, - ], }, }, @@ -354,7 +358,7 @@ def test_dynamic_schema_loader_with_type_conditions(): "currency": {"type": ["null", "number"]}, "salary": {"type": ["null", "number"]}, "working_days": {"type": ["null", "array"]}, - "avg_salary": {"type": ["null", "array"], "items": {"type": ["null", "integer"]}} + "avg_salary": {"type": ["null", "array"], "items": {"type": ["null", "integer"]}}, }, } source = ConcurrentDeclarativeSource( @@ -396,7 +400,11 @@ def test_dynamic_schema_loader_with_type_conditions(): {"name": "FirstName", "type": "string"}, {"name": "Description", "type": "singleLineText"}, {"name": "Salary", "type": "formula", "result": {"type": "number"}}, - {"name": "AvgSalary", "type": "formula", "result": {"type": "customInteger"}}, + { + "name": "AvgSalary", + "type": "formula", + "result": {"type": "customInteger"}, + }, {"name": "Currency", "type": "formula", "result": {"type": "currency"}}, {"name": "Currency", "type": "formula", "result": {"type": "currency"}}, {"name": "WorkingDays", "type": "formula"}, From d181da39dae6032ca227b00124b8efceef058c48 Mon Sep 17 00:00:00 2001 From: Serhii Lazebnyi Date: Thu, 23 Jan 2025 18:41:59 +0100 Subject: [PATCH 03/15] Fix typo --- .../sources/declarative/declarative_component_schema.yaml | 4 ++-- .../declarative/parsers/model_to_component_factory.py | 2 +- .../sources/declarative/schema/dynamic_schema_loader.py | 8 ++++++-- .../declarative/schema/test_dynamic_schema_loader.py | 1 - 4 files changed, 9 insertions(+), 6 deletions(-) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 84594b4a1..f274fa70c 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -1807,7 +1807,7 @@ definitions: type: object required: - property_name - - property_type_pinter + - property_type_pointer - type_mapping properties: property_name: @@ -1825,7 +1825,7 @@ definitions: description: (This component is experimental. Use at your own risk.) Represents a mapping between a current type and its corresponding target type for property. type: object required: - - items_type_pinter + - items_type_pointer - type_mapping properties: property_name: diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index b0f532242..c74d151eb 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -1930,7 +1930,7 @@ def create_types_map(self, model: TypesMapModel, config: Config, **kwargs: Any) items_type = ( self._create_component_from_model(model=model.items_type, config=config) if model.items_type - else model.items_type + else None ) properties_types = [] diff --git a/airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py b/airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py index de83a2aa5..d205e3178 100644 --- a/airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py +++ b/airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py @@ -251,7 +251,7 @@ def _replace_type_if_not_valid( """ Replaces a field type if it matches a type mapping in `types_map`. """ - additional_types = [] + additional_types: List[str] = [] if self.schema_type_identifier.types_mapping: for types_map in self.schema_type_identifier.types_mapping: # conditional is optional param, setting to true if not provided @@ -288,7 +288,6 @@ def _get_airbyte_type( """ Maps a field type to its corresponding Airbyte type definition. """ - print(additional_types) if additional_types is None: additional_types = [] @@ -300,6 +299,11 @@ def _get_airbyte_type( if field_type == "array" and additional_types: airbyte_type["items"] = deepcopy(AIRBYTE_DATA_TYPES[additional_types[0]]) + # elif field_type == "object" and additional_types: + # for additional_type in additional_types: + # + # airbyte_type["properties"] = + return airbyte_type def _extract_data( diff --git a/unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py b/unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py index 545928afa..811fd3392 100644 --- a/unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py +++ b/unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py @@ -417,5 +417,4 @@ def test_dynamic_schema_loader_with_type_conditions(): actual_catalog = source.discover(logger=source.logger, config=_CONFIG) assert len(actual_catalog.streams) == 1 - breakpoint() assert actual_catalog.streams[0].json_schema == expected_schema From bf913dd609d9e2411cbd46a32b26491681cbf9f3 Mon Sep 17 00:00:00 2001 From: Serhii Lazebnyi Date: Thu, 23 Jan 2025 20:25:16 +0100 Subject: [PATCH 04/15] Rollback properties for objects --- .../declarative_component_schema.yaml | 24 ---- .../models/declarative_component_schema.py | 106 ++++++++++-------- .../parsers/model_to_component_factory.py | 28 ----- .../sources/declarative/schema/__init__.py | 2 - .../schema/dynamic_schema_loader.py | 33 +----- .../schema/test_dynamic_schema_loader.py | 25 +---- 6 files changed, 65 insertions(+), 153 deletions(-) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index f274fa70c..afda2c50d 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -1800,26 +1800,6 @@ definitions: $parameters: type: object additionalProperties: true - - PropertyTypesMap: - title: Types Map - description: (This component is experimental. Use at your own risk.) Represents a mapping between a current type and its corresponding target type for property. - type: object - required: - - property_name - - property_type_pointer - - type_mapping - properties: - property_name: - type: string - property_type_pointer: - title: Key Path - description: List of potentially nested fields describing the full path of the property type to extract. - type: array - items: - - type: string - type_mapping: - "$ref": "#/definitions/TypesMap" ItemsTypeMap: title: Types Map description: (This component is experimental. Use at your own risk.) Represents a mapping between a current type and its corresponding target type for property. @@ -1864,10 +1844,6 @@ definitions: - raw_schema items_type: "$ref": "#/definitions/ItemsTypeMap" - properties_types: - type: array - items: - - "$ref": "#/definitions/PropertyTypesMap" SchemaTypeIdentifier: title: Schema Type Identifier description: (This component is experimental. Use at your own risk.) Identifies schema details for dynamic schema extraction and processing. diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index e70c04bc4..5b2804aa3 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -604,7 +604,9 @@ class OAuthAuthenticator(BaseModel): scopes: Optional[List[str]] = Field( None, description="List of scopes that should be granted to the access token.", - examples=[["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"]], + examples=[ + ["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"] + ], title="Scopes", ) token_expiry_date: Optional[str] = Field( @@ -1008,24 +1010,28 @@ class OAuthConfigSpecification(BaseModel): class Config: extra = Extra.allow - oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = Field( - None, - description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", - examples=[ - {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, - { - "app_id": { - "type": "string", - "path_in_connector_config": ["info", "app_id"], - } - }, - ], - title="OAuth user input", + oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = ( + Field( + None, + description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", + examples=[ + {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, + { + "app_id": { + "type": "string", + "path_in_connector_config": ["info", "app_id"], + } + }, + ], + title="OAuth user input", + ) ) - oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = Field( - None, - description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n + base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n + urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{{ auth_code_key }}": "{{ auth_code_value }}",\n "{{ client_id_key }}": "{{ client_id_value }}",\n "{{ client_secret_key }}": "{{ client_secret_value }}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }', - title="DeclarativeOAuth Connector Specification", + oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = ( + Field( + None, + description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n + base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n + urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{{ auth_code_key }}": "{{ auth_code_value }}",\n "{{ client_id_key }}": "{{ client_id_value }}",\n "{{ client_secret_key }}": "{{ client_secret_value }}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }', + title="DeclarativeOAuth Connector Specification", + ) ) complete_oauth_output_specification: Optional[Dict[str, Any]] = Field( None, @@ -1043,7 +1049,9 @@ class Config: complete_oauth_server_input_specification: Optional[Dict[str, Any]] = Field( None, description="OAuth specific blob. This is a Json Schema used to validate Json configurations persisted as Airbyte Server configurations.\nMust be a valid non-nested JSON describing additional fields configured by the Airbyte Instance or Workspace Admins to be used by the\nserver when completing an OAuth flow (typically exchanging an auth code for refresh token).\nExamples:\n complete_oauth_server_input_specification={\n client_id: {\n type: string\n },\n client_secret: {\n type: string\n }\n }", - examples=[{"client_id": {"type": "string"}, "client_secret": {"type": "string"}}], + examples=[ + {"client_id": {"type": "string"}, "client_secret": {"type": "string"}} + ], title="OAuth input specification", ) complete_oauth_server_output_specification: Optional[Dict[str, Any]] = Field( @@ -1629,7 +1637,9 @@ class RecordSelector(BaseModel): description="Responsible for filtering records to be emitted by the Source.", title="Record Filter", ) - schema_normalization: Optional[Union[SchemaNormalization, CustomSchemaNormalization]] = Field( + schema_normalization: Optional[ + Union[SchemaNormalization, CustomSchemaNormalization] + ] = Field( SchemaNormalization.None_, description="Responsible for normalization according to the schema.", title="Schema Normalization", @@ -1803,12 +1813,16 @@ class Config: description="Component used to coordinate how records are extracted across stream slices and request pages.", title="Retriever", ) - incremental_sync: Optional[Union[CustomIncrementalSync, DatetimeBasedCursor]] = Field( - None, - description="Component used to fetch data incrementally based on a time field in the data.", - title="Incremental Sync", + incremental_sync: Optional[Union[CustomIncrementalSync, DatetimeBasedCursor]] = ( + Field( + None, + description="Component used to fetch data incrementally based on a time field in the data.", + title="Incremental Sync", + ) + ) + name: Optional[str] = Field( + "", description="The stream name.", example=["Users"], title="Name" ) - name: Optional[str] = Field("", description="The stream name.", example=["Users"], title="Name") primary_key: Optional[PrimaryKey] = Field( "", description="The primary key of the stream.", title="Primary Key" ) @@ -1993,20 +2007,10 @@ class HttpRequester(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") -class PropertyTypesMap(BaseModel): - property_name: str - property_type_pointer: Optional[List[str]] = Field( - None, - description="List of potentially nested fields describing the full path of the property type to extract.", - title="Key Path", - ) - type_mapping: TypesMap - - class ItemsTypeMap(BaseModel): property_name: Optional[str] = None - items_type_pointer: Optional[List[str]] = Field( - None, + items_type_pointer: List[str] = Field( + ..., description="List of potentially nested fields describing the full path of the items type to extract.", title="Items Type Path", ) @@ -2018,7 +2022,6 @@ class TypesMap(BaseModel): current_type: Union[str, List[str]] condition: Optional[str] = None items_type: Optional[ItemsTypeMap] = None - properties_types: Optional[List[PropertyTypesMap]] = None class SchemaTypeIdentifier(BaseModel): @@ -2129,7 +2132,11 @@ class SimpleRetriever(BaseModel): CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter, - List[Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter]], + List[ + Union[ + CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter + ] + ], ] ] = Field( [], @@ -2173,7 +2180,9 @@ class AsyncRetriever(BaseModel): ) download_extractor: Optional[ Union[CustomRecordExtractor, DpathExtractor, ResponseToFileExtractor] - ] = Field(None, description="Responsible for fetching the records from provided urls.") + ] = Field( + None, description="Responsible for fetching the records from provided urls." + ) creation_requester: Union[CustomRequester, HttpRequester] = Field( ..., description="Requester component that describes how to prepare HTTP requests to send to the source API to create the async server-side job.", @@ -2207,7 +2216,11 @@ class AsyncRetriever(BaseModel): CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter, - List[Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter]], + List[ + Union[ + CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter + ] + ], ] ] = Field( [], @@ -2275,10 +2288,12 @@ class DynamicDeclarativeStream(BaseModel): stream_template: DeclarativeStream = Field( ..., description="Reference to the stream template.", title="Stream Template" ) - components_resolver: Union[HttpComponentsResolver, ConfigComponentsResolver] = Field( - ..., - description="Component resolve and populates stream templates with components values.", - title="Components Resolver", + components_resolver: Union[HttpComponentsResolver, ConfigComponentsResolver] = ( + Field( + ..., + description="Component resolve and populates stream templates with components values.", + title="Components Resolver", + ) ) @@ -2288,7 +2303,6 @@ class DynamicDeclarativeStream(BaseModel): SelectiveAuthenticator.update_forward_refs() DeclarativeStream.update_forward_refs() SessionTokenAuthenticator.update_forward_refs() -PropertyTypesMap.update_forward_refs() ItemsTypeMap.update_forward_refs() DynamicSchemaLoader.update_forward_refs() SimpleRetriever.update_forward_refs() diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index c74d151eb..bab6ee5dc 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -313,9 +313,6 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( ParentStreamConfig as ParentStreamConfigModel, ) -from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( - PropertyTypesMap as PropertyTypesMapModel, -) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( RecordFilter as RecordFilterModel, ) @@ -440,7 +437,6 @@ InlineSchemaLoader, ItemsTypeMap, JsonFileSchemaLoader, - PropertyTypesMap, SchemaTypeIdentifier, TypesMap, ) @@ -580,7 +576,6 @@ def _init_mappings(self) -> None: DynamicSchemaLoaderModel: self.create_dynamic_schema_loader, SchemaTypeIdentifierModel: self.create_schema_type_identifier, TypesMapModel: self.create_types_map, - PropertyTypesMapModel: self.create_property_types_map, ItemsTypeMapModel: self.create_items_type_map, JwtAuthenticatorModel: self.create_jwt_authenticator, LegacyToPerPartitionStateMigrationModel: self.create_legacy_to_per_partition_state_migration, @@ -1904,19 +1899,6 @@ def create_inline_schema_loader( ) -> InlineSchemaLoader: return InlineSchemaLoader(schema=model.schema_ or {}, parameters={}) - def create_property_types_map( - self, model: PropertyTypesMapModel, config: Config, **kwargs: Any - ) -> PropertyTypesMap: - type_mapping = self._create_component_from_model(model=model.type_mapping, config=config) - model_property_type_pointer: List[Union[InterpolatedString, str]] = ( - [x for x in model.property_type_pointer] if model.property_type_pointer else [] - ) - return PropertyTypesMap( - property_name=model.property_name, - property_type_pointer=model_property_type_pointer, - type_mapping=type_mapping, - ) - def create_items_type_map( self, model: ItemsTypeMapModel, config: Config, **kwargs: Any ) -> ItemsTypeMap: @@ -1933,21 +1915,11 @@ def create_types_map(self, model: TypesMapModel, config: Config, **kwargs: Any) else None ) - properties_types = [] - if model.properties_types: - properties_types.extend( - [ - self._create_component_from_model(property_type, config=config) - for property_type in model.properties_types - ] - ) - return TypesMap( target_type=model.target_type, current_type=model.current_type, condition=model.condition if model.condition is not None else "True", items_type=items_type, - properties_types=properties_types, ) def create_schema_type_identifier( diff --git a/airbyte_cdk/sources/declarative/schema/__init__.py b/airbyte_cdk/sources/declarative/schema/__init__.py index d0621e4fb..b7aed69d4 100644 --- a/airbyte_cdk/sources/declarative/schema/__init__.py +++ b/airbyte_cdk/sources/declarative/schema/__init__.py @@ -6,7 +6,6 @@ from airbyte_cdk.sources.declarative.schema.dynamic_schema_loader import ( DynamicSchemaLoader, ItemsTypeMap, - PropertyTypesMap, SchemaTypeIdentifier, TypesMap, ) @@ -21,7 +20,6 @@ "InlineSchemaLoader", "DynamicSchemaLoader", "TypesMap", - "PropertyTypesMap", "ItemsTypeMap", "SchemaTypeIdentifier", ] diff --git a/airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py b/airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py index d205e3178..ba797e568 100644 --- a/airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py +++ b/airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py @@ -45,18 +45,6 @@ } -@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning) -@dataclass(frozen=True) -class PropertyTypesMap: - """ - Represents a mapping between a current type and its corresponding target type for property. - """ - - property_name: str - property_type_pointer: List[Union[InterpolatedString, str]] - type_mapping: "TypesMap" - - @deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning) @dataclass(frozen=True) class ItemsTypeMap: @@ -79,28 +67,15 @@ class TypesMap: current_type: Union[List[str], str] condition: Optional[str] items_type: Optional[ItemsTypeMap] = None - properties_types: Optional[List[PropertyTypesMap]] = None def __post_init__(self): """ - Ensures that only one of `items_type` or `properties_types` can be set. - Additionally, enforces that `items_type` is only used when `target_type` is a list, - and `properties_types` is only used when `target_type` is an object. + Enforces that `items_type` is only used when `target_type` is a array """ - # Ensure only one of `items_type` or `properties_types` is set - if self.items_type and self.properties_types: - raise ValueError( - "Cannot specify both 'items_type' and 'properties_types' at the same time." - ) - # `items_type` is valid only for array target types if self.items_type and self.target_type != "array": raise ValueError("'items_type' can only be used when 'target_type' is an array.") - # `properties_types` is valid only for object target types - if self.properties_types and self.target_type != "object": - raise ValueError("'properties_types' can only be used when 'target_type' is an object.") - @deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning) @dataclass @@ -277,7 +252,6 @@ def _replace_type_if_not_valid( and items_type_condition ): additional_types = [types_map.items_type.type_mapping.target_type] - return types_map.target_type, additional_types return field_type, additional_types @@ -299,11 +273,6 @@ def _get_airbyte_type( if field_type == "array" and additional_types: airbyte_type["items"] = deepcopy(AIRBYTE_DATA_TYPES[additional_types[0]]) - # elif field_type == "object" and additional_types: - # for additional_type in additional_types: - # - # airbyte_type["properties"] = - return airbyte_type def _extract_data( diff --git a/unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py b/unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py index 811fd3392..283be1a70 100644 --- a/unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py +++ b/unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py @@ -85,7 +85,10 @@ "key_pointer": ["name"], "type_pointer": ["type"], "types_mapping": [ - {"target_type": "string", "current_type": "singleLineText"}, + { + "target_type": "string", + "current_type": "singleLineText" + }, { "target_type": "array", "current_type": "formula", @@ -98,26 +101,6 @@ }, "condition": "{{ raw_schema['result']['type'] == 'customInteger' }}", }, - { - "target_type": "object", - "current_type": "customObjectType", - "properties_types": [ - { - "property_name": "property_1", - "type_mapping": { - "target_type": "string", - "current_type": "singleLineText", - }, - }, - { - "property_name": "property_2", - "type_mapping": { - "target_type": "integer", - "current_type": "customInteger", - }, - }, - ], - }, ], }, }, From 2ff96b0e4c065f73655708e228c19c36edfafc20 Mon Sep 17 00:00:00 2001 From: Serhii Lazebnyi Date: Thu, 23 Jan 2025 20:29:03 +0100 Subject: [PATCH 05/15] Fix typo --- .../sources/declarative/schema/dynamic_schema_loader.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py b/airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py index ba797e568..aa46c7fe3 100644 --- a/airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py +++ b/airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py @@ -68,7 +68,7 @@ class TypesMap: condition: Optional[str] items_type: Optional[ItemsTypeMap] = None - def __post_init__(self): + def __post_init__(self) -> None: """ Enforces that `items_type` is only used when `target_type` is a array """ @@ -222,7 +222,7 @@ def _replace_type_if_not_valid( self, field_type: Union[List[str], str], raw_schema: MutableMapping[str, Any], - ) -> Tuple[Union[List[str], str], List]: + ) -> Tuple[Union[List[str], str], List[str]]: """ Replaces a field type if it matches a type mapping in `types_map`. """ @@ -257,7 +257,7 @@ def _replace_type_if_not_valid( @staticmethod def _get_airbyte_type( - field_type: str, additional_types: Optional[List] = None + field_type: str, additional_types: Optional[List[str]] = None ) -> Mapping[str, Any]: """ Maps a field type to its corresponding Airbyte type definition. From f42e98bcec017b69af94605b02b72893087d7757 Mon Sep 17 00:00:00 2001 From: octavia-squidington-iii Date: Thu, 23 Jan 2025 19:30:06 +0000 Subject: [PATCH 06/15] Auto-fix lint and format issues --- .../models/declarative_component_schema.py | 90 +++++++------------ .../schema/test_dynamic_schema_loader.py | 5 +- 2 files changed, 33 insertions(+), 62 deletions(-) diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 5b2804aa3..a3943abdd 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -604,9 +604,7 @@ class OAuthAuthenticator(BaseModel): scopes: Optional[List[str]] = Field( None, description="List of scopes that should be granted to the access token.", - examples=[ - ["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"] - ], + examples=[["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"]], title="Scopes", ) token_expiry_date: Optional[str] = Field( @@ -1010,28 +1008,24 @@ class OAuthConfigSpecification(BaseModel): class Config: extra = Extra.allow - oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = ( - Field( - None, - description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", - examples=[ - {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, - { - "app_id": { - "type": "string", - "path_in_connector_config": ["info", "app_id"], - } - }, - ], - title="OAuth user input", - ) + oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = Field( + None, + description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", + examples=[ + {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, + { + "app_id": { + "type": "string", + "path_in_connector_config": ["info", "app_id"], + } + }, + ], + title="OAuth user input", ) - oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = ( - Field( - None, - description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n + base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n + urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{{ auth_code_key }}": "{{ auth_code_value }}",\n "{{ client_id_key }}": "{{ client_id_value }}",\n "{{ client_secret_key }}": "{{ client_secret_value }}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }', - title="DeclarativeOAuth Connector Specification", - ) + oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = Field( + None, + description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n + base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n + urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{{ auth_code_key }}": "{{ auth_code_value }}",\n "{{ client_id_key }}": "{{ client_id_value }}",\n "{{ client_secret_key }}": "{{ client_secret_value }}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }', + title="DeclarativeOAuth Connector Specification", ) complete_oauth_output_specification: Optional[Dict[str, Any]] = Field( None, @@ -1049,9 +1043,7 @@ class Config: complete_oauth_server_input_specification: Optional[Dict[str, Any]] = Field( None, description="OAuth specific blob. This is a Json Schema used to validate Json configurations persisted as Airbyte Server configurations.\nMust be a valid non-nested JSON describing additional fields configured by the Airbyte Instance or Workspace Admins to be used by the\nserver when completing an OAuth flow (typically exchanging an auth code for refresh token).\nExamples:\n complete_oauth_server_input_specification={\n client_id: {\n type: string\n },\n client_secret: {\n type: string\n }\n }", - examples=[ - {"client_id": {"type": "string"}, "client_secret": {"type": "string"}} - ], + examples=[{"client_id": {"type": "string"}, "client_secret": {"type": "string"}}], title="OAuth input specification", ) complete_oauth_server_output_specification: Optional[Dict[str, Any]] = Field( @@ -1637,9 +1629,7 @@ class RecordSelector(BaseModel): description="Responsible for filtering records to be emitted by the Source.", title="Record Filter", ) - schema_normalization: Optional[ - Union[SchemaNormalization, CustomSchemaNormalization] - ] = Field( + schema_normalization: Optional[Union[SchemaNormalization, CustomSchemaNormalization]] = Field( SchemaNormalization.None_, description="Responsible for normalization according to the schema.", title="Schema Normalization", @@ -1813,16 +1803,12 @@ class Config: description="Component used to coordinate how records are extracted across stream slices and request pages.", title="Retriever", ) - incremental_sync: Optional[Union[CustomIncrementalSync, DatetimeBasedCursor]] = ( - Field( - None, - description="Component used to fetch data incrementally based on a time field in the data.", - title="Incremental Sync", - ) - ) - name: Optional[str] = Field( - "", description="The stream name.", example=["Users"], title="Name" + incremental_sync: Optional[Union[CustomIncrementalSync, DatetimeBasedCursor]] = Field( + None, + description="Component used to fetch data incrementally based on a time field in the data.", + title="Incremental Sync", ) + name: Optional[str] = Field("", description="The stream name.", example=["Users"], title="Name") primary_key: Optional[PrimaryKey] = Field( "", description="The primary key of the stream.", title="Primary Key" ) @@ -2132,11 +2118,7 @@ class SimpleRetriever(BaseModel): CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter, - List[ - Union[ - CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter - ] - ], + List[Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter]], ] ] = Field( [], @@ -2180,9 +2162,7 @@ class AsyncRetriever(BaseModel): ) download_extractor: Optional[ Union[CustomRecordExtractor, DpathExtractor, ResponseToFileExtractor] - ] = Field( - None, description="Responsible for fetching the records from provided urls." - ) + ] = Field(None, description="Responsible for fetching the records from provided urls.") creation_requester: Union[CustomRequester, HttpRequester] = Field( ..., description="Requester component that describes how to prepare HTTP requests to send to the source API to create the async server-side job.", @@ -2216,11 +2196,7 @@ class AsyncRetriever(BaseModel): CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter, - List[ - Union[ - CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter - ] - ], + List[Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter]], ] ] = Field( [], @@ -2288,12 +2264,10 @@ class DynamicDeclarativeStream(BaseModel): stream_template: DeclarativeStream = Field( ..., description="Reference to the stream template.", title="Stream Template" ) - components_resolver: Union[HttpComponentsResolver, ConfigComponentsResolver] = ( - Field( - ..., - description="Component resolve and populates stream templates with components values.", - title="Components Resolver", - ) + components_resolver: Union[HttpComponentsResolver, ConfigComponentsResolver] = Field( + ..., + description="Component resolve and populates stream templates with components values.", + title="Components Resolver", ) diff --git a/unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py b/unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py index 283be1a70..725b0ef5b 100644 --- a/unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py +++ b/unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py @@ -85,10 +85,7 @@ "key_pointer": ["name"], "type_pointer": ["type"], "types_mapping": [ - { - "target_type": "string", - "current_type": "singleLineText" - }, + {"target_type": "string", "current_type": "singleLineText"}, { "target_type": "array", "current_type": "formula", From 1616df003149e336c632ca2a0845c4709a86ca85 Mon Sep 17 00:00:00 2001 From: Serhii Lazebnyi Date: Thu, 23 Jan 2025 20:56:07 +0100 Subject: [PATCH 07/15] Fix mypy --- .../schema/dynamic_schema_loader.py | 27 ++++++++++++++----- 1 file changed, 20 insertions(+), 7 deletions(-) diff --git a/airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py b/airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py index aa46c7fe3..c5e60902b 100644 --- a/airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py +++ b/airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py @@ -18,7 +18,7 @@ from airbyte_cdk.sources.source import ExperimentalClassWarning from airbyte_cdk.sources.types import Config, StreamSlice, StreamState -AIRBYTE_DATA_TYPES: Mapping[str, Mapping[str, Any]] = { +AIRBYTE_DATA_TYPES: Mapping[str, MutableMapping[str, Any]] = { "string": {"type": ["null", "string"]}, "boolean": {"type": ["null", "boolean"]}, "date": {"type": ["null", "string"], "format": "date"}, @@ -222,11 +222,11 @@ def _replace_type_if_not_valid( self, field_type: Union[List[str], str], raw_schema: MutableMapping[str, Any], - ) -> Tuple[Union[List[str], str], List[str]]: + ) -> Tuple[Union[List[str], str], List[Union[List[str], str]]]: """ Replaces a field type if it matches a type mapping in `types_map`. """ - additional_types: List[str] = [] + additional_types: List[Union[List[str], str]] = [] if self.schema_type_identifier.types_mapping: for types_map in self.schema_type_identifier.types_mapping: # conditional is optional param, setting to true if not provided @@ -255,9 +255,8 @@ def _replace_type_if_not_valid( return types_map.target_type, additional_types return field_type, additional_types - @staticmethod def _get_airbyte_type( - field_type: str, additional_types: Optional[List[str]] = None + self, field_type: str, additional_types: Optional[List[Union[List[str], str]]] = None ) -> Mapping[str, Any]: """ Maps a field type to its corresponding Airbyte type definition. @@ -271,8 +270,22 @@ def _get_airbyte_type( airbyte_type = deepcopy(AIRBYTE_DATA_TYPES[field_type]) if field_type == "array" and additional_types: - airbyte_type["items"] = deepcopy(AIRBYTE_DATA_TYPES[additional_types[0]]) - + if ( + isinstance(additional_types[0], list) + and len(additional_types[0]) == 2 + and all(isinstance(item, str) for item in additional_types[0]) + ): + first_type = self._get_airbyte_type(additional_types[0][0]) + second_type = self._get_airbyte_type(additional_types[0][1]) + items_type = {"oneOf": [first_type, second_type]} + elif isinstance(additional_types[0], str): + items_type = deepcopy(AIRBYTE_DATA_TYPES[additional_types[0]]) # type: ignore[arg-type] + else: + raise ValueError( + f"Invalid data type. Available string or two items list of string. Got {additional_types[0]}." + ) + + airbyte_type["items"] = items_type return airbyte_type def _extract_data( From 4412b72aa70e925f6bc4c052c695ab79b49adf2e Mon Sep 17 00:00:00 2001 From: octavia-squidington-iii Date: Thu, 23 Jan 2025 19:58:13 +0000 Subject: [PATCH 08/15] Auto-fix lint and format issues --- .../sources/declarative/schema/dynamic_schema_loader.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py b/airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py index c5e60902b..282a98d1b 100644 --- a/airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py +++ b/airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py @@ -271,9 +271,9 @@ def _get_airbyte_type( if field_type == "array" and additional_types: if ( - isinstance(additional_types[0], list) - and len(additional_types[0]) == 2 - and all(isinstance(item, str) for item in additional_types[0]) + isinstance(additional_types[0], list) + and len(additional_types[0]) == 2 + and all(isinstance(item, str) for item in additional_types[0]) ): first_type = self._get_airbyte_type(additional_types[0][0]) second_type = self._get_airbyte_type(additional_types[0][1]) From d21a122d5c4a8c13c793ec3415cffb60d37859b6 Mon Sep 17 00:00:00 2001 From: Serhii Lazebnyi Date: Thu, 23 Jan 2025 23:30:24 +0100 Subject: [PATCH 09/15] Rollback dynamic schema loader --- .../declarative_component_schema.yaml | 28 +-- .../models/declarative_component_schema.py | 168 ++++++++++-------- .../parsers/model_to_component_factory.py | 4 +- .../schema/dynamic_schema_loader.py | 85 ++------- 4 files changed, 127 insertions(+), 158 deletions(-) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index afda2c50d..d5a6e5f93 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -1800,24 +1800,30 @@ definitions: $parameters: type: object additionalProperties: true + SchemaFieldType: + title: Schema Field Type + description: (This component is experimental. Use at your own risk.) Represents a mapping between a current type and its corresponding target type for property. + type: object + required: + - field_type + properties: + field_type: + type: string + items: + "$ref": "#/definitions/SchemaFieldType" ItemsTypeMap: title: Types Map description: (This component is experimental. Use at your own risk.) Represents a mapping between a current type and its corresponding target type for property. type: object required: - - items_type_pointer - - type_mapping + - target_type properties: - property_name: + condition: type: string - items_type_pointer: - title: Items Type Path - description: List of potentially nested fields describing the full path of the items type to extract. - type: array - items: - - type: string - type_mapping: - "$ref": "#/definitions/TypesMap" + interpolation_context: + - raw_schema + target_type: + "$ref": "#/definitions/SchemaFieldType" TypesMap: title: Types Map description: (This component is experimental. Use at your own risk.) Represents a mapping between a current type and its corresponding target type. diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index a3943abdd..3f1d96e42 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -604,7 +604,9 @@ class OAuthAuthenticator(BaseModel): scopes: Optional[List[str]] = Field( None, description="List of scopes that should be granted to the access token.", - examples=[["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"]], + examples=[ + ["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"] + ], title="Scopes", ) token_expiry_date: Optional[str] = Field( @@ -736,6 +738,44 @@ class HttpResponseFilter(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") +class SchemaFieldType(BaseModel): + field_type: str + items: Optional[SchemaFieldType] = None + + +class ItemsTypeMap(BaseModel): + condition: Optional[str] = None + target_type: SchemaFieldType + + +class TypesMap(BaseModel): + target_type: Union[str, List[str]] + current_type: Union[str, List[str]] + condition: Optional[str] = None + items_type: Optional[ItemsTypeMap] = None + + +class SchemaTypeIdentifier(BaseModel): + type: Optional[Literal["SchemaTypeIdentifier"]] = None + schema_pointer: Optional[List[str]] = Field( + [], + description="List of nested fields defining the schema field path to extract. Defaults to [].", + title="Schema Path", + ) + key_pointer: List[str] = Field( + ..., + description="List of potentially nested fields describing the full path of the field key to extract.", + title="Key Path", + ) + type_pointer: Optional[List[str]] = Field( + None, + description="List of potentially nested fields describing the full path of the field type to extract.", + title="Type Path", + ) + types_mapping: Optional[List[TypesMap]] = None + parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + + class InlineSchemaLoader(BaseModel): type: Literal["InlineSchemaLoader"] schema_: Optional[Dict[str, Any]] = Field( @@ -1008,24 +1048,28 @@ class OAuthConfigSpecification(BaseModel): class Config: extra = Extra.allow - oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = Field( - None, - description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", - examples=[ - {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, - { - "app_id": { - "type": "string", - "path_in_connector_config": ["info", "app_id"], - } - }, - ], - title="OAuth user input", + oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = ( + Field( + None, + description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", + examples=[ + {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, + { + "app_id": { + "type": "string", + "path_in_connector_config": ["info", "app_id"], + } + }, + ], + title="OAuth user input", + ) ) - oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = Field( - None, - description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n + base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n + urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{{ auth_code_key }}": "{{ auth_code_value }}",\n "{{ client_id_key }}": "{{ client_id_value }}",\n "{{ client_secret_key }}": "{{ client_secret_value }}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }', - title="DeclarativeOAuth Connector Specification", + oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = ( + Field( + None, + description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n + base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n + urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{{ auth_code_key }}": "{{ auth_code_value }}",\n "{{ client_id_key }}": "{{ client_id_value }}",\n "{{ client_secret_key }}": "{{ client_secret_value }}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }', + title="DeclarativeOAuth Connector Specification", + ) ) complete_oauth_output_specification: Optional[Dict[str, Any]] = Field( None, @@ -1043,7 +1087,9 @@ class Config: complete_oauth_server_input_specification: Optional[Dict[str, Any]] = Field( None, description="OAuth specific blob. This is a Json Schema used to validate Json configurations persisted as Airbyte Server configurations.\nMust be a valid non-nested JSON describing additional fields configured by the Airbyte Instance or Workspace Admins to be used by the\nserver when completing an OAuth flow (typically exchanging an auth code for refresh token).\nExamples:\n complete_oauth_server_input_specification={\n client_id: {\n type: string\n },\n client_secret: {\n type: string\n }\n }", - examples=[{"client_id": {"type": "string"}, "client_secret": {"type": "string"}}], + examples=[ + {"client_id": {"type": "string"}, "client_secret": {"type": "string"}} + ], title="OAuth input specification", ) complete_oauth_server_output_specification: Optional[Dict[str, Any]] = Field( @@ -1629,7 +1675,9 @@ class RecordSelector(BaseModel): description="Responsible for filtering records to be emitted by the Source.", title="Record Filter", ) - schema_normalization: Optional[Union[SchemaNormalization, CustomSchemaNormalization]] = Field( + schema_normalization: Optional[ + Union[SchemaNormalization, CustomSchemaNormalization] + ] = Field( SchemaNormalization.None_, description="Responsible for normalization according to the schema.", title="Schema Normalization", @@ -1803,12 +1851,16 @@ class Config: description="Component used to coordinate how records are extracted across stream slices and request pages.", title="Retriever", ) - incremental_sync: Optional[Union[CustomIncrementalSync, DatetimeBasedCursor]] = Field( - None, - description="Component used to fetch data incrementally based on a time field in the data.", - title="Incremental Sync", + incremental_sync: Optional[Union[CustomIncrementalSync, DatetimeBasedCursor]] = ( + Field( + None, + description="Component used to fetch data incrementally based on a time field in the data.", + title="Incremental Sync", + ) + ) + name: Optional[str] = Field( + "", description="The stream name.", example=["Users"], title="Name" ) - name: Optional[str] = Field("", description="The stream name.", example=["Users"], title="Name") primary_key: Optional[PrimaryKey] = Field( "", description="The primary key of the stream.", title="Primary Key" ) @@ -1993,44 +2045,6 @@ class HttpRequester(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") -class ItemsTypeMap(BaseModel): - property_name: Optional[str] = None - items_type_pointer: List[str] = Field( - ..., - description="List of potentially nested fields describing the full path of the items type to extract.", - title="Items Type Path", - ) - type_mapping: TypesMap - - -class TypesMap(BaseModel): - target_type: Union[str, List[str]] - current_type: Union[str, List[str]] - condition: Optional[str] = None - items_type: Optional[ItemsTypeMap] = None - - -class SchemaTypeIdentifier(BaseModel): - type: Optional[Literal["SchemaTypeIdentifier"]] = None - schema_pointer: Optional[List[str]] = Field( - [], - description="List of nested fields defining the schema field path to extract. Defaults to [].", - title="Schema Path", - ) - key_pointer: List[str] = Field( - ..., - description="List of potentially nested fields describing the full path of the field key to extract.", - title="Key Path", - ) - type_pointer: Optional[List[str]] = Field( - None, - description="List of potentially nested fields describing the full path of the field type to extract.", - title="Type Path", - ) - types_mapping: Optional[List[TypesMap]] = None - parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") - - class DynamicSchemaLoader(BaseModel): type: Literal["DynamicSchemaLoader"] retriever: Union[AsyncRetriever, CustomRetriever, SimpleRetriever] = Field( @@ -2118,7 +2132,11 @@ class SimpleRetriever(BaseModel): CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter, - List[Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter]], + List[ + Union[ + CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter + ] + ], ] ] = Field( [], @@ -2162,7 +2180,9 @@ class AsyncRetriever(BaseModel): ) download_extractor: Optional[ Union[CustomRecordExtractor, DpathExtractor, ResponseToFileExtractor] - ] = Field(None, description="Responsible for fetching the records from provided urls.") + ] = Field( + None, description="Responsible for fetching the records from provided urls." + ) creation_requester: Union[CustomRequester, HttpRequester] = Field( ..., description="Requester component that describes how to prepare HTTP requests to send to the source API to create the async server-side job.", @@ -2196,7 +2216,11 @@ class AsyncRetriever(BaseModel): CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter, - List[Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter]], + List[ + Union[ + CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter + ] + ], ] ] = Field( [], @@ -2264,20 +2288,22 @@ class DynamicDeclarativeStream(BaseModel): stream_template: DeclarativeStream = Field( ..., description="Reference to the stream template.", title="Stream Template" ) - components_resolver: Union[HttpComponentsResolver, ConfigComponentsResolver] = Field( - ..., - description="Component resolve and populates stream templates with components values.", - title="Components Resolver", + components_resolver: Union[HttpComponentsResolver, ConfigComponentsResolver] = ( + Field( + ..., + description="Component resolve and populates stream templates with components values.", + title="Components Resolver", + ) ) +SchemaFieldType.update_forward_refs() CompositeErrorHandler.update_forward_refs() DeclarativeSource1.update_forward_refs() DeclarativeSource2.update_forward_refs() SelectiveAuthenticator.update_forward_refs() DeclarativeStream.update_forward_refs() SessionTokenAuthenticator.update_forward_refs() -ItemsTypeMap.update_forward_refs() DynamicSchemaLoader.update_forward_refs() SimpleRetriever.update_forward_refs() AsyncRetriever.update_forward_refs() diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index bab6ee5dc..db7dcc238 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -1911,8 +1911,8 @@ def create_items_type_map( def create_types_map(self, model: TypesMapModel, config: Config, **kwargs: Any) -> TypesMap: items_type = ( self._create_component_from_model(model=model.items_type, config=config) - if model.items_type - else None + if isinstance(model.items_type, ItemsTypeMapModel) + else model.items_type ) return TypesMap( diff --git a/airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py b/airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py index c5e60902b..d65890b70 100644 --- a/airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py +++ b/airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py @@ -5,7 +5,7 @@ from copy import deepcopy from dataclasses import InitVar, dataclass, field -from typing import Any, List, Mapping, MutableMapping, Optional, Tuple, Union +from typing import Any, List, Mapping, MutableMapping, Optional, Union import dpath from typing_extensions import deprecated @@ -18,7 +18,7 @@ from airbyte_cdk.sources.source import ExperimentalClassWarning from airbyte_cdk.sources.types import Config, StreamSlice, StreamState -AIRBYTE_DATA_TYPES: Mapping[str, MutableMapping[str, Any]] = { +AIRBYTE_DATA_TYPES: Mapping[str, Mapping[str, Any]] = { "string": {"type": ["null", "string"]}, "boolean": {"type": ["null", "boolean"]}, "date": {"type": ["null", "string"], "format": "date"}, @@ -45,17 +45,6 @@ } -@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning) -@dataclass(frozen=True) -class ItemsTypeMap: - """ - Represents a mapping between a current type and its corresponding target type for item. - """ - - items_type_pointer: List[Union[InterpolatedString, str]] - type_mapping: "TypesMap" - - @deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning) @dataclass(frozen=True) class TypesMap: @@ -66,15 +55,6 @@ class TypesMap: target_type: Union[List[str], str] current_type: Union[List[str], str] condition: Optional[str] - items_type: Optional[ItemsTypeMap] = None - - def __post_init__(self) -> None: - """ - Enforces that `items_type` is only used when `target_type` is a array - """ - # `items_type` is valid only for array target types - if self.items_type and self.target_type != "array": - raise ValueError("'items_type' can only be used when 'target_type' is an array.") @deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning) @@ -199,10 +179,7 @@ def _get_type( if field_type_path else "string" ) - mapped_field_type, mapped_additional_types = self._replace_type_if_not_valid( - raw_field_type, raw_schema - ) - + mapped_field_type = self._replace_type_if_not_valid(raw_field_type, raw_schema) if ( isinstance(mapped_field_type, list) and len(mapped_field_type) == 2 @@ -212,7 +189,7 @@ def _get_type( second_type = self._get_airbyte_type(mapped_field_type[1]) return {"oneOf": [first_type, second_type]} elif isinstance(mapped_field_type, str): - return self._get_airbyte_type(mapped_field_type, mapped_additional_types) + return self._get_airbyte_type(mapped_field_type) else: raise ValueError( f"Invalid data type. Available string or two items list of string. Got {mapped_field_type}." @@ -222,11 +199,10 @@ def _replace_type_if_not_valid( self, field_type: Union[List[str], str], raw_schema: MutableMapping[str, Any], - ) -> Tuple[Union[List[str], str], List[Union[List[str], str]]]: + ) -> Union[List[str], str]: """ Replaces a field type if it matches a type mapping in `types_map`. """ - additional_types: List[Union[List[str], str]] = [] if self.schema_type_identifier.types_mapping: for types_map in self.schema_type_identifier.types_mapping: # conditional is optional param, setting to true if not provided @@ -236,57 +212,18 @@ def _replace_type_if_not_valid( ).eval(config=self.config, raw_schema=raw_schema) if field_type == types_map.current_type and condition: - if types_map.items_type: - items_type = self._extract_data( - raw_schema, types_map.items_type.items_type_pointer - ) - items_type_condition = InterpolatedBoolean( - condition=types_map.items_type.type_mapping.condition - if types_map.items_type.type_mapping.condition is not None - else "True", - parameters={}, - ).eval(config=self.config, raw_schema=raw_schema) - - if ( - items_type == types_map.items_type.type_mapping.current_type - and items_type_condition - ): - additional_types = [types_map.items_type.type_mapping.target_type] - return types_map.target_type, additional_types - return field_type, additional_types - - def _get_airbyte_type( - self, field_type: str, additional_types: Optional[List[Union[List[str], str]]] = None - ) -> Mapping[str, Any]: + return types_map.target_type + return field_type + + @staticmethod + def _get_airbyte_type(field_type: str) -> Mapping[str, Any]: """ Maps a field type to its corresponding Airbyte type definition. """ - if additional_types is None: - additional_types = [] - if field_type not in AIRBYTE_DATA_TYPES: raise ValueError(f"Invalid Airbyte data type: {field_type}") - airbyte_type = deepcopy(AIRBYTE_DATA_TYPES[field_type]) - - if field_type == "array" and additional_types: - if ( - isinstance(additional_types[0], list) - and len(additional_types[0]) == 2 - and all(isinstance(item, str) for item in additional_types[0]) - ): - first_type = self._get_airbyte_type(additional_types[0][0]) - second_type = self._get_airbyte_type(additional_types[0][1]) - items_type = {"oneOf": [first_type, second_type]} - elif isinstance(additional_types[0], str): - items_type = deepcopy(AIRBYTE_DATA_TYPES[additional_types[0]]) # type: ignore[arg-type] - else: - raise ValueError( - f"Invalid data type. Available string or two items list of string. Got {additional_types[0]}." - ) - - airbyte_type["items"] = items_type - return airbyte_type + return deepcopy(AIRBYTE_DATA_TYPES[field_type]) def _extract_data( self, From 52683a46869057f922d40129a1a737a99eec8208 Mon Sep 17 00:00:00 2001 From: Serhii Lazebnyi Date: Fri, 24 Jan 2025 00:36:06 +0100 Subject: [PATCH 10/15] Update to complex type resolving --- .../declarative_component_schema.yaml | 24 +++------ .../models/declarative_component_schema.py | 14 ++--- .../parsers/model_to_component_factory.py | 33 ++++++------ .../sources/declarative/schema/__init__.py | 4 +- .../schema/dynamic_schema_loader.py | 51 +++++++++++++++++-- .../schema/test_dynamic_schema_loader.py | 17 +++---- 6 files changed, 85 insertions(+), 58 deletions(-) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index d5a6e5f93..70e9e075d 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -1800,9 +1800,9 @@ definitions: $parameters: type: object additionalProperties: true - SchemaFieldType: + ComplexFieldType: title: Schema Field Type - description: (This component is experimental. Use at your own risk.) Represents a mapping between a current type and its corresponding target type for property. + description: (This component is experimental. Use at your own risk.) Represents a complex field type. type: object required: - field_type @@ -1810,20 +1810,9 @@ definitions: field_type: type: string items: - "$ref": "#/definitions/SchemaFieldType" - ItemsTypeMap: - title: Types Map - description: (This component is experimental. Use at your own risk.) Represents a mapping between a current type and its corresponding target type for property. - type: object - required: - - target_type - properties: - condition: - type: string - interpolation_context: - - raw_schema - target_type: - "$ref": "#/definitions/SchemaFieldType" + anyOf: + - type: string + - "$ref": "#/definitions/ComplexFieldType" TypesMap: title: Types Map description: (This component is experimental. Use at your own risk.) Represents a mapping between a current type and its corresponding target type. @@ -1838,6 +1827,7 @@ definitions: - type: array items: type: string + - "$ref": "#/definitions/ComplexFieldType" current_type: anyOf: - type: string @@ -1848,8 +1838,6 @@ definitions: type: string interpolation_context: - raw_schema - items_type: - "$ref": "#/definitions/ItemsTypeMap" SchemaTypeIdentifier: title: Schema Type Identifier description: (This component is experimental. Use at your own risk.) Identifies schema details for dynamic schema extraction and processing. diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 3f1d96e42..51aec5bbd 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -738,21 +738,15 @@ class HttpResponseFilter(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") -class SchemaFieldType(BaseModel): +class ComplexFieldType(BaseModel): field_type: str - items: Optional[SchemaFieldType] = None - - -class ItemsTypeMap(BaseModel): - condition: Optional[str] = None - target_type: SchemaFieldType + items: Optional[Union[str, ComplexFieldType]] = None class TypesMap(BaseModel): - target_type: Union[str, List[str]] + target_type: Union[str, List[str], ComplexFieldType] current_type: Union[str, List[str]] condition: Optional[str] = None - items_type: Optional[ItemsTypeMap] = None class SchemaTypeIdentifier(BaseModel): @@ -2297,7 +2291,7 @@ class DynamicDeclarativeStream(BaseModel): ) -SchemaFieldType.update_forward_refs() +ComplexFieldType.update_forward_refs() CompositeErrorHandler.update_forward_refs() DeclarativeSource1.update_forward_refs() DeclarativeSource2.update_forward_refs() diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index db7dcc238..c28977b88 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -245,7 +245,7 @@ InlineSchemaLoader as InlineSchemaLoaderModel, ) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( - ItemsTypeMap as ItemsTypeMapModel, + ComplexFieldType as ComplexFieldTypeModel, ) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( IterableDecoder as IterableDecoderModel, @@ -435,7 +435,7 @@ DefaultSchemaLoader, DynamicSchemaLoader, InlineSchemaLoader, - ItemsTypeMap, + ComplexFieldType, JsonFileSchemaLoader, SchemaTypeIdentifier, TypesMap, @@ -576,7 +576,7 @@ def _init_mappings(self) -> None: DynamicSchemaLoaderModel: self.create_dynamic_schema_loader, SchemaTypeIdentifierModel: self.create_schema_type_identifier, TypesMapModel: self.create_types_map, - ItemsTypeMapModel: self.create_items_type_map, + ComplexFieldTypeModel: self.create_complex_field_type, JwtAuthenticatorModel: self.create_jwt_authenticator, LegacyToPerPartitionStateMigrationModel: self.create_legacy_to_per_partition_state_migration, ListPartitionRouterModel: self.create_list_partition_router, @@ -1899,27 +1899,28 @@ def create_inline_schema_loader( ) -> InlineSchemaLoader: return InlineSchemaLoader(schema=model.schema_ or {}, parameters={}) - def create_items_type_map( - self, model: ItemsTypeMapModel, config: Config, **kwargs: Any - ) -> ItemsTypeMap: - type_mapping = self._create_component_from_model(model=model.type_mapping, config=config) - model_items_type_pointer: List[Union[InterpolatedString, str]] = ( - [x for x in model.items_type_pointer] if model.items_type_pointer else [] + def create_complex_field_type( + self, model: ComplexFieldTypeModel, config: Config, **kwargs: Any + ) -> ComplexFieldType: + items = ( + self._create_component_from_model(model=model.items, config=config) + if isinstance(model.items, ComplexFieldTypeModel) + else model.items ) - return ItemsTypeMap(items_type_pointer=model_items_type_pointer, type_mapping=type_mapping) + + return ComplexFieldType(field_type=model.field_type, items=items) def create_types_map(self, model: TypesMapModel, config: Config, **kwargs: Any) -> TypesMap: - items_type = ( - self._create_component_from_model(model=model.items_type, config=config) - if isinstance(model.items_type, ItemsTypeMapModel) - else model.items_type + target_type = ( + self._create_component_from_model(model=model.target_type, config=config) + if isinstance(model.target_type, ComplexFieldTypeModel) + else model.target_type ) return TypesMap( - target_type=model.target_type, + target_type=target_type, current_type=model.current_type, condition=model.condition if model.condition is not None else "True", - items_type=items_type, ) def create_schema_type_identifier( diff --git a/airbyte_cdk/sources/declarative/schema/__init__.py b/airbyte_cdk/sources/declarative/schema/__init__.py index b7aed69d4..bb9d05f2f 100644 --- a/airbyte_cdk/sources/declarative/schema/__init__.py +++ b/airbyte_cdk/sources/declarative/schema/__init__.py @@ -5,8 +5,8 @@ from airbyte_cdk.sources.declarative.schema.default_schema_loader import DefaultSchemaLoader from airbyte_cdk.sources.declarative.schema.dynamic_schema_loader import ( DynamicSchemaLoader, - ItemsTypeMap, SchemaTypeIdentifier, + ComplexFieldType, TypesMap, ) from airbyte_cdk.sources.declarative.schema.inline_schema_loader import InlineSchemaLoader @@ -19,7 +19,7 @@ "SchemaLoader", "InlineSchemaLoader", "DynamicSchemaLoader", + "ComplexFieldType", "TypesMap", - "ItemsTypeMap", "SchemaTypeIdentifier", ] diff --git a/airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py b/airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py index d65890b70..e9241a513 100644 --- a/airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py +++ b/airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py @@ -18,7 +18,7 @@ from airbyte_cdk.sources.source import ExperimentalClassWarning from airbyte_cdk.sources.types import Config, StreamSlice, StreamState -AIRBYTE_DATA_TYPES: Mapping[str, Mapping[str, Any]] = { +AIRBYTE_DATA_TYPES: Mapping[str, MutableMapping[str, Any]] = { "string": {"type": ["null", "string"]}, "boolean": {"type": ["null", "boolean"]}, "date": {"type": ["null", "string"], "format": "date"}, @@ -45,6 +45,25 @@ } +@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning) +@dataclass(frozen=True) +class ComplexFieldType: + """ + Identifies complex field type + """ + + field_type: str + items: Optional[Union[str, "ComplexFieldType"]] = None + + def __post_init__(self) -> None: + """ + Enforces that `items` is only used when `field_type` is a array + """ + # `items_type` is valid only for array target types + if self.items and self.field_type != "array": + raise ValueError("'items' can only be used when 'field_type' is an array.") + + @deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning) @dataclass(frozen=True) class TypesMap: @@ -52,7 +71,7 @@ class TypesMap: Represents a mapping between a current type and its corresponding target type. """ - target_type: Union[List[str], str] + target_type: Union[List[str], str, ComplexFieldType] current_type: Union[List[str], str] condition: Optional[str] @@ -188,13 +207,39 @@ def _get_type( first_type = self._get_airbyte_type(mapped_field_type[0]) second_type = self._get_airbyte_type(mapped_field_type[1]) return {"oneOf": [first_type, second_type]} + elif isinstance(mapped_field_type, str): return self._get_airbyte_type(mapped_field_type) + + elif isinstance(mapped_field_type, ComplexFieldType): + return self._resolve_complex_type(mapped_field_type) + else: raise ValueError( f"Invalid data type. Available string or two items list of string. Got {mapped_field_type}." ) + def _resolve_complex_type(self, complex_type: ComplexFieldType) -> Mapping[str, Any]: + types = [complex_type] + resolved_type = {} + + while types: + current_type = types.pop() + if not current_type.items: + resolved_type = self._get_airbyte_type(current_type.field_type) + else: + field_type = self._get_airbyte_type(current_type.field_type) + + if isinstance(current_type.items, str): + items_type = current_type.items + else: + types.append(current_type.items) + continue # Skip the following lines until the stack is resolved + field_type["items"] = self._get_airbyte_type(items_type) + resolved_type = field_type + + return resolved_type + def _replace_type_if_not_valid( self, field_type: Union[List[str], str], @@ -216,7 +261,7 @@ def _replace_type_if_not_valid( return field_type @staticmethod - def _get_airbyte_type(field_type: str) -> Mapping[str, Any]: + def _get_airbyte_type(field_type: str) -> MutableMapping[str, Any]: """ Maps a field type to its corresponding Airbyte type definition. """ diff --git a/unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py b/unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py index 725b0ef5b..4d933543b 100644 --- a/unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py +++ b/unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py @@ -85,17 +85,16 @@ "key_pointer": ["name"], "type_pointer": ["type"], "types_mapping": [ - {"target_type": "string", "current_type": "singleLineText"}, { - "target_type": "array", - "current_type": "formula", - "items_type": { - "items_type_pointer": ["result", "type"], - "type_mapping": { - "target_type": "integer", - "current_type": "customInteger", - }, + "target_type": "string", + "current_type": "singleLineText" + }, + { + "target_type": { + "field_type": "array", + "items": "integer" }, + "current_type": "formula", "condition": "{{ raw_schema['result']['type'] == 'customInteger' }}", }, ], From 6682d9f2a691762858d8eec5979539462260e27d Mon Sep 17 00:00:00 2001 From: octavia-squidington-iii Date: Thu, 23 Jan 2025 23:39:57 +0000 Subject: [PATCH 11/15] Auto-fix lint and format issues --- .../models/declarative_component_schema.py | 90 +++++++------------ .../parsers/model_to_component_factory.py | 8 +- .../sources/declarative/schema/__init__.py | 2 +- .../schema/test_dynamic_schema_loader.py | 10 +-- 4 files changed, 39 insertions(+), 71 deletions(-) diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 51aec5bbd..db8e80f56 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -604,9 +604,7 @@ class OAuthAuthenticator(BaseModel): scopes: Optional[List[str]] = Field( None, description="List of scopes that should be granted to the access token.", - examples=[ - ["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"] - ], + examples=[["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"]], title="Scopes", ) token_expiry_date: Optional[str] = Field( @@ -1042,28 +1040,24 @@ class OAuthConfigSpecification(BaseModel): class Config: extra = Extra.allow - oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = ( - Field( - None, - description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", - examples=[ - {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, - { - "app_id": { - "type": "string", - "path_in_connector_config": ["info", "app_id"], - } - }, - ], - title="OAuth user input", - ) + oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = Field( + None, + description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", + examples=[ + {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, + { + "app_id": { + "type": "string", + "path_in_connector_config": ["info", "app_id"], + } + }, + ], + title="OAuth user input", ) - oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = ( - Field( - None, - description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n + base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n + urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{{ auth_code_key }}": "{{ auth_code_value }}",\n "{{ client_id_key }}": "{{ client_id_value }}",\n "{{ client_secret_key }}": "{{ client_secret_value }}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }', - title="DeclarativeOAuth Connector Specification", - ) + oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = Field( + None, + description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n + base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n + urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{{ auth_code_key }}": "{{ auth_code_value }}",\n "{{ client_id_key }}": "{{ client_id_value }}",\n "{{ client_secret_key }}": "{{ client_secret_value }}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }', + title="DeclarativeOAuth Connector Specification", ) complete_oauth_output_specification: Optional[Dict[str, Any]] = Field( None, @@ -1081,9 +1075,7 @@ class Config: complete_oauth_server_input_specification: Optional[Dict[str, Any]] = Field( None, description="OAuth specific blob. This is a Json Schema used to validate Json configurations persisted as Airbyte Server configurations.\nMust be a valid non-nested JSON describing additional fields configured by the Airbyte Instance or Workspace Admins to be used by the\nserver when completing an OAuth flow (typically exchanging an auth code for refresh token).\nExamples:\n complete_oauth_server_input_specification={\n client_id: {\n type: string\n },\n client_secret: {\n type: string\n }\n }", - examples=[ - {"client_id": {"type": "string"}, "client_secret": {"type": "string"}} - ], + examples=[{"client_id": {"type": "string"}, "client_secret": {"type": "string"}}], title="OAuth input specification", ) complete_oauth_server_output_specification: Optional[Dict[str, Any]] = Field( @@ -1669,9 +1661,7 @@ class RecordSelector(BaseModel): description="Responsible for filtering records to be emitted by the Source.", title="Record Filter", ) - schema_normalization: Optional[ - Union[SchemaNormalization, CustomSchemaNormalization] - ] = Field( + schema_normalization: Optional[Union[SchemaNormalization, CustomSchemaNormalization]] = Field( SchemaNormalization.None_, description="Responsible for normalization according to the schema.", title="Schema Normalization", @@ -1845,16 +1835,12 @@ class Config: description="Component used to coordinate how records are extracted across stream slices and request pages.", title="Retriever", ) - incremental_sync: Optional[Union[CustomIncrementalSync, DatetimeBasedCursor]] = ( - Field( - None, - description="Component used to fetch data incrementally based on a time field in the data.", - title="Incremental Sync", - ) - ) - name: Optional[str] = Field( - "", description="The stream name.", example=["Users"], title="Name" + incremental_sync: Optional[Union[CustomIncrementalSync, DatetimeBasedCursor]] = Field( + None, + description="Component used to fetch data incrementally based on a time field in the data.", + title="Incremental Sync", ) + name: Optional[str] = Field("", description="The stream name.", example=["Users"], title="Name") primary_key: Optional[PrimaryKey] = Field( "", description="The primary key of the stream.", title="Primary Key" ) @@ -2126,11 +2112,7 @@ class SimpleRetriever(BaseModel): CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter, - List[ - Union[ - CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter - ] - ], + List[Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter]], ] ] = Field( [], @@ -2174,9 +2156,7 @@ class AsyncRetriever(BaseModel): ) download_extractor: Optional[ Union[CustomRecordExtractor, DpathExtractor, ResponseToFileExtractor] - ] = Field( - None, description="Responsible for fetching the records from provided urls." - ) + ] = Field(None, description="Responsible for fetching the records from provided urls.") creation_requester: Union[CustomRequester, HttpRequester] = Field( ..., description="Requester component that describes how to prepare HTTP requests to send to the source API to create the async server-side job.", @@ -2210,11 +2190,7 @@ class AsyncRetriever(BaseModel): CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter, - List[ - Union[ - CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter - ] - ], + List[Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter]], ] ] = Field( [], @@ -2282,12 +2258,10 @@ class DynamicDeclarativeStream(BaseModel): stream_template: DeclarativeStream = Field( ..., description="Reference to the stream template.", title="Stream Template" ) - components_resolver: Union[HttpComponentsResolver, ConfigComponentsResolver] = ( - Field( - ..., - description="Component resolve and populates stream templates with components values.", - title="Components Resolver", - ) + components_resolver: Union[HttpComponentsResolver, ConfigComponentsResolver] = Field( + ..., + description="Component resolve and populates stream templates with components values.", + title="Components Resolver", ) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index c28977b88..c16476dae 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -133,6 +133,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( CheckStream as CheckStreamModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + ComplexFieldType as ComplexFieldTypeModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( ComponentMappingDefinition as ComponentMappingDefinitionModel, ) @@ -244,9 +247,6 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( InlineSchemaLoader as InlineSchemaLoaderModel, ) -from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( - ComplexFieldType as ComplexFieldTypeModel, -) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( IterableDecoder as IterableDecoderModel, ) @@ -432,10 +432,10 @@ SimpleRetrieverTestReadDecorator, ) from airbyte_cdk.sources.declarative.schema import ( + ComplexFieldType, DefaultSchemaLoader, DynamicSchemaLoader, InlineSchemaLoader, - ComplexFieldType, JsonFileSchemaLoader, SchemaTypeIdentifier, TypesMap, diff --git a/airbyte_cdk/sources/declarative/schema/__init__.py b/airbyte_cdk/sources/declarative/schema/__init__.py index bb9d05f2f..cad0c2f06 100644 --- a/airbyte_cdk/sources/declarative/schema/__init__.py +++ b/airbyte_cdk/sources/declarative/schema/__init__.py @@ -4,9 +4,9 @@ from airbyte_cdk.sources.declarative.schema.default_schema_loader import DefaultSchemaLoader from airbyte_cdk.sources.declarative.schema.dynamic_schema_loader import ( + ComplexFieldType, DynamicSchemaLoader, SchemaTypeIdentifier, - ComplexFieldType, TypesMap, ) from airbyte_cdk.sources.declarative.schema.inline_schema_loader import InlineSchemaLoader diff --git a/unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py b/unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py index 4d933543b..178d5a1f2 100644 --- a/unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py +++ b/unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py @@ -85,15 +85,9 @@ "key_pointer": ["name"], "type_pointer": ["type"], "types_mapping": [ + {"target_type": "string", "current_type": "singleLineText"}, { - "target_type": "string", - "current_type": "singleLineText" - }, - { - "target_type": { - "field_type": "array", - "items": "integer" - }, + "target_type": {"field_type": "array", "items": "integer"}, "current_type": "formula", "condition": "{{ raw_schema['result']['type'] == 'customInteger' }}", }, From 73bda5532f2ce9452638952fe99693fc0c8563cb Mon Sep 17 00:00:00 2001 From: Serhii Lazebnyi Date: Fri, 24 Jan 2025 00:42:07 +0100 Subject: [PATCH 12/15] Fix mypy --- .../sources/declarative/schema/dynamic_schema_loader.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py b/airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py index e9241a513..d0fb81c8b 100644 --- a/airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py +++ b/airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py @@ -221,7 +221,7 @@ def _get_type( def _resolve_complex_type(self, complex_type: ComplexFieldType) -> Mapping[str, Any]: types = [complex_type] - resolved_type = {} + resolved_type: MutableMapping[str, Any] = {} while types: current_type = types.pop() @@ -244,7 +244,7 @@ def _replace_type_if_not_valid( self, field_type: Union[List[str], str], raw_schema: MutableMapping[str, Any], - ) -> Union[List[str], str]: + ) -> Union[List[str], str, ComplexFieldType]: """ Replaces a field type if it matches a type mapping in `types_map`. """ From 0ee84d50d0bcb8b6351b12b8335f4c8b459f605d Mon Sep 17 00:00:00 2001 From: Serhii Lazebnyi Date: Fri, 24 Jan 2025 06:20:19 +0100 Subject: [PATCH 13/15] Fix complex type resolving --- .../schema/dynamic_schema_loader.py | 30 +++++++------------ .../schema/test_dynamic_schema_loader.py | 7 +++-- 2 files changed, 14 insertions(+), 23 deletions(-) diff --git a/airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py b/airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py index d0fb81c8b..81fb780ea 100644 --- a/airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py +++ b/airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py @@ -154,8 +154,9 @@ def get_json_schema(self) -> Mapping[str, Any]: transformed_properties = self._transform(properties, {}) return { - "$schema": "http://json-schema.org/draft-07/schema#", + "$schema": "https://json-schema.org/draft-07/schema#", "type": "object", + "additionalProperties": True, "properties": transformed_properties, } @@ -220,25 +221,14 @@ def _get_type( ) def _resolve_complex_type(self, complex_type: ComplexFieldType) -> Mapping[str, Any]: - types = [complex_type] - resolved_type: MutableMapping[str, Any] = {} - - while types: - current_type = types.pop() - if not current_type.items: - resolved_type = self._get_airbyte_type(current_type.field_type) - else: - field_type = self._get_airbyte_type(current_type.field_type) - - if isinstance(current_type.items, str): - items_type = current_type.items - else: - types.append(current_type.items) - continue # Skip the following lines until the stack is resolved - field_type["items"] = self._get_airbyte_type(items_type) - resolved_type = field_type - - return resolved_type + if not complex_type.items: + return self._get_airbyte_type(complex_type.field_type) + + field_type = self._get_airbyte_type(complex_type.field_type) + field_type["items"] = self._get_airbyte_type(complex_type.items) if isinstance(complex_type.items, str) else self._resolve_complex_type( + complex_type.items) + + return field_type def _replace_type_if_not_valid( self, diff --git a/unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py b/unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py index 178d5a1f2..1c997feb2 100644 --- a/unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py +++ b/unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py @@ -87,7 +87,7 @@ "types_mapping": [ {"target_type": "string", "current_type": "singleLineText"}, { - "target_type": {"field_type": "array", "items": "integer"}, + "target_type": {"field_type": "array", "items": {"field_type": "array", "items": "integer"}}, "current_type": "formula", "condition": "{{ raw_schema['result']['type'] == 'customInteger' }}", }, @@ -321,7 +321,8 @@ def test_dynamic_schema_loader_with_type_conditions(): ]["types_mapping"].append({"target_type": "array", "current_type": "formula"}) expected_schema = { - "$schema": "http://json-schema.org/draft-07/schema#", + "$schema": "https://json-schema.org/draft-07/schema#", + "additionalProperties": True, "type": "object", "properties": { "id": {"type": ["null", "integer"]}, @@ -331,7 +332,7 @@ def test_dynamic_schema_loader_with_type_conditions(): "currency": {"type": ["null", "number"]}, "salary": {"type": ["null", "number"]}, "working_days": {"type": ["null", "array"]}, - "avg_salary": {"type": ["null", "array"], "items": {"type": ["null", "integer"]}}, + "avg_salary": {"type": ["null", "array"], "items": {"type": ["null", "array"], "items": {"type": ["null", "integer"]}}}, }, } source = ConcurrentDeclarativeSource( From f59cd42a7f8efe1ba43f23b41cde488a3aaf8e86 Mon Sep 17 00:00:00 2001 From: Serhii Lazebnyi Date: Fri, 24 Jan 2025 06:28:09 +0100 Subject: [PATCH 14/15] Fix typo --- .../schema/test_dynamic_schema_loader.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py b/unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py index 1c997feb2..7badfe64c 100644 --- a/unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py +++ b/unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py @@ -157,7 +157,8 @@ def dynamic_schema_loader(mock_retriever, mock_schema_type_identifier): ] ), { - "$schema": "http://json-schema.org/draft-07/schema#", + "$schema": "https://json-schema.org/draft-07/schema#", + "additionalProperties": True, "type": "object", "properties": { "name": {"type": ["null", "string"]}, @@ -178,7 +179,8 @@ def dynamic_schema_loader(mock_retriever, mock_schema_type_identifier): ] ), { - "$schema": "http://json-schema.org/draft-07/schema#", + "$schema": "https://json-schema.org/draft-07/schema#", + "additionalProperties": True, "type": "object", "properties": { "name": {"type": ["null", "string"]}, @@ -198,7 +200,8 @@ def dynamic_schema_loader(mock_retriever, mock_schema_type_identifier): ] ), { - "$schema": "http://json-schema.org/draft-07/schema#", + "$schema": "https://json-schema.org/draft-07/schema#", + "additionalProperties": True, "type": "object", "properties": { "address": { @@ -211,7 +214,8 @@ def dynamic_schema_loader(mock_retriever, mock_schema_type_identifier): # Test case: Empty record set iter([]), { - "$schema": "http://json-schema.org/draft-07/schema#", + "$schema": "https://json-schema.org/draft-07/schema#", + "additionalProperties": True, "type": "object", "properties": {}, }, @@ -249,7 +253,8 @@ def test_dynamic_schema_loader_invalid_type(dynamic_schema_loader): def test_dynamic_schema_loader_manifest_flow(): expected_schema = { - "$schema": "http://json-schema.org/draft-07/schema#", + "$schema": "https://json-schema.org/draft-07/schema#", + "additionalProperties": True, "type": "object", "properties": { "id": {"type": ["null", "integer"]}, From 30d92c7e00b19b99ee813e487f5fe40048f69b17 Mon Sep 17 00:00:00 2001 From: octavia-squidington-iii Date: Fri, 24 Jan 2025 05:29:50 +0000 Subject: [PATCH 15/15] Auto-fix lint and format issues --- .../declarative/schema/dynamic_schema_loader.py | 7 +++++-- .../declarative/schema/test_dynamic_schema_loader.py | 10 ++++++++-- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py b/airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py index 81fb780ea..824d0f8f7 100644 --- a/airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py +++ b/airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py @@ -225,8 +225,11 @@ def _resolve_complex_type(self, complex_type: ComplexFieldType) -> Mapping[str, return self._get_airbyte_type(complex_type.field_type) field_type = self._get_airbyte_type(complex_type.field_type) - field_type["items"] = self._get_airbyte_type(complex_type.items) if isinstance(complex_type.items, str) else self._resolve_complex_type( - complex_type.items) + field_type["items"] = ( + self._get_airbyte_type(complex_type.items) + if isinstance(complex_type.items, str) + else self._resolve_complex_type(complex_type.items) + ) return field_type diff --git a/unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py b/unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py index 7badfe64c..4d9af8667 100644 --- a/unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py +++ b/unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py @@ -87,7 +87,10 @@ "types_mapping": [ {"target_type": "string", "current_type": "singleLineText"}, { - "target_type": {"field_type": "array", "items": {"field_type": "array", "items": "integer"}}, + "target_type": { + "field_type": "array", + "items": {"field_type": "array", "items": "integer"}, + }, "current_type": "formula", "condition": "{{ raw_schema['result']['type'] == 'customInteger' }}", }, @@ -337,7 +340,10 @@ def test_dynamic_schema_loader_with_type_conditions(): "currency": {"type": ["null", "number"]}, "salary": {"type": ["null", "number"]}, "working_days": {"type": ["null", "array"]}, - "avg_salary": {"type": ["null", "array"], "items": {"type": ["null", "array"], "items": {"type": ["null", "integer"]}}}, + "avg_salary": { + "type": ["null", "array"], + "items": {"type": ["null", "array"], "items": {"type": ["null", "integer"]}}, + }, }, } source = ConcurrentDeclarativeSource(