From 884897e9d3c1324e55e62587805904c386582277 Mon Sep 17 00:00:00 2001 From: Brian Lai <51336873+brianjlai@users.noreply.github.com> Date: Mon, 6 Jan 2025 14:44:38 -0800 Subject: [PATCH 1/4] fix(low-code cdk pagination): Fix the offset strategy so that it resets back to 0 when a stream is an incremental data feed (#202) --- .../paginators/strategies/offset_increment.py | 4 +- .../paginators/strategies/stop_condition.py | 5 ++- .../paginators/test_default_paginator.py | 39 +++++++++++++++++++ .../paginators/test_offset_increment.py | 5 +-- .../paginators/test_stop_condition.py | 2 +- 5 files changed, 48 insertions(+), 7 deletions(-) diff --git a/airbyte_cdk/sources/declarative/requesters/paginators/strategies/offset_increment.py b/airbyte_cdk/sources/declarative/requesters/paginators/strategies/offset_increment.py index 37ba3bbfa..7b17c81b4 100644 --- a/airbyte_cdk/sources/declarative/requesters/paginators/strategies/offset_increment.py +++ b/airbyte_cdk/sources/declarative/requesters/paginators/strategies/offset_increment.py @@ -83,7 +83,9 @@ def next_page_token( return self._offset def reset(self, reset_value: Optional[Any] = 0) -> None: - if not isinstance(reset_value, int): + if reset_value is None: + self._offset = 0 + elif not isinstance(reset_value, int): raise ValueError( f"Reset value {reset_value} for OffsetIncrement pagination strategy was not an integer" ) diff --git a/airbyte_cdk/sources/declarative/requesters/paginators/strategies/stop_condition.py b/airbyte_cdk/sources/declarative/requesters/paginators/strategies/stop_condition.py index 7722c5e73..a3c977f18 100644 --- a/airbyte_cdk/sources/declarative/requesters/paginators/strategies/stop_condition.py +++ b/airbyte_cdk/sources/declarative/requesters/paginators/strategies/stop_condition.py @@ -53,7 +53,10 @@ def next_page_token( return self._delegate.next_page_token(response, last_page_size, last_record) def reset(self, reset_value: Optional[Any] = None) -> None: - self._delegate.reset(reset_value) + if reset_value: + self._delegate.reset(reset_value) + else: + self._delegate.reset() def get_page_size(self) -> Optional[int]: return self._delegate.get_page_size() diff --git a/unit_tests/sources/declarative/requesters/paginators/test_default_paginator.py b/unit_tests/sources/declarative/requesters/paginators/test_default_paginator.py index 1cd34c42f..fcf631f7f 100644 --- a/unit_tests/sources/declarative/requesters/paginators/test_default_paginator.py +++ b/unit_tests/sources/declarative/requesters/paginators/test_default_paginator.py @@ -9,6 +9,7 @@ import requests from airbyte_cdk.sources.declarative.decoders import JsonDecoder, XmlDecoder +from airbyte_cdk.sources.declarative.incremental import DatetimeBasedCursor from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean from airbyte_cdk.sources.declarative.requesters.paginators.default_paginator import ( DefaultPaginator, @@ -22,6 +23,10 @@ from airbyte_cdk.sources.declarative.requesters.paginators.strategies.offset_increment import ( OffsetIncrement, ) +from airbyte_cdk.sources.declarative.requesters.paginators.strategies.stop_condition import ( + CursorStopCondition, + StopConditionPaginationStrategyDecorator, +) from airbyte_cdk.sources.declarative.requesters.request_path import RequestPath @@ -356,6 +361,40 @@ def test_reset(inject_on_first_request): assert request_parameters_for_second_request != request_parameters_after_reset +def test_data_feed_paginator_with_stop_page_condition(): + config = {} + + cursor = DatetimeBasedCursor( + cursor_field="updated_at", + datetime_format="%Y-%m-%d", + start_datetime="2024-01-01", + config=config, + parameters={}, + ) + + wrapped_strategy = StopConditionPaginationStrategyDecorator( + _delegate=OffsetIncrement( + config={}, page_size=2, inject_on_first_request=False, parameters={} + ), + stop_condition=CursorStopCondition(cursor=cursor), + ) + + paginator = DefaultPaginator( + pagination_strategy=wrapped_strategy, + config=config, + url_base="https://airbyte.io", + parameters={}, + page_size_option=RequestOption( + inject_into=RequestOptionType.request_parameter, field_name="limit", parameters={} + ), + page_token_option=RequestOption( + inject_into=RequestOptionType.request_parameter, field_name="offset", parameters={} + ), + ) + + paginator.reset() + + def test_initial_token_with_offset_pagination(): page_size_request_option = RequestOption( inject_into=RequestOptionType.request_parameter, field_name="limit", parameters={} diff --git a/unit_tests/sources/declarative/requesters/paginators/test_offset_increment.py b/unit_tests/sources/declarative/requesters/paginators/test_offset_increment.py index d443132ed..692b80272 100644 --- a/unit_tests/sources/declarative/requesters/paginators/test_offset_increment.py +++ b/unit_tests/sources/declarative/requesters/paginators/test_offset_increment.py @@ -104,8 +104,5 @@ def test_offset_increment_reset(reset_value, expected_initial_token, expected_er with pytest.raises(expected_error): paginator_strategy.reset(reset_value=reset_value) else: - if reset_value is None: - paginator_strategy.reset() - else: - paginator_strategy.reset(reset_value=reset_value) + paginator_strategy.reset(reset_value=reset_value) assert paginator_strategy.initial_token == expected_initial_token diff --git a/unit_tests/sources/declarative/requesters/paginators/test_stop_condition.py b/unit_tests/sources/declarative/requesters/paginators/test_stop_condition.py index ea1d38e24..88da84589 100644 --- a/unit_tests/sources/declarative/requesters/paginators/test_stop_condition.py +++ b/unit_tests/sources/declarative/requesters/paginators/test_stop_condition.py @@ -108,7 +108,7 @@ def test_when_reset_then_delegate(mocked_pagination_strategy, mocked_stop_condit mocked_pagination_strategy, mocked_stop_condition ) decorator.reset() - mocked_pagination_strategy.reset.assert_called_once_with(None) + mocked_pagination_strategy.reset.assert_called_once_with() def test_when_get_page_size_then_delegate(mocked_pagination_strategy, mocked_stop_condition): From 3ee710d23d51ac065a8da35609775c7672167832 Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants <36314070+artem1205@users.noreply.github.com> Date: Tue, 7 Jan 2025 19:48:26 +0100 Subject: [PATCH 2/4] feat: add `min` macros (#203) Signed-off-by: Artem Inzhyyants Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> --- .../declarative/interpolation/macros.py | 21 +++++++++++++++++++ .../declarative/interpolation/test_jinja.py | 3 +++ .../declarative/interpolation/test_macros.py | 1 + 3 files changed, 25 insertions(+) diff --git a/airbyte_cdk/sources/declarative/interpolation/macros.py b/airbyte_cdk/sources/declarative/interpolation/macros.py index e786f0116..1ca5b31f0 100644 --- a/airbyte_cdk/sources/declarative/interpolation/macros.py +++ b/airbyte_cdk/sources/declarative/interpolation/macros.py @@ -94,6 +94,26 @@ def max(*args: typing.Any) -> typing.Any: return builtins.max(*args) +def min(*args: typing.Any) -> typing.Any: + """ + Returns smallest object of an iterable, or two or more arguments. + + min(iterable, *[, default=obj, key=func]) -> value + min(arg1, arg2, *args, *[, key=func]) -> value + + Usage: + `"{{ min(2,3) }}" + + With a single iterable argument, return its smallest item. The + default keyword-only argument specifies an object to return if + the provided iterable is empty. + With two or more arguments, return the smallest argument. + :param args: args to compare + :return: smallest argument + """ + return builtins.min(*args) + + def day_delta(num_days: int, format: str = "%Y-%m-%dT%H:%M:%S.%f%z") -> str: """ Returns datetime of now() + num_days @@ -147,6 +167,7 @@ def format_datetime( today_utc, timestamp, max, + min, day_delta, duration, format_datetime, diff --git a/unit_tests/sources/declarative/interpolation/test_jinja.py b/unit_tests/sources/declarative/interpolation/test_jinja.py index 1126520f9..a99236324 100644 --- a/unit_tests/sources/declarative/interpolation/test_jinja.py +++ b/unit_tests/sources/declarative/interpolation/test_jinja.py @@ -184,6 +184,7 @@ def test_to_string(test_name, input_value, expected_output): id="test_timestamp_from_rfc3339", ), pytest.param("{{ max(1,2) }}", 2, id="test_max"), + pytest.param("{{ min(1,2) }}", 1, id="test_min"), ], ) def test_macros(s, expected_value): @@ -291,6 +292,8 @@ def test_undeclared_variables(template_string, expected_error, expected_value): ), pytest.param("{{ max(2, 3) }}", 3, id="test_max_with_arguments"), pytest.param("{{ max([2, 3]) }}", 3, id="test_max_with_list"), + pytest.param("{{ min(2, 3) }}", 2, id="test_min_with_arguments"), + pytest.param("{{ min([2, 3]) }}", 2, id="test_min_with_list"), pytest.param("{{ day_delta(1) }}", "2021-09-02T00:00:00.000000+0000", id="test_day_delta"), pytest.param( "{{ day_delta(-1) }}", "2021-08-31T00:00:00.000000+0000", id="test_day_delta_negative" diff --git a/unit_tests/sources/declarative/interpolation/test_macros.py b/unit_tests/sources/declarative/interpolation/test_macros.py index 4aa2a0c08..3fcad5d15 100644 --- a/unit_tests/sources/declarative/interpolation/test_macros.py +++ b/unit_tests/sources/declarative/interpolation/test_macros.py @@ -15,6 +15,7 @@ ("test_now_utc", "now_utc", True), ("test_today_utc", "today_utc", True), ("test_max", "max", True), + ("test_min", "min", True), ("test_day_delta", "day_delta", True), ("test_format_datetime", "format_datetime", True), ("test_duration", "duration", True), From 3344441fb8f53d976539f0af1d8eef0729e83882 Mon Sep 17 00:00:00 2001 From: Daryna Ishchenko <80129833+darynaishchenko@users.noreply.github.com> Date: Wed, 8 Jan 2025 17:12:04 +0200 Subject: [PATCH 3/4] feat(low-code): added keys replace transformation (#183) Co-authored-by: octavia-squidington-iii --- .../declarative_component_schema.yaml | 45 +++++++ .../models/declarative_component_schema.py | 19 +++ .../parsers/model_to_component_factory.py | 14 +++ .../keys_replace_transformation.py | 61 ++++++++++ .../test_keys_replace_transformation.py | 112 ++++++++++++++++++ 5 files changed, 251 insertions(+) create mode 100644 airbyte_cdk/sources/declarative/transformations/keys_replace_transformation.py create mode 100644 unit_tests/sources/declarative/transformations/test_keys_replace_transformation.py diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 89c731075..a9d4f2558 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -1241,6 +1241,7 @@ definitions: - "$ref": "#/definitions/KeysToLower" - "$ref": "#/definitions/KeysToSnakeCase" - "$ref": "#/definitions/FlattenFields" + - "$ref": "#/definitions/KeysReplace" state_migrations: title: State Migrations description: Array of state migrations to be applied on the input state @@ -1785,6 +1786,7 @@ definitions: - "$ref": "#/definitions/KeysToLower" - "$ref": "#/definitions/KeysToSnakeCase" - "$ref": "#/definitions/FlattenFields" + - "$ref": "#/definitions/KeysReplace" schema_type_identifier: "$ref": "#/definitions/SchemaTypeIdentifier" $parameters: @@ -1883,6 +1885,49 @@ definitions: $parameters: type: object additionalProperties: true + KeysReplace: + title: Keys Replace + description: A transformation that replaces symbols in keys. + type: object + required: + - type + - old + - new + properties: + type: + type: string + enum: [KeysReplace] + old: + type: string + title: Old value + description: Old value to replace. + examples: + - " " + - "{{ record.id }}" + - "{{ config['id'] }}" + - "{{ stream_slice['id'] }}" + interpolation_context: + - config + - record + - stream_state + - stream_slice + new: + type: string + title: New value + description: New value to set. + examples: + - "_" + - "{{ record.id }}" + - "{{ config['id'] }}" + - "{{ stream_slice['id'] }}" + interpolation_context: + - config + - record + - stream_state + - stream_slice + $parameters: + type: object + additionalProperties: true IterableDecoder: title: Iterable Decoder description: Use this if the response consists of strings separated by new lines (`\n`). The Decoder will wrap each row into a JSON object with the `record` key. diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 5823b34c1..6b70b3fdd 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -721,6 +721,23 @@ class KeysToSnakeCase(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") +class KeysReplace(BaseModel): + type: Literal["KeysReplace"] + old: str = Field( + ..., + description="Old value to replace.", + examples=[" ", "{{ record.id }}", "{{ config['id'] }}", "{{ stream_slice['id'] }}"], + title="Old value", + ) + new: str = Field( + ..., + description="New value to set.", + examples=["_", "{{ record.id }}", "{{ config['id'] }}", "{{ stream_slice['id'] }}"], + title="New value", + ) + parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + + class FlattenFields(BaseModel): type: Literal["FlattenFields"] parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") @@ -1701,6 +1718,7 @@ class Config: KeysToLower, KeysToSnakeCase, FlattenFields, + KeysReplace, ] ] ] = Field( @@ -1875,6 +1893,7 @@ class DynamicSchemaLoader(BaseModel): KeysToLower, KeysToSnakeCase, FlattenFields, + KeysReplace, ] ] ] = Field( diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 694cb1042..7cb04c2a6 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -254,6 +254,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( JwtPayload as JwtPayloadModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + KeysReplace as KeysReplaceModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( KeysToLower as KeysToLowerModel, ) @@ -417,6 +420,9 @@ from airbyte_cdk.sources.declarative.transformations.flatten_fields import ( FlattenFields, ) +from airbyte_cdk.sources.declarative.transformations.keys_replace_transformation import ( + KeysReplaceTransformation, +) from airbyte_cdk.sources.declarative.transformations.keys_to_lower_transformation import ( KeysToLowerTransformation, ) @@ -509,6 +515,7 @@ def _init_mappings(self) -> None: GzipParserModel: self.create_gzip_parser, KeysToLowerModel: self.create_keys_to_lower_transformation, KeysToSnakeCaseModel: self.create_keys_to_snake_transformation, + KeysReplaceModel: self.create_keys_replace_transformation, FlattenFieldsModel: self.create_flatten_fields, IterableDecoderModel: self.create_iterable_decoder, XmlDecoderModel: self.create_xml_decoder, @@ -630,6 +637,13 @@ def create_keys_to_snake_transformation( ) -> KeysToSnakeCaseTransformation: return KeysToSnakeCaseTransformation() + def create_keys_replace_transformation( + self, model: KeysReplaceModel, config: Config, **kwargs: Any + ) -> KeysReplaceTransformation: + return KeysReplaceTransformation( + old=model.old, new=model.new, parameters=model.parameters or {} + ) + def create_flatten_fields( self, model: FlattenFieldsModel, config: Config, **kwargs: Any ) -> FlattenFields: diff --git a/airbyte_cdk/sources/declarative/transformations/keys_replace_transformation.py b/airbyte_cdk/sources/declarative/transformations/keys_replace_transformation.py new file mode 100644 index 000000000..8fe0bbffb --- /dev/null +++ b/airbyte_cdk/sources/declarative/transformations/keys_replace_transformation.py @@ -0,0 +1,61 @@ +# +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +# + +from dataclasses import InitVar, dataclass +from typing import Any, Dict, Mapping, Optional + +from airbyte_cdk import InterpolatedString +from airbyte_cdk.sources.declarative.transformations import RecordTransformation +from airbyte_cdk.sources.types import Config, StreamSlice, StreamState + + +@dataclass +class KeysReplaceTransformation(RecordTransformation): + """ + Transformation that applies keys names replacement. + + Example usage: + - type: KeysReplace + old: " " + new: "_" + Result: + from: {"created time": ..., "customer id": ..., "user id": ...} + to: {"created_time": ..., "customer_id": ..., "user_id": ...} + """ + + old: str + new: str + parameters: InitVar[Mapping[str, Any]] + + def __post_init__(self, parameters: Mapping[str, Any]) -> None: + self._old = InterpolatedString.create(self.old, parameters=parameters) + self._new = InterpolatedString.create(self.new, parameters=parameters) + + def transform( + self, + record: Dict[str, Any], + config: Optional[Config] = None, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + ) -> None: + if config is None: + config = {} + + kwargs = {"record": record, "stream_state": stream_state, "stream_slice": stream_slice} + old_key = str(self._old.eval(config, **kwargs)) + new_key = str(self._new.eval(config, **kwargs)) + + def _transform(data: Dict[str, Any]) -> Dict[str, Any]: + result = {} + for key, value in data.items(): + updated_key = key.replace(old_key, new_key) + if isinstance(value, dict): + result[updated_key] = _transform(value) + else: + result[updated_key] = value + return result + + transformed_record = _transform(record) + record.clear() + record.update(transformed_record) diff --git a/unit_tests/sources/declarative/transformations/test_keys_replace_transformation.py b/unit_tests/sources/declarative/transformations/test_keys_replace_transformation.py new file mode 100644 index 000000000..417d992af --- /dev/null +++ b/unit_tests/sources/declarative/transformations/test_keys_replace_transformation.py @@ -0,0 +1,112 @@ +# +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +# +import pytest + +from airbyte_cdk.sources.declarative.transformations.keys_replace_transformation import ( + KeysReplaceTransformation, +) + +_ANY_VALUE = -1 + + +@pytest.mark.parametrize( + [ + "input_record", + "config", + "stream_state", + "stream_slice", + "keys_replace_config", + "expected_record", + ], + [ + pytest.param( + {"date time": _ANY_VALUE, "customer id": _ANY_VALUE}, + {}, + {}, + {}, + {"old": " ", "new": "_"}, + {"date_time": _ANY_VALUE, "customer_id": _ANY_VALUE}, + id="simple keys replace config", + ), + pytest.param( + { + "customer_id": 111111, + "customer_name": "MainCustomer", + "field_1_111111": _ANY_VALUE, + "field_2_111111": _ANY_VALUE, + }, + {}, + {}, + {}, + {"old": '{{ record["customer_id"] }}', "new": '{{ record["customer_name"] }}'}, + { + "customer_id": 111111, + "customer_name": "MainCustomer", + "field_1_MainCustomer": _ANY_VALUE, + "field_2_MainCustomer": _ANY_VALUE, + }, + id="keys replace config uses values from record", + ), + pytest.param( + {"customer_id": 111111, "field_1_111111": _ANY_VALUE, "field_2_111111": _ANY_VALUE}, + {}, + {}, + {"customer_name": "MainCustomer"}, + {"old": '{{ record["customer_id"] }}', "new": '{{ stream_slice["customer_name"] }}'}, + { + "customer_id": 111111, + "field_1_MainCustomer": _ANY_VALUE, + "field_2_MainCustomer": _ANY_VALUE, + }, + id="keys replace config uses values from slice", + ), + pytest.param( + {"customer_id": 111111, "field_1_111111": _ANY_VALUE, "field_2_111111": _ANY_VALUE}, + {"customer_name": "MainCustomer"}, + {}, + {}, + {"old": '{{ record["customer_id"] }}', "new": '{{ config["customer_name"] }}'}, + { + "customer_id": 111111, + "field_1_MainCustomer": _ANY_VALUE, + "field_2_MainCustomer": _ANY_VALUE, + }, + id="keys replace config uses values from config", + ), + pytest.param( + { + "date time": _ANY_VALUE, + "user id": _ANY_VALUE, + "customer": { + "customer name": _ANY_VALUE, + "customer id": _ANY_VALUE, + "contact info": {"email": _ANY_VALUE, "phone number": _ANY_VALUE}, + }, + }, + {}, + {}, + {}, + {"old": " ", "new": "_"}, + { + "customer": { + "contact_info": {"email": _ANY_VALUE, "phone_number": _ANY_VALUE}, + "customer_id": _ANY_VALUE, + "customer_name": _ANY_VALUE, + }, + "date_time": _ANY_VALUE, + "user_id": _ANY_VALUE, + }, + id="simple keys replace config with nested fields in record", + ), + ], +) +def test_transform( + input_record, config, stream_state, stream_slice, keys_replace_config, expected_record +): + KeysReplaceTransformation( + old=keys_replace_config["old"], new=keys_replace_config["new"], parameters={} + ).transform( + record=input_record, config=config, stream_state=stream_state, stream_slice=stream_slice + ) + assert input_record == expected_record From e78eaffd72f32288ea96a27b644c14cfc55aaaf5 Mon Sep 17 00:00:00 2001 From: Aldo Gonzalez <168454423+aldogonzalez8@users.noreply.github.com> Date: Wed, 8 Jan 2025 16:53:09 -0600 Subject: [PATCH 4/4] fix(airbyte-cdk): unable to create custom retriever (#198) --- .../parsers/model_to_component_factory.py | 15 +++++--- .../test_model_to_component_factory.py | 36 +++++++++++++++++++ 2 files changed, 46 insertions(+), 5 deletions(-) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 7cb04c2a6..bd164abc2 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -1574,7 +1574,12 @@ def create_exponential_backoff_strategy( ) def create_http_requester( - self, model: HttpRequesterModel, decoder: Decoder, config: Config, *, name: str + self, + model: HttpRequesterModel, + config: Config, + decoder: Decoder = JsonDecoder(parameters={}), + *, + name: str, ) -> HttpRequester: authenticator = ( self._create_component_from_model( @@ -1990,9 +1995,9 @@ def create_record_selector( config: Config, *, name: str, - transformations: List[RecordTransformation], - decoder: Optional[Decoder] = None, - client_side_incremental_sync: Optional[Dict[str, Any]] = None, + transformations: List[RecordTransformation] | None = None, + decoder: Decoder | None = None, + client_side_incremental_sync: Dict[str, Any] | None = None, **kwargs: Any, ) -> RecordSelector: assert model.schema_normalization is not None # for mypy @@ -2022,7 +2027,7 @@ def create_record_selector( name=name, config=config, record_filter=record_filter, - transformations=transformations, + transformations=transformations or [], schema_normalization=schema_normalization, parameters=model.parameters or {}, ) diff --git a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py index c50cfd521..c50e9e6e9 100644 --- a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py +++ b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py @@ -2634,6 +2634,42 @@ def test_create_custom_schema_loader(): assert isinstance(component, MyCustomSchemaLoader) +class MyCustomRetriever(SimpleRetriever): + pass + + +def test_create_custom_retriever(): + stream_model = { + "type": "DeclarativeStream", + "retriever": { + "type": "CustomRetriever", + "class_name": "unit_tests.sources.declarative.parsers.test_model_to_component_factory.MyCustomRetriever", + "record_selector": { + "type": "RecordSelector", + "extractor": { + "type": "DpathExtractor", + "field_path": [], + }, + "$parameters": {"name": ""}, + }, + "requester": { + "type": "HttpRequester", + "name": "list", + "url_base": "orange.com", + "path": "/v1/api", + "$parameters": {"name": ""}, + }, + }, + } + + stream = factory.create_component( + model_type=DeclarativeStreamModel, component_definition=stream_model, config=input_config + ) + + assert isinstance(stream, DeclarativeStream) + assert isinstance(stream.retriever, MyCustomRetriever) + + @freezegun.freeze_time("2021-01-01 00:00:00") @pytest.mark.parametrize( "config, manifest, expected",