From 79c5f4068536df3e07870125d386efb57c9182e6 Mon Sep 17 00:00:00 2001 From: Aldo Gonzalez Date: Thu, 23 Jan 2025 16:28:57 -0600 Subject: [PATCH 01/18] file-based: initial implementation to sync metadarecords --- .../config/abstract_file_based_spec.py | 7 ++++ .../sources/file_based/file_based_source.py | 19 +++++++++ .../file_based/file_based_stream_reader.py | 42 +++++++++++++++++++ .../stream/default_file_based_stream.py | 14 +++++++ .../file_based/scenarios/csv_scenarios.py | 9 +++- 5 files changed, 90 insertions(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/file_based/config/abstract_file_based_spec.py b/airbyte_cdk/sources/file_based/config/abstract_file_based_spec.py index 626d50fef..c08a46536 100644 --- a/airbyte_cdk/sources/file_based/config/abstract_file_based_spec.py +++ b/airbyte_cdk/sources/file_based/config/abstract_file_based_spec.py @@ -22,6 +22,13 @@ class Config(OneOfOptionConfig): delivery_type: Literal["use_records_transfer"] = Field("use_records_transfer", const=True) + sync_metadata: bool = Field( + title="Make stream sync files metadata", + description="If enabled, streams will sync files metadata instead of files data.", + default=False, + airbyte_hidden=True, + ) + class DeliverRawFiles(BaseModel): class Config(OneOfOptionConfig): diff --git a/airbyte_cdk/sources/file_based/file_based_source.py b/airbyte_cdk/sources/file_based/file_based_source.py index 0eb90ac24..8baae8ee8 100644 --- a/airbyte_cdk/sources/file_based/file_based_source.py +++ b/airbyte_cdk/sources/file_based/file_based_source.py @@ -312,6 +312,7 @@ def _make_default_stream( cursor=cursor, use_file_transfer=self._use_file_transfer(parsed_config), preserve_directory_structure=self._preserve_directory_structure(parsed_config), + sync_metadata=self._sync_metadata(parsed_config), ) def _get_stream_from_catalog( @@ -387,6 +388,14 @@ def _use_file_transfer(parsed_config: AbstractFileBasedSpec) -> bool: ) return use_file_transfer + @staticmethod + def _use_records_transfer(parsed_config: AbstractFileBasedSpec) -> bool: + use_records_transfer = ( + hasattr(parsed_config.delivery_method, "delivery_type") + and parsed_config.delivery_method.delivery_type == "use_records_transfer" + ) + return use_records_transfer + @staticmethod def _preserve_directory_structure(parsed_config: AbstractFileBasedSpec) -> bool: """ @@ -408,3 +417,13 @@ def _preserve_directory_structure(parsed_config: AbstractFileBasedSpec) -> bool: ): return parsed_config.delivery_method.preserve_directory_structure return True + + @staticmethod + def _sync_metadata(parsed_config: AbstractFileBasedSpec) -> bool: + if ( + FileBasedSource._use_records_transfer(parsed_config) + and hasattr(parsed_config.delivery_method, "sync_metadata") + and parsed_config.delivery_method.sync_metadata is not None + ): + return parsed_config.delivery_method.sync_metadata + return False diff --git a/airbyte_cdk/sources/file_based/file_based_stream_reader.py b/airbyte_cdk/sources/file_based/file_based_stream_reader.py index 065125621..7fd5fff5c 100644 --- a/airbyte_cdk/sources/file_based/file_based_stream_reader.py +++ b/airbyte_cdk/sources/file_based/file_based_stream_reader.py @@ -135,6 +135,15 @@ def use_file_transfer(self) -> bool: return use_file_transfer return False + def use_records_transfer(self) -> bool: + if self.config: + use_records_transfer = ( + hasattr(self.config.delivery_method, "delivery_type") + and self.config.delivery_method.delivery_type == "use_records_transfer" + ) + return use_records_transfer + return False + def preserve_directory_structure(self) -> bool: # fall back to preserve subdirectories if config is not present or incomplete if ( @@ -146,6 +155,16 @@ def preserve_directory_structure(self) -> bool: return self.config.delivery_method.preserve_directory_structure return True + def sync_metadata(self) -> bool: + if ( + self.config + and self.use_records_transfer() + and hasattr(self.config.delivery_method, "sync_metadata") + and self.config.delivery_method.sync_metadata is not None + ): + return self.config.delivery_method.sync_metadata + return False + @abstractmethod def get_file( self, file: RemoteFile, local_directory: str, logger: logging.Logger @@ -183,3 +202,26 @@ def _get_file_transfer_paths(self, file: RemoteFile, local_directory: str) -> Li makedirs(path.dirname(local_file_path), exist_ok=True) absolute_file_path = path.abspath(local_file_path) return [file_relative_path, local_file_path, absolute_file_path] + + def get_file_metadata(self, file: RemoteFile, logger: logging.Logger) -> Dict[str, Any]: + """ + This is required for connectors that will support syncing + metadata from files. + """ + ... + + def get_metadata_schema(self) -> Dict[str, Any]: + """ " + Base schema to emit metadata records for a file, + override in stream reader implementation if the requirements + are different. + """ + return { + "type": "object", + "properties": { + "id": {"type": "string"}, + "file_path": {"type": "string"}, + "allowed_identity_remote_ids": {"type": "array", "items": "string"}, + "is_public": {"type": "boolean"}, + }, + } diff --git a/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py b/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py index 604322549..062362dc3 100644 --- a/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py +++ b/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py @@ -47,6 +47,7 @@ class DefaultFileBasedStream(AbstractFileBasedStream, IncrementalMixin): FILE_TRANSFER_KW = "use_file_transfer" PRESERVE_DIRECTORY_STRUCTURE_KW = "preserve_directory_structure" + SYNC_METADATA_KW = "sync_metadata" FILES_KEY = "files" DATE_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ" ab_last_mod_col = "_ab_source_file_last_modified" @@ -56,6 +57,7 @@ class DefaultFileBasedStream(AbstractFileBasedStream, IncrementalMixin): airbyte_columns = [ab_last_mod_col, ab_file_name_col] use_file_transfer = False preserve_directory_structure = True + sync_metadata = False def __init__(self, **kwargs: Any): if self.FILE_TRANSFER_KW in kwargs: @@ -64,6 +66,8 @@ def __init__(self, **kwargs: Any): self.preserve_directory_structure = kwargs.pop( self.PRESERVE_DIRECTORY_STRUCTURE_KW, True ) + if self.SYNC_METADATA_KW in kwargs: + self.sync_metadata = kwargs.pop(self.SYNC_METADATA_KW, False) super().__init__(**kwargs) @property @@ -105,6 +109,8 @@ def _filter_schema_invalid_properties( self.ab_file_name_col: {"type": "string"}, }, } + elif self.sync_metadata: + return self.stream_reader.get_metadata_schema() else: return super()._filter_schema_invalid_properties(configured_catalog_json_schema) @@ -187,6 +193,12 @@ def read_records_from_slice(self, stream_slice: StreamSlice) -> Iterable[Airbyte yield stream_data_to_airbyte_message( self.name, record, is_file_transfer_message=True ) + elif self.sync_metadata: + metadata_record = self.stream_reader.get_file_metadata(file, logger=self.logger) + yield stream_data_to_airbyte_message( + self.name, metadata_record, is_file_transfer_message=False + ) + else: for record in parser.parse_records( self.config, file, self.stream_reader, self.logger, schema @@ -284,6 +296,8 @@ def get_json_schema(self) -> JsonSchema: def _get_raw_json_schema(self) -> JsonSchema: if self.use_file_transfer: return file_transfer_schema + elif self.sync_metadata: + self.stream_reader.get_metadata_schema() elif self.config.input_schema: return self.config.get_input_schema() # type: ignore elif self.config.schemaless: diff --git a/unit_tests/sources/file_based/scenarios/csv_scenarios.py b/unit_tests/sources/file_based/scenarios/csv_scenarios.py index 9e919c911..c609b4096 100644 --- a/unit_tests/sources/file_based/scenarios/csv_scenarios.py +++ b/unit_tests/sources/file_based/scenarios/csv_scenarios.py @@ -508,7 +508,14 @@ "const": "use_records_transfer", "enum": ["use_records_transfer"], "type": "string", - } + }, + "sync_metadata": { + "airbyte_hidden": True, + "default": False, + "description": "If enabled, streams will sync files metadata instead of files data.", + "title": "Make stream sync files metadata", + "type": "boolean", + }, }, "description": "Recommended - Extract and load structured records into your destination of choice. This is the classic method of moving data in Airbyte. It allows for blocking and hashing individual fields or files from a structured schema. Data can be flattened, typed and deduped depending on the destination.", "required": ["delivery_type"], From 4638f891c3f5935a4a105f40728d71200177b13f Mon Sep 17 00:00:00 2001 From: Aldo Gonzalez Date: Thu, 23 Jan 2025 16:37:46 -0600 Subject: [PATCH 02/18] file-based: fix lint --- airbyte_cdk/sources/file_based/file_based_stream_reader.py | 1 + 1 file changed, 1 insertion(+) diff --git a/airbyte_cdk/sources/file_based/file_based_stream_reader.py b/airbyte_cdk/sources/file_based/file_based_stream_reader.py index 7fd5fff5c..db22f27de 100644 --- a/airbyte_cdk/sources/file_based/file_based_stream_reader.py +++ b/airbyte_cdk/sources/file_based/file_based_stream_reader.py @@ -203,6 +203,7 @@ def _get_file_transfer_paths(self, file: RemoteFile, local_directory: str) -> Li absolute_file_path = path.abspath(local_file_path) return [file_relative_path, local_file_path, absolute_file_path] + @abstractmethod def get_file_metadata(self, file: RemoteFile, logger: logging.Logger) -> Dict[str, Any]: """ This is required for connectors that will support syncing From 266c0cdbe53220e23a6a41f6c023b354f935aa84 Mon Sep 17 00:00:00 2001 From: Aldo Gonzalez Date: Thu, 23 Jan 2025 16:45:27 -0600 Subject: [PATCH 03/18] file-based: fix errors --- .../stream/default_file_based_stream.py | 23 ++++++++++++++----- 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py b/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py index 062362dc3..a0dd67e10 100644 --- a/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py +++ b/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py @@ -194,11 +194,21 @@ def read_records_from_slice(self, stream_slice: StreamSlice) -> Iterable[Airbyte self.name, record, is_file_transfer_message=True ) elif self.sync_metadata: - metadata_record = self.stream_reader.get_file_metadata(file, logger=self.logger) - yield stream_data_to_airbyte_message( - self.name, metadata_record, is_file_transfer_message=False - ) - + try: + metadata_record = self.stream_reader.get_file_metadata(file, logger=self.logger) + yield stream_data_to_airbyte_message( + self.name, metadata_record, is_file_transfer_message=False + ) + except Exception as e: + self.logger.error(f"Failed to retrieve metadata for file {file.uri}: {str(e)}") + yield AirbyteMessage( + type=MessageType.LOG, + log=AirbyteLogMessage( + level=Level.ERROR, + message = f"Error retrieving metadata: stream={self.name} file={file.uri}", + stack_trace = traceback.format_exc(), + ) + ) else: for record in parser.parse_records( self.config, file, self.stream_reader, self.logger, schema @@ -297,7 +307,7 @@ def _get_raw_json_schema(self) -> JsonSchema: if self.use_file_transfer: return file_transfer_schema elif self.sync_metadata: - self.stream_reader.get_metadata_schema() + return self.stream_reader.get_metadata_schema() elif self.config.input_schema: return self.config.get_input_schema() # type: ignore elif self.config.schemaless: @@ -428,3 +438,4 @@ async def _infer_file_schema(self, file: RemoteFile) -> SchemaType: format=str(self.config.format), stream=self.name, ) from exc + From 7bfb8c3a200aa30f738b7421dbf85cc6c163f7c1 Mon Sep 17 00:00:00 2001 From: octavia-squidington-iii Date: Thu, 23 Jan 2025 22:54:45 +0000 Subject: [PATCH 04/18] Auto-fix lint and format issues --- .../stream/default_file_based_stream.py | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py b/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py index a0dd67e10..6e8664c64 100644 --- a/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py +++ b/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py @@ -195,19 +195,23 @@ def read_records_from_slice(self, stream_slice: StreamSlice) -> Iterable[Airbyte ) elif self.sync_metadata: try: - metadata_record = self.stream_reader.get_file_metadata(file, logger=self.logger) + metadata_record = self.stream_reader.get_file_metadata( + file, logger=self.logger + ) yield stream_data_to_airbyte_message( self.name, metadata_record, is_file_transfer_message=False ) except Exception as e: - self.logger.error(f"Failed to retrieve metadata for file {file.uri}: {str(e)}") + self.logger.error( + f"Failed to retrieve metadata for file {file.uri}: {str(e)}" + ) yield AirbyteMessage( type=MessageType.LOG, log=AirbyteLogMessage( - level=Level.ERROR, - message = f"Error retrieving metadata: stream={self.name} file={file.uri}", - stack_trace = traceback.format_exc(), - ) + level=Level.ERROR, + message=f"Error retrieving metadata: stream={self.name} file={file.uri}", + stack_trace=traceback.format_exc(), + ), ) else: for record in parser.parse_records( @@ -438,4 +442,3 @@ async def _infer_file_schema(self, file: RemoteFile) -> SchemaType: format=str(self.config.format), stream=self.name, ) from exc - From 88af54364ae7c2b6ae0e1e4d801099a2432ea529 Mon Sep 17 00:00:00 2001 From: Aldo Gonzalez Date: Thu, 23 Jan 2025 16:57:44 -0600 Subject: [PATCH 05/18] file-based: remove abstract decorator --- airbyte_cdk/sources/file_based/file_based_stream_reader.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/airbyte_cdk/sources/file_based/file_based_stream_reader.py b/airbyte_cdk/sources/file_based/file_based_stream_reader.py index db22f27de..a9003a07c 100644 --- a/airbyte_cdk/sources/file_based/file_based_stream_reader.py +++ b/airbyte_cdk/sources/file_based/file_based_stream_reader.py @@ -203,13 +203,12 @@ def _get_file_transfer_paths(self, file: RemoteFile, local_directory: str) -> Li absolute_file_path = path.abspath(local_file_path) return [file_relative_path, local_file_path, absolute_file_path] - @abstractmethod def get_file_metadata(self, file: RemoteFile, logger: logging.Logger) -> Dict[str, Any]: """ This is required for connectors that will support syncing metadata from files. """ - ... + return {} def get_metadata_schema(self) -> Dict[str, Any]: """ " From edd6f69bd6871f8f9696804a8d277368a02f0d8c Mon Sep 17 00:00:00 2001 From: Aldo Gonzalez Date: Fri, 24 Jan 2025 12:00:48 -0600 Subject: [PATCH 06/18] file-based: fix check --- airbyte_cdk/sources/file_based/file_based_source.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/file_based/file_based_source.py b/airbyte_cdk/sources/file_based/file_based_source.py index 8baae8ee8..381797e90 100644 --- a/airbyte_cdk/sources/file_based/file_based_source.py +++ b/airbyte_cdk/sources/file_based/file_based_source.py @@ -163,7 +163,7 @@ def check_connection( parsed_config = self._get_parsed_config(config) availability_method = ( stream.availability_strategy.check_availability - if self._use_file_transfer(parsed_config) + if self._use_file_transfer(parsed_config) or self._sync_metadata(parsed_config) else stream.availability_strategy.check_availability_and_parsability ) ( From 35e0e684077aba7951e3c63dbfc417d2709efdac Mon Sep 17 00:00:00 2001 From: Aldo Gonzalez Date: Sun, 26 Jan 2025 16:48:39 -0600 Subject: [PATCH 07/18] file-based: add identities stream and rename acl toggle --- .../config/abstract_file_based_spec.py | 14 ++- .../config/identities_based_stream_config.py | 7 ++ .../sources/file_based/config/permissions.py | 33 +++++++ .../sources/file_based/file_based_source.py | 51 ++++++++-- .../file_based/file_based_stream_reader.py | 33 +++---- .../sources/file_based/schema_helpers.py | 25 +++++ .../sources/file_based/stream/__init__.py | 3 +- .../stream/default_file_based_stream.py | 21 ++-- .../file_based/stream/identities_stream.py | 99 +++++++++++++++++++ .../file_based/scenarios/csv_scenarios.py | 33 ++++++- 10 files changed, 274 insertions(+), 45 deletions(-) create mode 100644 airbyte_cdk/sources/file_based/config/identities_based_stream_config.py create mode 100644 airbyte_cdk/sources/file_based/config/permissions.py create mode 100644 airbyte_cdk/sources/file_based/stream/identities_stream.py diff --git a/airbyte_cdk/sources/file_based/config/abstract_file_based_spec.py b/airbyte_cdk/sources/file_based/config/abstract_file_based_spec.py index c08a46536..5bda45421 100644 --- a/airbyte_cdk/sources/file_based/config/abstract_file_based_spec.py +++ b/airbyte_cdk/sources/file_based/config/abstract_file_based_spec.py @@ -11,6 +11,9 @@ from airbyte_cdk import OneOfOptionConfig from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig +from airbyte_cdk.sources.file_based.config.identities_based_stream_config import ( + IdentitiesStreamConfig, +) from airbyte_cdk.sources.utils import schema_helpers @@ -22,12 +25,17 @@ class Config(OneOfOptionConfig): delivery_type: Literal["use_records_transfer"] = Field("use_records_transfer", const=True) - sync_metadata: bool = Field( - title="Make stream sync files metadata", - description="If enabled, streams will sync files metadata instead of files data.", + sync_acl_permissions: bool = Field( + title="Include ACL Permissions", + description="Joins Document allowlists to each stream.", default=False, airbyte_hidden=True, ) + identities: Optional[IdentitiesStreamConfig] = Field( + title="Identities configuration", + description="Configuration for identities", + airbyte_hidden=True, + ) class DeliverRawFiles(BaseModel): diff --git a/airbyte_cdk/sources/file_based/config/identities_based_stream_config.py b/airbyte_cdk/sources/file_based/config/identities_based_stream_config.py new file mode 100644 index 000000000..6df27f492 --- /dev/null +++ b/airbyte_cdk/sources/file_based/config/identities_based_stream_config.py @@ -0,0 +1,7 @@ +from pydantic.v1 import BaseModel, Field +from typing import Literal + + +class IdentitiesStreamConfig(BaseModel): + name: Literal["identities"] = Field("identities", const=True, airbyte_hidden=True) + domain: str = Field(title="Domain", description="The domain of the identities.") diff --git a/airbyte_cdk/sources/file_based/config/permissions.py b/airbyte_cdk/sources/file_based/config/permissions.py new file mode 100644 index 000000000..534ac56e4 --- /dev/null +++ b/airbyte_cdk/sources/file_based/config/permissions.py @@ -0,0 +1,33 @@ +# +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +# + +import uuid +from datetime import datetime +from enum import Enum +from pydantic.v1 import BaseModel + + +class RemoteFileIdentityType(Enum): + USER = "user" + GROUP = "group" + + +class RemoteFileIdentity(BaseModel): + id: uuid.UUID + remote_id: str + parent_id: str | None = None + name: str | None = None + description: str | None = None + email_address: str | None = None + member_email_addresses: list[str] | None = None + type: RemoteFileIdentityType + modified_at: datetime + + +class RemoteFilePermissions(BaseModel): + id: str + file_path: str + allowed_identity_remote_ids: list[str] | None = None + denied_identity_remote_ids: list[str] | None = None + publicly_accessible: bool = False diff --git a/airbyte_cdk/sources/file_based/file_based_source.py b/airbyte_cdk/sources/file_based/file_based_source.py index 381797e90..3a889811a 100644 --- a/airbyte_cdk/sources/file_based/file_based_source.py +++ b/airbyte_cdk/sources/file_based/file_based_source.py @@ -33,6 +33,9 @@ FileBasedStreamConfig, ValidationPolicy, ) +from airbyte_cdk.sources.file_based.config.identities_based_stream_config import ( + IdentitiesStreamConfig, +) from airbyte_cdk.sources.file_based.discovery_policy import ( AbstractDiscoveryPolicy, DefaultDiscoveryPolicy, @@ -49,7 +52,11 @@ DEFAULT_SCHEMA_VALIDATION_POLICIES, AbstractSchemaValidationPolicy, ) -from airbyte_cdk.sources.file_based.stream import AbstractFileBasedStream, DefaultFileBasedStream +from airbyte_cdk.sources.file_based.stream import ( + AbstractFileBasedStream, + DefaultFileBasedStream, + IdentitiesStream, +) from airbyte_cdk.sources.file_based.stream.concurrent.adapters import FileBasedStreamFacade from airbyte_cdk.sources.file_based.stream.concurrent.cursor import ( AbstractConcurrentFileBasedCursor, @@ -157,13 +164,17 @@ def check_connection( errors = [] tracebacks = [] for stream in streams: + if isinstance(stream, IdentitiesStream): + # Probably need to check identities endpoint/api access but will skip for now. + continue if not isinstance(stream, AbstractFileBasedStream): raise ValueError(f"Stream {stream} is not a file-based stream.") try: parsed_config = self._get_parsed_config(config) availability_method = ( stream.availability_strategy.check_availability - if self._use_file_transfer(parsed_config) or self._sync_metadata(parsed_config) + if self._use_file_transfer(parsed_config) + or self._sync_acl_permissions(parsed_config) else stream.availability_strategy.check_availability_and_parsability ) ( @@ -289,6 +300,12 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: ) streams.append(stream) + + if self._add_identities_stream(parsed_config): + identities_stream = self._make_identities_stream( + stream_config=parsed_config.delivery_method.identities + ) + streams.append(identities_stream) return streams except ValidationError as exc: @@ -312,7 +329,19 @@ def _make_default_stream( cursor=cursor, use_file_transfer=self._use_file_transfer(parsed_config), preserve_directory_structure=self._preserve_directory_structure(parsed_config), - sync_metadata=self._sync_metadata(parsed_config), + sync_acl_permissions=self._sync_acl_permissions(parsed_config), + ) + + def _make_identities_stream( + self, + stream_config: IdentitiesStreamConfig, + ) -> Stream: + return IdentitiesStream( + config=stream_config, + catalog_schema=self.stream_schemas.get(stream_config.name), + stream_reader=self.stream_reader, + discovery_policy=self.discovery_policy, + errors_collector=self.errors_collector, ) def _get_stream_from_catalog( @@ -419,11 +448,19 @@ def _preserve_directory_structure(parsed_config: AbstractFileBasedSpec) -> bool: return True @staticmethod - def _sync_metadata(parsed_config: AbstractFileBasedSpec) -> bool: + def _sync_acl_permissions(parsed_config: AbstractFileBasedSpec) -> bool: if ( FileBasedSource._use_records_transfer(parsed_config) - and hasattr(parsed_config.delivery_method, "sync_metadata") - and parsed_config.delivery_method.sync_metadata is not None + and hasattr(parsed_config.delivery_method, "sync_acl_permissions") + and parsed_config.delivery_method.sync_acl_permissions is not None ): - return parsed_config.delivery_method.sync_metadata + return parsed_config.delivery_method.sync_acl_permissions return False + + @staticmethod + def _add_identities_stream(parsed_config: AbstractFileBasedSpec) -> bool: + return ( + FileBasedSource._sync_acl_permissions(parsed_config) + and parsed_config.delivery_method.identities is not None + and parsed_config.delivery_method.identities.domain + ) diff --git a/airbyte_cdk/sources/file_based/file_based_stream_reader.py b/airbyte_cdk/sources/file_based/file_based_stream_reader.py index a9003a07c..3f804f24a 100644 --- a/airbyte_cdk/sources/file_based/file_based_stream_reader.py +++ b/airbyte_cdk/sources/file_based/file_based_stream_reader.py @@ -155,14 +155,14 @@ def preserve_directory_structure(self) -> bool: return self.config.delivery_method.preserve_directory_structure return True - def sync_metadata(self) -> bool: + def sync_acl_permissions(self) -> bool: if ( self.config and self.use_records_transfer() - and hasattr(self.config.delivery_method, "sync_metadata") - and self.config.delivery_method.sync_metadata is not None + and hasattr(self.config.delivery_method, "sync_acl_permissions") + and self.config.delivery_method.sync_acl_permissions is not None ): - return self.config.delivery_method.sync_metadata + return self.config.delivery_method.sync_acl_permissions return False @abstractmethod @@ -203,25 +203,16 @@ def _get_file_transfer_paths(self, file: RemoteFile, local_directory: str) -> Li absolute_file_path = path.abspath(local_file_path) return [file_relative_path, local_file_path, absolute_file_path] - def get_file_metadata(self, file: RemoteFile, logger: logging.Logger) -> Dict[str, Any]: + def get_file_acl_permissions(self, file: RemoteFile, logger: logging.Logger) -> Dict[str, Any]: """ This is required for connectors that will support syncing - metadata from files. + ACL Permissions from files. """ return {} - def get_metadata_schema(self) -> Dict[str, Any]: - """ " - Base schema to emit metadata records for a file, - override in stream reader implementation if the requirements - are different. - """ - return { - "type": "object", - "properties": { - "id": {"type": "string"}, - "file_path": {"type": "string"}, - "allowed_identity_remote_ids": {"type": "array", "items": "string"}, - "is_public": {"type": "boolean"}, - }, - } + def load_identity_groups(self) -> Iterable[Dict[str, Any]]: + """ + This is required for connectors that will support syncing + identities. + """ + yield {} diff --git a/airbyte_cdk/sources/file_based/schema_helpers.py b/airbyte_cdk/sources/file_based/schema_helpers.py index 1b653db67..fb12efe5e 100644 --- a/airbyte_cdk/sources/file_based/schema_helpers.py +++ b/airbyte_cdk/sources/file_based/schema_helpers.py @@ -23,6 +23,31 @@ "properties": {"data": {"type": "object"}, "file": {"type": "object"}}, } +remote_file_permissions_schema = { + "type": "object", + "properties": { + "id": {"type": "string"}, + "file_path": {"type": "string"}, + "allowed_identity_remote_ids": {"type": "array", "items": "string"}, + "publicly_accessible": {"type": "boolean"}, + }, +} + +remote_file_identity_schema = { + "type": "object", + "properties": { + "id": {"type": "string"}, + "remote_id": {"type": "string"}, + "parent_id": {"type": ["null", "string"]}, + "name": {"type": ["null", "string"]}, + "description": {"type": ["null", "string"]}, + "email_address": {"type": ["null", "string"]}, + "member_email_addresses": {"type": ["null", "array"]}, + "type": {"type": "string"}, + "modified_at": {"type": "string"}, + }, +} + @total_ordering class ComparableType(Enum): diff --git a/airbyte_cdk/sources/file_based/stream/__init__.py b/airbyte_cdk/sources/file_based/stream/__init__.py index 4b5c4bc2e..78c2b1062 100644 --- a/airbyte_cdk/sources/file_based/stream/__init__.py +++ b/airbyte_cdk/sources/file_based/stream/__init__.py @@ -1,4 +1,5 @@ from airbyte_cdk.sources.file_based.stream.abstract_file_based_stream import AbstractFileBasedStream from airbyte_cdk.sources.file_based.stream.default_file_based_stream import DefaultFileBasedStream +from airbyte_cdk.sources.file_based.stream.identities_stream import IdentitiesStream -__all__ = ["AbstractFileBasedStream", "DefaultFileBasedStream"] +__all__ = ["AbstractFileBasedStream", "DefaultFileBasedStream", "IdentitiesStream"] diff --git a/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py b/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py index 6e8664c64..743b51e10 100644 --- a/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py +++ b/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py @@ -30,6 +30,7 @@ file_transfer_schema, merge_schemas, schemaless_schema, + remote_file_permissions_schema, ) from airbyte_cdk.sources.file_based.stream import AbstractFileBasedStream from airbyte_cdk.sources.file_based.stream.cursor import AbstractFileBasedCursor @@ -47,7 +48,7 @@ class DefaultFileBasedStream(AbstractFileBasedStream, IncrementalMixin): FILE_TRANSFER_KW = "use_file_transfer" PRESERVE_DIRECTORY_STRUCTURE_KW = "preserve_directory_structure" - SYNC_METADATA_KW = "sync_metadata" + SYNC_ACL_PERMISSIONS_KW = "sync_acl_permissions" FILES_KEY = "files" DATE_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ" ab_last_mod_col = "_ab_source_file_last_modified" @@ -57,7 +58,7 @@ class DefaultFileBasedStream(AbstractFileBasedStream, IncrementalMixin): airbyte_columns = [ab_last_mod_col, ab_file_name_col] use_file_transfer = False preserve_directory_structure = True - sync_metadata = False + sync_acl_permissions = False def __init__(self, **kwargs: Any): if self.FILE_TRANSFER_KW in kwargs: @@ -66,8 +67,8 @@ def __init__(self, **kwargs: Any): self.preserve_directory_structure = kwargs.pop( self.PRESERVE_DIRECTORY_STRUCTURE_KW, True ) - if self.SYNC_METADATA_KW in kwargs: - self.sync_metadata = kwargs.pop(self.SYNC_METADATA_KW, False) + if self.SYNC_ACL_PERMISSIONS_KW in kwargs: + self.sync_acl_permissions = kwargs.pop(self.SYNC_ACL_PERMISSIONS_KW, False) super().__init__(**kwargs) @property @@ -109,8 +110,8 @@ def _filter_schema_invalid_properties( self.ab_file_name_col: {"type": "string"}, }, } - elif self.sync_metadata: - return self.stream_reader.get_metadata_schema() + elif self.sync_acl_permissions: + return remote_file_permissions_schema else: return super()._filter_schema_invalid_properties(configured_catalog_json_schema) @@ -193,9 +194,9 @@ def read_records_from_slice(self, stream_slice: StreamSlice) -> Iterable[Airbyte yield stream_data_to_airbyte_message( self.name, record, is_file_transfer_message=True ) - elif self.sync_metadata: + elif self.sync_acl_permissions: try: - metadata_record = self.stream_reader.get_file_metadata( + metadata_record = self.stream_reader.get_file_acl_permissions( file, logger=self.logger ) yield stream_data_to_airbyte_message( @@ -310,8 +311,8 @@ def get_json_schema(self) -> JsonSchema: def _get_raw_json_schema(self) -> JsonSchema: if self.use_file_transfer: return file_transfer_schema - elif self.sync_metadata: - return self.stream_reader.get_metadata_schema() + elif self.sync_acl_permissions: + return remote_file_permissions_schema elif self.config.input_schema: return self.config.get_input_schema() # type: ignore elif self.config.schemaless: diff --git a/airbyte_cdk/sources/file_based/stream/identities_stream.py b/airbyte_cdk/sources/file_based/stream/identities_stream.py new file mode 100644 index 000000000..b70f20519 --- /dev/null +++ b/airbyte_cdk/sources/file_based/stream/identities_stream.py @@ -0,0 +1,99 @@ +# +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +# + +import traceback +from functools import cache +from typing import Any, Iterable, List, Mapping, MutableMapping, Optional + +from airbyte_protocol_dataclasses.models import SyncMode + +from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, Level +from airbyte_cdk.models import Type as MessageType +from airbyte_cdk.sources.file_based.config.file_based_stream_config import PrimaryKeyType +from airbyte_cdk.sources.file_based.config.identities_based_stream_config import ( + IdentitiesStreamConfig, +) +from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader +from airbyte_cdk.sources.file_based.types import StreamSlice +from airbyte_cdk.sources.streams import Stream +from airbyte_cdk.sources.streams.core import JsonSchema +from airbyte_cdk.sources.utils.record_helper import stream_data_to_airbyte_message +from airbyte_cdk.utils.traced_exception import AirbyteTracedException +from airbyte_cdk.sources.file_based.discovery_policy import AbstractDiscoveryPolicy +from airbyte_cdk.sources.file_based.exceptions import FileBasedErrorsCollector, FileBasedSourceError +from airbyte_cdk.sources.file_based.schema_helpers import remote_file_identity_schema +from airbyte_cdk.sources.streams.checkpoint import Cursor + + +class IdentitiesStream(Stream): + """ + The identities stream. A full refresh stream to sync identities from a certain domain. + The stream reader manage the logic to get such data, which is implemented on connector side. + """ + + is_resumable = False + + def __init__( + self, + config: IdentitiesStreamConfig, + catalog_schema: Optional[Mapping[str, Any]], + stream_reader: AbstractFileBasedStreamReader, + discovery_policy: AbstractDiscoveryPolicy, + errors_collector: FileBasedErrorsCollector, + ): + super().__init__() + self.config = config + self.catalog_schema = catalog_schema + self.stream_reader = stream_reader + self._discovery_policy = discovery_policy + self.errors_collector = errors_collector + self._cursor = {} + + @property + def state(self) -> MutableMapping[str, Any]: + return self._cursor + + @state.setter + def state(self, value: MutableMapping[str, Any]) -> None: + """State setter, accept state serialized by state getter.""" + self._cursor = value + + @property + def primary_key(self) -> PrimaryKeyType: + return None + + def read_records( + self, + sync_mode: SyncMode, + cursor_field: Optional[List[str]] = None, + stream_slice: Optional[StreamSlice] = None, + stream_state: Optional[Mapping[str, Any]] = None, + ) -> Iterable[Mapping[str, Any] | AirbyteMessage]: + try: + identity_groups = self.stream_reader.load_identity_groups() + for record in identity_groups: + yield stream_data_to_airbyte_message(self.name, record) + except AirbyteTracedException as exc: + # Re-raise the exception to stop the whole sync immediately as this is a fatal error + raise exc + except Exception: + yield AirbyteMessage( + type=MessageType.LOG, + log=AirbyteLogMessage( + level=Level.ERROR, + message=f"{FileBasedSourceError.ERROR_PARSING_RECORD.value} stream={self.name}", + stack_trace=traceback.format_exc(), + ), + ) + + @cache + def get_json_schema(self) -> JsonSchema: + return remote_file_identity_schema + + @property + def name(self) -> str: + return self.config.name + + def get_cursor(self) -> Optional[Cursor]: + return None diff --git a/unit_tests/sources/file_based/scenarios/csv_scenarios.py b/unit_tests/sources/file_based/scenarios/csv_scenarios.py index c609b4096..e618a6296 100644 --- a/unit_tests/sources/file_based/scenarios/csv_scenarios.py +++ b/unit_tests/sources/file_based/scenarios/csv_scenarios.py @@ -509,13 +509,40 @@ "enum": ["use_records_transfer"], "type": "string", }, - "sync_metadata": { + "sync_acl_permissions": { "airbyte_hidden": True, "default": False, - "description": "If enabled, streams will sync files metadata instead of files data.", - "title": "Make stream sync files metadata", + "description": "Joins Document allowlists to each stream.", + "title": "Include ACL Permissions", "type": "boolean", }, + "identities": { + "airbyte_hidden": True, + "allOf": [ + { + "properties": { + "domain": { + "description": "The domain of the identities.", + "title": "Domain", + "type": "string", + }, + "name": { + "airbyte_hidden": True, + "const": "identities", + "default": "identities", + "enum": ["identities"], + "title": "Name", + "type": "string", + }, + }, + "required": ["domain"], + "title": "IdentitiesStreamConfig", + "type": "object", + } + ], + "title": "Identities configuration", + "description": "Configuration for identities", + }, }, "description": "Recommended - Extract and load structured records into your destination of choice. This is the classic method of moving data in Airbyte. It allows for blocking and hashing individual fields or files from a structured schema. Data can be flattened, typed and deduped depending on the destination.", "required": ["delivery_type"], From 0ae4267bdc054231ae7aeee19b37138f08a8c62a Mon Sep 17 00:00:00 2001 From: octavia-squidington-iii Date: Sun, 26 Jan 2025 22:51:37 +0000 Subject: [PATCH 08/18] Auto-fix lint and format issues --- .../file_based/config/identities_based_stream_config.py | 3 ++- airbyte_cdk/sources/file_based/config/permissions.py | 1 + .../file_based/stream/default_file_based_stream.py | 2 +- .../sources/file_based/stream/identities_stream.py | 8 ++++---- 4 files changed, 8 insertions(+), 6 deletions(-) diff --git a/airbyte_cdk/sources/file_based/config/identities_based_stream_config.py b/airbyte_cdk/sources/file_based/config/identities_based_stream_config.py index 6df27f492..bf5955fa6 100644 --- a/airbyte_cdk/sources/file_based/config/identities_based_stream_config.py +++ b/airbyte_cdk/sources/file_based/config/identities_based_stream_config.py @@ -1,6 +1,7 @@ -from pydantic.v1 import BaseModel, Field from typing import Literal +from pydantic.v1 import BaseModel, Field + class IdentitiesStreamConfig(BaseModel): name: Literal["identities"] = Field("identities", const=True, airbyte_hidden=True) diff --git a/airbyte_cdk/sources/file_based/config/permissions.py b/airbyte_cdk/sources/file_based/config/permissions.py index 534ac56e4..d0aef044a 100644 --- a/airbyte_cdk/sources/file_based/config/permissions.py +++ b/airbyte_cdk/sources/file_based/config/permissions.py @@ -5,6 +5,7 @@ import uuid from datetime import datetime from enum import Enum + from pydantic.v1 import BaseModel diff --git a/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py b/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py index 743b51e10..f2205dc07 100644 --- a/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py +++ b/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py @@ -29,8 +29,8 @@ SchemaType, file_transfer_schema, merge_schemas, - schemaless_schema, remote_file_permissions_schema, + schemaless_schema, ) from airbyte_cdk.sources.file_based.stream import AbstractFileBasedStream from airbyte_cdk.sources.file_based.stream.cursor import AbstractFileBasedCursor diff --git a/airbyte_cdk/sources/file_based/stream/identities_stream.py b/airbyte_cdk/sources/file_based/stream/identities_stream.py index b70f20519..0a2cf22d1 100644 --- a/airbyte_cdk/sources/file_based/stream/identities_stream.py +++ b/airbyte_cdk/sources/file_based/stream/identities_stream.py @@ -14,16 +14,16 @@ from airbyte_cdk.sources.file_based.config.identities_based_stream_config import ( IdentitiesStreamConfig, ) +from airbyte_cdk.sources.file_based.discovery_policy import AbstractDiscoveryPolicy +from airbyte_cdk.sources.file_based.exceptions import FileBasedErrorsCollector, FileBasedSourceError from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader +from airbyte_cdk.sources.file_based.schema_helpers import remote_file_identity_schema from airbyte_cdk.sources.file_based.types import StreamSlice from airbyte_cdk.sources.streams import Stream +from airbyte_cdk.sources.streams.checkpoint import Cursor from airbyte_cdk.sources.streams.core import JsonSchema from airbyte_cdk.sources.utils.record_helper import stream_data_to_airbyte_message from airbyte_cdk.utils.traced_exception import AirbyteTracedException -from airbyte_cdk.sources.file_based.discovery_policy import AbstractDiscoveryPolicy -from airbyte_cdk.sources.file_based.exceptions import FileBasedErrorsCollector, FileBasedSourceError -from airbyte_cdk.sources.file_based.schema_helpers import remote_file_identity_schema -from airbyte_cdk.sources.streams.checkpoint import Cursor class IdentitiesStream(Stream): From 43e3ea38deb6749506dff571149269731ef0e503 Mon Sep 17 00:00:00 2001 From: Aldo Gonzalez Date: Sun, 26 Jan 2025 17:15:21 -0600 Subject: [PATCH 09/18] file-based: fix annoying mypy issues --- .../sources/file_based/file_based_source.py | 18 +++++++++++++----- .../file_based/stream/identities_stream.py | 2 +- 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/airbyte_cdk/sources/file_based/file_based_source.py b/airbyte_cdk/sources/file_based/file_based_source.py index 3a889811a..68907b942 100644 --- a/airbyte_cdk/sources/file_based/file_based_source.py +++ b/airbyte_cdk/sources/file_based/file_based_source.py @@ -301,9 +301,10 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: streams.append(stream) - if self._add_identities_stream(parsed_config): + identities_stream_config = self._get_identities_stream_config(parsed_config) + if identities_stream_config: identities_stream = self._make_identities_stream( - stream_config=parsed_config.delivery_method.identities + stream_config=identities_stream_config ) streams.append(identities_stream) return streams @@ -458,9 +459,16 @@ def _sync_acl_permissions(parsed_config: AbstractFileBasedSpec) -> bool: return False @staticmethod - def _add_identities_stream(parsed_config: AbstractFileBasedSpec) -> bool: - return ( + def _get_identities_stream_config( + parsed_config: AbstractFileBasedSpec, + ) -> Optional[IdentitiesStreamConfig]: + identities_stream_config = None + if ( FileBasedSource._sync_acl_permissions(parsed_config) + and hasattr(parsed_config.delivery_method, "identities") and parsed_config.delivery_method.identities is not None + and isinstance(parsed_config.delivery_method.identities, IdentitiesStreamConfig) and parsed_config.delivery_method.identities.domain - ) + ): + identities_stream_config = parsed_config.delivery_method.identities + return identities_stream_config diff --git a/airbyte_cdk/sources/file_based/stream/identities_stream.py b/airbyte_cdk/sources/file_based/stream/identities_stream.py index 0a2cf22d1..0b7ac8c5c 100644 --- a/airbyte_cdk/sources/file_based/stream/identities_stream.py +++ b/airbyte_cdk/sources/file_based/stream/identities_stream.py @@ -48,7 +48,7 @@ def __init__( self.stream_reader = stream_reader self._discovery_policy = discovery_policy self.errors_collector = errors_collector - self._cursor = {} + self._cursor: MutableMapping[str, Any] = {} @property def state(self) -> MutableMapping[str, Any]: From 4aee2c9ab772b52b6e975717a14b08318ef141bb Mon Sep 17 00:00:00 2001 From: Aldo Gonzalez Date: Sun, 26 Jan 2025 17:34:45 -0600 Subject: [PATCH 10/18] file-based: minor fix to schema --- airbyte_cdk/sources/file_based/schema_helpers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/file_based/schema_helpers.py b/airbyte_cdk/sources/file_based/schema_helpers.py index fb12efe5e..459e6de73 100644 --- a/airbyte_cdk/sources/file_based/schema_helpers.py +++ b/airbyte_cdk/sources/file_based/schema_helpers.py @@ -28,7 +28,7 @@ "properties": { "id": {"type": "string"}, "file_path": {"type": "string"}, - "allowed_identity_remote_ids": {"type": "array", "items": "string"}, + "allowed_identity_remote_ids": {"type": "array", "items": {"type": "string"}}, "publicly_accessible": {"type": "boolean"}, }, } From 7b5c245ce35a5777cafa4dc3c2a94035971cb0a3 Mon Sep 17 00:00:00 2001 From: Aldo Gonzalez Date: Sun, 26 Jan 2025 19:18:38 -0600 Subject: [PATCH 11/18] file-based: add logger to load_identity_groups method --- airbyte_cdk/sources/file_based/file_based_stream_reader.py | 2 +- airbyte_cdk/sources/file_based/stream/identities_stream.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte_cdk/sources/file_based/file_based_stream_reader.py b/airbyte_cdk/sources/file_based/file_based_stream_reader.py index 3f804f24a..69b110e67 100644 --- a/airbyte_cdk/sources/file_based/file_based_stream_reader.py +++ b/airbyte_cdk/sources/file_based/file_based_stream_reader.py @@ -210,7 +210,7 @@ def get_file_acl_permissions(self, file: RemoteFile, logger: logging.Logger) -> """ return {} - def load_identity_groups(self) -> Iterable[Dict[str, Any]]: + def load_identity_groups(self, logger: logging.Logger) -> Iterable[Dict[str, Any]]: """ This is required for connectors that will support syncing identities. diff --git a/airbyte_cdk/sources/file_based/stream/identities_stream.py b/airbyte_cdk/sources/file_based/stream/identities_stream.py index 0b7ac8c5c..d00973d44 100644 --- a/airbyte_cdk/sources/file_based/stream/identities_stream.py +++ b/airbyte_cdk/sources/file_based/stream/identities_stream.py @@ -71,7 +71,7 @@ def read_records( stream_state: Optional[Mapping[str, Any]] = None, ) -> Iterable[Mapping[str, Any] | AirbyteMessage]: try: - identity_groups = self.stream_reader.load_identity_groups() + identity_groups = self.stream_reader.load_identity_groups(logger=self.logger) for record in identity_groups: yield stream_data_to_airbyte_message(self.name, record) except AirbyteTracedException as exc: From f022b4c085b81764e58ad63fbb579b0abfdc9aa5 Mon Sep 17 00:00:00 2001 From: Aldo Gonzalez Date: Sun, 26 Jan 2025 20:42:55 -0600 Subject: [PATCH 12/18] file-based: simplify sync permissions schema --- .../config/abstract_file_based_spec.py | 6 ++-- .../sources/file_based/file_based_source.py | 31 +++---------------- .../file_based/stream/identities_stream.py | 6 ++-- 3 files changed, 10 insertions(+), 33 deletions(-) diff --git a/airbyte_cdk/sources/file_based/config/abstract_file_based_spec.py b/airbyte_cdk/sources/file_based/config/abstract_file_based_spec.py index 5bda45421..f4892f20b 100644 --- a/airbyte_cdk/sources/file_based/config/abstract_file_based_spec.py +++ b/airbyte_cdk/sources/file_based/config/abstract_file_based_spec.py @@ -31,10 +31,8 @@ class Config(OneOfOptionConfig): default=False, airbyte_hidden=True, ) - identities: Optional[IdentitiesStreamConfig] = Field( - title="Identities configuration", - description="Configuration for identities", - airbyte_hidden=True, + domain: Optional[str] = Field( + title="Domain", description="The domain of the identities.", airbyte_hidden=True ) diff --git a/airbyte_cdk/sources/file_based/file_based_source.py b/airbyte_cdk/sources/file_based/file_based_source.py index 68907b942..3d3a8b4d6 100644 --- a/airbyte_cdk/sources/file_based/file_based_source.py +++ b/airbyte_cdk/sources/file_based/file_based_source.py @@ -33,9 +33,6 @@ FileBasedStreamConfig, ValidationPolicy, ) -from airbyte_cdk.sources.file_based.config.identities_based_stream_config import ( - IdentitiesStreamConfig, -) from airbyte_cdk.sources.file_based.discovery_policy import ( AbstractDiscoveryPolicy, DefaultDiscoveryPolicy, @@ -64,6 +61,7 @@ FileBasedFinalStateCursor, ) from airbyte_cdk.sources.file_based.stream.cursor import AbstractFileBasedCursor +from airbyte_cdk.sources.file_based.stream.identities_stream import IDENTITIES_STREAM_NAME from airbyte_cdk.sources.message.repository import InMemoryMessageRepository, MessageRepository from airbyte_cdk.sources.streams import Stream from airbyte_cdk.sources.streams.concurrent.cursor import CursorField @@ -73,6 +71,7 @@ DEFAULT_CONCURRENCY = 100 MAX_CONCURRENCY = 100 INITIAL_N_PARTITIONS = MAX_CONCURRENCY // 2 +IDENTITIES_STREAM = "identities" class FileBasedSource(ConcurrentSourceAdapter, ABC): @@ -301,11 +300,8 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: streams.append(stream) - identities_stream_config = self._get_identities_stream_config(parsed_config) - if identities_stream_config: - identities_stream = self._make_identities_stream( - stream_config=identities_stream_config - ) + if self._sync_acl_permissions(parsed_config): + identities_stream = self._make_identities_stream() streams.append(identities_stream) return streams @@ -335,11 +331,9 @@ def _make_default_stream( def _make_identities_stream( self, - stream_config: IdentitiesStreamConfig, ) -> Stream: return IdentitiesStream( - config=stream_config, - catalog_schema=self.stream_schemas.get(stream_config.name), + catalog_schema=self.stream_schemas.get(IDENTITIES_STREAM_NAME), stream_reader=self.stream_reader, discovery_policy=self.discovery_policy, errors_collector=self.errors_collector, @@ -457,18 +451,3 @@ def _sync_acl_permissions(parsed_config: AbstractFileBasedSpec) -> bool: ): return parsed_config.delivery_method.sync_acl_permissions return False - - @staticmethod - def _get_identities_stream_config( - parsed_config: AbstractFileBasedSpec, - ) -> Optional[IdentitiesStreamConfig]: - identities_stream_config = None - if ( - FileBasedSource._sync_acl_permissions(parsed_config) - and hasattr(parsed_config.delivery_method, "identities") - and parsed_config.delivery_method.identities is not None - and isinstance(parsed_config.delivery_method.identities, IdentitiesStreamConfig) - and parsed_config.delivery_method.identities.domain - ): - identities_stream_config = parsed_config.delivery_method.identities - return identities_stream_config diff --git a/airbyte_cdk/sources/file_based/stream/identities_stream.py b/airbyte_cdk/sources/file_based/stream/identities_stream.py index d00973d44..0d23a6dbf 100644 --- a/airbyte_cdk/sources/file_based/stream/identities_stream.py +++ b/airbyte_cdk/sources/file_based/stream/identities_stream.py @@ -25,6 +25,8 @@ from airbyte_cdk.sources.utils.record_helper import stream_data_to_airbyte_message from airbyte_cdk.utils.traced_exception import AirbyteTracedException +IDENTITIES_STREAM_NAME = "identities" + class IdentitiesStream(Stream): """ @@ -36,14 +38,12 @@ class IdentitiesStream(Stream): def __init__( self, - config: IdentitiesStreamConfig, catalog_schema: Optional[Mapping[str, Any]], stream_reader: AbstractFileBasedStreamReader, discovery_policy: AbstractDiscoveryPolicy, errors_collector: FileBasedErrorsCollector, ): super().__init__() - self.config = config self.catalog_schema = catalog_schema self.stream_reader = stream_reader self._discovery_policy = discovery_policy @@ -93,7 +93,7 @@ def get_json_schema(self) -> JsonSchema: @property def name(self) -> str: - return self.config.name + return IDENTITIES_STREAM_NAME def get_cursor(self) -> Optional[Cursor]: return None From 597e458b7a1b7c67d1eddf0f149bc794bd9cd536 Mon Sep 17 00:00:00 2001 From: Aldo Gonzalez Date: Sun, 26 Jan 2025 20:53:38 -0600 Subject: [PATCH 13/18] file-based: remove unused config and fix unit tests --- .../config/abstract_file_based_spec.py | 3 -- .../config/identities_based_stream_config.py | 8 ----- .../file_based/stream/identities_stream.py | 3 -- .../file_based/scenarios/csv_scenarios.py | 29 +++---------------- 4 files changed, 4 insertions(+), 39 deletions(-) delete mode 100644 airbyte_cdk/sources/file_based/config/identities_based_stream_config.py diff --git a/airbyte_cdk/sources/file_based/config/abstract_file_based_spec.py b/airbyte_cdk/sources/file_based/config/abstract_file_based_spec.py index f4892f20b..b28a41b46 100644 --- a/airbyte_cdk/sources/file_based/config/abstract_file_based_spec.py +++ b/airbyte_cdk/sources/file_based/config/abstract_file_based_spec.py @@ -11,9 +11,6 @@ from airbyte_cdk import OneOfOptionConfig from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig -from airbyte_cdk.sources.file_based.config.identities_based_stream_config import ( - IdentitiesStreamConfig, -) from airbyte_cdk.sources.utils import schema_helpers diff --git a/airbyte_cdk/sources/file_based/config/identities_based_stream_config.py b/airbyte_cdk/sources/file_based/config/identities_based_stream_config.py deleted file mode 100644 index bf5955fa6..000000000 --- a/airbyte_cdk/sources/file_based/config/identities_based_stream_config.py +++ /dev/null @@ -1,8 +0,0 @@ -from typing import Literal - -from pydantic.v1 import BaseModel, Field - - -class IdentitiesStreamConfig(BaseModel): - name: Literal["identities"] = Field("identities", const=True, airbyte_hidden=True) - domain: str = Field(title="Domain", description="The domain of the identities.") diff --git a/airbyte_cdk/sources/file_based/stream/identities_stream.py b/airbyte_cdk/sources/file_based/stream/identities_stream.py index 0d23a6dbf..cbf461e91 100644 --- a/airbyte_cdk/sources/file_based/stream/identities_stream.py +++ b/airbyte_cdk/sources/file_based/stream/identities_stream.py @@ -11,9 +11,6 @@ from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, Level from airbyte_cdk.models import Type as MessageType from airbyte_cdk.sources.file_based.config.file_based_stream_config import PrimaryKeyType -from airbyte_cdk.sources.file_based.config.identities_based_stream_config import ( - IdentitiesStreamConfig, -) from airbyte_cdk.sources.file_based.discovery_policy import AbstractDiscoveryPolicy from airbyte_cdk.sources.file_based.exceptions import FileBasedErrorsCollector, FileBasedSourceError from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader diff --git a/unit_tests/sources/file_based/scenarios/csv_scenarios.py b/unit_tests/sources/file_based/scenarios/csv_scenarios.py index e618a6296..c1f2898f9 100644 --- a/unit_tests/sources/file_based/scenarios/csv_scenarios.py +++ b/unit_tests/sources/file_based/scenarios/csv_scenarios.py @@ -516,32 +516,11 @@ "title": "Include ACL Permissions", "type": "boolean", }, - "identities": { + "domain": { "airbyte_hidden": True, - "allOf": [ - { - "properties": { - "domain": { - "description": "The domain of the identities.", - "title": "Domain", - "type": "string", - }, - "name": { - "airbyte_hidden": True, - "const": "identities", - "default": "identities", - "enum": ["identities"], - "title": "Name", - "type": "string", - }, - }, - "required": ["domain"], - "title": "IdentitiesStreamConfig", - "type": "object", - } - ], - "title": "Identities configuration", - "description": "Configuration for identities", + "description": "The domain of the identities.", + "title": "Domain", + "type": "string", }, }, "description": "Recommended - Extract and load structured records into your destination of choice. This is the classic method of moving data in Airbyte. It allows for blocking and hashing individual fields or files from a structured schema. Data can be flattened, typed and deduped depending on the destination.", From 2430eaea3c12b2787f4010535a9325df0bb0d064 Mon Sep 17 00:00:00 2001 From: Aldo Gonzalez Date: Wed, 29 Jan 2025 10:25:56 -0600 Subject: [PATCH 14/18] file-based: format record to have file last modified data --- .../file_based/stream/default_file_based_stream.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py b/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py index f2205dc07..ddbc97abb 100644 --- a/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py +++ b/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py @@ -196,21 +196,24 @@ def read_records_from_slice(self, stream_slice: StreamSlice) -> Iterable[Airbyte ) elif self.sync_acl_permissions: try: - metadata_record = self.stream_reader.get_file_acl_permissions( + permissions_record = self.stream_reader.get_file_acl_permissions( file, logger=self.logger ) + permissions_record = self.transform_record( + permissions_record, file, file_datetime_string + ) yield stream_data_to_airbyte_message( - self.name, metadata_record, is_file_transfer_message=False + self.name, permissions_record, is_file_transfer_message=False ) except Exception as e: self.logger.error( - f"Failed to retrieve metadata for file {file.uri}: {str(e)}" + f"Failed to retrieve permissions for file {file.uri}: {str(e)}" ) yield AirbyteMessage( type=MessageType.LOG, log=AirbyteLogMessage( level=Level.ERROR, - message=f"Error retrieving metadata: stream={self.name} file={file.uri}", + message=f"Error retrieving files permissions: stream={self.name} file={file.uri}", stack_trace=traceback.format_exc(), ), ) From 24a93badbcc7fe2179f7f572587971feeef4aa71 Mon Sep 17 00:00:00 2001 From: Aldo Gonzalez Date: Mon, 3 Feb 2025 15:34:06 -0600 Subject: [PATCH 15/18] file-based: create three toggle instead of option below transfer records --- .../config/abstract_file_based_spec.py | 29 +++++---- .../sources/file_based/config/permissions.py | 34 ---------- .../config/validate_config_transfer_modes.py | 51 +++++++++++++++ .../sources/file_based/file_based_source.py | 65 ++++--------------- .../file_based/file_based_stream_reader.py | 40 ++++-------- .../stream/default_file_based_stream.py | 14 ++-- .../file_based/scenarios/csv_scenarios.py | 37 +++++++---- 7 files changed, 121 insertions(+), 149 deletions(-) delete mode 100644 airbyte_cdk/sources/file_based/config/permissions.py create mode 100644 airbyte_cdk/sources/file_based/config/validate_config_transfer_modes.py diff --git a/airbyte_cdk/sources/file_based/config/abstract_file_based_spec.py b/airbyte_cdk/sources/file_based/config/abstract_file_based_spec.py index b28a41b46..19d0a075b 100644 --- a/airbyte_cdk/sources/file_based/config/abstract_file_based_spec.py +++ b/airbyte_cdk/sources/file_based/config/abstract_file_based_spec.py @@ -14,6 +14,23 @@ from airbyte_cdk.sources.utils import schema_helpers +class DeliverPermissions(BaseModel): + class Config(OneOfOptionConfig): + title = "Replicate Permissions ACL" + description = "Sends one identity stream and one for more permissions (ACL) streams to the destination. This data can be used in downstream systems to recreate permission restrictions mirroring the original source." + discriminator = "delivery_type" + + delivery_type: Literal["use_permissions_transfer"] = Field( + "use_permissions_transfer", const=True + ) + + include_identities_stream: bool = Field( + title="Include Identity Stream", + description="This data can be used in downstream systems to recreate permission restrictions mirroring the original source", + default=True, + ) + + class DeliverRecords(BaseModel): class Config(OneOfOptionConfig): title = "Replicate Records" @@ -22,16 +39,6 @@ class Config(OneOfOptionConfig): delivery_type: Literal["use_records_transfer"] = Field("use_records_transfer", const=True) - sync_acl_permissions: bool = Field( - title="Include ACL Permissions", - description="Joins Document allowlists to each stream.", - default=False, - airbyte_hidden=True, - ) - domain: Optional[str] = Field( - title="Domain", description="The domain of the identities.", airbyte_hidden=True - ) - class DeliverRawFiles(BaseModel): class Config(OneOfOptionConfig): @@ -75,7 +82,7 @@ class AbstractFileBasedSpec(BaseModel): order=10, ) - delivery_method: Union[DeliverRecords, DeliverRawFiles] = Field( + delivery_method: Union[DeliverRecords, DeliverRawFiles, DeliverPermissions] = Field( title="Delivery Method", discriminator="delivery_type", type="object", diff --git a/airbyte_cdk/sources/file_based/config/permissions.py b/airbyte_cdk/sources/file_based/config/permissions.py deleted file mode 100644 index d0aef044a..000000000 --- a/airbyte_cdk/sources/file_based/config/permissions.py +++ /dev/null @@ -1,34 +0,0 @@ -# -# Copyright (c) 2024 Airbyte, Inc., all rights reserved. -# - -import uuid -from datetime import datetime -from enum import Enum - -from pydantic.v1 import BaseModel - - -class RemoteFileIdentityType(Enum): - USER = "user" - GROUP = "group" - - -class RemoteFileIdentity(BaseModel): - id: uuid.UUID - remote_id: str - parent_id: str | None = None - name: str | None = None - description: str | None = None - email_address: str | None = None - member_email_addresses: list[str] | None = None - type: RemoteFileIdentityType - modified_at: datetime - - -class RemoteFilePermissions(BaseModel): - id: str - file_path: str - allowed_identity_remote_ids: list[str] | None = None - denied_identity_remote_ids: list[str] | None = None - publicly_accessible: bool = False diff --git a/airbyte_cdk/sources/file_based/config/validate_config_transfer_modes.py b/airbyte_cdk/sources/file_based/config/validate_config_transfer_modes.py new file mode 100644 index 000000000..f14c36899 --- /dev/null +++ b/airbyte_cdk/sources/file_based/config/validate_config_transfer_modes.py @@ -0,0 +1,51 @@ +# +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. +# + +from airbyte_cdk.sources.file_based.config.abstract_file_based_spec import AbstractFileBasedSpec + + +def use_file_transfer(parsed_config: AbstractFileBasedSpec) -> bool: + return ( + hasattr(parsed_config.delivery_method, "delivery_type") + and parsed_config.delivery_method.delivery_type == "use_file_transfer" + ) + + +def preserve_directory_structure(parsed_config: AbstractFileBasedSpec) -> bool: + """ + Determines whether to preserve directory structure during file transfer. + + When enabled, files maintain their subdirectory paths in the destination. + When disabled, files are flattened to the root of the destination. + + Args: + parsed_config: The parsed configuration containing delivery method settings + + Returns: + True if directory structure should be preserved (default), False otherwise + """ + if ( + use_file_transfer(parsed_config) + and hasattr(parsed_config.delivery_method, "preserve_directory_structure") + and parsed_config.delivery_method.preserve_directory_structure is not None + ): + return parsed_config.delivery_method.preserve_directory_structure + return True + + +def use_permissions_transfer(parsed_config: AbstractFileBasedSpec) -> bool: + return ( + hasattr(parsed_config.delivery_method, "delivery_type") + and parsed_config.delivery_method.delivery_type == "use_permissions_transfer" + ) + + +def include_identities_stream(parsed_config: AbstractFileBasedSpec) -> bool: + if ( + use_permissions_transfer(parsed_config) + and hasattr(parsed_config.delivery_method, "include_identities_stream") + and parsed_config.delivery_method.include_identities_stream is not None + ): + return parsed_config.delivery_method.include_identities_stream + return False diff --git a/airbyte_cdk/sources/file_based/file_based_source.py b/airbyte_cdk/sources/file_based/file_based_source.py index 3d3a8b4d6..e5cc7445f 100644 --- a/airbyte_cdk/sources/file_based/file_based_source.py +++ b/airbyte_cdk/sources/file_based/file_based_source.py @@ -33,6 +33,12 @@ FileBasedStreamConfig, ValidationPolicy, ) +from airbyte_cdk.sources.file_based.config.validate_config_transfer_modes import ( + use_file_transfer, + preserve_directory_structure, + use_permissions_transfer, + include_identities_stream, +) from airbyte_cdk.sources.file_based.discovery_policy import ( AbstractDiscoveryPolicy, DefaultDiscoveryPolicy, @@ -172,8 +178,7 @@ def check_connection( parsed_config = self._get_parsed_config(config) availability_method = ( stream.availability_strategy.check_availability - if self._use_file_transfer(parsed_config) - or self._sync_acl_permissions(parsed_config) + if use_file_transfer(parsed_config) or use_permissions_transfer(parsed_config) else stream.availability_strategy.check_availability_and_parsability ) ( @@ -300,7 +305,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: streams.append(stream) - if self._sync_acl_permissions(parsed_config): + if include_identities_stream(parsed_config): identities_stream = self._make_identities_stream() streams.append(identities_stream) return streams @@ -324,9 +329,9 @@ def _make_default_stream( validation_policy=self._validate_and_get_validation_policy(stream_config), errors_collector=self.errors_collector, cursor=cursor, - use_file_transfer=self._use_file_transfer(parsed_config), - preserve_directory_structure=self._preserve_directory_structure(parsed_config), - sync_acl_permissions=self._sync_acl_permissions(parsed_config), + use_file_transfer=use_file_transfer(parsed_config), + preserve_directory_structure=preserve_directory_structure(parsed_config), + use_permissions_transfer=use_permissions_transfer(parsed_config), ) def _make_identities_stream( @@ -403,51 +408,3 @@ def _validate_input_schema(self, stream_config: FileBasedStreamConfig) -> None: "`input_schema` and `schemaless` options cannot both be set", model=FileBasedStreamConfig, ) - - @staticmethod - def _use_file_transfer(parsed_config: AbstractFileBasedSpec) -> bool: - use_file_transfer = ( - hasattr(parsed_config.delivery_method, "delivery_type") - and parsed_config.delivery_method.delivery_type == "use_file_transfer" - ) - return use_file_transfer - - @staticmethod - def _use_records_transfer(parsed_config: AbstractFileBasedSpec) -> bool: - use_records_transfer = ( - hasattr(parsed_config.delivery_method, "delivery_type") - and parsed_config.delivery_method.delivery_type == "use_records_transfer" - ) - return use_records_transfer - - @staticmethod - def _preserve_directory_structure(parsed_config: AbstractFileBasedSpec) -> bool: - """ - Determines whether to preserve directory structure during file transfer. - - When enabled, files maintain their subdirectory paths in the destination. - When disabled, files are flattened to the root of the destination. - - Args: - parsed_config: The parsed configuration containing delivery method settings - - Returns: - True if directory structure should be preserved (default), False otherwise - """ - if ( - FileBasedSource._use_file_transfer(parsed_config) - and hasattr(parsed_config.delivery_method, "preserve_directory_structure") - and parsed_config.delivery_method.preserve_directory_structure is not None - ): - return parsed_config.delivery_method.preserve_directory_structure - return True - - @staticmethod - def _sync_acl_permissions(parsed_config: AbstractFileBasedSpec) -> bool: - if ( - FileBasedSource._use_records_transfer(parsed_config) - and hasattr(parsed_config.delivery_method, "sync_acl_permissions") - and parsed_config.delivery_method.sync_acl_permissions is not None - ): - return parsed_config.delivery_method.sync_acl_permissions - return False diff --git a/airbyte_cdk/sources/file_based/file_based_stream_reader.py b/airbyte_cdk/sources/file_based/file_based_stream_reader.py index 69b110e67..12faac751 100644 --- a/airbyte_cdk/sources/file_based/file_based_stream_reader.py +++ b/airbyte_cdk/sources/file_based/file_based_stream_reader.py @@ -13,6 +13,11 @@ from wcmatch.glob import GLOBSTAR, globmatch from airbyte_cdk.sources.file_based.config.abstract_file_based_spec import AbstractFileBasedSpec +from airbyte_cdk.sources.file_based.config.validate_config_transfer_modes import ( + use_file_transfer, + preserve_directory_structure, + include_identities_stream, +) from airbyte_cdk.sources.file_based.remote_file import RemoteFile @@ -128,41 +133,18 @@ def get_prefixes_from_globs(globs: List[str]) -> Set[str]: def use_file_transfer(self) -> bool: if self.config: - use_file_transfer = ( - hasattr(self.config.delivery_method, "delivery_type") - and self.config.delivery_method.delivery_type == "use_file_transfer" - ) - return use_file_transfer - return False - - def use_records_transfer(self) -> bool: - if self.config: - use_records_transfer = ( - hasattr(self.config.delivery_method, "delivery_type") - and self.config.delivery_method.delivery_type == "use_records_transfer" - ) - return use_records_transfer + return use_file_transfer(self.config) return False def preserve_directory_structure(self) -> bool: # fall back to preserve subdirectories if config is not present or incomplete - if ( - self.use_file_transfer() - and self.config - and hasattr(self.config.delivery_method, "preserve_directory_structure") - and self.config.delivery_method.preserve_directory_structure is not None - ): - return self.config.delivery_method.preserve_directory_structure + if self.config: + return preserve_directory_structure(self.config) return True - def sync_acl_permissions(self) -> bool: - if ( - self.config - and self.use_records_transfer() - and hasattr(self.config.delivery_method, "sync_acl_permissions") - and self.config.delivery_method.sync_acl_permissions is not None - ): - return self.config.delivery_method.sync_acl_permissions + def include_identities_stream(self) -> bool: + if self.config: + return include_identities_stream(self.config) return False @abstractmethod diff --git a/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py b/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py index ddbc97abb..29bb24e19 100644 --- a/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py +++ b/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py @@ -48,7 +48,7 @@ class DefaultFileBasedStream(AbstractFileBasedStream, IncrementalMixin): FILE_TRANSFER_KW = "use_file_transfer" PRESERVE_DIRECTORY_STRUCTURE_KW = "preserve_directory_structure" - SYNC_ACL_PERMISSIONS_KW = "sync_acl_permissions" + PERMISSIONS_TRANSFER_KW = "use_permissions_transfer" FILES_KEY = "files" DATE_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ" ab_last_mod_col = "_ab_source_file_last_modified" @@ -58,7 +58,7 @@ class DefaultFileBasedStream(AbstractFileBasedStream, IncrementalMixin): airbyte_columns = [ab_last_mod_col, ab_file_name_col] use_file_transfer = False preserve_directory_structure = True - sync_acl_permissions = False + use_permissions_transfer = False def __init__(self, **kwargs: Any): if self.FILE_TRANSFER_KW in kwargs: @@ -67,8 +67,8 @@ def __init__(self, **kwargs: Any): self.preserve_directory_structure = kwargs.pop( self.PRESERVE_DIRECTORY_STRUCTURE_KW, True ) - if self.SYNC_ACL_PERMISSIONS_KW in kwargs: - self.sync_acl_permissions = kwargs.pop(self.SYNC_ACL_PERMISSIONS_KW, False) + if self.PERMISSIONS_TRANSFER_KW in kwargs: + self.use_permissions_transfer = kwargs.pop(self.PERMISSIONS_TRANSFER_KW, False) super().__init__(**kwargs) @property @@ -110,7 +110,7 @@ def _filter_schema_invalid_properties( self.ab_file_name_col: {"type": "string"}, }, } - elif self.sync_acl_permissions: + elif self.use_permissions_transfer: return remote_file_permissions_schema else: return super()._filter_schema_invalid_properties(configured_catalog_json_schema) @@ -194,7 +194,7 @@ def read_records_from_slice(self, stream_slice: StreamSlice) -> Iterable[Airbyte yield stream_data_to_airbyte_message( self.name, record, is_file_transfer_message=True ) - elif self.sync_acl_permissions: + elif self.use_permissions_transfer: try: permissions_record = self.stream_reader.get_file_acl_permissions( file, logger=self.logger @@ -314,7 +314,7 @@ def get_json_schema(self) -> JsonSchema: def _get_raw_json_schema(self) -> JsonSchema: if self.use_file_transfer: return file_transfer_schema - elif self.sync_acl_permissions: + elif self.use_permissions_transfer: return remote_file_permissions_schema elif self.config.input_schema: return self.config.get_input_schema() # type: ignore diff --git a/unit_tests/sources/file_based/scenarios/csv_scenarios.py b/unit_tests/sources/file_based/scenarios/csv_scenarios.py index c1f2898f9..f75dbe481 100644 --- a/unit_tests/sources/file_based/scenarios/csv_scenarios.py +++ b/unit_tests/sources/file_based/scenarios/csv_scenarios.py @@ -508,20 +508,7 @@ "const": "use_records_transfer", "enum": ["use_records_transfer"], "type": "string", - }, - "sync_acl_permissions": { - "airbyte_hidden": True, - "default": False, - "description": "Joins Document allowlists to each stream.", - "title": "Include ACL Permissions", - "type": "boolean", - }, - "domain": { - "airbyte_hidden": True, - "description": "The domain of the identities.", - "title": "Domain", - "type": "string", - }, + } }, "description": "Recommended - Extract and load structured records into your destination of choice. This is the classic method of moving data in Airbyte. It allows for blocking and hashing individual fields or files from a structured schema. Data can be flattened, typed and deduped depending on the destination.", "required": ["delivery_type"], @@ -547,6 +534,28 @@ "description": "Copy raw files without parsing their contents. Bits are copied into the destination exactly as they appeared in the source. Recommended for use with unstructured text data, non-text and compressed files.", "required": ["delivery_type"], }, + { + "description": "Sends one identity stream and one for more permissions (ACL) streams to the destination. This data can be used in downstream systems to recreate permission restrictions mirroring the original source.", + "properties": { + "delivery_type": { + "const": "use_permissions_transfer", + "default": "use_permissions_transfer", + "enum": ["use_permissions_transfer"], + "title": "Delivery Type", + "type": "string", + }, + "include_identities_stream": { + "airbyte_hidden": True, + "default": True, + "description": "This data can be used in downstream systems to recreate permission restrictions mirroring the original source", + "title": "Include Identity Stream", + "type": "boolean", + }, + }, + "required": ["delivery_type"], + "title": "Replicate Permissions ACL", + "type": "object", + }, ], }, }, From 40c17871f8e5822a9ca232fd493484096f4f9a0f Mon Sep 17 00:00:00 2001 From: Aldo Gonzalez Date: Mon, 3 Feb 2025 15:36:04 -0600 Subject: [PATCH 16/18] file-based: fix csv test --- unit_tests/sources/file_based/scenarios/csv_scenarios.py | 1 - 1 file changed, 1 deletion(-) diff --git a/unit_tests/sources/file_based/scenarios/csv_scenarios.py b/unit_tests/sources/file_based/scenarios/csv_scenarios.py index f75dbe481..b1d74334a 100644 --- a/unit_tests/sources/file_based/scenarios/csv_scenarios.py +++ b/unit_tests/sources/file_based/scenarios/csv_scenarios.py @@ -545,7 +545,6 @@ "type": "string", }, "include_identities_stream": { - "airbyte_hidden": True, "default": True, "description": "This data can be used in downstream systems to recreate permission restrictions mirroring the original source", "title": "Include Identity Stream", From 36e0bca5aa3faf66833a0f159d270eadb4a25e42 Mon Sep 17 00:00:00 2001 From: octavia-squidington-iii Date: Mon, 3 Feb 2025 21:41:27 +0000 Subject: [PATCH 17/18] Auto-fix lint and format issues --- airbyte_cdk/sources/file_based/file_based_source.py | 4 ++-- airbyte_cdk/sources/file_based/file_based_stream_reader.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/airbyte_cdk/sources/file_based/file_based_source.py b/airbyte_cdk/sources/file_based/file_based_source.py index e5cc7445f..24f9c7a27 100644 --- a/airbyte_cdk/sources/file_based/file_based_source.py +++ b/airbyte_cdk/sources/file_based/file_based_source.py @@ -34,10 +34,10 @@ ValidationPolicy, ) from airbyte_cdk.sources.file_based.config.validate_config_transfer_modes import ( - use_file_transfer, + include_identities_stream, preserve_directory_structure, + use_file_transfer, use_permissions_transfer, - include_identities_stream, ) from airbyte_cdk.sources.file_based.discovery_policy import ( AbstractDiscoveryPolicy, diff --git a/airbyte_cdk/sources/file_based/file_based_stream_reader.py b/airbyte_cdk/sources/file_based/file_based_stream_reader.py index 12faac751..1c762205f 100644 --- a/airbyte_cdk/sources/file_based/file_based_stream_reader.py +++ b/airbyte_cdk/sources/file_based/file_based_stream_reader.py @@ -14,9 +14,9 @@ from airbyte_cdk.sources.file_based.config.abstract_file_based_spec import AbstractFileBasedSpec from airbyte_cdk.sources.file_based.config.validate_config_transfer_modes import ( - use_file_transfer, - preserve_directory_structure, include_identities_stream, + preserve_directory_structure, + use_file_transfer, ) from airbyte_cdk.sources.file_based.remote_file import RemoteFile From d30b1ad8932617769a230e5d0a8c972ae47c43de Mon Sep 17 00:00:00 2001 From: Aldo Gonzalez Date: Mon, 3 Feb 2025 15:50:21 -0600 Subject: [PATCH 18/18] file-based: make new methods abstract --- airbyte_cdk/sources/file_based/file_based_stream_reader.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/airbyte_cdk/sources/file_based/file_based_stream_reader.py b/airbyte_cdk/sources/file_based/file_based_stream_reader.py index 1c762205f..099b401f7 100644 --- a/airbyte_cdk/sources/file_based/file_based_stream_reader.py +++ b/airbyte_cdk/sources/file_based/file_based_stream_reader.py @@ -185,16 +185,18 @@ def _get_file_transfer_paths(self, file: RemoteFile, local_directory: str) -> Li absolute_file_path = path.abspath(local_file_path) return [file_relative_path, local_file_path, absolute_file_path] + @abstractmethod def get_file_acl_permissions(self, file: RemoteFile, logger: logging.Logger) -> Dict[str, Any]: """ This is required for connectors that will support syncing ACL Permissions from files. """ - return {} + ... + @abstractmethod def load_identity_groups(self, logger: logging.Logger) -> Iterable[Dict[str, Any]]: """ This is required for connectors that will support syncing identities. """ - yield {} + ...