Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 low-code: Fix incremental substreams #35471

Merged
merged 74 commits into from
Mar 6, 2024
Merged
Show file tree
Hide file tree
Changes from 59 commits
Commits
Show all changes
74 commits
Select commit Hold shift + click to select a range
10d06bc
first hack
girarda Feb 21, 2024
7b9af90
Update
girarda Feb 21, 2024
82f8915
Update
girarda Feb 21, 2024
b7f45eb
this looks ok with all the hardcoded stuff...
girarda Feb 21, 2024
0dc78b7
less hardcoding
girarda Feb 21, 2024
5b90e2b
update
girarda Feb 21, 2024
9f78777
delete
girarda Feb 21, 2024
c503691
reset to master
girarda Feb 21, 2024
624eddc
update
girarda Feb 21, 2024
35db4cf
undo rename
girarda Feb 21, 2024
94f2c40
rename
girarda Feb 21, 2024
e316f5f
update
girarda Feb 21, 2024
3bdc41e
Update
girarda Feb 21, 2024
ccd0645
update
girarda Feb 21, 2024
6dce076
remove prints
girarda Feb 21, 2024
7360b8c
reset files
girarda Feb 21, 2024
601d15d
remove unused file
girarda Feb 21, 2024
c94ef81
small update
girarda Feb 21, 2024
b8eb8db
move code around
girarda Feb 21, 2024
288e56c
format
girarda Feb 21, 2024
c5fac71
fix about half the mypy issues
girarda Feb 22, 2024
af8f439
fix more type issues
girarda Feb 22, 2024
49b8a26
more fixes
girarda Feb 22, 2024
90254dd
update
girarda Feb 22, 2024
cfb69eb
fix most unit tests
girarda Feb 22, 2024
401518c
fix lingering tests
girarda Feb 22, 2024
3df9648
update
girarda Feb 22, 2024
8eb4276
missing test for datetime based cursor
girarda Feb 22, 2024
525843a
more tests
girarda Feb 22, 2024
53d1b86
more tests
girarda Feb 22, 2024
0e90bf5
more tests
girarda Feb 22, 2024
b350af8
test
girarda Feb 22, 2024
78e4364
format
girarda Feb 22, 2024
5ca282a
rename
girarda Feb 23, 2024
203819c
Revert "rename"
girarda Feb 23, 2024
2d2156f
remove unused annotation
girarda Feb 23, 2024
529c8b3
kv param
girarda Feb 23, 2024
401c16d
format
girarda Feb 23, 2024
02b0216
update
girarda Feb 23, 2024
87ba418
Merge branch 'master' into alex/explore_substream_state_bug
girarda Feb 23, 2024
6c97f09
fixes
girarda Feb 23, 2024
da47861
format
girarda Feb 23, 2024
916767a
reset to master
girarda Feb 23, 2024
430f1da
update interface in the test
girarda Feb 26, 2024
0fc4d85
small cleanup
girarda Feb 26, 2024
4542df2
remove unused import
girarda Feb 26, 2024
5d5be67
rename variables for clarity
girarda Feb 26, 2024
9d29852
Revert "reset to master"
girarda Feb 26, 2024
f57c722
Update
girarda Feb 26, 2024
3864c75
update test
girarda Feb 27, 2024
9f1165d
raise an exception if the stream slice has a partition component
girarda Feb 27, 2024
0dd1c66
update
girarda Feb 27, 2024
65a8dc9
first pass at unit test
girarda Feb 29, 2024
72c3104
small refactor
girarda Feb 29, 2024
b625d52
rename to StreamSlice
girarda Mar 1, 2024
5e4ce20
Merge branch 'master' into alex/explore_substream_state_bug
girarda Mar 1, 2024
125424a
comment
girarda Mar 1, 2024
2626467
format
girarda Mar 1, 2024
3dd24e3
Merge branch 'alex/explore_substream_state_bug' of github.com:airbyte…
girarda Mar 1, 2024
87ebdbc
simplify assignment
girarda Mar 5, 2024
2edd16c
handle none stream slice
girarda Mar 5, 2024
2aa68d5
flake
girarda Mar 5, 2024
6c3df15
reformat
girarda Mar 5, 2024
728cd62
reset to master
girarda Mar 5, 2024
f1a415e
format
girarda Mar 6, 2024
dc01934
small cleanup
girarda Mar 6, 2024
75a851c
missing spaces
girarda Mar 6, 2024
9560422
fix test
girarda Mar 6, 2024
6dbc44e
try fixing test on ci
girarda Mar 6, 2024
c149d6d
try adding another filter
girarda Mar 6, 2024
64e408d
Merge branch 'master' into alex/explore_substream_state_bug
girarda Mar 6, 2024
d3dddfa
stop referring to the legacy state
girarda Mar 6, 2024
c7b70e8
Merge branch 'alex/explore_substream_state_bug' of github.com:airbyte…
girarda Mar 6, 2024
641d735
airbyte-ci format
girarda Mar 6, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
from airbyte_cdk.sources.declarative.schema import DefaultSchemaLoader
from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader
from airbyte_cdk.sources.declarative.types import Config
from airbyte_cdk.sources.declarative.types import Config, StreamSlice
from airbyte_cdk.sources.streams.core import Stream


Expand Down Expand Up @@ -101,6 +101,8 @@ def read_records(
"""
:param: stream_state We knowingly avoid using stream_state as we want cursors to manage their own state.
"""
if not isinstance(stream_slice, StreamSlice):
raise ValueError(f"DeclarativeStream does not support stream_slices that are not StreamSlice. Got {stream_slice}")
yield from self.retriever.read_records(self.get_json_schema(), stream_slice)

def get_json_schema(self) -> Mapping[str, Any]: # type: ignore
Expand All @@ -114,7 +116,7 @@ def get_json_schema(self) -> Mapping[str, Any]: # type: ignore

def stream_slices(
self, *, sync_mode: SyncMode, cursor_field: Optional[List[str]] = None, stream_state: Optional[Mapping[str, Any]] = None
) -> Iterable[Optional[Mapping[str, Any]]]:
) -> Iterable[Optional[StreamSlice]]:
"""
Override to define the slices for this stream. See the stream slicing section of the docs for more information.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from typing import Optional

from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer
from airbyte_cdk.sources.declarative.types import Record, StreamSlice, StreamState
from airbyte_cdk.sources.declarative.types import StreamSlice, Record, StreamState


class Cursor(ABC, StreamSlicer):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import datetime
from dataclasses import InitVar, dataclass, field
from typing import Any, Iterable, List, Mapping, Optional, Union
from typing import Any, Callable, Iterable, List, Mapping, MutableMapping, Optional, Union

from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, Level, Type
from airbyte_cdk.sources.declarative.datetime.datetime_parser import DatetimeParser
Expand All @@ -13,7 +13,7 @@
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.interpolation.jinja import JinjaInterpolation
from airbyte_cdk.sources.declarative.requesters.request_option import RequestOption, RequestOptionType
from airbyte_cdk.sources.declarative.types import Config, Record, StreamSlice, StreamState
from airbyte_cdk.sources.declarative.types import Config, Record, StreamState, StreamSlice
from airbyte_cdk.sources.message import MessageRepository
from isodate import Duration, parse_duration

Expand Down Expand Up @@ -70,10 +70,16 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
f"If step is defined, cursor_granularity should be as well and vice-versa. "
f"Right now, step is `{self.step}` and cursor_granularity is `{self.cursor_granularity}`"
)
if not isinstance(self.start_datetime, MinMaxDatetime):
self.start_datetime = MinMaxDatetime(self.start_datetime, parameters)
if self.end_datetime and not isinstance(self.end_datetime, MinMaxDatetime):
self.end_datetime = MinMaxDatetime(self.end_datetime, parameters)
self._start_datetime = (
MinMaxDatetime(self.start_datetime, parameters) if not isinstance(self.start_datetime, MinMaxDatetime) else self.start_datetime
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know how exactly MinMaxDatetime works, but could we make it's constructor always return an object of MinMaxTime and accept MinMaxtime too so that we can always wrap start_datetime into it, cutting on one if statement?

Sure, that's one more function call, but perhaps more readable?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good idea. Added a factory method for MinMaxDatetime and simplified this call

)
self._end_datetime = (
None
if not self.end_datetime
else MinMaxDatetime(self.end_datetime, parameters)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here, nested ifs are harder to read — perhaps if MinMaxtime is always called on that, we cut out one if, and it's easier to read instantly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good idea. Added a factory method for MinMaxDatetime and simplified this call

if not isinstance(self.end_datetime, MinMaxDatetime)
else self.end_datetime
)

self._timezone = datetime.timezone.utc
self._interpolation = JinjaInterpolation()
Expand All @@ -84,23 +90,23 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
else datetime.timedelta.max
)
self._cursor_granularity = self._parse_timedelta(self.cursor_granularity)
self.cursor_field = InterpolatedString.create(self.cursor_field, parameters=parameters)
self.lookback_window = InterpolatedString.create(self.lookback_window, parameters=parameters)
self.partition_field_start = InterpolatedString.create(self.partition_field_start or "start_time", parameters=parameters)
self.partition_field_end = InterpolatedString.create(self.partition_field_end or "end_time", parameters=parameters)
self._cursor_field = InterpolatedString.create(self.cursor_field, parameters=parameters)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like a breaking change. I understand the want to make more things private but can we confirm this won't have negative impact at least in our repo. Else, is this necessary for this PR?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The class should only be used by a retriever and the only implementation in our codebase in the SimpleRetriever.

Context for the change was satisfying mypy. The alternative if we don't want to start using a private variable is to add # type: ignore in the file. I can do that if it feels safer

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the context! I'm fine with this being breaking and we move one step in the right direction

self._lookback_window = InterpolatedString.create(self.lookback_window, parameters=parameters) if self.lookback_window else None
self._partition_field_start = InterpolatedString.create(self.partition_field_start or "start_time", parameters=parameters)
self._partition_field_end = InterpolatedString.create(self.partition_field_end or "end_time", parameters=parameters)
self._parser = DatetimeParser()

# If datetime format is not specified then start/end datetime should inherit it from the stream slicer
if not self.start_datetime.datetime_format:
self.start_datetime.datetime_format = self.datetime_format
if self.end_datetime and not self.end_datetime.datetime_format:
self.end_datetime.datetime_format = self.datetime_format
if not self._start_datetime.datetime_format:
self._start_datetime.datetime_format = self.datetime_format
if self._end_datetime and not self._end_datetime.datetime_format:
self._end_datetime.datetime_format = self.datetime_format

if not self.cursor_datetime_formats:
self.cursor_datetime_formats = [self.datetime_format]

def get_stream_state(self) -> StreamState:
return {self.cursor_field.eval(self.config): self._cursor} if self._cursor else {}
return {self._cursor_field.eval(self.config): self._cursor} if self._cursor else {}

def set_initial_state(self, stream_state: StreamState) -> None:
"""
Expand All @@ -109,17 +115,22 @@ def set_initial_state(self, stream_state: StreamState) -> None:

