
fix(concurrent cdk): Properly call set_initial_state() on the cursor that is initialized on the ClientSideIncrementalRecordFilterDecorator #310

Conversation

brianjlai
Contributor

@brianjlai brianjlai commented Feb 2, 2025

What

While I was scoping some of the custom cursor deprecation issues, I noticed that our client-side semi-incremental filtering might not be working correctly in the CDK. This PR fixes how we assign state to the cursor so that it filters records correctly.

How

The issue arises because, in model_to_component_factory.py, we actually instantiate a separate DatetimeBasedCursor (or whichever cursor is configured) when we do client-side filtering. See this block of code: https://github.com/airbytehq/airbyte-python-cdk/blob/main/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py#L1547-L1569

Because these are two separate instances, we need to call set_initial_state() on both. It feels a little odd to have to do this twice, but I was unsure whether we made them separate for a reason, so I wanted to keep the flow as close to the previous behavior as possible.

Tested against source-chargebee + CDK unit tests
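A minimal, self-contained sketch of the failure mode described above. The classes and helpers here (`ToyCursor`, `ToyFilterDecorator`, `build_stream`, `read`) are hypothetical stand-ins, not the CDK's `DatetimeBasedCursor` or `ClientSideIncrementalRecordFilterDecorator`; they only reuse the `set_initial_state()` name. The point is that when the factory builds two cursor objects, state set on the stream's cursor is invisible to the filter's cursor, so nothing gets filtered.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Mapping, Optional, Tuple


@dataclass
class ToyCursor:
    """Hypothetical stand-in for a datetime cursor (not the CDK class)."""

    cursor_field: str
    state_value: Optional[str] = None

    def set_initial_state(self, stream_state: Mapping[str, Any]) -> None:
        # Remember the last synced cursor value, if any.
        self.state_value = stream_state.get(self.cursor_field)

    def is_newer(self, record: Mapping[str, Any]) -> bool:
        # With no state every record passes; ISO-8601 dates compare correctly as text.
        return self.state_value is None or record[self.cursor_field] >= self.state_value


@dataclass
class ToyFilterDecorator:
    """Hypothetical client-side filter that owns its *own* cursor instance."""

    cursor: ToyCursor

    def filter_records(self, records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        return [r for r in records if self.cursor.is_newer(r)]


def build_stream(cursor_field: str) -> Tuple[ToyCursor, ToyFilterDecorator]:
    # Mirrors the shape of the bug: two separate cursor objects are created.
    return ToyCursor(cursor_field), ToyFilterDecorator(ToyCursor(cursor_field))


def read(
    state: Mapping[str, Any], records: List[Dict[str, Any]], fixed: bool
) -> List[Dict[str, Any]]:
    stream_cursor, record_filter = build_stream("updated_at")
    stream_cursor.set_initial_state(state)
    if fixed:
        # The fix: also hand the state to the decorator's cursor.
        record_filter.cursor.set_initial_state(state)
    return record_filter.filter_records(records)


records = [{"id": 1, "updated_at": "2025-01-01"}, {"id": 2, "updated_at": "2025-02-01"}]
state = {"updated_at": "2025-01-15"}
print([r["id"] for r in read(state, records, fixed=False)])  # [1, 2]
print([r["id"] for r in read(state, records, fixed=True)])  # [2]
```

Sharing a single cursor object between the stream and the filter would avoid the double call entirely; the sketch keeps two instances only to match the flow described in this PR.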

Summary by CodeRabbit

  • Refactor
    • Improved the initialization process for managing state in incremental processing so that streams handle state more reliably.
  • Tests
    • Added new test coverage to ensure that streams using incremental processing correctly maintain their cursor state.

…n the ClientSideIncrementalRecordFilterDecorator instance
@brianjlai brianjlai requested a review from maxi297 February 2, 2025 02:49
@brianjlai brianjlai changed the title bug(concurrent cdk): Properly call set_initial_state() on the cursor that is initialized on the ClientSideIncrementalRecordFilterDecorator fix(concurrent cdk): Properly call set_initial_state() on the cursor that is initialized on the ClientSideIncrementalRecordFilterDecorator Feb 2, 2025
Contributor

coderabbitai bot commented Feb 2, 2025

Walkthrough

The pull request refines the initialization comments in the ConcurrentDeclarativeSource class by separating the handling of the cursor state for the ClientSideIncrementalRecordFilterDecorator from others. Additionally, a new unit test has been added to verify that streams marked as client-side incremental correctly initialize and maintain their cursor state. The core functionality remains unchanged.

Changes

  • airbyte_cdk/sources/declarative/.../concurrent_declarative_source.py: Updated comments in the ConcurrentDeclarativeSource class to clearly distinguish the initialization of state for the ClientSideIncrementalRecordFilterDecorator.
  • unit_tests/sources/declarative/test_concurrent_declarative_source.py: Added test_stream_using_is_client_side_incremental_has_cursor_state to verify that the stream uses the correct default stream and initializes the cursor state as expected.

Sequence Diagram(s)

sequenceDiagram
    participant T as Test
    participant CDS as ConcurrentDeclarativeSource
    participant CIFD as ClientSideIncrementalRecordFilterDecorator
    T ->> CDS: Initialize with manifest (client-side incremental enabled)
    CDS ->> CIFD: Initialize cursor state
    T ->> CDS: Retrieve stream
    CDS -->> T: Return DefaultStream with proper cursor state

Possibly related PRs

Suggested labels

bug

Suggested reviewers

  • maxi297, wdyt?
  • tolik0, wdyt?
  • aaronsteers, wdyt?
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (1)
unit_tests/sources/declarative/test_concurrent_declarative_source.py (1)

1653-1689: LGTM! Comprehensive test coverage for client-side incremental cursor state.

The test thoroughly verifies:

  1. Proper state initialization
  2. Correct type casting of components
  3. Accurate cursor value propagation

One suggestion: Would you consider adding a test case with a null/empty state to verify the behavior when no previous state exists? wdyt?
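One way the empty-state suggestion could be expressed, as a hedged sketch using a hypothetical toy cursor rather than the real test fixtures or `ConcurrentDeclarativeSource`. The assumption is that on a first sync (state `None` or `{}`) a client-side filter has nothing to compare against and should let every record through; `check_no_prior_state` is an illustrative name only.

```python
from typing import Any, Dict, List, Mapping, Optional


class ToyCursor:
    """Hypothetical datetime cursor used only to illustrate the empty-state case."""

    def __init__(self, cursor_field: str) -> None:
        self.cursor_field = cursor_field
        self.state_value: Optional[str] = None

    def set_initial_state(self, stream_state: Optional[Mapping[str, Any]]) -> None:
        # Treat a missing (None) state the same as an empty mapping.
        self.state_value = (stream_state or {}).get(self.cursor_field)

    def filter_records(self, records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        if self.state_value is None:
            # No previous sync: nothing to filter against.
            return list(records)
        return [r for r in records if r[self.cursor_field] >= self.state_value]


def check_no_prior_state(stream_state: Optional[Mapping[str, Any]]) -> None:
    cursor = ToyCursor("updated_at")
    cursor.set_initial_state(stream_state)
    records = [{"updated_at": "2024-12-31"}, {"updated_at": "2025-01-01"}]
    assert cursor.state_value is None
    assert cursor.filter_records(records) == records


for empty_state in (None, {}):
    check_no_prior_state(empty_state)
```

In a real pytest suite the loop would more naturally be a `pytest.mark.parametrize` over the two empty-state values.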

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ef97304 and f616450.

📒 Files selected for processing (2)
  • airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1 hunks)
  • unit_tests/sources/declarative/test_concurrent_declarative_source.py (2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (8)
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: Check: 'source-the-guardian-api' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Analyze (python)
🔇 Additional comments (2)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (2)

478-479: LGTM! Clear documentation improvement.

The comment split provides better clarity about the state initialization for StopConditionPaginationStrategyDecorator.


483-492: LGTM! Good fix for client-side incremental filtering.

The addition ensures proper state initialization for the ClientSideIncrementalRecordFilterDecorator cursor, which is crucial for semi-incremental streams using is_client_side_incremental to filter properly.

Contributor

@maxi297 maxi297 left a comment


That's a good point. I'm fine with this fix for now. Soon we should have a plan to remove declarative cursors. Maybe it'll be easier when we won't use DeclarativeStream anymore?

Also note that this seems weird to me, as I would expect DatetimeBasedCursor to also be part of the isinstance check; otherwise we create a new cursor on which set_initial_state won't be called. Should we update this?
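To make the concern concrete, here is a hypothetical sketch of an `isinstance`-gated reuse. All class and function names are toy stand-ins, not CDK types: if the stream's cursor type is missing from the allowed tuple, a brand-new cursor is built for the filter and it never receives the state that was passed to `set_initial_state()`.

```python
from typing import Any, Mapping, Optional, Tuple, Type


class BaseToyCursor:
    def __init__(self) -> None:
        self.state: Optional[Mapping[str, Any]] = None

    def set_initial_state(self, stream_state: Mapping[str, Any]) -> None:
        self.state = dict(stream_state)


class ToyDatetimeCursor(BaseToyCursor):
    pass


class ToyPerPartitionCursor(BaseToyCursor):
    pass


def cursor_for_filter(
    combined_slicer: BaseToyCursor, reusable: Tuple[Type[BaseToyCursor], ...]
) -> BaseToyCursor:
    # Reuse the stream's cursor (which already received state) when its type is allowed;
    # otherwise build a fresh instance that nobody has initialized.
    if isinstance(combined_slicer, reusable):
        return combined_slicer
    return ToyDatetimeCursor()


stream_cursor = ToyDatetimeCursor()
stream_cursor.set_initial_state({"updated_at": "2025-01-15"})

# ToyDatetimeCursor missing from the tuple: the filter gets an uninitialized cursor.
print(cursor_for_filter(stream_cursor, (ToyPerPartitionCursor,)).state)  # None
# ToyDatetimeCursor included: the already-initialized cursor is shared.
print(cursor_for_filter(stream_cursor, (ToyPerPartitionCursor, ToyDatetimeCursor)).state)
```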

@brianjlai
Contributor Author

> That's a good point. I'm fine with this fix for now. Soon we should have a plan to remove declarative cursors. Maybe it'll be easier when we won't use DeclarativeStream anymore?

I think it will get easier, but I worry there may still be some gaps during deprecation where we find out we're using DatetimeBasedCursor in niche places like this, where the new ConcurrentCursor isn't compatible. But that's a little out of scope for us here.

> Also note that this seems weird to me as I would expect to have DatetimeBasedCursor too as part of the isinstance else we create a new cursor where set_initial_state won't be called. Should we update this?

My guess here is that it's actually synonymous with this code within _merge_stream_slicers(), where we end up creating a DatetimeBasedCursor:

```python
# Excerpt from _merge_stream_slicers() in model_to_component_factory.py
elif model.incremental_sync:
    if model.retriever.type == "AsyncRetriever":
        if model.incremental_sync.type != "DatetimeBasedCursor":
            # We are currently in a transition to the Concurrent CDK and AsyncRetriever can only work with the support or unordered slices (for example, when we trigger reports for January and February, the report in February can be completed first). Once we have support for custom concurrent cursor or have a new implementation available in the CDK, we can enable more cursors here.
            raise ValueError(
                "AsyncRetriever with cursor other than DatetimeBasedCursor is not supported yet"
            )
        if model.retriever.partition_router:
            # Note that this development is also done in parallel to the per partition development which once merged we could support here by calling `create_concurrent_cursor_from_perpartition_cursor`
            raise ValueError("Per partition state is not supported yet for AsyncRetriever")
        return self.create_concurrent_cursor_from_datetime_based_cursor(  # type: ignore # This is a known issue that we are creating and returning a ConcurrentCursor which does not technically implement the (low-code) StreamSlicer. However, (low-code) StreamSlicer and ConcurrentCursor both implement StreamSlicer.stream_slices() which is the primary method needed for checkpointing
            model_type=DatetimeBasedCursorModel,
            component_definition=model.incremental_sync.__dict__,
            stream_name=model.name or "",
            stream_namespace=None,
            config=config or {},
        )
    return (
        self._create_component_from_model(model=model.incremental_sync, config=config)
        if model.incremental_sync
        else None
    )
```
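The branching in that excerpt can be summarized as a small decision function. This is a hypothetical sketch that takes plain strings and booleans instead of the CDK's Pydantic models; `choose_incremental_component` is an illustrative name, and the returned labels only name which kind of component the quoted code would build.

```python
from typing import Optional


def choose_incremental_component(
    retriever_type: str,
    incremental_sync_type: Optional[str],
    has_partition_router: bool,
) -> Optional[str]:
    """Hypothetical mirror of the `elif model.incremental_sync:` branch quoted above."""
    if incremental_sync_type is None:
        return None  # no incremental_sync configured: nothing to build
    if retriever_type == "AsyncRetriever":
        if incremental_sync_type != "DatetimeBasedCursor":
            raise ValueError(
                "AsyncRetriever with cursor other than DatetimeBasedCursor is not supported yet"
            )
        if has_partition_router:
            raise ValueError("Per partition state is not supported yet for AsyncRetriever")
        # Built via create_concurrent_cursor_from_datetime_based_cursor in the real code.
        return "ConcurrentCursor"
    # Any other retriever: a fresh declarative cursor of the configured type.
    return incremental_sync_type


print(choose_incremental_component("SimpleRetriever", "DatetimeBasedCursor", False))  # DatetimeBasedCursor
print(choose_incremental_component("AsyncRetriever", "DatetimeBasedCursor", False))  # ConcurrentCursor
```

The last return is the path the thread is about: for a non-async retriever a new cursor object is created, separate from the one the stream already holds.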

So if it doesn't match those either (PerPartitionWithGlobalCursor, GlobalSubstreamCursor), we end up just recreating a DatetimeBasedCursor (or custom cursor) on the last line. I think there's probably a good argument for just simplifying the block to:

```python
supported_slicers = (
    DatetimeBasedCursor,
    GlobalSubstreamCursor,
    PerPartitionWithGlobalCursor,
)
if combined_slicers and not isinstance(combined_slicers, supported_slicers):
    raise ValueError(
        "Unsupported Slicer is used. PerPartitionWithGlobalCursor should be used here instead"
    )
client_side_incremental_sync = {"cursor": combined_slicers}
```
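A runnable sketch of that proposed guard, with the tuple defined before it is used. The three classes are empty hypothetical placeholders standing in for the CDK cursor types of the same role, and `validate_combined_slicer` is an illustrative name, not an existing CDK function.

```python
from typing import Any, Dict, Optional


class DatetimeCursorStub:
    """Placeholder for DatetimeBasedCursor."""


class GlobalSubstreamCursorStub:
    """Placeholder for GlobalSubstreamCursor."""


class PerPartitionWithGlobalCursorStub:
    """Placeholder for PerPartitionWithGlobalCursor."""


SUPPORTED_SLICERS = (
    DatetimeCursorStub,
    GlobalSubstreamCursorStub,
    PerPartitionWithGlobalCursorStub,
)


def validate_combined_slicer(combined_slicers: Optional[object]) -> Dict[str, Any]:
    # Reject any slicer the client-side filter cannot reuse, instead of silently
    # building a second, uninitialized cursor.
    if combined_slicers and not isinstance(combined_slicers, SUPPORTED_SLICERS):
        raise ValueError(
            "Unsupported Slicer is used. PerPartitionWithGlobalCursor should be used here instead"
        )
    return {"cursor": combined_slicers}
```

With this shape the same, already-initialized cursor instance is handed to the filter, so a single set_initial_state() call would cover both uses.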

I think I'll investigate this separately because it's a good callout, but I want to get this PR in to unblock the community dev work we're slating for the coming days, which depends on it.

@brianjlai brianjlai merged commit ca68c5c into main Feb 4, 2025
24 of 25 checks passed
@brianjlai brianjlai deleted the brian/concurrent_fix_client_side_incremental_set_initial_state branch February 4, 2025 21:40