feat(AsyncRetriever): Allow for streams using AsyncRetriever and DatetimeBasedCursor to perform checkpointing #226
📝 Walkthrough
The pull request introduces enhancements to the Airbyte CDK's declarative source components, focusing on improving async retriever and partition router functionality. The changes primarily involve updating cursor management, adding support for …
Sequence Diagram
sequenceDiagram
participant AsyncRetriever
participant StreamSlicer
participant Cursor
AsyncRetriever->>StreamSlicer: Get stream slice
StreamSlicer-->>AsyncRetriever: Return stream slice
AsyncRetriever->>Cursor: Observe cursor state
AsyncRetriever->>AsyncRetriever: Process records
AsyncRetriever->>Cursor: Update cursor state
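The call order in the diagram can be sketched in plain Python. This is a minimal illustration under stated assumptions, not the CDK's implementation: `SketchCursor`, `SketchSlicer`, `read_all`, and `fake_fetch` are hypothetical stand-ins, and only the ordering (get slice, observe, process, update state) is taken from the diagram.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, Iterable, Iterator, List, Mapping

Record = Mapping[str, Any]
Slice = Mapping[str, Any]


@dataclass
class SketchCursor:
    """Hypothetical cursor: remembers the highest cursor value it has observed."""

    cursor_field: str
    state: Dict[str, Any] = field(default_factory=dict)
    _highest_observed: Any = None

    def observe(self, record: Record) -> None:
        value = record.get(self.cursor_field)
        if value is not None and (
            self._highest_observed is None or value > self._highest_observed
        ):
            self._highest_observed = value

    def close_slice(self) -> None:
        # Persist the highest observed value once a slice has been fully read.
        if self._highest_observed is not None:
            self.state = {self.cursor_field: self._highest_observed}


@dataclass
class SketchSlicer:
    """Hypothetical slicer that hands out pre-computed slices."""

    slices: List[Slice]

    def stream_slices(self) -> Iterator[Slice]:
        return iter(self.slices)


def read_all(
    slicer: SketchSlicer,
    cursor: SketchCursor,
    fetch: Callable[[Slice], Iterable[Record]],
) -> Iterator[Record]:
    for stream_slice in slicer.stream_slices():  # get stream slice
        for record in fetch(stream_slice):
            cursor.observe(record)  # observe cursor state
            yield record  # process records
        cursor.close_slice()  # update cursor state


def fake_fetch(stream_slice: Slice) -> List[Record]:
    # Made-up records keyed by slice start, for illustration only.
    data = {
        "2024-01-01": [{"id": 1, "updated_at": "2024-01-15"}],
        "2024-02-01": [
            {"id": 2, "updated_at": "2024-02-03"},
            {"id": 3, "updated_at": "2024-02-10"},
        ],
    }
    return data.get(stream_slice["start"], [])


slicer = SketchSlicer(slices=[{"start": "2024-01-01"}, {"start": "2024-02-01"}])
cursor = SketchCursor(cursor_field="updated_at")
records = list(read_all(slicer, cursor, fake_fetch))
```

Because `read_all` is a generator, the cursor state only advances as the caller consumes records, which is why the state update sits after the inner loop.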
Actionable comments posted: 0
🧹 Nitpick comments (4)
airbyte_cdk/sources/declarative/partition_routers/async_job_partition_router.py (2)
40-43: Consider adding type hints for better code clarity
The type hint for self.stream_slicer in the condition would make the code more maintainable. What do you think about this change?
- if isinstance(self.stream_slicer, DatetimeBasedCursor):
+ if isinstance(self.stream_slicer, DatetimeBasedCursor):  # type: StreamSlicer
45-47: Add docstring to the cursor property
Would you consider adding a docstring to explain the purpose and return type of this property? Something like:
 @property
 def cursor(self) -> Optional[DeclarativeCursor]:
+    """
+    Returns the cursor associated with this partition router if it exists.
+
+    Returns:
+        Optional[DeclarativeCursor]: The cursor used for checkpointing, or None if not available.
+    """
     return self._cursor
airbyte_cdk/sources/declarative/retrievers/async_retriever.py (2)
88-113: Consider extracting cursor-related logic to improve readability
The read_records method has grown complex with the addition of cursor management. Would you consider extracting the cursor-related logic into a separate method for better maintainability? Something like:
+def _handle_cursor_updates(
+    self,
+    stream_data: Optional[StreamData],
+    stream_slice: StreamSlice,
+    most_recent_record: Optional[Record],
+) -> Optional[Record]:
+    """Handle cursor updates for a single record."""
+    if self.cursor and stream_data:
+        self.cursor.observe(stream_slice, stream_data)
+        return self._get_most_recent_record(most_recent_record, stream_data, stream_slice)
+    return most_recent_record
 def read_records(
     self,
     records_schema: Mapping[str, Any],
     stream_slice: Optional[StreamSlice] = None,
 ) -> Iterable[StreamData]:
     _slice = stream_slice or StreamSlice(partition={}, cursor_slice={})
     stream_state: StreamState = self.state
     partition: AsyncPartition = self._validate_and_get_stream_slice_partition(stream_slice)
     records: Iterable[Mapping[str, Any]] = self.stream_slicer.fetch_records(partition)
     most_recent_record_from_slice = None
     for stream_data in self.record_selector.filter_and_transform(
         all_data=records,
         stream_state=stream_state,
         records_schema=records_schema,
         stream_slice=_slice,
     ):
-        if self.cursor and stream_data:
-            self.cursor.observe(_slice, stream_data)
-
-        most_recent_record_from_slice = self._get_most_recent_record(
-            most_recent_record_from_slice, stream_data, _slice
-        )
+        most_recent_record_from_slice = self._handle_cursor_updates(
+            stream_data, _slice, most_recent_record_from_slice
+        )
         yield stream_data
115-131: Add early return for optimization
In _get_most_recent_record, we could optimize the logic with an early return. What do you think about this change?
 def _get_most_recent_record(
     self,
     current_most_recent: Optional[Record],
     current_record: Optional[Record],
     stream_slice: StreamSlice,
 ) -> Optional[Record]:
+    if not self.cursor or not current_record:
+        return None
+    if not current_most_recent:
+        return current_record
+    return (
+        current_most_recent
+        if self.cursor.is_greater_than_or_equal(current_most_recent, current_record)
+        else current_record
+    )
-    if self.cursor and current_record:
-        if not current_most_recent:
-            return current_record
-        else:
-            return (
-                current_most_recent
-                if self.cursor.is_greater_than_or_equal(current_most_recent, current_record)
-                else current_record
-            )
-    else:
-        return None
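To make the early-return variant easier to reason about, here is a standalone sketch of the same selection logic. It assumes a simplified `Record` (a plain mapping) and a hypothetical `StubCursor` that compares records on one field; the free function `get_most_recent_record` mirrors the suggested method body but is not the CDK's code.

```python
from typing import Any, Mapping, Optional

Record = Mapping[str, Any]  # simplified stand-in for the CDK's Record type


class StubCursor:
    """Hypothetical cursor that orders records by a single field."""

    def __init__(self, cursor_field: str) -> None:
        self.cursor_field = cursor_field

    def is_greater_than_or_equal(self, first: Record, second: Record) -> bool:
        return first[self.cursor_field] >= second[self.cursor_field]


def get_most_recent_record(
    cursor: Optional[StubCursor],
    current_most_recent: Optional[Record],
    current_record: Optional[Record],
) -> Optional[Record]:
    # Without a cursor or a record there is nothing to compare.
    if not cursor or not current_record:
        return None
    # The first record seen is trivially the most recent one.
    if not current_most_recent:
        return current_record
    return (
        current_most_recent
        if cursor.is_greater_than_or_equal(current_most_recent, current_record)
        else current_record
    )


cursor = StubCursor("updated_at")
older = {"id": 1, "updated_at": "2024-01-01"}
newer = {"id": 2, "updated_at": "2024-03-01"}

first_seen = get_most_recent_record(cursor, None, older)
kept_newer = get_most_recent_record(cursor, newer, older)
replaced = get_most_recent_record(cursor, older, newer)
no_cursor = get_most_recent_record(None, older, newer)
```

Note that, as in the reviewed code, a missing or empty `current_record` yields `None` rather than the previous most recent record, so callers that want to retain the previous value have to handle that case themselves.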
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
airbyte_cdk/sources/declarative/declarative_stream.py (3 hunks)
airbyte_cdk/sources/declarative/partition_routers/async_job_partition_router.py (2 hunks)
airbyte_cdk/sources/declarative/retrievers/async_retriever.py (3 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (8)
- GitHub Check: Check: 'source-pokeapi' (skip=false)
- GitHub Check: Check: 'source-the-guardian-api' (skip=false)
- GitHub Check: Check: 'source-shopify' (skip=false)
- GitHub Check: Check: 'source-hardcoded-records' (skip=false)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (Fast)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Analyze (python)
🔇 Additional comments (1)
airbyte_cdk/sources/declarative/declarative_stream.py (1)
193-196: LGTM! Clean implementation of AsyncRetriever support.
The extension of get_cursor to support AsyncRetriever is well-implemented and aligns with the PR objectives.
closing in favor of #228
What
Add the ability for streams that define an AsyncRetriever and a DatetimeBasedCursor to perform periodic checkpointing when the datetime window of a stream slice is synced successfully.
This is needed to unblock source-amazon-seller-partner, which supports incremental async jobs.
How
It's worth noting that this PR runs somewhat counter to our goal of deprecating the existing DatetimeBasedCursor, because the cursor is now more closely integrated into the AsyncRetriever stream slicing mechanism. However, given the short time frame to deliver Amazon Seller Partner, trying to inject the ConcurrentCursor into the low-code AsyncJobPartitionRouter felt like the more difficult path.
The short-term tradeoff is that our Async + Incremental streams will run synchronously (worth noting that this was already the existing behavior, based on how we construct concurrent_declarative_source.py). So we incur a little bit of tech debt in exchange for simplicity of implementation.
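The checkpointing behavior described under "What" can be illustrated with a small sketch: state is advanced only after the datetime window of a slice has synced successfully, so a failed job leaves the last good checkpoint in place. Everything here is an assumption made for illustration (`datetime_windows`, `sync_with_checkpoints`, `flaky_job`, the `"cursor"` state key, and the window sizes); it is not how the CDK implements this.

```python
from datetime import date, timedelta
from typing import Callable, Dict, Iterable, Iterator, List, Tuple

Window = Tuple[date, date]


def datetime_windows(start: date, end: date, step_days: int) -> Iterator[Window]:
    """Yield consecutive inclusive (window_start, window_end) pairs; assumes step_days >= 1."""
    window_start = start
    while window_start <= end:
        window_end = min(window_start + timedelta(days=step_days - 1), end)
        yield window_start, window_end
        window_start = window_end + timedelta(days=1)


def sync_with_checkpoints(
    windows: Iterable[Window],
    run_async_job: Callable[[date, date], None],
    state: Dict[str, str],
) -> List[Dict[str, str]]:
    """Run one job per window and checkpoint only after the job succeeds."""
    checkpoints: List[Dict[str, str]] = []
    for window_start, window_end in windows:
        run_async_job(window_start, window_end)  # raises if the job fails
        state["cursor"] = window_end.isoformat()  # checkpoint the completed window
        checkpoints.append(dict(state))
    return checkpoints


def flaky_job(window_start: date, window_end: date) -> None:
    # Hypothetical job that fails on the third window.
    if window_start == date(2024, 1, 9):
        raise RuntimeError("async job failed")


windows = list(datetime_windows(date(2024, 1, 1), date(2024, 1, 10), step_days=4))
state: Dict[str, str] = {}
try:
    sync_with_checkpoints(windows, flaky_job, state)
except RuntimeError:
    pass  # state still holds the last successfully synced window
```

In this sketch a later run could resume from the day after `state["cursor"]` instead of re-syncing the whole range, which is the point of checkpointing per window.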