🐛 low-code: Fix incremental substreams #35471
…hq/airbyte into alex/explore_substream_state_bug
No need to block, as the overall changes look good, but I had some comments on the main part of the changes in `substream_partition_router.py` where I wanted a little more clarity.
@@ -28,7 +28,7 @@ def get_request_params(
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
- ) -> MutableMapping[str, Any]:
+ ) -> Mapping[str, Any]:
In theory this could have an effect if we were relying on mutating the contents of this return value. Do you by chance have context on why this specific return type was originally mutable?
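To make the concern concrete, here is a minimal sketch of what the annotation change means for callers. The function names below are hypothetical stand-ins, not the CDK's actual methods. The point is only that a `Mapping` return type tells type checkers the result is read-only, so a caller that wants to add keys should copy it into a `dict` first rather than assign into it.

```python
from typing import Any, Dict, Mapping


def get_request_params_sketch() -> Mapping[str, Any]:
    # Hypothetical stand-in for a request options provider method.
    return {"page_size": 100}


def build_params(extra: Mapping[str, Any]) -> Dict[str, Any]:
    # Copy into a new dict instead of mutating the returned Mapping in place;
    # a type checker would flag item assignment on a Mapping.
    params = dict(get_request_params_sketch())
    params.update(extra)
    return params
```

Any existing caller that did assign into the returned value would still work at runtime if the object is a plain `dict`, but it would now fail type checking, which is the kind of breakage the question above is probing for.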
- parent_field = parent_stream_config.parent_key.eval(self.config)
- stream_state_field = parent_stream_config.partition_field.eval(self.config)
+ parent_field = parent_stream_config.parent_key.eval(self.config) # type: ignore # parent_key is always casted to an interpolated string
+ partition_field = parent_stream_config.partition_field.eval(self.config) # type: ignore # partition_field is always casted to an interpolated string
for parent_stream_slice in parent_stream.stream_slices(
sync_mode=SyncMode.full_refresh, cursor_field=None, stream_state=None
):
empty_parent_slice = True
parent_slice = parent_stream_slice
This assignment looks redundant. We don't manipulate either of these variables.
for parent_stream_slice in parent_stream.stream_slices(
sync_mode=SyncMode.full_refresh, cursor_field=None, stream_state=None
):
empty_parent_slice = True
parent_slice = parent_stream_slice
if parent_slice:
parent_partition = {k: v for k, v in parent_slice.items() if k in parent_slice.partition.keys()}
Two things:
- The `if parent_slice:` check is to protect against `None` stream slices that could come from the parent stream, right?
- How come we do this complicated comprehension statement to create the dictionary if the parent slice item key is in the partition keys? Is there a reason why we can't just call `parent_slice.partition().items()`?
- That's right. I don't really expect this to happen for declarative stream, but it is technically possible for a stream's stream_slice method to return None...
- Good catch. Simplified to your suggestion!
TIL a few things 👀 thank you for cleaning this up.
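The simplification discussed in this thread can be sketched as follows. `StreamSliceSketch` is a hypothetical, simplified class written for illustration; it assumes a slice behaves like a read-only mapping that merges a partition with a cursor slice and exposes the partition as a property, and that the two do not share keys. Under those assumptions, filtering the merged items by partition keys gives the same result as reading the partition directly.

```python
from typing import Any, Dict, Iterator, Mapping


class StreamSliceSketch(Mapping[str, Any]):
    """Hypothetical slice: a read-only merge of partition and cursor_slice."""

    def __init__(self, partition: Mapping[str, Any], cursor_slice: Mapping[str, Any]) -> None:
        self._partition = dict(partition)
        self._merged: Dict[str, Any] = {**partition, **cursor_slice}

    @property
    def partition(self) -> Mapping[str, Any]:
        return self._partition

    def __getitem__(self, key: str) -> Any:
        return self._merged[key]

    def __iter__(self) -> Iterator[str]:
        return iter(self._merged)

    def __len__(self) -> int:
        return len(self._merged)


parent_slice = StreamSliceSketch({"parent_id": 1}, {"start": "2024-01-01"})

# Original form: filter the merged items down to the partition keys.
filtered = {k: v for k, v in parent_slice.items() if k in parent_slice.partition.keys()}

# Simplified form: read the partition directly.
direct = dict(parent_slice.partition)
```

If partition and cursor slice could share a key with different values, the two forms would no longer be interchangeable, so the equivalence depends on the no-overlap assumption stated above.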
if self.end_datetime and not isinstance(self.end_datetime, MinMaxDatetime):
self.end_datetime = MinMaxDatetime(self.end_datetime, parameters)
self._start_datetime = (
MinMaxDatetime(self.start_datetime, parameters) if not isinstance(self.start_datetime, MinMaxDatetime) else self.start_datetime
I don't know exactly how `MinMaxDatetime` works, but could we make its constructor always return a `MinMaxDatetime` object and accept a `MinMaxDatetime` too, so that we can always wrap `start_datetime` in it and cut out one if statement?
Sure, that's one more function call, but perhaps more readable?
Good idea. Added a factory method for `MinMaxDatetime` and simplified this call.
self._end_datetime = (
None
if not self.end_datetime
else MinMaxDatetime(self.end_datetime, parameters)
Here, nested ifs are harder to read. Perhaps if `MinMaxDatetime` is always called on that, we cut out one if, and it's easier to read at a glance.
Good idea. Added a factory method for `MinMaxDatetime` and simplified this call.
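A factory method of the kind mentioned in these two replies might look roughly like the sketch below. `MinMaxDatetimeSketch`, its single field, and `normalize` are hypothetical simplifications for illustration; the real class takes more arguments (such as `parameters`) and its factory may differ. The idea is that the factory accepts either a raw value or an existing instance, so call sites no longer need their own `isinstance` branch.

```python
from dataclasses import dataclass
from typing import Optional, Tuple, Union


@dataclass
class MinMaxDatetimeSketch:
    """Hypothetical, simplified stand-in; the real class has more fields."""

    datetime: str

    @classmethod
    def create(cls, value: Union[str, "MinMaxDatetimeSketch"]) -> "MinMaxDatetimeSketch":
        # Pass existing instances through unchanged; wrap plain strings.
        return value if isinstance(value, cls) else cls(datetime=value)


def normalize(
    start: Union[str, MinMaxDatetimeSketch],
    end: Optional[Union[str, MinMaxDatetimeSketch]],
) -> Tuple[MinMaxDatetimeSketch, Optional[MinMaxDatetimeSketch]]:
    # The start is always wrapped; the optional end keeps a single None check.
    start_dt = MinMaxDatetimeSketch.create(start)
    end_dt = MinMaxDatetimeSketch.create(end) if end else None
    return start_dt, end_dt
```

With this shape, the start value needs no conditional at all and the optional end value is reduced from a nested conditional to one `None` check.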
@@ -109,17 +116,20 @@ def set_initial_state(self, stream_state: StreamState) -> None:
:param stream_state: The state of the stream as returned by get_stream_state
"""
- self._cursor = stream_state.get(self.cursor_field.eval(self.config)) if stream_state else None
+ self._cursor = stream_state.get(self._cursor_field.eval(self.config)) if stream_state else None

def close_slice(self, stream_slice: StreamSlice, most_recent_record: Optional[Record]) -> None:
Why narrow it?
@@ -179,16 +123,16 @@ def _get_state_for_partition(self, partition: Mapping[str, Any]) -> Optional[Str
return None

@staticmethod
- def _is_new_state(stream_state):
+ def _is_new_state(stream_state: Mapping[str, Any]) -> bool:
Why not be strict on the type here? We don't have a special `StreamState` type for that mapping yet?
That's correct. We have types for `StreamSlice` and `Record`, but `StreamState` is still TBD.
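For context on the trade-off in this exchange, here is a minimal sketch of annotating against a plain mapping alias rather than a dedicated class. The alias name `StreamStateSketch` and the emptiness rule in `is_new_state` are assumptions made for illustration only; the PR does not show the body of `_is_new_state`.

```python
from typing import Any, Mapping

# Assumption: state is modeled as a plain mapping alias, not a dedicated class.
StreamStateSketch = Mapping[str, Any]


def is_new_state(stream_state: StreamStateSketch) -> bool:
    # Hypothetical rule for illustration: an empty state counts as "new".
    return not stream_state
```

An alias like this documents intent without adding runtime checks; a dedicated class would be stricter but would require touching every place that currently passes a bare dict.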
…hq/airbyte into alex/explore_substream_state_bug
What
`start_time` is not updated) #33854
How
`PerPartitionStreamSlice` to `StreamSlice` and move it to `types.py`
`PerPartitionStreamSlice` instead of any mappings
`parent_slice` field
`PerPartitionStreamSlice`
Recommended reading order
airbyte-cdk/python/airbyte_cdk/sources/declarative/types.py
airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_stream.py
airbyte-cdk/python/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py
airbyte-cdk/python/airbyte_cdk/sources/declarative/incremental/datetime_based_cursor.py
airbyte-cdk/python/airbyte_cdk/sources/declarative/incremental/per_partition_cursor.py
airbyte-cdk/python/airbyte_cdk/sources/declarative/partition_routers/list_partition_router.py
Files with only cosmetic changes:
airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/stream_slicer.py
airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/retriever.py
airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request_options/request_options_provider.py
airbyte-cdk/python/airbyte_cdk/sources/declarative/interpolation/jinja.py
airbyte-cdk/python/airbyte_cdk/sources/declarative/incremental/cursor.py
🚨 User Impact 🚨
`DeclarativeStream`s
Pre-merge Actions
Expand the relevant checklist and delete the others.
Updating the Python CDK
Airbyter
Before merging:
`--use-local-cdk --name=source-<connector>` as options
`airbyte-ci connectors --use-local-cdk --name=source-<connector> test`
After merging: