diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/incremental/per_partition_cursor.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/incremental/per_partition_cursor.py index 8ac77f4fd55e..4233816a504c 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/incremental/per_partition_cursor.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/incremental/per_partition_cursor.py @@ -6,7 +6,7 @@ from typing import Any, Callable, Iterable, Mapping, MutableMapping, Optional, Union from airbyte_cdk.sources.declarative.incremental.declarative_cursor import DeclarativeCursor -from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer +from airbyte_cdk.sources.declarative.partition_routers.partition_router import PartitionRouter from airbyte_cdk.sources.types import Record, StreamSlice, StreamState @@ -62,7 +62,7 @@ class PerPartitionCursor(DeclarativeCursor): _KEY = 0 _VALUE = 1 - def __init__(self, cursor_factory: CursorFactory, partition_router: StreamSlicer): + def __init__(self, cursor_factory: CursorFactory, partition_router: PartitionRouter): self._cursor_factory = cursor_factory self._partition_router = partition_router self._cursor_per_partition: MutableMapping[str, DeclarativeCursor] = {} @@ -87,7 +87,7 @@ def set_initial_state(self, stream_state: StreamState) -> None: If a partition state is provided in the stream state, it will update the corresponding partition cursor with this state. Additionally, it sets the parent state for partition routers that are based on parent streams. If a partition router - does not have parent streams, this step will be skipped due to the default StreamSlicer implementation. + does not have parent streams, this step will be skipped due to the default PartitionRouter implementation. Args: stream_state (StreamState): The state of the streams to be set. The format of the stream state should be: @@ -116,7 +116,7 @@ def set_initial_state(self, stream_state: StreamState) -> None: self._cursor_per_partition[self._to_partition_key(state["partition"])] = self._create_cursor(state["cursor"]) # Set parent state for partition routers based on parent streams - self._partition_router.set_parent_state(stream_state) + self._partition_router.set_initial_state(stream_state) def observe(self, stream_slice: StreamSlice, record: Record) -> None: self._cursor_per_partition[self._to_partition_key(stream_slice.partition)].observe( @@ -147,7 +147,7 @@ def get_stream_state(self) -> StreamState: ) state: dict[str, Any] = {"states": states} - parent_state = self._partition_router.get_parent_state() + parent_state = self._partition_router.get_stream_state() if parent_state: state["parent_state"] = parent_state return state diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/class_types_registry.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/class_types_registry.py index 694dacb0c742..b1861142b569 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/class_types_registry.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/class_types_registry.py @@ -22,6 +22,7 @@ from airbyte_cdk.sources.declarative.incremental.datetime_based_cursor import DatetimeBasedCursor from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString +from airbyte_cdk.sources.declarative.partition_routers.cartesian_product_stream_slicer import CartesianProductStreamSlicer from airbyte_cdk.sources.declarative.partition_routers.list_partition_router import ListPartitionRouter from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import ParentStreamConfig, SubstreamPartitionRouter from airbyte_cdk.sources.declarative.requesters import RequestOption @@ -49,7 +50,6 @@ from airbyte_cdk.sources.declarative.schema.inline_schema_loader import InlineSchemaLoader from airbyte_cdk.sources.declarative.schema.json_file_schema_loader import JsonFileSchemaLoader from airbyte_cdk.sources.declarative.spec import Spec -from airbyte_cdk.sources.declarative.stream_slicers.cartesian_product_stream_slicer import CartesianProductStreamSlicer from airbyte_cdk.sources.declarative.transformations import RemoveFields from airbyte_cdk.sources.declarative.transformations.add_fields import AddedFieldDefinition, AddFields from airbyte_cdk.sources.streams.http.requests_native_auth.oauth import SingleUseRefreshTokenOauth2Authenticator diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 544f8e6f73d2..4ae2ea07bbd0 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -97,7 +97,12 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ValueType from airbyte_cdk.sources.declarative.models.declarative_component_schema import WaitTimeFromHeader as WaitTimeFromHeaderModel from airbyte_cdk.sources.declarative.models.declarative_component_schema import WaitUntilTimeFromHeader as WaitUntilTimeFromHeaderModel -from airbyte_cdk.sources.declarative.partition_routers import ListPartitionRouter, SinglePartitionRouter, SubstreamPartitionRouter +from airbyte_cdk.sources.declarative.partition_routers import ( + CartesianProductStreamSlicer, + ListPartitionRouter, + SinglePartitionRouter, + SubstreamPartitionRouter, +) from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import ParentStreamConfig from airbyte_cdk.sources.declarative.requesters import HttpRequester, RequestOption from airbyte_cdk.sources.declarative.requesters.error_handlers import CompositeErrorHandler, DefaultErrorHandler, HttpResponseFilter @@ -123,7 +128,7 @@ from airbyte_cdk.sources.declarative.retrievers import SimpleRetriever, SimpleRetrieverTestReadDecorator from airbyte_cdk.sources.declarative.schema import DefaultSchemaLoader, InlineSchemaLoader, JsonFileSchemaLoader from airbyte_cdk.sources.declarative.spec import Spec -from airbyte_cdk.sources.declarative.stream_slicers import CartesianProductStreamSlicer, StreamSlicer +from airbyte_cdk.sources.declarative.stream_slicers import StreamSlicer from airbyte_cdk.sources.declarative.transformations import AddFields, RecordTransformation, RemoveFields from airbyte_cdk.sources.declarative.transformations.add_fields import AddedFieldDefinition from airbyte_cdk.sources.message import InMemoryMessageRepository, LogAppenderMessageRepositoryDecorator, MessageRepository diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/partition_routers/__init__.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/partition_routers/__init__.py index 5a0b64841cdb..86e472a42c52 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/partition_routers/__init__.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/partition_routers/__init__.py @@ -2,8 +2,9 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # +from airbyte_cdk.sources.declarative.partition_routers.cartesian_product_stream_slicer import CartesianProductStreamSlicer from airbyte_cdk.sources.declarative.partition_routers.list_partition_router import ListPartitionRouter from airbyte_cdk.sources.declarative.partition_routers.single_partition_router import SinglePartitionRouter from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import SubstreamPartitionRouter -__all__ = ["ListPartitionRouter", "SinglePartitionRouter", "SubstreamPartitionRouter"] +__all__ = ["CartesianProductStreamSlicer", "ListPartitionRouter", "SinglePartitionRouter", "SubstreamPartitionRouter"] diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/partition_routers/list_partition_router.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/partition_routers/list_partition_router.py index 6057bfe62caa..564a3119e25b 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/partition_routers/list_partition_router.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/partition_routers/list_partition_router.py @@ -6,13 +6,13 @@ from typing import Any, Iterable, List, Mapping, Optional, Union from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString +from airbyte_cdk.sources.declarative.partition_routers.partition_router import PartitionRouter from airbyte_cdk.sources.declarative.requesters.request_option import RequestOption, RequestOptionType -from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer from airbyte_cdk.sources.types import Config, StreamSlice, StreamState @dataclass -class ListPartitionRouter(StreamSlicer): +class ListPartitionRouter(PartitionRouter): """ Partition router that iterates over the values of a list If values is a string, then evaluate it as literal and assert the resulting literal is a list @@ -87,3 +87,15 @@ def _get_request_option(self, request_option_type: RequestOptionType, stream_sli return {} else: return {} + + def set_initial_state(self, stream_state: StreamState) -> None: + """ + ListPartitionRouter doesn't have parent streams + """ + pass + + def get_stream_state(self) -> Optional[Mapping[str, StreamState]]: + """ + ListPartitionRouter doesn't have parent streams + """ + pass diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/partition_routers/single_partition_router.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/partition_routers/single_partition_router.py index eb5cb15315b5..32e6a353dedf 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/partition_routers/single_partition_router.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/partition_routers/single_partition_router.py @@ -5,12 +5,12 @@ from dataclasses import InitVar, dataclass from typing import Any, Iterable, Mapping, Optional -from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer +from airbyte_cdk.sources.declarative.partition_routers.partition_router import PartitionRouter from airbyte_cdk.sources.types import StreamSlice, StreamState @dataclass -class SinglePartitionRouter(StreamSlicer): +class SinglePartitionRouter(PartitionRouter): """Partition router returning only a stream slice""" parameters: InitVar[Mapping[str, Any]] @@ -49,3 +49,15 @@ def get_request_body_json( def stream_slices(self) -> Iterable[StreamSlice]: yield StreamSlice(partition={}, cursor_slice={}) + + def set_initial_state(self, stream_state: StreamState) -> None: + """ + SinglePartitionRouter doesn't have parent streams + """ + pass + + def get_stream_state(self) -> Optional[Mapping[str, StreamState]]: + """ + SinglePartitionRouter doesn't have parent streams + """ + pass diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py index 71f40a2b8a06..732697a2a3fa 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py @@ -8,8 +8,8 @@ import dpath.util from airbyte_cdk.models import AirbyteMessage, SyncMode, Type from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString +from airbyte_cdk.sources.declarative.partition_routers.partition_router import PartitionRouter from airbyte_cdk.sources.declarative.requesters.request_option import RequestOption, RequestOptionType -from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer from airbyte_cdk.sources.types import Config, Record, StreamSlice, StreamState if TYPE_CHECKING: @@ -42,7 +42,7 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None: @dataclass -class SubstreamPartitionRouter(StreamSlicer): +class SubstreamPartitionRouter(PartitionRouter): """ Partition router that iterates over the parent's stream records and emits slices Will populate the state with `partition_field` and `parent_slice` so they can be accessed by other components @@ -164,7 +164,7 @@ def stream_slices(self) -> Iterable[StreamSlice]: yield from stream_slices_for_parent - def set_parent_state(self, stream_state: StreamState) -> None: + def set_initial_state(self, stream_state: StreamState) -> None: """ Set the state of the parent streams. @@ -195,7 +195,7 @@ def set_parent_state(self, stream_state: StreamState) -> None: if parent_config.incremental_dependency: parent_config.stream.state = parent_state.get(parent_config.stream.name, {}) - def get_parent_state(self) -> Optional[Mapping[str, StreamState]]: + def get_stream_state(self) -> Optional[Mapping[str, StreamState]]: """ Get the state of the parent streams. diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/incremental/test_per_partition_cursor.py b/airbyte-cdk/python/unit_tests/sources/declarative/incremental/test_per_partition_cursor.py index 505aea5253cb..92c774098dae 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/incremental/test_per_partition_cursor.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/incremental/test_per_partition_cursor.py @@ -8,7 +8,7 @@ import pytest from airbyte_cdk.sources.declarative.incremental.declarative_cursor import DeclarativeCursor from airbyte_cdk.sources.declarative.incremental.per_partition_cursor import PerPartitionCursor, PerPartitionKeySerializer, StreamSlice -from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer +from airbyte_cdk.sources.declarative.partition_routers.partition_router import PartitionRouter from airbyte_cdk.sources.types import Record PARTITION = { @@ -105,7 +105,7 @@ def build(self): @pytest.fixture() def mocked_partition_router(): - return Mock(spec=StreamSlicer) + return Mock(spec=PartitionRouter) @pytest.fixture() @@ -192,7 +192,7 @@ def test_given_stream_slices_when_get_stream_state_then_return_updated_state(moc ] # Mock the get_parent_state method to return the parent state - mocked_partition_router.get_parent_state.return_value = {} + mocked_partition_router.get_stream_state.return_value = {} cursor = PerPartitionCursor(mocked_cursor_factory, mocked_partition_router) list(cursor.stream_slices()) @@ -453,7 +453,7 @@ def test_parent_state_is_set_for_per_partition_cursor(mocked_cursor_factory, moc ] # Mock the get_parent_state method to return the parent state - mocked_partition_router.get_parent_state.return_value = parent_state + mocked_partition_router.get_stream_state.return_value = parent_state # Initialize the PerPartitionCursor with the mocked cursor factory and partition router cursor = PerPartitionCursor(mocked_cursor_factory, mocked_partition_router) @@ -469,7 +469,7 @@ def test_parent_state_is_set_for_per_partition_cursor(mocked_cursor_factory, moc assert cursor.get_stream_state()["parent_state"] == parent_state # Verify that set_parent_state was called on the partition router with the initial state - mocked_partition_router.set_parent_state.assert_called_once_with(initial_state) + mocked_partition_router.set_initial_state.assert_called_once_with(initial_state) def test_get_stream_state_includes_parent_state(mocked_cursor_factory, mocked_partition_router): @@ -495,7 +495,7 @@ def test_get_stream_state_includes_parent_state(mocked_cursor_factory, mocked_pa ] # Mock the get_parent_state method to return the parent state - mocked_partition_router.get_parent_state.return_value = parent_state + mocked_partition_router.get_stream_state.return_value = parent_state # Initialize the PerPartitionCursor with the mocked cursor factory and partition router cursor = PerPartitionCursor(mocked_cursor_factory, mocked_partition_router) diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py b/airbyte-cdk/python/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py index 6306a854ec6e..b6a84c6373a5 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py @@ -46,7 +46,12 @@ from airbyte_cdk.sources.declarative.parsers.manifest_component_transformer import ManifestComponentTransformer from airbyte_cdk.sources.declarative.parsers.manifest_reference_resolver import ManifestReferenceResolver from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import ModelToComponentFactory -from airbyte_cdk.sources.declarative.partition_routers import ListPartitionRouter, SinglePartitionRouter, SubstreamPartitionRouter +from airbyte_cdk.sources.declarative.partition_routers import ( + CartesianProductStreamSlicer, + ListPartitionRouter, + SinglePartitionRouter, + SubstreamPartitionRouter, +) from airbyte_cdk.sources.declarative.requesters import HttpRequester from airbyte_cdk.sources.declarative.requesters.error_handlers import CompositeErrorHandler, DefaultErrorHandler, HttpResponseFilter from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategies import ( @@ -71,7 +76,6 @@ from airbyte_cdk.sources.declarative.schema import JsonFileSchemaLoader from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader from airbyte_cdk.sources.declarative.spec import Spec -from airbyte_cdk.sources.declarative.stream_slicers import CartesianProductStreamSlicer from airbyte_cdk.sources.declarative.transformations import AddFields, RemoveFields from airbyte_cdk.sources.declarative.transformations.add_fields import AddedFieldDefinition from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py b/airbyte-cdk/python/unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py index e00b8cb0dd7b..9f25c627a76e 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py @@ -360,7 +360,7 @@ def test_substream_slicer_parent_state_update_with_cursor(parent_stream_config, pass # This will process the slices and should update the parent state # Check if the parent state has been updated correctly - parent_state = partition_router.get_parent_state() + parent_state = partition_router.get_stream_state() assert parent_state == expected_state