From ceebfda1a92526feeb7083f3eabf51f05ff336ef Mon Sep 17 00:00:00 2001 From: Serhii Lazebnyi <53845333+lazebnyi@users.noreply.github.com> Date: Thu, 12 Dec 2024 18:58:46 +0100 Subject: [PATCH] feat(low-code cdk): add config component resolver (#149) --- .../auth/selective_authenticator.py | 2 +- .../declarative_component_schema.yaml | 54 +++++- .../manifest_declarative_source.py | 23 ++- .../models/declarative_component_schema.py | 24 ++- .../parsers/manifest_component_transformer.py | 5 +- .../parsers/model_to_component_factory.py | 50 +++++- .../sources/declarative/resolvers/__init__.py | 11 +- .../resolvers/config_components_resolver.py | 136 +++++++++++++++ .../test_config_components_resolver.py | 163 ++++++++++++++++++ .../test_http_components_resolver.py | 126 +++++++++++++- 10 files changed, 579 insertions(+), 15 deletions(-) create mode 100644 airbyte_cdk/sources/declarative/resolvers/config_components_resolver.py create mode 100644 unit_tests/sources/declarative/resolvers/test_config_components_resolver.py diff --git a/airbyte_cdk/sources/declarative/auth/selective_authenticator.py b/airbyte_cdk/sources/declarative/auth/selective_authenticator.py index bc276cb9..3a84150b 100644 --- a/airbyte_cdk/sources/declarative/auth/selective_authenticator.py +++ b/airbyte_cdk/sources/declarative/auth/selective_authenticator.py @@ -30,7 +30,7 @@ def __new__( # type: ignore[misc] try: selected_key = str( dpath.get( - config, # type: ignore [arg-type] # Dpath wants mutable mapping but doesn't need it. + config, # type: ignore[arg-type] # Dpath wants mutable mapping but doesn't need it. authenticator_selection_path, ) ) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index ab8bfa0d..2d2a463b 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -3015,8 +3015,10 @@ definitions: examples: - ["data"] - ["data", "records"] - - ["data", "{{ parameters.name }}"] + - ["data", 1, "name"] + - ["data", "{{ components_values.name }}"] - ["data", "*", "record"] + - ["*", "**", "name"] value: title: Value description: The dynamic or static value to assign to the key. Interpolated values can be used to dynamically determine the value during runtime. @@ -3061,6 +3063,52 @@ definitions: - type - retriever - components_mapping + StreamConfig: + title: Stream Config + description: (This component is experimental. Use at your own risk.) Describes how to get streams config from the source config. + type: object + required: + - type + - configs_pointer + properties: + type: + type: string + enum: [StreamConfig] + configs_pointer: + title: Configs Pointer + description: A list of potentially nested fields indicating the full path in source config file where streams configs located. + type: array + items: + - type: string + interpolation_context: + - parameters + examples: + - ["data"] + - ["data", "streams"] + - ["data", "{{ parameters.name }}"] + $parameters: + type: object + additionalProperties: true + ConfigComponentsResolver: + type: object + description: (This component is experimental. Use at your own risk.) Resolves and populates stream templates with components fetched from the source config. + properties: + type: + type: string + enum: [ConfigComponentsResolver] + stream_config: + "$ref": "#/definitions/StreamConfig" + components_mapping: + type: array + items: + "$ref": "#/definitions/ComponentMappingDefinition" + $parameters: + type: object + additionalProperties: true + required: + - type + - stream_config + - components_mapping DynamicDeclarativeStream: type: object description: (This component is experimental. Use at your own risk.) A component that described how will be created declarative streams based on stream template. @@ -3075,7 +3123,9 @@ definitions: components_resolver: title: Components Resolver description: Component resolve and populates stream templates with components values. - "$ref": "#/definitions/HttpComponentsResolver" + anyOf: + - "$ref": "#/definitions/HttpComponentsResolver" + - "$ref": "#/definitions/ConfigComponentsResolver" required: - type - stream_template diff --git a/airbyte_cdk/sources/declarative/manifest_declarative_source.py b/airbyte_cdk/sources/declarative/manifest_declarative_source.py index 652da85c..83c5fa5f 100644 --- a/airbyte_cdk/sources/declarative/manifest_declarative_source.py +++ b/airbyte_cdk/sources/declarative/manifest_declarative_source.py @@ -7,7 +7,7 @@ import pkgutil from copy import deepcopy from importlib import metadata -from typing import Any, Dict, Iterator, List, Mapping, Optional +from typing import Any, Dict, Iterator, List, Mapping, Optional, Set import yaml from jsonschema.exceptions import ValidationError @@ -20,6 +20,7 @@ AirbyteStateMessage, ConfiguredAirbyteCatalog, ConnectorSpecification, + FailureType, ) from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource @@ -48,6 +49,7 @@ DebugSliceLogger, SliceLogger, ) +from airbyte_cdk.utils.traced_exception import AirbyteTracedException class ManifestDeclarativeSource(DeclarativeSource): @@ -313,6 +315,7 @@ def _dynamic_stream_configs( ) -> List[Dict[str, Any]]: dynamic_stream_definitions: List[Dict[str, Any]] = manifest.get("dynamic_streams", []) dynamic_stream_configs: List[Dict[str, Any]] = [] + seen_dynamic_streams: Set[str] = set() for dynamic_definition in dynamic_stream_definitions: components_resolver_config = dynamic_definition["components_resolver"] @@ -350,6 +353,24 @@ def _dynamic_stream_configs( if "type" not in dynamic_stream: dynamic_stream["type"] = "DeclarativeStream" + # Ensure that each stream is created with a unique name + name = dynamic_stream.get("name") + + if name in seen_dynamic_streams: + error_message = f"Dynamic streams list contains a duplicate name: {name}. Please contact Airbyte Support." + failure_type = FailureType.system_error + + if resolver_type == "ConfigComponentsResolver": + error_message = f"Dynamic streams list contains a duplicate name: {name}. Please check your configuration." + failure_type = FailureType.config_error + + raise AirbyteTracedException( + message=error_message, + internal_message=error_message, + failure_type=failure_type, + ) + + seen_dynamic_streams.add(name) dynamic_stream_configs.append(dynamic_stream) return dynamic_stream_configs diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 8f940fe5..a8dbe61b 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -1192,8 +1192,10 @@ class ComponentMappingDefinition(BaseModel): examples=[ ["data"], ["data", "records"], - ["data", "{{ parameters.name }}"], + ["data", 1, "name"], + ["data", "{{ components_values.name }}"], ["data", "*", "record"], + ["*", "**", "name"], ], title="Field Path", ) @@ -1215,6 +1217,24 @@ class ComponentMappingDefinition(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") +class StreamConfig(BaseModel): + type: Literal["StreamConfig"] + configs_pointer: List[str] = Field( + ..., + description="A list of potentially nested fields indicating the full path in source config file where streams configs located.", + examples=[["data"], ["data", "streams"], ["data", "{{ parameters.name }}"]], + title="Configs Pointer", + ) + parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + + +class ConfigComponentsResolver(BaseModel): + type: Literal["ConfigComponentsResolver"] + stream_config: StreamConfig + components_mapping: List[ComponentMappingDefinition] + parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + + class AddedFieldDefinition(BaseModel): type: Literal["AddedFieldDefinition"] path: List[str] = Field( @@ -2010,7 +2030,7 @@ class DynamicDeclarativeStream(BaseModel): stream_template: DeclarativeStream = Field( ..., description="Reference to the stream template.", title="Stream Template" ) - components_resolver: HttpComponentsResolver = Field( + components_resolver: Union[HttpComponentsResolver, ConfigComponentsResolver] = Field( ..., description="Component resolve and populates stream templates with components values.", title="Components Resolver", diff --git a/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py b/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py index 473e78fc..f2719bb1 100644 --- a/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py +++ b/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py @@ -33,10 +33,13 @@ "DeclarativeStream.schema_loader": "JsonFileSchemaLoader", # DynamicDeclarativeStream "DynamicDeclarativeStream.stream_template": "DeclarativeStream", - "DynamicDeclarativeStream.components_resolver": "HttpComponentsResolver", + "DynamicDeclarativeStream.components_resolver": "ConfigComponentResolver", # HttpComponentsResolver "HttpComponentsResolver.retriever": "SimpleRetriever", "HttpComponentsResolver.components_mapping": "ComponentMappingDefinition", + # ConfigComponentResolver + "ConfigComponentsResolver.stream_config": "StreamConfig", + "ConfigComponentsResolver.components_mapping": "ComponentMappingDefinition", # DefaultErrorHandler "DefaultErrorHandler.response_filters": "HttpResponseFilter", # DefaultPaginator diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 6594f33a..215d6fff 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -128,6 +128,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( ConcurrencyLevel as ConcurrencyLevelModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + ConfigComponentsResolver as ConfigComponentsResolverModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( ConstantBackoffStrategy as ConstantBackoffStrategyModel, ) @@ -294,6 +297,9 @@ SimpleRetriever as SimpleRetrieverModel, ) from airbyte_cdk.sources.declarative.models.declarative_component_schema import Spec as SpecModel +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + StreamConfig as StreamConfigModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( SubstreamPartitionRouter as SubstreamPartitionRouterModel, ) @@ -356,7 +362,9 @@ from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod from airbyte_cdk.sources.declarative.resolvers import ( ComponentMappingDefinition, + ConfigComponentsResolver, HttpComponentsResolver, + StreamConfig, ) from airbyte_cdk.sources.declarative.retrievers import ( AsyncRetriever, @@ -494,6 +502,8 @@ def _init_mappings(self) -> None: WaitUntilTimeFromHeaderModel: self.create_wait_until_time_from_header, AsyncRetrieverModel: self.create_async_retriever, HttpComponentsResolverModel: self.create_http_components_resolver, + ConfigComponentsResolverModel: self.create_config_components_resolver, + StreamConfigModel: self.create_stream_config, ComponentMappingDefinitionModel: self.create_components_mapping_definition, } @@ -1884,8 +1894,8 @@ def create_record_selector( self, model: RecordSelectorModel, config: Config, - name: str, *, + name: str, transformations: List[RecordTransformation], decoder: Optional[Decoder] = None, client_side_incremental_sync: Optional[Dict[str, Any]] = None, @@ -2364,3 +2374,41 @@ def create_http_components_resolver( components_mapping=components_mapping, parameters=model.parameters or {}, ) + + @staticmethod + def create_stream_config( + model: StreamConfigModel, config: Config, **kwargs: Any + ) -> StreamConfig: + model_configs_pointer: List[Union[InterpolatedString, str]] = ( + [x for x in model.configs_pointer] if model.configs_pointer else [] + ) + + return StreamConfig( + configs_pointer=model_configs_pointer, + parameters=model.parameters or {}, + ) + + def create_config_components_resolver( + self, model: ConfigComponentsResolverModel, config: Config + ) -> Any: + stream_config = self._create_component_from_model( + model.stream_config, config=config, parameters=model.parameters or {} + ) + + components_mapping = [ + self._create_component_from_model( + model=components_mapping_definition_model, + value_type=ModelToComponentFactory._json_schema_type_name_to_type( + components_mapping_definition_model.value_type + ), + config=config, + ) + for components_mapping_definition_model in model.components_mapping + ] + + return ConfigComponentsResolver( + stream_config=stream_config, + config=config, + components_mapping=components_mapping, + parameters=model.parameters or {}, + ) diff --git a/airbyte_cdk/sources/declarative/resolvers/__init__.py b/airbyte_cdk/sources/declarative/resolvers/__init__.py index 17a3b5d5..6b8497fb 100644 --- a/airbyte_cdk/sources/declarative/resolvers/__init__.py +++ b/airbyte_cdk/sources/declarative/resolvers/__init__.py @@ -4,10 +4,15 @@ from airbyte_cdk.sources.declarative.resolvers.components_resolver import ComponentsResolver, ComponentMappingDefinition, ResolvedComponentMappingDefinition from airbyte_cdk.sources.declarative.resolvers.http_components_resolver import HttpComponentsResolver +from airbyte_cdk.sources.declarative.resolvers.config_components_resolver import ConfigComponentsResolver, StreamConfig from airbyte_cdk.sources.declarative.models import HttpComponentsResolver as HttpComponentsResolverModel +from airbyte_cdk.sources.declarative.models import ConfigComponentsResolver as ConfigComponentsResolverModel +from pydantic.v1 import BaseModel +from typing import Mapping -COMPONENTS_RESOLVER_TYPE_MAPPING = { - "HttpComponentsResolver": HttpComponentsResolverModel +COMPONENTS_RESOLVER_TYPE_MAPPING: Mapping[str, type[BaseModel]] = { + "HttpComponentsResolver": HttpComponentsResolverModel, + "ConfigComponentsResolver": ConfigComponentsResolverModel } -__all__ = ["ComponentsResolver", "HttpComponentsResolver", "ComponentMappingDefinition", "ResolvedComponentMappingDefinition", "COMPONENTS_RESOLVER_TYPE_MAPPING"] +__all__ = ["ComponentsResolver", "HttpComponentsResolver", "ComponentMappingDefinition", "ResolvedComponentMappingDefinition", "StreamConfig", "ConfigComponentsResolver", "COMPONENTS_RESOLVER_TYPE_MAPPING"] diff --git a/airbyte_cdk/sources/declarative/resolvers/config_components_resolver.py b/airbyte_cdk/sources/declarative/resolvers/config_components_resolver.py new file mode 100644 index 00000000..0308ea5d --- /dev/null +++ b/airbyte_cdk/sources/declarative/resolvers/config_components_resolver.py @@ -0,0 +1,136 @@ +# +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +# + +from copy import deepcopy +from dataclasses import InitVar, dataclass, field +from typing import Any, Dict, Iterable, List, Mapping, Union + +import dpath +from typing_extensions import deprecated + +from airbyte_cdk.sources.declarative.interpolation import InterpolatedString +from airbyte_cdk.sources.declarative.resolvers.components_resolver import ( + ComponentMappingDefinition, + ComponentsResolver, + ResolvedComponentMappingDefinition, +) +from airbyte_cdk.sources.source import ExperimentalClassWarning +from airbyte_cdk.sources.types import Config + + +@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning) +@dataclass +class StreamConfig: + """ + Identifies stream config details for dynamic schema extraction and processing. + """ + + configs_pointer: List[Union[InterpolatedString, str]] + parameters: InitVar[Mapping[str, Any]] + + def __post_init__(self, parameters: Mapping[str, Any]) -> None: + self.configs_pointer = [ + InterpolatedString.create(path, parameters=parameters) for path in self.configs_pointer + ] + + +@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning) +@dataclass +class ConfigComponentsResolver(ComponentsResolver): + """ + Resolves and populates stream templates with components fetched via source config. + + Attributes: + stream_config (StreamConfig): The description of stream configuration used to fetch stream config from source config. + config (Config): Configuration object for the resolver. + components_mapping (List[ComponentMappingDefinition]): List of mappings to resolve. + parameters (InitVar[Mapping[str, Any]]): Additional parameters for interpolation. + """ + + stream_config: StreamConfig + config: Config + components_mapping: List[ComponentMappingDefinition] + parameters: InitVar[Mapping[str, Any]] + _resolved_components: List[ResolvedComponentMappingDefinition] = field( + init=False, repr=False, default_factory=list + ) + + def __post_init__(self, parameters: Mapping[str, Any]) -> None: + """ + Initializes and parses component mappings, converting them to resolved definitions. + + Args: + parameters (Mapping[str, Any]): Parameters for interpolation. + """ + + for component_mapping in self.components_mapping: + if isinstance(component_mapping.value, (str, InterpolatedString)): + interpolated_value = ( + InterpolatedString.create(component_mapping.value, parameters=parameters) + if isinstance(component_mapping.value, str) + else component_mapping.value + ) + + field_path = [ + InterpolatedString.create(path, parameters=parameters) + for path in component_mapping.field_path + ] + + self._resolved_components.append( + ResolvedComponentMappingDefinition( + field_path=field_path, + value=interpolated_value, + value_type=component_mapping.value_type, + parameters=parameters, + ) + ) + else: + raise ValueError( + f"Expected a string or InterpolatedString for value in mapping: {component_mapping}" + ) + + @property + def _stream_config(self) -> Iterable[Mapping[str, Any]]: + path = [ + node.eval(self.config) if not isinstance(node, str) else node + for node in self.stream_config.configs_pointer + ] + stream_config = dpath.get(dict(self.config), path, default=[]) + + if not isinstance(stream_config, list): + stream_config = [stream_config] + + return stream_config + + def resolve_components( + self, stream_template_config: Dict[str, Any] + ) -> Iterable[Dict[str, Any]]: + """ + Resolves components in the stream template configuration by populating values. + + Args: + stream_template_config (Dict[str, Any]): Stream template to populate. + + Yields: + Dict[str, Any]: Updated configurations with resolved components. + """ + kwargs = {"stream_template_config": stream_template_config} + + for components_values in self._stream_config: + updated_config = deepcopy(stream_template_config) + kwargs["components_values"] = components_values # type: ignore[assignment] # component_values will always be of type Mapping[str, Any] + + for resolved_component in self._resolved_components: + valid_types = ( + (resolved_component.value_type,) if resolved_component.value_type else None + ) + value = resolved_component.value.eval( + self.config, valid_types=valid_types, **kwargs + ) + + path = [path.eval(self.config, **kwargs) for path in resolved_component.field_path] + + dpath.set(updated_config, path, value) + + yield updated_config diff --git a/unit_tests/sources/declarative/resolvers/test_config_components_resolver.py b/unit_tests/sources/declarative/resolvers/test_config_components_resolver.py new file mode 100644 index 00000000..75a61609 --- /dev/null +++ b/unit_tests/sources/declarative/resolvers/test_config_components_resolver.py @@ -0,0 +1,163 @@ +# +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +# + +import json +from unittest.mock import MagicMock + +import pytest + +from airbyte_cdk.models import Type +from airbyte_cdk.sources.declarative.concurrent_declarative_source import ( + ConcurrentDeclarativeSource, +) +from airbyte_cdk.sources.embedded.catalog import ( + to_configured_catalog, + to_configured_stream, +) +from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse +from airbyte_cdk.utils.traced_exception import AirbyteTracedException + +_CONFIG = { + "start_date": "2024-07-01T00:00:00.000Z", + "custom_streams": [ + {"id": 1, "name": "item_1"}, + {"id": 2, "name": "item_2"}, + ], +} + +_CONFIG_WITH_STREAM_DUPLICATION = { + "start_date": "2024-07-01T00:00:00.000Z", + "custom_streams": [ + {"id": 1, "name": "item_1"}, + {"id": 2, "name": "item_2"}, + {"id": 3, "name": "item_2"}, + ], +} + +_MANIFEST = { + "version": "6.7.0", + "type": "DeclarativeSource", + "check": {"type": "CheckStream", "stream_names": ["Rates"]}, + "dynamic_streams": [ + { + "type": "DynamicDeclarativeStream", + "stream_template": { + "type": "DeclarativeStream", + "name": "", + "primary_key": [], + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "$schema": "http://json-schema.org/schema#", + "properties": { + "ABC": {"type": "number"}, + "AED": {"type": "number"}, + }, + "type": "object", + }, + }, + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "$parameters": {"item_id": ""}, + "url_base": "https://api.test.com", + "path": "/items/{{parameters['item_id']}}", + "http_method": "GET", + "authenticator": { + "type": "ApiKeyAuthenticator", + "header": "apikey", + "api_token": "{{ config['api_key'] }}", + }, + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + "paginator": {"type": "NoPagination"}, + }, + }, + "components_resolver": { + "type": "ConfigComponentsResolver", + "stream_config": { + "type": "StreamConfig", + "configs_pointer": ["custom_streams"], + }, + "components_mapping": [ + { + "type": "ComponentMappingDefinition", + "field_path": ["name"], + "value": "{{components_values['name']}}", + }, + { + "type": "ComponentMappingDefinition", + "field_path": [ + "retriever", + "requester", + "$parameters", + "item_id", + ], + "value": "{{components_values['id']}}", + }, + ], + }, + } + ], +} + + +@pytest.mark.parametrize( + "config, expected_exception, expected_stream_names", + [ + (_CONFIG, None, ["item_1", "item_2"]), + ( + _CONFIG_WITH_STREAM_DUPLICATION, + "Dynamic streams list contains a duplicate name: item_2. Please check your configuration.", + None, + ), + ], +) +def test_dynamic_streams_read_with_config_components_resolver( + config, expected_exception, expected_stream_names +): + if expected_exception: + with pytest.raises(AirbyteTracedException) as exc_info: + source = ConcurrentDeclarativeSource( + source_config=_MANIFEST, config=config, catalog=None, state=None + ) + source.discover(logger=source.logger, config=config) + assert str(exc_info.value) == expected_exception + else: + source = ConcurrentDeclarativeSource( + source_config=_MANIFEST, config=config, catalog=None, state=None + ) + + actual_catalog = source.discover(logger=source.logger, config=config) + + configured_streams = [ + to_configured_stream(stream, primary_key=stream.source_defined_primary_key) + for stream in actual_catalog.streams + ] + configured_catalog = to_configured_catalog(configured_streams) + + with HttpMocker() as http_mocker: + http_mocker.get( + HttpRequest(url="https://api.test.com/items/1"), + HttpResponse(body=json.dumps({"id": "1", "name": "item_1"})), + ) + http_mocker.get( + HttpRequest(url="https://api.test.com/items/2"), + HttpResponse(body=json.dumps({"id": "2", "name": "item_2"})), + ) + + records = [ + message.record + for message in source.read(MagicMock(), config, configured_catalog) + if message.type == Type.RECORD + ] + + assert len(actual_catalog.streams) == len(expected_stream_names) + assert [stream.name for stream in actual_catalog.streams] == expected_stream_names + assert len(records) == len(expected_stream_names) + assert [record.stream for record in records] == expected_stream_names diff --git a/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py b/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py index 9e9fe225..1bbf2a41 100644 --- a/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py +++ b/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py @@ -21,6 +21,7 @@ to_configured_stream, ) from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse +from airbyte_cdk.utils.traced_exception import AirbyteTracedException _CONFIG = {"start_date": "2024-07-01T00:00:00.000Z"} @@ -110,6 +111,92 @@ ], } +_MANIFEST_WITH_DUPLICATES = { + "version": "6.7.0", + "type": "DeclarativeSource", + "check": {"type": "CheckStream", "stream_names": ["Rates"]}, + "dynamic_streams": [ + { + "type": "DynamicDeclarativeStream", + "stream_template": { + "type": "DeclarativeStream", + "name": "", + "primary_key": [], + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "$schema": "http://json-schema.org/schema#", + "properties": { + "ABC": {"type": "number"}, + "AED": {"type": "number"}, + }, + "type": "object", + }, + }, + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "$parameters": {"item_id": ""}, + "url_base": "https://api.test.com", + "path": "/items/{{parameters['item_id']}}", + "http_method": "GET", + "authenticator": { + "type": "ApiKeyAuthenticator", + "header": "apikey", + "api_token": "{{ config['api_key'] }}", + }, + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + "paginator": {"type": "NoPagination"}, + }, + }, + "components_resolver": { + "type": "HttpComponentsResolver", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "url_base": "https://api.test.com", + "path": "duplicates_items", + "http_method": "GET", + "authenticator": { + "type": "ApiKeyAuthenticator", + "header": "apikey", + "api_token": "{{ config['api_key'] }}", + }, + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + "paginator": {"type": "NoPagination"}, + }, + "components_mapping": [ + { + "type": "ComponentMappingDefinition", + "field_path": ["name"], + "value": "{{components_values['name']}}", + }, + { + "type": "ComponentMappingDefinition", + "field_path": [ + "retriever", + "requester", + "$parameters", + "item_id", + ], + "value": "{{components_values['id']}}", + }, + ], + }, + } + ], +} + @pytest.mark.parametrize( "components_mapping, retriever_data, stream_template_config, expected_result", @@ -147,13 +234,18 @@ def test_http_components_resolver( assert result == expected_result -def test_dynamic_streams_read(): +def test_dynamic_streams_read_with_http_components_resolver(): expected_stream_names = ["item_1", "item_2"] with HttpMocker() as http_mocker: http_mocker.get( HttpRequest(url="https://api.test.com/items"), HttpResponse( - body=json.dumps([{"id": 1, "name": "item_1"}, {"id": 2, "name": "item_2"}]) + body=json.dumps( + [ + {"id": 1, "name": "item_1"}, + {"id": 2, "name": "item_2"}, + ] + ) ), ) http_mocker.get( @@ -169,7 +261,7 @@ def test_dynamic_streams_read(): source_config=_MANIFEST, config=_CONFIG, catalog=None, state=None ) - actual_catalog = source.discover(logger=source.logger, config={}) + actual_catalog = source.discover(logger=source.logger, config=_CONFIG) configured_streams = [ to_configured_stream(stream, primary_key=stream.source_defined_primary_key) @@ -179,7 +271,7 @@ def test_dynamic_streams_read(): records = [ message.record - for message in source.read(MagicMock(), {}, configured_catalog) + for message in source.read(MagicMock(), _CONFIG, configured_catalog) if message.type == Type.RECORD ] @@ -187,3 +279,29 @@ def test_dynamic_streams_read(): assert [stream.name for stream in actual_catalog.streams] == expected_stream_names assert len(records) == 2 assert [record.stream for record in records] == expected_stream_names + + +def test_duplicated_dynamic_streams_read_with_http_components_resolver(): + with HttpMocker() as http_mocker: + http_mocker.get( + HttpRequest(url="https://api.test.com/duplicates_items"), + HttpResponse( + body=json.dumps( + [ + {"id": 1, "name": "item_1"}, + {"id": 2, "name": "item_2"}, + {"id": 3, "name": "item_2"}, + ] + ) + ), + ) + + with pytest.raises(AirbyteTracedException) as exc_info: + source = ConcurrentDeclarativeSource( + source_config=_MANIFEST_WITH_DUPLICATES, config=_CONFIG, catalog=None, state=None + ) + source.discover(logger=source.logger, config=_CONFIG) + assert ( + str(exc_info.value) + == "Dynamic streams list contains a duplicate name: item_2. Please contact Airbyte Support." + )