feat(low-code concurrent): Add use_global_cursor flag to ConcurrentPerPartitionCursor #279

Merged
tolik0 merged 3 commits into main from tolik0/fix-state-emitting-concurrent-perpartitioncursor
Jan 28, 2025

Conversation

tolik0
Contributor

@tolik0 tolik0 commented Jan 28, 2025

Summary by CodeRabbit

  • New Features

    • Enhanced state management for concurrent partition cursors.
    • Added ability to switch between global and per-partition state handling.
    • Introduced mechanism to track partition cursor limits.
  • Tests

    • Updated test cases to validate new state management scenarios.
    • Added test configurations for global and per-partition cursor states.

@github-actions github-actions bot added the enhancement New feature or request label Jan 28, 2025

coderabbitai bot commented Jan 28, 2025

📝 Walkthrough

Walkthrough

The pull request introduces modifications to the ConcurrentPerPartitionCursor class in the Airbyte CDK, focusing on enhancing state management capabilities. The changes add a new boolean attribute _use_global_cursor to control how cursor states are handled, allowing the system to dynamically switch between global and per-partition state management based on specific conditions.
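As a rough illustration of the state shapes described above, here is a minimal sketch. `ToyPartitionCursor` is a hypothetical stand-in and not the CDK class; the `"states"` key name is an assumption, and the real `state` property carries additional fields that are elided in this thread.

```python
import copy
from typing import Any


class ToyPartitionCursor:
    """Hypothetical stand-in for illustration only; not the CDK class."""

    _PERPARTITION_STATE_KEY = "states"  # assumed key name

    def __init__(self) -> None:
        self._use_global_cursor = False
        # Maps a partition (stored as a tuple of key/value pairs) to its cursor state.
        self._cursor_per_partition: dict[tuple, dict[str, Any]] = {}

    @property
    def state(self) -> dict[str, Any]:
        # The flag is always part of the emitted state.
        state: dict[str, Any] = {"use_global_cursor": self._use_global_cursor}
        if not self._use_global_cursor:
            # Per-partition entries are only included while the flag is off.
            state[self._PERPARTITION_STATE_KEY] = [
                {"partition": dict(partition), "cursor": copy.deepcopy(cursor_state)}
                for partition, cursor_state in self._cursor_per_partition.items()
                if cursor_state
            ]
        return state


cursor = ToyPartitionCursor()
cursor._cursor_per_partition[(("id", 1),)] = {"updated_at": "2025-01-01"}
per_partition_state = cursor.state

cursor._use_global_cursor = True
global_state = cursor.state
```

With the flag off, the per-partition list is present; once the flag is on, only the flag itself remains in this simplified state.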

Changes

File and change summary:

  • airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py
    • Added _use_global_cursor attribute
    • Modified state property to include global cursor flag
    • Updated close_partition method to conditionally emit state
    • Added limit_reached() method to check partition limit
  • unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py
    • Added new state migration constants
    • Updated test parameterizations
    • Expanded test scenarios for state management

Sequence Diagram

sequenceDiagram
    participant Cursor as ConcurrentPerPartitionCursor
    participant Partition as Partition
    
    Cursor->>Cursor: Initialize _use_global_cursor
    Cursor->>Partition: Observe partition
    alt Partition limit reached
        Cursor->>Cursor: Set _use_global_cursor = True
    else Partition limit not reached
        Cursor->>Cursor: Maintain per-partition state
    end
    Cursor->>Cursor: Emit state based on _use_global_cursor
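The flow in the diagram can be sketched roughly as follows. `ToyLimitTracker` and `record_dropped_partition` are hypothetical names, and the limit value of 3 is chosen only for the demo; the comparison and the one-way flip mirror the `limit_reached()` and `observe` snippets quoted later in this thread.

```python
class ToyLimitTracker:
    """Hypothetical stand-in for illustration only; not the CDK class."""

    DEFAULT_MAX_PARTITIONS_NUMBER = 3  # hypothetical value for the demo

    def __init__(self) -> None:
        self._over_limit = 0
        self._use_global_cursor = False

    def record_dropped_partition(self) -> None:
        # Stands in for the counter increment done while enforcing the partition limit.
        self._over_limit += 1

    def limit_reached(self) -> bool:
        return self._over_limit > self.DEFAULT_MAX_PARTITIONS_NUMBER

    def observe(self) -> None:
        # One-way switch: once set, the flag is never reset in this sketch.
        if not self._use_global_cursor and self.limit_reached():
            self._use_global_cursor = True


tracker = ToyLimitTracker()
flags = []
for _ in range(5):
    tracker.record_dropped_partition()
    tracker.observe()
    flags.append(tracker._use_global_cursor)
```

In this sketch the flag flips on the fourth dropped partition and stays set afterwards.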

Suggested reviewers

  • maxi297
  • aaronsteers

Hey there! 👋 I noticed some interesting state management improvements. What do you think about the new _use_global_cursor approach? It seems like a flexible way to handle different cursor scenarios. Wdyt? 🤔

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (15)
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (11)

724-724: Consider a dedicated test scenario for this setting
Would you consider adding a separate test scenario for when "use_global_cursor" is True just to ensure coverage of that path? wdyt?


1125-1125: Add a quick rationale
Could you leave a brief inline comment on why "use_global_cursor" is False here, so future readers quickly grasp the reasoning? wdyt?


1175-1227: Reduce nesting for clarity
This dictionary is quite large. Would you consider factoring out nested chunks or using helper methods to make it easier to read and maintain? wdyt?


1228-1228: Check necessity of deepcopy
Is it crucial to use copy.deepcopy here rather than a direct assignment? If so, could you add a short comment explaining why? wdyt?


1230-1230: Clarify test purpose
This line forces "use_global_cursor" to True. Could you note which exact behavior this is meant to verify? wdyt?


1393-1425: Cover empty-partition edge case
Might it be beneficial to include a test with "use_global_cursor": True and zero partitions in initial_state to ensure the fallback logic works as expected? wdyt?


1538-1538: Maintain consistency
We set "use_global_cursor": False in multiple tests. Would you prefer a single constant or fixture to keep them in sync and reduce duplication? wdyt?


1713-1713: Possible refactor
We keep repeating "use_global_cursor": False. Shall we define a shared helper or fixture to avoid accidental typos or mismatch? wdyt?


2251-2251: Centralize settings
Would it be simpler to store "use_global_cursor" in a single place or constant to adjust it for every test if needed? wdyt?


2350-2350: Consider environment-driven approach
Would you consider reading "use_global_cursor" from an environment variable for more flexible local testing or automation? wdyt?


2878-2878: Confirm default consistency
Is defaulting to False for "use_global_cursor" consistent across other parts of the codebase? Maybe unify to avoid confusion. wdyt?
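On the copy.deepcopy question raised for lines 1228 and 1230 above, a small sketch of why a deep copy is probably needed when one test fixture is derived from another. The fixture contents here are invented for illustration and are not the test file's actual constants.

```python
import copy

# Hypothetical base fixture with nested structures, loosely modeled on the state shape.
BASE_STATE = {
    "use_global_cursor": False,
    "states": [{"partition": {"id": 1}, "cursor": {"updated_at": "2025-01-01"}}],
}

# A shallow copy duplicates only the top-level dict; nested lists and dicts stay shared.
shallow_variant = copy.copy(BASE_STATE)
shares_nested = shallow_variant["states"] is BASE_STATE["states"]

# A deep copy is fully independent, so the variant can be modified freely.
global_variant = copy.deepcopy(BASE_STATE)
global_variant["use_global_cursor"] = True
global_variant["states"][0]["cursor"]["updated_at"] = "2025-02-01"
```

Because the nested list is shared under plain assignment or a shallow copy, mutating the derived fixture would also change the base fixture that other parameterized cases rely on.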

airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (4)

98-98: Constructor default
Would it be clearer to accept "use_global_cursor" as an explicit constructor parameter instead of always defaulting to False? wdyt?


109-120: Behavior when use_global_cursor is True
Would it help to log or warn if we have non-empty partition states while use_global_cursor is active, to highlight potentially conflicting usage? wdyt?


232-232: Naming _over_limit
Could _over_limit be renamed to something like _excess_partition_count to clarify its purpose? wdyt?


305-306: Type validation
What if stream_state.get("use_global_cursor") is not a bool? Maybe you could raise a warning or exception to avoid unexpected behavior? wdyt?
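One possible shape for the type validation suggested above, as a sketch only. `read_use_global_cursor` is a hypothetical helper and not part of the CDK; it assumes that a missing key (for example, state written before this flag existed) should fall back to False.

```python
from typing import Any, Mapping


def read_use_global_cursor(stream_state: Mapping[str, Any]) -> bool:
    """Hypothetical helper: read the flag from incoming state and validate its type."""
    # A missing key is treated as per-partition mode (assumed default of False).
    value = stream_state.get("use_global_cursor", False)
    if not isinstance(value, bool):
        raise ValueError(
            f"'use_global_cursor' must be a bool, got {type(value).__name__}"
        )
    return value
```

Raising is only one option; logging a warning and falling back to False would be a softer alternative if rejecting existing state is considered too strict.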

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a055174 and be6b8ae.

📒 Files selected for processing (2)
  • airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (7 hunks)
  • unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (11 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (8)
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: Check: 'source-the-guardian-api' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Analyze (python)
🔇 Additional comments (6)
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (3)

Line range hint 1234-1391: Expand coverage in parameterization
Could you consider adding a case where "use_global_cursor" is set to a non-boolean to ensure validation or error handling? wdyt?


1706-1706: Mid-sync toggles
Are there tests covering a mid-sync transition from False to True? That scenario might reveal corner cases. wdyt?


1983-1983: Check partial sync states
How does toggling "use_global_cursor" after some partitions have synced affect the final state? Adding a targeted test might clarify. wdyt?

airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (3)

153-154: Potential missed states
When use_global_cursor is True, we skip this emission. Do we risk losing intermediate states? Perhaps a comment clarifying would help. wdyt?


330-332: Switching to global cursor
Switching to use_global_cursor once the limit is reached might drop or consolidate partition states unexpectedly. Would you consider logging a clear message about this transition? wdyt?


372-373: Comparison operator
Shouldn’t this be >= to indicate the limit is reached exactly at DEFAULT_MAX_PARTITIONS_NUMBER? wdyt?
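To make the boundary question concrete, here is a small comparison of the two operators. Both function names and the limit value of 3 are hypothetical; only the strict form corresponds to the `limit_reached()` snippet quoted in this thread.

```python
def limit_reached_strict(over_limit: int, max_partitions: int) -> bool:
    # Matches the quoted implementation: true only after the counter exceeds the limit.
    return over_limit > max_partitions


def limit_reached_inclusive(over_limit: int, max_partitions: int) -> bool:
    # The alternative raised in the comment: true as soon as the counter equals the limit.
    return over_limit >= max_partitions


MAX_PARTITIONS = 3  # hypothetical value for the demo
results = {
    n: (limit_reached_strict(n, MAX_PARTITIONS), limit_reached_inclusive(n, MAX_PARTITIONS))
    for n in (2, 3, 4)
}
```

The two forms differ only when the counter equals the limit exactly, so the choice determines whether the switch to the global cursor happens one dropped partition earlier.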

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (4)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (4)

109-120: Consider extracting state building logic for better maintainability?

The state property could be more maintainable by extracting the per-partition state building logic into a separate method. This would make the code easier to test and modify. Additionally, we could optimize the dictionary creation. WDYT about something like this?

 @property
 def state(self) -> MutableMapping[str, Any]:
-    state: dict[str, Any] = {"use_global_cursor": self._use_global_cursor}
+    state = self._build_base_state()
     if not self._use_global_cursor:
-        states = []
-        for partition_tuple, cursor in self._cursor_per_partition.items():
-            if cursor.state:
-                states.append(
-                    {
-                        "partition": self._to_dict(partition_tuple),
-                        "cursor": copy.deepcopy(cursor.state),
-                    }
-                )
-        state[self._PERPARTITION_STATE_KEY] = states
+        state[self._PERPARTITION_STATE_KEY] = self._build_partition_states()
     # ... rest of the method

+ def _build_base_state(self) -> dict[str, Any]:
+     return {"use_global_cursor": self._use_global_cursor}
+
+ def _build_partition_states(self) -> list[dict[str, Any]]:
+     return [
+         {
+             "partition": self._to_dict(partition_tuple),
+             "cursor": copy.deepcopy(cursor.state),
+         }
+         for partition_tuple, cursor in self._cursor_per_partition.items()
+         if cursor.state
+     ]
🧰 Tools
🪛 GitHub Actions: Linters

[warning] File requires formatting. The code does not conform to the project's formatting standards.


153-154: Add a comment explaining the conditional state emission?

The condition if not self._use_global_cursor determines when to emit state messages, but it might not be immediately clear why. Would you consider adding a brief comment explaining this behavior? Something like:

+            # Only emit state messages for per-partition state management
+            # When using global cursor, state emission is deferred until all partitions are processed
             if not self._use_global_cursor:
                 self._emit_state_message()

232-240: Consider improving the over-limit tracking?

A few suggestions to make this more robust:

  1. The over-limit counter (self._over_limit) is an unbounded integer. For long-running syncs, should we consider using a bounded counter or resetting it periodically to prevent potential overflow?

  2. The log message includes the limit value directly. Would it be clearer to use a class constant or format string? Something like:

+    _PARTITION_LIMIT_MESSAGE = "The maximum number of partitions ({limit}) has been reached. Dropping the oldest {state} partition: {partition}. Over limit: {count}."
+
     def _ensure_partition_limit(self) -> None:
         with self._lock:
             while len(self._cursor_per_partition) > self.DEFAULT_MAX_PARTITIONS_NUMBER - 1:
                 self._over_limit += 1
                 for partition_key in list(self._cursor_per_partition.keys()):
                     if partition_key in self._finished_partitions and self._semaphore_per_partition[partition_key]._value == 0:
                         oldest_partition = self._cursor_per_partition.pop(partition_key)
                         logger.warning(
-                            f"The maximum number of partitions has been reached. Dropping the oldest finished partition: {oldest_partition}. Over limit: {self._over_limit}."
+                            self._PARTITION_LIMIT_MESSAGE.format(
+                                limit=self.DEFAULT_MAX_PARTITIONS_NUMBER,
+                                state="finished",
+                                partition=oldest_partition,
+                                count=self._over_limit
+                            )
                         )

330-332: Consider refining the global cursor transition logic?

A few thoughts on the current implementation:

  1. The transition to global cursor is one-way (irreversible). Should we consider conditions where we might want to switch back to per-partition state?

  2. The limit_reached method uses a simple comparison. Would it be more accurate to consider both the current partition count and the over-limit counter? Something like:

     def limit_reached(self) -> bool:
-        return self._over_limit > self.DEFAULT_MAX_PARTITIONS_NUMBER
+        return (
+            len(self._cursor_per_partition) >= self.DEFAULT_MAX_PARTITIONS_NUMBER and
+            self._over_limit > 0
+        )

What are your thoughts on these suggestions?

Also applies to: 372-373

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between be6b8ae and dc20bf6.

📒 Files selected for processing (1)
  • airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (7 hunks)
🧰 Additional context used
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py

[warning] File requires formatting. The code does not conform to the project's formatting standards.

⏰ Context from checks skipped due to timeout of 90000ms (9)
  • GitHub Check: Build Python Package
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: Check: 'source-the-guardian-api' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Analyze (python)
🔇 Additional comments (1)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (1)

Line range hint 1-373: Run the formatter to fix formatting issues?

The pipeline indicates that the file requires formatting. Would you mind running the project's formatter to ensure the code conforms to the formatting standards?

@tolik0 tolik0 requested a review from maxi297 January 28, 2025 17:52

@maxi297 maxi297 left a comment

One question regarding thread safety

@maxi297 maxi297 left a comment

:shipit:

maxi297 commented Jan 28, 2025

/autofix

Auto-Fix Job Info

This job attempts to auto-fix any linting or formatting issues. If any fixes are made,
those changes will be automatically committed and pushed back to the PR.

Note: This job can only be run by maintainers. On PRs from forks, this command requires
that the PR author has enabled the Allow edits from maintainers option.

PR auto-fix job started... Check job output.

✅ Changes applied successfully.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (5)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (5)

109-120: Consider documenting the use_global_cursor field in the state dictionary.

The state dictionary now includes a new field that affects behavior, but its purpose isn't immediately clear from the code. Would you consider adding a docstring explaining when and why this flag is set? wdyt?


153-154: Consider adding a comment explaining the state emission condition.

The state emission is skipped when using global cursor. Would it be helpful to add a brief comment explaining why we don't emit state in this case? This could help future maintainers understand the design decision. wdyt?


232-243: Consider making the warning message more actionable.

The warning message now includes the over limit count, which is great for observability. Would it be helpful to include guidance on what users should do when they see this warning? For example, suggesting they adjust their partition strategy or increase the limit if needed? wdyt?


333-335: Consider logging the switch to global cursor.

When we switch to using the global cursor due to reaching the limit, it might be helpful to log this significant change in behavior. This could help with debugging and monitoring. wdyt?

 if not self._use_global_cursor and self.limit_reached():
+    logger.info("Switching to global cursor as partition limit has been reached")
     self._use_global_cursor = True

375-376: Consider caching the limit check result.

The limit_reached method is called frequently in the observe method. Would it make sense to cache the result once we exceed the limit since it can't go back down? This could avoid unnecessary comparisons. wdyt?

+    _limit_reached: bool = False
+
     def limit_reached(self) -> bool:
-        return self._over_limit > self.DEFAULT_MAX_PARTITIONS_NUMBER
+        if not self._limit_reached:
+            self._limit_reached = self._over_limit > self.DEFAULT_MAX_PARTITIONS_NUMBER
+        return self._limit_reached
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between dc20bf6 and 843d780.

📒 Files selected for processing (1)
  • airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (7 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (8)
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: Check: 'source-the-guardian-api' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Analyze (python)
🔇 Additional comments (1)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (1)

308-309: Consider the implications of the default value.

The use_global_cursor is initialized with a default of False. Have you considered if this is the safest default for all scenarios? For example, what happens if we're migrating from an older version that didn't have this flag? wdyt?

@tolik0 tolik0 merged commit d318618 into main Jan 28, 2025
22 of 23 checks passed
@tolik0 tolik0 deleted the tolik0/fix-state-emitting-concurrent-perpartitioncursor branch January 28, 2025 19:32