Skip to content

Commit

Permalink
file-based: adding initial changes for not mirroring paths
Browse files Browse the repository at this point in the history
  • Loading branch information
aldogonzalez8 committed Jan 7, 2025
1 parent f8cb659 commit c4150ee
Show file tree
Hide file tree
Showing 6 changed files with 153 additions and 12 deletions.
18 changes: 18 additions & 0 deletions airbyte_cdk/sources/file_based/config/abstract_file_based_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@
from airbyte_cdk.sources.utils import schema_helpers


class DeliveryOptions(BaseModel):
preserve_subdirectories_directories: bool = Field(
True,
description="Flag indicating we should preserve subdirectories directories",
)


class DeliverRecords(BaseModel):
class Config(OneOfOptionConfig):
title = "Replicate Records"
Expand All @@ -30,6 +37,11 @@ class Config(OneOfOptionConfig):
discriminator = "delivery_type"

delivery_type: Literal["use_file_transfer"] = Field("use_file_transfer", const=True)
delivery_options: Optional[DeliveryOptions] = Field(
title="Delivery Options",
type="object",
order=2,
)


class AbstractFileBasedSpec(BaseModel):
Expand Down Expand Up @@ -65,6 +77,12 @@ class AbstractFileBasedSpec(BaseModel):
airbyte_hidden=True,
)

delivery_options: Optional[DeliveryOptions] = Field(
title="Delivery Options",
type="object",
order=8,
)

@classmethod
@abstractmethod
def documentation_url(cls) -> AnyUrl:
Expand Down
24 changes: 24 additions & 0 deletions airbyte_cdk/sources/file_based/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@ class ErrorListingFiles(BaseFileBasedSourceError):
pass


class DuplicatedFilesError(BaseFileBasedSourceError):
pass


class CustomFileBasedException(AirbyteTracedException):
"""
A specialized exception for file-based connectors.
Expand All @@ -123,3 +127,23 @@ class CustomFileBasedException(AirbyteTracedException):

class FileSizeLimitError(CustomFileBasedException):
pass


def format_duplicate_files_error_message(stream_name: str, duplicated_files_names: List):
duplicated_files_messages = []
for duplicated_file in duplicated_files_names:
for duplicated_file_name, file_paths in duplicated_file.items():
file_duplicated_message = (
f"{len(file_paths)} duplicates found for file name {duplicated_file_name}:\n\n"
+ "".join(f"\n - {file_paths}")
)
duplicated_files_messages.append(file_duplicated_message)

error_message = (
f"ERROR: Duplicate filenames found for stream {stream_name}. "
"Duplicate file names are not allowed if the Preserve Subdirectories in File Paths option is disabled. "
"Please remove or rename the duplicate files before attempting to re-run the sync.\n\n"
+ "\n".join(duplicated_files_messages)
)

return error_message
22 changes: 17 additions & 5 deletions airbyte_cdk/sources/file_based/file_based_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
stream=self._make_default_stream(
stream_config=stream_config,
cursor=cursor,
use_file_transfer=self._use_file_transfer(parsed_config),
parsed_config=parsed_config,
),
source=self,
logger=self.logger,
Expand Down Expand Up @@ -273,7 +273,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
stream=self._make_default_stream(
stream_config=stream_config,
cursor=cursor,
use_file_transfer=self._use_file_transfer(parsed_config),
parsed_config=parsed_config,
),
source=self,
logger=self.logger,
Expand All @@ -285,7 +285,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
stream = self._make_default_stream(
stream_config=stream_config,
cursor=cursor,
use_file_transfer=self._use_file_transfer(parsed_config),
parsed_config=parsed_config,
)

streams.append(stream)
Expand All @@ -298,7 +298,7 @@ def _make_default_stream(
self,
stream_config: FileBasedStreamConfig,
cursor: Optional[AbstractFileBasedCursor],
use_file_transfer: bool = False,
parsed_config: AbstractFileBasedSpec,
) -> AbstractFileBasedStream:
return DefaultFileBasedStream(
config=stream_config,
Expand All @@ -310,7 +310,10 @@ def _make_default_stream(
validation_policy=self._validate_and_get_validation_policy(stream_config),
errors_collector=self.errors_collector,
cursor=cursor,
use_file_transfer=use_file_transfer,
use_file_transfer=self._use_file_transfer(parsed_config),
preserve_subdirectories_directories=self._preserve_subdirectories_directories(
parsed_config
),
)

def _get_stream_from_catalog(
Expand Down Expand Up @@ -385,3 +388,12 @@ def _use_file_transfer(parsed_config: AbstractFileBasedSpec) -> bool:
and parsed_config.delivery_method.delivery_type == "use_file_transfer"
)
return use_file_transfer

@staticmethod
def _preserve_subdirectories_directories(parsed_config: AbstractFileBasedSpec):
# fall back to preserve subdirectories if config is not present or incomplete
if hasattr(parsed_config, "delivery_options") and hasattr(
parsed_config.delivery_options, "preserve_subdirectories_directories"
):
return parsed_config.delivery_options.preserve_subdirectories_directories
return True
21 changes: 17 additions & 4 deletions airbyte_cdk/sources/file_based/file_based_stream_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,16 @@ def use_file_transfer(self) -> bool:
return use_file_transfer
return False

def preserve_subdirectories_directories(self) -> bool:
# fall back to preserve subdirectories if config is not present or incomplete
if (
self.config
and hasattr(self.config, "delivery_options")
and hasattr(self.config.delivery_options, "preserve_subdirectories_directories")
):
return self.config.delivery_options.preserve_subdirectories_directories
return True

@abstractmethod
def get_file(
self, file: RemoteFile, local_directory: str, logger: logging.Logger
Expand All @@ -159,10 +169,13 @@ def get_file(
"""
...

