Skip to content

Commit

Permalink
Update other partition routers
Browse files Browse the repository at this point in the history
  • Loading branch information
tolik0 committed Jun 6, 2024
1 parent c745075 commit 8875233
Show file tree
Hide file tree
Showing 10 changed files with 60 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from typing import Any, Callable, Iterable, Mapping, MutableMapping, Optional, Union

from airbyte_cdk.sources.declarative.incremental.declarative_cursor import DeclarativeCursor
from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer
from airbyte_cdk.sources.declarative.partition_routers.partition_router import PartitionRouter
from airbyte_cdk.sources.types import Record, StreamSlice, StreamState


Expand Down Expand Up @@ -62,7 +62,7 @@ class PerPartitionCursor(DeclarativeCursor):
_KEY = 0
_VALUE = 1

def __init__(self, cursor_factory: CursorFactory, partition_router: StreamSlicer):
def __init__(self, cursor_factory: CursorFactory, partition_router: PartitionRouter):
self._cursor_factory = cursor_factory
self._partition_router = partition_router
self._cursor_per_partition: MutableMapping[str, DeclarativeCursor] = {}
Expand All @@ -87,7 +87,7 @@ def set_initial_state(self, stream_state: StreamState) -> None:
If a partition state is provided in the stream state, it will update the corresponding partition cursor with this state.
Additionally, it sets the parent state for partition routers that are based on parent streams. If a partition router
does not have parent streams, this step will be skipped due to the default StreamSlicer implementation.
does not have parent streams, this step will be skipped due to the default PartitionRouter implementation.
Args:
stream_state (StreamState): The state of the streams to be set. The format of the stream state should be:
Expand Down Expand Up @@ -116,7 +116,7 @@ def set_initial_state(self, stream_state: StreamState) -> None:
self._cursor_per_partition[self._to_partition_key(state["partition"])] = self._create_cursor(state["cursor"])

# Set parent state for partition routers based on parent streams
self._partition_router.set_parent_state(stream_state)
self._partition_router.set_initial_state(stream_state)

def observe(self, stream_slice: StreamSlice, record: Record) -> None:
self._cursor_per_partition[self._to_partition_key(stream_slice.partition)].observe(
Expand Down Expand Up @@ -147,7 +147,7 @@ def get_stream_state(self) -> StreamState:
)
state: dict[str, Any] = {"states": states}

parent_state = self._partition_router.get_parent_state()
parent_state = self._partition_router.get_stream_state()
if parent_state:
state["parent_state"] = parent_state
return state
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from airbyte_cdk.sources.declarative.incremental.datetime_based_cursor import DatetimeBasedCursor
from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.partition_routers.cartesian_product_stream_slicer import CartesianProductStreamSlicer
from airbyte_cdk.sources.declarative.partition_routers.list_partition_router import ListPartitionRouter
from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import ParentStreamConfig, SubstreamPartitionRouter
from airbyte_cdk.sources.declarative.requesters import RequestOption
Expand Down Expand Up @@ -49,7 +50,6 @@
from airbyte_cdk.sources.declarative.schema.inline_schema_loader import InlineSchemaLoader
from airbyte_cdk.sources.declarative.schema.json_file_schema_loader import JsonFileSchemaLoader
from airbyte_cdk.sources.declarative.spec import Spec
from airbyte_cdk.sources.declarative.stream_slicers.cartesian_product_stream_slicer import CartesianProductStreamSlicer
from airbyte_cdk.sources.declarative.transformations import RemoveFields
from airbyte_cdk.sources.declarative.transformations.add_fields import AddedFieldDefinition, AddFields
from airbyte_cdk.sources.streams.http.requests_native_auth.oauth import SingleUseRefreshTokenOauth2Authenticator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,12 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import ValueType
from airbyte_cdk.sources.declarative.models.declarative_component_schema import WaitTimeFromHeader as WaitTimeFromHeaderModel
from airbyte_cdk.sources.declarative.models.declarative_component_schema import WaitUntilTimeFromHeader as WaitUntilTimeFromHeaderModel
from airbyte_cdk.sources.declarative.partition_routers import ListPartitionRouter, SinglePartitionRouter, SubstreamPartitionRouter
from airbyte_cdk.sources.declarative.partition_routers import (
CartesianProductStreamSlicer,
ListPartitionRouter,
SinglePartitionRouter,
SubstreamPartitionRouter,
)
from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import ParentStreamConfig
from airbyte_cdk.sources.declarative.requesters import HttpRequester, RequestOption
from airbyte_cdk.sources.declarative.requesters.error_handlers import CompositeErrorHandler, DefaultErrorHandler, HttpResponseFilter
Expand All @@ -123,7 +128,7 @@
from airbyte_cdk.sources.declarative.retrievers import SimpleRetriever, SimpleRetrieverTestReadDecorator
from airbyte_cdk.sources.declarative.schema import DefaultSchemaLoader, InlineSchemaLoader, JsonFileSchemaLoader
from airbyte_cdk.sources.declarative.spec import Spec
from airbyte_cdk.sources.declarative.stream_slicers import CartesianProductStreamSlicer, StreamSlicer
from airbyte_cdk.sources.declarative.stream_slicers import StreamSlicer
from airbyte_cdk.sources.declarative.transformations import AddFields, RecordTransformation, RemoveFields
from airbyte_cdk.sources.declarative.transformations.add_fields import AddedFieldDefinition
from airbyte_cdk.sources.message import InMemoryMessageRepository, LogAppenderMessageRepositoryDecorator, MessageRepository
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from airbyte_cdk.sources.declarative.partition_routers.cartesian_product_stream_slicer import CartesianProductStreamSlicer
from airbyte_cdk.sources.declarative.partition_routers.list_partition_router import ListPartitionRouter
from airbyte_cdk.sources.declarative.partition_routers.single_partition_router import SinglePartitionRouter
from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import SubstreamPartitionRouter

