feat(file-based): sync metadata records #260

Open · wants to merge 7 commits into main

Changes from 2 commits
@@ -22,6 +22,13 @@ class Config(OneOfOptionConfig):

delivery_type: Literal["use_records_transfer"] = Field("use_records_transfer", const=True)

sync_metadata: bool = Field(
title="Make stream sync files metadata",
description="If enabled, streams will sync files metadata instead of files data.",
default=False,
airbyte_hidden=True,
)


class DeliverRawFiles(BaseModel):
class Config(OneOfOptionConfig):
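For reference, a minimal sketch of a source config that turns this on — only the `delivery_method` block is defined by this PR; the stream name and format keys are placeholders:

```python
# Hedged sketch: connector config enabling metadata-only syncing.
# The "delivery_method" block mirrors the spec field added above;
# every other key/value is illustrative.
config = {
    "streams": [{"name": "my_stream", "format": {"filetype": "csv"}}],
    "delivery_method": {
        "delivery_type": "use_records_transfer",
        "sync_metadata": True,  # hidden flag added by this PR
    },
}
```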
19 changes: 19 additions & 0 deletions airbyte_cdk/sources/file_based/file_based_source.py
@@ -312,6 +312,7 @@ def _make_default_stream(
cursor=cursor,
use_file_transfer=self._use_file_transfer(parsed_config),
preserve_directory_structure=self._preserve_directory_structure(parsed_config),
sync_metadata=self._sync_metadata(parsed_config),
)

def _get_stream_from_catalog(
@@ -387,6 +388,14 @@ def _use_file_transfer(parsed_config: AbstractFileBasedSpec) -> bool:
)
return use_file_transfer

@staticmethod
def _use_records_transfer(parsed_config: AbstractFileBasedSpec) -> bool:
use_records_transfer = (
hasattr(parsed_config.delivery_method, "delivery_type")
and parsed_config.delivery_method.delivery_type == "use_records_transfer"
)
return use_records_transfer

@staticmethod
def _preserve_directory_structure(parsed_config: AbstractFileBasedSpec) -> bool:
"""
@@ -408,3 +417,13 @@ def _preserve_directory_structure(parsed_config: AbstractFileBasedSpec) -> bool:
):
return parsed_config.delivery_method.preserve_directory_structure
return True

@staticmethod
def _sync_metadata(parsed_config: AbstractFileBasedSpec) -> bool:
if (
FileBasedSource._use_records_transfer(parsed_config)
and hasattr(parsed_config.delivery_method, "sync_metadata")
and parsed_config.delivery_method.sync_metadata is not None
):
return parsed_config.delivery_method.sync_metadata
return False
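A quick sketch of how these helpers resolve (using `SimpleNamespace` to stand in for a parsed `AbstractFileBasedSpec`; the object shape mirrors the spec field above):

```python
# Hedged sketch: the new flag is only honored when records transfer is selected.
from types import SimpleNamespace

from airbyte_cdk.sources.file_based.file_based_source import FileBasedSource

records_transfer = SimpleNamespace(
    delivery_method=SimpleNamespace(delivery_type="use_records_transfer", sync_metadata=True)
)
file_transfer = SimpleNamespace(
    delivery_method=SimpleNamespace(delivery_type="use_file_transfer", sync_metadata=True)
)

assert FileBasedSource._sync_metadata(records_transfer) is True
assert FileBasedSource._sync_metadata(file_transfer) is False  # gated on _use_records_transfer
```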
43 changes: 43 additions & 0 deletions airbyte_cdk/sources/file_based/file_based_stream_reader.py
@@ -135,6 +135,15 @@ def use_file_transfer(self) -> bool:
return use_file_transfer
return False

def use_records_transfer(self) -> bool:
if self.config:
use_records_transfer = (
hasattr(self.config.delivery_method, "delivery_type")
and self.config.delivery_method.delivery_type == "use_records_transfer"
)
return use_records_transfer
return False

def preserve_directory_structure(self) -> bool:
# fall back to preserve subdirectories if config is not present or incomplete
if (
@@ -146,6 +155,16 @@ def preserve_directory_structure(self) -> bool:
return self.config.delivery_method.preserve_directory_structure
return True

def sync_metadata(self) -> bool:
if (
self.config
and self.use_records_transfer()
and hasattr(self.config.delivery_method, "sync_metadata")
and self.config.delivery_method.sync_metadata is not None
):
return self.config.delivery_method.sync_metadata
return False

@abstractmethod
def get_file(
self, file: RemoteFile, local_directory: str, logger: logging.Logger
@@ -183,3 +202,27 @@ def _get_file_transfer_paths(self, file: RemoteFile, local_directory: str) -> List[str]:
makedirs(path.dirname(local_file_path), exist_ok=True)
absolute_file_path = path.abspath(local_file_path)
return [file_relative_path, local_file_path, absolute_file_path]

@abstractmethod
def get_file_metadata(self, file: RemoteFile, logger: logging.Logger) -> Dict[str, Any]:
"""
This is required for connectors that support syncing
metadata from files.
"""
...

def get_metadata_schema(self) -> Dict[str, Any]:
""" "
Base schema to emit metadata records for a file,
override in stream reader implementation if the requirements
are different.
"""
return {
"type": "object",
"properties": {
"id": {"type": "string"},
"file_path": {"type": "string"},
"allowed_identity_remote_ids": {"type": "array", "items": "string"},
"is_public": {"type": "boolean"},
},
}
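To make the new hooks concrete, a hedged sketch of a reader implementation — the class name and the metadata values are hypothetical; only the return shape follows `get_metadata_schema()` above:

```python
# Hedged sketch of a concrete stream reader filling in the new abstract hook.
import logging
from typing import Any, Dict

from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader
from airbyte_cdk.sources.file_based.remote_file import RemoteFile


class InMemoryMetadataStreamReader(AbstractFileBasedStreamReader):
    # Other abstract members (open_file, get_matching_files, ...) omitted for brevity.

    def get_file_metadata(self, file: RemoteFile, logger: logging.Logger) -> Dict[str, Any]:
        # One record per file, shaped like the base metadata schema.
        return {
            "id": file.uri,
            "file_path": file.uri,
            "allowed_identity_remote_ids": [],
            "is_public": False,
        }
```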
14 changes: 14 additions & 0 deletions airbyte_cdk/sources/file_based/stream/default_file_based_stream.py
@@ -47,6 +47,7 @@ class DefaultFileBasedStream(AbstractFileBasedStream, IncrementalMixin):

FILE_TRANSFER_KW = "use_file_transfer"
PRESERVE_DIRECTORY_STRUCTURE_KW = "preserve_directory_structure"
SYNC_METADATA_KW = "sync_metadata"
FILES_KEY = "files"
DATE_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
ab_last_mod_col = "_ab_source_file_last_modified"
@@ -56,6 +57,7 @@ class DefaultFileBasedStream(AbstractFileBasedStream, IncrementalMixin):
airbyte_columns = [ab_last_mod_col, ab_file_name_col]
use_file_transfer = False
preserve_directory_structure = True
sync_metadata = False

def __init__(self, **kwargs: Any):
if self.FILE_TRANSFER_KW in kwargs:
@@ -64,6 +66,8 @@ def __init__(self, **kwargs: Any):
self.preserve_directory_structure = kwargs.pop(
self.PRESERVE_DIRECTORY_STRUCTURE_KW, True
)
if self.SYNC_METADATA_KW in kwargs:
self.sync_metadata = kwargs.pop(self.SYNC_METADATA_KW, False)
super().__init__(**kwargs)

@property
@@ -105,6 +109,8 @@ def _filter_schema_invalid_properties(
self.ab_file_name_col: {"type": "string"},
},
}
elif self.sync_metadata:
return self.stream_reader.get_metadata_schema()
else:
return super()._filter_schema_invalid_properties(configured_catalog_json_schema)

@@ -187,6 +193,12 @@ def read_records_from_slice(self, stream_slice: StreamSlice) -> Iterable[AirbyteMessage]:
yield stream_data_to_airbyte_message(
self.name, record, is_file_transfer_message=True
)
elif self.sync_metadata:
metadata_record = self.stream_reader.get_file_metadata(file, logger=self.logger)
yield stream_data_to_airbyte_message(
self.name, metadata_record, is_file_transfer_message=False
)

else:
for record in parser.parse_records(
self.config, file, self.stream_reader, self.logger, schema
@@ -284,6 +296,8 @@ def get_json_schema(self) -> JsonSchema:
def _get_raw_json_schema(self) -> JsonSchema:
if self.use_file_transfer:
return file_transfer_schema
elif self.sync_metadata:
return self.stream_reader.get_metadata_schema()
elif self.config.input_schema:
return self.config.get_input_schema() # type: ignore
elif self.config.schemaless:
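For context on the emission path used in `read_records_from_slice`, a small hedged sketch — the record values are illustrative; the helper and the `is_file_transfer_message` keyword are the ones already used above:

```python
# Hedged sketch: wrapping one file's metadata in an AirbyteMessage, the same way
# the sync_metadata branch above does (is_file_transfer_message=False).
from airbyte_cdk.sources.utils.record_helper import stream_data_to_airbyte_message

metadata_record = {
    "id": "reports/2024-01.csv",
    "file_path": "reports/2024-01.csv",
    "allowed_identity_remote_ids": [],
    "is_public": False,
}

message = stream_data_to_airbyte_message("my_stream", metadata_record, is_file_transfer_message=False)
print(message.record.stream, message.record.data)
```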
9 changes: 8 additions & 1 deletion unit_tests/sources/file_based/scenarios/csv_scenarios.py
@@ -508,7 +508,14 @@
"const": "use_records_transfer",
"enum": ["use_records_transfer"],
"type": "string",
}
},
"sync_metadata": {
"airbyte_hidden": True,
"default": False,
"description": "If enabled, streams will sync files metadata instead of files data.",
"title": "Make stream sync files metadata",
"type": "boolean",
},
},
"description": "Recommended - Extract and load structured records into your destination of choice. This is the classic method of moving data in Airbyte. It allows for blocking and hashing individual fields or files from a structured schema. Data can be flattened, typed and deduped depending on the destination.",
"required": ["delivery_type"],