@staticmethod
def _get_file_transfer_paths(file: RemoteFile, local_directory: str) -> List[str]:
# Remove left slashes from source path format to make relative path for writing locally
file_relative_path = file.uri.lstrip("/")
def _get_file_transfer_paths(self, file: RemoteFile, local_directory: str) -> List[str]:
preserve_subdirectories_directories = self.preserve_subdirectories_directories()
if preserve_subdirectories_directories:
# Remove left slashes from source path format to make relative path for writing locally
file_relative_path = file.uri.lstrip("/")
else:
file_relative_path = path.basename(file.uri)
local_file_path = path.join(local_directory, file_relative_path)

# Ensure the local directory exists
Expand Down
40 changes: 38 additions & 2 deletions airbyte_cdk/sources/file_based/stream/default_file_based_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,23 @@
import itertools
import traceback
from copy import deepcopy
from collections import defaultdict
from functools import cache
from typing import Any, Dict, Iterable, List, Mapping, MutableMapping, Optional, Set, Union
from os import path
from typing import Any, Dict, Iterable, List, Mapping, MutableMapping, Optional, Set, Union, Tuple

from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, FailureType, Level
from airbyte_cdk.models import Type as MessageType
from airbyte_cdk.sources.file_based.config.file_based_stream_config import PrimaryKeyType
from airbyte_cdk.sources.file_based.exceptions import (
FileBasedSourceError,
DuplicatedFilesError,
InvalidSchemaError,
MissingSchemaError,
RecordParseError,
SchemaInferenceError,
StopSyncPerValidationPolicy,
format_duplicate_files_error_message,
)
from airbyte_cdk.sources.file_based.file_types import FileTransfer
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
Expand All @@ -43,17 +47,23 @@ class DefaultFileBasedStream(AbstractFileBasedStream, IncrementalMixin):
"""

FILE_TRANSFER_KW = "use_file_transfer"
PRESERVE_SUBDIRECTORIES_KW = "preserve_subdirectories_directories"
FILES_KEY = "files"
DATE_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
ab_last_mod_col = "_ab_source_file_last_modified"
ab_file_name_col = "_ab_source_file_url"
modified = "modified"
source_file_url = "source_file_url"
airbyte_columns = [ab_last_mod_col, ab_file_name_col]
use_file_transfer = False
preserve_subdirectories_directories = True

def __init__(self, **kwargs: Any):
if self.FILE_TRANSFER_KW in kwargs:
self.use_file_transfer = kwargs.pop(self.FILE_TRANSFER_KW, False)
self.preserve_subdirectories_directories = kwargs.pop(
self.PRESERVE_SUBDIRECTORIES_KW, True
)
super().__init__(**kwargs)

@property
Expand Down Expand Up @@ -98,15 +108,41 @@ def _filter_schema_invalid_properties(
else:
return super()._filter_schema_invalid_properties(configured_catalog_json_schema)

def _duplicated_files_names(self, slices: List) -> list[dict]:
seen_file_names = set()
duplicates_file_names = set()
file_paths = defaultdict(list)
for file_slice in slices:
for file_found in file_slice[self.FILES_KEY]:
file_name = path.basename(file_found.uri)
if file_name not in seen_file_names:
seen_file_names.add(file_name)
else:
duplicates_file_names.add(file_name)
file_paths[file_name].append(file_found.uri)
return [
{duplicated_file: file_paths[duplicated_file]}
for duplicated_file in duplicates_file_names
]

def compute_slices(self) -> Iterable[Optional[Mapping[str, Any]]]:
# Sort files by last_modified, uri and return them grouped by last_modified
all_files = self.list_files()
files_to_read = self._cursor.get_files_to_sync(all_files, self.logger)
sorted_files_to_read = sorted(files_to_read, key=lambda f: (f.last_modified, f.uri))
slices = [
{"files": list(group[1])}
{self.FILES_KEY: list(group[1])}
for group in itertools.groupby(sorted_files_to_read, lambda f: f.last_modified)
]
if slices and not self.preserve_subdirectories_directories:
duplicated_files_names = self._duplicated_files_names(slices)
if duplicated_files_names:
raise DuplicatedFilesError(
format_duplicate_files_error_message(
stream_name=self.name, duplicated_files_names=duplicated_files_names
),
stream=self.name,
)
return slices

def transform_record(
Expand Down
40 changes: 39 additions & 1 deletion unit_tests/sources/file_based/scenarios/csv_scenarios.py
Original file line number Diff line number Diff line change
Expand Up @@ -517,19 +517,57 @@
"title": "Copy Raw Files",
"type": "object",
"properties": {
"delivery_options": {
"allOf": [
{
"properties": {
"preserve_subdirectories_directories": {
"default": True,
"description": "Flag indicating we should preserve subdirectories directories",
"title": "Preserve Subdirectories Directories",
"type": "boolean",
}
},
"title": "DeliveryOptions",
"type": "object",
}
],
"order": 2,
"title": "Delivery Options",
"type": "object",
},
"delivery_type": {
"title": "Delivery Type",
"default": "use_file_transfer",
"const": "use_file_transfer",
"enum": ["use_file_transfer"],
"type": "string",
}
},
},
"description": "Copy raw files without parsing their contents. Bits are copied into the destination exactly as they appeared in the source. Recommended for use with unstructured text data, non-text and compressed files.",
"required": ["delivery_type"],
},
],
},
"delivery_options": {
"allOf": [
{
"properties": {
"preserve_subdirectories_directories": {
"default": True,
"description": "Flag indicating we should preserve subdirectories directories",
"title": "Preserve Subdirectories Directories",
"type": "boolean",
}
},
"title": "DeliveryOptions",
"type": "object",
}
],
"order": 8,
"title": "Delivery Options",
"type": "object",
},
},
"required": ["streams"],
},
Expand Down

0 comments on commit c4150ee

Please sign in to comment.