-
Notifications
You must be signed in to change notification settings - Fork 6
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(file-based): changes for not mirroring paths #205
base: main
Are you sure you want to change the base?
Conversation
📝 WalkthroughWalkthroughThe pull request introduces a new boolean field, Changes
Possibly related PRs
Hey there! 👋 I noticed you've added some really cool configuration options for file delivery. Quick question: have you considered how this might impact existing connectors that don't explicitly set 📜 Recent review detailsConfiguration used: CodeRabbit UI 📒 Files selected for processing (1)
🚧 Files skipped from review as they are similar to previous changes (1)
⏰ Context from checks skipped due to timeout of 90000ms (3)
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Nitpick comments (6)
airbyte_cdk/sources/file_based/exceptions.py (1)
132-149
: Add type annotations to improve type safety, wdyt?The message formatting looks great! Consider adding type annotations:
-def format_duplicate_files_error_message(stream_name: str, duplicated_files_names: List): +def format_duplicate_files_error_message(stream_name: str, duplicated_files_names: List[Dict[str, List[str]]]) -> str:🧰 Tools
🪛 GitHub Actions: Linters
[error] 132-132: Function is missing a return type annotation
[error] 132-132: Missing type parameters for generic type "List"
airbyte_cdk/sources/file_based/file_based_stream_reader.py (1)
138-146
: Consider improving null safety in the config check, wdyt?The logic looks good, but we could make the null checks more explicit:
def preserve_subdirectories_directories(self) -> bool: # fall back to preserve subdirectories if config is not present or incomplete if ( self.config and hasattr(self.config, "delivery_options") - and hasattr(self.config.delivery_options, "preserve_subdirectories_directories") + and self.config.delivery_options is not None + and hasattr(self.config.delivery_options, "preserve_subdirectories_directories") ): return self.config.delivery_options.preserve_subdirectories_directories return True🧰 Tools
🪛 GitHub Actions: Linters
[error] 145-145: Item "None" of "DeliveryOptions | None" has no attribute "preserve_subdirectories_directories"
airbyte_cdk/sources/file_based/config/abstract_file_based_spec.py (1)
17-22
: Consider enhancing the field description for clarity, wdyt?The implementation looks good! Consider making the description more descriptive:
preserve_subdirectories_directories: bool = Field( True, - description="Flag indicating we should preserve subdirectories directories", + description="When enabled, preserves the subdirectory structure of files during transfer. When disabled, all files are stored in the root directory.", )airbyte_cdk/sources/file_based/file_based_source.py (1)
392-399
: Add return type and improve null safety, wdyt?The logic looks good, but we could improve type safety:
@staticmethod - def _preserve_subdirectories_directories(parsed_config: AbstractFileBasedSpec): + def _preserve_subdirectories_directories(parsed_config: AbstractFileBasedSpec) -> bool: # fall back to preserve subdirectories if config is not present or incomplete if hasattr(parsed_config, "delivery_options") and hasattr( - parsed_config.delivery_options, "preserve_subdirectories_directories" + parsed_config.delivery_options, "preserve_subdirectories_directories" + ) and parsed_config.delivery_options is not None: ): return parsed_config.delivery_options.preserve_subdirectories_directories return True🧰 Tools
🪛 GitHub Actions: Linters
[error] 393-393: Function is missing a return type annotation
[error] 398-398: Item "None" of "DeliveryOptions | None" has no attribute "preserve_subdirectories_directories"
airbyte_cdk/sources/file_based/stream/default_file_based_stream.py (2)
50-51
: Consider a more concise property name.The property name
preserve_subdirectories_directories
seems redundant with "directories". What do you think about shortening it to justpreserve_subdirectories
? wdyt?- preserve_subdirectories_directories = True + preserve_subdirectories = TrueAlso applies to: 59-59
64-66
: Update initialization if property name changes.If you agree with shortening the property name, we should update the initialization too. wdyt?
- self.preserve_subdirectories_directories = kwargs.pop( - self.PRESERVE_SUBDIRECTORIES_KW, True - ) + self.preserve_subdirectories = kwargs.pop(self.PRESERVE_SUBDIRECTORIES_KW, True)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
airbyte_cdk/sources/file_based/config/abstract_file_based_spec.py
(3 hunks)airbyte_cdk/sources/file_based/exceptions.py
(2 hunks)airbyte_cdk/sources/file_based/file_based_source.py
(6 hunks)airbyte_cdk/sources/file_based/file_based_stream_reader.py
(2 hunks)airbyte_cdk/sources/file_based/stream/default_file_based_stream.py
(3 hunks)unit_tests/sources/file_based/scenarios/csv_scenarios.py
(1 hunks)
🧰 Additional context used
🪛 GitHub Actions: Linters
airbyte_cdk/sources/file_based/exceptions.py
[error] 132-132: Function is missing a return type annotation
[error] 132-132: Missing type parameters for generic type "List"
airbyte_cdk/sources/file_based/stream/default_file_based_stream.py
[error] 5-44: Import block is un-sorted or un-formatted. This can be fixed automatically using the --fix
option with ruff.
airbyte_cdk/sources/file_based/file_based_stream_reader.py
[error] 145-145: Item "None" of "DeliveryOptions | None" has no attribute "preserve_subdirectories_directories"
airbyte_cdk/sources/file_based/file_based_source.py
[error] 393-393: Function is missing a return type annotation
[error] 398-398: Item "None" of "DeliveryOptions | None" has no attribute "preserve_subdirectories_directories"
⏰ Context from checks skipped due to timeout of 90000ms (3)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (Fast)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
🔇 Additional comments (5)
airbyte_cdk/sources/file_based/exceptions.py (1)
114-116
: LGTM! Clean and consistent error class implementation.The error class follows the established pattern in the codebase.
airbyte_cdk/sources/file_based/stream/default_file_based_stream.py (2)
111-126
: LGTM! Well-structured implementation.The
_duplicated_files_names
method is well-implemented:
- Uses appropriate data structures (set for O(1) lookups, defaultdict for grouping)
- Clear variable names
- Efficient single-pass algorithm
137-145
: LGTM! Good error handling.The duplicate file check in
compute_slices
is well-integrated:
- Only checks when preserve_subdirectories_directories is False
- Provides clear error messages with file details
- Proper error propagation
unit_tests/sources/file_based/scenarios/csv_scenarios.py (2)
520-538
: LGTM! Well-structured delivery options.The delivery options under "Copy Raw Files" are well-defined:
- Clear property structure
- Descriptive title and description
- Appropriate default value
552-570
: Consider consolidating delivery options.I notice we have delivery options defined both under "Copy Raw Files" and at the root level. Is this intentional duplication or should we consolidate them? wdyt?
✅ Verification successful
The delivery options duplication is intentional and correct.
The delivery_options appear in two different contexts:
- At the root level for global configuration (order=8)
- Within the "Copy Raw Files" delivery method for specific file transfer settings (order=2)
This allows users to configure delivery options both globally and specifically for file transfers when needed.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Check if both delivery options are used in the codebase rg -A 5 "delivery_options" | grep -v "test"Length of output: 2562
Script:
#!/bin/bash # Search for class definitions containing delivery_options ast-grep --pattern 'class $_ { $$$ delivery_options: $_ $$$ }'Length of output: 70
Script:
#!/bin/bash # Search for class definitions and their context rg -B 2 -A 10 "class.*DeliveryOptions"Length of output: 1522
Script:
#!/bin/bash # Search for the complete schema structure rg -B 10 -A 20 "class.*FileBasedSpec"Length of output: 17481
airbyte_cdk/sources/file_based/stream/default_file_based_stream.py
Outdated
Show resolved
Hide resolved
/autofix
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (3)
airbyte_cdk/sources/file_based/stream/default_file_based_stream.py (3)
59-66
: Consider simplifying the property name.The property name
preserve_subdirectories_directories
seems to have a redundant "directories" suffix. Wouldpreserve_subdirectories
be clearer and more concise? The initialization logic looks good though! Wdyt?- preserve_subdirectories_directories = True + preserve_subdirectories = True def __init__(self, **kwargs: Any): if self.FILE_TRANSFER_KW in kwargs: self.use_file_transfer = kwargs.pop(self.FILE_TRANSFER_KW, False) - self.preserve_subdirectories_directories = kwargs.pop( + self.preserve_subdirectories = kwargs.pop( self.PRESERVE_SUBDIRECTORIES_KW, True )
111-126
: Add type annotations and consider renaming the method.The duplicate detection logic looks solid! A few suggestions to make it even better:
- The method name could be more Pythonic. Maybe
_get_duplicated_filenames
?- Let's add proper type annotations to fix the pipeline failures.
- def _duplicated_files_names(self, slices: List) -> list[dict]: + def _get_duplicated_filenames( + self, + slices: List[Dict[str, List[RemoteFile]]] + ) -> List[Dict[str, List[str]]]:🧰 Tools
🪛 GitHub Actions: Linters
[error] 111-111: Missing type parameters for generic type "List"
[error] 111-111: Missing type parameters for generic type "dict"
133-145
: Consider extracting the duplicate check into a guard clause.The logic looks correct, but we could make it more readable by extracting the duplicate check into a guard clause at the beginning of the method. Wdyt?
def compute_slices(self) -> Iterable[Optional[Mapping[str, Any]]]: # Sort files by last_modified, uri and return them grouped by last_modified all_files = self.list_files() files_to_read = self._cursor.get_files_to_sync(all_files, self.logger) sorted_files_to_read = sorted(files_to_read, key=lambda f: (f.last_modified, f.uri)) slices = [ {self.FILES_KEY: list(group[1])} for group in itertools.groupby(sorted_files_to_read, lambda f: f.last_modified) ] - if slices and not self.preserve_subdirectories_directories: - duplicated_files_names = self._duplicated_files_names(slices) - if duplicated_files_names: - raise DuplicatedFilesError( - format_duplicate_files_error_message( - stream_name=self.name, duplicated_files_names=duplicated_files_names - ), - stream=self.name, - ) + if slices and not self.preserve_subdirectories_directories: + self._check_for_duplicates(slices) return slices + def _check_for_duplicates(self, slices: List[Dict[str, List[RemoteFile]]]) -> None: + duplicated_files_names = self._get_duplicated_filenames(slices) + if duplicated_files_names: + raise DuplicatedFilesError( + format_duplicate_files_error_message( + stream_name=self.name, + duplicated_files_names=duplicated_files_names + ), + stream=self.name, + )
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
airbyte_cdk/sources/file_based/stream/default_file_based_stream.py
(3 hunks)
🧰 Additional context used
🪛 GitHub Actions: Linters
airbyte_cdk/sources/file_based/stream/default_file_based_stream.py
[error] 111-111: Missing type parameters for generic type "List"
[error] 111-111: Missing type parameters for generic type "dict"
⏰ Context from checks skipped due to timeout of 90000ms (3)
- GitHub Check: Pytest (Fast)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
🔇 Additional comments (1)
airbyte_cdk/sources/file_based/stream/default_file_based_stream.py (1)
8-25
: LGTM! The new imports and constants are well-organized.The new constants
PRESERVE_SUBDIRECTORIES_KW
andFILES_KEY
follow the existing naming convention and are descriptive. The imports are logically grouped, though we could consider organizing them alphabetically within their groups. Wdyt?Also applies to: 50-51
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (3)
airbyte_cdk/sources/file_based/file_based_source.py (1)
392-401
: Well-structured configuration handling.The method follows good practices:
- Safely handles missing or incomplete configuration
- Preserves backward compatibility by defaulting to
True
- Clear and descriptive attribute checks
What do you think about adding a docstring to explain the default behavior? wdyt?
@staticmethod def _preserve_subdirectories_directories(parsed_config: AbstractFileBasedSpec) -> bool: + """ + Determine whether to preserve subdirectories based on the configuration. + + Returns True if: + - The configuration is missing + - The delivery_options is not set + - The preserve_subdirectories_directories flag is not set + """ # fall back to preserve subdirectories if config is not present or incompleteairbyte_cdk/sources/file_based/stream/default_file_based_stream.py (2)
111-128
: Well-implemented duplicate detection.The implementation is efficient using sets and defaultdict. A few suggestions to make it even better:
- Would you consider adding type hints for the return value's inner types? wdyt?
- How about adding a docstring to explain the method's purpose and return format?
def _duplicated_files_names( self, slices: List[dict[str, List[RemoteFile]]] -) -> List[dict[str, List[str]]]: +) -> List[dict[str, list[str]]]: + """ + Identify duplicate file names across all slices. + + Args: + slices: List of slices containing RemoteFile objects + + Returns: + List of dictionaries mapping duplicate file names to their full paths + """
139-147
: Clean integration of duplicate detection.The duplicate check is well-integrated into the existing slice computation logic. However, consider extracting the duplicate check into a separate method for better readability. wdyt?
def compute_slices(self) -> Iterable[Optional[Mapping[str, Any]]]: + def _check_for_duplicates(slices: List[dict[str, List[RemoteFile]]]) -> None: + if not self.preserve_subdirectories_directories: + duplicated_files_names = self._duplicated_files_names(slices) + if duplicated_files_names: + raise DuplicatedFilesError( + format_duplicate_files_error_message( + stream_name=self.name, duplicated_files_names=duplicated_files_names + ), + stream=self.name, + ) + all_files = self.list_files() files_to_read = self._cursor.get_files_to_sync(all_files, self.logger) sorted_files_to_read = sorted(files_to_read, key=lambda f: (f.last_modified, f.uri)) slices = [ {self.FILES_KEY: list(group[1])} for group in itertools.groupby(sorted_files_to_read, lambda f: f.last_modified) ] - if slices and not self.preserve_subdirectories_directories: - duplicated_files_names = self._duplicated_files_names(slices) - if duplicated_files_names: - raise DuplicatedFilesError( - format_duplicate_files_error_message( - stream_name=self.name, duplicated_files_names=duplicated_files_names - ), - stream=self.name, - ) + if slices: + _check_for_duplicates(slices) return slices
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
airbyte_cdk/sources/file_based/exceptions.py
(2 hunks)airbyte_cdk/sources/file_based/file_based_source.py
(6 hunks)airbyte_cdk/sources/file_based/file_based_stream_reader.py
(2 hunks)airbyte_cdk/sources/file_based/stream/default_file_based_stream.py
(3 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- airbyte_cdk/sources/file_based/file_based_stream_reader.py
⏰ Context from checks skipped due to timeout of 90000ms (4)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (Fast)
- GitHub Check: Analyze (python)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
🔇 Additional comments (3)
airbyte_cdk/sources/file_based/exceptions.py (2)
114-115
: LGTM! Clean exception class implementation.The
DuplicatedFilesError
follows the established pattern of extendingBaseFileBasedSourceError
.
132-151
: Great error message formatting!The error message is clear, actionable and includes:
- The specific stream where duplicates were found
- The number of duplicates for each file
- Clear guidance on how to resolve the issue
airbyte_cdk/sources/file_based/stream/default_file_based_stream.py (1)
50-51
: Good use of constants!Using constants instead of magic strings improves maintainability.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (4)
airbyte_cdk/sources/file_based/file_based_stream_reader.py (1)
Line range hint
172-182
: The logic looks good! Would you consider some readability improvements?The implementation correctly uses the new configuration option, but we could make it even clearer with:
- More descriptive variable names (e.g.,
relative_file_path
instead offile_relative_path
)- A more detailed comment explaining the path transformation logic
What do you think about this diff?
- # Remove left slashes from source path format to make relative path for writing locally - file_relative_path = file.uri.lstrip("/") + # Transform the source file path into a relative path for local writing: + # - If preserving directories: maintain the path structure but remove leading slashes + # - If not preserving: use only the filename + relative_file_path = file.uri.lstrip("/") if preserve_subdirectories_directories else path.basename(file.uri)airbyte_cdk/sources/file_based/config/abstract_file_based_spec.py (2)
34-38
: The field definition looks good! Should we enhance the description?The implementation is solid, but the description could be more informative about what this setting actually does. What do you think about making it clearer for users?
preserve_subdirectories_directories: bool = Field( title="Preserve Subdirectories Directories", - description="Flag indicating we should preserve subdirectories directories", + description="When enabled, maintains the original directory structure of files when copying them to the destination. When disabled, all files are copied to a flat structure in the destination directory.", default=True, )
74-78
: Should we use the same enhanced description here for consistency?preserve_subdirectories_directories: bool = Field( title="Preserve Subdirectories Directories", - description="Flag indicating we should preserve subdirectories directories", + description="When enabled, maintains the original directory structure of files when copying them to the destination. When disabled, all files are copied to a flat structure in the destination directory.", default=True, )unit_tests/sources/file_based/scenarios/csv_scenarios.py (1)
526-544
: The test configuration looks good! Should we add more test coverage?The implementation correctly includes the new field in the test scenarios. However, we might want to add test cases that specifically verify the behavior when:
preserve_subdirectories_directories
is set toFalse
- Files are in nested subdirectories
- Edge cases like empty directories or files with identical names in different subdirectories
Would you like me to help draft these additional test scenarios?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
airbyte_cdk/sources/file_based/config/abstract_file_based_spec.py
(2 hunks)airbyte_cdk/sources/file_based/file_based_source.py
(6 hunks)airbyte_cdk/sources/file_based/file_based_stream_reader.py
(2 hunks)unit_tests/sources/file_based/scenarios/csv_scenarios.py
(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- airbyte_cdk/sources/file_based/file_based_source.py
⏰ Context from checks skipped due to timeout of 90000ms (4)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Analyze (python)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (Fast)
🔇 Additional comments (1)
airbyte_cdk/sources/file_based/file_based_stream_reader.py (1)
138-146
: LGTM! The implementation looks solid.The method handles all edge cases gracefully and maintains backward compatibility with the default True value.
… (when raw file is selected)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (1)
airbyte_cdk/sources/file_based/file_based_source.py (1)
393-393
: What do you think about simplifying the method name? 🤔The current name
_preserve_subdirectories_directories
seems a bit redundant with "directories" appearing twice. Maybe we could simplify it to_preserve_subdirectories
? wdyt?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
airbyte_cdk/sources/file_based/config/abstract_file_based_spec.py
(1 hunks)airbyte_cdk/sources/file_based/file_based_source.py
(6 hunks)airbyte_cdk/sources/file_based/file_based_stream_reader.py
(2 hunks)unit_tests/sources/file_based/scenarios/csv_scenarios.py
(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
- airbyte_cdk/sources/file_based/config/abstract_file_based_spec.py
- unit_tests/sources/file_based/scenarios/csv_scenarios.py
- airbyte_cdk/sources/file_based/file_based_stream_reader.py
⏰ Context from checks skipped due to timeout of 90000ms (6)
- GitHub Check: Publish SDM to DockerHub
- GitHub Check: Publish CDK version to PyPI
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (Fast)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Analyze (python)
🔇 Additional comments (3)
airbyte_cdk/sources/file_based/file_based_source.py (3)
245-245
: Nice refactoring of the_make_default_stream
method! 👍The change to pass
parsed_config
instead ofuse_file_transfer
is a good improvement. It consolidates configuration handling and makes the code more maintainable for future extensions.Also applies to: 276-276, 288-288, 301-301, 313-316
392-401
: Great defensive programming! 💪The implementation safely handles all edge cases and provides a sensible default. I particularly like how it:
- Validates file transfer usage first
- Checks attribute existence
- Has null-safety checks
- Provides a safe default
392-401
: Shall we verify the configuration usage? 🔍Let's check if this new configuration option is properly documented and consistently used across the codebase.
Summary by CodeRabbit
New Features
Bug Fixes
Improvements