Skip to content

Commit

Permalink
feat(file-based): changes for not mirroring paths (#205)
Browse files Browse the repository at this point in the history
The Source config receives a new option: Preserve Sub-Directories in File Paths. By default this is enabled (the current behavior).

The new option should only appear when "Copy Raw Files" sync mode is enabled.
When enabled, the sync will:
Validate uniqueness. At the start of each read operation, the source will check all files that exist and are defined in the stream. This will be performed once per stream. If any files exist with the same file name, the operation will fail.
Sync without intermediate subdirectory information. During sync, the source will send relative filenames which exclude any path info between the extract root and the filename. To the destination, each file will appear to exist at the root of the extract location.
  • Loading branch information
aldogonzalez8 authored Jan 15, 2025
1 parent 76b5306 commit c109297
Show file tree
Hide file tree
Showing 9 changed files with 375 additions and 15 deletions.
11 changes: 11 additions & 0 deletions airbyte_cdk/sources/file_based/config/abstract_file_based_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,17 @@ class Config(OneOfOptionConfig):

delivery_type: Literal["use_file_transfer"] = Field("use_file_transfer", const=True)

preserve_directory_structure: bool = Field(
title="Preserve Sub-Directories in File Paths",
description=(
"If enabled, sends subdirectory folder structure "
"along with source file names to the destination. "
"Otherwise, files will be synced by their names only. "
"This option is ignored when file-based replication is not enabled."
),
default=True,
)


class AbstractFileBasedSpec(BaseModel):
"""
Expand Down
34 changes: 34 additions & 0 deletions airbyte_cdk/sources/file_based/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,40 @@ class ErrorListingFiles(BaseFileBasedSourceError):
pass


class DuplicatedFilesError(BaseFileBasedSourceError):
    """Raised when a stream contains files sharing the same base name while the
    "Preserve Sub-Directories in File Paths" option is disabled — flattened
    paths would collide at the destination, so the sync must fail early.
    """

    def __init__(self, duplicated_files_names: List[dict[str, List[str]]], **kwargs: Any):
        # Each entry maps one duplicated file name to every full URI it was found at.
        self._duplicated_files_names = duplicated_files_names
        # Callers are required to pass the stream name via kwargs (KeyError otherwise).
        self._stream_name: str = kwargs["stream"]
        super().__init__(self._format_duplicate_files_error_message(), **kwargs)

    def _format_duplicate_files_error_message(self) -> str:
        """Build a user-facing message listing every duplicated name with its paths."""
        duplicated_files_messages = []
        for duplicated_file in self._duplicated_files_names:
            for duplicated_file_name, file_paths in duplicated_file.items():
                file_duplicated_message = (
                    f"{len(file_paths)} duplicates found for file name {duplicated_file_name}:\n\n"
                    # FIX: iterate the individual paths so each appears on its own
                    # line. The previous code passed a single f-string to "".join,
                    # which just returned that string and printed the whole list
                    # repr once instead of one path per line.
                    + "".join(f"\n - {file_path}" for file_path in file_paths)
                )
                duplicated_files_messages.append(file_duplicated_message)

        error_message = (
            f"ERROR: Duplicate filenames found for stream {self._stream_name}. "
            "Duplicate file names are not allowed if the Preserve Sub-Directories in File Paths option is disabled. "
            "Please remove or rename the duplicate files before attempting to re-run the sync.\n\n"
            + "\n".join(duplicated_files_messages)
        )

        return error_message

    def __repr__(self) -> str:
        """Return a string representation of the exception."""
        class_name = self.__class__.__name__
        properties_str = ", ".join(
            f"{k}={v!r}" for k, v in self.__dict__.items() if not k.startswith("_")
        )
        return f"{class_name}({properties_str})"


class CustomFileBasedException(AirbyteTracedException):
"""
A specialized exception for file-based connectors.
Expand Down
33 changes: 28 additions & 5 deletions airbyte_cdk/sources/file_based/file_based_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
stream=self._make_default_stream(
stream_config=stream_config,
cursor=cursor,
use_file_transfer=self._use_file_transfer(parsed_config),
parsed_config=parsed_config,
),
source=self,
logger=self.logger,
Expand Down Expand Up @@ -273,7 +273,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
stream=self._make_default_stream(
stream_config=stream_config,
cursor=cursor,
use_file_transfer=self._use_file_transfer(parsed_config),
parsed_config=parsed_config,
),
source=self,
logger=self.logger,
Expand All @@ -285,7 +285,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
stream = self._make_default_stream(
stream_config=stream_config,
cursor=cursor,
use_file_transfer=self._use_file_transfer(parsed_config),
parsed_config=parsed_config,
)

streams.append(stream)
Expand All @@ -298,7 +298,7 @@ def _make_default_stream(
self,
stream_config: FileBasedStreamConfig,
cursor: Optional[AbstractFileBasedCursor],
use_file_transfer: bool = False,
parsed_config: AbstractFileBasedSpec,
) -> AbstractFileBasedStream:
return DefaultFileBasedStream(
config=stream_config,
Expand All @@ -310,7 +310,8 @@ def _make_default_stream(
validation_policy=self._validate_and_get_validation_policy(stream_config),
errors_collector=self.errors_collector,
cursor=cursor,
use_file_transfer=use_file_transfer,
use_file_transfer=self._use_file_transfer(parsed_config),
preserve_directory_structure=self._preserve_directory_structure(parsed_config),
)

def _get_stream_from_catalog(
Expand Down Expand Up @@ -385,3 +386,25 @@ def _use_file_transfer(parsed_config: AbstractFileBasedSpec) -> bool:
and parsed_config.delivery_method.delivery_type == "use_file_transfer"
)
return use_file_transfer

@staticmethod
def _preserve_directory_structure(parsed_config: AbstractFileBasedSpec) -> bool:
    """
    Decide whether files keep their subdirectory paths at the destination.

    Returns True (the historical default) unless file transfer is enabled AND
    the delivery method carries an explicit, non-None
    ``preserve_directory_structure`` value — in which case that value wins.

    Args:
        parsed_config: The parsed configuration containing delivery method settings
    Returns:
        True if directory structure should be preserved (default), False otherwise
    """
    # Guard clause: the option is only meaningful in "Copy Raw Files" mode.
    if not FileBasedSource._use_file_transfer(parsed_config):
        return True
    configured = getattr(parsed_config.delivery_method, "preserve_directory_structure", None)
    return True if configured is None else configured
22 changes: 18 additions & 4 deletions airbyte_cdk/sources/file_based/file_based_stream_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,17 @@ def use_file_transfer(self) -> bool:
return use_file_transfer
return False

def preserve_directory_structure(self) -> bool:
    """Whether synced files should keep their source subdirectory paths.

    Falls back to True (preserve subdirectories) whenever file transfer is
    disabled, the config is absent, or the option is missing/unset.
    """
    if not self.use_file_transfer() or not self.config:
        return True
    flag = getattr(self.config.delivery_method, "preserve_directory_structure", None)
    return True if flag is None else flag

@abstractmethod
def get_file(
self, file: RemoteFile, local_directory: str, logger: logging.Logger
Expand All @@ -159,10 +170,13 @@ def get_file(
"""
...

@staticmethod
def _get_file_transfer_paths(file: RemoteFile, local_directory: str) -> List[str]:
# Remove left slashes from source path format to make relative path for writing locally
file_relative_path = file.uri.lstrip("/")
def _get_file_transfer_paths(self, file: RemoteFile, local_directory: str) -> List[str]:
preserve_directory_structure = self.preserve_directory_structure()
if preserve_directory_structure:
# Remove left slashes from source path format to make relative path for writing locally
file_relative_path = file.uri.lstrip("/")
else:
file_relative_path = path.basename(file.uri)
local_file_path = path.join(local_directory, file_relative_path)

# Ensure the local directory exists
Expand Down
27 changes: 25 additions & 2 deletions airbyte_cdk/sources/file_based/file_types/unstructured_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import logging
import os
import traceback
from datetime import datetime
from io import BytesIO, IOBase
Expand Down Expand Up @@ -42,12 +43,34 @@
unstructured_partition_docx = None
unstructured_partition_pptx = None

AIRBYTE_NLTK_DATA_DIR = "/airbyte/nltk_data"
TMP_NLTK_DATA_DIR = "/tmp/nltk_data"


def get_nltk_temp_folder() -> str:
    """Return a writable directory for NLTK data, creating it if necessary.

    Prefers ``/airbyte/nltk_data`` (for non-root connectors /tmp is not
    currently writable, though we may allow it in the future); falls back to
    ``/tmp/nltk_data`` for local development.
    """
    try:
        chosen_dir = AIRBYTE_NLTK_DATA_DIR
        os.makedirs(chosen_dir, exist_ok=True)
    except OSError:
        # Primary location not writable (e.g. running outside the connector
        # image) — retry with the local-development fallback.
        chosen_dir = TMP_NLTK_DATA_DIR
        os.makedirs(chosen_dir, exist_ok=True)
    return chosen_dir


# Configure NLTK's data search path and ensure the tokenizer/tagger resources
# required by the unstructured parser are available, downloading them into a
# writable directory when any are missing.
try:
    nltk_data_dir = get_nltk_temp_folder()
    nltk.data.path.append(nltk_data_dir)
    # nltk.data.find raises LookupError when a packaged resource is absent.
    nltk.data.find("tokenizers/punkt.zip")
    nltk.data.find("tokenizers/punkt_tab.zip")
    nltk.data.find("tokenizers/averaged_perceptron_tagger_eng.zip")
except LookupError:
    # Download into the explicitly chosen directory rather than NLTK's default
    # (which may not be writable for non-root connectors); quiet avoids
    # progress noise in connector logs.
    nltk.download("punkt", download_dir=nltk_data_dir, quiet=True)
    nltk.download("punkt_tab", download_dir=nltk_data_dir, quiet=True)
    nltk.download("averaged_perceptron_tagger_eng", download_dir=nltk_data_dir, quiet=True)


def optional_decode(contents: Union[str, bytes]) -> str:
Expand Down
32 changes: 30 additions & 2 deletions airbyte_cdk/sources/file_based/stream/default_file_based_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,17 @@
import asyncio
import itertools
import traceback
from collections import defaultdict
from copy import deepcopy
from functools import cache
from typing import Any, Dict, Iterable, List, Mapping, MutableMapping, Optional, Set, Union
from os import path
from typing import Any, Dict, Iterable, List, Mapping, MutableMapping, Optional, Set, Tuple, Union

from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, FailureType, Level
from airbyte_cdk.models import Type as MessageType
from airbyte_cdk.sources.file_based.config.file_based_stream_config import PrimaryKeyType
from airbyte_cdk.sources.file_based.exceptions import (
DuplicatedFilesError,
FileBasedSourceError,
InvalidSchemaError,
MissingSchemaError,
Expand Down Expand Up @@ -43,17 +46,24 @@ class DefaultFileBasedStream(AbstractFileBasedStream, IncrementalMixin):
"""

FILE_TRANSFER_KW = "use_file_transfer"
PRESERVE_DIRECTORY_STRUCTURE_KW = "preserve_directory_structure"
FILES_KEY = "files"
DATE_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
ab_last_mod_col = "_ab_source_file_last_modified"
ab_file_name_col = "_ab_source_file_url"
modified = "modified"
source_file_url = "source_file_url"
airbyte_columns = [ab_last_mod_col, ab_file_name_col]
use_file_transfer = False
preserve_directory_structure = True

def __init__(self, **kwargs: Any):
    """Consume the file-transfer options from kwargs before delegating.

    ``use_file_transfer`` and ``preserve_directory_structure`` are popped here
    (overriding the class-level defaults on the instance) so the parent
    initializer never sees them. Each keyword name matches the attribute it
    sets, which lets a single loop handle both options.
    """
    for option_name in (self.FILE_TRANSFER_KW, self.PRESERVE_DIRECTORY_STRUCTURE_KW):
        if option_name in kwargs:
            setattr(self, option_name, kwargs.pop(option_name))
    super().__init__(**kwargs)

@property
Expand Down Expand Up @@ -98,15 +108,33 @@ def _filter_schema_invalid_properties(
else:
return super()._filter_schema_invalid_properties(configured_catalog_json_schema)

def _duplicated_files_names(
    self, slices: List[dict[str, List[RemoteFile]]]
) -> List[dict[str, List[str]]]:
    """Find base file names that occur more than once across all slices.

    Returns one ``{file_name: [uri, ...]}`` dict per duplicated name, where
    the list holds every full URI that flattens to that name.
    """
    uris_by_name: Dict[str, List[str]] = defaultdict(list)
    for file_slice in slices:
        for remote_file in file_slice[self.FILES_KEY]:
            uris_by_name[path.basename(remote_file.uri)].append(remote_file.uri)
    return [{name: uris} for name, uris in uris_by_name.items() if len(uris) > 1]

def compute_slices(self) -> Iterable[Optional[Mapping[str, Any]]]:
    """Group the files still to be synced into slices by last-modified time.

    Files are sorted by (last_modified, uri) and grouped so each slice holds
    all files sharing one last_modified timestamp. When subdirectory
    flattening is enabled (preserve_directory_structure is False), duplicate
    base names would collide at the destination, so a DuplicatedFilesError is
    raised before any data is read.
    """
    pending = self._cursor.get_files_to_sync(self.list_files(), self.logger)
    ordered = sorted(pending, key=lambda f: (f.last_modified, f.uri))
    slices = [
        {self.FILES_KEY: list(files_in_group)}
        for _, files_in_group in itertools.groupby(ordered, lambda f: f.last_modified)
    ]
    if slices and not self.preserve_directory_structure:
        duplicated_files_names = self._duplicated_files_names(slices)
        if duplicated_files_names:
            raise DuplicatedFilesError(
                stream=self.name, duplicated_files_names=duplicated_files_names
            )
    return slices

def transform_record(
Expand Down
8 changes: 7 additions & 1 deletion unit_tests/sources/file_based/scenarios/csv_scenarios.py
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,13 @@
"const": "use_file_transfer",
"enum": ["use_file_transfer"],
"type": "string",
}
},
"preserve_directory_structure": {
"default": True,
"description": "If enabled, sends subdirectory folder structure along with source file names to the destination. Otherwise, files will be synced by their names only. This option is ignored when file-based replication is not enabled.",
"title": "Preserve Sub-Directories in File Paths",
"type": "boolean",
},
},
"description": "Copy raw files without parsing their contents. Bits are copied into the destination exactly as they appeared in the source. Recommended for use with unstructured text data, non-text and compressed files.",
"required": ["delivery_type"],
Expand Down
Loading

0 comments on commit c109297

Please sign in to comment.