:param stream_state: The state of the stream as returned by get_stream_state
"""
self._cursor = stream_state.get(self.cursor_field.eval(self.config)) if stream_state else None
self._cursor = stream_state.get(self._cursor_field.eval(self.config)) if stream_state else None

def close_slice(self, stream_slice: StreamSlice, most_recent_record: Optional[Record]) -> None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this interface be changed to stream_slice: PerPartitionStreamSlice?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why narrow it?

last_record_cursor_value = most_recent_record.get(self.cursor_field.eval(self.config)) if most_recent_record else None
stream_slice_value_end = stream_slice.get(self.partition_field_end.eval(self.config))
if stream_slice.partition:
raise ValueError(f"Stream slice {stream_slice} should not have a partition. Got {stream_slice.partition}.")
last_record_cursor_value = most_recent_record.get(self._cursor_field.eval(self.config)) if most_recent_record else None
stream_slice_value_end = stream_slice.get(self._partition_field_end.eval(self.config))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering if it would be worth having visibility on the case where the partition part of the PerPartitionStreamSlice would be populated. This would mean that we are closing a slice for something the DatetimeBasedCursor did not create which would be an issue.

Note that it's not only visibility for us but also for devs trying to write custom Python code and that might do mistakes

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

potential_cursor_values = [
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pulled into a variable to filter out potentially None values before passing them to parse_date

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does that mean the filter on line 132 is not necessary anymore?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed the extra filter

cursor_value for cursor_value in [self._cursor, last_record_cursor_value, stream_slice_value_end] if cursor_value
]
cursor_value_str_by_cursor_value_datetime = dict(
map(
# we need to ensure the cursor value is preserved as is in the state else the CATs might complain of something like
# 2023-01-04T17:30:19.000Z' <= '2023-01-04T17:30:19.000000Z'
lambda datetime_str: (self.parse_date(datetime_str), datetime_str),
filter(lambda item: item, [self._cursor, last_record_cursor_value, stream_slice_value_end]),
potential_cursor_values
)
)
self._cursor = (
Expand All @@ -142,37 +153,43 @@ def stream_slices(self) -> Iterable[StreamSlice]:
return self._partition_daterange(start_datetime, end_datetime, self._step)

def _calculate_earliest_possible_value(self, end_datetime: datetime.datetime) -> datetime.datetime:
lookback_delta = self._parse_timedelta(self.lookback_window.eval(self.config) if self.lookback_window else "P0D")
earliest_possible_start_datetime = min(self.start_datetime.get_datetime(self.config), end_datetime)
lookback_delta = self._parse_timedelta(self._lookback_window.eval(self.config) if self.lookback_window else "P0D")
earliest_possible_start_datetime = min(self._start_datetime.get_datetime(self.config), end_datetime)
cursor_datetime = self._calculate_cursor_datetime_from_state(self.get_stream_state())
return max(earliest_possible_start_datetime, cursor_datetime) - lookback_delta

def _select_best_end_datetime(self) -> datetime.datetime:
now = datetime.datetime.now(tz=self._timezone)
if not self.end_datetime:
if not self._end_datetime:
return now
return min(self.end_datetime.get_datetime(self.config), now)
return min(self._end_datetime.get_datetime(self.config), now)

def _calculate_cursor_datetime_from_state(self, stream_state: Mapping[str, Any]) -> datetime.datetime:
if self.cursor_field.eval(self.config, stream_state=stream_state) in stream_state:
return self.parse_date(stream_state[self.cursor_field.eval(self.config)])
if self._cursor_field.eval(self.config, stream_state=stream_state) in stream_state:
return self.parse_date(stream_state[self._cursor_field.eval(self.config)])
return datetime.datetime.min.replace(tzinfo=datetime.timezone.utc)

def _format_datetime(self, dt: datetime.datetime) -> str:
return self._parser.format(dt, self.datetime_format)

def _partition_daterange(self, start: datetime.datetime, end: datetime.datetime, step: Union[datetime.timedelta, Duration]):
start_field = self.partition_field_start.eval(self.config)
end_field = self.partition_field_end.eval(self.config)
def _partition_daterange(
self, start: datetime.datetime, end: datetime.datetime, step: Union[datetime.timedelta, Duration]
) -> List[StreamSlice]:
start_field = self._partition_field_start.eval(self.config)
end_field = self._partition_field_end.eval(self.config)
dates = []
while start <= end:
next_start = self._evaluate_next_start_date_safely(start, step)
end_date = self._get_date(next_start - self._cursor_granularity, end, min)
dates.append({start_field: self._format_datetime(start), end_field: self._format_datetime(end_date)})
dates.append(
StreamSlice(
partition={}, cursor_slice={start_field: self._format_datetime(start), end_field: self._format_datetime(end_date)}
)
)
start = next_start
return dates

def _evaluate_next_start_date_safely(self, start, step):
def _evaluate_next_start_date_safely(self, start: datetime.datetime, step: datetime.timedelta) -> datetime.datetime:
"""
Given that we set the default step at datetime.timedelta.max, we will generate an OverflowError when evaluating the next start_date
This method assumes that users would never enter a step that would generate an overflow. Given that would be the case, the code
Expand All @@ -183,7 +200,12 @@ def _evaluate_next_start_date_safely(self, start, step):
except OverflowError:
return datetime.datetime.max.replace(tzinfo=datetime.timezone.utc)

