Skip to content

Commit

Permalink
Add parent state handling to CartesianProductStreamSlicer
Browse files Browse the repository at this point in the history
  • Loading branch information
tolik0 committed May 27, 2024
1 parent 47a6f21 commit 86d26b5
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,42 @@ def stream_slices(self) -> Iterable[StreamSlice]:
yield StreamSlice(partition=partition, cursor_slice=cursor_slice)

def set_initial_state(self, stream_state: StreamState) -> None:
    """
    Set the initial state for the cursors.

    For every entry under the "states" key, a cursor is created from the entry's "cursor" value
    and stored under the key derived from the entry's "partition" value. Afterwards the whole
    stream state is forwarded to the partition router so that it can restore the state of its
    parent streams (routers without parent streams rely on the default StreamSlicer
    implementation, which does nothing).

    An empty or missing stream state is a no-op. A stream state without a "states" key is
    accepted: no partition cursor is restored, but the parent state is still forwarded.

    Args:
        stream_state (StreamState): The state of the streams to be set. The format of the stream state should be:
            {
                "states": [
                    {
                        "partition": {
                            "partition_key": "value"
                        },
                        "cursor": {
                            "last_updated": "2023-05-27T00:00:00Z"
                        }
                    }
                ],
                "parent_state": {
                    "parent_stream_name": {
                        "last_updated": "2023-05-27T00:00:00Z"
                    }
                }
            }
    """
    if not stream_state:
        return

    # A state that only carries "parent_state" must not fail with a KeyError,
    # otherwise the parent state below would never be applied.
    for state in stream_state.get("states", []):
        partition_key = self._to_partition_key(state["partition"])
        self._cursor_per_partition[partition_key] = self._create_cursor(state["cursor"])

    # Set parent state for partition routers based on parent streams
    self._partition_router.set_parent_state(stream_state)

def observe(self, stream_slice: StreamSlice, record: Record) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,13 +164,25 @@ def stream_slices(self) -> Iterable[StreamSlice]:

yield from stream_slices_for_parent

def set_parent_state(self, stream_state: Optional[StreamState]) -> None:
def set_parent_state(self, stream_state: StreamState) -> None:
"""
Set the state of the parent streams.
Args:
stream_state (Optional[StreamState]): The state of the streams to be set. If `parent_state` exists in the
stream_state (StreamState): The state of the streams to be set. If `parent_state` exists in the
stream_state, it will update the state of each parent stream with the corresponding state from the stream_state.
Example of state format:
{
"parent_state": {
"parent_stream_name1": {
"last_updated": "2023-05-27T00:00:00Z"
},
"parent_stream_name2": {
"last_updated": "2023-05-27T00:00:00Z"
}
}
}
"""
if not stream_state:
return
Expand All @@ -183,12 +195,21 @@ def set_parent_state(self, stream_state: Optional[StreamState]) -> None:
if parent_config.incremental_dependency:
parent_config.stream.state = parent_state.get(parent_config.stream.name, {})

def get_parent_state(self) -> Optional[Mapping[str, StreamState]]:
    """
    Get the state of the parent streams.

    Returns:
        Optional[Mapping[str, StreamState]]: The parent state currently held by this router
        (`self._parent_state`), returned as-is.

    Example of state format:
    {
        "parent_stream_name1": {
            "last_updated": "2023-05-27T00:00:00Z"
        },
        "parent_stream_name2": {
            "last_updated": "2023-05-27T00:00:00Z"
        }
    }
    """
    return self._parent_state
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,53 @@ def stream_slices(self) -> Iterable[StreamSlice]:
else:
cursor_slice = {}
yield StreamSlice(partition=partition, cursor_slice=cursor_slice)

def set_parent_state(self, stream_state: StreamState) -> None:
    """
    Propagate the parent stream state to every underlying stream slicer.

    The same `stream_state` object is handed to each slicer in `self.stream_slicers`, in order.
    Slicers that are not based on parent streams are expected to ignore it through the default
    StreamSlicer implementation.

    Args:
        stream_state (StreamState): The state of the streams to be set. If `parent_state` exists in the
            stream_state, each slicer updates its parent streams with the corresponding entry.

    Example of state format:
    {
        "parent_state": {
            "parent_stream_name_1": {
                "last_updated": "2023-05-27T00:00:00Z"
            },
            "parent_stream_name_2": {
                "last_updated": "2023-05-27T00:00:00Z"
            }
        }
    }
    """
    slicers = self.stream_slicers
    for slicer in slicers:
        slicer.set_parent_state(stream_state)

def get_parent_state(self) -> Optional[Mapping[str, StreamState]]:
    """
    Get the state of the parent streams.

    This method returns the combined parent states from all stream slicers. A stream slicer that
    reports no parent state (the default StreamSlicer implementation returns None) is skipped.
    If several slicers report the same parent stream name, the state of the later slicer wins.

    Returns:
        Optional[Mapping[str, StreamState]]: The current state of the parent streams in a dictionary format.
        An empty dictionary is returned when no slicer has any parent state.
        The returned format will be:
        {
            "parent_stream_name1": {
                "last_updated": "2023-05-27T00:00:00Z"
            },
            "parent_stream_name2": {
                "last_updated": "2023-05-27T00:00:00Z"
            }
        }
    """
    combined_state = {}
    for slicer in self.stream_slicers:
        parent_state = slicer.get_parent_state()
        # Slicers without parent streams return None; dict.update(None) would raise TypeError.
        if parent_state:
            combined_state.update(parent_state)
    return combined_state
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from abc import abstractmethod
from dataclasses import dataclass
from typing import Iterable, Optional
from typing import Iterable, Mapping, Optional

from airbyte_cdk.sources.declarative.requesters.request_options.request_options_provider import RequestOptionsProvider
from airbyte_cdk.sources.types import StreamSlice, StreamState
Expand Down Expand Up @@ -33,16 +33,40 @@ def set_parent_state(self, stream_state: StreamState) -> None:
"""
Set the state of the parent streams.
This method should only be defined if the slicer is based on some parent stream and needs to read this stream
incrementally using the state.
Args:
stream_state: The state of the streams to be set. This method can be overridden by subclasses.
stream_state (StreamState): The state of the streams to be set. The expected format is a dictionary that includes
'parent_state' which is a dictionary of parent state names to their corresponding state.
Example:
{
"parent_state": {
"parent_stream_name_1": { ... },
"parent_stream_name_2": { ... },
...
}
}
"""
pass

def get_parent_state(self) -> Optional[Mapping[str, StreamState]]:
    """
    Get the state of the parent streams.

    This method should only be defined if the slicer is based on some parent stream and needs to read this stream
    incrementally using the state. The default implementation reports no parent state.

    Returns:
        Optional[Mapping[str, StreamState]]: The current state of the parent streams in a dictionary format,
        or None when the slicer has no parent state (the default).
        The returned format will be:
        {
            "parent_stream_name1": {
                "last_updated": "2023-05-27T00:00:00Z"
            },
            "parent_stream_name2": {
                "last_updated": "2023-05-27T00:00:00Z"
            }
        }
    """
    return None

0 comments on commit 86d26b5

Please sign in to comment.