diff --git a/airbyte_cdk/sources/file_based/config/abstract_file_based_spec.py b/airbyte_cdk/sources/file_based/config/abstract_file_based_spec.py index ffa7d743d..626d50fef 100644 --- a/airbyte_cdk/sources/file_based/config/abstract_file_based_spec.py +++ b/airbyte_cdk/sources/file_based/config/abstract_file_based_spec.py @@ -31,6 +31,17 @@ class Config(OneOfOptionConfig): delivery_type: Literal["use_file_transfer"] = Field("use_file_transfer", const=True) + preserve_directory_structure: bool = Field( + title="Preserve Sub-Directories in File Paths", + description=( + "If enabled, sends subdirectory folder structure " + "along with source file names to the destination. " + "Otherwise, files will be synced by their names only. " + "This option is ignored when file-based replication is not enabled." + ), + default=True, + ) + class AbstractFileBasedSpec(BaseModel): """ diff --git a/airbyte_cdk/sources/file_based/exceptions.py b/airbyte_cdk/sources/file_based/exceptions.py index 1c5ce0b16..b0d38947f 100644 --- a/airbyte_cdk/sources/file_based/exceptions.py +++ b/airbyte_cdk/sources/file_based/exceptions.py @@ -111,6 +111,40 @@ class ErrorListingFiles(BaseFileBasedSourceError): pass +class DuplicatedFilesError(BaseFileBasedSourceError): + def __init__(self, duplicated_files_names: List[dict[str, List[str]]], **kwargs: Any): + self._duplicated_files_names = duplicated_files_names + self._stream_name: str = kwargs["stream"] + super().__init__(self._format_duplicate_files_error_message(), **kwargs) + + def _format_duplicate_files_error_message(self) -> str: + duplicated_files_messages = [] + for duplicated_file in self._duplicated_files_names: + for duplicated_file_name, file_paths in duplicated_file.items(): + file_duplicated_message = ( + f"{len(file_paths)} duplicates found for file name {duplicated_file_name}:\n\n" + + "".join(f"\n - {file_path}" for file_path in file_paths) + ) + duplicated_files_messages.append(file_duplicated_message) + + error_message = ( + f"ERROR: Duplicate filenames 
found for stream {self._stream_name}. " + "Duplicate file names are not allowed if the Preserve Sub-Directories in File Paths option is disabled. " + "Please remove or rename the duplicate files before attempting to re-run the sync.\n\n" + + "\n".join(duplicated_files_messages) + ) + + return error_message + + def __repr__(self) -> str: + """Return a string representation of the exception.""" + class_name = self.__class__.__name__ + properties_str = ", ".join( + f"{k}={v!r}" for k, v in self.__dict__.items() if not k.startswith("_") + ) + return f"{class_name}({properties_str})" + + class CustomFileBasedException(AirbyteTracedException): """ A specialized exception for file-based connectors. diff --git a/airbyte_cdk/sources/file_based/file_based_source.py b/airbyte_cdk/sources/file_based/file_based_source.py index 900e6a4da..0eb90ac24 100644 --- a/airbyte_cdk/sources/file_based/file_based_source.py +++ b/airbyte_cdk/sources/file_based/file_based_source.py @@ -242,7 +242,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: stream=self._make_default_stream( stream_config=stream_config, cursor=cursor, - use_file_transfer=self._use_file_transfer(parsed_config), + parsed_config=parsed_config, ), source=self, logger=self.logger, @@ -273,7 +273,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: stream=self._make_default_stream( stream_config=stream_config, cursor=cursor, - use_file_transfer=self._use_file_transfer(parsed_config), + parsed_config=parsed_config, ), source=self, logger=self.logger, @@ -285,7 +285,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: stream = self._make_default_stream( stream_config=stream_config, cursor=cursor, - use_file_transfer=self._use_file_transfer(parsed_config), + parsed_config=parsed_config, ) streams.append(stream) @@ -298,7 +298,7 @@ def _make_default_stream( self, stream_config: FileBasedStreamConfig, cursor: Optional[AbstractFileBasedCursor], - use_file_transfer: bool = False, + 
parsed_config: AbstractFileBasedSpec, ) -> AbstractFileBasedStream: return DefaultFileBasedStream( config=stream_config, @@ -310,7 +310,8 @@ def _make_default_stream( validation_policy=self._validate_and_get_validation_policy(stream_config), errors_collector=self.errors_collector, cursor=cursor, - use_file_transfer=use_file_transfer, + use_file_transfer=self._use_file_transfer(parsed_config), + preserve_directory_structure=self._preserve_directory_structure(parsed_config), ) def _get_stream_from_catalog( @@ -385,3 +386,25 @@ def _use_file_transfer(parsed_config: AbstractFileBasedSpec) -> bool: and parsed_config.delivery_method.delivery_type == "use_file_transfer" ) return use_file_transfer + + @staticmethod + def _preserve_directory_structure(parsed_config: AbstractFileBasedSpec) -> bool: + """ + Determines whether to preserve directory structure during file transfer. + + When enabled, files maintain their subdirectory paths in the destination. + When disabled, files are flattened to the root of the destination. 
+ + Args: + parsed_config: The parsed configuration containing delivery method settings + + Returns: + True if directory structure should be preserved (default), False otherwise + """ + if ( + FileBasedSource._use_file_transfer(parsed_config) + and hasattr(parsed_config.delivery_method, "preserve_directory_structure") + and parsed_config.delivery_method.preserve_directory_structure is not None + ): + return parsed_config.delivery_method.preserve_directory_structure + return True diff --git a/airbyte_cdk/sources/file_based/file_based_stream_reader.py b/airbyte_cdk/sources/file_based/file_based_stream_reader.py index ab1c428ce..065125621 100644 --- a/airbyte_cdk/sources/file_based/file_based_stream_reader.py +++ b/airbyte_cdk/sources/file_based/file_based_stream_reader.py @@ -135,6 +135,17 @@ def use_file_transfer(self) -> bool: return use_file_transfer return False + def preserve_directory_structure(self) -> bool: + # fall back to preserve subdirectories if config is not present or incomplete + if ( + self.use_file_transfer() + and self.config + and hasattr(self.config.delivery_method, "preserve_directory_structure") + and self.config.delivery_method.preserve_directory_structure is not None + ): + return self.config.delivery_method.preserve_directory_structure + return True + @abstractmethod def get_file( self, file: RemoteFile, local_directory: str, logger: logging.Logger @@ -159,10 +170,13 @@ def get_file( """ ... 
- @staticmethod - def _get_file_transfer_paths(file: RemoteFile, local_directory: str) -> List[str]: - # Remove left slashes from source path format to make relative path for writing locally - file_relative_path = file.uri.lstrip("/") + def _get_file_transfer_paths(self, file: RemoteFile, local_directory: str) -> List[str]: + preserve_directory_structure = self.preserve_directory_structure() + if preserve_directory_structure: + # Remove left slashes from source path format to make relative path for writing locally + file_relative_path = file.uri.lstrip("/") + else: + file_relative_path = path.basename(file.uri) local_file_path = path.join(local_directory, file_relative_path) # Ensure the local directory exists diff --git a/airbyte_cdk/sources/file_based/file_types/unstructured_parser.py b/airbyte_cdk/sources/file_based/file_types/unstructured_parser.py index 9dfde9ca0..f55675e0a 100644 --- a/airbyte_cdk/sources/file_based/file_types/unstructured_parser.py +++ b/airbyte_cdk/sources/file_based/file_types/unstructured_parser.py @@ -2,6 +2,7 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # import logging +import os import traceback from datetime import datetime from io import BytesIO, IOBase @@ -42,12 +43,34 @@ unstructured_partition_docx = None unstructured_partition_pptx = None +AIRBYTE_NLTK_DATA_DIR = "/airbyte/nltk_data" +TMP_NLTK_DATA_DIR = "/tmp/nltk_data" + + +def get_nltk_temp_folder() -> str: + """ + For non-root connectors /tmp is not currently writable, but we should allow it in the future. + It's safe to use /airbyte for now. Fallback to /tmp for local development. 
+ """ + try: + nltk_data_dir = AIRBYTE_NLTK_DATA_DIR + os.makedirs(nltk_data_dir, exist_ok=True) + except OSError: + nltk_data_dir = TMP_NLTK_DATA_DIR + os.makedirs(nltk_data_dir, exist_ok=True) + return nltk_data_dir + + try: + nltk_data_dir = get_nltk_temp_folder() + nltk.data.path.append(nltk_data_dir) nltk.data.find("tokenizers/punkt.zip") nltk.data.find("tokenizers/punkt_tab.zip") + nltk.data.find("tokenizers/averaged_perceptron_tagger_eng.zip") except LookupError: - nltk.download("punkt") - nltk.download("punkt_tab") + nltk.download("punkt", download_dir=nltk_data_dir, quiet=True) + nltk.download("punkt_tab", download_dir=nltk_data_dir, quiet=True) + nltk.download("averaged_perceptron_tagger_eng", download_dir=nltk_data_dir, quiet=True) def optional_decode(contents: Union[str, bytes]) -> str: diff --git a/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py b/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py index a5cae2e69..604322549 100644 --- a/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py +++ b/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py @@ -5,14 +5,17 @@ import asyncio import itertools import traceback +from collections import defaultdict from copy import deepcopy from functools import cache -from typing import Any, Dict, Iterable, List, Mapping, MutableMapping, Optional, Set, Union +from os import path +from typing import Any, Dict, Iterable, List, Mapping, MutableMapping, Optional, Set, Tuple, Union from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, FailureType, Level from airbyte_cdk.models import Type as MessageType from airbyte_cdk.sources.file_based.config.file_based_stream_config import PrimaryKeyType from airbyte_cdk.sources.file_based.exceptions import ( + DuplicatedFilesError, FileBasedSourceError, InvalidSchemaError, MissingSchemaError, @@ -43,6 +46,8 @@ class DefaultFileBasedStream(AbstractFileBasedStream, IncrementalMixin): """ FILE_TRANSFER_KW = 
"use_file_transfer" + PRESERVE_DIRECTORY_STRUCTURE_KW = "preserve_directory_structure" + FILES_KEY = "files" DATE_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ" ab_last_mod_col = "_ab_source_file_last_modified" ab_file_name_col = "_ab_source_file_url" @@ -50,10 +55,15 @@ class DefaultFileBasedStream(AbstractFileBasedStream, IncrementalMixin): source_file_url = "source_file_url" airbyte_columns = [ab_last_mod_col, ab_file_name_col] use_file_transfer = False + preserve_directory_structure = True def __init__(self, **kwargs: Any): if self.FILE_TRANSFER_KW in kwargs: self.use_file_transfer = kwargs.pop(self.FILE_TRANSFER_KW, False) + if self.PRESERVE_DIRECTORY_STRUCTURE_KW in kwargs: + self.preserve_directory_structure = kwargs.pop( + self.PRESERVE_DIRECTORY_STRUCTURE_KW, True + ) super().__init__(**kwargs) @property @@ -98,15 +108,33 @@ def _filter_schema_invalid_properties( else: return super()._filter_schema_invalid_properties(configured_catalog_json_schema) + def _duplicated_files_names( + self, slices: List[dict[str, List[RemoteFile]]] + ) -> List[dict[str, List[str]]]: + seen_file_names: Dict[str, List[str]] = defaultdict(list) + for file_slice in slices: + for file_found in file_slice[self.FILES_KEY]: + file_name = path.basename(file_found.uri) + seen_file_names[file_name].append(file_found.uri) + return [ + {file_name: paths} for file_name, paths in seen_file_names.items() if len(paths) > 1 + ] + def compute_slices(self) -> Iterable[Optional[Mapping[str, Any]]]: # Sort files by last_modified, uri and return them grouped by last_modified all_files = self.list_files() files_to_read = self._cursor.get_files_to_sync(all_files, self.logger) sorted_files_to_read = sorted(files_to_read, key=lambda f: (f.last_modified, f.uri)) slices = [ - {"files": list(group[1])} + {self.FILES_KEY: list(group[1])} for group in itertools.groupby(sorted_files_to_read, lambda f: f.last_modified) ] + if slices and not self.preserve_directory_structure: + duplicated_files_names = 
self._duplicated_files_names(slices) + if duplicated_files_names: + raise DuplicatedFilesError( + stream=self.name, duplicated_files_names=duplicated_files_names + ) return slices def transform_record( diff --git a/unit_tests/sources/file_based/scenarios/csv_scenarios.py b/unit_tests/sources/file_based/scenarios/csv_scenarios.py index 2f4f02cf8..9e919c911 100644 --- a/unit_tests/sources/file_based/scenarios/csv_scenarios.py +++ b/unit_tests/sources/file_based/scenarios/csv_scenarios.py @@ -523,7 +523,13 @@ "const": "use_file_transfer", "enum": ["use_file_transfer"], "type": "string", - } + }, + "preserve_directory_structure": { + "default": True, + "description": "If enabled, sends subdirectory folder structure along with source file names to the destination. Otherwise, files will be synced by their names only. This option is ignored when file-based replication is not enabled.", + "title": "Preserve Sub-Directories in File Paths", + "type": "boolean", + }, }, "description": "Copy raw files without parsing their contents. Bits are copied into the destination exactly as they appeared in the source. 
Recommended for use with unstructured text data, non-text and compressed files.", "required": ["delivery_type"], diff --git a/unit_tests/sources/file_based/stream/test_default_file_based_stream.py b/unit_tests/sources/file_based/stream/test_default_file_based_stream.py index a7bde7755..1b85ed8dd 100644 --- a/unit_tests/sources/file_based/stream/test_default_file_based_stream.py +++ b/unit_tests/sources/file_based/stream/test_default_file_based_stream.py @@ -4,6 +4,7 @@ import traceback import unittest +from copy import deepcopy from datetime import datetime, timezone from typing import Any, Iterable, Iterator, Mapping from unittest import mock @@ -17,7 +18,11 @@ AbstractFileBasedAvailabilityStrategy, ) from airbyte_cdk.sources.file_based.discovery_policy import AbstractDiscoveryPolicy -from airbyte_cdk.sources.file_based.exceptions import FileBasedErrorsCollector, FileBasedSourceError +from airbyte_cdk.sources.file_based.exceptions import ( + DuplicatedFilesError, + FileBasedErrorsCollector, + FileBasedSourceError, +) from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader from airbyte_cdk.sources.file_based.file_types import FileTransfer from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeParser @@ -302,6 +307,20 @@ def setUp(self) -> None: use_file_transfer=True, ) + self._stream_not_mirroring = DefaultFileBasedStream( + config=self._stream_config, + catalog_schema=self._catalog_schema, + stream_reader=self._stream_reader, + availability_strategy=self._availability_strategy, + discovery_policy=self._discovery_policy, + parsers={MockFormat: self._parser}, + validation_policy=self._validation_policy, + cursor=self._cursor, + errors_collector=FileBasedErrorsCollector(), + use_file_transfer=True, + preserve_directory_structure=False, + ) + def test_when_read_records_from_slice_then_return_records(self) -> None: """Verify that we have the new file method and data is empty""" with 
mock.patch.object(FileTransfer, "get_file", return_value=[self._A_RECORD]): @@ -319,3 +338,132 @@ def test_when_transform_record_then_return_updated_record(self) -> None: transformed_record = self._stream.transform_record_for_file_transfer(self._A_RECORD, file) assert transformed_record[self._stream.modified] == last_updated assert transformed_record[self._stream.source_file_url] == file.uri + + def test_when_compute_slices(self) -> None: + all_files = [ + RemoteFile( + uri="mirror_paths_testing/not_duplicates/data/jan/monthly-kickoff-202402.mpeg", + last_modified=datetime(2025, 1, 9, 11, 27, 20), + mime_type=None, + ), + RemoteFile( + uri="mirror_paths_testing/not_duplicates/data/feb/monthly-kickoff-202401.mpeg", + last_modified=datetime(2025, 1, 9, 11, 27, 20), + mime_type=None, + ), + RemoteFile( + uri="mirror_paths_testing/not_duplicates/data/mar/monthly-kickoff-202403.mpeg", + last_modified=datetime(2025, 1, 9, 11, 27, 20), + mime_type=None, + ), + ] + with ( + mock.patch.object(DefaultFileBasedStream, "list_files", return_value=all_files), + mock.patch.object(self._stream._cursor, "get_files_to_sync", return_value=all_files), + ): + returned_slices = self._stream.compute_slices() + assert returned_slices == [ + {"files": sorted(all_files, key=lambda f: (f.last_modified, f.uri))} + ] + + +class DefaultFileBasedStreamFileTransferTestNotMirroringDirectories(unittest.TestCase): + _NOW = datetime(2022, 10, 22, tzinfo=timezone.utc) + + def setUp(self) -> None: + self._stream_config = Mock() + self._stream_config.format = MockFormat() + self._stream_config.name = "a stream name" + self._catalog_schema = Mock() + self._stream_reader = Mock(spec=AbstractFileBasedStreamReader) + self._availability_strategy = Mock(spec=AbstractFileBasedAvailabilityStrategy) + self._discovery_policy = Mock(spec=AbstractDiscoveryPolicy) + self._parser = Mock(spec=FileTypeParser) + self._validation_policy = Mock(spec=AbstractSchemaValidationPolicy) + self._validation_policy.name = 
"validation policy name" + self._cursor = Mock(spec=AbstractFileBasedCursor) + + self._stream = DefaultFileBasedStream( + config=self._stream_config, + catalog_schema=self._catalog_schema, + stream_reader=self._stream_reader, + availability_strategy=self._availability_strategy, + discovery_policy=self._discovery_policy, + parsers={MockFormat: self._parser}, + validation_policy=self._validation_policy, + cursor=self._cursor, + errors_collector=FileBasedErrorsCollector(), + use_file_transfer=True, + preserve_directory_structure=False, + ) + + self._all_files = [ + RemoteFile( + uri="mirror_paths_testing/not_duplicates/data/jan/monthly-kickoff-202402.mpeg", + last_modified=datetime(2025, 1, 9, 11, 27, 20), + mime_type=None, + ), + RemoteFile( + uri="mirror_paths_testing/not_duplicates/data/feb/monthly-kickoff-202401.mpeg", + last_modified=datetime(2025, 1, 9, 11, 27, 20), + mime_type=None, + ), + RemoteFile( + uri="mirror_paths_testing/not_duplicates/data/mar/monthly-kickoff-202403.mpeg", + last_modified=datetime(2025, 1, 9, 11, 27, 20), + mime_type=None, + ), + ] + + def test_when_compute_slices_with_not_duplicates(self) -> None: + with ( + mock.patch.object(DefaultFileBasedStream, "list_files", return_value=self._all_files), + mock.patch.object( + self._stream._cursor, "get_files_to_sync", return_value=self._all_files + ), + ): + returned_slices = self._stream.compute_slices() + assert returned_slices == [ + {"files": sorted(self._all_files, key=lambda f: (f.last_modified, f.uri))} + ] + + def test_when_compute_slices_with_duplicates(self) -> None: + all_files = deepcopy(self._all_files) + all_files.append( + RemoteFile( + uri="mirror_paths_testing/not_duplicates/data/apr/monthly-kickoff-202402.mpeg", + last_modified=datetime(2025, 1, 9, 11, 27, 20), + mime_type=None, + ) + ) + all_files.append( + RemoteFile( + uri="mirror_paths_testing/not_duplicates/data/may/monthly-kickoff-202401.mpeg", + last_modified=datetime(2025, 1, 9, 11, 27, 20), + mime_type=None, + ) + ) + 
all_files.append( + RemoteFile( + uri="mirror_paths_testing/not_duplicates/data/jun/monthly-kickoff-202403.mpeg", + last_modified=datetime(2025, 1, 9, 11, 27, 20), + mime_type=None, + ) + ) + all_files.append( + RemoteFile( + uri="mirror_paths_testing/not_duplicates/data/jul/monthly-kickoff-202403.mpeg", + last_modified=datetime(2025, 1, 9, 11, 27, 20), + mime_type=None, + ) + ) + with ( + mock.patch.object(DefaultFileBasedStream, "list_files", return_value=all_files), + mock.patch.object(self._stream._cursor, "get_files_to_sync", return_value=all_files), + ): + with pytest.raises(DuplicatedFilesError) as exc_info: + self._stream.compute_slices() + assert "Duplicate filenames found for stream" in str(exc_info.value) + assert "2 duplicates found for file name monthly-kickoff-202402.mpeg" in str(exc_info.value) + assert "2 duplicates found for file name monthly-kickoff-202401.mpeg" in str(exc_info.value) + assert "3 duplicates found for file name monthly-kickoff-202403.mpeg" in str(exc_info.value) diff --git a/unit_tests/sources/file_based/test_file_based_stream_reader.py b/unit_tests/sources/file_based/test_file_based_stream_reader.py index 196e207c8..725ce67b0 100644 --- a/unit_tests/sources/file_based/test_file_based_stream_reader.py +++ b/unit_tests/sources/file_based/test_file_based_stream_reader.py @@ -3,6 +3,7 @@ # import logging +from datetime import datetime from io import IOBase from typing import Any, Dict, Iterable, List, Mapping, Optional, Set @@ -365,3 +366,75 @@ def test_globs_and_prefixes_from_globs( == expected_matches ) assert set(reader.get_prefixes_from_globs(globs)) == expected_path_prefixes + + +@pytest.mark.parametrize( + "config, source_file, expected_file_relative_path, expected_local_file_path, expected_absolute_file_path", + [ + pytest.param( + { + "streams": [], + "delivery_method": { + "delivery_type": "use_file_transfer", + "preserve_directory_structure": True, + }, + }, + 
"mirror_paths_testing/not_duplicates/data/jan/monthly-kickoff-202402.mpeg", + "mirror_paths_testing/not_duplicates/data/jan/monthly-kickoff-202402.mpeg", + "/tmp/transfer-files/mirror_paths_testing/not_duplicates/data/jan/monthly-kickoff-202402.mpeg", + "/tmp/transfer-files/mirror_paths_testing/not_duplicates/data/jan/monthly-kickoff-202402.mpeg", + id="preserve_directories_present_and_true", + ), + pytest.param( + { + "streams": [], + "delivery_method": { + "delivery_type": "use_file_transfer", + "preserve_directory_structure": False, + }, + }, + "mirror_paths_testing/not_duplicates/data/jan/monthly-kickoff-202402.mpeg", + "monthly-kickoff-202402.mpeg", + "/tmp/transfer-files/monthly-kickoff-202402.mpeg", + "/tmp/transfer-files/monthly-kickoff-202402.mpeg", + id="preserve_directories_present_and_false", + ), + pytest.param( + {"streams": [], "delivery_method": {"delivery_type": "use_file_transfer"}}, + "mirror_paths_testing/not_duplicates/data/jan/monthly-kickoff-202402.mpeg", + "mirror_paths_testing/not_duplicates/data/jan/monthly-kickoff-202402.mpeg", + "/tmp/transfer-files/mirror_paths_testing/not_duplicates/data/jan/monthly-kickoff-202402.mpeg", + "/tmp/transfer-files/mirror_paths_testing/not_duplicates/data/jan/monthly-kickoff-202402.mpeg", + id="preserve_directories_not_present_defaults_true", + ), + pytest.param( + {"streams": []}, + "mirror_paths_testing/not_duplicates/data/jan/monthly-kickoff-202402.mpeg", + "mirror_paths_testing/not_duplicates/data/jan/monthly-kickoff-202402.mpeg", + "/tmp/transfer-files/mirror_paths_testing/not_duplicates/data/jan/monthly-kickoff-202402.mpeg", + "/tmp/transfer-files/mirror_paths_testing/not_duplicates/data/jan/monthly-kickoff-202402.mpeg", + id="file_transfer_flag_not_present_defaults_true", + ), + ], +) +def test_preserve_sub_directories_scenarios( + config: Mapping[str, Any], + source_file: str, + expected_file_relative_path: str, + expected_local_file_path: str, + expected_absolute_file_path: str, +) -> None: + 
remote_file = RemoteFile( + uri=source_file, + last_modified=datetime(2025, 1, 9, 11, 27, 20), + mime_type=None, + ) + reader = TestStreamReader() + reader.config = TestSpec(**config) + file_relative_path, local_file_path, absolute_file_path = reader._get_file_transfer_paths( + remote_file, "/tmp/transfer-files/" + ) + + assert file_relative_path == expected_file_relative_path + assert local_file_path == expected_local_file_path + assert absolute_file_path == expected_absolute_file_path