From e43343cc633b02a0f7497038d51f0ebf8368c8c7 Mon Sep 17 00:00:00 2001 From: Serhii Lazebnyi Date: Fri, 6 Dec 2024 19:32:36 +0100 Subject: [PATCH 01/12] Add config components resolver --- .../declarative_component_schema.yaml | 58 ++++++- .../manifest_declarative_source.py | 10 +- .../models/declarative_component_schema.py | 130 ++++++++++------ .../parsers/manifest_component_transformer.py | 5 +- .../parsers/model_to_component_factory.py | 50 +++++- .../sources/declarative/resolvers/__init__.py | 7 +- .../resolvers/config_components_resolver.py | 136 +++++++++++++++++ .../test_config_components_resolver.py | 142 ++++++++++++++++++ .../test_http_components_resolver.py | 22 ++- 9 files changed, 503 insertions(+), 57 deletions(-) create mode 100644 airbyte_cdk/sources/declarative/resolvers/config_components_resolver.py create mode 100644 unit_tests/sources/declarative/resolvers/test_config_components_resolver.py diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index dff7abab6..200372a6b 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -2920,7 +2920,9 @@ definitions: description: A list of potentially nested fields indicating the full path where value will be added or updated. type: array items: - - type: string + anyOf: + - type: string + - type: integer interpolation_context: - config - components_values @@ -2928,8 +2930,10 @@ definitions: examples: - ["data"] - ["data", "records"] - - ["data", "{{ parameters.name }}"] + - ["data", 1, "name"] + - ["data", "{{ components_values.name }}"] - ["data", "*", "record"] + - ["*", "**", "name"] value: title: Value description: The dynamic or static value to assign to the key. Interpolated values can be used to dynamically determine the value during runtime. 
@@ -2974,6 +2978,52 @@ definitions: - type - retriever - components_mapping + StreamConfig: + title: Stream Config + description: (This component is experimental. Use at your own risk.) Describes how to get streams config from the source config. + type: object + required: + - type + - configs_pointer + properties: + type: + type: string + enum: [StreamConfig] + configs_pointer: + title: Configs Pointer + description: A list of potentially nested fields indicating the full path in source config file where streams configs located. + type: array + items: + - type: string + interpolation_context: + - parameters + examples: + - ["data"] + - ["data", "streams"] + - ["data", "{{ parameters.name }}"] + $parameters: + type: object + additionalProperties: true + ConfigComponentsResolver: + type: object + description: (This component is experimental. Use at your own risk.) Component resolve and populates stream templates with components fetched from the source config. + properties: + type: + type: string + enum: [ConfigComponentsResolver] + stream_config: + "$ref": "#/definitions/StreamConfig" + components_mapping: + type: array + items: + "$ref": "#/definitions/ComponentMappingDefinition" + $parameters: + type: object + additionalProperties: true + required: + - type + - stream_config + - components_mapping DynamicDeclarativeStream: type: object description: (This component is experimental. Use at your own risk.) A component that described how will be created declarative streams based on stream template. @@ -2988,7 +3038,9 @@ definitions: components_resolver: title: Components Resolver description: Component resolve and populates stream templates with components values. 
- "$ref": "#/definitions/HttpComponentsResolver" + anyOf: + - "$ref": "#/definitions/HttpComponentsResolver" + - "$ref": "#/definitions/ConfigComponentsResolver" required: - type - stream_template diff --git a/airbyte_cdk/sources/declarative/manifest_declarative_source.py b/airbyte_cdk/sources/declarative/manifest_declarative_source.py index 652da85c4..2dc8ca808 100644 --- a/airbyte_cdk/sources/declarative/manifest_declarative_source.py +++ b/airbyte_cdk/sources/declarative/manifest_declarative_source.py @@ -7,7 +7,7 @@ import pkgutil from copy import deepcopy from importlib import metadata -from typing import Any, Dict, Iterator, List, Mapping, Optional +from typing import Any, Dict, Iterator, List, Mapping, Optional, Set import yaml from jsonschema.exceptions import ValidationError @@ -313,6 +313,7 @@ def _dynamic_stream_configs( ) -> List[Dict[str, Any]]: dynamic_stream_definitions: List[Dict[str, Any]] = manifest.get("dynamic_streams", []) dynamic_stream_configs: List[Dict[str, Any]] = [] + seen_dynamic_streams: Set[str] = set() for dynamic_definition in dynamic_stream_definitions: components_resolver_config = dynamic_definition["components_resolver"] @@ -350,7 +351,12 @@ def _dynamic_stream_configs( if "type" not in dynamic_stream: dynamic_stream["type"] = "DeclarativeStream" - dynamic_stream_configs.append(dynamic_stream) + # Ensure that each stream is created with a unique name + name = dynamic_stream.get("name") + + if name not in seen_dynamic_streams: + seen_dynamic_streams.add(name) + dynamic_stream_configs.append(dynamic_stream) return dynamic_stream_configs diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 173e045a0..36a890f9b 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -528,7 +528,9 @@ class OAuthAuthenticator(BaseModel): 
scopes: Optional[List[str]] = Field( None, description="List of scopes that should be granted to the access token.", - examples=[["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"]], + examples=[ + ["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"] + ], title="Scopes", ) token_expiry_date: Optional[str] = Field( @@ -822,13 +824,13 @@ class Config: ) extract_output: List[str] = Field( ..., - description="The DeclarativeOAuth Specific list of strings to indicate which keys should be extracted and returned back to the input config. ", + description="The DeclarativeOAuth Specific list of strings to indicate which keys should be extracted and returned back to the input config.", examples=[{"extract_output": ["access_token", "refresh_token", "other_field"]}], title="DeclarativeOAuth Extract Output", ) state: Optional[State] = Field( None, - description="The DeclarativeOAuth Specific object to provide the criteria of how the `state` query param should be constructed,\nincluding length and complexity. ", + description="The DeclarativeOAuth Specific object to provide the criteria of how the `state` query param should be constructed,\nincluding length and complexity.", examples=[{"state": {"min": 7, "max": 128}}], title="(Optional) DeclarativeOAuth Configurable State Query Param", ) @@ -852,13 +854,13 @@ class Config: ) state_key: Optional[str] = Field( None, - description="The DeclarativeOAuth Specific optional override to provide the custom `state` key name, if required by data-provider. 
", + description="The DeclarativeOAuth Specific optional override to provide the custom `state` key name, if required by data-provider.", examples=[{"state_key": "my_custom_state_key_key_name"}], title="(Optional) DeclarativeOAuth State Key Override", ) auth_code_key: Optional[str] = Field( None, - description="The DeclarativeOAuth Specific optional override to provide the custom `code` key name to something like `auth_code` or `custom_auth_code`, if required by data-provider. ", + description="The DeclarativeOAuth Specific optional override to provide the custom `code` key name to something like `auth_code` or `custom_auth_code`, if required by data-provider.", examples=[{"auth_code_key": "my_custom_auth_code_key_name"}], title="(Optional) DeclarativeOAuth Auth Code Key Override", ) @@ -874,24 +876,28 @@ class OAuthConfigSpecification(BaseModel): class Config: extra = Extra.allow - oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = Field( - None, - description="OAuth specific blob. 
This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", - examples=[ - {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, - { - "app_id": { - "type": "string", - "path_in_connector_config": ["info", "app_id"], - } - }, - ], - title="OAuth user input", + oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = ( + Field( + None, + description="OAuth specific blob. 
This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", + examples=[ + {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, + { + "app_id": { + "type": "string", + "path_in_connector_config": ["info", "app_id"], + } + }, + ], + title="OAuth user input", + ) ) - oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = Field( - None, - description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{my_var}`.\n- The nested resolution variables like `{{my_nested_var}}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {base64Encoder:{my_var_a}:{my_var_b}}\n + base64Decorer - decode from `base64` encoded string, {base64Decoder:{my_string_variable_or_string_value}}\n + urlEncoder - encode the input string to URL-like format, {urlEncoder:https://test.host.com/endpoint}\n + 
urlDecorer - decode the input url-encoded string into text format, {urlDecoder:https%3A%2F%2Fairbyte.io}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {codeChallengeS256:{state_value}}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{client_id_key}={{client_id_key}}&{redirect_uri_key}={urlEncoder:{{redirect_uri_key}}}&{state_key}={{state_key}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{auth_code_key}": "{{auth_code_key}}",\n "{client_id_key}": "{{client_id_key}}",\n "{client_secret_key}": "{{client_secret_key}}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }', - title="DeclarativeOAuth Connector Specification", + oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = ( + Field( + None, + description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{my_var}`.\n- The nested resolution variables like `{{my_nested_var}}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {base64Encoder:{my_var_a}:{my_var_b}}\n + base64Decorer - decode from `base64` encoded string, {base64Decoder:{my_string_variable_or_string_value}}\n + urlEncoder - encode the input string to URL-like format, {urlEncoder:https://test.host.com/endpoint}\n + urlDecorer - decode the input url-encoded string into text format, 
{urlDecoder:https%3A%2F%2Fairbyte.io}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {codeChallengeS256:{state_value}}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{client_id_key}={{client_id_key}}&{redirect_uri_key}={urlEncoder:{{redirect_uri_key}}}&{state_key}={{state_key}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{auth_code_key}": "{{auth_code_key}}",\n "{client_id_key}": "{{client_id_key}}",\n "{client_secret_key}": "{{client_secret_key}}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }', + title="DeclarativeOAuth Connector Specification", + ) ) complete_oauth_output_specification: Optional[Dict[str, Any]] = Field( None, @@ -909,7 +915,9 @@ class Config: complete_oauth_server_input_specification: Optional[Dict[str, Any]] = Field( None, description="OAuth specific blob. 
This is a Json Schema used to validate Json configurations persisted as Airbyte Server configurations.\nMust be a valid non-nested JSON describing additional fields configured by the Airbyte Instance or Workspace Admins to be used by the\nserver when completing an OAuth flow (typically exchanging an auth code for refresh token).\nExamples:\n complete_oauth_server_input_specification={\n client_id: {\n type: string\n },\n client_secret: {\n type: string\n }\n }", - examples=[{"client_id": {"type": "string"}, "client_secret": {"type": "string"}}], + examples=[ + {"client_id": {"type": "string"}, "client_secret": {"type": "string"}} + ], title="OAuth input specification", ) complete_oauth_server_output_specification: Optional[Dict[str, Any]] = Field( @@ -1160,14 +1168,16 @@ class WaitUntilTimeFromHeader(BaseModel): class ComponentMappingDefinition(BaseModel): type: Literal["ComponentMappingDefinition"] - field_path: List[str] = Field( + field_path: List[Union[str, int]] = Field( ..., description="A list of potentially nested fields indicating the full path where value will be added or updated.", examples=[ ["data"], ["data", "records"], - ["data", "{{ parameters.name }}"], + ["data", 1, "name"], + ["data", "{{ components_values.name }}"], ["data", "*", "record"], + ["*", "**", "name"], ], title="Field Path", ) @@ -1189,6 +1199,24 @@ class ComponentMappingDefinition(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") +class StreamConfig(BaseModel): + type: Literal["StreamConfig"] + configs_pointer: List[str] = Field( + ..., + description="A list of potentially nested fields indicating the full path in source config file where streams configs located.", + examples=[["data"], ["data", "streams"], ["data", "{{ parameters.name }}"]], + title="Configs Pointer", + ) + parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + + +class ConfigComponentsResolver(BaseModel): + type: Literal["ConfigComponentsResolver"] + 
stream_config: StreamConfig + components_mapping: List[ComponentMappingDefinition] + parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + + class AddedFieldDefinition(BaseModel): type: Literal["AddedFieldDefinition"] path: List[str] = Field( @@ -1600,21 +1628,25 @@ class Config: description="Component used to coordinate how records are extracted across stream slices and request pages.", title="Retriever", ) - incremental_sync: Optional[Union[CustomIncrementalSync, DatetimeBasedCursor]] = Field( - None, - description="Component used to fetch data incrementally based on a time field in the data.", - title="Incremental Sync", + incremental_sync: Optional[Union[CustomIncrementalSync, DatetimeBasedCursor]] = ( + Field( + None, + description="Component used to fetch data incrementally based on a time field in the data.", + title="Incremental Sync", + ) + ) + name: Optional[str] = Field( + "", description="The stream name.", example=["Users"], title="Name" ) - name: Optional[str] = Field("", description="The stream name.", example=["Users"], title="Name") primary_key: Optional[PrimaryKey] = Field( "", description="The primary key of the stream.", title="Primary Key" ) - schema_loader: Optional[Union[InlineSchemaLoader, JsonFileSchemaLoader, CustomSchemaLoader]] = ( - Field( - None, - description="Component used to retrieve the schema for the current stream.", - title="Schema Loader", - ) + schema_loader: Optional[ + Union[InlineSchemaLoader, JsonFileSchemaLoader, CustomSchemaLoader] + ] = Field( + None, + description="Component used to retrieve the schema for the current stream.", + title="Schema Loader", ) transformations: Optional[ List[Union[AddFields, CustomTransformation, RemoveFields, KeysToLower]] @@ -1832,7 +1864,11 @@ class SimpleRetriever(BaseModel): CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter, - List[Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter]], + List[ + Union[ + 
CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter + ] + ], ] ] = Field( [], @@ -1874,7 +1910,9 @@ class AsyncRetriever(BaseModel): ) download_extractor: Optional[ Union[CustomRecordExtractor, DpathExtractor, ResponseToFileExtractor] - ] = Field(None, description="Responsible for fetching the records from provided urls.") + ] = Field( + None, description="Responsible for fetching the records from provided urls." + ) creation_requester: Union[CustomRequester, HttpRequester] = Field( ..., description="Requester component that describes how to prepare HTTP requests to send to the source API to create the async server-side job.", @@ -1904,7 +1942,11 @@ class AsyncRetriever(BaseModel): CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter, - List[Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter]], + List[ + Union[ + CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter + ] + ], ] ] = Field( [], @@ -1968,10 +2010,12 @@ class DynamicDeclarativeStream(BaseModel): stream_template: DeclarativeStream = Field( ..., description="Reference to the stream template.", title="Stream Template" ) - components_resolver: HttpComponentsResolver = Field( - ..., - description="Component resolve and populates stream templates with components values.", - title="Components Resolver", + components_resolver: Union[HttpComponentsResolver, ConfigComponentsResolver] = ( + Field( + ..., + description="Component resolve and populates stream templates with components values.", + title="Components Resolver", + ) ) diff --git a/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py b/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py index ed05b8e52..c0e14fd84 100644 --- a/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py +++ b/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py @@ -33,10 +33,13 @@ "DeclarativeStream.schema_loader": 
"JsonFileSchemaLoader", # DynamicDeclarativeStream "DynamicDeclarativeStream.stream_template": "DeclarativeStream", - "DynamicDeclarativeStream.components_resolver": "HttpComponentsResolver", + "DynamicDeclarativeStream.components_resolver": "ConfigComponentResolver", # HttpComponentsResolver "HttpComponentsResolver.retriever": "SimpleRetriever", "HttpComponentsResolver.components_mapping": "ComponentMappingDefinition", + # ConfigComponentResolver + "ConfigComponentsResolver.stream_config": "StreamConfig", + "ConfigComponentsResolver.components_mapping": "ComponentMappingDefinition", # DefaultErrorHandler "DefaultErrorHandler.response_filters": "HttpResponseFilter", # DefaultPaginator diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 9a405f8f2..276e8ecc7 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -197,6 +197,12 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( HttpComponentsResolver as HttpComponentsResolverModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + ConfigComponentsResolver as ConfigComponentsResolverModel, +) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + StreamConfig as StreamConfigModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( HttpRequester as HttpRequesterModel, ) @@ -348,6 +354,8 @@ from airbyte_cdk.sources.declarative.resolvers import ( ComponentMappingDefinition, HttpComponentsResolver, + ConfigComponentsResolver, + StreamConfig, ) from airbyte_cdk.sources.declarative.retrievers import ( AsyncRetriever, @@ -479,6 +487,8 @@ def _init_mappings(self) -> None: WaitUntilTimeFromHeaderModel: self.create_wait_until_time_from_header, AsyncRetrieverModel: 
self.create_async_retriever,
             HttpComponentsResolverModel: self.create_http_components_resolver,
+            ConfigComponentsResolverModel: self.create_config_components_resolver,
+            StreamConfigModel: self.create_stream_config,
             ComponentMappingDefinitionModel: self.create_components_mapping_definition,
         }

@@ -1812,8 +1822,8 @@ def create_record_selector(
         self,
         model: RecordSelectorModel,
         config: Config,
-        name: str,
         *,
+        name: str,
         transformations: List[RecordTransformation],
         decoder: Optional[Decoder] = None,
         client_side_incremental_sync: Optional[Dict[str, Any]] = None,
@@ -2292,3 +2302,52 @@ def create_http_components_resolver(
             components_mapping=components_mapping,
             parameters=model.parameters or {},
         )
+
+    @staticmethod
+    def create_stream_config(
+        model: StreamConfigModel, config: Config, **kwargs: Any
+    ) -> StreamConfig:
+        """
+        Creates the StreamConfig component from its parsed manifest model.
+
+        `config` and any extra keyword arguments are accepted but not used here.
+        """
+        model_configs_pointer: List[Union[InterpolatedString, str]] = (
+            list(model.configs_pointer) if model.configs_pointer else []
+        )
+
+        return StreamConfig(
+            configs_pointer=model_configs_pointer,
+            parameters=model.parameters or {},
+        )
+
+    def create_config_components_resolver(
+        self, model: ConfigComponentsResolverModel, config: Config
+    ) -> ConfigComponentsResolver:
+        """
+        Creates a ConfigComponentsResolver from its parsed manifest model.
+
+        The nested stream config and each components mapping definition are built
+        through `_create_component_from_model`.
+        """
+        stream_config = self._create_component_from_model(
+            model.stream_config, config=config, parameters=model.parameters or {}
+        )
+
+        components_mapping = [
+            self._create_component_from_model(
+                model=components_mapping_definition_model,
+                value_type=ModelToComponentFactory._json_schema_type_name_to_type(
+                    components_mapping_definition_model.value_type
+                ),
+                config=config,
+            )
+            for components_mapping_definition_model in model.components_mapping
+        ]
+
+        return ConfigComponentsResolver(
+            stream_config=stream_config,
+            config=config,
+            components_mapping=components_mapping,
+            parameters=model.parameters or {},
+        )
diff --git a/airbyte_cdk/sources/declarative/resolvers/__init__.py b/airbyte_cdk/sources/declarative/resolvers/__init__.py
index 17a3b5d52..9b3e5a422 100644
---
a/airbyte_cdk/sources/declarative/resolvers/__init__.py
+++ b/airbyte_cdk/sources/declarative/resolvers/__init__.py
@@ -4,10 +4,13 @@

 from airbyte_cdk.sources.declarative.resolvers.components_resolver import ComponentsResolver, ComponentMappingDefinition, ResolvedComponentMappingDefinition
 from airbyte_cdk.sources.declarative.resolvers.http_components_resolver import HttpComponentsResolver
+from airbyte_cdk.sources.declarative.resolvers.config_components_resolver import ConfigComponentsResolver, StreamConfig
 from airbyte_cdk.sources.declarative.models import HttpComponentsResolver as HttpComponentsResolverModel
+from airbyte_cdk.sources.declarative.models import ConfigComponentsResolver as ConfigComponentsResolverModel

 COMPONENTS_RESOLVER_TYPE_MAPPING = {
-    "HttpComponentsResolver": HttpComponentsResolverModel
+    "HttpComponentsResolver": HttpComponentsResolverModel,
+    "ConfigComponentsResolver": ConfigComponentsResolverModel
 }

-__all__ = ["ComponentsResolver", "HttpComponentsResolver", "ComponentMappingDefinition", "ResolvedComponentMappingDefinition", "COMPONENTS_RESOLVER_TYPE_MAPPING"]
+__all__ = ["ComponentsResolver", "HttpComponentsResolver", "ComponentMappingDefinition", "ResolvedComponentMappingDefinition", "StreamConfig", "ConfigComponentsResolver", "COMPONENTS_RESOLVER_TYPE_MAPPING"]
diff --git a/airbyte_cdk/sources/declarative/resolvers/config_components_resolver.py b/airbyte_cdk/sources/declarative/resolvers/config_components_resolver.py
new file mode 100644
index 000000000..f19afb25f
--- /dev/null
+++ b/airbyte_cdk/sources/declarative/resolvers/config_components_resolver.py
@@ -0,0 +1,150 @@
+#
+# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+#
+
+from copy import deepcopy
+from dataclasses import InitVar, dataclass, field
+from typing import Any, Dict, Iterable, List, Mapping, Union
+
+import dpath
+from typing_extensions import deprecated
+
+from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
+from airbyte_cdk.sources.declarative.resolvers.components_resolver import (
+    ComponentMappingDefinition,
+    ComponentsResolver,
+    ResolvedComponentMappingDefinition,
+)
+from airbyte_cdk.sources.source import ExperimentalClassWarning
+from airbyte_cdk.sources.types import Config
+
+
+@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
+@dataclass
+class StreamConfig:
+    """
+    Identifies stream config details for dynamic schema extraction and processing.
+    """
+
+    configs_pointer: List[Union[InterpolatedString, str]]
+    parameters: InitVar[Mapping[str, Any]]
+
+    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
+        self.configs_pointer = [
+            InterpolatedString.create(path, parameters=parameters) for path in self.configs_pointer
+        ]
+
+
+@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
+@dataclass
+class ConfigComponentsResolver(ComponentsResolver):
+    """
+    Resolves and populates stream templates with components fetched via source config.
+
+    Attributes:
+        stream_config (StreamConfig): The description of stream configuration used to fetch stream config from source config.
+        config (Config): Configuration object for the resolver.
+        components_mapping (List[ComponentMappingDefinition]): List of mappings to resolve.
+        parameters (InitVar[Mapping[str, Any]]): Additional parameters for interpolation.
+    """
+
+    stream_config: StreamConfig
+    config: Config
+    components_mapping: List[ComponentMappingDefinition]
+    parameters: InitVar[Mapping[str, Any]]
+    _resolved_components: List[ResolvedComponentMappingDefinition] = field(
+        init=False, repr=False, default_factory=list
+    )
+
+    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
+        """
+        Initializes and parses component mappings, converting them to resolved definitions.
+
+        Args:
+            parameters (Mapping[str, Any]): Parameters for interpolation.
+        """
+
+        for component_mapping in self.components_mapping:
+            if isinstance(component_mapping.value, (str, InterpolatedString)):
+                interpolated_value = (
+                    InterpolatedString.create(component_mapping.value, parameters=parameters)
+                    if isinstance(component_mapping.value, str)
+                    else component_mapping.value
+                )
+
+                field_path = [
+                    InterpolatedString.create(path, parameters=parameters)
+                    for path in component_mapping.field_path
+                ]
+
+                self._resolved_components.append(
+                    ResolvedComponentMappingDefinition(
+                        field_path=field_path,
+                        value=interpolated_value,
+                        value_type=component_mapping.value_type,
+                        parameters=parameters,
+                    )
+                )
+            else:
+                raise ValueError(
+                    f"Expected a string or InterpolatedString for value in mapping: {component_mapping}"
+                )
+
+    @property
+    def _stream_config(self) -> List[Any]:
+        """
+        Reads the stream configs from the source config at the path given by `configs_pointer`.
+
+        Returns:
+            List[Any]: The value found at that path, wrapped in a list when it is not one
+                already; an empty list when nothing is found at that path.
+        """
+        path = [
+            node.eval(self.config) if not isinstance(node, str) else node
+            for node in self.stream_config.configs_pointer
+        ]
+        stream_config = dpath.get(self.config, path, default=[])
+
+        if not isinstance(stream_config, list):
+            stream_config = [stream_config]
+
+        return stream_config
+
+    def resolve_components(
+        self, stream_template_config: Dict[str, Any]
+    ) -> Iterable[Dict[str, Any]]:
+        """
+        Resolves components in the stream template configuration by populating values.
+
+        Args:
+            stream_template_config (Dict[str, Any]): Stream template to populate.
+
+        Yields:
+            Dict[str, Any]: Updated configurations with resolved components.
+        """
+        kwargs: Dict[str, Any] = {"stream_template_config": stream_template_config}
+
+        for components_values in self._stream_config:
+            updated_config = deepcopy(stream_template_config)
+            kwargs["components_values"] = components_values
+
+            for resolved_component in self._resolved_components:
+                valid_types = (
+                    (resolved_component.value_type,) if resolved_component.value_type else None
+                )
+                value = resolved_component.value.eval(
+                    self.config, valid_types=valid_types, **kwargs
+                )
+
+                # field_path may hold non-string segments (e.g. integer indexes) that have no
+                # `eval`; only InterpolatedString segments are evaluated, the rest pass through.
+                path = [
+                    segment.eval(self.config, **kwargs)
+                    if isinstance(segment, InterpolatedString)
+                    else segment
+                    for segment in resolved_component.field_path
+                ]
+
+                dpath.set(updated_config, path, value)
+
+            yield updated_config
diff --git a/unit_tests/sources/declarative/resolvers/test_config_components_resolver.py b/unit_tests/sources/declarative/resolvers/test_config_components_resolver.py
new file mode 100644
index 000000000..8bf0026e6
--- /dev/null
+++ b/unit_tests/sources/declarative/resolvers/test_config_components_resolver.py
@@ -0,0 +1,142 @@
+#
+# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+# + +import json +from unittest.mock import MagicMock + +import pytest + +from airbyte_cdk.models import Type +from airbyte_cdk.sources.declarative.concurrent_declarative_source import ( + ConcurrentDeclarativeSource, +) +from airbyte_cdk.sources.declarative.interpolation import InterpolatedString +from airbyte_cdk.sources.declarative.resolvers import ( + ComponentMappingDefinition, + HttpComponentsResolver, +) +from airbyte_cdk.sources.embedded.catalog import ( + to_configured_catalog, + to_configured_stream, +) +from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse + +_CONFIG = { + "start_date": "2024-07-01T00:00:00.000Z", + "custom_streams": [ + {"id": 1, "name": "item_1"}, + {"id": 2, "name": "item_2"}, + {"id": 3, "name": "item_2"}, + ], +} + +_MANIFEST = { + "version": "6.7.0", + "type": "DeclarativeSource", + "check": {"type": "CheckStream", "stream_names": ["Rates"]}, + "dynamic_streams": [ + { + "type": "DynamicDeclarativeStream", + "stream_template": { + "type": "DeclarativeStream", + "name": "", + "primary_key": [], + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "$schema": "http://json-schema.org/schema#", + "properties": { + "ABC": {"type": "number"}, + "AED": {"type": "number"}, + }, + "type": "object", + }, + }, + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "$parameters": {"item_id": ""}, + "url_base": "https://api.test.com", + "path": "/items/{{parameters['item_id']}}", + "http_method": "GET", + "authenticator": { + "type": "ApiKeyAuthenticator", + "header": "apikey", + "api_token": "{{ config['api_key'] }}", + }, + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + "paginator": {"type": "NoPagination"}, + }, + }, + "components_resolver": { + "type": "ConfigComponentsResolver", + "stream_config": { + "type": "StreamConfig", + "configs_pointer": ["custom_streams"], + }, + 
"components_mapping": [ + { + "type": "ComponentMappingDefinition", + "field_path": ["name"], + "value": "{{components_values['name']}}", + }, + { + "type": "ComponentMappingDefinition", + "field_path": [ + "retriever", + "requester", + "$parameters", + "item_id", + ], + "value": "{{components_values['id']}}", + }, + ], + }, + } + ], +} + + +def test_dynamic_streams_read_with_config_components_resolver(): + expected_stream_names = ["item_1", "item_2"] + + source = ConcurrentDeclarativeSource( + source_config=_MANIFEST, config=_CONFIG, catalog=None, state=None + ) + + actual_catalog = source.discover(logger=source.logger, config=_CONFIG) + + configured_streams = [ + to_configured_stream(stream, primary_key=stream.source_defined_primary_key) + for stream in actual_catalog.streams + ] + configured_catalog = to_configured_catalog(configured_streams) + + with HttpMocker() as http_mocker: + http_mocker.get( + HttpRequest(url="https://api.test.com/items/1"), + HttpResponse(body=json.dumps({"id": "1", "name": "item_1"})), + ) + http_mocker.get( + HttpRequest(url="https://api.test.com/items/2"), + HttpResponse(body=json.dumps({"id": "2", "name": "item_2"})), + ) + + records = [ + message.record + for message in source.read(MagicMock(), _CONFIG, configured_catalog) + if message.type == Type.RECORD + ] + + assert ( + len(actual_catalog.streams) == 2 + ) # Only 2 streams were created because the 3rd item has a duplicate name. 
+ assert [stream.name for stream in actual_catalog.streams] == expected_stream_names + assert len(records) == 2 + assert [record.stream for record in records] == expected_stream_names diff --git a/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py b/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py index 9e9fe225a..8f1785643 100644 --- a/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py +++ b/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py @@ -147,13 +147,19 @@ def test_http_components_resolver( assert result == expected_result -def test_dynamic_streams_read(): +def test_dynamic_streams_read_with_http_components_resolver(): expected_stream_names = ["item_1", "item_2"] with HttpMocker() as http_mocker: http_mocker.get( HttpRequest(url="https://api.test.com/items"), HttpResponse( - body=json.dumps([{"id": 1, "name": "item_1"}, {"id": 2, "name": "item_2"}]) + body=json.dumps( + [ + {"id": 1, "name": "item_1"}, + {"id": 2, "name": "item_2"}, + {"id": 3, "name": "item_2"}, + ] + ) ), ) http_mocker.get( @@ -164,12 +170,16 @@ def test_dynamic_streams_read(): HttpRequest(url="https://api.test.com/items/2"), HttpResponse(body=json.dumps({"id": "2", "name": "item_2"})), ) + http_mocker.get( + HttpRequest(url="https://api.test.com/items/3"), + HttpResponse(body=json.dumps({"id": "3", "name": "item_2"})), + ) source = ConcurrentDeclarativeSource( source_config=_MANIFEST, config=_CONFIG, catalog=None, state=None ) - actual_catalog = source.discover(logger=source.logger, config={}) + actual_catalog = source.discover(logger=source.logger, config=_CONFIG) configured_streams = [ to_configured_stream(stream, primary_key=stream.source_defined_primary_key) @@ -179,11 +189,13 @@ def test_dynamic_streams_read(): records = [ message.record - for message in source.read(MagicMock(), {}, configured_catalog) + for message in source.read(MagicMock(), _CONFIG, configured_catalog) if message.type == 
Type.RECORD ] - assert len(actual_catalog.streams) == 2 + assert ( + len(actual_catalog.streams) == 2 + ) # Only 2 streams were created because the 3rd item has a duplicate name. assert [stream.name for stream in actual_catalog.streams] == expected_stream_names assert len(records) == 2 assert [record.stream for record in records] == expected_stream_names From 29bb2eee87162a71cf811aab98e442b65e095bcf Mon Sep 17 00:00:00 2001 From: Serhii Lazebnyi Date: Fri, 6 Dec 2024 19:42:44 +0100 Subject: [PATCH 02/12] Fix formatting --- .../parsers/model_to_component_factory.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 276e8ecc7..3207fd36c 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -128,6 +128,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( ConcurrencyLevel as ConcurrencyLevelModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + ConfigComponentsResolver as ConfigComponentsResolverModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( ConstantBackoffStrategy as ConstantBackoffStrategyModel, ) @@ -197,12 +200,6 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( HttpComponentsResolver as HttpComponentsResolverModel, ) -from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( - ConfigComponentsResolver as ConfigComponentsResolverModel, -) -from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( - StreamConfig as StreamConfigModel, -) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( HttpRequester as HttpRequesterModel, ) @@ -294,6 +291,9 @@ 
SimpleRetriever as SimpleRetrieverModel, ) from airbyte_cdk.sources.declarative.models.declarative_component_schema import Spec as SpecModel +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + StreamConfig as StreamConfigModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( SubstreamPartitionRouter as SubstreamPartitionRouterModel, ) @@ -353,8 +353,8 @@ from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod from airbyte_cdk.sources.declarative.resolvers import ( ComponentMappingDefinition, - HttpComponentsResolver, ConfigComponentsResolver, + HttpComponentsResolver, StreamConfig, ) from airbyte_cdk.sources.declarative.retrievers import ( From 5cd295f23278b196dbc5b5b46f6eff0959915688 Mon Sep 17 00:00:00 2001 From: Serhii Lazebnyi Date: Fri, 6 Dec 2024 19:46:15 +0100 Subject: [PATCH 03/12] Fix formatting --- .../models/declarative_component_schema.py | 98 +++++++------------ 1 file changed, 37 insertions(+), 61 deletions(-) diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 36a890f9b..98385bf65 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -528,9 +528,7 @@ class OAuthAuthenticator(BaseModel): scopes: Optional[List[str]] = Field( None, description="List of scopes that should be granted to the access token.", - examples=[ - ["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"] - ], + examples=[["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"]], title="Scopes", ) token_expiry_date: Optional[str] = Field( @@ -876,28 +874,24 @@ class OAuthConfigSpecification(BaseModel): class Config: extra = Extra.allow - oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = ( - Field( - None, - 
description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", - examples=[ - {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, - { - "app_id": { - "type": "string", - "path_in_connector_config": ["info", "app_id"], - } - }, - ], - title="OAuth user input", - ) + oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = Field( + None, + description="OAuth specific blob. 
This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", + examples=[ + {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, + { + "app_id": { + "type": "string", + "path_in_connector_config": ["info", "app_id"], + } + }, + ], + title="OAuth user input", ) - oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = ( - Field( - None, - description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{my_var}`.\n- The nested resolution variables like `{{my_nested_var}}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {base64Encoder:{my_var_a}:{my_var_b}}\n + base64Decorer - decode from `base64` encoded string, {base64Decoder:{my_string_variable_or_string_value}}\n + urlEncoder - encode the input string to URL-like format, {urlEncoder:https://test.host.com/endpoint}\n + 
urlDecorer - decode the input url-encoded string into text format, {urlDecoder:https%3A%2F%2Fairbyte.io}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {codeChallengeS256:{state_value}}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{client_id_key}={{client_id_key}}&{redirect_uri_key}={urlEncoder:{{redirect_uri_key}}}&{state_key}={{state_key}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{auth_code_key}": "{{auth_code_key}}",\n "{client_id_key}": "{{client_id_key}}",\n "{client_secret_key}": "{{client_secret_key}}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }', - title="DeclarativeOAuth Connector Specification", - ) + oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = Field( + None, + description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{my_var}`.\n- The nested resolution variables like `{{my_nested_var}}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {base64Encoder:{my_var_a}:{my_var_b}}\n + base64Decorer - decode from `base64` encoded string, {base64Decoder:{my_string_variable_or_string_value}}\n + urlEncoder - encode the input string to URL-like format, {urlEncoder:https://test.host.com/endpoint}\n + urlDecorer - decode the input url-encoded string into text format, 
{urlDecoder:https%3A%2F%2Fairbyte.io}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {codeChallengeS256:{state_value}}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{client_id_key}={{client_id_key}}&{redirect_uri_key}={urlEncoder:{{redirect_uri_key}}}&{state_key}={{state_key}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{auth_code_key}": "{{auth_code_key}}",\n "{client_id_key}": "{{client_id_key}}",\n "{client_secret_key}": "{{client_secret_key}}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }', + title="DeclarativeOAuth Connector Specification", ) complete_oauth_output_specification: Optional[Dict[str, Any]] = Field( None, @@ -915,9 +909,7 @@ class Config: complete_oauth_server_input_specification: Optional[Dict[str, Any]] = Field( None, description="OAuth specific blob. 
This is a Json Schema used to validate Json configurations persisted as Airbyte Server configurations.\nMust be a valid non-nested JSON describing additional fields configured by the Airbyte Instance or Workspace Admins to be used by the\nserver when completing an OAuth flow (typically exchanging an auth code for refresh token).\nExamples:\n complete_oauth_server_input_specification={\n client_id: {\n type: string\n },\n client_secret: {\n type: string\n }\n }", - examples=[ - {"client_id": {"type": "string"}, "client_secret": {"type": "string"}} - ], + examples=[{"client_id": {"type": "string"}, "client_secret": {"type": "string"}}], title="OAuth input specification", ) complete_oauth_server_output_specification: Optional[Dict[str, Any]] = Field( @@ -1628,25 +1620,21 @@ class Config: description="Component used to coordinate how records are extracted across stream slices and request pages.", title="Retriever", ) - incremental_sync: Optional[Union[CustomIncrementalSync, DatetimeBasedCursor]] = ( - Field( - None, - description="Component used to fetch data incrementally based on a time field in the data.", - title="Incremental Sync", - ) - ) - name: Optional[str] = Field( - "", description="The stream name.", example=["Users"], title="Name" + incremental_sync: Optional[Union[CustomIncrementalSync, DatetimeBasedCursor]] = Field( + None, + description="Component used to fetch data incrementally based on a time field in the data.", + title="Incremental Sync", ) + name: Optional[str] = Field("", description="The stream name.", example=["Users"], title="Name") primary_key: Optional[PrimaryKey] = Field( "", description="The primary key of the stream.", title="Primary Key" ) - schema_loader: Optional[ - Union[InlineSchemaLoader, JsonFileSchemaLoader, CustomSchemaLoader] - ] = Field( - None, - description="Component used to retrieve the schema for the current stream.", - title="Schema Loader", + schema_loader: Optional[Union[InlineSchemaLoader, JsonFileSchemaLoader, 
CustomSchemaLoader]] = ( + Field( + None, + description="Component used to retrieve the schema for the current stream.", + title="Schema Loader", + ) ) transformations: Optional[ List[Union[AddFields, CustomTransformation, RemoveFields, KeysToLower]] @@ -1864,11 +1852,7 @@ class SimpleRetriever(BaseModel): CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter, - List[ - Union[ - CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter - ] - ], + List[Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter]], ] ] = Field( [], @@ -1910,9 +1894,7 @@ class AsyncRetriever(BaseModel): ) download_extractor: Optional[ Union[CustomRecordExtractor, DpathExtractor, ResponseToFileExtractor] - ] = Field( - None, description="Responsible for fetching the records from provided urls." - ) + ] = Field(None, description="Responsible for fetching the records from provided urls.") creation_requester: Union[CustomRequester, HttpRequester] = Field( ..., description="Requester component that describes how to prepare HTTP requests to send to the source API to create the async server-side job.", @@ -1942,11 +1924,7 @@ class AsyncRetriever(BaseModel): CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter, - List[ - Union[ - CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter - ] - ], + List[Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter]], ] ] = Field( [], @@ -2010,12 +1988,10 @@ class DynamicDeclarativeStream(BaseModel): stream_template: DeclarativeStream = Field( ..., description="Reference to the stream template.", title="Stream Template" ) - components_resolver: Union[HttpComponentsResolver, ConfigComponentsResolver] = ( - Field( - ..., - description="Component resolve and populates stream templates with components values.", - title="Components Resolver", - ) + components_resolver: Union[HttpComponentsResolver, ConfigComponentsResolver] = Field( + ..., + description="Component 
resolve and populates stream templates with components values.", + title="Components Resolver", ) From c9842526684338d6350fe7bb9bf76a60051107b8 Mon Sep 17 00:00:00 2001 From: Serhii Lazebnyi Date: Fri, 6 Dec 2024 20:08:33 +0100 Subject: [PATCH 04/12] Fix mypy --- .../sources/declarative/auth/selective_authenticator.py | 2 +- .../sources/declarative/declarative_component_schema.yaml | 4 +--- .../declarative/models/declarative_component_schema.py | 2 +- airbyte_cdk/sources/declarative/resolvers/__init__.py | 4 +++- .../declarative/resolvers/config_components_resolver.py | 4 ++-- 5 files changed, 8 insertions(+), 8 deletions(-) diff --git a/airbyte_cdk/sources/declarative/auth/selective_authenticator.py b/airbyte_cdk/sources/declarative/auth/selective_authenticator.py index bc276cb99..3a84150bf 100644 --- a/airbyte_cdk/sources/declarative/auth/selective_authenticator.py +++ b/airbyte_cdk/sources/declarative/auth/selective_authenticator.py @@ -30,7 +30,7 @@ def __new__( # type: ignore[misc] try: selected_key = str( dpath.get( - config, # type: ignore [arg-type] # Dpath wants mutable mapping but doesn't need it. + config, # type: ignore[arg-type] # Dpath wants mutable mapping but doesn't need it. authenticator_selection_path, ) ) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 200372a6b..572b11655 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -2920,9 +2920,7 @@ definitions: description: A list of potentially nested fields indicating the full path where value will be added or updated. 
type: array items: - anyOf: - - type: string - - type: integer + - type: string interpolation_context: - config - components_values diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 98385bf65..aeb8d756b 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -1160,7 +1160,7 @@ class WaitUntilTimeFromHeader(BaseModel): class ComponentMappingDefinition(BaseModel): type: Literal["ComponentMappingDefinition"] - field_path: List[Union[str, int]] = Field( + field_path: List[str] = Field( ..., description="A list of potentially nested fields indicating the full path where value will be added or updated.", examples=[ diff --git a/airbyte_cdk/sources/declarative/resolvers/__init__.py b/airbyte_cdk/sources/declarative/resolvers/__init__.py index 9b3e5a422..6b8497fb7 100644 --- a/airbyte_cdk/sources/declarative/resolvers/__init__.py +++ b/airbyte_cdk/sources/declarative/resolvers/__init__.py @@ -7,8 +7,10 @@ from airbyte_cdk.sources.declarative.resolvers.config_components_resolver import ConfigComponentsResolver, StreamConfig from airbyte_cdk.sources.declarative.models import HttpComponentsResolver as HttpComponentsResolverModel from airbyte_cdk.sources.declarative.models import ConfigComponentsResolver as ConfigComponentsResolverModel +from pydantic.v1 import BaseModel +from typing import Mapping -COMPONENTS_RESOLVER_TYPE_MAPPING = { +COMPONENTS_RESOLVER_TYPE_MAPPING: Mapping[str, type[BaseModel]] = { "HttpComponentsResolver": HttpComponentsResolverModel, "ConfigComponentsResolver": ConfigComponentsResolverModel } diff --git a/airbyte_cdk/sources/declarative/resolvers/config_components_resolver.py b/airbyte_cdk/sources/declarative/resolvers/config_components_resolver.py index f19afb25f..0308ea5da 100644 --- 
a/airbyte_cdk/sources/declarative/resolvers/config_components_resolver.py +++ b/airbyte_cdk/sources/declarative/resolvers/config_components_resolver.py @@ -91,12 +91,12 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None: ) @property - def _stream_config(self): + def _stream_config(self) -> Iterable[Mapping[str, Any]]: path = [ node.eval(self.config) if not isinstance(node, str) else node for node in self.stream_config.configs_pointer ] - stream_config = dpath.get(self.config, path, default=[]) + stream_config = dpath.get(dict(self.config), path, default=[]) if not isinstance(stream_config, list): stream_config = [stream_config] From 5cc9f5f1ae19df764d682fc3c23a502ea5531371 Mon Sep 17 00:00:00 2001 From: Serhii Lazebnyi Date: Thu, 12 Dec 2024 15:25:37 +0100 Subject: [PATCH 05/12] Updated description for ConfigComponentsResolver~ --- .../declarative_component_schema.yaml | 2 +- .../models/declarative_component_schema.py | 98 ++++++++++++------- 2 files changed, 62 insertions(+), 38 deletions(-) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 572b11655..579db6437 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -3004,7 +3004,7 @@ definitions: additionalProperties: true ConfigComponentsResolver: type: object - description: (This component is experimental. Use at your own risk.) Component resolve and populates stream templates with components fetched from the source config. + description: (This component is experimental. Use at your own risk.) Resolves and populates stream templates with components fetched from the source config. 
properties: type: type: string diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index aeb8d756b..c09575c40 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -528,7 +528,9 @@ class OAuthAuthenticator(BaseModel): scopes: Optional[List[str]] = Field( None, description="List of scopes that should be granted to the access token.", - examples=[["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"]], + examples=[ + ["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"] + ], title="Scopes", ) token_expiry_date: Optional[str] = Field( @@ -874,24 +876,28 @@ class OAuthConfigSpecification(BaseModel): class Config: extra = Extra.allow - oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = Field( - None, - description="OAuth specific blob. 
This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", - examples=[ - {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, - { - "app_id": { - "type": "string", - "path_in_connector_config": ["info", "app_id"], - } - }, - ], - title="OAuth user input", + oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = ( + Field( + None, + description="OAuth specific blob. 
This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", + examples=[ + {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, + { + "app_id": { + "type": "string", + "path_in_connector_config": ["info", "app_id"], + } + }, + ], + title="OAuth user input", + ) ) - oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = Field( - None, - description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{my_var}`.\n- The nested resolution variables like `{{my_nested_var}}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {base64Encoder:{my_var_a}:{my_var_b}}\n + base64Decorer - decode from `base64` encoded string, {base64Decoder:{my_string_variable_or_string_value}}\n + urlEncoder - encode the input string to URL-like format, {urlEncoder:https://test.host.com/endpoint}\n + 
urlDecorer - decode the input url-encoded string into text format, {urlDecoder:https%3A%2F%2Fairbyte.io}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {codeChallengeS256:{state_value}}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{client_id_key}={{client_id_key}}&{redirect_uri_key}={urlEncoder:{{redirect_uri_key}}}&{state_key}={{state_key}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{auth_code_key}": "{{auth_code_key}}",\n "{client_id_key}": "{{client_id_key}}",\n "{client_secret_key}": "{{client_secret_key}}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }', - title="DeclarativeOAuth Connector Specification", + oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = ( + Field( + None, + description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{my_var}`.\n- The nested resolution variables like `{{my_nested_var}}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {base64Encoder:{my_var_a}:{my_var_b}}\n + base64Decorer - decode from `base64` encoded string, {base64Decoder:{my_string_variable_or_string_value}}\n + urlEncoder - encode the input string to URL-like format, {urlEncoder:https://test.host.com/endpoint}\n + urlDecorer - decode the input url-encoded string into text format, 
{urlDecoder:https%3A%2F%2Fairbyte.io}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {codeChallengeS256:{state_value}}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{client_id_key}={{client_id_key}}&{redirect_uri_key}={urlEncoder:{{redirect_uri_key}}}&{state_key}={{state_key}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{auth_code_key}": "{{auth_code_key}}",\n "{client_id_key}": "{{client_id_key}}",\n "{client_secret_key}": "{{client_secret_key}}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }', + title="DeclarativeOAuth Connector Specification", + ) ) complete_oauth_output_specification: Optional[Dict[str, Any]] = Field( None, @@ -909,7 +915,9 @@ class Config: complete_oauth_server_input_specification: Optional[Dict[str, Any]] = Field( None, description="OAuth specific blob. 
This is a Json Schema used to validate Json configurations persisted as Airbyte Server configurations.\nMust be a valid non-nested JSON describing additional fields configured by the Airbyte Instance or Workspace Admins to be used by the\nserver when completing an OAuth flow (typically exchanging an auth code for refresh token).\nExamples:\n complete_oauth_server_input_specification={\n client_id: {\n type: string\n },\n client_secret: {\n type: string\n }\n }", - examples=[{"client_id": {"type": "string"}, "client_secret": {"type": "string"}}], + examples=[ + {"client_id": {"type": "string"}, "client_secret": {"type": "string"}} + ], title="OAuth input specification", ) complete_oauth_server_output_specification: Optional[Dict[str, Any]] = Field( @@ -1620,21 +1628,25 @@ class Config: description="Component used to coordinate how records are extracted across stream slices and request pages.", title="Retriever", ) - incremental_sync: Optional[Union[CustomIncrementalSync, DatetimeBasedCursor]] = Field( - None, - description="Component used to fetch data incrementally based on a time field in the data.", - title="Incremental Sync", + incremental_sync: Optional[Union[CustomIncrementalSync, DatetimeBasedCursor]] = ( + Field( + None, + description="Component used to fetch data incrementally based on a time field in the data.", + title="Incremental Sync", + ) + ) + name: Optional[str] = Field( + "", description="The stream name.", example=["Users"], title="Name" ) - name: Optional[str] = Field("", description="The stream name.", example=["Users"], title="Name") primary_key: Optional[PrimaryKey] = Field( "", description="The primary key of the stream.", title="Primary Key" ) - schema_loader: Optional[Union[InlineSchemaLoader, JsonFileSchemaLoader, CustomSchemaLoader]] = ( - Field( - None, - description="Component used to retrieve the schema for the current stream.", - title="Schema Loader", - ) + schema_loader: Optional[ + Union[InlineSchemaLoader, JsonFileSchemaLoader, 
CustomSchemaLoader] + ] = Field( + None, + description="Component used to retrieve the schema for the current stream.", + title="Schema Loader", ) transformations: Optional[ List[Union[AddFields, CustomTransformation, RemoveFields, KeysToLower]] @@ -1852,7 +1864,11 @@ class SimpleRetriever(BaseModel): CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter, - List[Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter]], + List[ + Union[ + CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter + ] + ], ] ] = Field( [], @@ -1894,7 +1910,9 @@ class AsyncRetriever(BaseModel): ) download_extractor: Optional[ Union[CustomRecordExtractor, DpathExtractor, ResponseToFileExtractor] - ] = Field(None, description="Responsible for fetching the records from provided urls.") + ] = Field( + None, description="Responsible for fetching the records from provided urls." + ) creation_requester: Union[CustomRequester, HttpRequester] = Field( ..., description="Requester component that describes how to prepare HTTP requests to send to the source API to create the async server-side job.", @@ -1924,7 +1942,11 @@ class AsyncRetriever(BaseModel): CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter, - List[Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter]], + List[ + Union[ + CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter + ] + ], ] ] = Field( [], @@ -1988,10 +2010,12 @@ class DynamicDeclarativeStream(BaseModel): stream_template: DeclarativeStream = Field( ..., description="Reference to the stream template.", title="Stream Template" ) - components_resolver: Union[HttpComponentsResolver, ConfigComponentsResolver] = Field( - ..., - description="Component resolve and populates stream templates with components values.", - title="Components Resolver", + components_resolver: Union[HttpComponentsResolver, ConfigComponentsResolver] = ( + Field( + ..., + description="Component resolve and 
populates stream templates with components values.", + title="Components Resolver", + ) ) From 182552bd9083efc96ad7dabc8793278421c2358c Mon Sep 17 00:00:00 2001 From: Serhii Lazebnyi Date: Thu, 12 Dec 2024 15:52:39 +0100 Subject: [PATCH 06/12] Updated stream uniqness handling behavior --- .../manifest_declarative_source.py | 16 +++- .../test_config_components_resolver.py | 93 +++++++++++-------- 2 files changed, 69 insertions(+), 40 deletions(-) diff --git a/airbyte_cdk/sources/declarative/manifest_declarative_source.py b/airbyte_cdk/sources/declarative/manifest_declarative_source.py index 2dc8ca808..203d377e1 100644 --- a/airbyte_cdk/sources/declarative/manifest_declarative_source.py +++ b/airbyte_cdk/sources/declarative/manifest_declarative_source.py @@ -20,6 +20,7 @@ AirbyteStateMessage, ConfiguredAirbyteCatalog, ConnectorSpecification, + FailureType, ) from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource @@ -48,6 +49,7 @@ DebugSliceLogger, SliceLogger, ) +from airbyte_cdk.utils.traced_exception import AirbyteTracedException class ManifestDeclarativeSource(DeclarativeSource): @@ -354,9 +356,17 @@ def _dynamic_stream_configs( # Ensure that each stream is created with a unique name name = dynamic_stream.get("name") - if name not in seen_dynamic_streams: - seen_dynamic_streams.add(name) - dynamic_stream_configs.append(dynamic_stream) + if name in seen_dynamic_streams: + error_message = f"Dynamic streams list contains a duplicate name: {name}. Please check your configuration." 
+ + raise AirbyteTracedException( + message=error_message, + internal_message=error_message, + failure_type=FailureType.config_error, + ) + + seen_dynamic_streams.add(name) + dynamic_stream_configs.append(dynamic_stream) return dynamic_stream_configs diff --git a/unit_tests/sources/declarative/resolvers/test_config_components_resolver.py b/unit_tests/sources/declarative/resolvers/test_config_components_resolver.py index 8bf0026e6..debe7a423 100644 --- a/unit_tests/sources/declarative/resolvers/test_config_components_resolver.py +++ b/unit_tests/sources/declarative/resolvers/test_config_components_resolver.py @@ -11,18 +11,22 @@ from airbyte_cdk.sources.declarative.concurrent_declarative_source import ( ConcurrentDeclarativeSource, ) -from airbyte_cdk.sources.declarative.interpolation import InterpolatedString -from airbyte_cdk.sources.declarative.resolvers import ( - ComponentMappingDefinition, - HttpComponentsResolver, -) from airbyte_cdk.sources.embedded.catalog import ( to_configured_catalog, to_configured_stream, ) from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse +from airbyte_cdk.utils.traced_exception import AirbyteTracedException _CONFIG = { + "start_date": "2024-07-01T00:00:00.000Z", + "custom_streams": [ + {"id": 1, "name": "item_1"}, + {"id": 2, "name": "item_2"}, + ], +} + +_CONFIG_WITH_STREAM_DUPLICATION = { "start_date": "2024-07-01T00:00:00.000Z", "custom_streams": [ {"id": 1, "name": "item_1"}, @@ -103,40 +107,55 @@ } -def test_dynamic_streams_read_with_config_components_resolver(): - expected_stream_names = ["item_1", "item_2"] - - source = ConcurrentDeclarativeSource( - source_config=_MANIFEST, config=_CONFIG, catalog=None, state=None - ) +@pytest.mark.parametrize( + "config, expected_exception, expected_stream_names", + [ + (_CONFIG, None, ["item_1", "item_2"]), + ( + _CONFIG_WITH_STREAM_DUPLICATION, + "Dynamic streams list contains a duplicate name: item_2. 
Please check your configuration.", + None + ), + ], +) +def test_dynamic_streams_read_with_config_components_resolver(config, expected_exception, expected_stream_names): + if expected_exception: + with pytest.raises(AirbyteTracedException) as exc_info: + source = ConcurrentDeclarativeSource( + source_config=_MANIFEST, config=config, catalog=None, state=None + ) + source.discover(logger=source.logger, config=config) + assert str(exc_info.value) == expected_exception + else: + source = ConcurrentDeclarativeSource( + source_config=_MANIFEST, config=config, catalog=None, state=None + ) - actual_catalog = source.discover(logger=source.logger, config=_CONFIG) + actual_catalog = source.discover(logger=source.logger, config=config) - configured_streams = [ - to_configured_stream(stream, primary_key=stream.source_defined_primary_key) - for stream in actual_catalog.streams - ] - configured_catalog = to_configured_catalog(configured_streams) + configured_streams = [ + to_configured_stream(stream, primary_key=stream.source_defined_primary_key) + for stream in actual_catalog.streams + ] + configured_catalog = to_configured_catalog(configured_streams) - with HttpMocker() as http_mocker: - http_mocker.get( - HttpRequest(url="https://api.test.com/items/1"), - HttpResponse(body=json.dumps({"id": "1", "name": "item_1"})), - ) - http_mocker.get( - HttpRequest(url="https://api.test.com/items/2"), - HttpResponse(body=json.dumps({"id": "2", "name": "item_2"})), - ) + with HttpMocker() as http_mocker: + http_mocker.get( + HttpRequest(url="https://api.test.com/items/1"), + HttpResponse(body=json.dumps({"id": "1", "name": "item_1"})), + ) + http_mocker.get( + HttpRequest(url="https://api.test.com/items/2"), + HttpResponse(body=json.dumps({"id": "2", "name": "item_2"})), + ) - records = [ - message.record - for message in source.read(MagicMock(), _CONFIG, configured_catalog) - if message.type == Type.RECORD - ] + records = [ + message.record + for message in source.read(MagicMock(), config, 
configured_catalog) + if message.type == Type.RECORD + ] - assert ( - len(actual_catalog.streams) == 2 - ) # Only 2 streams were created because the 3rd item has a duplicate name. - assert [stream.name for stream in actual_catalog.streams] == expected_stream_names - assert len(records) == 2 - assert [record.stream for record in records] == expected_stream_names + assert len(actual_catalog.streams) == len(expected_stream_names) + assert [stream.name for stream in actual_catalog.streams] == expected_stream_names + assert len(records) == len(expected_stream_names) + assert [record.stream for record in records] == expected_stream_names From 95370b256a603bf17796d4274a0110bc91841a98 Mon Sep 17 00:00:00 2001 From: Serhii Lazebnyi Date: Thu, 12 Dec 2024 16:14:27 +0100 Subject: [PATCH 07/12] Removed duplicated name from test_dynamic_streams_read_with_http_components_resolver --- .../resolvers/test_http_components_resolver.py | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py b/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py index 8f1785643..06f070147 100644 --- a/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py +++ b/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py @@ -157,7 +157,6 @@ def test_dynamic_streams_read_with_http_components_resolver(): [ {"id": 1, "name": "item_1"}, {"id": 2, "name": "item_2"}, - {"id": 3, "name": "item_2"}, ] ) ), @@ -170,10 +169,6 @@ def test_dynamic_streams_read_with_http_components_resolver(): HttpRequest(url="https://api.test.com/items/2"), HttpResponse(body=json.dumps({"id": "2", "name": "item_2"})), ) - http_mocker.get( - HttpRequest(url="https://api.test.com/items/3"), - HttpResponse(body=json.dumps({"id": "3", "name": "item_2"})), - ) source = ConcurrentDeclarativeSource( source_config=_MANIFEST, config=_CONFIG, catalog=None, state=None @@ -193,9 +188,7 @@ def 
test_dynamic_streams_read_with_http_components_resolver(): if message.type == Type.RECORD ] - assert ( - len(actual_catalog.streams) == 2 - ) # Only 2 streams were created because the 3rd item has a duplicate name. + assert len(actual_catalog.streams) == 2 assert [stream.name for stream in actual_catalog.streams] == expected_stream_names assert len(records) == 2 assert [record.stream for record in records] == expected_stream_names From e02ee97fde1b4977c786fd29b61b33203ef45b11 Mon Sep 17 00:00:00 2001 From: Serhii Lazebnyi Date: Thu, 12 Dec 2024 16:16:09 +0100 Subject: [PATCH 08/12] Fix with linter --- .../models/declarative_component_schema.py | 98 +++++++------------ .../test_config_components_resolver.py | 6 +- 2 files changed, 41 insertions(+), 63 deletions(-) diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index c09575c40..aeb8d756b 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -528,9 +528,7 @@ class OAuthAuthenticator(BaseModel): scopes: Optional[List[str]] = Field( None, description="List of scopes that should be granted to the access token.", - examples=[ - ["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"] - ], + examples=[["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"]], title="Scopes", ) token_expiry_date: Optional[str] = Field( @@ -876,28 +874,24 @@ class OAuthConfigSpecification(BaseModel): class Config: extra = Extra.allow - oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = ( - Field( - None, - description="OAuth specific blob. 
This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", - examples=[ - {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, - { - "app_id": { - "type": "string", - "path_in_connector_config": ["info", "app_id"], - } - }, - ], - title="OAuth user input", - ) + oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = Field( + None, + description="OAuth specific blob. 
This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", + examples=[ + {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, + { + "app_id": { + "type": "string", + "path_in_connector_config": ["info", "app_id"], + } + }, + ], + title="OAuth user input", ) - oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = ( - Field( - None, - description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{my_var}`.\n- The nested resolution variables like `{{my_nested_var}}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {base64Encoder:{my_var_a}:{my_var_b}}\n + base64Decorer - decode from `base64` encoded string, {base64Decoder:{my_string_variable_or_string_value}}\n + urlEncoder - encode the input string to URL-like format, {urlEncoder:https://test.host.com/endpoint}\n + 
urlDecorer - decode the input url-encoded string into text format, {urlDecoder:https%3A%2F%2Fairbyte.io}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {codeChallengeS256:{state_value}}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{client_id_key}={{client_id_key}}&{redirect_uri_key}={urlEncoder:{{redirect_uri_key}}}&{state_key}={{state_key}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{auth_code_key}": "{{auth_code_key}}",\n "{client_id_key}": "{{client_id_key}}",\n "{client_secret_key}": "{{client_secret_key}}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }', - title="DeclarativeOAuth Connector Specification", - ) + oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = Field( + None, + description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{my_var}`.\n- The nested resolution variables like `{{my_nested_var}}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {base64Encoder:{my_var_a}:{my_var_b}}\n + base64Decorer - decode from `base64` encoded string, {base64Decoder:{my_string_variable_or_string_value}}\n + urlEncoder - encode the input string to URL-like format, {urlEncoder:https://test.host.com/endpoint}\n + urlDecorer - decode the input url-encoded string into text format, 
{urlDecoder:https%3A%2F%2Fairbyte.io}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {codeChallengeS256:{state_value}}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{client_id_key}={{client_id_key}}&{redirect_uri_key}={urlEncoder:{{redirect_uri_key}}}&{state_key}={{state_key}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{auth_code_key}": "{{auth_code_key}}",\n "{client_id_key}": "{{client_id_key}}",\n "{client_secret_key}": "{{client_secret_key}}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }', + title="DeclarativeOAuth Connector Specification", ) complete_oauth_output_specification: Optional[Dict[str, Any]] = Field( None, @@ -915,9 +909,7 @@ class Config: complete_oauth_server_input_specification: Optional[Dict[str, Any]] = Field( None, description="OAuth specific blob. 
This is a Json Schema used to validate Json configurations persisted as Airbyte Server configurations.\nMust be a valid non-nested JSON describing additional fields configured by the Airbyte Instance or Workspace Admins to be used by the\nserver when completing an OAuth flow (typically exchanging an auth code for refresh token).\nExamples:\n complete_oauth_server_input_specification={\n client_id: {\n type: string\n },\n client_secret: {\n type: string\n }\n }", - examples=[ - {"client_id": {"type": "string"}, "client_secret": {"type": "string"}} - ], + examples=[{"client_id": {"type": "string"}, "client_secret": {"type": "string"}}], title="OAuth input specification", ) complete_oauth_server_output_specification: Optional[Dict[str, Any]] = Field( @@ -1628,25 +1620,21 @@ class Config: description="Component used to coordinate how records are extracted across stream slices and request pages.", title="Retriever", ) - incremental_sync: Optional[Union[CustomIncrementalSync, DatetimeBasedCursor]] = ( - Field( - None, - description="Component used to fetch data incrementally based on a time field in the data.", - title="Incremental Sync", - ) - ) - name: Optional[str] = Field( - "", description="The stream name.", example=["Users"], title="Name" + incremental_sync: Optional[Union[CustomIncrementalSync, DatetimeBasedCursor]] = Field( + None, + description="Component used to fetch data incrementally based on a time field in the data.", + title="Incremental Sync", ) + name: Optional[str] = Field("", description="The stream name.", example=["Users"], title="Name") primary_key: Optional[PrimaryKey] = Field( "", description="The primary key of the stream.", title="Primary Key" ) - schema_loader: Optional[ - Union[InlineSchemaLoader, JsonFileSchemaLoader, CustomSchemaLoader] - ] = Field( - None, - description="Component used to retrieve the schema for the current stream.", - title="Schema Loader", + schema_loader: Optional[Union[InlineSchemaLoader, JsonFileSchemaLoader, 
CustomSchemaLoader]] = ( + Field( + None, + description="Component used to retrieve the schema for the current stream.", + title="Schema Loader", + ) ) transformations: Optional[ List[Union[AddFields, CustomTransformation, RemoveFields, KeysToLower]] @@ -1864,11 +1852,7 @@ class SimpleRetriever(BaseModel): CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter, - List[ - Union[ - CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter - ] - ], + List[Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter]], ] ] = Field( [], @@ -1910,9 +1894,7 @@ class AsyncRetriever(BaseModel): ) download_extractor: Optional[ Union[CustomRecordExtractor, DpathExtractor, ResponseToFileExtractor] - ] = Field( - None, description="Responsible for fetching the records from provided urls." - ) + ] = Field(None, description="Responsible for fetching the records from provided urls.") creation_requester: Union[CustomRequester, HttpRequester] = Field( ..., description="Requester component that describes how to prepare HTTP requests to send to the source API to create the async server-side job.", @@ -1942,11 +1924,7 @@ class AsyncRetriever(BaseModel): CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter, - List[ - Union[ - CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter - ] - ], + List[Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter]], ] ] = Field( [], @@ -2010,12 +1988,10 @@ class DynamicDeclarativeStream(BaseModel): stream_template: DeclarativeStream = Field( ..., description="Reference to the stream template.", title="Stream Template" ) - components_resolver: Union[HttpComponentsResolver, ConfigComponentsResolver] = ( - Field( - ..., - description="Component resolve and populates stream templates with components values.", - title="Components Resolver", - ) + components_resolver: Union[HttpComponentsResolver, ConfigComponentsResolver] = Field( + ..., + description="Component 
resolve and populates stream templates with components values.", + title="Components Resolver", ) diff --git a/unit_tests/sources/declarative/resolvers/test_config_components_resolver.py b/unit_tests/sources/declarative/resolvers/test_config_components_resolver.py index debe7a423..75a61609b 100644 --- a/unit_tests/sources/declarative/resolvers/test_config_components_resolver.py +++ b/unit_tests/sources/declarative/resolvers/test_config_components_resolver.py @@ -114,11 +114,13 @@ ( _CONFIG_WITH_STREAM_DUPLICATION, "Dynamic streams list contains a duplicate name: item_2. Please check your configuration.", - None + None, ), ], ) -def test_dynamic_streams_read_with_config_components_resolver(config, expected_exception, expected_stream_names): +def test_dynamic_streams_read_with_config_components_resolver( + config, expected_exception, expected_stream_names +): if expected_exception: with pytest.raises(AirbyteTracedException) as exc_info: source = ConcurrentDeclarativeSource( From 079d32a8176992c63274d8234483de5f2f6a0f0c Mon Sep 17 00:00:00 2001 From: Serhii Lazebnyi Date: Thu, 12 Dec 2024 16:49:51 +0100 Subject: [PATCH 09/12] Added condition for failure_type --- .../manifest_declarative_source.py | 9 +++++-- .../test_http_components_resolver.py | 27 +++++++++++++++++++ 2 files changed, 34 insertions(+), 2 deletions(-) diff --git a/airbyte_cdk/sources/declarative/manifest_declarative_source.py b/airbyte_cdk/sources/declarative/manifest_declarative_source.py index 203d377e1..b2f02c7eb 100644 --- a/airbyte_cdk/sources/declarative/manifest_declarative_source.py +++ b/airbyte_cdk/sources/declarative/manifest_declarative_source.py @@ -357,12 +357,17 @@ def _dynamic_stream_configs( name = dynamic_stream.get("name") if name in seen_dynamic_streams: - error_message = f"Dynamic streams list contains a duplicate name: {name}. Please check your configuration." + error_message = f"Dynamic streams list contains a duplicate name: {name}. Please contact Airbyte Support." 
+ failure_type = FailureType.system_error + + if resolver_type == "ConfigComponentsResolverModel": + error_message = f"Dynamic streams list contains a duplicate name: {name}. Please check your configuration." + failure_type = FailureType.config_error raise AirbyteTracedException( message=error_message, internal_message=error_message, - failure_type=FailureType.config_error, + failure_type=failure_type, ) seen_dynamic_streams.add(name) diff --git a/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py b/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py index 06f070147..34ad47087 100644 --- a/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py +++ b/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py @@ -21,6 +21,7 @@ to_configured_stream, ) from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse +from airbyte_cdk.utils.traced_exception import AirbyteTracedException _CONFIG = {"start_date": "2024-07-01T00:00:00.000Z"} @@ -192,3 +193,29 @@ def test_dynamic_streams_read_with_http_components_resolver(): assert [stream.name for stream in actual_catalog.streams] == expected_stream_names assert len(records) == 2 assert [record.stream for record in records] == expected_stream_names + + +def test_duplicated_dynamic_streams_read_with_http_components_resolver(): + with HttpMocker() as http_mocker: + http_mocker.get( + HttpRequest(url="https://api.test.com/items"), + HttpResponse( + body=json.dumps( + [ + {"id": 1, "name": "item_1"}, + {"id": 2, "name": "item_2"}, + {"id": 3, "name": "item_2"}, + ] + ) + ), + ) + + with pytest.raises(AirbyteTracedException) as exc_info: + source = ConcurrentDeclarativeSource( + source_config=_MANIFEST, config=_CONFIG, catalog=None, state=None + ) + source.discover(logger=source.logger, config=_CONFIG) + assert ( + str(exc_info.value) + == "Dynamic streams list contains a duplicate name: item_2. Please contact Airbyte Support." 
+ ) From 4dd1a0af7304532d99cc40c0db74d44d4cd2a811 Mon Sep 17 00:00:00 2001 From: Serhii Lazebnyi Date: Thu, 12 Dec 2024 16:58:58 +0100 Subject: [PATCH 10/12] Fix typo --- airbyte_cdk/sources/declarative/manifest_declarative_source.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/declarative/manifest_declarative_source.py b/airbyte_cdk/sources/declarative/manifest_declarative_source.py index b2f02c7eb..83c5fa5f3 100644 --- a/airbyte_cdk/sources/declarative/manifest_declarative_source.py +++ b/airbyte_cdk/sources/declarative/manifest_declarative_source.py @@ -360,7 +360,7 @@ def _dynamic_stream_configs( error_message = f"Dynamic streams list contains a duplicate name: {name}. Please contact Airbyte Support." failure_type = FailureType.system_error - if resolver_type == "ConfigComponentsResolverModel": + if resolver_type == "ConfigComponentsResolver": error_message = f"Dynamic streams list contains a duplicate name: {name}. Please check your configuration." 
failure_type = FailureType.config_error From a0f4761806171fbe69161446ed1fe0c4a2186bad Mon Sep 17 00:00:00 2001 From: Serhii Lazebnyi Date: Thu, 12 Dec 2024 17:31:06 +0100 Subject: [PATCH 11/12] Re-run CI From ba7b95e71b235ddcf813df67167ee2b09f456555 Mon Sep 17 00:00:00 2001 From: Serhii Lazebnyi Date: Thu, 12 Dec 2024 18:16:36 +0100 Subject: [PATCH 12/12] Fix test --- .../test_http_components_resolver.py | 90 ++++++++++++++++++- 1 file changed, 88 insertions(+), 2 deletions(-) diff --git a/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py b/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py index 34ad47087..1bbf2a412 100644 --- a/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py +++ b/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py @@ -111,6 +111,92 @@ ], } +_MANIFEST_WITH_DUPLICATES = { + "version": "6.7.0", + "type": "DeclarativeSource", + "check": {"type": "CheckStream", "stream_names": ["Rates"]}, + "dynamic_streams": [ + { + "type": "DynamicDeclarativeStream", + "stream_template": { + "type": "DeclarativeStream", + "name": "", + "primary_key": [], + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "$schema": "http://json-schema.org/schema#", + "properties": { + "ABC": {"type": "number"}, + "AED": {"type": "number"}, + }, + "type": "object", + }, + }, + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "$parameters": {"item_id": ""}, + "url_base": "https://api.test.com", + "path": "/items/{{parameters['item_id']}}", + "http_method": "GET", + "authenticator": { + "type": "ApiKeyAuthenticator", + "header": "apikey", + "api_token": "{{ config['api_key'] }}", + }, + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + "paginator": {"type": "NoPagination"}, + }, + }, + "components_resolver": { + "type": "HttpComponentsResolver", + "retriever": { 
+ "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "url_base": "https://api.test.com", + "path": "duplicates_items", + "http_method": "GET", + "authenticator": { + "type": "ApiKeyAuthenticator", + "header": "apikey", + "api_token": "{{ config['api_key'] }}", + }, + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + "paginator": {"type": "NoPagination"}, + }, + "components_mapping": [ + { + "type": "ComponentMappingDefinition", + "field_path": ["name"], + "value": "{{components_values['name']}}", + }, + { + "type": "ComponentMappingDefinition", + "field_path": [ + "retriever", + "requester", + "$parameters", + "item_id", + ], + "value": "{{components_values['id']}}", + }, + ], + }, + } + ], +} + @pytest.mark.parametrize( "components_mapping, retriever_data, stream_template_config, expected_result", @@ -198,7 +284,7 @@ def test_dynamic_streams_read_with_http_components_resolver(): def test_duplicated_dynamic_streams_read_with_http_components_resolver(): with HttpMocker() as http_mocker: http_mocker.get( - HttpRequest(url="https://api.test.com/items"), + HttpRequest(url="https://api.test.com/duplicates_items"), HttpResponse( body=json.dumps( [ @@ -212,7 +298,7 @@ def test_duplicated_dynamic_streams_read_with_http_components_resolver(): with pytest.raises(AirbyteTracedException) as exc_info: source = ConcurrentDeclarativeSource( - source_config=_MANIFEST, config=_CONFIG, catalog=None, state=None + source_config=_MANIFEST_WITH_DUPLICATES, config=_CONFIG, catalog=None, state=None ) source.discover(logger=source.logger, config=_CONFIG) assert (