def _get_date(self, cursor_value, default_date: datetime.datetime, comparator) -> datetime.datetime:
def _get_date(
self,
cursor_value: datetime.datetime,
default_date: datetime.datetime,
comparator: Callable[[datetime.datetime, datetime.datetime], datetime.datetime],
) -> datetime.datetime:
cursor_date = cursor_value or default_date
return comparator(cursor_date, default_date)

Expand All @@ -196,7 +218,7 @@ def parse_date(self, date: str) -> datetime.datetime:
raise ValueError(f"No format in {self.cursor_datetime_formats} matching {date}")

@classmethod
def _parse_timedelta(cls, time_str) -> Union[datetime.timedelta, Duration]:
def _parse_timedelta(cls, time_str: Optional[str]) -> Union[datetime.timedelta, Duration]:
"""
:return Parses an ISO 8601 durations into datetime.timedelta or Duration objects.
"""
Expand Down Expand Up @@ -244,18 +266,20 @@ def request_kwargs(self) -> Mapping[str, Any]:
# Never update kwargs
return {}

def _get_request_options(self, option_type: RequestOptionType, stream_slice: StreamSlice):
options = {}
def _get_request_options(self, option_type: RequestOptionType, stream_slice: Optional[StreamSlice]) -> Mapping[str, Any]:
options: MutableMapping[str, Any] = {}
if not stream_slice:
return options
if self.start_time_option and self.start_time_option.inject_into == option_type:
options[self.start_time_option.field_name.eval(config=self.config)] = stream_slice.get(
self.partition_field_start.eval(self.config)
options[self.start_time_option.field_name.eval(config=self.config)] = stream_slice.get( # type: ignore # field_name is always casted to an interpolated string
self._partition_field_start.eval(self.config)
)
if self.end_time_option and self.end_time_option.inject_into == option_type:
options[self.end_time_option.field_name.eval(config=self.config)] = stream_slice.get(self.partition_field_end.eval(self.config))
options[self.end_time_option.field_name.eval(config=self.config)] = stream_slice.get(self._partition_field_end.eval(self.config)) # type: ignore # field_name is always casted to an interpolated string
return options

def should_be_synced(self, record: Record) -> bool:
cursor_field = self.cursor_field.eval(self.config)
cursor_field = self._cursor_field.eval(self.config)
record_cursor_value = record.get(cursor_field)
if not record_cursor_value:
self._send_log(
Expand All @@ -278,7 +302,7 @@ def _send_log(self, level: Level, message: str) -> None:
)

def is_greater_than_or_equal(self, first: Record, second: Record) -> bool:
cursor_field = self.cursor_field.eval(self.config)
cursor_field = self._cursor_field.eval(self.config)
first_cursor_value = first.get(cursor_field)
second_cursor_value = second.get(cursor_field)
if first_cursor_value and second_cursor_value:
Expand Down
Loading
Loading