__all__ = ["ListPartitionRouter", "SinglePartitionRouter", "SubstreamPartitionRouter"]
__all__ = ["CartesianProductStreamSlicer", "ListPartitionRouter", "SinglePartitionRouter", "SubstreamPartitionRouter"]
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@
from typing import Any, Iterable, List, Mapping, Optional, Union

from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.partition_routers.partition_router import PartitionRouter
from airbyte_cdk.sources.declarative.requesters.request_option import RequestOption, RequestOptionType
from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState


@dataclass
class ListPartitionRouter(StreamSlicer):
class ListPartitionRouter(PartitionRouter):
"""
Partition router that iterates over the values of a list
If values is a string, then evaluate it as literal and assert the resulting literal is a list
Expand Down Expand Up @@ -87,3 +87,15 @@ def _get_request_option(self, request_option_type: RequestOptionType, stream_sli
return {}
else:
return {}

def set_initial_state(self, stream_state: StreamState) -> None:
    """
    Accept the initial stream state without acting on it.

    ListPartitionRouter doesn't have parent streams, so there is no parent
    state to restore and ``stream_state`` is intentionally ignored.

    Args:
        stream_state (StreamState): The incoming stream state; unused here.
    """

def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
    """
    Report the parent-stream state tracked by this router.

    ListPartitionRouter doesn't have parent streams, so there is never any
    parent state to report.

    Returns:
        None, always.
    """
    # Explicit return: the signature promises an Optional value, so spell out
    # the "no state" result instead of relying on an implicit None.
    return None
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
from dataclasses import InitVar, dataclass
from typing import Any, Iterable, Mapping, Optional

from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer
from airbyte_cdk.sources.declarative.partition_routers.partition_router import PartitionRouter
from airbyte_cdk.sources.types import StreamSlice, StreamState


@dataclass
class SinglePartitionRouter(StreamSlicer):
class SinglePartitionRouter(PartitionRouter):
"""Partition router returning only a stream slice"""

parameters: InitVar[Mapping[str, Any]]
Expand Down Expand Up @@ -49,3 +49,15 @@ def get_request_body_json(

def stream_slices(self) -> Iterable[StreamSlice]:
    """Yield exactly one slice whose partition and cursor_slice are both empty."""
    sole_slice = StreamSlice(partition={}, cursor_slice={})
    yield sole_slice

def set_initial_state(self, stream_state: StreamState) -> None:
    """
    Accept the initial stream state without acting on it.

    SinglePartitionRouter doesn't have parent streams, so there is no parent
    state to restore and ``stream_state`` is intentionally ignored.

    Args:
        stream_state (StreamState): The incoming stream state; unused here.
    """

def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
    """
    Report the parent-stream state tracked by this router.

    SinglePartitionRouter doesn't have parent streams, so there is never any
    parent state to report.

    Returns:
        None, always.
    """
    # Explicit return: the signature promises an Optional value, so spell out
    # the "no state" result instead of relying on an implicit None.
    return None
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
import dpath.util
from airbyte_cdk.models import AirbyteMessage, SyncMode, Type
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.partition_routers.partition_router import PartitionRouter
from airbyte_cdk.sources.declarative.requesters.request_option import RequestOption, RequestOptionType
from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer
from airbyte_cdk.sources.types import Config, Record, StreamSlice, StreamState

if TYPE_CHECKING:
Expand Down Expand Up @@ -42,7 +42,7 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:


@dataclass
class SubstreamPartitionRouter(StreamSlicer):
class SubstreamPartitionRouter(PartitionRouter):
"""
Partition router that iterates over the parent's stream records and emits slices
Will populate the state with `partition_field` and `parent_slice` so they can be accessed by other components
Expand Down Expand Up @@ -164,7 +164,7 @@ def stream_slices(self) -> Iterable[StreamSlice]:

yield from stream_slices_for_parent

def set_parent_state(self, stream_state: StreamState) -> None:
def set_initial_state(self, stream_state: StreamState) -> None:
"""
Set the state of the parent streams.
Expand Down Expand Up @@ -195,7 +195,7 @@ def set_parent_state(self, stream_state: StreamState) -> None:
if parent_config.incremental_dependency:
parent_config.stream.state = parent_state.get(parent_config.stream.name, {})

def get_parent_state(self) -> Optional[Mapping[str, StreamState]]:
def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
"""
Get the state of the parent streams.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import pytest
from airbyte_cdk.sources.declarative.incremental.declarative_cursor import DeclarativeCursor
from airbyte_cdk.sources.declarative.incremental.per_partition_cursor import PerPartitionCursor, PerPartitionKeySerializer, StreamSlice
from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer
from airbyte_cdk.sources.declarative.partition_routers.partition_router import PartitionRouter
from airbyte_cdk.sources.types import Record

PARTITION = {
Expand Down Expand Up @@ -105,7 +105,7 @@ def build(self):

@pytest.fixture()
def mocked_partition_router():
return Mock(spec=StreamSlicer)
return Mock(spec=PartitionRouter)


@pytest.fixture()
Expand Down Expand Up @@ -192,7 +192,7 @@ def test_given_stream_slices_when_get_stream_state_then_return_updated_state(moc
]

# Mock the get_stream_state method to return the parent state
mocked_partition_router.get_parent_state.return_value = {}
mocked_partition_router.get_stream_state.return_value = {}

cursor = PerPartitionCursor(mocked_cursor_factory, mocked_partition_router)
list(cursor.stream_slices())
Expand Down Expand Up @@ -453,7 +453,7 @@ def test_parent_state_is_set_for_per_partition_cursor(mocked_cursor_factory, moc
]

# Mock the get_stream_state method to return the parent state
mocked_partition_router.get_parent_state.return_value = parent_state
mocked_partition_router.get_stream_state.return_value = parent_state

# Initialize the PerPartitionCursor with the mocked cursor factory and partition router
cursor = PerPartitionCursor(mocked_cursor_factory, mocked_partition_router)
Expand All @@ -469,7 +469,7 @@ def test_parent_state_is_set_for_per_partition_cursor(mocked_cursor_factory, moc
assert cursor.get_stream_state()["parent_state"] == parent_state

# Verify that set_initial_state was called on the partition router with the initial state
mocked_partition_router.set_parent_state.assert_called_once_with(initial_state)
mocked_partition_router.set_initial_state.assert_called_once_with(initial_state)


def test_get_stream_state_includes_parent_state(mocked_cursor_factory, mocked_partition_router):
Expand All @@ -495,7 +495,7 @@ def test_get_stream_state_includes_parent_state(mocked_cursor_factory, mocked_pa
]

# Mock the get_stream_state method to return the parent state
mocked_partition_router.get_parent_state.return_value = parent_state
mocked_partition_router.get_stream_state.return_value = parent_state

# Initialize the PerPartitionCursor with the mocked cursor factory and partition router
cursor = PerPartitionCursor(mocked_cursor_factory, mocked_partition_router)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,12 @@
from airbyte_cdk.sources.declarative.parsers.manifest_component_transformer import ManifestComponentTransformer
from airbyte_cdk.sources.declarative.parsers.manifest_reference_resolver import ManifestReferenceResolver
from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import ModelToComponentFactory
from airbyte_cdk.sources.declarative.partition_routers import ListPartitionRouter, SinglePartitionRouter, SubstreamPartitionRouter
from airbyte_cdk.sources.declarative.partition_routers import (
CartesianProductStreamSlicer,
ListPartitionRouter,
SinglePartitionRouter,
SubstreamPartitionRouter,
)
from airbyte_cdk.sources.declarative.requesters import HttpRequester
from airbyte_cdk.sources.declarative.requesters.error_handlers import CompositeErrorHandler, DefaultErrorHandler, HttpResponseFilter
from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategies import (
Expand All @@ -71,7 +76,6 @@
from airbyte_cdk.sources.declarative.schema import JsonFileSchemaLoader
from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader
from airbyte_cdk.sources.declarative.spec import Spec
from airbyte_cdk.sources.declarative.stream_slicers import CartesianProductStreamSlicer
from airbyte_cdk.sources.declarative.transformations import AddFields, RemoveFields
from airbyte_cdk.sources.declarative.transformations.add_fields import AddedFieldDefinition
from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ def test_substream_slicer_parent_state_update_with_cursor(parent_stream_config,
pass # This will process the slices and should update the parent state

# Check if the parent state has been updated correctly
parent_state = partition_router.get_parent_state()
parent_state = partition_router.get_stream_state()
assert parent_state == expected_state


Expand Down

0 comments on commit 8875233

Please sign in to comment.